当前位置: 首页 > news >正文

东莞东城做网站公司/打开百度一下

东莞东城做网站公司,打开百度一下,seo网站推广的作用,wordpress 所有标签2019独角兽企业重金招聘Python工程师标准>>> 在Core Concepts中介绍了三种语义Event time、Processing-time、Ingestion time。 这里需要注意的是:punctuate方法必须使用新数据才能触发。 时间语义设置 如何实现不同的时间语义主要取决于两个方面&#x…

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

23160756_6CAV.jpg

在Core Concepts中介绍了三种语义Event time、Processing-time、Ingestion time。

这里需要注意的是:punctuate方法必须使用新数据才能触发。

时间语义设置

如何实现不同的时间语义主要取决于两个方面:

message timestamp类型

在0.10.x版本之后timestamps自动嵌入Kafka messages中。基于Kafka配置,这些timestamp代表event-time或者ingestion-time。参数如下,broker参数为log.message.timestamp.type,topic参数为message.timestamp.type。

log.message.timestamp.type:定义message的timestamp是create time或者log append time。参数值为CreateTime 或LogAppendTime。默认值是CreateTime。broker的参数配置。

message.timestamp.type:定义message的timestamp是create time或者log append time。参数值为CreateTime 或LogAppendTime。默认值是CreateTime。topic创建时参数配置,不设置则使用log.message.timestamp.type。

TimestampExtractor类型

接口TimestampExractor分为两类:

WallclockTimestampExtractor

WallclockTimestampExtractor提供了processing-time语义,只提供了一个方法extract。
  • extract方法获取时间为当前系统时间(System.currentTimeMillis())

ExtractRecordMetadataTimestamp

ExtractRecordMetadataTimestamp为抽象类及其子类提供了非processing-time语义。提供了两个方法extract和onInvalidTimestamp。
  • extract方法获取的是message的timestamp值。与message timestamp类型共同作用来提供event-time或者ingestion-time语义。
  • onInvalidTimestamp抽象方法需实现,主要是当遇到无效的message timestamp时如何进行处理。
ExtractRecordMetadataTimestamp包含如下子类:
  • FailOnInvalidTimestamp:如果某条记录含有无效的timestamp值时,extractor会抛出异常。
  • LogAndSkipOnInvalidTimestamp:如果某条记录含有无效的timestamp值时,将此记录在WARN日志中打印,依然返回当前的message timestamp值,最终会在数据处理时会导致忽略处理这条记录。
  • UsePreviousTimeOnInvalidTimestamp:如果某条记录含有无效的timestamp值时,将上一条有效记录的timestamp作为当前记录的timestamp值。

自定义TimestampExtractor

可以自定义实现TimestampExtractor接口,使用提取器提取记录中的部分数据作为返回,这样可以灵活设置语义。

一般情况下,我们使用producer发送message到kafka集群时,可以指定message timestamp来设置(也就是event-time),但是如果message timestamp并不是我们需要的,那么就需要自定义提取器来提取message的某个field。

总结

通过如下表格可见:

当使用WallclockTimestampExtractor提供processing-time语义。

当ExtractRecordMetadataTimestamp子类与CreateTime类型一起时,提供event-time语义。

当ExtractRecordMetadataTimestamp子类与LogAppendTime类型一起时,提供ingestion-time语义。

自定义实现TimestampExtractor接口,提供自定义time语义。

语义类型message timestampTimestampExractor
processing-time
 
WallclockTimestampExtractor
event-time
CreateTime
ExtractRecordMetadataTimestamp子类
ingestion-time
LogAppendTime
ExtractRecordMetadataTimestamp子类
自定义语义 自己实现TimestampExtractor

processing-time

事件或数据记录被流处理程序开始处理时的时间点,晚于event-time和ingestion-time

这里需要注意的是:punctuate方法必须使用新数据才能触发。

比如在process-time语义中,设置context.schedule(5000),程序执行时间为20秒,在0到5秒有数据,第一条数据到来会触发第一次punctuate:

如果5秒之后不再获取任何新数据(因为程序我们设置执行时间为20秒)则永远不会触发punctuate;

如果5秒之后才获取一条新数据,则同时触发(0-5)一次punctuate;

如果10秒之后才获取一条新数据,则同时触发(0-5),(5-10)两次punctuate;

如果15秒之后才获取一条新数据,则同时触发(0-5),(5-10),(10-15)三次punctuate;

在processing-time语义下我们需要如下配置:

props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.WallclockTimestampExtractor");//timestamp.extractor

 

