Scenario
We want to analyze, in real time, a certain type of log data produced by an application. The log data is stored as files on the server's disk, with each line being one event in JSON form: {"time": 1469501675, "action": "Open"}
Solution
Use Filebeat to forward the data to Kafka, use Kafka as the input stream, and let Spark Streaming do the computation. Filebeat is a lightweight log shipper that is very easy to use and supports several installation methods. Kafka is a message queue as well as a stream-processing platform; here it serves purely as a message queue, handling distribution and storage. Spark Streaming is responsible for processing the stream.
Implementation
Prerequisites
- Filebeat 6.3.2
- Kafka 0.10
- Spark 2.4
- Scala 2.11
- CentOS 7.4, 64-bit
Install & Configure Kafka
Start the ZooKeeper server: nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
Start the Kafka server: nohup bin/kafka-server-start.sh config/server.properties &
Create the topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic log-topic
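Before wiring in Filebeat, it is worth a quick smoke test that the topic can accept and serve messages. The bundled bin/kafka-console-producer.sh and bin/kafka-console-consumer.sh scripts are the fastest way; staying in Scala, the sketch below does the same with a kafka-clients producer (object and event contents are illustrative; broker and topic as created above):

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TopicSmokeTest {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // Send one event in the same JSON shape the application writes to its log file
    val producer = new KafkaProducer[String, String](props)
    producer.send(new ProducerRecord[String, String]("log-topic", """{"time":1469501675,"action":"Open"}"""))
    producer.flush()
    producer.close()
  }
}

Reading the event back with bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic log-topic --from-beginning confirms the broker is working end to end.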
Install & Configure Filebeat
Version 6.3.2 (see the official download page). Filebeat supports several installation methods; here it is started directly from the command line. Edit the configuration file (/root/filebeat_kafka_spark/filebeat-6.3.2-linux-x86_64/filebeat.yml):
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /root/logs/*.log

#============================= Filebeat modules ===============================
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

name: "log-shipper"
fields:
  log_topic: "log-topic"

#================================ Outputs =====================================
output.kafka:
  hosts: ["localhost:9092"]
  topic: '%{[fields.log_topic]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
Start Filebeat to begin collecting and forwarding the logs: ./filebeat -e -c filebeat.yml -d "publish"
Log Content
Filebeat adds some extra fields to the log content before forwarding it to Kafka:
{"@timestamp": "2018-12-14T03:16:14.600Z","@metadata": {"beat": "filebeat","type": "doc","version": "6.3.2"},"prospector": {"type": "log"},"input": {"type": "log"},"fields": {"log_topic": "log-topic"},"beat": {"name": "log-shipper","hostname": "ecs-f3a2.novalocal","version": "6.3.2"},"host": {"name": "log-shipper"},"source": "/root/logs/access.log","offset": 261,"message": "{\"time\":1469501675,\"action\":\"Open\"}"
}
Each original log line ends up in the message field.
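To confirm this quickly from Spark before building the full job, one option is to dump just that field to the console with Spark's built-in get_json_object function. A minimal sketch (the object name is illustrative; broker address and topic as configured above):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.get_json_object

object MessagePeek {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("message-peek")
      .getOrCreate()
    import spark.implicits._

    // Kafka value -> Filebeat envelope -> original event in the "message" field
    val messages = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "log-topic")
      .load()
      .selectExpr("CAST(value AS STRING) AS json")
      .select(get_json_object($"json", "$.message").alias("message"))

    messages.writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination()
  }
}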
Spark Code
import java.sql.Timestamp

import com.google.gson.{JsonObject, JsonParser}
import org.apache.spark.sql._

case class Event(action: String, time: java.sql.Timestamp)

object App {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("log-streaming-app")
      .getOrCreate()

    import spark.implicits._

    // Read the stream from Kafka
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "114.115.241.195:9092")
      .option("subscribe", "log-topic")
      .load()

    df.printSchema()
    /*
    root
     |-- key: binary (nullable = true)
     |-- value: binary (nullable = true)   // the data read from Kafka is in this field
     |-- topic: string (nullable = true)
     |-- partition: integer (nullable = true)
     |-- offset: long (nullable = true)
     |-- timestamp: timestamp (nullable = true)
     |-- timestampType: integer (nullable = true)
    */

    // Work on df from here on
    val value = df.selectExpr("CAST(value AS STRING)").as[String]

    // Parse the "message" field out of value; message holds the original event
    val data = value.map((v: String) => {
      val json = new JsonParser()
      val obj = json.parse(v).asInstanceOf[JsonObject]
      val message = obj.get("message").getAsString
      val jsonData = json.parse(message).getAsJsonObject
      val action = jsonData.get("action").getAsString
      val time = jsonData.get("time").getAsLong
      Event(action, new Timestamp(time * 1000))
    })

    // Group by "action" and keep a running count
    val result = data.groupBy($"action").count()

    // Output the result
    result.writeStream
      .foreach(new ForeachWriter[Row] {
        override def open(partitionId: Long, epochId: Long): Boolean = true

        override def process(value: Row): Unit = {
          println(value.getString(0), value.getLong(1))
        }

        override def close(errorOrNull: Throwable): Unit = {}
      })
      .outputMode("complete")
      .start()
      .awaitTermination()
  }
}
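Since each Event already carries an event-time Timestamp, the running total per action can also be computed per time window with a watermark, which is the more idiomatic Structured Streaming pattern. The fragment below (window and watermark sizes are arbitrary choices) drops into the job above in place of the groupBy/count and the ForeachWriter output:

import org.apache.spark.sql.functions.window

// `data` is the Dataset[Event] built above
val windowed = data
  .withWatermark("time", "10 minutes")                // tolerate events arriving up to 10 minutes late
  .groupBy(window($"time", "5 minutes"), $"action")   // count per 5-minute window and action
  .count()

windowed.writeStream
  .format("console")        // built-in sink, convenient for local debugging
  .outputMode("update")     // emit only the windows whose counts changed in this batch
  .option("truncate", "false")
  .start()
  .awaitTermination()

The console sink stands in for the hand-rolled ForeachWriter purely for brevity; either works.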
Debug the job locally; once the results are correct, package it and run it on the Spark cluster.
Summary
Getting started with Spark Structured Streaming involves quite a bit to master, including Scala, Kafka, and working with Spark DataFrames/Datasets.
References
- Real-time Streaming ETL with Structured Streaming in Apache Spark 2.1
- Structured Streaming Programming Guide