国外有没有专门做靶材的网站网络平台销售
一 ,用时间分块 :
1 ,无状态转换 :
wc 每次只能统计当时的批次数据,不能把两次的统计结果累加。
2 ,有状态转换 : 累加,数据不丢失
可以将多次的数据,做累加
3 ,真正的 wc :
package day06_sparkStreamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}object Demo01_wc {def main(args: Array[String]): Unit = {// scval ssWc: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ssWc")// ssc,5 秒val ssc: StreamingContext = new StreamingContext(ssWc,Seconds(5))// 设置缓存 : 累加数据必须设置的ssc.checkpoint("./data/cache/checkpoint01")// 监控端口 : 得到一行一行的数据 ( 跟 textFile 没有本质的区别 )// val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)val lineDStream: ReceiverInputDStream[String] = ssc.receiverStream(new CustomerReceiver("node01",9999))// 切割val wordsDStream: DStream[String] = lineDStream.flatMap(_.split(" "))// 映射元组val yuanDStream: DStream[(String, Int)] = wordsDStream.map((_,1))// 累加方法 :// Seq : List 接口// Scala 的 Seq 将是 Java 的 List,Scala 的 List 将是 Java 的 LinkedList// Seq是一个trait,它相当于Java的接口,但相当于即将到来的防御者方法。 Scala的List是一个抽象类,由Nil和::扩展,这是List的具体实现。// Option :有值或者无值// Scala Option(选项)类型用来表示一个值是可选的(有值或无值)。// Some : 有值// 方法 : 传进来的集合,之前的值val updateFunc = (v:Seq[Int],state:Option[Int]) => {val preStates: Int = state.getOrElse(0)// 返回的对象 : 有值,至少是 0 ,但是初始解读那肯定是没有值的Some( preStates + v.sum )}// 累加计算val value: DStream[(String, Int)] = yuanDStream.updateStateByKey(updateFunc)value.print()ssc.start()ssc.awaitTermination()}
}
4 ,测试 :
- 开启 nc
nc -lk 9999
- 启动 idea 程序 :
- 在 nc 输入值
- 正确的结果 : 看到累加结果
二 ,窗口与步长 :
1 ,架构图 :
- 批次 : 每次统计数据的间隔时间
- 窗口 : 多个批次
- 窗口时长 : 批次间隔时间 × n
- 滑动步长 : 默认值 = 批次间隔
- 滑动步长 : 可以设置 = 批次间隔时间 × n
2 ,计算思想 :
- 一个步长 : 触发一次计算
- 一个窗口 : 触发一次数据合并
3 ,滑动步长 ,例子
- 窗口长度 : 4
- 滑动步长 : 2
4 ,窗口目的 :
统计一段时间的数据
5 ,窗口计算 : 两种方式
- 全量累加 : 正常情况使用。
- 减掉重复部分 : 有的批次,会被计算两次,减掉重复的批次。
6 ,窗口计算 1 : 全窗口累加 ( 每 10 秒统计一次前 15 秒的结果 )
- 批次 : 5 秒 ( 这就是 1 步 )
- 步长 : 10 秒 ( 每次滑动 2 步 )
- 窗口长度 : 15 秒 ( 每次统计 3 步的数据 ) : 每个窗口计算返回一次结果
- 伟大的实验 : 每 5 秒,输入 1 个 aa
- 预计结果 :
1 ,2 ( 前 10 秒,只有 2 个 aa )
2 ,4 ( 因为有一个数据被算了两次 ) ,图中的 time3 被算了两次 - 图示 :
7 ,窗口计算 1 : 伟大的实验 ( 每 10 秒统计一次前 15 秒的结果 )
- node01 开端口 : nc -lk 9999
- 启动 idea 程序
- 看着那个时钟,每 5 秒发一次 aa
- 代码 :
package day06_sparkStreamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}object Demo02_Wc_Ck {def main(args: Array[String]): Unit = {// scval ssWc: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ssWc")// ssc,5 秒val ssc: StreamingContext = new StreamingContext(ssWc,Seconds(5))// 设置缓存 : 累加数据必须设置的ssc.checkpoint("./data/cache/checkpoint01")// 监控端口 : 得到一行一行的数据 ( 跟 textFile 没有本质的区别 )// val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)val lineDStream: ReceiverInputDStream[String] = ssc.receiverStream(new CustomerReceiver("node01",9999))// 切割val wordsDStream: DStream[String] = lineDStream.flatMap(_.split(" "))// 映射元组val yuanDStream: DStream[(String, Int)] = wordsDStream.map((_,1))// 窗口计算 : 全量累加// 运算,窗口长度,步长val res: DStream[(String, Int)] = yuanDStream.reduceByKeyAndWindow((a1:Int, a2:Int)=>a1+a2,Seconds(15),Seconds(10))res.print()ssc.start()ssc.awaitTermination()}
}
- 结果 :
全是 3
8 ,窗口计算 2 : 伟大的实验 ( 每 10 秒统计一次前 15 秒的结果 )
- 代码 :
val res: DStream[(String, Int)] = yuanDStream.reduceByKeyAndWindow((a1:Int, a2:Int)=>a1+a2,(i1:Int,i2:Int)=>i1-i2,Seconds(15),Seconds(10))
- 结果 :
完全一致。
9 ,reduceByKeyAndWindow 算子解析 ( 这里是唯一的正确解释 )
- 为什么我的结论是对的 :
1 ,通过实验证实
2 ,通过查看源码证实 - 得出结论 : 每 4 秒,计算前 8 秒的数据
yuanDStream.reduceByKeyAndWindow((a1:Int, a2:Int)=>a1+a2,Seconds(8),Seconds(4))
yuanDStream.reduceByKeyAndWindow((a1:Int, a2:Int)=>a1+a2,(i1:Int,i2:Int)=>i1-i2,Seconds(8),Seconds(4))
- 得出结论 : 结果没有区别
1 ,前一个 : 浪费资源
2 ,后一个 : 节省资源 - 怎么做到的节省资源 :
1 ,这个函数是上述函数的更高效版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。
2 ,每个批次的数据已经统计好,我们用哪个批次,不用哪个批次,都是可以操作的。