有没有个人做的网站赚流量费/上海网站优化公司
使用canal实时读取mysql变更数据保存到kafka消息队列
订阅canal server中的binlog数据,之后解析后,以json格式发送到kafka!① canal client 每次从canal server获取的数据会封装在一个Message对象中,一个Message对象包含了这次拉取的多条sql语句写操作的结果! 每个Message中包含了一个 List<Entry>②每个Entry代表一个sql语句影响的行, 包含了这个sql语句操作的表名,这个Entry的类型 EntryType每个Entry的值最终保存在storeValue属性中,这个storeValue是字节数组,必须反序列化后才能使用③ 可以使用RowChange将 Entry中的storeValue进行反序列化为 RowChange对象一个RowChange对象,代表一个sql影响的多行变化的结果可以从RowChange对象中,获取当前sql的操作类型,例如INSERT,UPDATE等④ 从RowChange中获取 RowDataList,打包多行数据变化的集合从RowDataList中获取RowData,代表一行数据的变化!⑤从RowData中获取对应Column变化的值,也可以获取列名
package com.canal.realtime;import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.gmall.common.constansts.GmallConstants;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;import java.net.InetSocketAddress;
import java.util.List;public class CanalClient {public static void main(String[] args) throws InvalidProtocolBufferException {//1.建立和canal的连接CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop102", 11111), "example", "", "");canalConnector.connect();//2.订阅cannal中指定的表的binlog数据//只订阅gmall2021库下的所有表canalConnector.subscribe("gmall2021.*");//3.解析binglog数据//3.1 从binglog中拉取messagewhile (true){//从canal server中拉取100条数据Message message = canalConnector.get(100);//判断是否拿到了数据if (message.getId() == -1){try {System.out.println("当前没有数据,0.5s后再次尝试读取数据");Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}continue;}//拿到了数据,开始进行数据的解析工作List<CanalEntry.Entry> entries = message.getEntries();//每个entry都有entrytype,storevalue,tablename//当前只读取entrytype是ROWDATA的entry,将符合条件的entry的storevalue反序列化for (CanalEntry.Entry entry : entries) {if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)){//获取storevalueByteString storeValue = entry.getStoreValue();//反序列化CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);//获取当前entry是对那个表的操作结果String tableName = entry.getHeader().getTableName();//对获取的数据进行处理handle(tableName, rowChange);}}}}//只需要将order_info发生变化的数据,发送到kafka//需要采集的是order_info当天生成的数据,update和delete不管private static void handle(String tableName, CanalEntry.RowChange rowChange) {if ("order_info".equals(tableName) && rowChange.getEventType().equals(CanalEntry.EventType.INSERT)){//获取所有变化的行List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();//遍历所有行,取出列名和值,封装为json格式for (CanalEntry.RowData rowData : rowDatasList) {//4.以json格式,格式化数据JSONObject jsonObject = new JSONObject();//获取当前行,那些列发生变化后的结果List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();for (CanalEntry.Column column : afterColumnsList) {jsonObject.put(column.getName(),column.getValue());}//5. 将数据发送到kafkaMyProducer.send(GmallConstants.KAFKA_TOPIC_NEW_ORDER, jsonObject.toString());}}}}
kefka生产者:
package com.canal.realtime;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;public class MyProducer {private static Producer<String, String> producer;static {producer = createProducer();}//创建Producerpublic static Producer<String, String> createProducer(){//kafka连接的配置信息Properties properties = new Properties();properties.put("bootstrap.servers","hadoop102:9092,hadoop103:9092,hadoop104:9092");properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);return kafkaProducer;}//将数据写入到kafkapublic static void send(String topic, String value){producer.send(new ProducerRecord<>(topic,value));}}