当前位置: 首页 > news >正文

xuzhou公司网站制作/域名排名查询

xuzhou公司网站制作,域名排名查询,政府农业网站模板,新品发布会文案文章目录1 Kafka安装1.1 下载安装1.2 配置启动zookeeper1.3 配置kafka1.3.1 修改配置文件1.3.2 配置环境变量1.3.3 配置服务启动脚本1.3.4 启动kafka服务1.4 kafka使用简单入门1.4.1 创建主题topics1.4.2 发送一些消息1.4.3 启动消费者1.5 设置多代理kafka群集1.5.1 准备配置文…

文章目录

  • 1 Kafka安装
    • 1.1 下载安装
    • 1.2 配置启动zookeeper
    • 1.3 配置kafka
      • 1.3.1 修改配置文件
      • 1.3.2 配置环境变量
      • 1.3.3 配置服务启动脚本
      • 1.3.4 启动kafka服务
    • 1.4 kafka使用简单入门
      • 1.4.1 创建主题topics
      • 1.4.2 发送一些消息
      • 1.4.3 启动消费者
    • 1.5 设置多代理kafka群集
      • 1.5.1 准备配置文件
      • 1.5.2 开启集群另2个kafka服务
      • 1.5.3 在集群中进行操作
      • 1.5.4 测试集群的容错性
    • 1.6 使用Kafka Connect导入/导出数据

1 Kafka安装

1.1 下载安装

到官网http://kafka.apache.org/downloads.html下载想要的版本

注:由于Kafka控制台脚本对于基于UnixWindows的平台是不同的,因此在Windows平台上使用bin\windows\ 而不是bin/ 将脚本扩展名更改为.bat。

[root@along ~]# wget http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
[root@along ~]# tar -C /data/ -xvf kafka_2.11-2.1.0.tgz
[root@along ~]# cd /data/kafka_2.11-2.1.0/

1.2 配置启动zookeeper

kafka正常运行,必须配置zookeeper,否则无论是kafka集群还是客户端的生存者和消费者都无法正常的工作的;所以需要配置启动zookeeper服务。
点击了解zookeeper安装步骤

1.3 配置kafka

1.3.1 修改配置文件

[root@along kafka_2.11-2.1.0]# grep "^[^#]" config/server.properties
broker.id=0  
listeners=PLAINTEXT://localhost:9092  
num.network.threads=3  
num.io.threads=8  
socket.send.buffer.bytes=102400  
socket.receive.buffer.bytes=102400  
socket.request.max.bytes=104857600  
log.dirs=/tmp/kafka-logs
num.partitions=1  
num.recovery.threads.per.data.dir=1  
offsets.topic.replication.factor=1  
transaction.state.log.replication.factor=1  
transaction.state.log.min.isr=1  
log.retention.hours=168  
log.segment.bytes=1073741824  
log.retention.check.interval.ms=300000  
zookeeper.connect=localhost:2181  
zookeeper.connection.timeout.ms=6000  
group.initial.rebalance.delay.ms=0

注:可根据自己需求修改配置文件

broker.id:#唯一标识ID
listeners=PLAINTEXT://localhost:9092:#kafka服务监听地址和端口
log.dirs:#日志存储目录
zookeeper.connect:#指定zookeeper服务

1.3.2 配置环境变量

[root@along ~]# vim /etc/profile.d/kafka.sh  
export KAFKA_HOME="/data/kafka_2.11-2.1.0"  
export PATH="${KAFKA_HOME}/bin:$PATH"  
[root@along ~]# source /etc/profile.d/kafka.sh

1.3.3 配置服务启动脚本

[root@along ~]# vim /etc/init.d/kafka
#!/bin/sh
#
# chkconfig: 345 99 01
# description: Kafka
#
# File : Kafka
#
# Description: Starts and stops the Kafka server
#source /etc/rc.d/init.d/functions  KAFKA_HOME=/data/kafka_2.11-2.1.0
KAFKA_USER=root
export LOG_DIR=/tmp/kafka-logs[ -e /etc/sysconfig/kafka ] && . /etc/sysconfig/kafka# See how we were called.
case "$1" in  start)echo -n "Starting Kafka:"  /sbin/runuser -s /bin/sh $KAFKA_USER -c "nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > $LOG_DIR/server.out 2> $LOG_DIR/server.err &"  echo " done."  exit 0;;stop)echo -n "Stopping Kafka: "  /sbin/runuser -s /bin/sh $KAFKA_USER  -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill \-9"  echo " done."  exit 0;;hardstop)echo -n "Stopping (hard) Kafka: "  /sbin/runuser -s /bin/sh $KAFKA_USER  -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill -9"  echo " done."  exit 0;;status)c_pid=`ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}'`if [ "$c_pid" = "" ] ; then  echo "Stopped"  exit 3else  echo "Running $c_pid"  exit 0fi  ;;restart)stopstart;;*)echo "Usage: kafka {start|stop|hardstop|status|restart}"  exit 1;;esac

