
About the developer

ambition119
196 stars · 150 forks · Apache License 2.0 · 5 commits · 7 open issues

Description

A framework for developing real-time Flink programs in SQL, modeled after Alibaba's Blink.



1. Background

While working at Alibaba, I used Blink for stream data processing and computation: Blink compute jobs were written in SQL, which made development simple and efficient and the product easy to use. This project is an attempt to productize Flink in a similar way, with SQL as the unified development standard. SQL's advantages are that it is declarative, easy to understand, stable and reliable, and automatically optimized. With API-based development, the biggest problem is that job tuning depends on the programmer's experience and is difficult; the API approach is also too invasive (data security, cluster security, and so on). SQL, by contrast, can be tuned automatically, avoiding these problems.

2. Implementation approach:

User submits SQL (DDL, query, DML)

  -> DDL statements map to Flink sources and sinks

  -> queries and INSERT INTO DML map to the data processing and computation

  -> wrapped into a Flink job: env.sqlQuery / env.sqlUpdate

  -> a JobGraph is built and the job is submitted: ClusterClient.submitJob or ClusterDescriptor.deployJobCluster
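With Flink 1.10's Table API, the flow above can be sketched roughly as follows. This is a minimal illustration, not the project's actual code: the DDL/DML strings and the job name are placeholders, and running it requires the Flink 1.10 dependencies and a cluster to submit to.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class SqlJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // DDL: register the source and sink tables declared by the user
        tEnv.sqlUpdate("CREATE TABLE kafka_source ( /* ... */ ) WITH ( /* ... */ )"); // placeholder DDL
        tEnv.sqlUpdate("CREATE TABLE mysql_sink ( /* ... */ ) WITH ( /* ... */ )");   // placeholder DDL

        // DML: the INSERT INTO ... SELECT ... becomes the body of the job
        tEnv.sqlUpdate("INSERT INTO mysql_sink SELECT /* ... */ FROM kafka_source");

        // Builds the JobGraph and submits it through the configured client
        tEnv.execute("sql-job");
    }
}
```

Submission to a running cluster ultimately goes through ClusterClient.submitJob (or ClusterDescriptor.deployJobCluster for per-job clusters), as described above.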

3. Releases:

v3.0.0 — January 2020

     1. Uses Flink 1.10.
           Before 1.10, Flink's built-in SQL parsing was incomplete (e.g. parsing functions and watermarks), so it was of limited use; it was not worth replacing the custom parsing layer this project had already built.
     2. Uses the new ClusterClient.submitJob interface to submit jobs.

New features

      1. Flink's built-in SQL parsing
      2. New job submission interface
      3. Unified stream and batch processing
      4. DingTalk / WeChat alert notifications

v2.0.1
v2.0.0 — April 2019

       blink-client    interface definitions
       blink-sql/calcite   SQL parsing for stream and batch tables
       blink-libraries   custom source, sink, and side (dimension) table development
       blink-batch   BatchTableSource and BatchTableSink
       blink-stream  StreamTableSource and StreamTableSink
       blink-job   batch/stream job submission

v2.0.1 new features
v2.0.0 new features

    1. Extracted the SQL layer so it is shared by stream and batch; the SQL follows Flink issues and the accompanying docs
    2. Added batch processing development
    3. Added dimension (side) table support
    4. Upgraded Flink to 1.7.x

v1.0.0 — July 2018

      blink-client  interface definitions
      blink-sqlserver  SQL parsing for stream tables
      blink-job  wraps the result as a stream job

New features

      1. Implemented CREATE FUNCTION
      2. Implemented SQL-driven stream processing jobs
      3. Modified Flink source code to implement SQL CEP
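For reference, upstream Flink (1.7 and later) exposes CEP in SQL through the MATCH_RECOGNIZE clause; a pattern query in that style looks roughly like the sketch below. The table and column names are illustrative, not from this project.

```sql
-- Detect, per user, an event A followed by an event B with a smaller amount
SELECT *
FROM kafka_source
    MATCH_RECOGNIZE (
        PARTITION BY user_id
        ORDER BY proctime
        MEASURES
            A.amount AS start_amount,
            B.amount AS drop_amount
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (A B)
        DEFINE
            A AS A.amount > 10,
            B AS B.amount < A.amount
    ) AS t;
```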

4. Examples

v3.0.0 SQL stream job example:
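The v3.0.0 example is not filled in here; the sketch below shows what a stream job could look like with Flink 1.10's native DDL, which v3.0.0 adopts. The connector properties, topic, and credentials are illustrative placeholders.

```sql
CREATE TABLE kafka_source (
    `date` STRING,
    amount FLOAT,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'topic',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'format.type' = 'json'
);

CREATE TABLE mysql_sink (
    `date` STRING,
    total_amount FLOAT
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3306/flink',
    'connector.table' = 'flink_table',
    'connector.username' = 'root',
    'connector.password' = 'root'
);

INSERT INTO mysql_sink
SELECT `date`, SUM(amount) AS total_amount
FROM kafka_source
GROUP BY `date`;
```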


v2.0.1 SQL job examples:

Batch SQL example:

```sql
CREATE FUNCTION demouf AS
      'ambition.api.sql.function.DemoUDF'
LIBRARY 'hdfs://flink/udf/jedis.jar','hdfs://flink/udf/customudf.jar';

CREATE TABLE csv_source (
    id int,
    name varchar,
    `date` date,
    age int
) with (
    type=source,
    connect.type=json,
    'file.path'='file:///FlinkSQL/blink-job/src/test/resources/demo.json'
);

CREATE TABLE csv_sink (
    `date` date,
    age int,
    PRIMARY KEY (`date`)
) with (
    type=sink,
    connect.type=csv,
    'file.path'='file:///FlinkSQL/blink-job/src/test/resources/demo_out.csv'
);

CREATE VIEW view_select AS
SELECT `date`, age
FROM csv_source
GROUP BY `date`, age;

INSERT INTO csv_sink
SELECT `date`, sum(age)
FROM view_select
GROUP BY `date`;
```
Stream SQL example:

```sql
CREATE FUNCTION demouf AS
      'ambition.api.sql.function.DemoUDF'
LIBRARY 'hdfs://flink/udf/jedis.jar','hdfs://flink/udf/customudf.jar';

CREATE TABLE kafka_source (
    `date` varchar,
    amount float,
    proctime timestamp
) with (
    type=source,
    'connect.type'=kafka,
    'flink.parallelism'=1,
    'kafka.topic'=topic,
    'kafka.group.id'=flinks,
    'kafka.enable.auto.commit'=true,
    'kafka.bootstrap.servers'='localhost:9092'
);

CREATE TABLE mysql_sink (
    `date` varchar,
    total_amount float,
    PRIMARY KEY (`date`)
) with (
    type=sink,
    'connect.type'=mysql,
    'mysql.connection'='localhost:3306',
    'mysql.db.name'=flink,
    'mysql.batch.size'=10,
    'mysql.table.name'=flink_table,
    'mysql.user'=root,
    'mysql.pass'=root
);

CREATE VIEW view_select AS
SELECT `date`, amount
FROM kafka_source
GROUP BY `date`, amount;

INSERT INTO mysql_sink
SELECT `date`, sum(amount) AS total_amount
FROM view_select
GROUP BY `date`;
```

v2.0.0 SQL job examples:

Batch SQL example:

```sql
CREATE FUNCTION demouf AS
      'ambition.api.sql.function.DemoUDF'
LIBRARY 'hdfs://flink/udf/jedis.jar','hdfs://flink/udf/customudf.jar';

CREATE SOURCE TABLE json_source (
    id int,
    name varchar,
    `date` date,
    age int
) with (
    type=json,
    'file.path'='file:///FlinkSQL/blink-job/src/test/resources/demo.json'
);

CREATE SINK TABLE csv_sink (
    `date` date,
    totalage int
) with (
    type=csv,
    'file.path'='file:///FlinkSQL/blink-job/src/test/resources/demo_out.csv'
);

CREATE VIEW view_select AS
SELECT `date`, age
FROM json_source
GROUP BY `date`, age;

INSERT INTO csv_sink
SELECT `date`, sum(age) AS totalage
FROM view_select
GROUP BY `date`;
```
Stream SQL example:

```sql
CREATE FUNCTION demouf AS
      'ambition.api.sql.function.DemoUDF'
LIBRARY 'hdfs://flink/udf/jedis.jar','hdfs://flink/udf/customudf.jar';

CREATE SOURCE TABLE kafka_source (
    `date` varchar,
    amount float,
    proctime timestamp
) with (
    type=kafka,
    'flink.parallelism'=1,
    'kafka.topic'=topic,
    'kafka.group.id'=flinks,
    'kafka.enable.auto.commit'=true,
    'kafka.bootstrap.servers'='localhost:9092'
);

CREATE SINK TABLE mysql_sink (
    `date` varchar,
    total_amount float,
    PRIMARY KEY (`date`)
) with (
    type=mysql,
    'mysql.connection'='localhost:3306',
    'mysql.db.name'=flink,
    'mysql.batch.size'=10,
    'mysql.table.name'=flink_table,
    'mysql.user'=root,
    'mysql.pass'=root
);

CREATE VIEW view_select AS
SELECT `date`, amount
FROM kafka_source
GROUP BY `date`, amount;

INSERT INTO mysql_sink
SELECT `date`, sum(amount) AS total_amount
FROM view_select
GROUP BY `date`;
```

v1.0.0 SQL stream job example:

```sql
CREATE FUNCTION demouf AS
      'ambition.api.sql.function.DemoUDF'
USING JAR 'hdfs://flink/udf/jedis.jar',
      JAR 'hdfs://flink/udf/customudf.jar';

CREATE TABLE kafka_source (
    date string,
    amount float,
    proctime timestamp
) with (
    type=kafka,
    'flink.parallelism'=1,
    'kafka.topic'=topic,
    'kafka.group.id'=flinks,
    'kafka.enable.auto.commit'=true,
    'kafka.bootstrap.servers'='localhost:9092'
);

CREATE TABLE mysql_sink (
    date string,
    amount float,
    PRIMARY KEY (date, amount)
) with (
    type=mysql,
    'mysql.connection'='localhost:3306',
    'mysql.db.name'=flink,
    'mysql.batch.size'=0,
    'mysql.table.name'=flink_table,
    'mysql.user'=root,
    'mysql.pass'=root
);

CREATE VIEW view_select AS
SELECT date, amount
FROM kafka_source
GROUP BY date, amount;

INSERT INTO mysql_sink
SELECT date, sum(amount)
FROM view_select
GROUP BY date;
```

5. Reference projects

apache flink

apache calcite

uber AthenaX

DTStack flinkStreamSQL
