Apache Flume - 获取 Twitter 数据

使用 Flume,我们可以从各种服务获取数据并将其传输到集中存储(HDFS 和 HBase)。 本章介绍如何使用 Apache Flume 从 Twitter 服务获取数据并将其存储在 HDFS 中。

正如 Flume 架构中所讨论的,Web 服务器生成日志数据,并且该数据由 Flume 中的代理收集。 通道将此数据缓冲到接收器,接收器最终将其推送到集中存储。

在本章提供的示例中,我们将创建一个应用程序并使用 Apache Flume 提供的实验性 Twitter 源从中获取推文。 我们将使用内存通道来缓冲这些推文,并使用 HDFS 接收器将这些推文推送到 HDFS。

获取数据

要获取 Twitter 数据,我们必须按照以下步骤操作 −

  • 创建 Twitter 应用程序
  • 安装/启动 HDFS
  • 配置 Flume

创建 Twitter 应用程序

为了从 Twitter 获取推文,需要创建一个 Twitter 应用程序。 按照下面给出的步骤创建 Twitter 应用程序。

步骤 1

要创建 Twitter 应用程序,请单击以下链接 https://apps.twitter.com/。 登录您的 Twitter 帐户。 您将有一个 Twitter 应用程序管理窗口,您可以在其中创建、删除和管理 Twitter 应用程序。

应用程序管理窗口

步骤 2

单击创建新应用程序按钮。 您将被重定向到一个窗口,您将在其中获得一份申请表,您必须在其中填写详细信息才能创建应用程序。 填写网站地址时,给出完整的 URL 模式,例如 http://example.com.

创建应用程序

步骤 3

填写详细信息,完成后接受开发者协议,单击页面底部的创建您的 Twitter 应用程序按钮。 如果一切顺利,将使用给定的详细信息创建一个应用程序,如下所示。

应用程序已创建

步骤 4

在页面底部的密钥和访问令牌选项卡下,您可以看到一个名为创建我的访问令牌的按钮。 单击它以生成访问令牌。

密钥访问令牌

步骤 5

最后,单击页面右上角的测试 OAuth 按钮。 这将打开一个页面,其中显示您的消费者密钥、消费者秘密、访问令牌访问令牌密钥。 复制这些详细信息。 这些对于在 Flume 中配置代理很有用。

OAuth 工具

启动HDFS

由于我们将数据存储在 HDFS 中,因此我们需要安装/验证 Hadoop。 启动Hadoop并在其中创建一个文件夹来存储Flume数据。 在配置 Flume 之前,请按照以下步骤操作。

第 1 步:安装/验证 Hadoop

安装Hadoop。 如果您的系统中已安装 Hadoop,请使用 Hadoop version 命令验证安装,如下所示。

$ hadoop version 

If your system contains Hadoop, and if you have set the path variable, then you will get the following output −

Hadoop 2.6.0 
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 
e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1 
Compiled by jenkins on 2014-11-13T21:10Z 
Compiled with protoc 2.5.0 
From source with checksum 18e43357c8f927c0695f1e9522859d6a 
This command was run using /home/Hadoop/hadoop/share/hadoop/common/hadoop-common-2.6.0.jar

第 2 步:启动 Hadoop

浏览Hadoop的sbin目录并启动yarn和Hadoop dfs(分布式文件系统),如下所示。

cd /$Hadoop_Home/sbin/ 
$ start-dfs.sh 
localhost: starting namenode, logging to
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-namenode-localhost.localdomain.out 
localhost: starting datanode, logging to 
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-datanode-localhost.localdomain.out 
Starting secondary namenodes [0.0.0.0] 
starting secondarynamenode, logging to 
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-secondarynamenode-localhost.localdomain.out
  
$ start-yarn.sh 
starting yarn daemons 
starting resourcemanager, logging to 
   /home/Hadoop/hadoop/logs/yarn-Hadoop-resourcemanager-localhost.localdomain.out 
localhost: starting nodemanager, logging to 
   /home/Hadoop/hadoop/logs/yarn-Hadoop-nodemanager-localhost.localdomain.out 

第3步:在HDFS中创建目录

在 Hadoop DFS 中,您可以使用命令 mkdir 创建目录。 浏览它并在所需路径中创建一个名为 twitter_data 的目录,如下所示。

$cd /$Hadoop_Home/bin/ 
$ hdfs dfs -mkdir hdfs://localhost:9000/user/Hadoop/twitter_data 

配置 Flume

我们必须使用 conf 文件夹中的配置文件来配置源、通道和接收器。 本章给出的示例使用 Apache Flume 提供的名为 Twitter 1% Firehose 内存通道和 HDFS 接收器的实验源。

