快乐学习
前程无忧、中华英才非你莫属!

开发复杂的MapReduce应用程序

一、适合Hadoop的数据类型

Hadoop使用派生于Writable接口的类作为MapReduce计算的数据类型,这些数据类型用于整个MapReduce计算流的数据吞吐过程,这个过程从读取输入数据开始,到传输map和reduce任务之间的中间数据,一直到最后写入输出数据为止;为输入数据、中间数据和输出数据选择合适的Writable数据类型面对MapReduce程序的可编程性和性能有很大的影响
为了用作MapReduce计算的value数据类型,数据类型必须实现org.apache.hadoop.io.Writable接口;Writable接口定义了当需要数据传输和数据存储时,Hadoop应该如何序列化和反序列化为了用作MapReduce计算的key数据类型,数据类型必须实现org.apache.hadoop.io.WritableComparable<T>接口,除了Writable接口的功能之外,有一种WritableComparable接口更进一步定义了如何将这种类型的键相互比较,以达到排序的目的。

实战:下列步骤显示了如何配置Hadoop MapReduce 应用程序的输入和输出数据类型。

1.使用泛型类型变量为mapper的键值对指定输入数据类型(键:LongWritable,值:Text)和输出数据类型(键:Text,值:IntWritable)

public class SampleMapper extends Mapper<LongWritable,Text,Text,IntWritable>{

      public void map(LongWritable key, Text value,Context context). . .{
           . . .
      } 
}

2.使用通用型变量为reducer的键值对指定输入数据类型(键:Text,值:IntWritable)和输出数据类型(键:Text,值:IntWritable)【reducer的输入键值对数据类型应该和mapper的输出键值对数据类型相匹配

public class SampleMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
 
     public void map(LongWritable key, Text value,Context context). . .{
          . . . 
     }
}

3.使用Job对象指定MapReduce计算输出数据类型,如下面的代码片段所示。这些数据类型将用作 reducer和mapper 二者的输出类型,除非你如步骤4中完成的那样专门为mapper配置了输出类型,

Job job = new Job(.  .);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

4.或者,当mapper 和 reducer 有不同的输出键值对数据类型时候,可以通过下面的步骤,使用不同的数据类型配置mapper的输出键值对:

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

Hadoop提供一些基本数据类型,IntWritable,LongWritable,BooleanWritable,FloatWritable,ByteWritable,这是它们各自的Java基本数据类型的Writable版本,可以使用这些类型作为key类型和value类型。

下面是几种Hadoop内置的数据类型,可以作为作为key类型和value类型:

Text:存储UTF8文本
BytesWritable:存储一个字节序列
VIntWritable和VLongWritable:存储变长整型和长整型值
NullWritable:这是零长度的Writable类型(可以在不希望使用key或value类型的时候使用)

下面是Hadoop内置的集合数据类型【只能作为value类型】:

ArrayWritable:存储属于Writable类型的值数组(要使用ArrayWritable类型作为reduce输入的value类型,需要创建ArrayWritable的子类来指定存储在其中的Writable值的类型):

 public class LongArrayWritable extends ArrayWritable{
      public LongArrayWritable(){
         super(LongWritable.class);
      }
 }

TwoDArrayWritable:存储属于同一个Writable类型的值矩阵(要使用TwoDArrayWritable类型作为reduce输入的value类型,需要创建与ArrayWritable类型相似的TwoDArrayWritable类型的子类来指定存储的值的类型)
MapWritable:存储键值对的映射(键和值应该是Writable数据类型)
SortedMapWritable:存储键值对的有序映射(键应该事先WritableComparable接口)

二、实现自定义的Hadoop Writable数据类型

通过`org.apache.hadoop.io.Writable`接口编写一个定制Writable数据类型用于定义数据类型的序列化格式(基于Writable的接口类型可以用来作为Hadooop MapReduce计算的value类型)

假设日志包含5个部分:请求的主机,时间戳,请求的URL,相应大小,HTTP状态码,如下:

