DataStream API主要分为3块: DataSource、Transformation、Sink
- DataSource 是程序的数据源输入
- Transformation是具体的操作,它对一个或多个输入数据源进行计算处理
- Sink是程序输出,它可以把Transformation处理之后的数据输出到指定的存储介质中去
DataSource
Flink对DataSource提供了较多已经实现的接口,如:
-
基于文件
readTextFile(path)
读取文本文件,文件遵循TextInputFormat逐行读取规则并返回
-
基于Socket
socketTextStream
从Socket中读取数据,元素可以通过指定的分隔符进行分割
-
基于集合
fromCollection(Collection)
通过JAVA的Collection集合创建一个数据流,集合中所有元素必须相同类型
-
自定义输入
addSource可以实现读取第三方数据源的数据。
Flink提供内置的Connector。Connector会提供对应的Source支持,常用的如下
连接器 是否提供Source支持 是否提供Sink支持 Kafka 是 是 ES 否 是 Hadoop FileSystem 否 是 RabbitMQ 是 是 同时可以使用第三方Apache Bahir组件中提供的 Connector, 如下
连接器 是否提供Source支持 是否提供Sink支持 ActiveMQ 是 是 Flume 否 是 Redis 否 是 Akka 否 是 Netty 是 否 Flink提供的DataSource接口容错性保障如下
DataSource 语义保证 说明 File Exactly-once Collection Exactly-once Socket At-most-once Kafka Exactly-once 自定义数据源主要有两种方式:
-
通过实现SourceFunction接口定义无并行度(并行度只能为1)的数据源
-
通过实现ParallelSourceFunction接口或者继承RichParallelSourceFunction来定义有并行度的数据源
自定义实战:
-
模拟产生从1开始的递增数字,每次加1(并行度只能为1)
package engineer.j.streaming; import org.apache.flink.streaming.api.functions.source.SourceFunction; /** * @author : engineer * @Project: flinkExample * @Package engineer.j.streaming * @Description: 自定义并行度为1的Source SourceFunction和SourceContext都需要制定数据类型,如果不指定,运行会抛异常 * @date Date : 2020-07-04 16:53 */ public class MyNoParalleSource implements SourceFunction<Long> { private long count = 1; private boolean isRunning = true; @Override public void run(SourceContext<Long> sourceContext) throws Exception { while (isRunning){ sourceContext.collect(count); count ++; Thread.sleep(1000); } } @Override public void cancel() { isRunning = false; } }
-
模拟产生从1开始的递增数字,每次加1(多并行度的自定义DataSource实现ParallelSourceFunction接口)
package engineer.j.streaming; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; /** * @author : engineer * @Project: flinkExample * @Package engineer.j.streaming * @Description: 自定义实现一个多并行度的source * @date Date : 2020-07-04 17:03 */ public class MyParalleSource implements ParallelSourceFunction<Long> { private long count = 1; private boolean isRunning = true; @Override public void run(SourceContext<Long> sourceContext) throws Exception { while (isRunning){ sourceContext.collect(count); count ++; Thread.sleep(1000); } } @Override public void cancel() { isRunning = false; } }
-
模拟产生从1开始的递增数字,每次加1(多并行度的自定义DataSource继承RichParallelSourceFunction类)
package engineer.j.streaming; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; /** * @author : engineer * @Project: flinkExample * @Package engineer.j.streaming * @Description: RichParallelSourceFunction会额外提供open和close方法,如果在source中需要获取其他链接资源,可以在open方法中打开,在close方法中关闭 * @date Date : 2020-07-04 17:07 */ public class MyRichParalleSource extends RichParallelSourceFunction<Long> { private long count = 1; private boolean isRunning = true; @Override public void run(SourceContext<Long> sourceContext) throws Exception { while (isRunning){ sourceContext.collect(count); count ++; Thread.sleep(1000); } } @Override public void cancel() { isRunning = false; } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); } @Override public void close() throws Exception { super.close(); } }
-
Transformation
Flink对DataSource提供了大量已实现的算子
-
Map: 输入一个元素,然后返回一个元素,中间可以进行清洗转换等操作
-
FlatMap: 输入一个元素,可以返回零个、一个或者多个元素
-
Filter: 过滤函数,对传入的数据进行判断,复合条件的数据留下
-
KeyBy: 根据指定的key进行分组,key相同的数据会进入同一个分区,常用示例如下
DataStream.keyBy("someKey") 指定对象中的someKey段作为分组key DataStream.keyBy(0) 指定Tuple中的第一个元素作为分组key
-
Reduce: 对数据进行聚合操作,结合当前元素和上一次Reduce返回的值进行聚合操作,然后返回一个新的值
-
Aggregations: sum()、min()、max()等
-
Union: 合并多个流,新的流会包含所有流中的数据,但Union有一个限制,就是所有合并的流类型必须一致
-
Connect: 和Union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法
-
coMap和coFlatMap: 在ConnectedSteam中需要使用这种函数,类似Map和FlatMap
-
Split: 根据规则把一个数据流切分成多个流
-
Select: 和Split配合使用,选择切分后的流
Flink针对DataStream提供了一些数据分区规则
-
Random partitioning: 随机分区
DataStream.shuffle()
-
Rebalancing: 对数据集进行在平衡、重分区和消除数据倾斜
DataStream.rebalance()
-
Rescaling: 重新调节
DataStream.rescale()
Rescaling与Rebalancing的区别为Rebalancing会产生全量重分区,而Rescaling不会。
-
Custom partitioning: 自定义分区
DataStream.partitionCustom(partitioner,"someKey") 或者 DataStream.partitionCustom(partitioner,0)
自定义实战:
-
根据数字的奇偶性来分区(通过自定义分区规则)
package engineer.j.streaming; import org.apache.flink.api.common.functions.Partitioner; /** * @author : engineer * @Project: flinkExample * @Package engineer.j.streaming * @Description: 根据数值的奇偶性分区 * @date Date : 2020-07-04 17:57 */ public class MyPartition implements Partitioner<Long> { @Override public int partition(Long aLong, int i) { System.out.println("分区总数:" + i); if (aLong % 2 ==0){ return 0; }else{ return 1; } } }
package engineer.j.streaming; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * @author : engineer * @Project: flinkExample * @Package engineer.j.streaming * @Description: 使用自定义的分区 * @date Date : 2020-07-04 17:59 */ public class StreamingDemoWithMyPartition { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()); // 对数据进行转换,把Long类型转换成Tuple类型 DataStream<Tuple1<Long>> tupleDataStream = text.map(new MapFunction<Long, Tuple1<Long>>() { @Override public Tuple1<Long> map(Long aLong) throws Exception { return new Tuple1<>(aLong); } }); //分区后的数据 DataStream<Tuple1<Long>> partitionData = tupleDataStream.partitionCustom(new MyPartition(),0); DataStream<Long> result = partitionData.map(new MapFunction<Tuple1<Long>, Long>() { @Override public Long map(Tuple1<Long> longTuple1) throws Exception { System.out.println("当前线程id:" + Thread.currentThread().getId() + ",value=" + longTuple1); return longTuple1.getField(0); } }); result.print().setParallelism(1); env.execute("StreamingDemoWithMyPartition"); } }
-
Sink
Flink针对DataStream提供了大量的已经实现的数据Sink,如下:
-
writeAsText() : 将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取
-
print() / printToErr(): 打印每个元素的toString()方法的值到标准输出或者标准错误输出流中
-
自定义输出: addSink可以实现把数据输出到第三方存储介质中
Flink提供了内置的Connector和第三方组件Bahir对sink的支持见DataSource自定义输入表格说明。
Flink提供的Sink接口容错性保障如下
DataSource 语义保证 说明 HDFS Exactly-once File At-least-once Redis At-least-once ES At-least-once Kafka At-least-once/Exactly-once Kafka 0.9/0.10 提供At-least-once Kafka 0.11 提供Exactly-once Flink 自定义Sink有两种实现方式:
- 实现SinkFunction接口
- 继承RichSinkFunction接口
自定义Sink代码实现建议参考RedisSink类(org.apache.Flink.streaming.connectors.redis.RedisSink)。
自定义实战(以RedisSink为例的用法):
-
接受Socket传输的数据保存到Redis中
package engineer.j.streaming; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.redis.RedisSink; import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; /** * @author : engineer * @Project: flinkExample * @Package engineer.j.streaming * @Description: * @date Date : 2020-07-04 18:34 */ public class StreamingDemoToRedis { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> text = env.socketTextStream("127.0.0.1",9000,"\n"); //对数据进行封装,把String转化为Tuple2<String,String> DataStream<Tuple2<String,String>> l_wordData = text.map(new MapFunction<String, Tuple2<String, String>>() { @Override public Tuple2<String, String> map(String s) throws Exception { return new Tuple2<>("l_words",s); } }); //创建RedisSink的配置 FlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig.Builder().setHost("192.168.100.59").setPort(6379).build(); //创建RedisSink RedisSink<Tuple2<String,String>> redisSink = new RedisSink<>(redisConf, new MyRedisMapper()); l_wordData.addSink(redisSink); env.execute("StreamingDemoToRedis"); } public static class MyRedisMapper implements RedisMapper<Tuple2<String,String>>{ @Override public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.LPUSH); } /** * 从接收的数据中获取需要操作的redis key * @param stringStringTuple2 * @return */ @Override public String getKeyFromData(Tuple2<String, String> stringStringTuple2) { return stringStringTuple2.f0; } /** * 从接收的数据中获取需要操作的redis value * @param stringStringTuple2 * @return */ @Override public String getValueFromData(Tuple2<String, String> stringStringTuple2) { return stringStringTuple2.f1; } } }