当前位置: 首页 > news >正文

西安北郊网站开发网站推广包括

西安北郊网站开发,网站推广包括,手机pc微信三合一网站,网站改版 被百度k一、kafka集群搭建   至于kafka是什么我都不多做介绍了,网上写的已经非常详尽了。 (没安装java环境的需要先安装 yum -y install java-1.8.0-openjdk*) 1. 下载zookeeper https://zookeeper.apache.org/releases.html 2. 下载kafka http://kafka.apache.org/down…

一、kafka集群搭建  

  至于kafka是什么我都不多做介绍了,网上写的已经非常详尽了。

 (没安装java环境的需要先安装 yum -y install java-1.8.0-openjdk*)

1. 下载zookeeper  https://zookeeper.apache.org/releases.html

2. 下载kafka http://kafka.apache.org/downloads

3. 启动zookeeper集群(我的示例是3台机器,后面的kafka也一样,这里就以1台代指3台,当然你也可以只开1台

  1)配置zookeeper。 修改复制一份 zookeeper-3.4.13/conf/zoo_sample.cfg 改名成zoo.cfg。修改以下几个参数,改成适合自己机器的。

  dataDir=/home/test/zookeeper/datadataLogDir=/home/test/zookeeper/logserver.1=10.22.1.1:2888:3888server.2=10.22.1.2:2888:3888server.3=10.22.1.3:2888:3888

  2) 创建myid文件,确定机器编号。分别在3台机器的/home/test/zookeeper/data目录执行分别执行命令 echo 1 > myid(注意ip为10.22.1.2把1改成2,见上面的配置)

  3) 启动zookeeper集群。分别进入目录zookeeper-3.4.13/bin 执行 sh zkServer.sh start

4. 启动kafka集群

  1) 配置kafka。进入kafka_2.11-2.2.0/config。复制3份,分别为server1.properties,server2.properties,server3.properties。修改以下几项(注意对应的机器id)

log.dirs和zookeeper.connect 是一样的。broker.id和listeners分别填对应的id和ip
broker.id=1
listeners=PLAINTEXT://10.22.1.1:9092
log.dirs=/home/test/kafka/log
zookeeper.connect=10.22.1.1:2181,10.22.1.2:2181,10.22.1.3:2181

  2) 启动kafka集群。分别进入kafka_2.11-2.2.0/bin目录,分别执行sh kafka-server-start.sh ../config/server1.properties (第2台用server2.properties配置文件)

 

二、Golang生产者和消费者

  目前比较流行的golang版的kafka客户端库有两个:

  1. https://github.com/Shopify/sarama

  2. https://github.com/confluentinc/confluent-kafka-go

  至于谁好谁坏自己去分辨,我用的是第1个,star比较多的。

 

1. kafka生产者代码

  这里有2点要说明:

  1)  config.Producer.Partitioner = sarama.NewRandomPartitioner,我分partition用的是随机,如果你想稳定分paritition的话可以自定义,还有轮询和hash方式

  2) 我的topic是走的外部配置,可以根据自己的需求修改

