关于 Flink Regular Join 与 TTL 的理解

boyanx4个月前技术教程12

导读:对于流查询,Regular Join 的语法是最灵活的,它允许任何类型的更新(插入、更新、删除)输入表。

Regular Join 包含以下几种(以 L 作为左流中的数据标识,R 作为右流中的数据标识):

  • Inner Join(Inner Equal Join):当两条流 Join 到才会输出 +[L, R]
  • Left Join(Outer Equal Join):左流数据到达之后 Join 到 R 流数据则输出 +[L, R],没 Join 到输出 +[L, null])。如果右流之后数据到达之后,发现左流之前输出过没有 Join 到的数据,则会发起回撤流,先输出 -[L, null],然后输出 +[L, R]。
  • Right Join(Outer Equal Join):与 Left Join 逻辑相反
  • Full Join(Outer Equal Join):流任务中,左流或者右流的数据到达之后,无论有没有 Join 到另外一条流的数据,都会输出(对右流来说:Join 到输出 +[L, R],没 Join 到输出 +[null, R];对左流来说:Join 到输出 +[L, R],没 Join 到输出 +[L, null])。如果一条流的数据到达之后,发现之前另一条流之前输出过没有 Join 到的数据,则会发起回撤流(左流数据到达为例:回撤 -[null, R],输出 +[L, R],右流数据到达为例:回撤 -[L, null],输出 +[L, R])。

Regular Inner Join

Flink SQL

