网站建设的价值/网页设计培训
一 Producer的API
1 消息发送流程
Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。
kafkaProoducer消息发送流程:
主线程不负责消息传递的具体过程,以提高效率,main将消息封装成ProducerRecord,经过一系列处理分门别类地将消息放到RecordAccumulator(主线程和sender线程共享的资源池)中,sender根据RA中已经分好区的数据发送到topic。
2 实现步骤
导入依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency>
发送者发送数据程序代码
package com.hike.producer;import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;public class Producer {public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {//1 实例化kafka集群(创建对象)Properties properties = new Properties();//通过配置文件配置//properties.load(Producer.class.getClassLoader().getResourceAsStream("kafka.properties"));//通过程序代码设置properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");properties.setProperty("acks","all");properties.setProperty("bootstrap.servers","hadoop101:9092");KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);//2 用集群对象发送数据for (int i = 0; i < 10; i++) {Future<RecordMetadata> future = producer.send(//2.1 封装ProducerRecordnew ProducerRecord<String, String>("hello",Integer.toString(i),"Value" + i),//2.2 回调函数new Callback() {//当sender收到服务器的ack之后,sender线程会调用onCompletion方法public void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null) {System.out.println(recordMetadata);}}});//RecordMetadata recordMetadata = future.get(); //添加此语句,发送方式变为同步操作System.out.println("第" + i + "条发送成功");}//3 关闭资源producer.close();}
}
二 Consumer的API
1 自动提交offset
Consumer消费数据时的可靠性是很容易保证的,因为数据在Kafka中是持久化的,故不用担心数据丢失问题。
由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。
所以offset的维护是Consumer消费数据必须考虑的问题。
消费者消费数据程序代码
package com.hike.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.io.IOException;
import java.util.Collections;
import java.util.Properties;public class Consumer {public static void main(String[] args) throws IOException, InterruptedException {//1 新建一个consumer对象Properties properties = new Properties();properties.load(Consumer.class.getClassLoader().getResourceAsStream("consumer1.properties"));KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);//2 用这个对象接收消息//发布订阅模式接收消息,先订阅consumer.subscribe(Collections.singleton("hello"));while(true){//从订阅的话题中拉取数据ConsumerRecords<String, String> poll = consumer.poll(2000);if(poll.count() == 0){Thread.sleep(100);}//消费拉取到的数据for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}}//3 关闭资源//consumer.close();}
}
consumer1.properties文件
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
bootstrap.servers=hadoop101:9092
enable.auto.commit=true
group.id=test2
auto.offset.reset=earliest
2 手动提交offset
虽然自动提交offset十分简介便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka提供了手动提交offset的API。
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;不同点是,commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。
修改配置文件中的enable.auto.commit=false选项并在消费拉取到的数据之后添加consumer.commitSync();语句即可。由于同步提交offset有失败重试机制,故更加可靠。
//消费拉取到的数据for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}consumer.commitSync();
同步提交offset更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。在更多的情况下,会选用异步提交offset的方式。