MapReduce - 分区器

分区器的作用类似于处理输入数据集的条件。分区阶段发生在 Map 阶段之后和 Reduce 阶段之前。

分区器的数量等于 Reducer 的数量。这意味着分区器将根据 Reducer 的数量划分数据。因此,从单个分区器传递的数据由单个 Reducer 处理。

分区器

分区器对中间 Map 输出的键值对进行分区。它使用用户定义的条件对数据进行分区,其工作方式类似于哈希函数。分区总数与作业的 Reducer 任务数相同。让我们举一个例子来了解分区器的工作原理。

MapReduce 分区器实现

为了方便起见,我们假设我们有一个名为 Employee 的小表,其中包含以下数据。我们将使用此示例数据作为输入数据集来演示分区器的工作原理。

Id Name Age Gender Salary
1201 gopal 45 Male 50,000
1202 manisha 40 Female 50,000
1203 khalil 34 Male 30,000
1204 prasanth 30 Male 30,000
1205 kiran 20 Male 40,000
1206 laxmi 25 Female 35,000
1207 bhavya 20 Female 15,000
1208 reshma 19 Female 15,000
1209 kranthi 22 Male 22,000
1210 Satish 24 Male 25,000
1211 Krishna 25 Male 25,000
1212 Arshad 28 Male 20,000
1213 lavanya 18 Female 8,000

我们必须编写一个应用程序来处理输入数据集,以便按性别查找不同年龄段(例如,20 岁以下、21 至 30 岁之间、30 岁以上)中薪水最高的员工。

输入数据

上述数据在"/home/hadoop/hadoopPartitioner"目录中保存为 input.txt,并作为输入提供。

1201 gopal 45 Male 50000
1202 manisha 40 Female 51000
1203 khaleel 34 Male 30000
1204 prasanth 30 Male 31000
1205 kiran 20 Male 40000
1206 laxmi 25 Female 35000
1207 bhavya 20 Female 15000
1208 reshma 19 Female 14000
1209 kranthi 22 Male 22000
1210 Satish 24 Male 25000
1211 Krishna 25 Male 26000
1212 Arshad 28 Male 20000
1213 lavanya 18 Female 8000

根据给定的输入,以下是程序的算法解释。

映射任务

当我们在文本文件中拥有文本数据时,映射任务接受键值对作为输入。此映射任务的输入如下 −

输入 − 键将是一种模式,例如"任何特殊键 + 文件名 + 行号"(例如:key = @input1),值将是该行中的数据(例如:value = 1201 gopal 45 Male 50000)。

方法 −该map任务的操作如下 −

  • 从字符串中的参数列表中读取作为输入值的value(记录数据)。

  • 使用split函数,分离性别并存储在字符串变量中。

String[] str = value.toString().split(" ", -3);
String gender=str[3];
  • 将性别信息和记录数据value作为输出键值对从map任务发送到partition任务

context.write(new Text(gender), new Text(value));
  • 对文本文件中的所有记录重复上述所有步骤。

输出 − 您将获得性别数据和记录数据值作为键值对。

分区器任务

分区器任务接受来自 map 任务的键值对作为其输入。分区意味着将数据分成几段。根据给定的分区条件标准,输入的键值对数据可以根据年龄标准分为三部分。

输入 −键值对集合中的全部数据。

key = 记录中的性别字段值。

value = 该性别的整条记录数据值。

方法 − 分区逻辑的运行过程如下。

  • 从输入的键值对中读取年龄字段值。
String[] str = value.toString().split(" ");
int age = Integer.parseInt(str[2]);
  • 检查年龄值是否符合以下条件。

    • 年龄小于或等于 20
    • 年龄大于 20 且小于或等于 30。
    • 年龄大于 30。
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

输出 − 将整个键值对数据分割成三个键值对集合。Reducer 分别对每个集合进行操作。

Reduce 任务

分区器任务的数量等于 Reducer 任务的数量。这里我们有三个分区器任务,因此我们有三个 Reducer 任务要执行。

输入 − Reducer 将使用不同的键值对集合执行三次。

