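1. Background
Thanks in large part to Alibaba's promotion in China, Flink has become extremely popular, and it is clearly trending toward SQL. Many large companies have already moved their Flink workloads to SQL. So how do we build a stream computing platform of our own?
2. A First Look at Flink SQL, with Code
Connect to Kafka, process the data, and write it to MySQL: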
package org.example;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SqlDemo {
    public static void main(String[] args) throws Exception {
        // Create the execution environment (Blink planner, streaming mode)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Map the Kafka topic to a temporary input (source) table
        tableEnv.executeSql(
                "create table sensor_source (id string,name string) with (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'test_info_test'," +
                " 'properties.bootstrap.servers' = 'localhost:9092'," +
                " 'properties.group.id' = 'testGroup'," +
                " 'scan.startup.mode' = 'earliest-offset'," +
                " 'format' = 'json')"
        );

        // Optional print sink, useful for debugging on the console
        String sql = "CREATE TABLE print_table (\n" +
                " id STRING,\n" +
                " name STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'print'\n" +
                ")";

        // Map the MySQL table to a temporary output (sink) table
        String mysql_sql = "CREATE TABLE mysql_sink (\n" +
                " id string,\n" +
                " name string\n" +
                " ) WITH (\n" +
                " 'connector' = 'jdbc',\n" +
                " 'url' = 'jdbc:mysql://ip:8081/kafka?serverTimezone=UTC',\n" +
                " 'table-name' = 'test_info',\n" +
                " 'username' = 'kafka',\n" +
                " 'password' = 'Bonc123'\n" +
                " )";

        // Optional Kafka sink that writes to a second topic
        String kafka_sink_sql =
                "create table kafka_sink (id string,name string) with (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'test_info_2'," +
                " 'properties.bootstrap.servers' = 'localhost:9092'," +
                " 'format' = 'json')";

        // Register the sink in use; uncomment the others to try them
        tableEnv.executeSql(mysql_sql);
        //tableEnv.executeSql(kafka_sink_sql);
        //tableEnv.executeSql(sql);

        // INSERT statement that moves data from the source to the chosen sink
        //tableEnv.executeSql("insert into print_table select * from sensor_source");
        tableEnv.executeSql("insert into mysql_sink select * from sensor_source");
        //tableEnv.executeSql("insert into kafka_sink select * from sensor_source");
    }
}
Once the job is running, the data shows up in MySQL.
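The three table definitions above differ only in table name, columns, and connector options, so the DDL strings could be generated rather than hand-concatenated. The following is a minimal sketch of that idea, not part of the original demo: `TableDdlBuilder` and `createTable` are hypothetical names, it uses only the Java standard library, and it assumes option values contain no single quotes (nothing is escaped).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not a Flink API): builds a CREATE TABLE statement from maps. */
final class TableDdlBuilder {

    /**
     * Renders: CREATE TABLE name (col type, ...) WITH ('key' = 'value', ...).
     * Assumption: names and option values need no escaping.
     */
    static String createTable(String name,
                              Map<String, String> columns,
                              Map<String, String> options) {
        StringJoiner cols = new StringJoiner(", ", "(", ")");
        columns.forEach((col, type) -> cols.add(col + " " + type));

        StringJoiner opts = new StringJoiner(", ", "(", ")");
        options.forEach((key, value) -> opts.add("'" + key + "' = '" + value + "'"));

        return "CREATE TABLE " + name + " " + cols + " WITH " + opts;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the column and option order stable in the generated DDL.
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("id", "STRING");
        columns.put("name", "STRING");

        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "print");

        // The resulting string could be passed to tableEnv.executeSql(...) as in the demo.
        System.out.println(createTable("print_table", columns, options));
    }
}
```

Swapping the options map for the Kafka or JDBC settings used in the demo would produce the other two definitions with the same helper.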
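3. A Flink Real-Time Computing Platform
Based on the code above, we can abstract out a platform layer for Flink real-time computing: the Java scaffolding stays fixed, and only the SQL statements change from job to job.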
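One possible shape for that platform layer, sketched here under stated assumptions rather than taken from the original article: the user submits a SQL script, the platform splits it into statements, and each statement is handed to `tableEnv.executeSql(...)` exactly as in the demo. The splitter below is a hypothetical, standard-library-only helper (`SqlScriptSplitter` is an invented name). It assumes the script contains no SQL comments and no semicolons inside backtick-quoted identifiers.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Flink API): splits a SQL script into single statements. */
final class SqlScriptSplitter {

    /** Splits on ';' outside single-quoted literals and drops blank statements. */
    static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuote = false;
        for (int i = 0; i < script.length(); i++) {
            char c = script.charAt(i);
            if (c == '\'') {
                // An escaped quote ('') toggles twice, so the state stays correct.
                inQuote = !inQuote;
            }
            if (c == ';' && !inQuote) {
                addIfNotBlank(statements, current);
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        addIfNotBlank(statements, current);
        return statements;
    }

    private static void addIfNotBlank(List<String> out, StringBuilder sb) {
        String statement = sb.toString().trim();
        if (!statement.isEmpty()) {
            out.add(statement);
        }
    }

    public static void main(String[] args) {
        String script =
                "create table print_table (id string, name string) with ('connector' = 'print');\n"
                + "insert into print_table select * from sensor_source;";
        for (String statement : split(script)) {
            // On the platform, each statement would go to tableEnv.executeSql(statement).
            System.out.println(statement);
        }
    }
}
```

Keeping the splitter free of Flink dependencies means it can be unit-tested on its own; a real platform would also need job submission, configuration, and error handling, which are out of scope for this sketch.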
Source: 诸葛子房