专业网站改版/百度关键词排名批量查询
在Flink计算中,常见的一些操作是map或者flatmap一些数据之后keyby 开窗口进行计算。那么在这些计算当中有哪些算子呢?
其中我分为两类算子。
增量聚合 有reduce 和aggregate算子,全量聚合 有apply和process。那么今天我们就主要讲解一下常用的增量聚合算子aggregate算子。
aggregate方法签名的三个类型 <数据源类型,累加器类型,输出类型>
WindowFunction 方法签名的四个类型为 <IN, OUT, KEY, W extends Window>
第一步:将dataStream转换城windowedStream
// 从kafka读取数据val inputStream = env.addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties)).map(data => {val dataArray = data.split(",")UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)}).assignAscendingTimestamps(_.timestamp * 1000L)// 对数据进行窗口聚合处理val aggStream: DataStream[ItemViewCount] = inputStream.filter(_.behavior == "pv") // 过滤出pv数据.keyBy(_.itemId).timeWindow(Time.hours(1), Time.minutes(5)) // 开窗进行统计.aggregate(new CountAgg(), new WindowCountResult()) // 聚合出当前商品在时间窗口内的统计数量
第二步:自定义聚合函数
// 自定义的预聚合函数,来一条数据就加一
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] {//add方法为累加器累加的方法,这里为最简单的+1操作override def add(value: UserBehavior, accumulator: Long): Long = accumulator + 1//初始化累加值override def createAccumulator(): Long = 0L//最后返回那个值,这里为accumulatoroverride def getResult(accumulator: Long): Long = accumulator//分区处理的归并操作,这里将所有并处理的结果相加override def merge(a: Long, b: Long): Long = a + b
}
第三部:自定义窗口函数
// 自定义window function
class WindowCountResult() extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {//Long类型的Key为上一步的自定义累加器的返回值//Window为差给你扣类型,第一步中的没窗口类型,TimeWindow//input为接收的数据类型,此处为Long类型的迭代器//out为此方法返回的类型,此处为ItemViewCount样例类对象的集合override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {//调用ItemViewCount样例类对象的构造器,依次构造出ItemViewCount样例类并返回out.collect(ItemViewCount(key, window.getEnd, input.iterator.next()))}
}