Flink SQL
Function 内置函数
FROM_UNIXTIME
FROM_UNIXTIME(BIGINT)
传入一个十位数 转化 时间字符串
TO_TIMESTAMP
- FROM_UNIXTIME(BIGINT) 传入一个十三位数 转化 时间类型
- FROM_TIMESTAMP(DATE) 传入时间字符串 转化 时间类型
WATERMARK
严格递增时间戳:
WATERMARK FOR rowtime_column AS rowtime_column发出到目前为止已观察到的最大时间戳的 watermark ,时间戳大于最大时间戳的行被认为没有迟到。
递增时间戳:
延时 5 秒 生成 watermark
WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND发出到目前为止已观察到的最大时间戳减 1 的 watermark ,时间戳大于或等于最大时间戳的行被认为没有迟到。
Query 查询
DISTINCT 去重
SELECT DISTINCT id FROM OrdersWINDOW 窗口
TUMBLE 滚动窗口
TUMBLE(TABLE data,DESCRIPTOR(timecol), size)data 表名 e.g. TABLE demo
timecol timestamp 字段 e.g. DESCRIPTOR(bidtime)
size 滚动窗口大小 e.g. INTERVAL '10' MINUTES
e.g.
select * from TABLE( TUMBLE( TABLE demo, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES ))or
select * from TABLE( TUMBLE( DATA => TABLE demo, TIMECOL => DESCRIPTOR(bidtime), SIZE => INTERVAL '10' MINUTES ))实际使用:
// 实际使用中加上window_start, window_endenvTable.sqlQuery( """ |select `window_start`,`window_end`,`UserId` from TABLE( | TUMBLE ( | TABLE add, | DESCRIPTOR(`TimeStamp`), | INTERVAL '10' MINUTES | ) |) |where action = 'pv' |GROUP BY `window_start`, `window_end`,`UserId` |""".stripMargin).limit(1)HOP 滑动窗口
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])data 表名 e.g. `TABLE demo
timecol timestamp 字段 e.g. DESCRIPTOR(bidtime)
slide 滑动大小 e.g. INTERVAL '5' MINUTES
size 滑动窗口大小 e.g. INTERVAL '10' MINUTES
e.g.
select * from TABLE( HOP( TABLE demo, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES ))or
select * from TABLE( HOP( DATA => TABLE demo, TIMECOL => DESCRIPTOR(bidtime), SLIDE => INTERVAL '5' MINUTES, SIZE => INTERVAL '10' MINUTES ))Create 创建
TABLE 表
行
Insert 插入
Flink Table API
Operations 操作API
from
tableEnv.from('Orders')等同于 select * from Orders本身等同于sql中 from
FromValues
val table = tableEnv.fromValues( row(1,"ABC"), row(2,"ABCDE"))效果如下:
root| -- f0 BIGINT NOT NULL| -- f1 VARCHAR(5) NOT NULL默认自动识别类型 可以指定类型 如下:
val table = tableEnv.fromValues( DataTypes.ROW( DataTypes.FIELD("id",DataTypes.DECIMAL(10,2)), DataTypes.FIELD("name",DataTypes.STRING()) ), row(1,"ABC"), row(2,"ABCDE"))结构如下:
root| -- id DECIMAL(10,2)| -- name STRING本身等同于sql中 values
Select
val orders = tableEnv.from("Orders")Table result = orders.select($"a", $"c" as "d")// orTable result = orders.select($"*")本身等同于sql中 select
As
val orders = tableEnv.from("Orders");val result = orders.as("x, y, z, t");Where / Filter
val orders = tableEnv.from("Orders");val result = orders.where($("b").isEqual("red"));// select * from Orders where b = 'red'// orval orders = tableEnv.from("Orders");val result = orders.filter($("b").isEqual("red"));// select * from Orders where b != 'red'列操作
AddColumns
执行字段添加操作。 如果所添加的字段已经存在,将抛出异常。
val orders = tableEnv.from("Orders");val result = orders.addColumns(concat($"c"));AddOrReplaceColumns
执行字段添加操作。 如果添加的列名称和已存在的列名称相同,则已存在的字段将被替换。 此外,如果添加的字段里面有重复的字段名,则会使用最后一个字段。
val orders = tableEnv.from("Orders")val result = orders.addOrReplaceColumns(concat($"c", "Sunny") as "desc")DropColumns
val orders = tableEnv.from("Orders")val result = orders.dropColumns($"b")-RenameColumns
val orders = tableEnv.from("Orders")val result = orders.renameColumns($"b" as "b2")Aggregations 聚合API
GroupBy Aggregation 分组聚合
val orders: Table = tableEnv.from("Orders")val result = orders.groupBy($"a").select($"a", $"b".sum().as("d"))GroupBy Window Aggregation 窗口聚合
val orders: Table = tableEnv.from("Orders")val result: Table = orders .window(Tumble over 5.minutes on $"rowtime" as "w") // 定义窗口 .groupBy($"a", $"w") // 按窗口和键分组 .select($"a", $"w".start, $"w".end, $"w".rowtime, $"b".sum as "d") // 访问窗口属性并聚合Distinct Aggregation 去重聚合
val orders: Table = tableEnv.from("Orders")// 按属性分组后的的互异(互不相同、去重)聚合val groupByDistinctResult = orders .groupBy($"a") .select($"a", $"b".sum.distinct as "d")// 按属性、时间窗口分组后的互异(互不相同、去重)聚合val groupByWindowDistinctResult = orders .window(Tumble over 5.minutes on $"rowtime" as "w").groupBy($"a", $"w") .select($"a", $"b".sum.distinct as "d")// over window 上的互异(互不相同、去重)聚合val result = orders .window(Over partitionBy $"a" orderBy $"rowtime" preceding UNBOUNDED_RANGE as $"w") .select($"a", $"b".avg.distinct over $"w", $"b".max over $"w", $"b".min over $"w")Connector 连接器
FileSystem 文件系统
参数
path='file:///'+ 路径
案例
envTable.executeSql(""" |create table add ( |`UserId` bigint |) WITH ( |'connector' = 'filesystem', |'path' = 'file:///D:\project\java\reflink\reflink\source\UserBehavior.csv', |'format' = 'csv' |) |""".stripMargin)JDBC
pom.xml
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_2.11</artifactId> <version>1.13.6</version></dependency>-
$
urlJDBC 数据库 url。 -
table-name连接到 JDBC 表的名称。 -
driver用于连接到此 URL 的 JDBC 驱动类名,如果不设置,将自动从 URL 中推导。 -
usernameJDBC 用户名 -
passwordJDBC 密码。
注意没有 foramt
案例
envTable.executeSql(""" |create table add ( |`UserId` bigint |) WITH ( |'connector' = 'jdbc', |'url' = 'jdbc:mysql://localhost:3306/test' | |) |""".stripMargin)Kafka
pom.xml
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>${flink.version}</version></dependency>参数
-
$
topic Kafka记录的 Topic 名。- -
partition Kafka记录的 partition ID。 -
headers 二进制 Map类型的 Kafka 记录头(Header) -
leader-epochKafka记录的 Leader epoch(如果可用) -
offsetKafka 记录在 partition 中的 offset。 -
timestampKafka 记录的时间戳。 -
timestamp-typeKafka 记录的时间戳类型。可能的类型有 “NoTimestampType”, “CreateTime”(会在写入元数据时设置),或 “LogAppendTime”。 -
$
properties.bootstrap.servers逗号分隔的 Kafka broker 列表。 -
$
properties.group.idkafak 组id -
properties.*可以设置和传递任意 Kafka 的配置项,后缀名必须匹配在 Kafka 配置文档 中定义的配置键
一致性保证 EOS
开启checkpoint
参数 sink.semantic
可选值:
-
none不保证任何语义 -
at-least-once(默认设置) 至少一次 -
exactly-once精确一次
案例
envTable.executeSql(""" |create table add ( |`UserId` bigint |) WITH ( |'connector' = 'kafka', |'topic' = 'demo' , |'format' = 'csv' |) |""".stripMargin)format
Csv
pom.xml
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>${flink.version}</version></dependency>参数
-
csv.field-delimiter 字段分隔符 (默认’,’`) -
csv.disable-quote-character是否禁止对引用的值使用引号 (默认是 false) -
csv.quote-character用于围住字段值的引号字符 (默认") -
csv.allow-comments是否允许忽略注释行(默认不允许)— -
csv.ignore-parse-errors当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 -
csv.array-element-delimiter分隔数组和行元素的字符串(默认';'). -
csv.escape-character转义字符(默认关闭). -
csv.null-literal是否将 “null” 字符串转化为 null 值
Json
pom.xml
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version></dependency>Avro
pom.xml
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-avro</artifactId> <version>${flink.version}</version></dependency>Flink Type
char
charchar(n)n 字符串长度
varchar
VARCHARVARCHAR(n)STRINGn 字符串长度
BINARY
BINARYBINARY(n)n 二进制字符串
VARBINARY/BYTES
VARBINARYVARBINARY(n)
BYTESn 二进制字符串
DECIMAL
DECIMALDECIMAL(p)DECIMAL(p, s)
DECDEC(p)DEC(p, s)
NUMERICNUMERIC(p)NUMERIC(p, s)