莱西网站制作,网络推广怎么优化,长沙做企业网站的公司,网站的格式分类文章目录 序列化概述自定义bean对象实现序列化接口（Writable）案例需求编写MapReduce程序运行结果 序列化概述
序列化就是把内存中的对象，转换成字节序列（或其他数据传输协议）以便于存储到磁盘（持久化… 文章目录 序列化概述自定义bean对象实现序列化接口Writable案例需求编写MapReduce程序运行结果 序列化概述
序列化就是把内存中的对象转换成字节序列或其他数据传输协议以便于存储到磁盘持久化和网络传输。 反序列化就是将收到字节序列或其他数据传输协议或者是磁盘的持久化数据转换成内存中的对象。
自定义bean对象实现序列化接口Writable
在企业开发中往往常用的基本序列化类型不能满足所有需求，比如在Hadoop框架内部传递一个bean对象，那么该对象就需要实现序列化接口。 具体实现bean对象序列化步骤如下7步： （1）必须实现Writable接口 （2）反序列化时，需要反射调用空参构造函数，所以必须有空参构造
public FlowBean() {
    super();
}
（3）重写序列化方法
@Override
public void write(DataOutput out) throws IOException {
    out.writeLong(upFlow);
    out.writeLong(downFlow);
    out.writeLong(sumFlow);
}
（4）重写反序列化方法
@Override
public void readFields(DataInput in) throws IOException {
    upFlow = in.readLong();
    downFlow = in.readLong();
    sumFlow = in.readLong();
}
（5）注意反序列化的顺序和序列化的顺序完全一致 （6）要想把结果显示在文件中，需要重写toString()，可用"\t"分开，方便后续用。 （7）如果需要将自定义的bean放在key中传输，则还需要实现Comparable接口，因为MapReduce框架中的Shuffle过程要求对key必须能排序。
@Override
public int compareTo(FlowBean o) {
    // 倒序排列，从大到小
    return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
案例需求
统计每一个手机号耗费的总上行流量、总下行流量、总流量 输入总数据 输入数据格式 7 13560436666 120.196.100.99 1116 954 200 id 手机号码 网络ip 上行流量 下行流量 网络状态码 期望输出数据格式 13560436666 1116 954 2070 手机号码 上行流量 下行流量 总流量
编写MapReduce程序 FlowBean
package com.atxiaoyu.xuliehua;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Hadoop-serializable flow record carrying upstream, downstream and total
 * traffic for one phone number.
 *
 * <p>Implements {@link Writable} so instances can be shuffled between map
 * and reduce tasks. The field order in {@link #write(DataOutput)} and
 * {@link #readFields(DataInput)} must be identical, and a public no-arg
 * constructor is required because deserialization instantiates the bean
 * via reflection.
 */
public class FlowBean implements Writable {

    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic (upFlow + downFlow)

    /** Required no-arg constructor for Writable deserialization. */
    public FlowBean() {
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /** Derives the total from the current up/down values. */
    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Must read fields in exactly the order write() emitted them.
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    /** Tab-separated so the job output is easy to post-process. */
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
FlowMapper:
package com.atxiaoyu.xuliehua;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper: parses one tab-separated log line and emits
 * (phone number, FlowBean(up, down, sum)).
 *
 * <p>Input line layout (tab-separated): id, phone, ip, [host?],
 * upFlow, downFlow, statusCode — so the traffic columns are addressed
 * from the end of the record to tolerate the optional middle field.
 */
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    // Reused across map() calls to avoid per-record allocations.
    private final Text outK = new Text();
    private final FlowBean outV = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Read one line and split on tabs.
        String line = value.toString();
        String[] split = line.split("\t");

        // Grab the fields we need; traffic columns counted from the end.
        String phone = split[1];
        String up = split[split.length - 3];   // upstream traffic
        String down = split[split.length - 2]; // downstream traffic

        // Populate the output key/value.
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(up));
        outV.setDownFlow(Long.parseLong(down));
        outV.setSumFlow();

        context.write(outK, outV);
    }
}
FlowReducer:
package com.atxiaoyu.xuliehua;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer: sums the up/down traffic of all records sharing one phone
 * number and emits a single (phone, FlowBean) row per key.
 */
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    // Reused across reduce() calls to avoid per-key allocations.
    private final FlowBean outV = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // Accumulate totals over all records for this phone number.
        long totalUp = 0;
        long totalDown = 0;
        for (FlowBean value : values) {
            totalUp += value.getUpFlow();
            // BUGFIX: original accumulated totalUp into totalDown
            // (totalDown = totalUp + value.getDownFlow()).
            totalDown += value.getDownFlow();
        }

        // Emit exactly one aggregated row per key (the original wrote
        // inside the loop, producing one row per input record).
        outV.setUpFlow(totalUp);
        outV.setDownFlow(totalDown);
        outV.setSumFlow();
        context.write(key, outV);
    }
}
FlowDriver:
package com.atxiaoyu.xuliehua;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver: configures and submits the flow-summing MapReduce job
 * (FlowMapper / FlowReducer over tab-separated phone-traffic logs).
 *
 * <p>Exits with status 0 on success, 1 on failure. Input/output paths
 * are hard-coded for a local Windows run; the output directory must
 * not already exist or the job will fail.
 */
public class FlowDriver {

    public static void main(String[] args)
            throws InterruptedException, IOException, ClassNotFoundException {
        // 1. Obtain the job from the configuration.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Set the jar entry point.
        job.setJarByClass(FlowDriver.class);

        // 3. Wire up mapper and reducer.
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4. Map output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5. Final output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. Input and output paths.
        FileInputFormat.setInputPaths(job, new Path("D:\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\output"));

        // 7. Submit and block until the job completes.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
运行结果 与我们设想的输出结果一致。