Twitter 1% Firehose 源

这个来源是高度实验性的。 它使用流 API 连接到 1% 示例 Twitter Firehose,持续下载推文,将其转换为 Avro 格式,并将 Avro 事件发送到下游 Flume 接收器。

随着 Flume 的安装,我们会默认获得这个源。 与此源对应的 jar 文件可以位于 lib 文件夹中,如下所示。

Twitter Jar 文件

设置 classpath

classpath变量设置为Flume-env.sh文件中Flume的lib文件夹,如下所示。

export CLASSPATH=$CLASSPATH:/FLUME_HOME/lib/* 

此源需要 Twitter 应用程序的详细信息,例如消费者密钥、访问令牌访问令牌密钥。 配置此源时,您必须为以下属性提供值 −

  • Channels

  • Source type : org.apache.flume.source.twitter.TwitterSource

  • consumerKey − OAuth 使用者密钥

  • consumerSecret − OAuth 消费者密钥

  • accessToken − OAuth 访问令牌

  • accessTokenSecret − OAuth 令牌密钥

  • maxBatchSize − Twitter 批次中应包含的 Twitter 消息的最大数量。 默认值为 1000(可选)。

  • maxBatchDurationMillis − 关闭批次之前等待的最大毫秒数。 默认值为 1000(可选)。

Channel

我们正在使用内存通道。 要配置内存通道,您必须为通道类型提供值。

  • type − 它保存通道的类型。 在我们的示例中,类型为 MemChannel

  • Capacity − 它是通道中存储的最大事件数。 其默认值为 100(可选)。

  • TransactionCapacity − 它是通道接受或发送的最大事件数。 其默认值为 100(可选)。

HDFS 接收器

该接收器将数据写入 HDFS。 要配置此接收器,您必须提供以下详细信息。

  • Channel

  • type − hdfs

  • hdfs.path − HDFS中存储数据的目录路径。

并且我们可以根据场景提供一些可选值。 下面给出的是我们在应用程序中配置的 HDFS 接收器的可选属性。

  • fileType − 这是我们的 HDFS 文件所需的文件格式。 SequenceFile、DataStreamCompressedStream 是此流可用的三种类型。 在我们的示例中,我们使用DataStream

  • writeFormat − 可以是文本或可写。

  • batchSize − 它是在将文件刷新到 HDFS 之前写入文件的事件数。 它的默认值为 100。

  • rollsize − 这是触发滚动的文件大小。 默认值为 100。

  • rollCount − 它是在滚动之前写入文件的事件数。 它的默认值为 10。

示例 - 配置文件

下面给出的是配置文件的示例。 复制此内容并在 Flume 的conf文件夹中另存为twitter.conf

# Naming the components on the current agent. 
TwitterAgent.sources = Twitter 
TwitterAgent.channels = MemChannel 
TwitterAgent.sinks = HDFS
  
# Describing/Configuring the source 
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = Your OAuth consumer key
TwitterAgent.sources.Twitter.consumerSecret = Your OAuth consumer secret 
TwitterAgent.sources.Twitter.accessToken = Your OAuth consumer key access token 
TwitterAgent.sources.Twitter.accessTokenSecret = Your OAuth consumer key access token secret 
TwitterAgent.sources.Twitter.keywords = tutorials point,java, bigdata, mapreduce, mahout, hbase, nosql
  
# Describing/Configuring the sink 

TwitterAgent.sinks.HDFS.type = hdfs 
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/Hadoop/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream 
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text 
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0 
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000 
 
# Describing/Configuring the channel 
TwitterAgent.channels.MemChannel.type = memory 
TwitterAgent.channels.MemChannel.capacity = 10000 
TwitterAgent.channels.MemChannel.transactionCapacity = 100
  
# Binding the source and sink to the channel 
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel 

执行

浏览 Flume 主目录并执行应用程序,如下所示。

$ cd $FLUME_HOME 
$ bin/flume-ng agent --conf ./conf/ -f conf/twitter.conf 
Dflume.root.logger=DEBUG,console -n TwitterAgent

如果一切顺利,推文将开始流式传输到 HDFS。 下面给出的是获取推文时命令提示符窗口的快照。

获取推文

验证 HDFS

您可以使用下面给出的 URL 访问 Hadoop 管理 Web UI。

http://localhost:50070/ 

单击页面右侧名为实用程序的下拉列表。 您可以看到两个选项,如下面的快照所示。

验证 HDFS

单击浏览文件系统并输入存储推文的 HDFS 目录的路径。 在我们的示例中,路径为 /user/Hadoop/twitter_data/。 然后,您可以看到存储在 HDFS 中的 twitter 日志文件列表,如下所示。

浏览文件系统