当前位置: 首页 > news >正文

网站建设 响应式 北京/建立网站流程

网站建设 响应式 北京,建立网站流程,萧云建设网站,小电影网站怎么做的文章目录1)窗口函数(WindowFunction)2)处理窗口函数(ProcessWindowFunction)窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓…

文章目录

  • 1)窗口函数(WindowFunction)
  • 2)处理窗口函数(ProcessWindowFunction)

窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。

很明显,这就是典型的批处理思路了— —先攒数据,等一批都到齐了再正式启动处理流程。这样做毫无疑问是低效的:因为窗口全部的计算任务都积压在了要输出结果的那一瞬间,而在之前收集数据的漫长过程中却无所事事。这就好比平时不用功,到考试之前通宵抱佛脚,肯定不如把工夫花在日常积累上。
那为什么还需要有全窗口函数呢?这是因为有些场景下,我们要做的计算必须基于全部的数据才有效,这时做增量聚合就没什么意义了;另外,输出的结果有可能要包含上下文中的一些信息(比如窗口的起始时间),这是增量聚合函数做不到的。所以,我们还需要有更丰富的窗口计算方式,这就可以用全窗口函数来实现。
在 Flink 中,全窗口函数也有两种:WindowFunction 和ProcessWindowFunction。

1)窗口函数(WindowFunction)

WindowFunction 字面上就是“窗口函数”,它其实是老版本的通用窗口函数接口。我们可以基于 WindowedStream 调用.apply()方法,传入一个 WindowFunction 的实现类。

stream.keyBy(<key selector>).window(<window assigner>).apply(new MyWindowFunction());

这个类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口(Window)本身的信息。WindowFunction 接口在源码中实现如下:

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

当窗口到达结束时间需要触发计算时,就会调用这里的 apply 方法。我们可以从 input 集合中取出窗口收集的数据,结合 key 和 window 信息,通过收集器(Collector)输出结果。这里 Collector 的用法,与 FlatMapFunction 中相同。
不过我们也看到了,WindowFunction 能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被 ProcessWindowFunction 全覆盖,所以之后可能会逐渐弃用。一般在实际应用,直接使用 ProcessWindowFunction 就可以了。

2)处理窗口函数(ProcessWindowFunction)

ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction 还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富。事实上,ProcessWindowFunction 是 Flink 底层 API——处理函数(process function)中的一员,当 然 , 这 些 好 处 是 以 牺 牲 性 能 和 资 源 为 代 价 的 。 作 为 一 个 全 窗 口 函 数 ,ProcessWindowFunction 同样需要将所有数据缓存下来、等到窗口触发计算时才使用。它其实就是一个增强版的WindowFunction。

需求:统计每5s中UV的次数

代码:需求实现

public class WindowProcessTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取数据,并提取时间戳、生成水位线DataStream<Event> stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));stream.print("data");// 统计每5秒的UV次数stream.keyBy(data -> true).window(TumblingEventTimeWindows.of(Time.seconds(5))).process(new UvCountByWindow()).print();env.execute();}//实现自定义ProcessWindowFunction,输出一条统计信息public static class UvCountByWindow extends ProcessWindowFunction<Event,String,Boolean, TimeWindow>{@Overridepublic void process(Boolean aBoolean, Context context, Iterable<Event> elements, Collector<String> out) throws Exception {//用HashSet保存userHashSet<String> userSet = new HashSet<>();//遍历数据,去重for (Event event:elements) {userSet.add(event.user);}//获取UV信息Integer uv = userSet.size();//获取开始时间和结束时间Long start = context.window().getStart();Long end = context.window().getEnd();//打印out.collect("窗口 "+new Timestamp(start) + " ~ " + new Timestamp(end) + " UV值为:" + uv);}}
}

这里我们使用的是事件时间语义。定义 5 秒钟的滚动事件窗口后,直接使用ProcessWindowFunction 来定义处理的逻辑。我们可以创建一个 HashSet,将窗口所有数据的user 写入实现去重,最终得到 HashSet 的元素个数就是 UV 值。
当 然 , 这 里 我 们 并 没 有 用 到 上 下 文 中 其 他 信 息 , 所 以 其 实 没 有 必 要 使 用ProcessWindowFunction。全窗口函数因为运行效率较低,很少直接单独使用,往往会和增量聚合函数结合在一起,共同实现窗口的处理计算。

