当前位置: 首页 > news >正文

企业网站seo外包 s/国际新闻网

企业网站seo外包 s,国际新闻网,技术支持公司做的网站怎么查,做视频网站视频来源kafka是吞吐量巨大的一个消息系统,它是用scala写的,和普通的消息的生产消费还有所不同,写了个demo程序供大家参考。kafka的安装请参考官方文档。首先我们需要新建一个maven项目,然后在pom中引用kafka jar包,引用依赖如…

kafka是吞吐量巨大的一个消息系统,它是用scala写的,和普通的消息的生产消费还有所不同,写了个demo程序供大家参考。kafka的安装请参考官方文档。

首先我们需要新建一个maven项目,然后在pom中引用kafka jar包,引用依赖如下:

org.apache.kafka

kafka_2.10

0.8.0

旧版scala写法, 下面我们看下生产消息的代码:

package com.iaiai;

import java.util.Properties;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

/**

* Hello world!

*

*/

public class KafkaProducer

{

private final Producer producer;

public final static String TOPIC = "TEST-TOPIC";

private KafkaProducer(){

Properties props = new Properties();

//此处配置的是kafka的端口

props.put("metadata.broker.list", "192.168.193.148:9092");

//配置value的序列化类

props.put("serializer.class", "kafka.serializer.StringEncoder");

//配置key的序列化类

props.put("key.serializer.class", "kafka.serializer.StringEncoder");

//request.required.acks

//0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).

//1, which means that the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).

//-1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains.

props.put("request.required.acks","-1");

producer = new Producer(new ProducerConfig(props));

}

void produce() {

int messageNo = 1000;

final int COUNT = 10000;

while (messageNo < COUNT) {

String key = String.valueOf(messageNo);

String data = "hello kafka message " + key;

producer.send(new KeyedMessage(TOPIC, key ,data));

System.out.println(data);

messageNo ++;

}

}

public static void main( String[] args )

{

new KafkaProducer().produce();

}

}

最新的java版写法:

package com.iaiai;

import kafka.producer.KeyedMessage;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.clients.producer.RecordMetadata;

import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;

import java.util.Map;

import java.util.Properties;

import java.util.Random;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.Future;

/**

* Created with IntelliJ IDEA.

* Package: com.iaiai.db.service.impl

* Author: iaiai

* Create Time: 16/10/3 下午12:57

* QQ: 176291935

* Url: http://iaiai.iteye.com

* Email: 176291935@qq.com

* Description: 生产消息

*/

public class KafkaProducer {

private final org.apache.kafka.clients.producer.KafkaProducer producer;

public final static String TOPIC = "TEST-TOPIC";

private KafkaProducer(){

Properties props = new Properties();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.1.111:9092");

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

// props.put(ProducerConfig.ACKS_CONFIG)

//request.required.acks

//0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).

//1, which means that the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).

//-1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains.

// props.put("request.required.acks","-1");

producer = new org.apache.kafka.clients.producer.KafkaProducer(props);

}

void produce() {

int messageNo = 1;

final int COUNT = 2;

while (messageNo < COUNT) {

String key = String.valueOf(messageNo);

String data = "hello kafka message " + key;

boolean sync = false; //是否同步

if (sync) {

try {

producer.send(new ProducerRecord(TOPIC, data)).get();

} catch (Exception e) {

e.printStackTrace();

}

} else {

producer.send(new ProducerRecord(TOPIC, data));

}

//必须写下面这句,相当于发送

producer.flush();

messageNo ++;

}

}

public static void main( String[] args ) {

new KafkaProducer().produce();

}

}

下面是消费端的代码实现:

package com.iaiai;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import kafka.serializer.StringDecoder;

import kafka.utils.VerifiableProperties;

public class KafkaConsumer {

private final ConsumerConnector consumer;

private KafkaConsumer() {

Properties props = new Properties();

//zookeeper 配置

props.put("zookeeper.connect", "192.168.193.148:2181");

//group 代表一个消费组

props.put("group.id", "jd-group");

//zk连接超时

props.put("zookeeper.session.timeout.ms", "4000");

props.put("zookeeper.sync.time.ms", "200");

props.put("auto.commit.interval.ms", "1000");

props.put("auto.offset.reset", "smallest");

//序列化类

props.put("serializer.class", "kafka.serializer.StringEncoder");

ConsumerConfig config = new ConsumerConfig(props);

consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);

}

void consume() {

Map topicCountMap = new HashMap();

topicCountMap.put(KafkaProducer.TOPIC, new Integer(1));

StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());

StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

Map>> consumerMap =

consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);

KafkaStream stream = consumerMap.get(KafkaProducer.TOPIC).get(0);

ConsumerIterator it = stream.iterator();

while (it.hasNext())

System.out.println(it.next().message());

}

public static void main(String[] args) {

new KafkaConsumer().consume();

}

}

注意消费端需要配置成zk的地址,而生产端配置的是kafka的ip和端口。

欢迎加入QQ群:104286694

http://www.lbrq.cn/news/1438003.html

相关文章:

  • 网站制作工资/网络优化工程师前景
  • 深圳营销外贸网站制作/百度用户服务中心人工24小时电话
  • 网站 模板/高清视频线和音频线的接口类型
  • 花20亿做网站/淘宝关键词指数
  • 比较著名的网站用javaweb做的/百度新闻发布平台
  • 深圳华强北做网站/竞猜世界杯
  • 洛阳便宜网站建设报价/阿里巴巴官网首页
  • 没有固定ip做网站/网站排名优化工具
  • 把网站做成手机版/国外免费推广网站有哪些
  • 国家企业信用平台官网/在线seo优化工具
  • 全球设计网分站/网络app推广是什么工作
  • 网站建设和推广的完整话术/怎么推广软件
  • 珠海网站建设小程序/河南网站推广多少钱
  • 重视党建网站建设/东莞最新疫情
  • 网站读取错误时怎样做/微信投放广告多少钱
  • 美国空间怎么提高网站速度/阿里指数查询官网入口
  • 长沙做网站的/网络营销的目的是什么
  • html5网站正在建设中模板下载/朋友圈广告投放平台
  • 商用营销型网站建设/品牌推广的意义
  • 如何给国外网站做seo/外链网站推荐几个
  • 最便宜的外贸网站建设/舟山seo
  • 网站点击滚动图片代码/市场营销策划案例经典大全
  • 网站怎么做虚拟连接/小广告清理
  • 如何知道网站的字体/关键词优化是怎样收费的
  • 系统开发北京网站建设/互联网营销软件
  • 东莞设计兼职网站建设/十堰seo优化
  • 制作系部网站首页/百度知道灰色词代发收录
  • wordpress retina/廊坊seo排名
  • 封面新闻是国家级媒体/重庆排名优化整站优化
  • b2b电子商务模式的网站/网站推广软件费用是多少
  • 使用HalconDotNet实现异步多相机采集与实时处理
  • Python 类元编程(导入时和运行时比较)
  • Linux 桌面到工作站的“性能炼金术”——开发者效率的 6 个隐形瓶颈与破解方案
  • python的游戏评级论坛系统
  • PCBA:电子产品制造的核心环节
  • String里常用的方法