192.168.0.2 -- [01/Jul.1995:00:00:01-0400] "GET/history/appollo/HTTP/1.0" 200 6245

下面是实现HTTP服务器日志条目的自定义Hadoop Writable 数据类型的步骤。

操作步骤:

1.写一个新的LogWritable类实现org.apache.hadoop.io.Writable接口

public class LogWritable implements Writable{
     private Text userIP;
     private Text timestamp;
     private Text request;
     private IntWritable responseSize;
     private IntWritable status;
     public LogWritable(){
          this.userIP = new Text();
          this.timestamp = new Text();
          this.request = new Text();
          this.responseSize = new IntWritable();
          this.status = new IntWritable();
     }
    
     public void readFields(DataInput in)throws IOException{
          userIP.readFields(in);
          timestamp.readFields(in);
          request.readFields(in);
          responseSize.readFields(in);
          status.readFields(in);
     }
 
     public void write(DataOuput out)throws IOException{
          userIP.write(out);
          timestamp.write(out);
          request.write(out);
          responseSize.write(out);
          status.write(out);
     } . . .//getters and setters for the fields
}

2.使用新的LogWritable类型作为MapReduce计算的value类型 

   例如,使用LogWritable类型作为Map输出值的类型

   public class LogProcessMap extends Mapper<LongWritable,Text,Text,LogWritable>{
        ...
   }
   public class LogProcessReduce extends Reducer<Text,LogWritable,Text,IntWritable>{
        public void reduce(Text key,Iterable<LogWritable> values,Context context){
             ...
        }
    }

3.配置相应的作用的输出类型

Job job = new Job(...);
...
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LogWritable.class);

工作原理 

Writable接口包含2个方法:readFields()和 write(),在readFields()方法中我们反序列化输入数据,并填充Writable对象的字段:

public void readFields(DataInput in)throws IOException{
      userIP.readFields(in);
      timestamp.readFields(in);
      request.readFields(in);
      responseSize.readFields(in);
      status.readFields(in);
}

在上面的示例中,使用Writable类型作为自定义的Writable类型的字段,并且使用readFields()方法从DataInput对象的数据字段中反序列化数据,【当然也可以使用Java的基本数据类型作为Writabel类型的字段,并使用DataInput对象的想要读取方法从基础流中读取值】,如下面的代码所示:

int responseSize = in.readInt();
String userIP = in.readUTF();

在Write()方法中,在底层流中写入Writable对象的字段

                                                                                 

public void write(DataOuput out)throws IOException{
     userIP.write(out);
     timestamp.write(out);
     request.write(out);
     responseSize.write(out);
     status.write(out);
}

如果使用的是Java基本数据类型作为Writable对象的字段,可以使用DataOutPut对象中的对应写入方法写入底层流的值:

out.writeInt(responseSize);
out.writeUTF(userIP);

在实现自定义的Writable数据类型时,需要注意一下问题:

1.如果要添加一个自定义的构造函数用于自定义的Writable类,一定要保持默认的空构造函数
2.TextOutputFormat使用toString()方法来序列化key和value类型,如果使用的是TextOutputFormat序列化自定义的Writable类型的实例,那么要确保用于自定义Writable数据类型的是一个有意义的toString()实现
3.在读取输入数据时,Hadoop可多次重复使用Writable类的一个实例,在ReadFields()方法里面填充字段时,不应该依赖于该对象的现有状态

三、开发复杂的MapReduce应用程序详解:  

     

            http://blog.csdn.net/Wee_Mita/article/details/52750840

  

打赏
赞(0) 打赏
未经允许不得转载:同乐学堂 » 开发复杂的MapReduce应用程序

特别的技术,给特别的你!

联系QQ:1071235258QQ群:710045715

觉得文章有用就打赏一下文章作者

非常感谢你的打赏,我们将继续给力更多优质内容,让我们一起创建更加美好的网络世界!

支付宝扫一扫打赏

微信扫一扫打赏

error: Sorry,暂时内容不可复制!