Flink DataStream的常用API

Flink DataStream的常用API

DataStream API主要分为3块: DataSource、Transformation、Sink

  • DataSource 是程序的数据源输入
  • Transformation是具体的操作,它对一个或多个输入数据源进行计算处理
  • Sink是程序输出,它可以把Transformation处理之后的数据输出到指定的存储介质中去

DataSource

Flink对DataSource提供了较多已经实现的接口,如:

  • 基于文件

    readTextFile(path)
    

    读取文本文件,文件遵循TextInputFormat逐行读取规则并返回

  • 基于Socket

    socketTextStream
    

    从Socket中读取数据,元素可以通过指定的分隔符进行分割

  • 基于集合

    fromCollection(Collection)
    

    通过JAVA的Collection集合创建一个数据流,集合中所有元素必须相同类型

  • 自定义输入

    addSource可以实现读取第三方数据源的数据。

    Flink提供内置的Connector。Connector会提供对应的Source支持,常用的如下

    连接器是否提供Source支持是否提供Sink支持
    Kafka
    ES
    Hadoop FileSystem
    RabbitMQ

    同时可以使用第三方Apache Bahir组件中提供的 Connector, 如下

    连接器是否提供Source支持是否提供Sink支持
    ActiveMQ
    Flume
    Redis
    Akka
    Netty

    Flink提供的DataSource接口容错性保障如下

    DataSource语义保证说明
    FileExactly-once
    CollectionExactly-once
    SocketAt-most-once
    KafkaExactly-once

    自定义数据源主要有两种方式:

    • 通过实现SourceFunction接口定义无并行度(并行度只能为1)的数据源

    • 通过实现ParallelSourceFunction接口或者继承RichParallelSourceFunction来定义有并行度的数据源

    自定义实战:

    • 模拟产生从1开始的递增数字,每次加1(并行度只能为1)

      package engineer.j.streaming;
      
      import org.apache.flink.streaming.api.functions.source.SourceFunction;
      
      /**
       * @author : engineer
       * @Project: flinkExample
       * @Package engineer.j.streaming
       * @Description: 自定义并行度为1的Source SourceFunction和SourceContext都需要制定数据类型,如果不指定,运行会抛异常
       * @date Date : 2020-07-04 16:53
       */
      public class MyNoParalleSource  implements SourceFunction<Long> {
      
          private long count = 1;
          private  boolean isRunning = true;
          @Override
          public void run(SourceContext<Long> sourceContext) throws Exception {
              while (isRunning){
                  sourceContext.collect(count);
                  count ++;
                  Thread.sleep(1000);
              }
          }
      
          @Override
          public void cancel() {
              isRunning = false;
          }
      }
      
    • 模拟产生从1开始的递增数字,每次加1(多并行度的自定义DataSource实现ParallelSourceFunction接口)

      package engineer.j.streaming;
      
      import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
      
      /**
       * @author : engineer
       * @Project: flinkExample
       * @Package engineer.j.streaming
       * @Description: 自定义实现一个多并行度的source
       * @date Date : 2020-07-04 17:03
       */
      public class MyParalleSource implements ParallelSourceFunction<Long> {
      
          private long count = 1;
          private  boolean isRunning = true;
      
          @Override
          public void run(SourceContext<Long> sourceContext) throws Exception {
              while (isRunning){
                  sourceContext.collect(count);
                  count ++;
                  Thread.sleep(1000);
              }
          }
      
          @Override
          public void cancel() {
              isRunning = false;
          }
      }
      
    • 模拟产生从1开始的递增数字,每次加1(多并行度的自定义DataSource继承RichParallelSourceFunction类)

      package engineer.j.streaming;
      
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
      
      /**
       * @author : engineer
       * @Project: flinkExample
       * @Package engineer.j.streaming
       * @Description: RichParallelSourceFunction会额外提供open和close方法,如果在source中需要获取其他链接资源,可以在open方法中打开,在close方法中关闭
       * @date Date : 2020-07-04 17:07
       */
      public class MyRichParalleSource extends RichParallelSourceFunction<Long> {
      
          private long count = 1;
          private  boolean isRunning = true;
      
          @Override
          public void run(SourceContext<Long> sourceContext) throws Exception {
              while (isRunning){
                  sourceContext.collect(count);
                  count ++;
                  Thread.sleep(1000);
              }
          }
      
          @Override
          public void cancel() {
               isRunning = false;
          }
      
          @Override
          public void open(Configuration parameters) throws Exception {
              super.open(parameters);
          }
      
          @Override
          public void close() throws Exception {
              super.close();
          }
      }
      

Transformation

Flink对DataSource提供了大量已实现的算子

  • Map: 输入一个元素,然后返回一个元素,中间可以进行清洗转换等操作

  • FlatMap: 输入一个元素,可以返回零个、一个或者多个元素

  • Filter: 过滤函数,对传入的数据进行判断,复合条件的数据留下

  • KeyBy: 根据指定的key进行分组,key相同的数据会进入同一个分区,常用示例如下

    DataStream.keyBy("someKey") 指定对象中的someKey段作为分组key
    DataStream.keyBy(0) 指定Tuple中的第一个元素作为分组key
    
  • Reduce: 对数据进行聚合操作,结合当前元素和上一次Reduce返回的值进行聚合操作,然后返回一个新的值

  • Aggregations: sum()、min()、max()等

  • Union: 合并多个流,新的流会包含所有流中的数据,但Union有一个限制,就是所有合并的流类型必须一致

  • Connect: 和Union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法

  • coMap和coFlatMap: 在ConnectedSteam中需要使用这种函数,类似Map和FlatMap

  • Split: 根据规则把一个数据流切分成多个流

  • Select: 和Split配合使用,选择切分后的流

