package com.bw.mr;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// YARN MR ---> Mapper.map / Reducer.reduce
// Mapper takes four generic type parameters:
//   KEYIN   : the map-side input key, the byte offset of the current line (LongWritable)
//             e.g. for the lines "hello word", "hello tom", "hello jim" each map() call
//             receives (offset of the line, the line itself)
//   VALUEIN : one line of text; Hadoop's Writable API (Text) instead of java.lang.String
//   KEYOUT  : the word (Text)
//   VALUEOUT: the count 1 for each occurrence (IntWritable)
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final Text t = new Text();
    private final IntWritable i = new IntWritable(1);

    // map side: pair each word with the value 1
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // "hello word hello tom" ---> "hello", "word", "hello", "tom"
        String[] splits = value.toString().split(" ");
        for (String word : splits) {
            // String ---> Text
            t.set(word);
            // emit the (word, 1) pair through the context
            context.write(t, i);
        }
    }
}
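// --- Illustration only, not part of the original job ---
// A tiny framework-free sketch of what the map stage above emits for the sample input
// ("hello word", "hello tom", "hello jim"); the class name MapSideDemo is made up and
// the println stands in for context.write(t, i) in WCMapper.
package com.bw.mr;

public class MapSideDemo {
    public static void main(String[] args) {
        String[] lines = { "hello word", "hello tom", "hello jim" };
        for (String line : lines) {
            for (String word : line.split(" ")) {
                // one (word, 1) pair per occurrence, just like the real mapper
                System.out.println("(" + word + ", 1)");
            }
        }
    }
}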
package com.bw.mr;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// MR: input -> map -> reduce -> output
// The reducer receives every value emitted for one key, e.g. hello(1,1,1,1,1),
// and folds them into a single result: hello(1+1+1+...).
// map   : (LongWritable, Text)  ---> (Text, IntWritable)
// reduce: (Text, IntWritable)   ---> (Text, IntWritable)
public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // override the reduce method
    // arg0: the word; arg1: all the 1s emitted for that word
    @Override
    protected void reduce(Text arg0, Iterable<IntWritable> arg1, Context arg2)
            throws IOException, InterruptedException {
        // reduce = merge: word(1,1,1,...) ---> word(count)
        int count = 0;
        for (IntWritable i : arg1) {
            count += i.get();
        }
        // write out the final result
        arg2.write(arg0, new IntWritable(count));
    }
}
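// --- Illustration only, not part of the original job ---
// Between map and reduce the framework shuffles the (word, 1) pairs and groups them by key,
// so WCReducer sees e.g. ("hello", [1, 1, 1]) and folds it to ("hello", 3). A minimal
// framework-free sketch of that grouping and fold; the class name ShuffleReduceDemo is made up.
package com.bw.mr;

import java.util.LinkedHashMap;
import java.util.Map;

public class ShuffleReduceDemo {
    public static void main(String[] args) {
        // map output from the sample lines, already flattened to a stream of words
        String[] mapOutputKeys = { "hello", "word", "hello", "tom", "hello", "jim" };

        // the shuffle phase groups values by key; here a map from word to running sum
        Map<String, Integer> grouped = new LinkedHashMap<>();
        for (String word : mapOutputKeys) {
            grouped.merge(word, 1, Integer::sum); // mirrors count += i.get() in WCReducer
        }

        // reduce output: (hello, 3) (word, 1) (tom, 1) (jim, 1)
        grouped.forEach((word, count) -> System.out.println("(" + word + ", " + count + ")"));
    }
}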
package com.bw.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// driver class that launches the job
public class WordCount {

    public static void main(String[] args) throws Exception {
        // configuration
        Configuration conf = new Configuration();
        // the whole MR program is described by a Job
        Job job = Job.getInstance(conf);
        // the jar that contains everything the job needs (mapper, reducer, key/value types)
        job.setJarByClass(WordCount.class);

        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // input data for the job
        FileInputFormat.addInputPath(job, new Path("hdfs://linux04:9000/aa.txt"));
        // output directory for the MapReduce result (must not exist yet)
        FileOutputFormat.setOutputPath(job, new Path("hdfs://linux04:9000/aajiegou.txt"));

        // submit the job to the cluster and wait for it to finish
        job.waitForCompletion(true);

        // package as a jar, upload it to the cluster, then run:
        //   hadoop jar wordcount.jar com.bw.mr.WordCount
    }
}
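// --- Optional sketch, not part of the original code ---
// The driver above hardcodes the HDFS paths and fails if the output directory already exists.
// A variant (class name WordCount2 is made up) that reads the paths from the command line,
// removes a stale output directory, and registers WCReducer as a combiner as well; using it
// as a combiner is safe only because the reduce sums i.get() rather than counting elements.
package com.bw.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount2 {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount2.class);

        job.setMapperClass(WCMapper.class);
        // partial sums on the map side; addition is associative and commutative
        job.setCombinerClass(WCReducer.class);
        job.setReducerClass(WCReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);

        // MapReduce refuses to start if the output directory already exists,
        // so remove a leftover one from a previous run
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
// Usage, assuming the jar name wordcount.jar:
//   hadoop jar wordcount.jar com.bw.mr.WordCount2 /aa.txt /aajiegou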