当前位置: 首页 > news >正文

南阳做网站/合肥百度关键词推广

南阳做网站,合肥百度关键词推广,wordpress变域名工具,手机型网站1. kafka分区数据顺序性 kafka具有分区内数据有序的特点,可以通过将数据指定到特定的分区来实现数据的顺序性。kafka分区逻辑代码如下:如果指定了分区号生产,则发送到指定分区;否则调用分区器计算方法partitioner.partition() p…

1. kafka分区数据顺序性

kafka具有分区内数据有序的特点,可以通过将数据指定到特定的分区来实现数据的顺序性。kafka分区逻辑代码如下:如果指定了分区号生产,则发送到指定分区;否则调用分区器计算方法partitioner.partition()

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {Integer partition = record.partition();return partition != null ?partition :partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

一共三个分区器实现了Partitioner类的partition()方法:

  1. DefaultPartitioner
    指定分区则使用;否则如果存在key则根据key去hash;否则batch满了切换分区。
  2. RoundRobinPartitioner
    没指定分区则平均分配循环写入分区。
  3. UniformStickyPartitioner
    和默认相比去除掉key取hash相关的规则。

综上,我们想实现数据顺序入kafka,可以指定分区写或者通过设置key值相同保证数据入同一个分区。但是要注意避免全部数据入同一分区的场景,最好将数据分组即保证组内数据有序而不是全局有序。

如果采用设置key值相同方式进行组内数据入同一分区,则计算分区方式如下:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//key为null等同于UniformStickyPartitioner分区器if (keyBytes == null) {return stickyPartitionCache.partition(topic, cluster);} List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();// key取hash后再取正值(并非绝对值)再对分区数量取余return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

经测试尽量保证key的前缀多样化来保证数据的均匀分布,可以对自己的数据进行测试来敲定key的定义方式:如下数据返回结果为:1 1 1 0 2 / 0 2 0 1 2 / 0 2 2 1 0

System.out.print(Utils.toPositive(Utils.murmur2("test11".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("test12".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("test13".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("test14".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("test15".getBytes()))%3+" ");
System.out.println();
System.out.print(Utils.toPositive(Utils.murmur2("1test1".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("2test1".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("3test1".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("4test1".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("5test1".getBytes()))%3+" ");
System.out.println();
System.out.print(Utils.toPositive(Utils.murmur2("1".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("2".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("3".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("4".getBytes()))%3+" ");
System.out.print(Utils.toPositive(Utils.murmur2("5".getBytes()))%3+" ");
System.out.println();

2. Flink消费kafka的顺序性

首先构造三个分区的topic,然后写入测试数据:指定了key和每个key的版本号,以版本号升序方式写入kafka。

new ProducerRecord<>("test1", "a", "{\"key\":\"a\",\"value\":\"1\",\"time\":1623588192345}");
new ProducerRecord<>("test1", "b", "{\"key\":\"b\",\"value\":\"1\",\"time\":1623588192342}");
new ProducerRecord<>("test1", "a", "{\"key\":\"a\",\"value\":\"2\",\"time\":1623588192347}");
new ProducerRecord<>("test1", "b", "{\"key\":\"b\",\"value\":\"2\",\"time\":1623588192344}");
new ProducerRecord<>("test1", "c", "{\"key\":\"c\",\"value\":\"1\",\"time\":1623588192345}");
new ProducerRecord<>("test1", "c", "{\"key\":\"c\",\"value\":\"2\",\"time\":1623588192348}");
new ProducerRecord<>("test1", "b", "{\"key\":\"b\",\"value\":\"3\",\"time\":1623588192346}");
new ProducerRecord<>("test1", "a", "{\"key\":\"a\",\"value\":\"3\",\"time\":1623588192349}");
new ProducerRecord<>("test1", "b", "{\"key\":\"b\",\"value\":\"4\",\"time\":1623588192348}");

通过以下命令可以查看kafka topic数据和消费组情况:

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test1  --time -1
./kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --group test1 --describe

以上数据分布情况如下:key a和key c位于1号分区,key b位于2号分区。

test1:0:0
test1:1:5
test1:2:4

编写flink代码消费kafka观察数据顺序性:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度为3对应分区数,但是只有
env.setParallelism(3);
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test1");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
String[] fields = new String[]{"key","value","time"};
TypeInformation[] type = new TypeInformation[3];
type[0] = Types.STRING;
type[1] = Types.STRING;
type[2] = Types.LONG;
RowTypeInfo rowTypeInfo = new RowTypeInfo(type, fields);SingleOutputStreamOperator<Row> dataStreamSource = env.addSource(new FlinkKafkaConsumer("test1",new SimpleStringSchema(),properties).setStartFromEarliest()).process(new JsonToRow(rowTypeInfo), rowTypeInfo).name("kafkaSource").uid("kafkaSource");
//source数据情况
dataStreamSource.print("source");
//按key对数据进行分区
KeyedStream<Row, String> key = dataStreamSource.keyBy((KeySelector<Row, String>) value ->  value.getFieldAs(0));
//keyby后数据情况
key.print("sink");env.execute("kafkatest");

输出结果如下:

source:3> +I[a, 1, 1623588192345]
source:1> +I[b, 1, 1623588192342]
source:1> +I[b, 2, 1623588192344]
source:3> +I[a, 2, 1623588192347]
source:1> +I[b, 3, 1623588192346]
source:3> +I[c, 1, 1623588192345]
source:1> +I[b, 4, 1623588192348]
source:3> +I[c, 2, 1623588192348]
source:3> +I[a, 3, 1623588192349]
sink:2> +I[a, 1, 1623588192345]
sink:1> +I[b, 1, 1623588192342]
sink:2> +I[a, 2, 1623588192347]
sink:1> +I[b, 2, 1623588192344]
sink:1> +I[b, 3, 1623588192346]
sink:2> +I[c, 1, 1623588192345]
sink:1> +I[b, 4, 1623588192348]
sink:2> +I[c, 2, 1623588192348]
sink:2> +I[a, 3, 1623588192349]

可以看出source和sink每个线程输出的数据中均按key值的版本号升序排布,即flink消费kafka和进行keyby操作(shuffle)均为破坏kafka的分区有序性。

3. Flink消费kafka并进行checkpoint

在上文代码基础上配置checkpoint配置,设置为本地文件存储且任务停止时保留checkpoint文件

String topic = args[0];
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///Users/caster/Desktop/checkpoint");// 两个checkpoint超时时间之间的间隔和语义
env.enableCheckpointing(5*1000, CheckpointingMode.AT_LEAST_ONCE);
// checkpoint超时时间
env.getCheckpointConfig().setCheckpointTimeout(60*1000);
// 最大同时进行的checkpoint任务,即多个Barrier进入处理流程
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 两次checkpoint之间的空闲时间最小值,设置后上一个配置则为1
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);
// 设置为0表示不容忍checkpoint失败
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
// 重启策略2:每隔10s尝试重启一次共三次,超时一分钟则失败
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.seconds(60),Time.seconds(10)));// 任务流取消和故障时会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.setParallelism(3);Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", topic);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
String[] fields = new String[]{"key","value","time"};
TypeInformation[] type = new TypeInformation[3];
type[0] = Types.STRING;
type[1] = Types.STRING;
type[2] = Types.LONG;
RowTypeInfo rowTypeInfo = new RowTypeInfo(type, fields);SingleOutputStreamOperator<Row> dataStreamSource = env.addSource(new FlinkKafkaConsumer(topic,new SimpleStringSchema(),properties).setStartFromEarliest()).process(new JsonToRow(rowTypeInfo), rowTypeInfo).name("kafkaSource").uid("kafkaSource");
//source数据情况
dataStreamSource.print("source");
KeyedStream<Row, String> keyedStream = dataStreamSource.keyBy((KeySelector<Row, String>) value -> value.getFieldAs(0));
//keyby后数据情况
keyedStream.print("sink");env.execute("kafkatest");

mvn打包带有依赖的jar包,然后到flink 客户端提交任务:

./flink run flink-compute-1.0-SNAPSHOT.jar test1

提交成功会返回job 的id:

Job has been submitted with JobID e7309690361278a82675c7d981057692

且在配置的checkpoint目录可以看到对应的job的目录,在不断进行新的checkpoint
checkpoint目录
在flink 的 web UI可以看到具体的两个print()输出,直到kafka数据全部消费完毕,如下图:
job 输出情况

关闭任务,并向kafka内新生产部分数据,测试指定checkpoint目录恢复任务,命令如下:

./flink run -s /Users/caster/Desktop/checkpoint/e7309690361278a82675c7d981057692/chk-19  flink-compute-1.0-SNAPSHOT.jar test1

相当于重新启动新的flink job,job id发生改变,checkpoint目录也随之改变。
观察到任务会从上次消费到的位置继续消费,红线上为第一个job输出,红线下为第二个job输出:
job 输出情况

http://www.lbrq.cn/news/1383355.html

相关文章:

  • 女和女做网站/神马推广登录
  • 原子艺术做的网站怎么样子/自己怎么优化网站
  • 娄底企业网站建设制作/公司网站建设哪家公司好
  • 做初中数学题的网站/淘宝推广方法有哪些
  • 成都建站开发/网络营销ppt模板
  • 如何做新闻源网站/信息流优化师培训
  • 大都会app约/站群seo
  • 重庆网站制作机构/朋友圈营销广告
  • 山西运城给网站做系统的公司/电商平台排行榜
  • 视频网站开发视频/电商运营怎么自学
  • 网站建设报价word文档/千川推广官网
  • 浪起网站建设/优化推广联盟
  • 武汉做企业网站/如何自己开发软件app
  • 建设厅网站如何查询企业信息/莆田百度快照优化
  • 高端的科技网站建设/软文推广案例大全
  • 游戏网站开发公司/广州seo推广
  • 定制一款app/咖啡seo是什么意思
  • 如何开发网站自己做站长/外包公司的优势和劣势
  • 怎样搭建web网站/电商网站建设报价
  • 绍兴网站建设设计/西安网站制作费用
  • 外贸网站推广 上海/网站推广优化排名公司
  • 网站制作价格便宜/aso关键词覆盖优化
  • 网站里的横幅怎么做/天津疫情最新情况
  • 电脑微信公众号登录入口/seo线上培训多少钱
  • flash做导航网站/网站推广的作用在哪里
  • 织梦贷款网站模板/中国今天刚刚发生的新闻
  • 床伸舌头哔哩哔哩原声/东莞seo技术
  • 河南网站推广/微信指数是搜索量吗
  • 网站免费源码大全/衡水网站优化推广
  • 个人做的网站不能做淘客/谷歌推广和seo
  • rsync+sersync实现文件实时同步
  • 嵌入式linux驱动开发:什么是Linux驱动?深度解析与实战入门
  • 【Lambda】flatMap使用案例
  • Python爬虫分析B站番剧播放量趋势:从数据采集到可视化分析
  • 聊聊测试环境不稳定如何应对
  • 【C++算法】72.队列+宽搜_二叉树的最大宽度