Hadoop - MapReduce
MapReduce 是一个框架,我们可以使用该框架编写应用程序,以可靠的方式在大型商用硬件集群上并行处理大量数据。
什么是 MapReduce?
MapReduce是一种基于java的分布式计算的处理技术和程序模型。 MapReduce 算法包含两个重要任务,即 Map 和 Reduce。 Map 获取一组数据并将其转换为另一组数据,其中单个元素被分解为元组(键/值对)。 其次,reduce 任务,它将 map 的输出作为输入,并将这些数据元组组合成一组更小的元组。 正如 MapReduce 名称的顺序所暗示的,reduce 任务总是在 map 作业之后执行。
MapReduce 的主要优点是可以轻松地在多个计算节点上扩展数据处理。 在 MapReduce 模型下,数据处理原语称为 mapper 和 reducers。 将一个数据处理应用程序分解为 mappers 和 reducers 有时并不简单。 但是,一旦我们以 MapReduce 形式编写应用程序,将应用程序扩展为在集群中的数百、数千甚至数万台机器上运行只是一种配置更改。 这种简单的可伸缩性吸引了许多程序员使用 MapReduce 模型。
算法
一般来说,MapReduce 范式是基于将计算机发送到数据所在的位置!
MapReduce程序分三个阶段执行,即map阶段、shuffle阶段和reduce阶段。
Map阶段 − map 或 mapper 的工作是处理输入数据。 一般输入数据以文件或目录的形式存储在Hadoop文件系统(HDFS)中。 输入文件逐行传递给mapper函数。 mapper处理数据并创建几个小数据块。
Reduce阶段 − 这个阶段是 Shuffle 阶段和 Reduce 阶段的组合。Reducer 的工作是处理来自mapper的数据。 处理后,它会产生一组新的输出,将存储在 HDFS 中。
在 MapReduce 作业期间,Hadoop 将 Map 和 Reduce 任务发送到集群中相应的服务器。
该框架管理数据传递的所有细节,例如发布任务、验证任务完成以及在节点之间的集群周围复制数据。
大多数计算发生在节点上,数据位于本地磁盘上,从而减少了网络流量。
完成给定任务后,集群收集并减少数据以形成适当的结果,并将其发送回 Hadoop 服务器。
输入和输出(Java 视角)
MapReduce 框架对 <key, value> 对进行操作,即框架将作业的输入视为一组 <key, value> 对,并生成一组 <key, value> 对作为作业的输出,可以想象到不同的类型。
键和值类应该由框架以序列化的方式进行,因此需要实现 Writable 接口。 此外,关键类必须实现 Writable-Comparable 接口以方便框架进行排序。 MapReduce 作业 的输入和输出类型 − (输入) <k1, v1> → map → <k2, v2> → reduce → <k3, v3>(输出)。
输入 | 输出 | |
---|---|---|
Map | <k1, v1> | list (<k2, v2>) |
Reduce | <k2, list(v2)> | list (<k3, v3>) |
术语
PayLoad − 应用程序实现了 Map 和 Reduce 功能,构成了工作的核心。
Mapper − Mapper 将输入的键/值对映射到一组中间键/值对。
NamedNode − 管理 Hadoop 分布式文件系统 (HDFS) 的节点。
DataNode − 在进行任何处理之前提前呈现数据的节点。
MasterNode − JobTracker 运行并接受来自客户端的作业请求的节点。
SlaveNode − Map 和 Reduce 程序运行的节点。
JobTracker − 安排作业并跟踪分配给任务跟踪器的作业。
Task Tracker − 跟踪任务并向 JobTracker 报告状态。
Job − 程序是跨数据集的 Mapper 和 Reducer 的执行。
Task − 在数据切片上执行 Mapper 或 Reducer。
Task Attempt − 尝试在 SlaveNode 上执行任务的特定实例。
示例场景
以下是有关组织用电量的数据。 它包含了各个年份的每月用电量和年平均值。
Jan | Feb | Mar | Apr | May | Jun | Jul | Aug | Sep | Oct | Nov | Dec | Avg | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
如果将上述数据作为输入,我们必须编写应用程序对其进行处理并生成结果,例如查找最大使用年份、最小使用年份等。 对于记录数有限的程序员来说,这是一个临时方案。 他们将简单地编写逻辑以生成所需的输出,并将数据传递给编写的应用程序。
但是,想想代表某个特定州自成立以来所有大规模工业的电力消耗的数据。
当我们编写应用程序来处理此类批量数据时,
他们将花费大量时间来执行。
当我们将数据从源移动到网络服务器等时,将会出现大量网络流量。
为了解决这些问题,我们有 MapReduce 框架。
输入数据
上述数据保存为 sample.txt 并作为输入给出。 输入文件如下所示。
1979 23 23 2 43 24 25 26 26 26 26 25 26 25 1980 26 27 28 28 28 30 31 31 31 30 30 30 29 1981 31 32 32 32 33 34 35 36 36 34 34 34 34 1984 39 38 39 39 39 41 42 43 40 39 38 38 40 1985 38 39 39 39 39 41 41 41 00 40 39 39 45
示例程序
下面给出的是使用 MapReduce 框架的示例数据的程序。
package hadoop; import java.util.*; import java.io.IOException; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; public class ProcessUnits { //Mapper class public static class E_EMapper extends MapReduceBase implements Mapper<LongWritable ,/*Input key Type */ Text, /*Input value Type*/ Text, /*Output key Type*/ IntWritable> /*Output value Type*/ { //Map function public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); String lasttoken = null; StringTokenizer s = new StringTokenizer(line,"\t"); String year = s.nextToken(); while(s.hasMoreTokens()) { lasttoken = s.nextToken(); } int avgprice = Integer.parseInt(lasttoken); output.collect(new Text(year), new IntWritable(avgprice)); } } //Reducer class public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > { //Reduce function public void reduce( Text key, Iterator <IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int maxavg = 30; int val = Integer.MIN_VALUE; while (values.hasNext()) { if((val = values.next().get())>maxavg) { output.collect(key, new IntWritable(val)); } } } } //Main function public static void main(String args[])throws Exception { JobConf conf = new JobConf(ProcessUnits.class); conf.setJobName("max_eletricityunits"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(E_EMapper.class); conf.setCombinerClass(E_EReduce.class); conf.setReducerClass(E_EReduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
将上述程序另存为 ProcessUnits.java。 下面解释程序的编译和执行。
Process Units程序的编译与执行
假设我们在 Hadoop 用户的主目录中(例如 /home/hadoop)。
按照下面给出的步骤编译和执行上述程序。
步骤 1
下面的命令是创建一个目录来存放编译后的java类。
$ mkdir units
步骤 2
下载 Hadoop-core-1.2.1.jar, 用于编译和执行 MapReduce 程序。 访问以下链接 mvnrepository.com 下载 jar。 假设下载的文件夹是 /home/hadoop/.
步骤 3
以下命令用于编译 ProcessUnits.java 程序并为该程序创建一个 jar。
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java $ jar -cvf units.jar -C units/ .
步骤 4
以下命令用于在 HDFS 中创建输入目录。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
步骤 5
以下命令用于将名为 sample.txt 的输入文件复制到 HDFS 的输入目录中。
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
步骤 6
以下命令用于验证输入目录中的文件。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
步骤 7
以下命令用于通过从输入目录获取输入文件来运行 Eleunit_max 应用程序。
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
稍等片刻,直到文件执行完毕。 执行后如下图,输出会包含输入split个数,Map任务个数,reducer任务个数等。
INFO mapreduce.Job: Job job_1414748220717_0002 completed successfully 14/10/31 06:02:52 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read = 61 FILE: Number of bytes written = 279400 FILE: Number of read operations = 0 FILE: Number of large read operations = 0 FILE: Number of write operations = 0 HDFS: Number of bytes read = 546 HDFS: Number of bytes written = 40 HDFS: Number of read operations = 9 HDFS: Number of large read operations = 0 HDFS: Number of write operations = 2 Job Counters Launched map tasks = 2 Launched reduce tasks = 1 Data-local map tasks = 2 Total time spent by all maps in occupied slots (ms) = 146137 Total time spent by all reduces in occupied slots (ms) = 441 Total time spent by all map tasks (ms) = 14613 Total time spent by all reduce tasks (ms) = 44120 Total vcore-seconds taken by all map tasks = 146137 Total vcore-seconds taken by all reduce tasks = 44120 Total megabyte-seconds taken by all map tasks = 149644288 Total megabyte-seconds taken by all reduce tasks = 45178880 Map-Reduce Framework Map input records = 5 Map output records = 5 Map output bytes = 45 Map output materialized bytes = 67 Input split bytes = 208 Combine input records = 5 Combine output records = 5 Reduce input groups = 5 Reduce shuffle bytes = 6 Reduce input records = 5 Reduce output records = 5 Spilled Records = 10 Shuffled Maps = 2 Failed Shuffles = 0 Merged Map outputs = 2 GC time elapsed (ms) = 948 CPU time spent (ms) = 5160 Physical memory (bytes) snapshot = 47749120 Virtual memory (bytes) snapshot = 2899349504 Total committed heap usage (bytes) = 277684224 File Output Format Counters Bytes Written = 40
步骤 8
以下命令用于验证输出文件夹中的结果文件。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
步骤 9
以下命令用于查看 Part-00000 文件中的输出。 该文件由 HDFS 生成。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
下面是 MapReduce 程序生成的输出。
1981 34 1984 40 1985 45
步骤 10
以下命令用于将输出文件夹从 HDFS 复制到本地文件系统进行分析。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop
重要的指令
所有 Hadoop 命令都由 $HADOOP_HOME/bin/hadoop 命令调用。 运行不带任何参数的 Hadoop 脚本会打印所有命令的描述。
用法 − hadoop [--config confdir] 命令
下表列出了可用的选项及其说明。
序号 | 选项 & 描述 |
---|---|
1 | namenode -format 格式化 DFS 文件系统。 |
2 | secondarynamenode 运行 DFS 辅助名称节点。 |
3 | namenode 运行 DFS 名称节点。 |
4 | datanode 运行一个 DFS 数据节点。 |
5 | dfsadmin 运行 DFS 管理客户端。 |
6 | mradmin 运行 Map-Reduce 管理客户端。 |
7 | fsck 运行 DFS 文件系统检查实用程序。 |
8 | fs 运行通用文件系统用户客户端。 |
9 | balancer 运行集群平衡实用程序。 |
10 | oiv 将离线 fsimage 查看器应用于 fsimage。 |
11 | fetchdt 从 NameNode 获取委托令牌。 |
12 | jobtracker 运行 MapReduce 作业跟踪器节点。 |
13 | pipes 运行管道作业。 |
14 | tasktracker 运行一个 MapReduce 任务 Tracker 节点。 |
15 | historyserver 将作业历史服务器作为独立的守护进程运行。 |
16 | job 操作 MapReduce 作业。 |
17 | queue 获取有关 JobQueues 的信息。 |
18 | version 打印版本。 |
19 | jar <jar> 运行一个 jar 文件。 |
20 | distcp <srcurl> <desturl> 递归复制文件或目录。 |
21 | distcp2 <srcurl> <desturl> DistCp 版本 2。 |
22 | archive -archiveName NAME -p <parent path> <src>* <dest> 创建一个 hadoop 存档。 |
23 | classpath 打印获取 Hadoop jar 和所需库所需的类路径。 |
24 | daemonlog 获取/设置每个守护进程的日志级别 |
如何与 MapReduce 作业交互
用法 − hadoop 作业 [GENERIC_OPTIONS]
以下是 Hadoop 作业中可用的通用选项。
序号 | GENERIC_OPTION & Description |
---|---|
1 | -submit <job-file> 提交作业。 |
2 | -status <job-id> 打印 map 并减少完成百分比和所有作业计数器。 |
3 | -counter <job-id> <group-name> <countername> 打印计数器值。 |
4 | -kill <job-id> 结束作业。 |
5 | -events <job-id> <fromevent-#> <#-of-events> 打印 jobtracker 在给定范围内收到的事件详细信息。 |
6 | -history [all] <jobOutputDir> - history < jobOutputDir> 打印作业详细信息、失败和终止提示详细信息。 通过指定 [all] 选项可以查看有关作业的更多详细信息,例如成功的任务和为每个任务所做的任务尝试。 |
7 | -list[all] 显示所有作业。 -list 仅显示尚未完成的作业。 |
8 | -kill-task <task-id> 终止任务。 被终止的任务不计入失败的尝试。 |
9 | -fail-task <task-id> 任务失败。 失败的任务计入失败的尝试。 |
10 | -set-priority <job-id> <priority> 更改作业的优先级。 允许的优先级值为 VERY_HIGH、HIGH、NORMAL、LOW、VERY_LOW |
查看作业状态
$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> e.g. $ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004
查看作业输出目录的历史
$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> e.g. $ $HADOOP_HOME/bin/hadoop job -history /user/expert/output
终止作业
$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> e.g. $ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004