当前位置: 首页 > news >正文

杭州网站建设品牌推广一款app的营销方案

杭州网站建设品牌,推广一款app的营销方案,比较好用的网站,wordpress插件内链Table API 与 SQL Flink 对批处理和流处理,提供了统一的上层 API Table API 是一套内嵌在 Java 和 Scala 语言中的查询API,它允许以非常直观的方式组合来自一些关系运算符的查询 Flink 的 SQL 支持基于实现了 SQL 标准的 Apache Calcite Table API 是流处理和批处理通用的关…

Table API 与 SQL

Flink 对批处理和流处理,提供了统一的上层 API

Table API 是一套内嵌在 Java 和 Scala 语言中的查询API,它允许以非常直观的方式组合来自一些关系运算符的查询

Flink 的 SQL 支持基于实现了 SQL 标准的 Apache Calcite

Table API 是流处理和批处理通用的关系型 API, Table API 可以基于流输入或者批输入来运行而不需要进行任何修改。 Table API 是 SQL 语言的超集并专门为 ApacheFlink 设计的,Table API 是 Scala 和 Java 语言集成式的 API。与常规 SQL 语言中将查询指定为字符串不同,Table API 查询是以 Java 或 Scala 中的语言嵌入样式来定义的,具有 IDE 支持如:自动完成和语法检测。

TableApI与SQL目前还处于未完善的开发状态。

需要引入依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>1.10.1</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.10.1</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.12</artifactId><version>1.10.1</version>
</dependency>

基本了解

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从文件中读取数据String path = "/home/lxj/workspace/Flink/src/main/resources/sensor.txt";DataStream<String> inputDataStream = env.readTextFile(path);// 转换成POJODataStream<SensorReading> dataStream = inputDataStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});// 创建表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 基于数据流创建一张表Table dataTable = tableEnv.fromDataStream(dataStream);// 调用tableAPI进行转换操作Table resultTable = dataTable.select("id, temperature").where("id = 'sensor_1'");// 执行SQL,需要提前注册一个视图tableEnv.createTemporaryView("sensor", dataTable);String sql = "select id, temperature from sensor where id = 'sensor_1'";Table resultSqlTable = tableEnv.sqlQuery(sql);// 表中的数据都可以看成是一个Row,也可直接传入对应的Bean对象(需要有对应的构造方法)tableEnv.toAppendStream(resultTable, Row.class).print("result");tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");env.execute();
}sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547710000,36.8
sensor_1,1547719999,34.8
sensor_1,1547715555,37.8result> sensor_1,35.8
sql> sensor_1,35.8
result> sensor_1,36.8
sql> sensor_1,36.8
result> sensor_1,34.8
sql> sensor_1,34.8
result> sensor_1,37.8
sql> sensor_1,37.8

基本程序的结构如下:

Table API 和 SQL 的程序结构,与流式处理的程序结构十分类似

// 创建表的执行环境
StreamTableEnvironment tableEnv = ...// 创建一张表,连接source,用于读取数据 Source
tableEnv.connect(...).createTemporaryTable("inputTable");// 注册一张表,连接Sink,用于把计算结果输出 Sink
tableEnv.connect(...).createTemporaryTable("outputTable");// 通过 Table API 查询算子,得到一张结果表
Table result = tableEnv.from("inputTable").select(...);// 通过 SQL 查询语句,得到一张结果表
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...");// 将结果表写入输出表中,输出到Sink中
result.insertInto("outputTable");

各种环境的创建

// 创建表环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);// 1.10默认就是planner的流处理;1.10之后默认是blink的流处理
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 基于老版本planner的流处理, 1.10默认就是planner,默认是批流统一
EnvironmentSettings oldStreamSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamTableEnvironment oldStreamTableEnv = StreamTableEnvironment.create(env, oldStreamSettings);// 基于老版本planner的批处理
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment oldBatchTableEnv = BatchTableEnvironment.create(batchEnv);// 基于Blink的流处理
EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings);// 基于Blink的批处理
EnvironmentSettings blinkBatchSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment blinkBatchTableEnv = TableEnvironment.create(blinkStreamSettings);

TableEnvironment 是 flink 中集成 Table API 和 SQL 的核心概念,所有对表的操作都基于 TableEnvironment

– 注册 Catalog

– 在 Catalog 中注册表

– 执行 SQL 查询

– 注册用户自定义函数(UDF)

表(Table)

TableEnvironment 可以注册目录 Catalog,并可以基于 Catalog 注册表

表(Table)是由一个“标识符”(identifier)来指定的,由3部分组成:Catalog名、数据库(database)名和对象名,默认是defaultCatelog和defaultDatabase

表可以是常规的,也可以是虚拟的(视图,View)

常规表(Table)一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream转换而来