Flink针对DataStream提供了一些数据分区规则

  • Random partitioning: 随机分区

    DataStream.shuffle()
    
  • Rebalancing: 对数据集进行在平衡、重分区和消除数据倾斜

    DataStream.rebalance()
    
  • Rescaling: 重新调节

    DataStream.rescale()
    

    Rescaling与Rebalancing的区别为Rebalancing会产生全量重分区,而Rescaling不会。

  • Custom partitioning: 自定义分区

    DataStream.partitionCustom(partitioner,"someKey")
    或者
    DataStream.partitionCustom(partitioner,0)
    

    自定义实战:

    • 根据数字的奇偶性来分区(通过自定义分区规则)

      package engineer.j.streaming;
      
      import org.apache.flink.api.common.functions.Partitioner;
      
      /**
       * @author : engineer
       * @Project: flinkExample
       * @Package engineer.j.streaming
       * @Description: 根据数值的奇偶性分区
       * @date Date : 2020-07-04 17:57
       */
      public class MyPartition implements Partitioner<Long> {
          @Override
          public int partition(Long aLong, int i) {
              System.out.println("分区总数:" + i);
              if (aLong % 2 ==0){
                  return 0;
              }else{
                  return 1;
              }
          }
      }
      
      package engineer.j.streaming;
      
      import org.apache.flink.api.common.functions.MapFunction;
      import org.apache.flink.api.java.tuple.Tuple1;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      
      /**
       * @author : engineer
       * @Project: flinkExample
       * @Package engineer.j.streaming
       * @Description: 使用自定义的分区
       * @date Date : 2020-07-04 17:59
       */
      public class StreamingDemoWithMyPartition {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setParallelism(2);
      
              DataStreamSource<Long> text = env.addSource(new MyNoParalleSource());
      
              // 对数据进行转换,把Long类型转换成Tuple类型
              DataStream<Tuple1<Long>> tupleDataStream = text.map(new MapFunction<Long, Tuple1<Long>>() {
                  @Override
                  public Tuple1<Long> map(Long aLong) throws Exception {
                      return new Tuple1<>(aLong);
                  }
              });
      
              //分区后的数据
              DataStream<Tuple1<Long>> partitionData = tupleDataStream.partitionCustom(new MyPartition(),0);
      
              DataStream<Long> result = partitionData.map(new MapFunction<Tuple1<Long>, Long>() {
                  @Override
                  public Long map(Tuple1<Long> longTuple1) throws Exception {
                      System.out.println("当前线程id:" + Thread.currentThread().getId() + ",value=" + longTuple1);
                      return  longTuple1.getField(0);
                  }
              });
      
              result.print().setParallelism(1);
      
              env.execute("StreamingDemoWithMyPartition");
          }
      }
      

Sink

​ Flink针对DataStream提供了大量的已经实现的数据Sink,如下:

  • writeAsText() : 将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取

  • print() / printToErr(): 打印每个元素的toString()方法的值到标准输出或者标准错误输出流中

  • 自定义输出: addSink可以实现把数据输出到第三方存储介质中

    Flink提供了内置的Connector和第三方组件Bahir对sink的支持见DataSource自定义输入表格说明。

    Flink提供的Sink接口容错性保障如下

    DataSource语义保证说明
    HDFSExactly-once
    FileAt-least-once
    RedisAt-least-once
    ESAt-least-once
    KafkaAt-least-once/Exactly-onceKafka 0.9/0.10 提供At-least-once Kafka 0.11 提供Exactly-once

    Flink 自定义Sink有两种实现方式:

    • 实现SinkFunction接口
    • 继承RichSinkFunction接口

    自定义Sink代码实现建议参考RedisSink类(org.apache.Flink.streaming.connectors.redis.RedisSink)。

    自定义实战(以RedisSink为例的用法):

    • 接受Socket传输的数据保存到Redis中

      package engineer.j.streaming;
      
      import org.apache.flink.api.common.functions.MapFunction;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.connectors.redis.RedisSink;
      import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
      import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
      import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
      import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
      
      /**
       * @author : engineer
       * @Project: flinkExample
       * @Package engineer.j.streaming
       * @Description:
       * @date Date : 2020-07-04 18:34
       */
      public class StreamingDemoToRedis {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
              DataStreamSource<String> text = env.socketTextStream("127.0.0.1",9000,"\n");
      
              //对数据进行封装,把String转化为Tuple2<String,String>
              DataStream<Tuple2<String,String>>  l_wordData = text.map(new MapFunction<String, Tuple2<String, String>>() {
                  @Override
                  public Tuple2<String, String> map(String s) throws Exception {
                      return new Tuple2<>("l_words",s);
                  }
              });
              //创建RedisSink的配置
              FlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig.Builder().setHost("192.168.100.59").setPort(6379).build();
      
              //创建RedisSink
              RedisSink<Tuple2<String,String>> redisSink = new RedisSink<>(redisConf, new MyRedisMapper());
      
              l_wordData.addSink(redisSink);
      
              env.execute("StreamingDemoToRedis");
          }
      
          public  static class  MyRedisMapper implements RedisMapper<Tuple2<String,String>>{
      
              @Override
              public RedisCommandDescription getCommandDescription() {
                  return new RedisCommandDescription(RedisCommand.LPUSH);
              }
      
              /**
               * 从接收的数据中获取需要操作的redis key
               * @param stringStringTuple2
               * @return
               */
              @Override
              public String getKeyFromData(Tuple2<String, String> stringStringTuple2) {
                  return stringStringTuple2.f0;
              }
      
              /**
               * 从接收的数据中获取需要操作的redis value
               * @param stringStringTuple2
               * @return
               */
              @Override
              public String getValueFromData(Tuple2<String, String> stringStringTuple2) {
                  return stringStringTuple2.f1;
              }
          }
      }