仿照阿里blink使用sql开发flink的实时程序
一.背景
阿里工作的时候是使用Blink进行流数据处理和计算,通过编写sql实现Blink的计算job,开发简单高效,产品易用。 目前尝试实现Flink产品化,类似Blink。使用SQL为统一开发规范,SQL语言的好处是:声明式,易理解,稳定可靠,自动优化。 如果采用API开发的话,最大的问题是对于job调优依赖程序员经验,比较困难,同时API开发方式侵入性太强(数据安全,集群安全等),而sql可以自动调优,避免这种问题的产生。
二.实现思路:
用户输入sql(ddl,query,dml) -> ddl对应为Flink的source和sink-> query/dml的insert into数据处理和计算
--> 封装为对应Flink的Job:env.sqlQuery/env.sqlUpdate
--> JobGraph和对应job提交,ClusterClient.submitJob或者ClusterDescriptor.deployJobCluster
三.发布版本:
v3.0.0 2020年1月
1.使用flink 1.10版本 1.10之前的版本自带的sql解析功能不完善,如解析function,watermark等,所以比较鸡肋,还不如不用更换以前开发的解析层功能。 2.使用新接口ClusterClient.submitJob提交job 3. 4.
1. flink自带的sql解析 2. 使用新的job提交接口 2. 流批处理一体化实现 3. 钉钉/微信告警通知
blink-client 接口定义 blink-sql/calcite stream和batch table的sql解析 blink-libraries 自定义source, sink, side开发 blink-batch BatchTableSource和BatchTableSink blink-stream StreamTableSource和StreamTableSink blink-job batch/stream job 提交
1. 抽取sql层被流和批使用,SQL参考flink issues和对应提供的doc 2. 增加批处理开发 3. 增加维表功能 4. 升级flink版本为1.7.x
v1.0.0 2018年7月
blink-client 接口定义 blink-sqlserver stream table的sql解析 blink-job 封装为stream job
1. 实现create function 2. 实现sql开发流处理程序任务 3. 更改源码实现sql CEP
四.样例
batch sql示例: ```sql CREATE FUNCTION demouf AS 'ambition.api.sql.function.DemoUDF' LIBRARY 'hdfs://flink/udf/jedis.jar','hdfs://flink/udf/customudf.jar';
CREATE TABLE csv_source ( id int, name varchar,
datedate , age int ) with ( type=source, connect.type=json, 'file.path'='file:///FlinkSQL/blink-job/src/test/resources/demo.json' );
CREATE TABLE csvsink (
datedate, age int, PRIMARY KEY (
date) ) with ( type=sink, connect.type=csv, 'file.path'='file:///FlinkSQL/blink-job/src/test/resources/demoout.csv' );
create view viewselect as
SELECT
date, age FROM csvsource group by
date,age ;
INSERT INTO csvsink SELECT
date, sum(age) FROM viewselect group by
date;
stream sql 示例: ```sql CREATE FUNCTION demouf AS 'ambition.api.sql.function.DemoUDF' LIBRARY 'hdfs://flink/udf/jedis.jar','hdfs://flink/udf/customudf.jar';CREATE TABLE kafka_source (
date
varchar, amount float, proctime timestamp ) with ( type=source, 'connect.type'=kafka, 'flink.parallelism'=1, 'kafka.topic'=topic, 'kafka.group.id'=flinks, 'kafka.enable.auto.commit'=true, 'kafka.bootstrap.servers'='localhost:9092' );CREATE TABLE mysql_sink (
date
varchar, total_amount float, PRIMARY KEY (date
) ) with ( type=mysql, 'connect.type'=mysql, 'mysql.connection'='localhost:3306', 'mysql.db.name'=flink, 'mysql.batch.size'=10, 'mysql.table.name'=flink_table, 'mysql.user'=root, 'mysql.pass'=root );CREATE VIEW view_select AS SELECT
date
, amount FROM kafka_source GROUP BYdate
, amount ;INSERT INTO mysql_sink SELECT
date
, sum(amount) as total_amount FROM view_select GROUP BYdate
;
batch sql示例: ```sql CREATE FUNCTION demouf AS 'ambition.api.sql.function.DemoUDF' LIBRARY 'hdfs://flink/udf/jedis.jar','hdfs://flink/udf/customudf.jar';
CREATE SOURCE TABLE json_source ( id int, name varchar,
datedate , age int ) with ( type=json, 'file.path'='file:///FlinkSQL/blink-job/src/test/resources/demo.json' );
CREATE SINK TABLE csvsink (
datedate, totalage int ) with ( type=csv, 'file.path'='file:///FlinkSQL/blink-job/src/test/resources/demo_out.csv' );
CREATE VIEW viewselect as
SELECT
date, age FROM jsonsource GROUP BY
date,age;
INSERT INTO csvsink SELECT
date, sum(age) as totalage FROM view_select GROUP BY
date;
stream sql 示例: ```sql CREATE FUNCTION demouf AS 'ambition.api.sql.function.DemoUDF' LIBRARY 'hdfs://flink/udf/jedis.jar','hdfs://flink/udf/customudf.jar';CREATE SOURCE TABLE kafka_source (
date
varchar, amount float, proctime timestamp ) with ( type=kafka, 'flink.parallelism'=1, 'kafka.topic'=topic, 'kafka.group.id'=flinks, 'kafka.enable.auto.commit'=true, 'kafka.bootstrap.servers'='localhost:9092' );CREATE SINK TABLE mysql_sink (
date
varchar, total_amount float, PRIMARY KEY (date
) ) with ( type=mysql, 'mysql.connection'='localhost:3306', 'mysql.db.name'=flink, 'mysql.batch.size'=10, 'mysql.table.name'=flink_table, 'mysql.user'=root, 'mysql.pass'=root );CREATE VIEW view_select AS SELECT
date
, amount FROM kafka_source GROUP BYdate
, amount ;INSERT INTO mysql_sink SELECT
date
, sum(amount) as total_amount FROM view_select GROUP BYdate
;
CREATE FUNCTION demouf AS 'ambition.api.sql.function.DemoUDF' USING JAR 'hdfs://flink/udf/jedis.jar', JAR 'hdfs://flink/udf/customudf.jar';CREATE TABLE kafka_source (
date
string, amount float, proctime timestamp ) with ( type=kafka, 'flink.parallelism'=1, 'kafka.topic'=topic, 'kafka.group.id'=flinks, 'kafka.enable.auto.commit'=true, 'kafka.bootstrap.servers'='localhost:9092' ); CREATE TABLE mysql_sink (date
string, amount float, PRIMARY KEY (date
,amount) ) with ( type=mysql, 'mysql.connection'='localhost:3306', 'mysql.db.name'=flink, 'mysql.batch.size'=0, 'mysql.table.name'=flink_table, 'mysql.user'=root, 'mysql.pass'=root ); CREATE VIEW view_select AS SELECTdate
, amount FROM kafka_source GROUP BYdate
, amount ; INSERT INTO mysql_sink SELECTdate
, sum(amount) FROM view_select GROUP BYdate
;
五.代码关注