视图(View)可以从现有的表中创建,通常是 table API 或者 SQL 查询的一个结果集

要使用SQL查询必须有对应的视图,要使用TableAPI必须要是Table。

创建表

TableEnvironment 可以调用 .connect() 方法,连接外部系统,并调用 .createTemporaryTable() 方法,在 Catalog 中注册表

tableEnv.connect(...)     // 定义表的数据来源,和外部系统建立连接.withFormat(...)  // 定义数据格式化方法.withSchema(...)  // 定义表结构.createTemporaryTable("MyTable");  // 创建临时表

可以创建 Table 来描述文件数据,它可以从文件中读取,或者将数据写入文件

// 表的创建:连接外部系统,读取数据
// 读取文件
String path = "/home/lxj/workspace/Flink/src/main/resources/sensor.txt";
tableEnv.connect(new FileSystem().path(path)).withFormat(new Csv()).withSchema(new Schema() // 顺序要一致.field("id", DataTypes.STRING()).field("time", DataTypes.BIGINT()).field("temp", DataTypes.DOUBLE())).createTemporaryTable("inputTable");Table inputTable = tableEnv.from("inputTable");
inputTable.printSchema();
// root
//  |-- id: STRING
//  |-- time: BIGINT
//  |-- temp: DOUBLE
tableEnv.toAppendStream(inputTable, Row.class).print();
// sensor_1,1547718199,35.8
// sensor_6,1547718201,15.4
// sensor_7,1547718202,6.7
// sensor_10,1547718205,38.1
// sensor_1,1547710000,36.8
// sensor_1,1547719999,34.8
// sensor_1,1547715555,37.8

使用新版的CSV需要引入依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.10.1</version>
</dependency>

表的查询 – Table API

Table API 是集成在 Scala 和 Java 语言内的查询 API

Table API 基于代表“表”的 Table 类,并提供一整套操作处理的方法 API;这些方法会返回一个新的 Table 对象,表示对输入表应用转换操作的结果

有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构

表的查询 – SQL

Flink 的 SQL 集成,基于实现 了SQL 标准的 Apache Calcite

在 Flink 中,用常规字符串来定义 SQL 查询语句

SQL 查询的结果,也是一个新的 Table

// 查询转换
// 3.1 TableAPI
// 简单转换
Table resultTable = inputTable.select("id, temp").filter("id === 'sensor_1'");// 聚合统计
Table aggTable = inputTable.groupBy("id").select("id, id.count as count, temp.avg as avgTemp");// 3.2 SQL
Table resultSqlTable = tableEnv.sqlQuery("select id, temp from inputTable where id = 'sensor_1'");Table aggSqlTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTmp from inputTable group by id");// tableEnv.toAppendStream(resultTable, Row.class).print("result");
// tableEnv.toAppendStream(resultSqlTable, Row.class).print("resultSql");
// resultSql> sensor_1,35.8
// result> sensor_1,35.8
// resultSql> sensor_1,36.8
// result> sensor_1,36.8
// resultSql> sensor_1,34.8
// result> sensor_1,34.8
// resultSql> sensor_1,37.8
// result> sensor_1,37.8// 此处因为使用了count有更新查询出来的表的内容,因此不能使用toAppendStream,要使用toRetractStream
tableEnv.toRetractStream(aggTable, Row.class).print("agg");
tableEnv.toRetractStream(aggSqlTable, Row.class).print("aggSQL");
// aggSQL> (true,sensor_1,1,35.8)
// aggSQL> (true,sensor_6,1,15.4)
// aggSQL> (true,sensor_7,1,6.7)
// aggSQL> (true,sensor_10,1,38.1)
// aggSQL> (false,sensor_1,1,35.8)    // true表示新数据的插入,false表示旧数据的删除
// aggSQL> (true,sensor_1,2,36.3)
// aggSQL> (false,sensor_1,2,36.3)
// aggSQL> (true,sensor_1,3,35.8)
// aggSQL> (false,sensor_1,3,35.8)
// aggSQL> (true,sensor_1,4,36.3)
// agg> (true,sensor_1,1,35.8)
// agg> (true,sensor_6,1,15.4)
// agg> (true,sensor_7,1,6.7)
// agg> (true,sensor_10,1,38.1)
// agg> (false,sensor_1,1,35.8)
// agg> (true,sensor_1,2,36.3)
// agg> (false,sensor_1,2,36.3)
// agg> (true,sensor_1,3,35.8)
// agg> (false,sensor_1,3,35.8)
// agg> (true,sensor_1,4,36.3)

输出表

表的输出,是通过将数据写入 TableSink 来实现的

TableSink 是一个通用接口,可以支持不同的文件格式、存储数据库和消息队列

输出表最直接的方法,就是通过 Table.insertInto() 方法将一个 Table 写入注册过的 TableSink 中

输出到文件

String path = "/home/lxj/workspace/Flink/src/main/resources/sensor.txt";
tableEnv.connect(new FileSystem().path(path)).withFormat(new Csv()).withSchema(new Schema() // 顺序要一致.field("id", DataTypes.STRING()).field("time", DataTypes.BIGINT()).field("temp", DataTypes.DOUBLE())).createTemporaryTable("inputTable");Table inputTable = tableEnv.from("inputTable");Table resultTable = inputTable.select("id, temp").filter("id === 'sensor_1'");// 输出到文件
// 注册外部文件到输出表
String outputPath = "/home/lxj/workspace/Flink/src/main/resources/output.txt";
tableEnv.connect(new FileSystem().path(outputPath)).withFormat(new Csv()).withSchema(new Schema() // 顺序要一致.field("idOut", DataTypes.STRING()).field("tempOut", DataTypes.DOUBLE())).createTemporaryTable("outputTable");// insertInto输出到文件的话是不支持toRetractStream带更新的流
resultTable.insertInto("outputTable");sensor_1,35.8
sensor_1,36.8
sensor_1,34.8
sensor_1,37.8

更新模式

对于流式查询,需要声明如何在表和外部连接器之间执行转换,与外部系统交换的消息类型,由更新模式(Update Mode)指定

追加(Append)模式

表只做插入操作,和外部连接器只交换插入(Insert)消息,所有的外部连接器都支持追加模式

撤回(Retract)模式

– 表和外部连接器交换添加(Add)和撤回(Retract)消息

– 插入操作(Insert)编码为 Add 消息;删除(Delete)编码为 Retract 消息;更新(Update)编码为上一条的 Delete 和下一条的 Add 消息

比如count,avg之类的聚合操作

更新插入(Upsert)模式

更新和插入都被编码为 Upsert 消息;删除编码为 Delete 消息

输出到 Kafka

可以创建 Table 来描述 kafka 中的数据,作为输入或输出的 TableSink

tableEnv.connect(new Kafka().version("0.11").topic("sinkTest").property("zookeeper.connect", "localhost:2181").property("bootstrap.servers", "localhost:9092")
).withFormat( new Csv() ).withSchema( new Schema().field("id", DataTypes.STRING()).field("temp", DataTypes.DOUBLE())).createTemporaryTable("kafkaOutputTable");resultTable.insertInto("kafkaOutputTable");

输出到 ES

可以创建 Table 来描述 ES 中的数据,作为输出的 TableSink

tableEnv.connect(new Elasticsearch().version("6").host("localhost", 9200, "http").index("sensor").documentType("temp")
).inUpsertMode()          // 默认是Append模式,需要显示声明为Upsert.withFormat(new Json()).withSchema( new Schema().field("id", DataTypes.STRING()).field("count", DataTypes.BIGINT())).createTemporaryTable("esOutputTable");
aggResultTable.insertInto("esOutputTable");

输出到 MySql

可以创建 Table 来描述 MySql 中的数据,作为输入和输出,需要引入依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.12</artifactId><version>1.10.1</version>
</dependency>
String sinkDDL="create table jdbcOutputTable (" +" id varchar(20) not null, " +" cnt bigint not null " +") with (" +" 'connector.type' = 'jdbc', " +" 'connector.url' = 'jdbc:mysql://localhost:3306/test', " +" 'connector.table' = 'sensor_count', " +" 'connector.driver' = 'com.mysql.jdbc.Driver', " +" 'connector.username' = 'root', " +" 'connector.password' = '123456' )";// 执行 DDL 创建表
tableEnv.sqlUpdate(sinkDDL);aggResultSqlTable.insertInto("jdbcOutputTable");

DataStream 和表的转换

将 Table 转换成 DataStream

表可以转换为 DataStream 或 DataSet ,这样自定义流处理或批处理程序就可以继续在 Table API 或 SQL 查询的结果上运行了

将表转换为 DataStream 或 DataSet 时,需要指定生成的数据类型,即要将表的每一行转换成的数据类型

表作为流式查询的结果,是动态更新的

转换有两种转换模式:追加(Append)模式和撤回(Retract)模式

**追加模式(Append Mode)**用于表只会被插入(Insert)操作更改的场景

DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);

**撤回模式(Retract Mode)**用于任何场景。有些类似于更新模式中 Retract 模式,它只有 Insert 和 Delete 两类操作;得到的数据会增加一个 Boolean 类型的标识位(返回的第一个字段),用它来表示到底是新增的数据(Insert),还是被删除的数据(Delete)

DataStream<Tuple2<Boolean, Row>> aggResultStream = tableEnv.toRetractStream(aggResultTable , Row.class);

将 DataStream 转换成表

对于一个 DataStream,可以直接转换成 Table,进而方便地调用 Table API做转换操作

DataStream<SensorReading> dataStream = ...
Table sensorTable = tableEnv.fromDataStream(dataStream);

默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来

DataStream<SensorReading> dataStream = ...
Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature");

创建临时视图(Temporary View)

基于 DataStream 创建临时视图

tableEnv.createTemporaryView("sensorView", dataStream);tableEnv.createTemporaryView("sensorView", dataStream, "id, temperature, timestamp as ts");

基于 Table 创建临时视图

tableEnv.createTemporaryView("sensorView", sensorTable);

查看执行计划

Table API 提供了一种机制来解释计算表的逻辑和优化查询计划

查看执行计划,可以通过 TableEnvironment.explain(table) 方法或TableEnvironment.explain() 方法完成,返回一个字符串,描述三个计划

优化的逻辑查询计划

优化后的逻辑查询计划

实际执行计划。

String explaination = tableEnv.explain(resultTable);
System.out.println(explaination);

流处理和关系代数的区别

在这里插入图片描述

动态表(Dynamic Tables)

动态表是 Flink 对流数据的 Table API 和 SQL 支持的核心概念

与表示批处理数据的静态表不同,动态表是随时间变化的

持续查询(Continuous Query)

动态表可以像静态的批处理表一样进行查询,查询一个动态表会产生持续查询(Continuous Query)

连续查询永远不会终止,并会生成另一个动态表

查询会不断更新其动态结果表,以反映其动态输入表上的更改
在这里插入图片描述

流式表查询的处理过程:

  1. 流被转换为动态表
  2. 对动态表计算连续查询,生成新的动态表
  3. 生成的动态表被转换回流

将流转换成动态表

为了处理带有关系查询的流,必须先将其转换为表

从概念上讲,流的每个数据记录,都被解释为对结果表的插入(Insert)修改操作

在这里插入图片描述

持续查询会在动态表上做计算处理,并作为结果生成新的动态表
在这里插入图片描述

将动态表转换成 DataStream

与常规的数据库表一样,动态表可以通过插入(Insert)、更新(Update)和删除(Delete)更改,进行持续的修改

将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码

仅追加(Append-only)流仅通过插入(Insert)更改来修改的动态表,可以直接转换为仅追加流

撤回(Retract)流撤回流是包含两类消息的流:添加(Add)消息和撤回(Retract)消息

Upsert(更新插入)流Upsert 流也包含两种类型的消息:Upsert 消息和删除(Delete)消息。

在这里插入图片描述

http://www.lbrq.cn/news/2702287.html

相关文章:

  • 遵义网站搭建公司哪家好网站建设选亿企网络
  • 巴中网站建设培训班百度分析
  • 网站建设与管理自简历网站建设平台
  • 提升网站权重禁止搜索引擎收录的方法
  • 美食网站要怎么做南京seo
  • 国际贸易英文网站优化网站的步骤
  • 中国各大网站排名网络推广免费网站
  • 石家庄做网站价格谷歌google play官网
  • 个人网站网站建设百度一下网页版搜索引擎
  • 网上推广赌博seo在哪学
  • 大学网站建设招标方案长沙网络推广小公司
  • 漳州 外贸网站建设 SEO广州百度搜索排名优化
  • 新疆建设网二级域名网站青岛网站建设推广公司
  • 泰安网站建设51baiduaso搜索优化
  • 网络营销方案撰写的内容与要求360网站关键词排名优化
  • wordpress多程序用户同步苏州排名搜索优化
  • 网站建设毕业设计中期报告广告软文怎么写
  • 有人知道做网站吗?nba最新交易汇总实时更新
  • 做网站php的作用活动推广宣传方案
  • 淘宝客网站建设教程上海网站优化公司
  • sw网站建设网络营销策划书结构
  • 服装网站建设策划书3000字seo站长综合查询
  • 长沙 建网站免费的h5制作网站模板
  • 网站制作费用价格表企业网站推广策划书
  • 如何加快百度收录网站百度电话销售
  • 人人商城网站开发近期国内外重大新闻10条
  • ui设计需要学哪些课程企业seo服务
  • 用jsp做的网站需要什么工具栏东莞seo广告宣传
  • 南京学习网站建设推广app赚佣金接单平台
  • 论某网站职能建设温州网站优化推广方案
  • 浅层神经网络
  • 【Java web】HTTP 与 Web 基础教程
  • 机器学习项目从零到一:加州房价预测模型(PART 3)
  • 《AVL树的原理与C++实现:详解平衡二叉搜索树的高效构建与操作》
  • LDAP 登录配置参数填写指南
  • 【项目设计】高并发内存池