key = 记录中的性别字段值。

value = 该性别的整个记录​​数据。

方法 −以下逻辑将应用于每个集合。

  • 读取每条记录的 Salary 字段值。
String [] str = val.toString().split(" ", -3);
注意:str[4] 具有 salary 字段值。
  • 使用 max 变量检查 salary。如果 str[4] 是最高薪水,则将 str[4] 分配给 max,否则跳过此步骤。

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • 对每个键集合(Male & Female 为键集合)重复步骤 1 和 2。执行完这三个步骤后,您将从 Male 键集合中找到一个最高工资,从 Female 键集合中找到一个最高工资。

context.write(new Text(key), new IntWritable(max));

输出 − 最后,您将获得三个不同年龄段集合中的一组键值对数据。它分别包含每个年龄段的 Male 集合中的最高工资和 Female 集合中的最高工资。

执行 Map、Partitioner 和 Reduce 任务后,三个键值对数据集将作为输出存储在三个不同的文件中。

所有三个任务都被视为 MapReduce 作业。这些作业的以下要求和规范应在配置中指定 −

  • 作业名称
  • 键和值的输入和输出格式
  • Map、Reduce 和 Partitioner 任务的单独类
配置 conf = getConf();

//创建作业
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

//文件输入和输出路径
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//设置键值对的 Mapper 类和输出格式。
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//设置分区器语句
job.setPartitionerClass(CaderPartitioner.class);

//设置 Reducer 类和键值对的输入/输出格式。
job.setReducerClass(ReduceClass.class);

//Reducer 任务的数量。
job.setNumReduceTasks(3);

//数据的输入和输出格式
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

示例程序

以下程序显示如何在 MapReduce 程序中为给定条件实现分区器。

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
	
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("	", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
	
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
			
         for (Text val : values)
         {
            String [] str = val.toString().split("	", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
			
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
	
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("	");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
		
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
		
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
		
      job.setMapperClass(MapClass.class);
		
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
		
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
		
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
		
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

将上述代码保存为"/home/hadoop/hadoopPartitioner"中的PartitionerExample.java。程序的编译和执行如下所示。

编译和执行

假设我们位于 Hadoop 用户的主目录中(例如,/home/hadoop)。

按照以下步骤编译并执行上述程序。

步骤 1 − 下载 Hadoop-core-1.2.1.jar,用于编译和执行 MapReduce 程序。您可以从 mvnrepository.com 下载 jar。

假设下载的文件夹是"/home/hadoop/hadoopPartitioner"

步骤 2 − 以下命令用于编译程序 PartitionerExample.java 并为该程序创建 jar。

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .

步骤 3 −使用以下命令在 HDFS 中创建输入目录。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

步骤 4 − 使用以下命令将名为 input.txt 的输入文件复制到 HDFS 的输入目录中。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

步骤 5 − 使用以下命令验证输入目录中的文件。

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

步骤 6 −使用以下命令从输入目录中获取输入文件,运行 Top salary 应用程序。

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

等待一段时间,直到文件执行。执行后,输出包含许多输入拆分、map 任务和 Reducer 任务。

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
	
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
	
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
	
   Data-local map tasks=1
	
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
	
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
	
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
	
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
	
   Input split bytes=119
	
   Combine input records=0
   Combine output records=0
	
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
	
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
	
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
	
   Total committed heap usage (bytes)=334102528
	
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
	
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
	
File Input Format Counters

   Bytes Read=361
	
File Output Format Counters

   Bytes Written=72

步骤 7 − 使用以下命令验证输出文件夹中的结果文件。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

您将在三个文件中找到输出,因为您在程序中使用了三个分区器和三个 Reducer。

步骤 8 − 使用以下命令查看 Part-00000 文件中的输出。此文件由 HDFS 生成。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Part-00000 中的输出

Female   15000
Male     40000

使用以下命令查看 Part-00001 文件中的输出。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

Part-00001 中的输出

Female   35000
Male    31000

使用以下命令查看 Part-00001 文件中的输出Part-00002 文件。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Part-00002 中的输出

Female  51000
Male   50000