1.3.4 启动kafka服务

后台启动zookeeper服务

[root@along ~]# nohup zookeeper-server-start.sh /data/kafka_2.11-2.1.0/config/zookeeper.properties &

启动kafka服务

[root@along ~]# service kafka start  
Starting kafka (via systemctl): [ OK ]  
[root@along ~]# service kafka status  
Running 86018  
[root@along ~]# ss -nutl  
Netid State      Recv-Q Send-Q     Local Address:Port                    Peer Address:Port                                
tcp   LISTEN     0      50                    :::9092                              :::*
tcp   LISTEN     0      50                    :::2181                              :::*

1.4 kafka使用简单入门

1.4.1 创建主题topics

创建一个名为along的主题,它只包含一个分区,只有一个副本:

[root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic along
Created topic "along".

如果我们运行list topic命令,我们现在可以看到该主题:

[root@along ~]# kafka-topics.sh --list --zookeeper localhost:2181  
along

1.4.2 发送一些消息

Kafka附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到Kafka集群。默认情况下,每行将作为单独的消息发送。

运行生产者,然后在控制台中键入一些消息以发送到服务器。

[root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic along
>This is a message
>This is another message

1.4.3 启动消费者

Kafka还有一个命令行使用者,它会将消息转储到标准输出。

[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic along --from-beginning
This is a message
This is another message

所有命令行工具都有其他选项; 运行不带参数的命令将显示更详细地记录它们的使用信息。

1.5 设置多代理kafka群集

到目前为止,我们一直在与一个broker运行,但这并不好玩。对于Kafka,单个代理只是一个大小为1的集群,因此除了启动一些代理实例之外没有太多变化。但是为了感受它,让我们将我们的集群扩展到三个节点(仍然在我们的本地机器上)。

1.5.1 准备配置文件

[root@along kafka_2.11-2.1.0]# cd /data/kafka_2.11-2.1.0/
[root@along kafka_2.11-2.1.0]# cp config/server.properties config/server-1.properties
[root@along kafka_2.11-2.1.0]# cp config/server.properties config/server-2.properties
[root@along kafka_2.11-2.1.0]# vim config/server-1.propertiesbroker.id=1  listeners=PLAINTEXT://:9093log.dirs=/tmp/kafka-logs-1  
[root@along kafka_2.11-2.1.0]# vim config/server-2.propertiesbroker.id=2  listeners=PLAINTEXT://:9094log.dirs=/tmp/kafka-logs-2

注:该broker.id 属性是群集中每个节点的唯一且永久的名称。我们必须覆盖端口和日志目录,因为我们在同一台机器上运行这些,并且我们希望让所有代理尝试在同一端口上注册或覆盖彼此的数据。

1.5.2 开启集群另2个kafka服务

[root@along ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-1.properties &  
[root@along ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-2.properties &  
[root@along ~]# ss -nutl  
Netid State      Recv-Q Send-Q     Local Address:Port                    Peer Address:Port                            
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9092                              :::*
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9093                              :::*
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9094                              :::*

1.5.3 在集群中进行操作

现在创建一个复制因子为3的新主题my-replicated-topic

[root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".

在一个集群中,运行describe topics命令查看哪个broker正在做什么

[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1

#注释:第一行给出了所有分区的摘要,每个附加行提供有关一个分区的信息。由于我们只有一个分区用于此主题,因此只有一行。
#“leader”是负责给定分区的所有读取和写入的节点。每个节点将成为随机选择的分区部分的领导者。
#“replicas”是复制此分区日志的节点列表,无论它们是否为领导者,或者即使它们当前处于活动状态。
#“isr”是“同步”复制品的集合。这是副本列表的子集,该列表当前处于活跃状态并且已经被领导者捕获。
#请注意,Leader: 2,在我的示例中,节点2 是该主题的唯一分区的Leader。

可以在我们创建的原始主题上运行相同的命令,以查看它的位置

[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic along  
Topic:along PartitionCount:1    ReplicationFactor:1 Configs:Topic: along    Partition: 0 Leader: 0 Replicas: 0 Isr: 0

向我们的新主题发布一些消息:

[root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>my test message 1
>my test message 2

现在让我们使用这些消息:

[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2

1.5.4 测试集群的容错性

现在让我们测试一下容错性。Broker 2 充当leader 所以让我们杀了它:

[root@along ~]# ps aux | grep server-2.properties |awk '{print $2}'
106737  
[root@along ~]# kill -9 106737
[root@along ~]# ss -nutl
tcp LISTEN 0      50      ::ffff:127.0.0.1:9092                              :::*                         
tcp LISTEN 0      50      ::ffff:127.0.0.1:9093                              :::*

leader 已切换到其中一个从属节点,节点2不再位于同步副本集中:

[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic  
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:Topic: my-replicated-topic  Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: 0,1

即使最初接受写入的leader 已经失败,这些消息仍可供消费:

[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2

1.6 使用Kafka Connect导入/导出数据

从控制台写入数据并将其写回控制台是一个方便的起点,但有时候可能希望使用其他来源的数据或将数据从Kafka导出到其他系统。对于许多系统,您可以使用Kafka Connect导入或导出数据,而不是编写自定义集成代码。

Kafka ConnectKafka附带的工具,用于向Kafka导入和导出数据。它是一个可扩展的工具,运行连接器,实现与外部系统交互的自定义逻辑。我们将了解如何使用简单的连接器运行Kafka Connect,这些连接器将数据从文件导入Kafka主题并将数据从Kafka主题导出到文件。

首先创建一些种子数据进行测试:

[root@along ~]# echo -e "foo\nbar" > test.txt
或者在Windows上:
> echo foo> test.txt
> echo bar>> test.txt

接下来,启动两个以独立模式运行的连接器,这意味着它们在单个本地专用进程中运行。提供三个配置文件作为参数。

第一个始终是Kafka Connect流程的配置,包含常见配置,例如要连接的Kafka代理和数据的序列化格式。

其余配置文件均指定要创建的连接器。这些文件包括唯一的连接器名称,要实例化的连接器类以及连接器所需的任何其他配置。

[root@along ~]# connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
[2019-01-16 16:16:31,884] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:67)
[2019-01-16 16:16:31,903] INFO WorkerInfo values:  
... ...

#注:Kafka附带的这些示例配置文件使用您之前启动的默认本地群集配置并创建两个连接器:第一个是源连接器,它从输入文件读取行并生成每个Kafka主题,第二个是宿连接器从Kafka主题读取消息并将每个消息生成为输出文件中的一行。

验是否导入成功(另起终端)
在启动过程中,您将看到许多日志消息,包括一些指示正在实例化连接器的日志消息。

  • 一旦Kafka Connect进程启动,源连接器应该开始从test.txt主题读取行并将其生成到主题connect-test,并且接收器连接器应该开始从主题读取消息connect-test 并将它们写入文件test.sink.txt。我们可以通过检查输出文件的内容来验证数据是否已通过整个管道传递:
[root@along ~]# cat test.sink.txt  
foo  
bar

请注意,数据存储在Kafka主题中connect-test,因此我们还可以运行控制台使用者来查看主题中的数据(或使用自定义使用者代码来处理它):

[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

继续追加数据,验证

[root@along ~]# echo Another line>> test.txt
[root@along ~]# cat test.sink.txt
foo
bar
Another line
[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
{"schema":{"type":"string","optional":false},"payload":"Another line"
http://www.lbrq.cn/news/1468153.html

相关文章:

  • 湘潭找工作网站/竞价广告
  • 淘宝做海淘产品 网站折扣变化快/上海百度搜索排名优化
  • 手机网站制作费用多少/seo排名优化点击软件有哪些
  • 做电影网站要怎么拿到版权/人民日报新闻
  • 乌鲁木齐百度seo/seo快速优化软件
  • 花瓣网是仿国外那个网站做的/宁德市
  • 晋城手机网站建设/站长工具ip查询
  • 做实验用哪些国外网站/外贸推广方式
  • 网站建设主流技术/玄幻小说排行榜百度风云榜
  • 网站建设和维护怎么学/我赢网seo优化网站
  • 今日兰州疫情最新消息/谷歌seo博客
  • 男人和女人做性网站/爱站网seo工具
  • dede网站迁移步骤/平台推广员是做什么的
  • 武汉网站建设优化/网站推广方案有哪些
  • 企业3合1网站建设/网站推广seo方法
  • 怎么做网站把图片发到网上/网站建设服务公司
  • 深圳做网站价格/网站你应该明白我的意思吗
  • 做网站策划案/百度快速排名优化技术
  • 阿里云备案网站建设方案书案例/营销型公司网站建设
  • 北京网站设计公司新鸿儒/近期国内外重大新闻10条
  • 国内net开发的网站建设/网络营销的产品策略
  • 织梦可以做大型网站吗/百度seo优化是什么
  • 企业网查询四六级/深圳seo优化电话
  • 开源房产网站源码/设计公司网站设计
  • 运营网站团队建设/app营销十大成功案例
  • 企业网站建设注意/营销策略ppt
  • 梅林网站建设/保定网站seo
  • 自己做网站想更换网址/整合营销沟通
  • 阿里企业邮箱客服人工电话/使用 ahrefs 进行 seo 分析
  • 现在哪些做进口商品的电商网站/如何做百度免费推广
  • C++ 指针常量 常量指针
  • Allegro降版本工具
  • AG32mcu通过寄存器方式操作cpld
  • Node.js 路由与中间件
  • TwinCAT3示例项目1
  • 从姑苏区人工智能大模型基础设施招标|学习服务器、AI处理器、GPU