当前位置: 首页 > news >正文

厦门做网站公司排名/免费技能培训网

厦门做网站公司排名,免费技能培训网,网站建设哪个公司,微信公众号优惠劵网站怎么做的场景 希望对应用产生的某类型的日志数据,进行实时分析。日志数据以文件形式保存在服务器磁盘中,每一行为一个事件:{"time": 1469501675,"action": "Open"}, JSON形式。 方案 使用Filebeat转发数据到…

场景

希望对应用产生的某类型的日志数据,进行实时分析。日志数据以文件形式保存在服务器磁盘中,每一行为一个事件:{"time": 1469501675,"action": "Open"}JSON形式。

方案

使用Filebeat转发数据到Kafka,将Kafka作为输入数据流,由Spark Streaming进行计算。 Filebeat是轻量级的代理,非常简单易用,支持多种安装方式。 Kafka是一个消息队列,也是一个流处理平台。这里作为一个消息队列(负责分发和储存)。 Spark Streaming负责处理流数据

实践

资源准备

  • Filebeat 6.3.2
  • Kafka 0.10
  • Spark 2.4
  • Scala 2.11
  • CentOS 7.4 64位

安装&配置Kafka

启动Zookeeper Server:nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

启动Kafka Server:nohup bin/kafka-server-start.sh config/server.properties &

创建Topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic log-topic

安装&配置Filebeat

版本6.3.2 下载地址 直接多种安装方式,这里使用命令行启动。 修改配置文件:(/root/filebeat_kafka_spark/filebeat-6.3.2-linux-x86_64/filebeat.yml)

filebeat.inputs:
- type: logenabled: truepaths:- /root/logs/*.log#============================= Filebeat modules ===============================filebeat.config.modules:path: ${path.config}/modules.d/*.ymlreload.enabled: falsename: "log-shipper"
fields:log_topic: "log-topic"#================================ Outputs =====================================
output.kafka:hosts: ["localhost:9092"]topic: '%{[fields.log_topic]}'partition.round_robin:reachable_only: falserequired_acks: 1compression: gzipmax_message_bytes: 1000000
复制代码

启动Filebeat,开始日志收集、转发。 ./filebeat -e -c filebeat.yml -d "publish"

日志内容

从Filebeat中转发到Kafka日志内容会增加一些信息:

{"@timestamp": "2018-12-14T03:16:14.600Z","@metadata": {"beat": "filebeat","type": "doc","version": "6.3.2"},"prospector": {"type": "log"},"input": {"type": "log"},"fields": {"log_topic": "log-topic"},"beat": {"name": "log-shipper","hostname": "ecs-f3a2.novalocal","version": "6.3.2"},"host": {"name": "log-shipper"},"source": "/root/logs/access.log","offset": 261,"message": "{\"time\":1469501675,\"action\":\"Open\"}"
}
复制代码

每一行日志内容为message 字段。

Spark 代码

import java.sql.Timestampimport com.google.gson.{JsonObject, JsonParser}
import org.apache.spark.sql._case class Event(action: String, time: java.sql.Timestamp)object App {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[2]").appName("log-streaming-app").getOrCreate()import spark.implicits._// 从 Kafka 读取流val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "114.115.241.195:9092").option("subscribe", "nginx-log-topic").load()df.printSchema()/*root|-- key: binary (nullable = true)|-- value: binary (nullable = true)   // 从Kafka读取的数据为这个字段|-- topic: string (nullable = true)|-- partition: integer (nullable = true)|-- offset: long (nullable = true)|-- timestamp: timestamp (nullable = true)|-- timestampType: integer (nullable = true)*/// 接下来对 df 进行操作val value = df.selectExpr("CAST(value AS STRING)").as[String]//  把value中的message解析出来,message为原始数据val data = value.map((v: String) => {val json = new JsonParser()val obj = json.parse(v).asInstanceOf[JsonObject]val message = obj.get("message").getAsStringval jsonData = json.parse(message).getAsJsonObjectvar action = jsonData.get("action").getAsStringval time = jsonData.get("time").getAsLongEvent(action, new Timestamp(time * 1000))})// 按 "action" 分组 累计统计val result = data.groupBy($"action").count()// 结果 输出result.writeStream.foreach(new ForeachWriter[Row] {override def open(partitionId: Long, epochId: Long): Boolean = {true}override def process(value: Row): Unit = {println(value.getString(0), value.getLong(1))}override def close(errorOrNull: Throwable): Unit = {}}).outputMode("complete").start().awaitTermination()}}
复制代码

调试运行,结果正确后,打包,在Spark集群中运行。

总结

入门学习 Spark Structured Streaming,里面要掌握的知识还是很多的,包括对 Scala、Kafka、Spark DataFrame/Dataset 操作等。

参考资料

  • Real-time Streaming ETL with Structured Streaming in Apache Spark 2.1
  • Structured Streaming Programming Guide
http://www.lbrq.cn/news/1583587.html

相关文章:

  • 专门做特卖的网站是什么意思/推广费用一般多少钱
  • 岗顶网站建设/电商运营培训班
  • 怎么给网站做懒加载/账号seo是什么
  • 百事企业的网站建设类型/西安竞价托管公司
  • 网站建设的几个要素/济南seo优化公司助力网站腾飞
  • 企业网站建设制作公司/嘉兴新站seo外包
  • 哪些网站可以接点私活做的/seo推广一年要多少钱
  • 做景观园林的网站是/批量查询权重
  • 网站提交了被收录后改怎么做/免费建站
  • linux系统怎么做网站/aso优化师主要是干嘛的
  • 做博客网站要怎么配置的服/广告宣传费用一般多少
  • 珠海科技网站建设/公司营销策划方案案例
  • 怎么做网站的搜索栏/专业做app软件开发公司
  • 如何自建企业网站/成都seo工程师
  • 哪些网站可以做代理/个人怎么在百度上做推广
  • 网站比较分析/最佳搜索引擎磁力王
  • 网站开发制作公/景德镇seo
  • 建设公司网站的意义/湖北网站seo设计
  • 做私活的网站/免费外链网站
  • 南沙哪有做网站的/品牌营销策划包括哪些内容
  • 今日国内新闻头条15条简短/时空seo助手
  • mvc做的网站怎么连接数据库/我为什么不建议年轻人做销售
  • 下载的网站模板怎么使用/互联网营销专业
  • 网站建设管理系统/廊坊推广seo霸屏
  • 网站建设的学校/网站推广在线推广
  • 外贸仿牌网站建设/优化疫情防控 这些措施你应该知道
  • 常见的静态网站开发技术/页优化软件
  • 网站建设的工资/综合搜索引擎
  • 独立ip访问网站/河南seo外包
  • dw做asp购物网站/b2b平台是什么意思
  • 扣证件照要点
  • 计算机网络:CIDR地址块划分子网可以使用VLSM吗?
  • 2025最新版天猫图片搜索API全解析:从图像识别到商品匹配实战
  • MCU中的晶振(Crystal Oscillator)
  • (附源码)基于Web的物流信息管理系统
  • 【算法训练营Day21】回溯算法part3