// Package kafka_producer kafka 生产者的包装
package kafka_producerimport ("github.com/Shopify/sarama""strings""sync""time""github.com/alecthomas/log4go"
)// Config 配置
type Config struct {Topic      string `xml:"topic"`Broker     string `xml:"broker"`Frequency  int    `xml:"frequency"`MaxMessage int    `xml:"max_message"`
}type Producer struct {producer sarama.AsyncProducertopic     stringmsgQ      chan *sarama.ProducerMessagewg        sync.WaitGroupcloseChan chan struct{}
}// NewProducer 构造KafkaProducer
func NewProducer(cfg *Config) (*Producer, error) {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.NoResponse                                  // Only wait for the leader to ackconfig.Producer.Compression = sarama.CompressionSnappy                            // Compress messagesconfig.Producer.Flush.Frequency = time.Duration(cfg.Frequency) * time.Millisecond // Flush batches every 500msconfig.Producer.Partitioner = sarama.NewRandomPartitionerp, err := sarama.NewAsyncProducer(strings.Split(cfg.Broker, ","), config)if err != nil {return nil, err}ret := &Producer{producer:  p,topic:     cfg.Topic,msgQ:      make(chan *sarama.ProducerMessage, cfg.MaxMessage),closeChan: make(chan struct{}),}return ret, nil
}// Run 运行
func (p *Producer) Run() {p.wg.Add(1)go func() {defer p.wg.Done()LOOP:for {select {case m := <-p.msgQ:p.producer.Input() <- mcase err := <-p.producer.Errors():if nil != err && nil != err.Msg {l4g.Error("[producer] err=[%s] topic=[%s] key=[%s] val=[%s]", err.Error(), err.Msg.Topic, err.Msg.Key, err.Msg.Value)}case <-p.closeChan:break LOOP}}}()for hasTask := true; hasTask; {select {case m := <-p.msgQ:p.producer.Input() <- mdefault:hasTask = false}}}// Close 关闭
func (p *Producer) Close() error {close(p.closeChan)l4g.Warn("[producer] is quiting")p.wg.Wait()l4g.Warn("[producer] quit over")return p.producer.Close()
}// Log 发送log
func (p *Producer) Log(key string, val string) {msg := &sarama.ProducerMessage{Topic: p.topic,Key:   sarama.StringEncoder(key),Value: sarama.StringEncoder(val),}select {case p.msgQ <- msg:returndefault:l4g.Error("[producer] err=[msgQ is full] key=[%s] val=[%s]", msg.Key, msg.Value)}
}

 

2. kafka消费者

  几点说明:

  1) kafka一定要选用支持集群的版本

  2) 里面带了创建topic,删除topic,打印topic的工具

  3) replication是外面配置的

  4) 开多个consumer需要在创建topic时设置多个partition。官方的示例当开多个consumer的时候会崩溃,我这个版本不会,我给官方提交了一个PR,还不知道有没有采用

 

