腾讯网站建设分析,手机网站快速建设,招标代理公司,深圳互动网站建设文章目录 #x1f4da;实验目的#x1f4da;实验平台#x1f4da;实验内容#x1f407;在本地编写程序和调试#x1f955;代码框架思路#x1f955;代码实现 #x1f407;在集群上提交作业并执行#x1f955;在集群上提交作业并执行#xff0c;同本地执行相比即需修改… 文章目录 实验目的实验平台实验内容在本地编写程序和调试代码框架思路代码实现 在集群上提交作业并执行在集群上提交作业并执行同本地执行相比即需修改路径。修改后通过expoet导出jar包关注 Main-Class 的设置在终端依次输入以下指令完成提交 实验目的
倒排索引Inverted Index被用来存储在全文搜索下某个单词在一个文档或者一组文档中的存储位置的映射是目前几乎所有支持全文索引的搜索引擎都需要依赖的一个数据结构。通过对倒排索引的编程实现熟练掌握 MapReduce 程序在集群上的提交与执行过程加深对 MapReduce 编程框架的理解。
实验平台
操作系统LinuxHadoop 版本3.2.2JDK 版本1.8Java IDEEclipse
实验内容 关于倒排索引 在本地编写程序和调试 在本地 eclipse 上编写带词频属性的对英文文档的文档倒排索引程序要求程序能够实现对 stop-words(如 a,an,the,in,of 等词)的去除能够统计单词在每篇文档中出现的频率。文档数据和停词表可在此链接上下载在伪分布式环境下完成程序的编写和调试。 代码框架思路
Map()对输入的Text切分为多个word。这里的Map()包含setup()和map()。每一次map都伴随着一次setup进行停词筛选那些不需要统计的。Combine()将Map输出的中间结果相同key部分的value累加减少向Reduce节点传输的数据量。Partition()为了将同一个word的键值对发送到同一个Reduce节点对key进行临时处理。将原key的(word, filename)临时拆开使Partitioner只按照word值进行选择Reduce节点。基于哈希值的分片方法。Reduce()利用每个Reducer接收到的键值对中word是排好序的来进行最后的整合。将word#filename拆分开将filename与累加和拼到一起存在str中。每次比较当前的word和上一次的word是否相同若相同则将filename和累加和附加到str中否则输出key:wordvalue:str并将新的word作为key继续。上述reduce()只会在遇到新word时处理并输出前一个word故对于最后一个word还需要额外的处理。重载cleanup()处理最后一个word并输出 倒排索引的Map、Combiner、Partitioner部分就和上图一样 一个Map对应一个Combiner借助Combiner对Map输出进行一次初始整合一个Combiner又对应一个PartitionerPartitioner将同一个word的键值对发送到同一个Reduce节点 代码实现
关注本地路径
package index;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.StringTokenizer;
import java.util.Vector;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;public class index
{public static class Map extends MapperObject, Text, Text, IntWritable {/*** setup():读取停词表到vector stop_words中*/VectorString stop_words;//停词表protected void setup(Context context) throws IOException {stop_words new VectorString();//初始化停词表Configuration conf context.getConfiguration();//读取停词表文件BufferedReader reader new BufferedReader(new InputStreamReader(FileSystem.get(conf).open(new Path(hdfs://localhost:9000/user/hadoop/input/stop_words_eng.txt))));String line;while ((line reader.readLine()) ! null) {//按行处理StringTokenizer itrnew StringTokenizer(line);while(itr.hasMoreTokens()){//遍历词,存入vectorstop_words.add(itr.nextToken());}}reader.close();}/*** map():对输入的Text切分为多个word* 输入key:当前行偏移位置 value:当前行内容* 输出key:word#filename value:1*/protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {FileSplit fileSplit (FileSplit) context.getInputSplit();String fileName fileSplit.getPath().getName();//获取文件名转换为小写String line value.toString().toLowerCase();//将行内容全部转为小写字母//只保留数字和字母String new_line;for(int i 0; i line.length(); i ) {if((line.charAt(i)48 line.charAt(i)57) || (line.charAt(i)97 line.charAt(i)122)) {//按行处理new_line line.charAt(i);} else {//其他字符保存为空格new_line ;}}line new_line.trim();//去掉开头和结尾的空格StringTokenizer strTokennew StringTokenizer(line);//按照空格拆分while(strToken.hasMoreTokens()){String str strToken.nextToken();if(!stop_words.contains(str)) {//不是停词则输出key-value对context.write(new Text(str#fileName), new IntWritable(1));}}}}public static class Combine extends ReducerText, IntWritable, Text, IntWritable {/*** 将Map输出的中间结果相同key部分的value累加减少向Reduce节点传输的数据量* 输入key:word#filename value:1* 输出key:word#filename value:累加和词频*/protected void reduce(Text key, IterableIntWritable values, Context context) throws IOException, InterruptedException {int sum 0;for (IntWritable val : values) {sum ;}context.write(key, new IntWritable(sum));}}public static class Partition extends HashPartitionerText, IntWritable {/*** 为了将同一个word的键值对发送到同一个Reduce节点对key进行临时处理* 将原key的(word, filename)临时拆开使Partitioner只按照word值进行选择Reduce节点* 基于哈希值的分片方法*/public int getPartition(Text key, IntWritable value, int numReduceTasks) {//第三个参数numPartitions表示每个Mapper的分片数也就是Reducer的个数String term key.toString().split(#)[0];//获取word#filename中的wordreturn super.getPartition(new Text(term), value, numReduceTasks);//按照word分配reduce节点 }}public static class Reduce extends ReducerText, IntWritable, Text, Text {/*** Reduce():利用每个Reducer接收到的键值对中word是排好序的,来进行最后的整合* 将word#filename拆分开将filename与累加和拼到一起存在str中* 每次比较当前的word和上一次的word是否相同若相同则将filename和累加和附加到str中否则输出key:wordvalue:str并将新的word作为key继续* 输入* key value* word1#filename 1 [num1,num2,...]* word1#filename 2 [num1,num2,...]* word2#filename 1 [num1,num2,...]* 输出* key:word value:filename1,词频filename2,词频...total,总词频*/private String lastfile null;//存储上一个filenameprivate String lastword null;//存储上一个wordprivate String str ;//存储要输出的value内容private int count 0;private int totalcount 0;protected void reduce(Text key, IterableIntWritable values, Context context) throws IOException, InterruptedException {String[] tokens key.toString().split(#);//将word和filename存在tokens数组中if(lastword null) {lastword tokens[0];}if(lastfile null) {lastfile tokens[1];}if (!tokens[0].equals(lastword)) {//此次word与上次不一样则将上次的word进行处理并输出str lastfile,count;total,totalcount.;context.write(new Text(lastword), new Text(str));//value部分拼接后输出lastword tokens[0];//更新wordlastfile tokens[1];//更新filenamecount 0;str;for (IntWritable val : values) {//累加相同word和filename中出现次数count val.get();//转为int}totalcount count;return;}if(!tokens[1].equals(lastfile)) {//新的文档str lastfile,count;;lastfile tokens[1];//更新文档名count 0;//重设count值for (IntWritable value : values){//计数count value.get();//转为int}totalcount count;return;}//其他情况只计算总数即可for (IntWritable val : values) {count val.get();totalcount val.get();}}/*** 上述reduce()只会在遇到新word时处理并输出前一个word故对于最后一个word还需要额外的处理* 重载cleanup()处理最后一个word并输出*/public void cleanup(Context context) throws IOException, InterruptedException {str lastfile,count;total,totalcount.;context.write(new Text(lastword), new Text(str));super.cleanup(context);}}public static void main(String args[]) throws Exception {Configuration conf new Configuration();conf.set(fs.defaultFS, hdfs://localhost:9000);if(args.length ! 2) {System.err.println(Usage: Relation in out);System.exit(2);}Job job Job.getInstance(conf, InvertedIndex);//设置环境参数job.setJarByClass(index.class);//设置整个程序的类名job.setMapperClass(Map.class);//设置Mapper类job.setCombinerClass(Combine.class);//设置combiner类job.setPartitionerClass(Partition.class);//设置Partitioner类job.setReducerClass(Reduce.class);//设置reducer类job.setOutputKeyClass(Text.class);//设置Mapper输出key类型job.setOutputValueClass(IntWritable.class);//设置Mapper输出value类型FileInputFormat.addInputPath(job, new Path(args[0]));//输入文件目录FileOutputFormat.setOutputPath(job, new Path(args[1]));//输出文件目录System.exit(job.waitForCompletion(true) ? 0 : 1);//参数true表示检查并打印 Job 和 Task 的运行状况}}⭐补充当我们新建一个Package和Class后运行时可能会出现如下报错主要是在MapReduce编程输入输出里会遇到 ⭐解决办法
“Run As”选中“Run Configurations…” 然后在“Arguments”里输入input output然后再run就行了。 在集群上提交作业并执行 集群的服务器地址为 10.102.0.198用户主目录为/home/用户名hdfs 目录为/user/用户名。集群上的实验文档存放目录为 hdfs://10.102.0.198:9000/input/. 英文停词表文件存放位置为hdfs://10.102.0.198:9000/stop_words/stop_words_eng.txt。 在集群上提交作业并执行同本地执行相比即需修改路径。 修改后通过expoet导出jar包关注 Main-Class 的设置
选中index.java右键Export。 如下图选中JAR file后点Next。 确认选中index及其srcJAR的命名要和class名一样比如这里是index.java就是class index也就是index.jar。然后点Next。 到如下页面再点Next。 在Main class那点Browse选中index。 如下图。 最后点finish完成导出可在文件夹里找到index.jar。双击index.jar在它的METS-INT里头查看Main-Class是否设置成功。
在终端依次输入以下指令完成提交
使用 scp InvertedIndex.jar 用户名10.102.0.198:/home/用户名 命令将本地程序提交到 Hadoop 集群通过 ssh 用户名10.102.0.198 命令远程登录到 Hadoop 集群进行操作使用 hadoop jar InvertedIndex.jar /input /user/用户名/output 命令在集群上运行 Hadoop 作业指定输出目录为自己 hdfs 目录下的 output。使用 diff 命令判断自己的输出结果与标准输出的差异
scp index.jar bigdata_学号10.102.0.198:/home/bigdata_学号
ssh bigdata_学号10.102.0.198
hadoop jar index.jar /input /user/bigdata_学号/output
diff (hdfs dfs -cat /output/part-r-00000) (hdfs dfs -cat /user/bigdata_学号/output/part-r-00000)在浏览器中打开 http://10.102.0.198:8088可以查看集群上作业的基本执行情况。