
Flink in Practice: Joins in Flink SQL

1. Testing joins with a simple Flink SQL demo:

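// Imports assume Flink 1.11 to 1.13 (Blink planner, Scala bridge API).
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object FlinkJoinDemo {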
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(10)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    val tableEnv = StreamTableEnvironment.create(env, settings)
    tableEnv.executeSql(
      """
        |CREATE TABLE input (
        |    id BIGINT,
        |    name STRING,
        |    proctime AS PROCTIME()   -- generates processing-time attribute using computed column
        |) WITH (
        |    'connector' = 'kafka',  -- using kafka connector
        |    'topic' = 'flinksource',  -- kafka topic
        |    'scan.startup.mode' = 'latest-offset',  -- start reading from the latest offset (new records only)
        |    'properties.bootstrap.servers' = 'hadoop1:9092,hadoop2:9092,hadoop3:9092',  -- kafka broker address
        |    'format' = 'json'  -- the data format is json
        |)
        |
      """.stripMargin)


    tableEnv.executeSql(
      """
        |CREATE TABLE input2 (
        |    id BIGINT,
        |    age INT,
        |    proctime AS PROCTIME()   -- generates processing-time attribute using computed column
        |) WITH (
        |    'connector' = 'kafka',  -- using kafka connector
        |    'topic' = 'flinksource2',  -- kafka topic
        |    'scan.startup.mode' = 'latest-offset',  -- start reading from the latest offset (new records only)
        |    'properties.bootstrap.servers' = 'hadoop1:9092,hadoop2:9092,hadoop3:9092',  -- kafka broker address
        |    'format' = 'json'  -- the data format is json
        |)
        |
      """.stripMargin)



    tableEnv.executeSql(
      """
        |CREATE TABLE output (
        |    id BIGINT,
        |    name STRING,
        |    age INT
        |) WITH (
        |    'connector' = 'print'
        |)
        |
      """.stripMargin)

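    // Swap the join type (left / inner / right join) to reproduce each case discussed in this article.
    tableEnv.executeSql(
      """
        |INSERT INTO output
        |SELECT a.id, a.name, b.age
        |FROM input a
        |RIGHT JOIN input2 b ON a.id = b.id
        |
      """.stripMargin)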
  }
}

1.1.left join

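         With a left join, table a is the primary table and b acts as the dimension table. A row arriving on a is emitted immediately; if b has no matching row yet, age is null. When a row arrives on b, Flink looks up the matching key among the rows already received from a, retracts the earlier null-padded result, and emits the joined row. A row on b with no match in a produces no output, but it is kept in state so that a later row on a can join with it.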

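(A: record arriving on input, aliased a. B: record arriving on input2, aliased b. O: rows written to output.)

A: {"id":2,"name":"张三"}
O: +I{"id":2,"name":"张三","age":null}

B: {"id":2,"age":200}
O: -D{"id":2,"name":"张三","age":null}
O: +I{"id":2,"name":"张三","age":200}

B: {"id":3,"age":100}
O: (no output)

A: {"id":3,"name":"李四"}
O: +I{"id":3,"name":"李四","age":100}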
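
The trace above can be reproduced with a toy model of the join state. This is only a sketch in plain Scala, not Flink code: the class name LeftJoinSketch and its methods are hypothetical, rows are assumed to be append-only, and the output is formatted as a changelog string in the style of the print connector.

```scala
import scala.collection.mutable

// Toy model of a streaming LEFT JOIN on `id` (hypothetical, not the Flink operator).
class LeftJoinSketch {
  // Both sides are kept in state, keyed by the join key.
  private val names = mutable.Map.empty[Long, List[String]] // rows from a
  private val ages  = mutable.Map.empty[Long, List[Int]]    // rows from b

  /** Row arrives on a (primary side): something is always emitted. */
  def onLeft(id: Long, name: String): List[String] = {
    names(id) = names.getOrElse(id, Nil) :+ name
    ages.getOrElse(id, Nil) match {
      case Nil     => List(s"+I($id,$name,null)")               // no match yet: pad with null
      case matches => matches.map(age => s"+I($id,$name,$age)") // join with stored b rows
    }
  }

  /** Row arrives on b: output only if a already holds the key. */
  def onRight(id: Long, age: Int): List[String] = {
    val firstMatch = ages.getOrElse(id, Nil).isEmpty
    ages(id) = ages.getOrElse(id, Nil) :+ age
    names.getOrElse(id, Nil).flatMap { name =>
      // The null-padded row is retracted only when the first match shows up.
      val retract = if (firstMatch) List(s"-D($id,$name,null)") else Nil
      retract :+ s"+I($id,$name,$age)"
    }
  }
}

object LeftJoinDemo {
  def main(args: Array[String]): Unit = {
    val join = new LeftJoinSketch
    join.onLeft(2, "张三").foreach(println)
    join.onRight(2, 200).foreach(println)
    join.onRight(3, 100).foreach(println) // prints nothing
    join.onLeft(3, "李四").foreach(println)
  }
}
```

The model also shows why this kind of join needs state on both sides: the row from b with id 3 produces nothing on arrival, yet it must be remembered for the later row from a.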

1.2.inner join

        With an inner join, both tables are consumed in real time and joined as rows arrive. Only rows that find a match on the other side are emitted.
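
For contrast with the left join, here is a minimal plain-Scala sketch of the inner-join behaviour. It is not Flink code; the class InnerJoinSketch is hypothetical and assumes append-only inputs. Nothing is emitted until both sides hold the key, so no retraction is ever needed.

```scala
import scala.collection.mutable

// Toy model of a streaming INNER JOIN on `id` (hypothetical, not the Flink operator).
class InnerJoinSketch {
  private val names = mutable.Map.empty[Long, List[String]] // rows from a
  private val ages  = mutable.Map.empty[Long, List[Int]]    // rows from b

  /** Row arrives on a: emit one joined row per stored b row with the same id. */
  def onLeft(id: Long, name: String): List[String] = {
    names(id) = names.getOrElse(id, Nil) :+ name
    ages.getOrElse(id, Nil).map(age => s"+I($id,$name,$age)")
  }

  /** Row arrives on b: emit one joined row per stored a row with the same id. */
  def onRight(id: Long, age: Int): List[String] = {
    ages(id) = ages.getOrElse(id, Nil) :+ age
    names.getOrElse(id, Nil).map(name => s"+I($id,$name,$age)")
  }
}

object InnerJoinDemo {
  def main(args: Array[String]): Unit = {
    val join = new InnerJoinSketch
    join.onLeft(2, "张三").foreach(println) // prints nothing: b has no id 2 yet
    join.onRight(2, 200).foreach(println)
  }
}
```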

1.3.right join

        The exact mirror of the left join: b is the primary table and a is the dimension table.

1.4.interval join
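
As a hedged sketch of what an interval join over the demo tables could look like: the helper intervalJoinSql below is hypothetical, and the symmetric time bound is an assumption chosen for illustration. It only builds the statement text, which would then be passed to tableEnv.executeSql. The join condition restricts matches to rows whose proctime attributes lie within the bound of each other.

```scala
object IntervalJoinSketch {
  /** Builds an INSERT that joins input and input2 on id within +/- `seconds` of processing time. */
  def intervalJoinSql(seconds: Int): String = {
    // INTERVAL '<n>' SECOND uses a default precision of two digits, so keep n below 100.
    require(seconds > 0 && seconds < 100, "seconds must be between 1 and 99")
    s"""
       |INSERT INTO output
       |SELECT a.id, a.name, b.age
       |FROM input a, input2 b
       |WHERE a.id = b.id
       |  AND a.proctime BETWEEN b.proctime - INTERVAL '$seconds' SECOND
       |                     AND b.proctime + INTERVAL '$seconds' SECOND
       |""".stripMargin
  }

  def main(args: Array[String]): Unit =
    println(intervalJoinSql(10))
}
```

Because the time bound tells Flink when a row can no longer match, state for old rows can be dropped, unlike the regular joins in 1.1 to 1.3.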

1.5.Lateral