全窗口函数和增量聚合函数各有优缺点,增量聚合函数速度更快,更高效,延迟小,但是它包装不了想要的窗口信息,全窗口函数能拿到对应的窗口信息,但是,它是把所有数据都攒起来做了一个批处理,这个效率太低,延迟太高。

一般增量聚合函数调用getResult 方法之后,不是直接输出,而是传递给后面的全窗口函数中的process方法中的 element 传递,当前的 element 其实就是增量聚合的结果,这样就给两者的优势放在一起,变成了一个通用强大的用法

举例:结合增量聚合函数与全窗口函数一起统计每5秒UV出现的次数

代码如下:需求实现

public class UvCountExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取数据,并提取时间戳、生成水位线DataStream<Event> stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));//使用AggregateFunction 和 ProcessWindowFunction结合计算UVstream.keyBy(data -> true).window(TumblingEventTimeWindows.of(Time.seconds(5))).aggregate(new UvAgg(), new UvCountResult()).print();stream.print("data");env.execute();}//自定义实现 AggregateFunction 增量聚合计算UV值public static class UvAgg implements AggregateFunction<Event, HashSet<String>, Long> {@Overridepublic HashSet<String> createAccumulator() {return new HashSet<>();}@Overridepublic HashSet<String> add(Event value, HashSet<String> accumulator) {accumulator.add(value.user);return accumulator;}@Overridepublic Long getResult(HashSet<String> accumulator) {return (long) accumulator.size();}@Overridepublic HashSet<String> merge(HashSet<String> a, HashSet<String> b) {return null;}}//自定义实现 ProcessWindowFunction 包装窗口信息输出public static class UvCountResult extends ProcessWindowFunction<Long, String, Boolean, TimeWindow> {@Overridepublic void process(Boolean aBoolean, ProcessWindowFunction<Long, String, Boolean, TimeWindow>.Context context, Iterable<Long> elements, Collector<String> out) throws Exception {//获取UV信息Long uv = elements.iterator().next();//获取开始时间和结束时间Long start = context.window().getStart();Long end = context.window().getEnd();//打印out.collect("窗口 " + new Timestamp(start) + " ~ " + new Timestamp(end) + " UV值为:" + uv);}}
}
http://www.lbrq.cn/news/745453.html

相关文章:

  • 网页游戏大全官网/济南seo排行榜
  • 整站wordpress下载/百度app关键词优化
  • 王健林亏60亿做不成一个网站/用asp做的网站
  • qq客服代码放在网站哪里/建什么网站可以长期盈利
  • 什么网站出项目找人做/微信怎么推广
  • 广州网站建设公司排行/seo流量排行榜神器
  • 盐城网页制作哪家好/快速排名优化推广手机
  • 网站推广策划包含的内容/无锡seo关键词排名
  • 建网站怎么做报分系统/长安seo排名优化培训
  • 谷歌seo技巧/郑州seo外包顾问
  • 贾汪区人民政府门户网站建设/淘宝关键词指数
  • 网页模板下载网站10/线下引流的八种推广方式
  • 扬州外贸网站建设/app平台搭建
  • 关于设计的网站/西安百度推广排名
  • 常熟网络推广/seo是对网站进行什么优化
  • 湖北专业网站建设市面价/网络营销工具与方法
  • 建设通网站查询单位/百度认证营销推广师
  • wordpress插件直播/搜索引擎关键词优化有哪些技巧
  • 青岛微网站开发/有没有专门帮人推广的公司
  • 北京城乡建设官方网站/公司网站设计制作
  • 珠海做网站哪家专业/百度云搜索
  • 培训网络营销的机构/北京seo优化wyhseo
  • 响应式网站设计案例/b站视频推广网站400
  • 网站的在线客服系统/网站建设平台哪家好
  • 中央疫情二十条措施最新/爱站网seo查询
  • 青岛市建设厅网站/简短的软文范例
  • wordpress博客类似/seo推广有哪些
  • 自己做行程的网站/宁波seo排名外包公司
  • 奇米网怎么做网站/被国家禁止访问的网站怎么打开
  • 做网站备案要多久/长沙今日头条新闻
  • MCP(模型上下文协议):是否是 AI 基础设施中缺失的标准?
  • 比赛准备之环境配置
  • JVM垃圾回收(GC)深度解析:原理、调优与问题排查
  • 大语言模型中的归一化实现解析
  • 大数据计算引擎(四)—— Impala
  • AI 对话高效输入指令攻略(五):AI+PicDoc文生图表工具:解锁高效图表创作新范式