// Package main Kafka消费者
package mainimport ("context""encoding/xml""flag""fmt""io/ioutil""log""os""os/signal""runtime""strings""syscall""time""github.com/Shopify/sarama""github.com/alecthomas/log4go"
)// Consumer Consumer配置
type ConsumerConfig struct {Topic       []string `xml:"topic"`Broker      string   `xml:"broker"`Partition   int32    `xml:"partition"`Replication int16    `xml:"replication"`Group       string   `xml:"group"`Version     string   `xml:"version"`
}var (configFile = "" // 配置路径initTopic  = falselistTopic  = falsedelTopic   = ""cfg        = &Config{}
)// Config 配置
type Config struct {Consumer ConsumerConfig `xml:"consumer"`
}func init() {flag.StringVar(&configFile, "config", "../config/consumer.xml", "config file ")flag.BoolVar(&initTopic, "init", initTopic, "create topic")flag.BoolVar(&listTopic, "list", listTopic, "list topic")flag.StringVar(&delTopic, "del", delTopic, "delete topic")}func main() {runtime.GOMAXPROCS(runtime.NumCPU())defer func() {time.Sleep(time.Second)log4go.Warn("[main] consumer quit over!")log4go.Global.Close()}()contents, _ := ioutil.ReadFile(configFile)xml.Unmarshal(contents, cfg)// sarama的loggersarama.Logger = log.New(os.Stdout, fmt.Sprintf("[%s]", "consumer"), log.LstdFlags)// 指定kafka版本,一定要支持kafka集群version, err := sarama.ParseKafkaVersion(cfg.Consumer.Version)if err != nil {panic(err)}config := sarama.NewConfig()config.Version = versionconfig.Consumer.Offsets.Initial = sarama.OffsetOldest// 工具if tool(cfg, config) {return}// kafka consumer clientctx, cancel := context.WithCancel(context.Background())client, err := sarama.NewConsumerGroup(strings.Split(cfg.Consumer.Broker, ","), cfg.Consumer.Group, config)if err != nil {panic(err)}consumer := Consumer{}go func() {for {err := client.Consume(ctx, cfg.Consumer.Topic, &consumer)if err != nil {log4go.Error("[main] client.Consume error=[%s]", err.Error())// 5秒后重试time.Sleep(time.Second * 5)}}}()// os signalsigterm := make(chan os.Signal, 1)signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)<-sigtermcancel()err = client.Close()if err != nil {panic(err)}log4go.Info("[main] consumer is quiting")
}func tool(cfg *Config, config *sarama.Config) bool {if initTopic || listTopic || len(delTopic) > 0 {ca, err := sarama.NewClusterAdmin(strings.Split(cfg.Consumer.Broker, ","), config)if nil != err {panic(err)}if len(delTopic) > 0 { // 删除Topicif err := ca.DeleteTopic(delTopic); nil != err {panic(err)}log4go.Info("delete ok topic=[%s]\n", delTopic)} else if initTopic { // 初始化Topicif detail, err := ca.ListTopics(); nil != err {panic(err)} else {for _, v := range cfg.Consumer.Topic {if d, ok := detail[v]; ok {if cfg.Consumer.Partition > d.NumPartitions {if err := ca.CreatePartitions(v, cfg.Consumer.Partition, nil, false); nil != err {panic(err)}log4go.Info("alter topic ok", v, cfg.Consumer.Partition)}} else {if err := ca.CreateTopic(v, &sarama.TopicDetail{NumPartitions: cfg.Consumer.Partition, ReplicationFactor: cfg.Consumer.Replication}, false); nil != err {panic(err)}log4go.Info("create topic ok", v)}}}}// 显示Topic列表if detail, err := ca.ListTopics(); nil != err {log4go.Info("ListTopics error", err)} else {for k := range detail {log4go.Info("[%s] %+v", k, detail[k])}}if err := ca.Close(); nil != err {panic(err)}return true}return false
}type Consumer struct {
}func (consumer *Consumer) Setup(s sarama.ConsumerGroupSession) error {return nil
}func (consumer *Consumer) Cleanup(s sarama.ConsumerGroupSession) error {return nil
}func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {for message := range claim.Messages() {key := string(message.Key)val := string(message.Value)log4go.Info("%s-%s", key, val)session.MarkMessage(message, "")}return nil
}

 

转载于:https://www.cnblogs.com/mrblue/p/10770651.html

http://www.lbrq.cn/news/2761039.html

相关文章:

  • 国外个人网站模板站长工具查询域名
  • 做知乎网站的图片2023最火的十大新闻
  • 最新网站建设常见问题环球网
  • html5网站免费开发软件制作平台
  • 关于微网站策划ppt怎么做网络营销有哪些推广方式
  • 网站建设虚拟优秀营销软文范例800字
  • 做网站php和java区别长沙网站快速排名提升
  • web制作网页代码排名优化公司口碑哪家好
  • 合肥公司建站模板爱站网关键词工具
  • 网页在线生成网站互联网推广是做什么的
  • 做外贸收费的网站广东搜索引擎优化
  • 网站建设推广找stso88效果好直播代运营公司
  • 东莞房价最新消息保定seo排名外包
  • 国贸做网站公司网络广告的发布方式包括
  • 网页小游戏显示插件不支持怎么办公司排名seo
  • 模块化网站建设一般多少钱百度信息流投放技巧
  • 用什么网站做封面最好企业网站推广有哪些方式
  • 门户网站建设平台企业网站建设公司
  • 施工企业组织机构图优化推广联盟
  • 垂直网站怎么做长春网站优化指导
  • 官方网站下载水印相机南宁seo外包平台
  • 做淘宝链接模板网站市场运营和市场营销的区别
  • 怎么扫码进入公众号刷网站seo排名软件
  • 昆明旅行社网站开发广点通广告投放平台
  • cnnic可信网站必须做吗企业全网推广公司
  • 网站降权怎么处理如何让网站被百度收录
  • 杭州网站建设设计百度搜索引擎
  • 东莞市住房和城乡建设局网站自媒体135网站免费下载安装
  • 温州网站建设接单网络推广大概需要多少钱
  • 网络营销策划创意案例点评百度关键词seo外包
  • 音频算法工程师技能1
  • 云计算- KubeVirt 实操指南:VM 创建 、存储挂载、快照、VMI全流程 | 容器到虚拟机(镜像转换/资源调度)
  • 【Golang】:函数和包
  • 海洋牧场助力可持续发展,保护海洋生态平衡
  • 如何做HTTP优化
  • Houdini 粒子学习笔记