中国icp备案网站/app拉新推广平台渠道商
1.简介
Table API 是流处理和批处理通用的关系型API,Table API 可以基于流输入或者批输入来运行而不需要进行任何修改。Table API 是SQL 语言的超集并专门为ApacheFlink 设计的,Table API 是Scala 和Java 语言集成式的API。与常规SQL 语言中将查询指定为字符串不同,Table API 查询是以Java 或Scala 中的语言嵌入样式来定义的,具有IDE 支持如:自动完成和语法检测。
而对于Flink SQL,就是直接可以在代码中写SQL,来实现一些查询(Query)操作。Flink 的SQL 支持,基于实现了SQL 标准的Apache Calcite(Apache 开源SQL 解析工具)。
2.实际开发
2.1.需要引入的依赖
Table API 和SQL 需要引入的依赖有两个:planner 和bridge。
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
flink-table-planner:planner 计划器,是table API 最主要的部分,提供了运行时环境和生成程序执行计划的planner;
flink-table-api-scala-bridge:bridge 桥接器,主要负责table API 和DataStream/DataSet API的连接支持,按照语言分java 和scala。
这里的两个依赖,是IDE 环境下运行需要添加的;如果是生产环境,lib 目录下默认已经有了planner,就只需要有bridge 就可以了。
当然,如果想使用用户自定义函数,或是跟kafka 做连接,需要有一个SQL client,这个包含在flink-table-common 里。
2.1.1.两种planner(old & blink)的区别
1. 批流统一:Blink 将批处理作业,视为流式处理的特殊情况。所以,blink 不支持表和DataSet 之间的转换,批处理作业将不转换为DataSet 应用程序,而是跟流处理一样,转换为DataStream 程序来处理。
2. 因为批流统一, Blink planner 也不支持BatchTableSource , 而使用有界的StreamTableSource 代替。
3. Blink planner 只支持全新的目录,不支持已弃用的ExternalCatalog。
4. 旧planner 和Blink planner 的FilterableTableSource 实现不兼容。旧的planner 会把PlannerExpressions 下推到filterableTableSource 中,而blink planner 则会把Expressions 下推。
5. 基于字符串的键值配置选项仅适用于Blink planner。
6. PlannerConfig 在两个planner 中的实现不同。
7. Blink planner 会将多个sink 优化在一个DAG 中(仅在TableEnvironment 上受支持,而在StreamTableEnvironment 上不受支持)。而旧planner 的优化总是将每一个sink 放在一个新的DAG 中,其中所有DAG 彼此独立。
8. 旧的planner 不支持目录统计,而Blink planner 支持。
2.2.API 调用
1.创建表环境
// 1.1 基于老版本planner的流处理
val settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val oldStreamTableEnv = StreamTableEnvironment.create(env, settings)// 1.2 基于老版本的批处理
val batchEnv = ExecutionEnvironment.getExecutionEnvironment
val oldBatchTableEnv = BatchTableEnvironment.create(batchEnv)// 1.3 基于blink planner的流处理
val blinkStreamSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings)// 1.4 基于blink planner的批处理
val blinkBatchSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings)
2.Table API开发
def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()val tableEnv = StreamTableEnvironment.create(env, settings)// 读取数据val inputStream = env.socketTextStream("localhost", 7777)// 先转换成样例类类型(简单转换操作)val dataStream = inputStream.map(data => {val arr = data.split(",")SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L})// 基于流创建一张表。// 如果流中的数据类型是case class 可以直接根据case class 的结构生成table。// 或者根据字段顺序单独命名val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)// 调用table api进行转换val middleTable= dataTable.select("id, temperature").filter("id == 'sensor_1'")// 1. Group Window// 1.1 table apival resultTable = middleTable.window(Tumble over 10.seconds on 'ts as 'tw) // 每10秒统计一次,滚动时间窗口.groupBy('id, 'tw).select('id, 'id.count, 'temperature.avg, 'tw.end)// 1.2 sqltableEnv.createTemporaryView("sensor", sensorTable)val resultSqlTable = tableEnv.sqlQuery("""|select| id,| count(id),| avg(temperature),| tumble_end(ts, interval '10' second)|from sensor|group by| id,| tumble(ts, interval '10' second)""".stripMargin)// 2. Over window:统计每个sensor每条数据,与之前两行数据的平均温度// 2.1 table apival overResultTable = sensorTable.window(Over partitionBy 'id orderBy 'ts preceding 2.rows as 'ow).select('id, 'ts, 'id.count over 'ow, 'temperature.avg over 'ow)// 2.2 sqlval overResultSqlTable = tableEnv.sqlQuery("""|select| id,| ts,| count(id) over ow,| avg(temperature) over ow|from sensor|window ow as (| partition by id| order by ts| rows between 2 preceding and current row|)""".stripMargin)// 转换成流打印输出overResultTable.toAppendStream[Row].print("result")overResultSqlTable.toRetractStream[Row].print("sql")env.execute("time and window test")}
3.Flink SQL开发
val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()val tableEnv = StreamTableEnvironment.create(env,settings)tableEnv.executeSql("""|CREATE TABLE user_behavior (| user_id BIGINT,| item_id BIGINT,| category_id BIGINT,| behavior STRING,| ts TIMESTAMP(3),| proctime AS PROCTIME(), -- generates processing-time attribute using computed column| WATERMARK FOR ts AS ts - INTERVAL '5' SECOND -- defines watermark on ts column, marks ts as event-time attribute|) WITH (| 'connector' = 'kafka', -- using kafka connector| 'topic' = 'user_behavior', -- kafka topic| 'scan.startup.mode' = 'earliest-offset', -- reading from the beginning| 'properties.bootstrap.servers' = 'kafka:9094', -- kafka broker address| 'format' = 'json' -- the data format is json|)|""".stripMargin)// print the schematableEnv.executeSql("DESCRIBE user_behavior").print()
4.注意使用Flink SQL中的rowtime与proctime。
- 'timestamp.rowtime 不能在处理时间环境中使用,而且一定要在有assignTimestampsAndWatermarks情况下才能使用rowtime,因为需要提取时间戳。
- 可以使用 ts.rowtime,timestamp.rowtime 不能使用 pt.rowtime
- 'pt.proctime 在事件事件下可以提取处理事件 可以使用'pt.proctime 或者 'ts.proctime 也可以获取处理时间
def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()val tableEnv = StreamTableEnvironment.create(env, settings)// 读取数据val inputPath = "E:\\workspace\\FlinkCourse\\FlinkTable\\src\\main\\resources\\sensor.txt"val inputStream = env.readTextFile(inputPath)// 先转换成样例类类型(简单转换操作)val dataStream = inputStream.map(data => {val arr = data.split(",")SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L})// 'timestamp.rowtime 不能在处理时间环境中使用,而且一定要在有assignTimestampsAndWatermarks情况下才能使用rowtime,因为需要提取时间戳。// 可以使用 ts.rowtime,timestamp.rowtime 不能使用 pt.rowtime// 'pt.proctime 在事件事件下可以提取处理事件 可以使用'pt.proctime 或者 'ts.proctime 也可以获取处理时间//OKval sensorTable1 = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'ts.proctime as 'pt, 'timestamp.rowtime as 'ts1)//基于DataStream创建Tableval sensorTable2 = tableEnv.fromDataStream(dataStream, 'id, 'timestamp, 'temperature, 'ts.proctime, 'timestamp.rowtime as 'ts1)val sensorTable3 = tableEnv.fromDataStream(dataStream, 'id, 'timestamp, 'temperature, 'pt.rowtime as 'ts2)// sensorTable1.execute().print()sensorTable3.execute().print()env.execute()}