Apache Storm - 工作示例
我们已经了解了 Apache Storm 的核心技术细节,现在是时候编写一些简单的场景了。
场景 – 手机通话记录分析器
移动呼叫及其持续时间将作为 Apache Storm 的输入,Storm 将处理和分组同一呼叫者和接收者之间的呼叫以及它们的呼叫总数。
Spout 创建
Spout 是一个用于数据生成的组件。 基本上,一个 spout 将实现一个 IRichSpout 接口。 "IRichSpout"接口有以下几个重要方法 −
open − 为 spout 提供执行环境。 执行程序将运行此方法来初始化 spout。
nextTuple −通过收集器发出生成的数据。
close − 当 spout 将要关闭时调用此方法。
declareOutputFields − 声明元组的输出模式。
ack − 确认处理了特定的元组
fail − 指定不处理并且不重新处理特定元组。
Open
open 方法的签名如下 −
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
conf − 为这个 spout 提供 Storm 配置。
context − 提供有关 topology 中的 spout 位置、其任务 ID、输入和输出信息的完整信息。
collector − 使我们能够发出将由 bolts 处理的元组。
nextTuple
nextTuple 方法的签名如下 −
nextTuple()
nextTuple() 从与 ack() 和 fail() 方法相同的循环中定期调用。 当没有工作要做时,它必须释放对线程的控制,以便其他方法有机会被调用。 所以 nextTuple 的第一行检查处理是否完成。 如果是这样,它应该在返回之前至少休眠一毫秒以减少处理器上的负载。
close
close 方法的签名如下 −
close()
declareOutputFields
declareOutputFields 方法的签名如下 −
declareOutputFields(OutputFieldsDeclarer declarer)
declarer − 用于声明输出流id、输出字段等。
此方法用于指定元组的输出模式。
ack
ack(Object msgId)
此方法确认已处理特定元组。
fail
nextTuple 方法的签名如下 −
ack(Object msgId)
此方法通知特定元组尚未完全处理。 Storm 将重新处理特定的元组。
FakeCallLogReaderSpout
在我们的场景中,我们需要收集通话记录详细信息。 通话记录的信息包含。
- 来电号码
- 接收人号码
- 持续时间
由于我们没有实时的通话记录信息,我们会生成虚假的通话记录。 虚假信息将使用 Random 类创建。 完整的程序代码如下。
代码 − FakeCallLogReaderSpout.java
import java.util.*; //import storm tuple packages import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; //import Spout interface packages import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; //Create a class FakeLogReaderSpout which implement IRichSpout interface to access functionalities public class FakeCallLogReaderSpout implements IRichSpout { //Create instance for SpoutOutputCollector which passes tuples to bolt. private SpoutOutputCollector collector; private boolean completed = false; //Create instance for TopologyContext which contains topology data. private TopologyContext context; //Create instance for Random class. private Random randomGenerator = new Random(); private Integer idx = 0; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.context = context; this.collector = collector; } @Override public void nextTuple() { if(this.idx <= 1000) { List<String> mobileNumbers = new ArrayList<String>(); mobileNumbers.add("1234123401"); mobileNumbers.add("1234123402"); mobileNumbers.add("1234123403"); mobileNumbers.add("1234123404"); Integer localIdx = 0; while(localIdx++ < 100 && this.idx++ < 1000) { String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); while(fromMobileNumber == toMobileNumber) { toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); } Integer duration = randomGenerator.nextInt(60); this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration)); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("from", "to", "duration")); } //Override all the interface methods @Override public void close() {} public boolean isDistributed() { return false; } @Override public void activate() {} @Override public void deactivate() {} @Override public void ack(Object msgId) {} @Override public void fail(Object msgId) {} @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Bolt 创建
Bolt 是一个将元组作为输入,处理元组,并产生新元组作为输出的组件。 Bolts 将实现 IRichBolt 接口。 在这个程序中,两个 Bolt 类 CallLogCreatorBolt 和 CallLogCounterBolt 用于执行操作。
IRichBolt 接口有以下方法 −
prepare − 为 bolt 提供一个执行环境。 执行程序将运行此方法来初始化 spout。
execute − 处理单个输入元组。
cleanup − 当 bolt 要关闭时调用。
declareOutputFields − 声明元组的输出模式。
Prepare
prepare 方法的签名如下 −
prepare(Map conf, TopologyContext context, OutputCollector collector)
conf − 为这个 bolt 提供 Storm 配置。
context − 提供有关 topology 中 bolt 位置、其任务 ID、输入和输出信息等的完整信息。
collector − 使我们能够发出已处理的元组。
execute
execute 方法的签名如下 −
execute(Tuple tuple)
这里 tuple 是要处理的输入元组。
execute 方法一次处理一个元组。 元组数据可以通过 Tuple 类的 getValue 方法访问。 没有必要立即处理输入元组。 可以处理多个元组并将其作为单个输出元组输出。 可以使用 OutputCollector 类发出处理后的元组。
cleanup
cleanup 方法的签名如下 −
cleanup()
declareOutputFields
declareOutputFields 方法的签名如下 −
declareOutputFields(OutputFieldsDeclarer declarer)
这里的参数declarer用于声明输出流id、输出字段等
该方法用于指定元组的输出模式
通话记录创建者 Bolt
通话记录创建者 Bolt 接收通话记录元组。 呼叫日志元组具有呼叫者号码、接收者号码和呼叫持续时间。 这个 Bolt 通过组合呼叫者号码和接收者号码来简单地创建一个新值。 新值的格式为"来电号码 - 接听人号码",命名为新字段"call"。 完整的代码如下。
代码 − CallLogCreatorBolt.java
//import util packages import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; //import Storm IRichBolt package import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; //Create a class CallLogCreatorBolt which implement IRichBolt interface public class CallLogCreatorBolt implements IRichBolt { //Create instance for OutputCollector which collects and emits tuples to produce output private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { String from = tuple.getString(0); String to = tuple.getString(1); Integer duration = tuple.getInteger(2); collector.emit(new Values(from + " - " + to, duration)); } @Override public void cleanup() {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call", "duration")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
呼叫日志计数器 Bolt
呼叫日志计数器 Bolt 以元组形式接收呼叫及其持续时间。 这个bolt在prepare方法中初始化了一个字典(Map)对象。 在 execute 方法中,它检查元组并在字典对象中为元组中的每个新"调用"值创建一个新条目,并在字典对象中设置值 1。对于字典中已经可用的条目,它只是增加它的值。 简单来说,这个bolt将调用及其计数保存在字典对象中。 除了将调用及其计数保存在字典中之外,我们还可以将其保存到数据源中。 完整的程序代码如下 −
代码 − CallLogCounterBolt.java
import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class CallLogCounterBolt implements IRichBolt { Map<String, Integer> counterMap; private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.counterMap = new HashMap<String, Integer>(); this.collector = collector; } @Override public void execute(Tuple tuple) { String call = tuple.getString(0); Integer duration = tuple.getInteger(1); if(!counterMap.containsKey(call)){ counterMap.put(call, 1); }else{ Integer c = counterMap.get(call) + 1; counterMap.put(call, c); } collector.ack(tuple); } @Override public void cleanup() { for(Map.Entry<String, Integer> entry:counterMap.entrySet()){ System.out.println(entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
创建 topology
Storm topology 基本上是一个 Thrift 结构。 TopologyBuilder 类提供了简单易用的方法来创建复杂的 topology。 TopologyBuilder 类具有设置 spout (setSpout) 和设置 bolt (setBolt) 的方法。最后,TopologyBuilder 有 createTopology 来创建 topology。 使用以下代码片段创建 topology −
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout()); builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()) .shuffleGrouping("call-log-reader-spout"); builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()) .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
shuffleGrouping 和 fieldsGrouping 方法有助于为 spout 和 bolts 设置流分组。
本地集群
出于开发目的,我们可以使用"LocalCluster"对象创建一个本地集群,然后使用"LocalCluster"类的"submitTopology"方法提交 topology。"submitTopology"的参数之一是"Config"类的一个实例。"Config"类用于在提交 topology 之前设置配置选项。 此配置选项将在运行时与集群配置合并,并使用 prepare 方法发送到所有任务(spout 和 bolt)。 将 topology 提交到集群后,我们将等待 10 秒让集群计算提交的 topology,然后使用"LocalCluster"的"关闭"方法关闭集群。 完整的程序代码如下 −
代码 − LogAnalyserStorm.java
import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; //import storm configuration packages import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; //Create main class LogAnalyserStorm submit topology. public class LogAnalyserStorm { public static void main(String[] args) throws Exception{ //Create Config instance for cluster configuration Config config = new Config(); config.setDebug(true); // TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout()); builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()) .shuffleGrouping("call-log-reader-spout"); builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()) .fieldsGrouping("call-log-creator-bolt", new Fields("call")); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology()); Thread.sleep(10000); //Stop the topology cluster.shutdown(); } }
构建和运行应用程序
完整的应用程序有四个 Java 代码。 他们是 −
- FakeCallLogReaderSpout.java
- CallLogCreaterBolt.java
- CallLogCounterBolt.java
- LogAnalyerStorm.java
可以使用以下命令构建应用程序 −
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
该应用程序可以使用以下命令运行 −
java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm
输出
一旦应用程序启动,它会输出集群启动过程、spout和bolt处理,最后是集群关闭过程的完整细节。 在"CallLogCounterBolt"中,我们打印了呼叫及其计数详细信息。 此信息将显示在控制台上,如下所示 −
1234123402 - 1234123401 : 78 1234123402 - 1234123404 : 88 1234123402 - 1234123403 : 105 1234123401 - 1234123404 : 74 1234123401 - 1234123403 : 81 1234123401 - 1234123402 : 81 1234123403 - 1234123404 : 86 1234123404 - 1234123401 : 63 1234123404 - 1234123402 : 82 1234123403 - 1234123402 : 83 1234123404 - 1234123403 : 86 1234123403 - 1234123401 : 93
非 JVM 语言
Storm topology 由 Thrift 接口实现,可以轻松提交任何语言的 topology。 Storm 支持 Ruby、Python 和许多其他语言。 让我们看一下python绑定。
Python 绑定
Python 是一种通用的解释型、交互式、面向对象的高级编程语言。 Storm 支持 Python 实现其 topology。 Python 支持发射、锚定、确认和记录操作。
如您所知,bolt可以用任何语言定义。 用另一种语言编写的 Bolt 作为子进程执行,Storm 通过 stdin/stdout 使用 JSON 消息与这些子进程通信。 首先取一个支持python绑定的示例bolt WordCount。
public static class WordCount implements IRichBolt { public WordSplit() { super("python", "splitword.py"); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
这里的类 WordCount 实现了 IRichBolt 接口并使用 python 实现指定的超级方法参数"splitword.py"运行。现在创建一个名为"splitword.py"的 python 实现。
import storm class WordCountBolt(storm.BasicBolt): def process(self, tup): words = tup.values[0].split(" ") for word in words: storm.emit([word]) WordCountBolt().run()
这是 Python 的示例实现,用于计算给定句子中的单词。 同样,您也可以绑定其他支持语言。