FlinkSQL处理复杂JSON的思路_flinksql kudu

boyanx4个月前技术教程19

导读:在日常开发中常有这么一个场景,采集如日志等数据后以JSON形式存储到Kafka中,再由Flink从Kafka中获取数据并进行处理。但是有时候JSON比较复杂(多层嵌套),在FlinkSQL中解析起来比较麻烦,下面将讨论Flink SQL(1.10版本) 如何解析复杂JSON。

官网Demo

//JSON Format

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#table-formats

首先查看官网给出的一个例子,大致的解决思路为使用 format.json-schema,自定义一个format schema。

//官网例子
CREATE TABLE MyUserTable (
  ...
) WITH (
  'format.type' = 'json',                   -- required: specify the format type
  'format.fail-on-missing-field' = 'true'   -- optional: flag whether to fail if a field is missing or not, false by default

  'format.fields.0.name' = 'lon',           -- optional: define the schema explicitly using type information.
  'format.fields.0.data-type' = 'FLOAT',    -- This overrides default behavior that uses table's schema as format schema.
  'format.fields.1.name' = 'rideTime',
  'format.fields.1.data-type' = 'TIMESTAMP(3)',

  'format.json-schema' =                    -- or by using a JSON schema which parses to DECIMAL and TIMESTAMP.
    '{                                      -- This also overrides the default behavior.
      "type": "object",
      "properties": {
        "lon": {
          "type": "number"
        },
        "rideTime": {
          "type": "string",
          "format": "date-time"
        }
      }
    }'
)

分析:Flink 在解析 JSON 的时候,对于复杂的 JSON 可以通过自定义format schema来支持。如果table schema 和 format schema相同,则可以自动派生 json 的 schema(但这种往往不适用于解析复杂JSON )。

实战例子

了解了官网的例子之后,我们手动试验一下。

1、从Kafka中获取复杂JSON用于测试,JSON 如下:

{"code":0,"data":{"request":{"name":"test","id":"ce1beb37-ed3e-4365-8e44-c3bd1d249cfc"}},"message":"SUCCESS","retryCount":1,"success":true}

2、编写format.json-schema

通过参考官网Demo,发现第一层的 retryCount 可直接就映射到字段上,而 data 是多层嵌套,所以定义data 的类型为object ,而properties则是其json的内层数据。我们的例子中为多层嵌套,那么简化对应的 json-schema 如下:

'format.json-schema' = '{
        "type": "object",
        "properties": {
            "retryCount": {type: "string"},
            "data":{type: "object",
                   "properties" : {
                      "request":{type: "object",
                         "properties" : {
                          "id" : {type:"string"}
                         }
                      }
                   }
             }
        }
    }'

3、定义table schema

从上面的 json schame 和 Flink SQL 的映射关系可以看出,data对应的table 字段的类型是ROW,所以 table schema 应是如下:

CREATE TABLE sourceTable (
retryCount VARCHAR,
data ROW(request ROW(id string))
)
WITH (
  ......
  }

4、使用的时候,直接用 "."的方式即可

Table table = bsTableEnv.sqlQuery("SELECT data.request.id AS ID,retryCount FROM sourceTable");

5、Kafka SourceTable完整例子

CREATE TABLE sourceTable (
retryCount VARCHAR,
data ROW(request ROW(id string))
)
WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'XXXX',
'connector.properties.zookeeper.connect' = 'XXXX:2181',
'connector.properties.bootstrap.servers' = 'XXXX:9092',
'connector.properties.group.id' = 'XXXXX',
'format.json-schema' = '{
        "type": "object",
        "properties": {
            "retryCount": {type: "string"},
            "data":{type: "object",
                   "properties" : {
                      "request":{type: "object",
                         "properties" : {
                          "id" : {type:"string"}
                         }
                      }
                   }
             }
        }
    }',
'format.type' = 'json');

最后

以上就是在FlinkSQL1.10中处理复杂JSON的一种方式,通过定义format.json schema实现。而在查看Flink中文邮件列表中也发现了其他的一些不错思路,如下:

通过在上游时将就转义成一个String放到JSON的一个field中,这样Flink解析出来就是一个String,
然后编写UDTF进行处理,感兴趣的朋友也可以尝试一下。

感谢您的阅读,如果喜欢本文欢迎关注和转发,本头条号将坚持持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。

相关文章

关于 Flink Regular Join 与 TTL 的理解

导读:对于流查询,Regular Join 的语法是最灵活的,它允许任何类型的更新(插入、更新、删除)输入表。Regular Join 包含以下几种(以 L 作为左流中的数据标识,R 作为右流中的数据...

Spring Boot(十四):集成Swagger2_springboot集成webservice详细教程

Swagger的简介目前大部分的项目都是前后端分离的,后端除了要提供接口外,还需要提供接口文档,有时由于需求、设计或方案的变更,会造成接口变更但是接口文档没有及时更新的情况。Swagger是一个在你写...

Flink SQL Client综合实战_icc client综合管理平台

在《Flink SQL Client初探》一文中,我们体验了Flink SQL Client的基本功能,今天来通过实战更深入学习和体验Flink SQL;实战内容本次实战主要是通过Flink SQL...

保姆级软路由刷机+软路由OpenWRT入门设置,新手轻松搭建软路由

本内容来源于@什么值得买APP,观点仅代表作者本人 |作者:我是阿皮啊-创作立场声明:整理了软路由刷机+入门设置的方法,希望帮到刚入手软路由的小伙伴,轻松搭建软路由~超级详细的搭建过程~开篇碎碎念Hi...

测试员必备:Linux下安装JDK 1.8你必须知道的那些事

1.简介在Oracle收购Sun后,Java的一系列产品就被整合到Oracle官网中,打开官网乍眼一看也不知道去哪里下载,还得一个一个的摸索尝试,而且网上大多数都是一些Oracle收购Sun前,或者就...

后台产品设计规范-Ant Design实践到落地-表单篇

编辑导语:本文作者在对一个电商系统进行系统重构的过程中,发现存在着一些问题,导致各个后台系统的用户体验不统一,与业内优秀的电商系统差距较大。于是决定使用Ant Design Vue这套框架对现有系统再...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。