Apache Flink - 创建 Flink 应用程序
在本章中,我们将学习如何创建 Flink 应用程序。
打开 Eclipse IDE,单击"新建项目"并选择"Java 项目"。
给出项目名称并单击"完成"。
现在,单击"完成",如以下屏幕截图所示。
现在,右键单击src并转到New >> Class。
提供类名称并单击"完成"。
将以下代码复制并粘贴到编辑器中。
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.util.Collector; public class WordCount { // ************************************************************************* // PROGRAM // ************************************************************************* public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); // set up the execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // make parameters available in the web interface env.getConfig().setGlobalJobParameters(params); // get input data DataSet<String> text = env.readTextFile(params.get("input")); DataSet<Tuple2<String, Integer>> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new Tokenizer()) // group by the tuple field "0" and sum up tuple field "1" .groupBy(0) .sum(1); // emit result if (params.has("output")) { counts.writeAsCsv(params.get("output"), " ", " "); // execute program env.execute("WordCount Example"); } else { System.out.println("Printing result to stdout. Use --output to specify output path."); counts.print(); } } // ************************************************************************* // USER FUNCTIONS // ************************************************************************* public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { // normalize and split the line String[] tokens = value.toLowerCase().split("\W+"); // emit the pairs for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<>(token, 1)); } } } } }
您将在编辑器中看到许多错误,因为需要将 Flink 库添加到该项目中。
右键单击 project >> Build Path >> Configure Build Path。
选择"库"选项卡并单击"添加外部 JAR"。
进入Flink的lib目录,选择所有4个库,然后单击"确定"。
转到"Order and Export"选项卡,选择所有库,然后单击"确定"。
您将看到错误不再存在。
现在,让我们导出该应用程序。 右键单击该项目,然后单击"导出"。
选择 JAR 文件并单击"下一步"
给出目标路径并单击"下一步"
点击"下一步">
单击"浏览",选择主类 (WordCount),然后单击"完成"。
注意 − 如果您收到任何警告,请单击"确定"。
运行以下命令。 它将进一步运行您刚刚创建的 Flink 应用程序。
./bin/flink run /home/ubuntu/wordcount.jar --input README.txt --output /home/ubuntu/output