懒松鼠Flink-Boot 脚手架让Flink全面拥抱Spring生态体系,使得开发者可以以Java WEB开发模式开发出分布式运行的流处理程序,懒松鼠让跨界变得更加简单。懒松鼠旨在让开发者以更底上手成本(不需要理解分布式计算的理论知识和Flink框架的细节)便可以快速编写业务代码实现。为了进一步提升开发者使用懒松鼠脚手架开发大型项目的敏捷的度,该脚手架默认集成Spring框架进行Bean管理,同时将微服务以及WEB开发领域中经常用到的框架集成进来,进一步提升开发速度。比如集成Mybatis ORM框架,Hibernate Validator校验框架,Spring Retry重试框架等,具体见下面的脚手架特性。
No Data
你的现状
static Map cache=new HashMap();public String findUUID(FlowData flowData) { String value=cache.get(flowData.getSubTestItem()); if(value==null) { String uuid=userMapper.findUUID(flowData); cache.put(uuid,value); return uuid; } return value; }
你想要的是这样
@Cacheable(value = "FlowData.findUUID", key = "#flowData.subTestItem") public String findUUID(FlowData flowData) { return userMapper.findUUID(flowData); }
你的现状
public void insertFlow(FlowData flowData) { try{ userMapper.insertFlow(flowData); }Cache(Exception e) { Thread.sleep(10000); userMapper.insertFlow(flowData); } }
你想要的是这样
@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5)) @Override public void insertFlow(FlowData flowData) { userMapper.insertFlow(flowData); }
你的现状
if(flowData.getSubTestItem().length()<2&&flowData.getSubTestItem().length()>7) { return null; } if(flowData.getBillNumber()==null) { return null; }
你想要的是这样
Map validate = ValidatorUtil.validate(flowData); if (validate != null) { System.out.println(validate); return null; }public class FlowData {
private String uuid; //声明该参数的校验规则字符串长度必须在7到20之间 @Size(min = 7, max = 20, message = "长度必须在{min}-{max}之间") private String subTestItem; //声明该参数的校验规则字符串不能为空 @NotBlank(message = "billNumber不能为空") private String billNumber;
}
Flink-Boot ├── Flink-Base -- Flink-Boot工程基础模块/Engineering basic module ├── Flink-Client -- Flink-Boot 客户端模块/Client module ├── flink-annotation -- 注解生效模块/Annotation effective module ├── flink-mybatis -- mybatis orm模块/mybatis orm module ├── flink-retry -- 注解重试机制模块/Annotation retry mechanism module ├── flink-validate -- 校验模块/validate module ├── flink-sql -- Flink SQL解耦至XML配置模块/SQL decoupling to XML configuration module ├── flink-cache-annotation -- 接口缓冲模块/Interface buffer module ├── flink-dubbo-comsumer -- Dubbo 消费组模块/Dubbo comsumer module ├── flink-other-service -- 组件原生运行模块 ├── flink-junit -- 单元测试模块/Unit test module ├── flink-apollo -- 阿波罗配置客户端模块/Apollo configuration client module
技术 |
名称 | 状态 |
---|---|---|
Spring Framework | 容器 | 已集成 |
Spring 基于XML方式配置Bean | 装配Bean | 已集成 |
Spring 基于注解方式配置Bean | 装配Bean | 已集成 |
Spring 基于注解声明方法重试机制 | Retry注解 | 已集成 |
Spring 基于注解声明方法缓存 | Cache注解 | 已集成 |
Hibernate Validator | 校验框架 | 已集成 |
Dubbole消费者 | 服务消费者 | 已集成 |
Druid | 数据库连接池 | 已集成 |
MyBatis | ORM框架 | 已集成 |
Kafka | 消息队列 | 已集成 |
HDFS | 分布式文件系统 | 已集成 |
Log4J | 日志组件 | 已集成 |
Junit | 单元测试 | 已集成 |
Mybatis-Plus | MyBatis扩展包 | 进行中 |
PageHelper | MyBatis物理分页插件 | 进行中 |
ZooKeeper | 分布式协调服务 | 进行中 |
Dubbo | 分布式服务框架 | 进行中 |
Redis | 分布式缓存数据库 | 进行中 |
Solr & Elasticsearch | 分布式全文搜索引擎 | 进行中 |
Ehcache | 进程内缓存框架 | 进行中 |
sequence | 分布式高效ID生产 | 进行中 |
Spring eurake消费者 | 服务消费者 | 进行中 |
Apollo配置中心 | 携程阿波罗配置中心 | 进行中 |
Spring Config配置中心 | Spring Cloud Config配置中心 | 进行中 |
下面是集成Spring生态的基础手册,加作者微信号获取更详细的开发手册,当然技术过硬自己摸索也只需3小时即可上手所有模块。 1. 生活不易,脑力代码不易,尊重劳动成果,可打赏博主 ~~19.9~~ ~~29.9~~ 36.9元即可获得懒松鼠Flink-Boot相关核心配置文件(以及后续新特性集成代码)。 2. 公开版仅提供了Flink与以上Spring组件集成的所有代码,仅提供Flink与Spring基础集成的配置文件,其他组件的配置文件未提供,一般来说,自行研究框架3小时即可搞定。 3. 也可以选择不打赏博主,懒松鼠Flink-Boot公开了与Spring生产集成的所有代码,仅相关配置文件未公开,一般来说,自行研究框架3小时即可搞定。 4. ~~19.9~~ ~~29.9~~ 36.8元的打赏不是为了挣钱,只是为了让博主看到这个项目的价值有继续迭代的动力,为后续打造一级的开源项目做贡献。 微信号:intsmaze 微信二维码无法显示可跳转该页面扫码,微信转账即可
Flink框架的技术咨询可添加微信进行咨询。
会员享受功能: 1. 框架的详细手册和配置文件 2. 可以免费获取后续新增功能 3. 可以提想要集成的框架,我会根据是否有必要在一个月内集成 4. 框架使用上有问题,我会跟踪解决(PS:因为环境问题导致的不在售后范围) 5. 一杯星巴克的钱,省去三小时自我琢磨
该容器模式配置了JdbcTemplate实例,数据库连接池采用Druid,在业务方法中只需要获取容器中的JdbcTemplate实例便可以快速与关系型数据库进行交互,dataService实例封装了一些访问数据库表的方法。
<property-placeholder location="classpath:config.properties"></property-placeholder> <bean id="druidDataSource" class="com.alibaba.druid.pool.DruidDataSource"> <property name="driverClassName" value="com.mysql.jdbc.Driver"></property> <property name="url" value="${jdbc.url}"></property> <property name="username" value="${jdbc.user}"></property> <property name="password" value="${jdbc.password}"></property> </bean> <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"> <constructor-arg ref="druidDataSource"></constructor-arg> </bean> <bean id="dataService" class="com.intsmaze.flink.base.service.DataService"> <property name="jdbcTemplate" ref="jdbcTemplate"></property> </bean>
jdbc.user = intsmaze jdbc.password = intsmaze jdbc.url = jdbc:mysql://127.0.0.1:3306/flink-boot?useUnicode=true&characterEncoding=UTF-8
如下是SimpleClient(com.intsmaze.flink.client.SimpleClient)类的示例代码,该类继承了BaseFlink,可以看到对应实现的方法中分别设置如下:
/** * github地址: https://github.com/intsmaze * 博客地址:https://www.cnblogs.com/intsmaze/ * 出版书籍《深入理解Flink核心设计与实践原理》 * * @auther: intsmaze(刘洋) * @date: 2020/10/15 18:33 */ public class SimpleClient extends BaseFlink {public static void main(String[] args) throws Exception { SimpleClient topo = new SimpleClient(); topo.run(ParameterTool.fromArgs(args)); } @Override public String getTopoName() { return "SimpleClient"; } @Override public String getConfigName() { return "topology-base.xml"; } @Override public String getPropertiesName() { return "config.properties"; } @Override public void createTopology(StreamExecutionEnvironment builder) { DataStream<string> inputDataStrem = env.addSource(new SimpleDataSource()); DataStream<string> processDataStream = inputDataStrem.flatMap(new SimpleFunction()); processDataStream.print("输出结果"); }
}
采用自定义数据源,用户需要编写自定义DataSource类,该类需要继承XXX抽象类,实现如下方法。
public class SimpleDataSource extends CommonDataSource {private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create(); ...... @Override public void open(Configuration parameters) throws Exception { super.open(parameters); ...//构造读取各类外部系统数据的连接实例 } @Override public String sendMess() throws InterruptedException { Thread.sleep(1000); ...... MainData mainData = new MainData(); ......//通过外部系统数据的连接实例读取外部系统数据,封装进MainData对象中,然后返回即可。 return gson.toJson(mainData); }
}
本作业计算的业务逻辑在Flink转换操作符中进行实现,一般来说开发者只需要实现flatMap算子即可以满足大部分算子的使用。
用户编写的自定义类需要继承com.intsmaze.flink.base.transform.CommonFunction抽象类,均需实现如下方法。
public class SimpleFunction extends CommonFunction {private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create(); @Override public String execute(String message) throws Exception { FlowData flowData = gson.fromJson(message, new TypeToken<flowdata>() { }.getType()); String flowUUID = dataService.findUUID(flowData); if (StringUtils.isBlank(flowUUID)) { flowUUID = UUID.randomUUID().toString(); flowData.setUuid(flowUUID); dataService.insertFlow(flowData); } return gson.toJson(flowData); }
}
CommonFunction抽象类中默认在open方法中通过BeanFactory对象获取到了Spring容器中对于的dataService实例,对于Spring中的其他实例同理在SimpleFunction类中的open方法中获取即可。
public abstract class CommonFunction extends RichFlatMapFunction {private IntCounter numLines = new IntCounter(); protected DataService dataService; protected ApplicationContext beanFactory; @Override public void open(Configuration parameters) { getRuntimeContext().addAccumulator("num-FlatMap", this.numLines); ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext() .getExecutionConfig().getGlobalJobParameters(); beanFactory = BeanFactory.getBeanFactory((Configuration) globalJobParameters); dataService = beanFactory.getBean(DataService.class); } @Override public void flatMap(String value, Collector<string> out) throws Exception { this.numLines.add(1); String execute = execute(value); if (StringUtils.isNotBlank(execute)) { out.collect(execute); } } public abstract String execute(String message) throws Exception;
}
可以根据情况选择重写open(Configuration parameters)方法,同时重写的open(Configuration parameters)方法的第一行要调用父类的open(Configuration parameters)方法。
public void open(Configuration parameters){ super.open(parameters); ...... //获取在Spring配置文件中配置的实例 XXX xxx=beanFactory.getBean(XXX.class); }
在自定义的Topology类编写Main方法,创建自定义的Topology对象后,调用对象的run(...)方法。
public class SimpleClient extends BaseFlink {
/** * 本地启动参数 -isLocal local * 集群启动参数 -isIncremental isIncremental */ public static void main(String[] args) throws Exception { SimpleClient topo = new SimpleClient(); topo.run(ParameterTool.fromArgs(args)); }.......
演示地址: 框架快速演示视频
首先谢谢大家支持,如果你希望参与开发,欢迎通过Github上fork本项目,并Pull Request您的commit。
下面是集成Spring生态的基础手册,加作者微信号获取更详细的开发手册,当然技术过硬自己摸索也只需3小时即可上手所有模块。 1. 生活不易,脑力代码不易,尊重劳动成果,可打赏博主 ~~19.9~~ ~~29.9~~ 36.9元即可获得懒松鼠Flink-Boot相关核心配置文件(以及后续新特性集成代码)。 2. 公开版仅提供了Flink与以上Spring组件集成的所有代码,仅提供Flink与Spring基础集成的配置文件,其他组件的配置文件未提供,一般来说,自行研究框架3小时即可搞定。 3. 也可以选择不打赏博主,懒松鼠Flink-Boot公开了与Spring生产集成的所有代码,仅相关配置文件未公开,一般来说,自行研究框架3小时即可搞定。 4. ~~19.9~~ ~~29.9~~ 36.8元的打赏不是为了挣钱,只是为了让博主看到这个项目的价值有继续迭代的动力,为后续打造一级的开源项目做贡献。 微信号:intsmaze 微信二维码无法显示可跳转该页面扫码,微信转账即可
Flink框架的技术咨询可添加微信进行咨询。
会员享受功能: 1. 框架的详细手册和配置文件 2. 可以免费获取后续新增功能 3. 可以提想要集成的框架,我会根据是否有必要在一个月内集成 4. 框架使用上有问题,我会跟踪解决(PS:因为环境问题导致的不在售后范围) 5. 一杯星巴克的钱,省去三小时自我琢磨