Apache Flink - API 概念

Flink 拥有丰富的 API,开发人员可以使用这些 API 对批量数据和实时数据执行转换。 各种转换包括映射、过滤、排序、连接、分组和聚合。 Apache Flink 的这些转换是在分布式数据上执行的。 让我们讨论 Apache Flink 提供的不同 API。

Dataset API

Apache Flink中的Dataset API用于对一段时间内的数据进行批量操作。 该 API 可以在 Java、Scala 和 Python 中使用。 它可以对数据集应用不同类型的转换,例如过滤、映射、聚合、连接和分组。

数据集是从本地文件等源创建的,或者通过从特定源读取文件来创建的,结果数据可以写入不同的接收器(如分布式文件或命令行终端)。 Java 和 Scala 编程语言都支持此 API。

这是Dataset API的Wordcount程序 −

public class WordCountProg {
   public static void main(String[] args) throws Exception {
      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      DataSet<String> text = env.fromElements(
      "Hello",
      "My Dataset API Flink Program");

      DataSet<Tuple2<String, Integer>> wordCounts = text
      .flatMap(new LineSplitter())
      .groupBy(0)
      .sum(1);

      wordCounts.print();
   }

   public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
      @Override
      public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
         for (String word : line.split(" ")) {
            out.collect(new Tuple2<String, Integer>(word, 1));
         }
      }
   }
}

DataStream API

该API用于处理连续流中的数据。 您可以对流数据执行各种操作,例如过滤、映射、加窗、聚合。 该数据流有多种来源,例如消息队列、文件、套接字流,并且结果数据可以写入不同的接收器(例如命令行终端)。 Java 和 Scala 编程语言都支持此 API。

这是 DataStream API 的流式字数统计程序,其中有连续的字数统计流,并且数据在第二个窗口中分组。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCountProg {
   public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      DataStream<Tuple2<String, Integer>> dataStream = env
      .socketTextStream("localhost", 9999)
      .flatMap(new Splitter())
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1);
      dataStream.print();
      env.execute("Streaming WordCount Example");
   }
   public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
      @Override
      public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
         for (String word: sentence.split(" ")) {
            out.collect(new Tuple2<String, Integer>(word, 1));
         }
      }
   }
}