当前位置: 首页 > news >正文

asp.net网站支持多国语言/进入百度官网首页

asp.net网站支持多国语言,进入百度官网首页,做门窗安装用哪些网站找生意,wordpress文件类型不支持大数据大数据技术文章「Flink」事件时间与水印我们先来以滚动时间窗口为例,来看一下窗口的几个时间参数与Flink流处理系统时间特性的关系。获取窗口开始时间Flink源代码获取窗口的开始时间为以下代码:org.apache.flink.streaming.api.windowing.windows.…

大数据

大数据技术文章

「Flink」事件时间与水印

我们先来以滚动时间窗口为例,来看一下窗口的几个时间参数与Flink流处理系统时间特性的关系。

获取窗口开始时间Flink源代码

获取窗口的开始时间为以下代码:

org.apache.flink.streaming.api.windowing.windows.TimeWindow

/**

* Method to get the window start for a timestamp.

*

* @param timestamp epoch millisecond to get the window start.

* @param offset The offset which window start would be shifted by.

* @param windowSize The size of the generated windows.

* @return window start

*/public static long getWindowStartWithOffset(long timestamp, long offset, longwindowSize) {return timestamp - (timestamp - offset + windowSize) %windowSize;

}

这一段代码,我们可以认为Flink并不是把时间戳直接作为窗口的开始时间,而是做了一些“对齐”操作,确保时间能够整除8。

不同时间类型的窗口时间计算

1、当TimeCharacteristic为ProcessingTime时

窗口的开始时间:与窗口接收到的第一条消息的处理时间有关。例如:window operator是2020-02-06 22:02:33接收到的第一条消息,那么窗口的开始时间就是2020-02-06 22:02:33。

窗口的结束时间:一旦窗口的开始时间确定了,因为窗口的长度是固定的。那么窗口的结束时间就确定下来了,例如:假设这里的时间窗口是3秒,那么窗口的结束时间就是2020-02-06 22:02:36。

窗口的触发计算时间:假设有一条新的消息到达window operator,此时如果对应operator的系统时间,大于结束时间,就会触发计算。

一旦窗口的开始时间确定了,那么后续窗口的开始时间,也就都确定下来了。问题:

假设某个时间窗口,2020-2-6 22:12:20 - 2020-2-6 22:12:23,之间没有任何一条数据进来。Flink会如何处理?

Flink会直接抛弃掉这个时间窗口,新来的事件消息会到其他的时间窗口中计算。

2、当TimeCharacteristic为IngestionTime时

窗口的开始时间:与source operator接收到的第一条消息有关。例如:source接收到这条消息的时间是2020-2-6 22:14:50,那么窗口的开始时间就是2020-2-6 22:14:50

窗口的结束时间:与ProcessTime一致

窗口的触发计算时间:假设有一条新的消息到达source operator,那么此时的时间如果大于结束时间,就会触发计算。

除了窗口的开始时间、触发时间都是与source operator算子有关,其他与Processing Time是类似的。

3、但TimeCharacteristic为EventTime时

窗口的开始时间:与window operator接收到的第一条消息的事件时间有关,例如:如果这条消息的水印时间是2020-2-6 22:17:50,那么窗口的的开始时间就是2020-2-6 22:17:50

窗口的结束时间:与ProcessTime一致

窗口的触发计算时间:假设有一条新的消息到达window operator,如果该事件的水印时间大于窗口的结束时间,就会触发计算。

通常,我们会让水印时间比事件时间允许延迟几秒钟。这样,如果是因为网络延迟消息晚到了几秒,也不会影响到统计结果了。

