Contents
0. Related article links
1. Using WindowFunction in Java to compute daily active users
2. Demonstrating WindowFunction usage in Scala
0. Related article links
Flink article collection
1. Using WindowFunction in Java to compute daily active users
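// Assign event-time timestamps (the log's time field is converted from seconds to milliseconds)
// and keep only home-page exposure records that carry scdata.
// On Flink versions before 1.12 the environment should also be switched to event time with
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime), which is not shown here.
DataStream<AppLogBean> homeExposureStream = appExposureStream
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<AppLogBean>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(AppLogBean element) {
                        return element.getTime() * 1000;
                    }
                })
        .filter(new FilterFunction<AppLogBean>() {
            @Override
            public boolean filter(AppLogBean value) throws Exception {
                return "home_exposure".equals(value.getTopic())
                        && StringUtils.isNotBlank(value.getScdata());
            }
        });

// Extract the user id. Every record gets the same constant key "dummy",
// so all user ids end up in one keyed window.
SingleOutputStreamOperator<Tuple2<String, String>> userIdStream = homeExposureStream
        .map(new MapFunction<AppLogBean, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(AppLogBean appLogBean) throws Exception {
                JSONObject scdataJson = JSONObject.parseObject(appLogBean.getScdata());
                String userId = scdataJson.getString("user_id");
                return Tuple2.of("dummy", userId);
            }
        });

// Open a one-day tumbling event-time window over the user ids and count the day's UV.
// The -8 hour offset aligns each day to midnight in UTC+8 (Beijing time) instead of midnight UTC.
// The continuous trigger fires every second of event time, so the running UV is emitted during the day.
SingleOutputStreamOperator<String> result = userIdStream
        .keyBy(t -> t.f0)
        .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
        .trigger(ContinuousEventTimeTrigger.of(Time.seconds(1)))
        .aggregate(new UniqueVisitorAggregateFunction(), new UniqueVisitorProcessWindowFunction());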
// Print the results
result.print("result>>>>>>>>>");
} // closes the enclosing method

/**
 * UV window function: receives the pre-aggregated UV of a window and emits it with the window information
 */
public static class UniqueVisitorProcessWindowFunction extends ProcessWindowFunction<Long, String, String, TimeWindow> {
    private final FastDateFormat df = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");

    @Override
    public void process(String key, Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
        // Because an AggregateFunction runs first, the iterable holds exactly one element: the UV so far
        Long uv = elements.iterator().next();
        System.out.println("##### Current watermark #####" + df.format(context.currentWatermark()));
        System.out.println("##### Window start ###########" + df.format(context.window().getStart()));
        System.out.println("##### Window end ###########" + df.format(context.window().getEnd()));
        System.out.println("##### UV counted so far in this window #####" + uv);
        out.collect("UV " + uv);
    }
}

/**
 * UV aggregate function
 */
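public static class UniqueVisitorAggregateFunction
        implements AggregateFunction<Tuple2<String, String>, Tuple2<Set<String>, Long>, Long> {

    // Accumulator: (user ids already seen in this window, number of distinct user ids)
    @Override
    public Tuple2<Set<String>, Long> createAccumulator() {
        return Tuple2.of(new HashSet<>(), 0L);
    }

    // Count a user id only the first time it shows up in this window
    @Override
    public Tuple2<Set<String>, Long> add(Tuple2<String, String> value, Tuple2<Set<String>, Long> accumulator) {
        if (accumulator.f0.add(value.f1)) {
            accumulator.f1 += 1;
        }
        return accumulator;
    }

    @Override
    public Long getResult(Tuple2<Set<String>, Long> accumulator) {
        return accumulator.f1;
    }

    // Only called when windows are merged (e.g. session windows), never for this tumbling window;
    // union the two sets so the distinct count stays correct instead of returning null
    @Override
    public Tuple2<Set<String>, Long> merge(Tuple2<Set<String>, Long> a, Tuple2<Set<String>, Long> b) {
        a.f0.addAll(b.f0);
        a.f1 = (long) a.f0.size();
        return a;
    }
}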
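Two details of the Java pipeline above are easy to misread: how `Time.hours(-8)` moves the daily window boundary to midnight in UTC+8, and how the set-based accumulator counts each user only once (including what a correct `merge` has to do). The plain-Java sketch below illustrates both without Flink. It is an illustration under assumptions, not Flink code: `windowStart` follows the alignment rule of Flink's `TimeWindow.getWindowStartWithOffset` as found in recent Flink versions, the names `DailyUvSketch`, `UvAccumulator` and `dailyUv` are hypothetical, and the event timestamps are made up.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class DailyUvSketch {

    static final long HOUR_MS = 60L * 60 * 1000;
    static final long DAY_MS = 24 * HOUR_MS;
    static final long UTC8_OFFSET_MS = -8 * HOUR_MS; // plays the role of Time.hours(-8)

    // Start of the tumbling window containing `timestamp` for the given size and offset.
    // Sketch of the alignment rule behind TumblingEventTimeWindows.of(size, offset).
    static long windowStart(long timestamp, long offset, long size) {
        long remainder = (timestamp - offset) % size;
        if (remainder < 0) {
            remainder += size; // Java's % keeps the sign of the dividend
        }
        return timestamp - remainder;
    }

    // Hypothetical accumulator with the same semantics as UniqueVisitorAggregateFunction:
    // the set of user ids seen so far plus their distinct count.
    static final class UvAccumulator {
        final Set<String> users = new HashSet<>();
        long count;

        void add(String userId) {
            if (users.add(userId)) { // true only the first time this id is seen
                count++;
            }
        }

        // Union of two partial accumulators; the count is recomputed from the merged set.
        UvAccumulator merge(UvAccumulator other) {
            users.addAll(other.users);
            count = users.size();
            return this;
        }
    }

    // Assigns each (timestamp, userId) event to a daily window and returns windowStart -> UV.
    static Map<Long, Long> dailyUv(long[] timestamps, String[] userIds, long offset) {
        Map<Long, UvAccumulator> windows = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            long start = windowStart(timestamps[i], offset, DAY_MS);
            windows.computeIfAbsent(start, k -> new UvAccumulator()).add(userIds[i]);
        }
        Map<Long, Long> result = new TreeMap<>();
        windows.forEach((start, acc) -> result.put(start, acc.count));
        return result;
    }

    public static void main(String[] args) {
        long jan1 = 1_704_038_400_000L; // 2024-01-01 00:00:00 UTC+8 (2023-12-31 16:00:00 UTC)
        long[] ts = {
            jan1 + 8 * HOUR_MS,                 // u1, Jan 1 08:00 UTC+8
            jan1 + 9 * HOUR_MS,                 // u2, Jan 1 09:00 UTC+8
            jan1 + 23 * HOUR_MS + 59 * 60_000L, // u1 again, Jan 1 23:59 UTC+8
            jan1 + 24 * HOUR_MS + 30 * 60_000L  // u1, Jan 2 00:30 UTC+8
        };
        String[] users = {"u1", "u2", "u1", "u1"};

        // Days aligned to midnight UTC+8: prints {1704038400000=2, 1704124800000=1}
        System.out.println(dailyUv(ts, users, UTC8_OFFSET_MS));
        // Days aligned to midnight UTC: all four events share one UTC day, prints {1704067200000=2}
        System.out.println(dailyUv(ts, users, 0L));
    }
}
```

With the offset, `u1`'s visits at 23:59 and 00:30 Beijing time fall into two different days; without it, both land in the same UTC day, whose boundaries sit at 08:00 Beijing time.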
2. Demonstrating WindowFunction usage in Scala
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val sensorStream: WindowedStream[SensorReading, String, TimeWindow] = env
  .socketTextStream("localhost", 9999)
  .map(new MyMapToSensorReading)
  .keyBy(_.id)
  .timeWindow(Time.seconds(5)) // 5-second tumbling window per sensor id

// 1. Incremental aggregation functions (the result is updated once for every incoming element)
// 1.1 ReduceFunction: incremental aggregation (using an anonymous class)
val reduceResult: DataStream[SensorReading] = sensorStream.reduce(new ReduceFunction[SensorReading] {
  // Keep the latest id and timestamp, and add the temperatures of the two readings
  override def reduce(value1: SensorReading, value2: SensorReading): SensorReading = {
    SensorReading(value2.id, value2.timestamp, value1.temperature + value2.temperature)
  }
})
// 1.2 AggregateFunction: unlike reduce, the accumulator can have its own type,
//     and the input and output types may differ as well
val aggregateResult: DataStream[Long] = sensorStream.aggregate(new AggregateFunction[SensorReading, Long, Long] {
  // Initial accumulator value
  override def createAccumulator(): Long = 0L
  // Accumulate: count one more reading
  override def add(value: SensorReading, accumulator: Long): Long = accumulator + 1
  // Produce the result
  override def getResult(accumulator: Long): Long = accumulator
  // Combine two accumulators (called when windows are merged, e.g. session windows)
  override def merge(a: Long, b: Long): Long = a + b
})

// 2. Full window functions
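/**
 * Key points:
 * 1. apply takes a WindowFunction; all elements of the window are buffered first and
 *    processed together when the window fires.
 * 2. Four type parameters are required: input type, output type, key type
 *    (Tuple when keyBy uses a field-name string), and window type.
 * 3. All computation happens in apply; the window argument exposes window information
 *    such as its start and end time.
 */
val applyResult: DataStream[(Long, Int)] = sensorStream.apply(new WindowFunction[SensorReading, (Long, Int), String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[SensorReading], out: Collector[(Long, Int)]): Unit = {
    // Emit (window start, number of readings in the window)
    out.collect((window.getStart, input.size))
  }
})

// 3. Other window APIs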
val lateTag = new OutputTag[SensorReading]("late")
val otherResult: DataStream[SensorReading] = sensorStream
  .allowedLateness(Time.seconds(1)) // keep window state 1 s longer so late data still updates the result
  .sideOutputLateData(lateTag)      // send data that is too late even for that to a side output
  .reduce((x, y) => SensorReading(y.id, y.timestamp, x.temperature + y.temperature))

// Get the side output: it holds elements that arrive after the watermark has passed
// the window end plus allowedLateness, so they can be handled separately at the end
val sideOutputStream: DataStream[SensorReading] = otherResult.getSideOutput(lateTag)

// Print the output
applyResult.print()
env.execute("WindowFunctionDemo")
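The difference between the incremental functions (1.1, 1.2) and the full window function (2) can be seen without a Flink cluster. The plain-Java sketch below is not Flink code and rests on stated assumptions: a single sensor (so `keyBy` is omitted), 5-second windows aligned to the epoch with no offset, and a hypothetical `Reading` class standing in for `SensorReading`. The incremental path keeps one running sum per window, mirroring the `reduce` above; the full-window path buffers every element and computes once per window, mirroring `apply`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowFunctionSketch {

    static final long WINDOW_MS = 5_000L; // plays the role of timeWindow(Time.seconds(5))

    // Hypothetical stand-in for the SensorReading case class of the Scala example.
    static final class Reading {
        final String id;
        final long timestampMs;
        final double temperature;

        Reading(String id, long timestampMs, double temperature) {
            this.id = id;
            this.timestampMs = timestampMs;
            this.temperature = temperature;
        }
    }

    // Start of the 5-second tumbling window (epoch-aligned, no offset) containing ts.
    static long windowStart(long ts) {
        return ts - Math.floorMod(ts, WINDOW_MS);
    }

    // Incremental style, like reduce/aggregate: each window keeps a single running sum
    // updated as every reading arrives; the readings themselves are not stored.
    static Map<Long, Double> incrementalSum(List<Reading> input) {
        Map<Long, Double> sums = new TreeMap<>();
        for (Reading r : input) {
            sums.merge(windowStart(r.timestampMs), r.temperature, Double::sum);
        }
        return sums;
    }

    // Full-window style, like apply with a WindowFunction: every reading is buffered per window,
    // then the function runs once per window over the whole buffer.
    static Map<Long, Integer> fullWindowCount(List<Reading> input) {
        Map<Long, List<Reading>> buffers = new TreeMap<>();
        for (Reading r : input) {
            buffers.computeIfAbsent(windowStart(r.timestampMs), k -> new ArrayList<>()).add(r);
        }
        Map<Long, Integer> result = new TreeMap<>();
        // Mirrors out.collect((window.getStart, input.size)) in the Scala apply
        buffers.forEach((start, elements) -> result.put(start, elements.size()));
        return result;
    }

    public static void main(String[] args) {
        List<Reading> readings = Arrays.asList(
                new Reading("sensor_1", 1_000L, 10.0),
                new Reading("sensor_1", 3_000L, 12.5),
                new Reading("sensor_1", 6_000L, 20.0));
        System.out.println(incrementalSum(readings));  // prints {0=22.5, 5000=20.0}
        System.out.println(fullWindowCount(readings)); // prints {0=2, 5000=1}
    }
}
```

Both paths produce one value per window; the trade-off is state size (one accumulator versus every buffered element) against what the function can see (the window's full contents and bounds). Passing both to `aggregate`, as the Java example in section 1 does, combines the small state of the first with the window metadata of the second.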
Note: for links to other related articles, see -> Flink article collection