想创建一个网站/网站排名seo教程
网站里面,特别是电商场景,一般情况都要统计哪些是热门商品,或者哪个url的访问最大。
前提是根据每个url对应的访问量统计出来,然后再去排序,做其他选择
我们可以将页面访问量按照窗口,基于窗口划分,每个url它们分别被点击访问了几次
这里我们为了方便处理,单独定义了一个 POJO 类 UrlViewCount 来表示聚合输出结果的数据类型,包含了 url、浏览量以及窗口的起始结束时间。
Gitee中
public class UrlViewCount {public String url; //urlpublic Long count; //数量public Long windowStart; //开始时间public Long windowEnd; //结束时间public UrlViewCount() {}public UrlViewCount(String url, Long count, Long windowStart, Long windowEnd) {this.url = url;this.count = count;this.windowStart = windowStart;this.windowEnd = windowEnd;}@Overridepublic String toString() {return "UrlViewCount[" +"url='" + url + '\'' +", count=" + count +", windowStart=" + new Timestamp(windowStart) +", windowEnd=" + new Timestamp(windowEnd) +']';}
}
我们这里统计 5 秒钟的 url 浏览量,另外为了更加清晰地展示,还应该把窗口的起始结束时间一起输出。并结合增量聚合函数和全窗口函数来得到统计结果。
代码如下:需求实现
public class UrlCountViewExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取数据,并提取时间戳、生成水位线DataStream<Event> stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));stream.print("data");//统计每个url的访问量stream.keyBy(data -> data.url).window(TumblingEventTimeWindows.of(Time.seconds(5))).aggregate(new UrlViewCountAgg(), new UrlViewCountResult()).print();env.execute();}//增量聚合,来一条数据就 加 1public static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(Event value, Long accumulator) {return accumulator + 1;}@Overridepublic Long getResult(Long accumulator) {return accumulator;}@Overridepublic Long merge(Long a, Long b) {return null;}}//包装窗口信息,输出UrlViewCountpublic static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {@Overridepublic void process(String url, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {//获取开始时间和结束时间Long start = context.window().getStart();Long end = context.window().getEnd();//UVLong uv = elements.iterator().next();//输出out.collect(new UrlViewCount(url, uv, start, end));}}
}
代码中用一个 AggregateFunction 来实现增量聚合,每来一个数据就计数加一;得到的结果交给 ProcessWindowFunction,结合窗口信息包装成我们想要的 UrlViewCount,最终输出统计结果。
窗口处理的主体还是增量聚合,而引入全窗口函数又可以获取到更多的信息包装输出,这样的结合兼具了两种窗口函数的优势,在保证处理性能和实时性的同时支持了更加丰富的应用场景。