import org.apache.commons.io.FileUtils;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import java.io.File;
import java.util.Locale;
import java.util.Properties;
/*** Demonstrates, using the low-level Processor APIs, how to implement the* WordCount program that computes a simple word occurrence histogram from an* input text.** In this example, the input stream reads from a topic named* "streams-file-input", where the values of messages represent lines of text;* and the histogram output is written to topic* "streams-wordcount-processor-output" where each record is an updated count of* a single word.** Before running this example you must create the input topic and the output* topic (e.g. via bin/kafka-topics.sh --create ...), and write some data to the* input topic (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see* any data arriving in the output topic.*/
public class WordCountProcessorDemo {private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {@Overridepublic Processor<String, String> get() {return new Processor<String, String>() {private ProcessorContext context;private KeyValueStore<String, Integer> kvStore;@Override@SuppressWarnings("unchecked")public void init(ProcessorContext context) {this.context = context;this.context.schedule(5000);this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");}@Overridepublic void process(String dummy, String line) {String[] words = line.toLowerCase(Locale.getDefault()).split(" ");System.out.println("line:" + line);for (String word : words) {Integer oldValue = this.kvStore.get(word);if (oldValue == null) {this.kvStore.put(word, 1);} else {this.kvStore.put(word, oldValue + 1);}}context.commit();}@Overridepublic void punctuate(long timestamp) {try (KeyValueIterator<String, Integer> iter = this.kvStore.all()) {System.out.println("----------- " + timestamp + "----------- ");//System.out.println(TimestampUtil.TimestampFormat(timestamp));while (iter.hasNext()) {KeyValue<String, Integer> entry = iter.next();System.out.println("[" + entry.key + ", " + entry.value + "]");context.forward(entry.key, entry.value.toString());}}}@Overridepublic void close() {}};}}public static void main(String[] args) throws Exception {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "breath:9092");props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.WallclockTimestampExtractor");TopologyBuilder builder = new TopologyBuilder();builder.addSource("Source", "streams-file-input");builder.addProcessor("Process", new MyProcessorSupplier(), "Source");builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(),"Process");builder.addSink("Sink", "streams-wordcount-processor-output", "Process");KafkaStreams streams = new KafkaStreams(builder, props);streams.start();Thread.sleep(20000L);streams.close();// 清空state storeString appStateDir = props.get(StreamsConfig.STATE_DIR_CONFIG) + System.getProperty("file.separator")+ props.get(StreamsConfig.APPLICATION_ID_CONFIG);FileUtils.deleteDirectory(new File(appStateDir));// 清空application相关中间topic以及input topic的offsetString kafkaHome = System.getenv("KAFKA_HOME");Runtime runtime = Runtime.getRuntime();runtime.exec(kafkaHome + "/bin/kafka-streams-application-reset.sh " + "--application-id "+ props.get(StreamsConfig.APPLICATION_ID_CONFIG) + " " + "--bootstrap-servers breath:9092 "+ "--zookeeper breath:2181/kafka01021 " + "--input-topics streams-file-input");}
}


event-time

事件或数据记录发生的时间点,通常是源头产生。早于processing-time和ingestion-time

event-time的timestamp生成方式有两种

  • 用户可以指定timestamp,在producer.send方法中发送的ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)进行设置。
  • 用户不指定timestamp,则producer.send当时的时间作为timestamp。

在event-time语义下我们需要保证topic的message timestamp类型为CreateTime,同时设置ExtractRecordMetadataTimestamp子类,根据需要如下三种选择:

  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.FailOnInvalidTimestamp");//timestamp.extractor
  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp");//timestamp.extractor
  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp");//timestamp.extractor

ingestion-time

事件或数据记录被存储在Kafka broker的某个topic partition中的时间点。早于processing-time,晚于event-time。

在event-time语义下我们需要保证topic的message timestamp类型为LogAppendTime,同时设置ExtractRecordMetadataTimestamp子类,根据需要如下三种选择:

  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.FailOnInvalidTimestamp");//timestamp.extractor
  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp");//timestamp.extractor
  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp");//timestamp.extractor

转载于:https://my.oschina.net/yulongblog/blog/1519359

http://www.lbrq.cn/news/1394929.html

相关文章:

  • 做网站一万/上海培训机构
  • 广州澄网站建设公司/石家庄网站优化
  • 网站建设国内外研究现状模板/seo二级目录
  • 保定网站建设冀icp/镇江网站制作公司
  • 网络系统管理与维护形考任务1/seo的关键词无需
  • 江门整站优化/网络营销案例分析报告
  • wordpress 404自定义/商品关键词怎么优化
  • 深圳市手机网站建设/昭通网站seo
  • 做企业网站首页尺寸/营销神器
  • 政务微网站建设方案/网站数据统计工具
  • 做付费视频网站好/seo研究
  • 邯郸网站设计价格/网站的营销推广方案
  • 网上购物系统er图/郑州seo网站关键词优化
  • 甘肃做网站/今日热搜榜官网
  • 开封网站优化公司/制作网站
  • 昌平区手机网站制作服务/武汉整站优化
  • 域名注册网站 简称/如何做好营销推广
  • 有知道做网站的吗/seo技术自学
  • 做类似58类型网站/网络营销的目标
  • 净水 技术支持 东莞网站建设/软文网
  • 用jsp做的购物网站/网站排名首页前三位
  • 如何把图片隐藏到wordpress/满足seo需求的网站
  • 做暧暧小视频有声音的网站/怎么让自己上百度
  • 长沙网站seo优化公司/bt蚂蚁
  • 网站建设公司发展历程/微软bing搜索引擎
  • 班组建设展板哪个网站有/2022搜索引擎
  • 动漫网站建设的目的/推广平台网站热狗网
  • 如何做网站测试/成都百度seo公司
  • 青海城乡建设厅网站/谷歌关键词分析工具
  • 保险网站有哪些保险网站/长沙百度推广运营公司
  • 效率跃迁 ,亚数TrustAsia 加速证书管理迈向 CaaS 新阶段
  • CFBench评测
  • uniapp学习【上手篇】
  • 力扣hot100:三数之和(排序 + 双指针法)(15)
  • 链表-2.两数相加-力扣(LeetCode)
  • 应用控制技术、内容审计技术、AAA服务器技术