HCatalog - 输入输出格式
HCatInputFormat 和 HCatOutputFormat 接口用于从 HDFS 读取数据,并在处理后使用 MapReduce 作业将结果数据写入 HDFS。让我们详细说明输入和输出格式接口。
HCatInputFormat
HCatInputFormat 与 MapReduce 作业一起使用,从 HCatalog 管理的表中读取数据。HCatInputFormat 公开了 Hadoop 0.20 MapReduce API,用于读取数据,就像数据已发布到表中一样。
Sr.No. | 方法名称 &描述 |
---|---|
1 | public static HCatInputFormat setInput(Job job, String dbName, String tableName)throws IOException 设置要用于作业的输入。它使用给定的输入规范查询元存储,并将匹配的分区序列化到 MapReduce 任务的作业配置中。 |
2 | public static HCatInputFormat setInput(Configuration conf, String dbName, String tableName) throws IOException 设置要用于作业的输入。它使用给定的输入规范查询元存储,并将匹配的分区序列化到 MapReduce 任务的作业配置中。 |
3 | public HCatInputFormat setFilter(String filter)throws IOException 在输入表上设置过滤器。 |
4 | public HCatInputFormat setProperties(Properties properties) throws IOException 设置输入格式的属性。 |
HCatInputFormat API 包括以下方法 −
- setInput
- setOutputSchema
- getTableSchema
要使用 HCatInputFormat 读取数据,首先使用正在读取的表中的必要信息实例化 InputJobInfo,然后使用 InputJobInfo 调用 setInput。
您可以使用 setOutputSchema 方法包含 投影模式,以指定输出字段。如果未指定模式,则将返回表中的所有列。您可以使用 getTableSchema 方法确定指定输入表的表模式。
HCatOutputFormat
HCatOutputFormat 与 MapReduce 作业一起使用,将数据写入 HCatalog 管理的表。 HCatOutputFormat 公开了用于将数据写入表的 Hadoop 0.20 MapReduce API。当 MapReduce 作业使用 HCatOutputFormat 写入输出时,将使用为表配置的默认 OutputFormat,并在作业完成后将新分区发布到表中。
Sr.No. | 方法名称和说明 |
---|---|
1 | public static void setOutput (Configuration conf, Credentials credentials, OutputJobInfo outputJobInfo) throws IOException 设置要为作业写入的输出信息。它查询元数据服务器以查找用于表的 StorageHandler。如果分区已发布,它会抛出错误。 |
2 | public static void setSchema (Configuration conf, HCatSchema schema) throws IOException 设置要写入分区的数据的架构。如果未调用此方法,则默认情况下将使用表架构进行分区。 |
3 | public RecordWriter <WritableComparable<?>, HCatRecord > getRecordWriter (TaskAttemptContext context)throws IOException, InterruptedException 获取作业的记录写入器。它使用 StorageHandler 的默认 OutputFormat 来获取记录写入器。 |
4 | public OutputCommitter getOutputCommitter (TaskAttemptContext context) throws IOException, InterruptedException 获取此输出格式的输出提交者。它确保输出正确提交。 |
HCatOutputFormat API 包括以下方法 −
- setOutput
- setSchema
- getTableSchema
对 HCatOutputFormat 的第一次调用必须是 setOutput;任何其他调用都会引发异常,表示输出格式未初始化。
setSchema 方法指定要写出的数据的架构。您必须调用此方法,提供要写入的数据的架构。如果您的数据具有与表架构相同的架构,则可以使用 HCatOutputFormat.getTableSchema() 获取表架构,然后将其传递给 setSchema()。
示例
以下 MapReduce 程序从一个表中读取数据,并假设该表的第二列("第 1 列")中有一个整数,并计算它找到的每个不同值的实例数。也就是说,它执行的操作相当于"select col1, count(*) from $table group by col1;"。
例如,如果第二列中的值为 {1, 1, 1, 3, 3, 5},则程序将生成以下值和计数输出 −
1, 3 3, 2 5, 1
现在我们来看看程序代码 −
import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.HCatalog.common.HCatConstants; import org.apache.HCatalog.data.DefaultHCatRecord; import org.apache.HCatalog.data.HCatRecord; import org.apache.HCatalog.data.schema.HCatSchema; import org.apache.HCatalog.mapreduce.HCatInputFormat; import org.apache.HCatalog.mapreduce.HCatOutputFormat; import org.apache.HCatalog.mapreduce.InputJobInfo; import org.apache.HCatalog.mapreduce.OutputJobInfo; public class GroupByAge extends Configured implements Tool { public static class Map extends Mapper<WritableComparable, HCatRecord, IntWritable, IntWritable> { int age; @Override protected void map( WritableComparable key, HCatRecord value, org.apache.hadoop.mapreduce.Mapper<WritableComparable, HCatRecord, IntWritable, IntWritable>.Context context )throws IOException, InterruptedException { age = (Integer) value.get(1); context.write(new IntWritable(age), new IntWritable(1)); } } public static class Reduce extends Reducer<IntWritable, IntWritable, WritableComparable, HCatRecord> { @Override protected void reduce( IntWritable key, java.lang.Iterable<IntWritable> values, org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable, WritableComparable, HCatRecord>.Context context )throws IOException ,InterruptedException { int sum = 0; Iterator<IntWritable> iter = values.iterator(); while (iter.hasNext()) { sum++; iter.next(); } HCatRecord record = new DefaultHCatRecord(2); record.set(0, key.get()); record.set(1, sum); context.write(null, record); } } public int run(String[] args) throws Exception { Configuration conf = getConf(); args = new GenericOptionsParser(conf, args).getRemainingArgs(); String serverUri = args[0]; String inputTableName = args[1]; String outputTableName = args[2]; String dbName = null; String principalID = System .getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL); if (principalID != null) conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID); Job job = new Job(conf, "GroupByAge"); HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null)); // initialize HCatOutputFormat job.setInputFormatClass(HCatInputFormat.class); job.setJarByClass(GroupByAge.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(WritableComparable.class); job.setOutputValueClass(DefaultHCatRecord.class); HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null)); HCatSchema s = HCatOutputFormat.getTableSchema(job); System.err.println("INFO: output schema explicitly set for writing:" + s); HCatOutputFormat.setSchema(job, s); job.setOutputFormatClass(HCatOutputFormat.class); return (job.waitForCompletion(true) ? 0 : 1); } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new GroupByAge(), args); System.exit(exitCode); } }
在编译上述程序之前,您必须下载一些 jar 并将其添加到此应用程序的 classpath 中。您需要下载所有 Hive jar 和 HCatalog jar(HCatalog-core-0.5.0.jar、hive-metastore-0.10.0.jar、libthrift-0.7.0.jar、hive-exec-0.10.0.jar、libfb303-0.7.0.jar、jdo2-api-2.3-ec.jar、slf4j-api-1.6.1.jar)。
使用以下命令将这些 jar 文件从本地复制到HDFS,并将其添加到classpath中。
bin/hadoop fs -copyFromLocal $HCAT_HOME/share/HCatalog/HCatalog-core-0.5.0.jar /tmp bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp export LIB_JARS=hdfs:///tmp/HCatalog-core-0.5.0.jar, hdfs:///tmp/hive-metastore-0.10.0.jar, hdfs:///tmp/libthrift-0.7.0.jar, hdfs:///tmp/hive-exec-0.10.0.jar, hdfs:///tmp/libfb303-0.7.0.jar, hdfs:///tmp/jdo2-api-2.3-ec.jar, hdfs:///tmp/slf4j-api-1.6.1.jar
使用以下命令编译并执行给定的程序。
$HADOOP_HOME/bin/hadoop jar GroupByAge tmp/hive
现在,检查输出目录 (hdfs: user/tmp/hive) 中的输出 (part_0000, part_0001)。