CREATE TABLE matchResult (
    guid STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'match_result_log_test',
    'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
    'properties.group.id' = 'flinkTestGroup',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

CREATE TABLE readRecord (
    guid STRING,
    book_name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'read_record_log_test',
    'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
    'properties.group.id' = 'flinkTestGroup',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

CREATE TABLE sink_table (
    guid STRING,
    book_name STRING
) WITH (
  'connector' = 'print'
);

INSERT INTO sink_table
SELECT
    matchResult.guid,
    readRecord.book_name
FROM matchResult
INNER JOIN readRecord ON  matchResult.guid = readRecord.guid;

输出结果解析

															 -- L 流数据达到,由于没有 Join 到 R 流数据而且是 inner join 便不输出结果
+I[111, book1]  		 -- R 流数据达到, Join 到 L 流数据,便输出 +I[111, book1]
															 -- R 流数据达到,由于没有 Join 到 L 流数据而且是 inner join 便不输出结果
+I[222, book2]			 -- L 流数据达到, Join 到 R 流数据便输出结果

Regular Left Join(Right join 则相反)

Flink SQL

CREATE TABLE matchResult (
    guid STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'match_result_log_test',
    'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
    'properties.group.id' = 'flinkTestGroup',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

CREATE TABLE readRecord (
    guid STRING,
    book_name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'read_record_log_test',
    'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
    'properties.group.id' = 'flinkTestGroup',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

CREATE TABLE sink_table (
    guid STRING,
    book_name STRING
) WITH (
  'connector' = 'print'
);

INSERT INTO sink_table
SELECT
    matchResult.guid,
    readRecord.book_name
FROM matchResult
LEFT JOIN readRecord ON  matchResult.guid = readRecord.guid;

输出结果解析

+I[111, null]  					-- L 流数据达到,没有 Join 到 R 流数据,便输出 +[L, null]
-D[111, null] 				 -- R 流的数据到达,发现 L 流之前输出过没有 Join 到的数据,则会发起回撤流,先输出 -[L, null]
+I[111, book1] 			-- 再输出 +[L, R]
															-- 这里模拟一条 R 流 guid = 222 的数据到达,由于是 left join 且没有 join 到 L 流,因此不做输出
+I[222, book2]			-- 当 L 流 guid = 222 的数据达到 join  R 流 后输出结果 +[L, R]

Regular Full Join

Flink SQL

CREATE TABLE matchResult (
    guid STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'match_result_log_test',
    'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
    'properties.group.id' = 'flinkTestGroup',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

CREATE TABLE readRecord (
    guid STRING,
    book_name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'read_record_log_test',
    'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxxxxxx',
    'properties.group.id' = 'flinkTestGroup',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

CREATE TABLE sink_table (
    guid STRING,
    book_name STRING
) WITH (
  'connector' = 'print'
);

INSERT INTO sink_table
SELECT
    matchResult.guid,
    readRecord.book_name
FROM matchResult
FULL JOIN readRecord ON  matchResult.guid = readRecord.guid;

输出结果解析

+I[111, null]					-- L 流数据达到,没有 Join 到 R 流数据,便输出 +I[L, null]	
+I[null, book2]		 -- R 流数据达到,没有 Join 到 R 流数据,便输出 +I[null, R]
-D[null, book2]		 -- L 流新数据到达,发现之前 R 流之前输出过没有 Join 到的数据,则发起回撤流,先输出 -D[null, R]
+I[222, book2]		-- 再输出 +I[L, R]
-D[111, null]				 -- 反之同理
+I[111, book1]

TTL 概念

在 Regular Join 时 Flink 会将两条没有时间窗口限制的流的所有数据存储在 State 中,由于流是无穷无尽持续流入的,随着时间的不断推进,内存中积累的状态会越来越多。

针对这个问题,Flink 提出了空闲状态保留时间(Idle State Retention Time)的概念。通过为每个状态设置 Timer,如果这个状态中途被访问过,则重新设置 Timer;否则(如果状态一直未被访问,长期处于 Idle 状态)则在 Timer 到期时做状态清理。这样,就可以确保每个状态都能得到及时的清理,可以通过 table.exec.state.ttl 参数进行控制(注意:这同时也会对结果的准确性有所影响,因此需要合理的权衡)。


最后

感谢您的阅读,如果喜欢本文欢迎关注和转发,转载需注明出处,本头条号将持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。

参考文章


https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/concepts/overview/#idle-state-retention-time State 概念

https://cloud.tencent.com/developer/article/1452844?from=10680 Flink State TTL


https://cloud.tencent.com/developer/article/1452854 Idle State Retention Time 特性


https://mp.weixin.qq.com/s/zR2ukRjiw-IqUDX894NyGw 大数据羊说

相关文章

Netty服务端启动全流程源码分析_netty搭建http服务器

想要阅读Netty源码的同学,建议从GitHub上把源码拉下来,方便写注释、Debug调试哦~点我去下载!先来看一个简单的Echo服务端程序,监听本地的9999端口,有客户端接入时控制台输出一句话,接...

如何引用bootstrap没有的字体图标

今天做一个项目,用到了微信图标,要求用图标字体,而bootstrap字体文件中并没有微信的字体图标,怎么办? 一、使用 http://fontawesome.io/icons/网站中的字体:引入cs...

保姆级软路由刷机+软路由OpenWRT入门设置,新手轻松搭建软路由

本内容来源于@什么值得买APP,观点仅代表作者本人 |作者:我是阿皮啊-创作立场声明:整理了软路由刷机+入门设置的方法,希望帮到刚入手软路由的小伙伴,轻松搭建软路由~超级详细的搭建过程~开篇碎碎念Hi...

后台产品设计规范-Ant Design实践到落地-表单篇

编辑导语:本文作者在对一个电商系统进行系统重构的过程中,发现存在着一些问题,导致各个后台系统的用户体验不统一,与业内优秀的电商系统差距较大。于是决定使用Ant Design Vue这套框架对现有系统再...

测试员必备:Linux下安装JDK 1.8你必须知道的那些事

1.简介在Oracle收购Sun后,Java的一系列产品就被整合到Oracle官网中,打开官网乍眼一看也不知道去哪里下载,还得一个一个的摸索尝试,而且网上大多数都是一些Oracle收购Sun前,或者就...

FlinkSQL处理复杂JSON的思路_flinksql kudu

导读:在日常开发中常有这么一个场景,采集如日志等数据后以JSON形式存储到Kafka中,再由Flink从Kafka中获取数据并进行处理。但是有时候JSON比较复杂(多层嵌套),在FlinkSQL中解析...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。