大数据流式计算架构(深入理解flink实时大数据 pdf)

一、背景Flink由于阿里在国内的助推,火爆程度可以想象,且目前Flink有非常明显的趋势是往SQL方向进行的。很多大厂已经实现了FlinkSQL化,那我们怎么去实现一个流式计算平台呢?二、FlinkSQL初探以及代码实现连接kafka

一、背景

Flink 由于阿里在国内的助推,火爆程度可以想象,且目前Flink 有非常明显的趋势是往SQL 方向进行的。很多大厂已经实现了Flink SQL化,那我们怎么去实现一个流式计算平台呢?

大数据流式计算架构(深入理解flink实时大数据 pdf)

二、Flink SQL 初探以及代码实现

连接kafka 对数据进行处理写入mysql

package org.example;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.TableEnvironment;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SqlDemo {

public static void main(String[] args) throws Exception {

//创建执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings settings = EnvironmentSettings.newInstance()

.useBlinkPlanner()

.inStreamingMode()

.build();

TableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

//把kafka 中的topic映射成一个输入临时表

tableEnv.executeSql(

\"create table sensor_source (id string,name string) with (\" +

\" \'connector\' = \'kafka\',\" +

\" \'topic\' = \'test_info_test\',\" +

\" \'properties.bootstrap.servers\' = \'localhost:9092\',\" +

\" \'properties.group.id\' = \'testGroup\',\" +

\" \'scan.startup.mode\' = \'earliest-offset\',\" +

\" \'format\' = \'json\')\"

);

//把mysql 中的表映射成一个输出临时表

String sql = \"CREATE TABLE print_table (\\n\" +

\" id STRING,\\n\" +

\" name STRING\\n\" +

\") WITH (\\n\" +

\" \'connector\' = \'print\'\\n\" +

\")\";

String mysql_sql = \"CREATE TABLE mysql_sink (\\n\" +

\" id string,\\n\" +

\" name string\\n\" +

\" ) WITH (\\n\" +

\" \'connector\' = \'jdbc\',\\n\" +

\" \'url\' = \'jdbc:mysql://ip:8081/kafka?serverTimezone=UTC\',\\n\" +

\" \'table-name\' = \'test_info\',\\n\" +

\" \'username\' = \'kafka\',\\n\" +

\" \'password\' = \'Bonc123\'\\n\" +

\" )\";

String kafka_sink_sql=

\"create table kafka_sink (id string,name string) with (\" +

\" \'connector\' = \'kafka\',\" +

\" \'topic\' = \'test_info_2\',\" +

\" \'properties.bootstrap.servers\' = \'localhost:9092\',\" +

\" \'format\' = \'json\')\";

tableEnv.executeSql(mysql_sql);

//tableEnv.executeSql(kafka_sink_sql);

//tableEnv.executeSql(sql);

//插入数据的sql语句

//tableEnv.executeSql(\"insert into print_table select * from sensor_source\");

tableEnv.executeSql(\"insert into mysql_sink select * from sensor_source\");

//tableEnv.executeSql(\"insert into kafka_sink select * from sensor_source\");

}

}

运行之后mysql里面数据就有了

大数据流式计算架构(深入理解flink实时大数据 pdf)

三、Flink 实时计算平台

依据上面的代码,我们可以抽象出一层Flink 实时计算平台。

大数据流式计算架构(深入理解flink实时大数据 pdf)

大数据流式计算架构(深入理解flink实时大数据 pdf)

文章来源于诸葛子房

本站部分文章来自网络或用户投稿,如无特殊说明或标注,均为本站原创发布。涉及资源下载的,本站旨在共享仅供大家学习与参考,如您想商用请获取官网版权,如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。
投稿

中国短跑100米纪录是多少秒(中国100米短跑世界纪录是多少)

2022-8-5 22:27:04

投稿

香港天水围家暴灭门(香港天水围灭门惨案案发现场)

2022-8-5 22:27:12

搜索