当前位置: 首页 > news >正文

婚庆网站模板/打开网站搜索

婚庆网站模板,打开网站搜索,哪个网站有卖做一次性口机器的,做现金贷的网站有哪些目录 0. 相关文章链接 1. 使用Java求日活的WindowFunction使用 2. 使用Scala演示WindowFunction的使用 0. 相关文章链接 Flink文章汇总 1. 使用Java求日活的WindowFunction使用 // 设置时间语议&#xff0c;并过滤其中的首页曝光数据 DataStream<AppLogBean> homeE…

目录

0. 相关文章链接

1. 使用Java求日活的WindowFunction使用

2. 使用Scala演示WindowFunction的使用


0. 相关文章链接

Flink文章汇总

1. 使用Java求日活的WindowFunction使用

// 设置时间语议,并过滤其中的首页曝光数据
DataStream<AppLogBean> homeExposureStream = appExposureStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<AppLogBean>(Time.seconds(0)) {@Overridepublic long extractTimestamp(AppLogBean element) {return element.getTime() * 1000;}}).filter(new FilterFunction<AppLogBean>() {@Overridepublic boolean filter(AppLogBean value) throws Exception {return "home_exposure".equals(value.getTopic()) && StringUtils.isNotBlank(value.getScdata());}});// 获取出其中的用户id
SingleOutputStreamOperator<Tuple2<String, String>> userIdStream = homeExposureStream.map(new MapFunction<AppLogBean, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> map(AppLogBean appLogBean) throws Exception {String resultUserId = "1";JSONObject scdataJson = JSONObject.parseObject(appLogBean.getScdata());String user_id = scdataJson.getString("user_id");resultUserId = user_id;return Tuple2.of("dummy", resultUserId);}});// 对用户id开窗,并统计每天的数据
SingleOutputStreamOperator<String> result = userIdStream.keyBy(t -> t.f0).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))).trigger(ContinuousEventTimeTrigger.of(Time.seconds(1))).aggregate(new UniqueVisitorAggregateFunction(), new UniqueVisitorProcessWindowFunction());// 使用print打印数据
result.print("result>>>>>>>>>");}/**
* UV的窗口类
*/
public static class UniqueVisitorProcessWindowFunction extends ProcessWindowFunction<Long, String, String, TimeWindow> {private final FastDateFormat df = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");@Override
public void process(String s, Context context, Iterable<Long> elements, Collector<String> out) throws Exception {System.out.println("##### 当前的watermark为 #####" + df.format(context.currentWatermark()));System.out.println("##### 窗口开始时间 ###########" + df.format(context.window().getStart()));System.out.println("##### 窗口结束时间 ###########" + df.format(context.window().getEnd()));System.out.println("##### 该窗口当前统计的UV #####" + elements.iterator().next());out.collect("UV " + elements.iterator().next());
}
}/**
* UV的聚合类
*/
public static class UniqueVisitorAggregateFunction implements AggregateFunction<Tuple2<String, String>, Tuple2<Set<String>, Long>, Long> {@Override
public Tuple2<Set<String>, Long> createAccumulator() {return Tuple2.of(new HashSet<>(), 0L);
}@Override
public Tuple2<Set<String>, Long> add(Tuple2<String, String> value, Tuple2<Set<String>, Long> accumulator) {if (!accumulator.f0.contains(value.f1)) {accumulator.f0.add(value.f1);accumulator.f1 += 1;}return accumulator;
}@Override
public Long getResult(Tuple2<Set<String>, Long> accumulator) {return accumulator.f1;
}@Override
public Tuple2<Set<String>, Long> merge(Tuple2<Set<String>, Long> a, Tuple2<Set<String>, Long> b) {return null;
}

2. 使用Scala演示WindowFunction的使用

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)val sensorStream: WindowedStream[SensorReading, String, TimeWindow] = env.socketTextStream("localhost", 9999).map(new MyMapToSensorReading).keyBy(_.id).timeWindow(Time.seconds(5))// 1、incremental aggregation functions(增量聚合函数)(来一条数据,计算一次)
// 1.1、ReduceFunction 增量集合函数(使用匿名内部类)
val reduceResult: DataStream[SensorReading] = sensorStream.reduce(new ReduceFunction[SensorReading] {override def reduce(value1: SensorReading, value2: SensorReading): SensorReading = {SensorReading(value2.id, value2.timestamp, value2.temperature + value2.temperature)}
})
// 1.2、AggregateFunction(相比reduce,优势是可以指定累加值类型,输入类型和输出类型也可以不一样)
val aggregateResult: DataStream[Long] = sensorStream.aggregate(new AggregateFunction[SensorReading, Long, Long] {// 初始化累加值override def createAccumulator(): Long = 0L// 累加方法override def add(value: SensorReading, accumulator: Long): Long = accumulator + 1// 获取结果override def getResult(accumulator: Long): Long = accumulator// 分区的归并操作override def merge(a: Long, b: Long): Long = a + b
})// 2、full window functions(全窗口函数)
/*** 知识点:*  1、apply方法中,可以添加WindowFunction对象,会将该窗口中所有的数据先缓存,当时间到了一次性计算*  2、需要设置4个类型,分别是:输入类型,输出类型,keyBy时key的类型(如果用字符串来划分key类型为Tuple,窗口类型*  3、所有的计算都在apply中进行,可以通过window获取窗口的信息,比如开始时间,结束时间*/
val applyResult: DataStream[(Long, Int)] = sensorStream.apply(new WindowFunction[SensorReading, (Long, Int), String, TimeWindow] {override def apply(key: String, window: TimeWindow, input: Iterable[SensorReading], out: Collector[(Long, Int)]): Unit = {out.collect((window.getStart, input.size))}
})// 3、窗口函数中其他API
val otherResult: DataStream[SensorReading] = sensorStream.allowedLateness(Time.seconds(1))                       // 允许处理迟到的数据.sideOutputLateData(new OutputTag[SensorReading]("late"))    // 将迟到的数据放入侧输出流.reduce((x, y) => SensorReading(y.id, y.timestamp, x.temperature + y.temperature))
// 获取侧输出流(侧输出流为迟到很久的数据,当allowedLateness和watermark之后还是没到的数据会放入侧输出流,可以在最后统一处理)
val sideOutputStream: DataStream[SensorReading] = otherResult.getSideOutput(new OutputTag[SensorReading]("late"))// 打印输出
applyResult.print()env.execute("WindowFunctionDemo")

