当前位置: 首页 > news >正文

网站建设的价值/网页设计培训

网站建设的价值,网页设计培训,html菜鸟教程css,男科是去私立还是公立一 Producer的API 1 消息发送流程 Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,S…

一 Producer的API

1 消息发送流程

Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。

kafkaProoducer消息发送流程:
在这里插入图片描述

主线程不负责消息传递的具体过程,以提高效率,main将消息封装成ProducerRecord,经过一系列处理分门别类地将消息放到RecordAccumulator(主线程和sender线程共享的资源池)中,sender根据RA中已经分好区的数据发送到topic。

2 实现步骤

导入依赖

    <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency>

发送者发送数据程序代码

package com.hike.producer;import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;public class Producer {public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {//1 实例化kafka集群(创建对象)Properties properties = new Properties();//通过配置文件配置//properties.load(Producer.class.getClassLoader().getResourceAsStream("kafka.properties"));//通过程序代码设置properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");properties.setProperty("acks","all");properties.setProperty("bootstrap.servers","hadoop101:9092");KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);//2 用集群对象发送数据for (int i = 0; i < 10; i++) {Future<RecordMetadata> future = producer.send(//2.1 封装ProducerRecordnew ProducerRecord<String, String>("hello",Integer.toString(i),"Value" + i),//2.2 回调函数new Callback() {//当sender收到服务器的ack之后,sender线程会调用onCompletion方法public void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println(recordMetadata);}}});//RecordMetadata recordMetadata = future.get(); //添加此语句,发送方式变为同步操作System.out.println("第" + i + "条发送成功");}//3 关闭资源producer.close();}
}

二 Consumer的API

1 自动提交offset

Consumer消费数据时的可靠性是很容易保证的,因为数据在Kafka中是持久化的,故不用担心数据丢失问题。

由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。

所以offset的维护是Consumer消费数据必须考虑的问题。

消费者消费数据程序代码

package com.hike.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.io.IOException;
import java.util.Collections;
import java.util.Properties;public class Consumer {public static void main(String[] args) throws IOException, InterruptedException {//1 新建一个consumer对象Properties properties = new Properties();properties.load(Consumer.class.getClassLoader().getResourceAsStream("consumer1.properties"));KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);//2 用这个对象接收消息//发布订阅模式接收消息,先订阅consumer.subscribe(Collections.singleton("hello"));while(true){//从订阅的话题中拉取数据ConsumerRecords<String, String> poll = consumer.poll(2000);if(poll.count() == 0){Thread.sleep(100);}//消费拉取到的数据for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}}//3 关闭资源//consumer.close();}
}

consumer1.properties文件

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
bootstrap.servers=hadoop101:9092
enable.auto.commit=true
group.id=test2
auto.offset.reset=earliest

2 手动提交offset

虽然自动提交offset十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka提供了手动提交offset的API。

手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;不同点是,commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。

修改配置文件中的enable.auto.commit=false选项并在消费拉取到的数据之后添加consumer.commitSync();语句即可。由于同步提交offset有失败重试机制,故更加可靠。

			 //消费拉取到的数据for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}consumer.commitSync();

同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。在更多的情况下,会选用异步提交offset的方式。

http://www.lbrq.cn/news/767323.html

相关文章:

  • 企业网站一般用什么框架做/广东深圳龙华区
  • 网站建设属于应用软件吗/seo综合查询中的具体内容有哪些
  • 做网站多少分辨率就可以/免费招聘信息发布平台
  • 如何利用视频网站做推广/域名大全查询
  • 数据库和网站开发/策划网络营销方案
  • 佛山网站优化效果/青岛疫情最新情况
  • vps如何限制网站网速/抖音营销推广怎么做
  • 做柱状图好看的网站/企业关键词优化专业公司
  • 淘宝代运营公司一般怎么收费的/seo服务公司
  • 网站建设的大公司好/外链优化
  • 网站建设销售好做吗/推荐6个免费国外自媒体平台
  • 合肥专业做网站的公司哪家好/百度成都总部
  • 建设公司网站费用多少/如何做好企业推广
  • 秦皇岛网站建设多少钱/今日热搜榜排行榜
  • 太仓公司网站建设电话/全国疫情最新
  • 男女做那个那个的视频网站/如何查看百度指数
  • 网站建设申请理由/aso优化
  • 义乌市做网站/百度关键词快速优化
  • 杭州网站建设培训班/网络推广主要是做什么工作
  • 网站设置多少个关键词/百度平台官网
  • 用asp.net做购物网站/优化疫情政策
  • 做翻页电子书的网站/互动营销案例100
  • 安徽php网站建设/优化网站做什么的
  • 网购哪个网站好又便宜/百度一下官方网页
  • 太原网站建设哪家最好/淘宝店铺怎么免费推广
  • 衡水精品网站建设价格/网络广告类型
  • 网站建设功能需求/北京网站seo费用
  • 网站搭建者/网站查询备案信息
  • 做平面的素材网站/网络营销的手段有哪些
  • 温州做网站优化/高端企业网站定制公司
  • Spring Boot项目调用第三方接口的三种方式比较
  • 人工智能入门①:AI基础知识(上)
  • 机器学习阶段性总结:对深度学习本质的回顾 20250813
  • 《Python学习之基础语法1:从零开始的编程之旅》
  • 【物联网】基于树莓派的物联网开发【26】——树莓派开启串口并配置串口助手Minicom
  • 前后端分离项目中Spring MVC的请求执行流程