public classWordCountWindow {public static void main(String[] args) throwsException {//1. 初始化流式运行环境

Configuration conf = newConfiguration();

StreamExecutionEnvironment env=StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);//2. 设置时间处理类型,这里设置的方式处理时间

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//3. 定义数据源,每秒发送一个hadoop单词

SingleOutputStreamOperator> wordDSWithWaterMark = env.addSource(new RichSourceFunction>() {private boolean isCanaled = false;private int TOTAL_NUM = 20;

@Overridepublic void run(SourceContext> ctx) throwsException {while (!isCanaled) {

ctx.collect(Tuple2.of("hadooop", System.currentTimeMillis()));//打印窗口开始、结束时间

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

System.out.println("事件发送时间:" +sdf.format(System.currentTimeMillis()));

Thread.sleep(1000);

}

}

@Overridepublic voidcancel() {

isCanaled= true;

}

}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor>(Time.seconds(5)) {

@Overridepublic long extractTimestamp(Tuple2element) {returnelement.f1;

}

});//4. 每5秒进行一次,分组统计//4.1 转换为元组

wordDSWithWaterMark.map(word ->{return Tuple2.of(word.f0, 1);

})//指定返回类型

.returns(Types.TUPLE(Types.STRING, Types.INT))//按照单词进行分组

.keyBy(t ->t.f0)//滚动窗口,3秒计算一次

.timeWindow(Time.seconds(3))

.reduce(new ReduceFunction>() {

@Overridepublic Tuple2 reduce(Tuple2 value1, Tuple2 value2) throwsException {return Tuple2.of(value1.f0, value1.f1 +value2.f1);

}

},new RichWindowFunction, Tuple2, String, TimeWindow>() {

@Overridepublic void apply(String word, TimeWindow window, Iterable> input, Collector> out) throwsException {//打印窗口开始、结束时间

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

System.out.println("窗口开始时间:" +sdf.format(window.getStart())+ " 窗口结束时间:" +sdf.format(window.getEnd())+ " 窗口计算时间:" +sdf.format(System.currentTimeMillis()));int sum = 0;

Iterator> iterator =input.iterator();while(iterator.hasNext()) {

Integer count=iterator.next().f1;

sum+=count;

}

out.collect(Tuple2.of(word, sum));

}

}).print();

env.execute("app");

}

}

输出结果如下:事件发送时间:2020-02-06 22:35:08

事件发送时间:2020-02-06 22:35:09

事件发送时间:2020-02-06 22:35:10

事件发送时间:2020-02-06 22:35:11

事件发送时间:2020-02-06 22:35:12

事件发送时间:2020-02-06 22:35:13

事件发送时间:2020-02-06 22:35:14

窗口开始时间:2020-02-06 22:35:06 窗口结束时间:2020-02-06 22:35:09 窗口计算时间:2020-02-06 22:35:14

4> (hadooop,1)

事件发送时间:2020-02-06 22:35:15

事件发送时间:2020-02-06 22:35:16

事件发送时间:2020-02-06 22:35:17

窗口开始时间:2020-02-06 22:35:09 窗口结束时间:2020-02-06 22:35:12 窗口计算时间:2020-02-06 22:35:17

4> (hadooop,3)

参考文件:

内容来源于网络,如有侵权请联系客服删除

http://www.lbrq.cn/news/1464751.html

相关文章:

  • 模板网站也可以做优化/互联网广告是做什么的
  • 一级a做爰片免费视频网站/国外免费建站网站
  • 钢铁网站哪家做的好/互联网关键词优化
  • 个人备案可以做哪些网站/建立网站的流程
  • 张氏万家网站建设/我想做电商怎么加入
  • asp故障解答网站模板/seo优化工作内容
  • 给女朋友做的网站/semester是什么意思
  • 苏州加基森网站建设/海南百度推广代理商
  • 欧洲vodafonewifi巨大app3di/seo 论坛
  • 青岛经济师考试/什么公司适合做seo优化
  • 南京电商网站开发/成都高端品牌网站建设
  • 跨境电商开店详细步骤/沈阳百度seo排名优化软件
  • 资深的金融行业网站开发/安装百度到桌面
  • 长春如何建立一个平台网站/企业网站建设案例
  • 银川网站制作公司/网络营销的12种手段
  • 常州网站制作czyzj/最新军事报道
  • 设计品牌网站公司/自媒体营销模式有哪些
  • 哪些网站做的不好/新闻稿代写平台
  • 设计公司网站设计报价明细表/企业推广方案
  • 佛山 做网站/一个新公众号怎么吸粉
  • 最近做国际网站怎么样/app拉新怎么做
  • 免费域名主机/seo和sem的关系
  • 青海公司网站建设/360营销平台
  • 企业做网站费用/百度推广管理平台登录
  • 贵州网络公司网站建设/百度空间登录入口
  • 企业花钱做的网站出现违禁词/长沙网站seo排名
  • 天津做网站联系方式/百度手机助手应用商店下载
  • 免费最好网站建设/b站推广网站入口2023是什么
  • 四川网站建设服务/百度电脑版官网下载
  • 网站定制与开发/百度手机助手下载
  • SmartCLIP:具有识别保证的模块化视觉-语言对齐
  • 【学习笔记】MySQL技术内幕InnoDB存储引擎——第7章 事务
  • 2025.8.1
  • 【科普】怎么理解Modbus、TCP、UDP
  • Excel数据转化为Xmind思维导图全流程(含Word转化格式),实用
  • 如何快速部署主数据管理解决方案?