Apache Flink - 创建 Flink 应用程序

在本章中,我们将学习如何创建 Flink 应用程序。

打开 Eclipse IDE,单击"新建项目"并选择"Java 项目"。

创建 Flink 应用程序

给出项目名称并单击"完成"。

创建 Flink 应用程序2

现在,单击"完成",如以下屏幕截图所示。

创建 Flink 应用程序3

现在,右键单击src并转到New >> Class。

创建 Flink 应用程序4

提供类名称并单击"完成"。

创建 Flink 应用程序5

将以下代码复制并粘贴到编辑器中。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;
public class WordCount {

   // *************************************************************************
   // PROGRAM
   // *************************************************************************
   public static void main(String[] args) throws Exception {
      final ParameterTool params = ParameterTool.fromArgs(args);
      // set up the execution environment
      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      // make parameters available in the web interface
      env.getConfig().setGlobalJobParameters(params);
      // get input data
      DataSet<String> text = env.readTextFile(params.get("input"));
      DataSet<Tuple2<String, Integer>> counts =
      // split up the lines in pairs (2-tuples) containing: (word,1)
      text.flatMap(new Tokenizer())
      // group by the tuple field "0" and sum up tuple field "1"
      .groupBy(0)
      .sum(1);
      // emit result
      if (params.has("output")) {
         counts.writeAsCsv(params.get("output"), "
", " ");
         // execute program
         env.execute("WordCount Example");
      } else {
         System.out.println("Printing result to stdout. Use --output to specify output path.");
         counts.print();
      }
   }
   
   // *************************************************************************
   // USER FUNCTIONS
   // *************************************************************************
   public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
      public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
         // normalize and split the line
         String[] tokens = value.toLowerCase().split("\W+");
         // emit the pairs
         for (String token : tokens) {
            if (token.length() > 0) {
               out.collect(new Tuple2<>(token, 1));
            }
         }
      }
   }
}

您将在编辑器中看到许多错误,因为需要将 Flink 库添加到该项目中。

已添加 Flink 库

右键单击 project >> Build Path >> Configure Build Path。

右键单击项目

选择"库"选项卡并单击"添加外部 JAR"。

选择库

进入Flink的lib目录,选择所有4个库,然后单击"确定"。

Flinks lib 目录

转到"Order and Export"选项卡,选择所有库,然后单击"确定"。

Order and Export 选项卡

您将看到错误不再存在。

现在,让我们导出该应用程序。 右键单击该项目,然后单击"导出"。

导出此应用程序

选择 JAR 文件并单击"下一步"

选择 JAR 文件

给出目标路径并单击"下一步"

目标路径

点击"下一步">

单击下一步

单击"浏览",选择主类 (WordCount),然后单击"完成"。

单击完成

注意 − 如果您收到任何警告,请单击"确定"。

运行以下命令。 它将进一步运行您刚刚创建的 Flink 应用程序。

./bin/flink run /home/ubuntu/wordcount.jar --input README.txt --output /home/ubuntu/output
收到警告