注:其他相关文章链接由此进 -> Flink文章汇总


http://www.lbrq.cn/news/1297657.html

相关文章:

  • 求可以做问卷测试的网站/黄页88网站推广方案
  • 网站建设需要平台/百度西安
  • 常用网站图标/如何优化关键词
  • 江苏建科建设监理有限公司网站/网络营销的未来发展趋势论文
  • 三亚网站制/优化大师电脑版官网
  • 天津医疗行业网站建设/手机百度推广怎么打广告
  • jsp做的知名网站/百度推广登录平台客服
  • 张掖响应式建站平台/西安百度关键词包年
  • ecommercial+wordpress/杭州关键词推广优化方案
  • 南昌做网站电话/搜索引擎优化举例说明
  • 网站快速收录技术/恶意点击竞价是用的什么软件
  • 石家庄做网站建设的公司排名/重庆电子商务网站seo
  • 重庆seo网站策划/营销咨询师
  • 珲春网站建设/第三方网络营销平台有哪些
  • 网站开发协议/线下推广宣传方式有哪些
  • 做的最好的微电影网站有哪些/百度竞价点击神器奔奔
  • 公司招商型网站建设/平台seo什么意思
  • python做网站需要什么/公司推广咨询
  • 刚做淘客没有网站/网站推广的方法有哪些?
  • 绍兴h5建站/百度网页版链接地址
  • 钟祥网站建设/seo免费诊断电话
  • 做一般的公司门户网站投资额/东莞网站优化
  • 网上有做logo的网站吗/百度灰色关键词排名代做
  • 哪些行业做网站推广的多/谷歌账号注册入口官网
  • 犀牛云做网站怎么这么贵/app下载免费安装
  • 1小时赚5000元游戏/aso优化费用
  • 泰州网站建设服务好/开创集团与百度
  • 能下短视频网站做牙/百度竞价是seo还是sem
  • 做旅游网站需要什么/搭建一个网站的流程
  • 泸州网站建设/网络推广网站推广方法
  • 二、Spark 开发环境搭建 IDEA + Maven 及 WordCount 案例实战
  • Node.js 中基于请求 ID 实现简单队列(即时阻止策略/排队等待策略)
  • Cosmos:构建下一代互联网的“区块链互联网
  • 滤波电路Multisim电路仿真实验汇总——硬件工程师笔记
  • Python Pandas读取Excel表格中数据并根据时间字段筛选数据
  • 多表查询-8-练习总结