当前位置: 首页 > news >正文

有没有个人做的网站赚流量费/上海网站优化公司

有没有个人做的网站赚流量费,上海网站优化公司,学做网站开发吗,北京公司注册流程及资料使用canal实时读取mysql变更数据保存到kafka消息队列 订阅canal server中的binlog数据,之后解析后,以json格式发送到kafka!① canal client 每次从canal server获取的数据会封装在一个Message对象中,一个Message对象包含了这次拉取的多条sql语…

使用canal实时读取mysql变更数据保存到kafka消息队列

 订阅canal server中的binlog数据,之后解析后,以json格式发送到kafka!① canal client 每次从canal server获取的数据会封装在一个Message对象中,一个Message对象包含了这次拉取的多条sql语句写操作的结果! 每个Message中包含了一个 List<Entry>②每个Entry代表一个sql语句影响的行, 包含了这个sql语句操作的表名,这个Entry的类型 EntryType每个Entry的值最终保存在storeValue属性中,这个storeValue是字节数组,必须反序列化后才能使用③ 可以使用RowChange将 Entry中的storeValue进行反序列化为 RowChange对象一个RowChange对象,代表一个sql影响的多行变化的结果可以从RowChange对象中,获取当前sql的操作类型,例如INSERT,UPDATE等④ 从RowChange中获取 RowDataList,打包多行数据变化的集合从RowDataList中获取RowData,代表一行数据的变化!⑤从RowData中获取对应Column变化的值,也可以获取列名
package com.canal.realtime;import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.gmall.common.constansts.GmallConstants;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;import java.net.InetSocketAddress;
import java.util.List;public class CanalClient {public static void main(String[] args) throws InvalidProtocolBufferException {//1.建立和canal的连接CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop102", 11111), "example", "", "");canalConnector.connect();//2.订阅cannal中指定的表的binlog数据//只订阅gmall2021库下的所有表canalConnector.subscribe("gmall2021.*");//3.解析binglog数据//3.1 从binglog中拉取messagewhile (true){//从canal server中拉取100条数据Message message = canalConnector.get(100);//判断是否拿到了数据if (message.getId() == -1){try {System.out.println("当前没有数据,0.5s后再次尝试读取数据");Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}continue;}//拿到了数据,开始进行数据的解析工作List<CanalEntry.Entry> entries = message.getEntries();//每个entry都有entrytype,storevalue,tablename//当前只读取entrytype是ROWDATA的entry,将符合条件的entry的storevalue反序列化for (CanalEntry.Entry entry : entries) {if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)){//获取storevalueByteString storeValue = entry.getStoreValue();//反序列化CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);//获取当前entry是对那个表的操作结果String tableName = entry.getHeader().getTableName();//对获取的数据进行处理handle(tableName, rowChange);}}}}//只需要将order_info发生变化的数据,发送到kafka//需要采集的是order_info当天生成的数据,update和delete不管private static void handle(String tableName, CanalEntry.RowChange rowChange) {if ("order_info".equals(tableName) && rowChange.getEventType().equals(CanalEntry.EventType.INSERT)){//获取所有变化的行List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();//遍历所有行,取出列名和值,封装为json格式for (CanalEntry.RowData rowData : rowDatasList) {//4.以json格式,格式化数据JSONObject jsonObject = new JSONObject();//获取当前行,那些列发生变化后的结果List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();for (CanalEntry.Column column : afterColumnsList) {jsonObject.put(column.getName(),column.getValue());}//5. 将数据发送到kafkaMyProducer.send(GmallConstants.KAFKA_TOPIC_NEW_ORDER, jsonObject.toString());}}}}

kefka生产者:

package com.canal.realtime;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;public class MyProducer {private static Producer<String, String> producer;static {producer = createProducer();}//创建Producerpublic static Producer<String, String> createProducer(){//kafka连接的配置信息Properties properties = new Properties();properties.put("bootstrap.servers","hadoop102:9092,hadoop103:9092,hadoop104:9092");properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);return kafkaProducer;}//将数据写入到kafkapublic static void send(String topic, String value){producer.send(new ProducerRecord<>(topic,value));}}
http://www.lbrq.cn/news/1554049.html

相关文章:

  • 哪个网站做加盟的比较靠谱/百度最新秒收录方法2021
  • 四川省住房和城乡建设厅网站无法进入/百度快照网站
  • 网站建设维修服务流程/百度首页关键词推广
  • 如何判断网站html5/说到很多seo人员都转行了
  • 如何在阿里云云服务器上搭建网站/网络营销发展现状与趋势
  • 苏州市建设交易中心网站首页/重庆网
  • 建站国外百元服务器/培训心得体会怎么写
  • 网站流程图/杭州优化建筑设计
  • 动态网站开发的感想/seo网站诊断报告
  • 建站公司专业团队/国内前10电商代运营公司
  • 网站页脚版权信息/百度权重排名查询
  • 网页模板免费下载html/太原网站seo
  • 新公司如何做网站/近期新闻热点
  • 用ps怎么做网站首页/竞价推广专员
  • 武汉建设学院网站/营销型网站建设推广
  • 我要制作网站/阿里云域名注册官网
  • 免费政府网站html模板/乱码链接怎么用
  • 销售手机网站的后期安排/seo的优化技巧和方法
  • 扁平化配色方案网站/职业培训机构资质
  • 手机优化对手机有影响吗/seo收录查询
  • 用电脑建立网站/军事新闻今日最新消息
  • ASPJSP动态网站开发/全球网站流量排名100
  • 大学生学风建设专题网站/添加友情链接的技巧
  • 信誉好的o2o网站建设/足球世界排名前十
  • 网站在vps能访问 在本地访问不了/手机自动排名次的软件
  • 公司网站建app/优化英文
  • 怎么在一个网站做编辑/爱战网关键词挖掘查询工具
  • 上海移动端网站建设/整合营销传播策划方案
  • 17网站一起做网店潮汕档口/百度快照推广排名
  • 装修设计软件哪个好用/江苏企业seo推广
  • 智能汽车领域研发,复用云原生开发范式?
  • go基础学习笔记
  • Houdini 粒子学习笔记
  • 数据结构初阶(19)外排序·文件归并排序的实现
  • 计算机网络 HTTP和HTTPS 区别
  • IOMMU多级页表查找的验证