网站引导制作,友情链接举例,房产网站有哪些,全球最大互联网公司排名MapReducer
目录
MapReducer
1.Hadoop是干嘛的
2.maven
3.MapReducer
1#xff09;分析数据 写sql
2#xff09;写程序
a.mapper程序
b.洗牌 分组排序
c.reducer程序
d.Test类 1.Hadoop是干嘛的
1#xff09;分布式存储 HDFS
2#xff09;处理大规模数据 Map…MapReducer
目录
MapReducer
1.Hadoop是干嘛的
2.maven
3.MapReducer
1分析数据 写sql
2写程序
a.mapper程序
b.洗牌 分组排序
c.reducer程序
d.Test类 1.Hadoop是干嘛的
1分布式存储 HDFS
2处理大规模数据 MapReducer 2.maven
1maven是用来下载jar包和加载依赖的
2项目管理 打jar包 项目之前依赖
3如何在maven中下载jar包
通过组id 工程名 和版本号就能确定一个工程 确定一个jar包 4下载jar包 要给他一个下载的网址 3.MapReducer
1分析数据 写sql 我们现在有一份订单数据
orderinfo
dt name money
2024-04-23,zhangsan,90
2024-04-23,lisi,50
2024-04-24,zhangsan,95
2024-04-24,lisi,55 现在 求商家每天的收入金额 假设我们现在使用sql语句求这个值 select sum(money),dt from orderinfo group by dt; 2写程序
a.mapper程序 ①用来接收每一行数据
②确定kv对 并输出kv对
k就是group后面的字段 v就是money //KEYIN, VALUEIN, KEYOUT, VALUEOUT
//在Hadoop的输入输出中 不让我们用Java类型 使用Hadoop对应的类型
//long对应LongWritable String对应Text Float对应FloatWritable
public class OrderMapper extends MapperLongWritable, Text,Text, FloatWritable {} Hadoop为什么不让使用Java类型
map的输出kv或reduce的输出kv最后写到磁盘上而用java类型写入磁盘序列化速度非常慢也就是说Java在作序列化的时候速度非常慢所以要用Hadoop类型对Java的序列化做了改进。 package com.pracle.mr;import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;//KEYIN, VALUEIN, KEYOUT, VALUEOUT
//在Hadoop的输入输出中 不让我们用Java类型 使用Hadoop对应的类型
//long对应LongWritable String对应Text Float对应FloatWritable
public class OrderMapper extends MapperLongWritable, Text,Text, FloatWritable {Overrideprotected void map(LongWritable key, Text value, MapperLongWritable, Text, Text, FloatWritable.Context context) throws IOException, InterruptedException {String[] orders value.toString().split(,);String okeyorders[0];String ovalueorders[2];//context表示输出 输入输出都是Hadoop类型context.write(new Text(okey),new FloatWritable(Float.parseFloat(ovalue)));}
}练习求流量和 写一下Mapper程序
182133434,2020-12-12,9000
2123444343,2020-12-13,900
2323432424,2020-12-12,900
23234344,2020-12-13,900 package com.pracle.mr;import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class FlowMapper extends MapperLongWritable, Text,LongWritable, DoubleWritable {Overrideprotected void map(LongWritable key, Text value, MapperLongWritable, Text, LongWritable, DoubleWritable.Context context) throws IOException, InterruptedException {String[] flow value.toString().split(,);String telephoneflow[0];String floflow[2];context.write(new LongWritable(Integer.parseInt(telephone)),new DoubleWritable(Double.parseDouble(flo)));}
}MapperLongWritable, Text,Text, FloatWritable
LongWritable字符个数
Text每一行数据
TextSQL语句中group 后面的字段
FloatWritableSQL语句中里面的字段 value b.洗牌 分组排序 Mapper运行完以后 将数据交给shuffle shuffle根据key默认升序对数据进行分组排序 c.reducer程序
一组一组读取数据 reducer有四个参数
kinshuffle中已经分好组的数据的key
vinkey对应的数据 可能会有多个 我们可以联想到list 数组
okey就是kin
ovalue:sum(money) package com.pracle.mr;import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;
import java.util.Iterator;public class OrderReducer extends ReducerText, FloatWritable,Text,FloatWritable {Overrideprotected void reduce(Text key, IterableFloatWritable values, ReducerText, FloatWritable, Text, FloatWritable.Context context) throws IOException, InterruptedException {IteratorFloatWritable it values.iterator();float sum0;while (true){if(it.hasNext()){FloatWritable f it.next();sumf.get();//类对象不能做 - * /}else {break;}}
context.write(key,new FloatWritable(sum));}
}d.Test类
Test类主要用来创建一个作业 完成对一份数据的处理 package com.pracle.mr;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
//Test用于提交我们的Job作业
public class Test {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// System.setProperty(hadoop.home.dir,D:\\ALidownload\\hadoop-27);Configuration configurationnew Configuration();//获取Job的实例对象Job job Job.getInstance(configuration);//设置驱动的类job.setJarByClass(Test.class);//设置Mapper的具体实现类job.setMapperClass(OrderMapper.class);//设置Map端输出的数据类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FloatWritable.class);//设置Reducer的具体实现类job.setReducerClass(OrderReducer.class);//设置Reduce端输出的数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FloatWritable.class);//设置输入输出路径FileInputFormat.setInputPaths(job,new Path(D:\\IDEA_workplace\\jtxy_hdfs\\data\\a.txt));FileOutputFormat.setOutputPath(job,new Path(D:\\IDEA_workplace\\jtxy_hdfs\\output\\a_out.txt));if(job.waitForCompletion(true)){System.out.println(程序运行成功);}else{System.out.println(程序运行失败);}}
}在运行程序之前 我们先配置一下Hadoop的环境变量 环境变量配置完成以后 我们点击运行就可以了 点击查看结果 上面的程序我们也可以在Hadoop上面运行
我们修改以下内容 然后打包 将我们打包的jar放到HDFS上去
输入以下命令 证明MapReduce也可以在Hadoop上运行 hadoop fs -put a.txt /
hadoop jar jtxy_hdfs-1.0-SNAPSHOT-jar-with-dependencies.jar /a.txt /out3