Map phase: turn each line of input text into <word, 1> key/value pairs.
Reduce phase: aggregate the pairs that share the same word by summing all the values.
For example, create several files whose content looks like this:
hello java
hello python
...
After the analysis and computation, the result is:
hello 2
java 1
python 1
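To make the data flow concrete, here is the trace for those two lines (the grouping between map and reduce, the shuffle, is done by the framework):
map output:    <hello,1> <java,1> <hello,1> <python,1>
after shuffle: <hello,[1,1]> <java,[1]> <python,[1]>
reduce output: hello 2, java 1, python 1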
Data preparation:
a.txt
hello java
hello python
hello hadoop
hello spark
b.txt
hello hadoop
hello python
hello python
hello python
c.txt
hello hadoop
word hadoop
hello spark
hello spark
Start Hadoop (in my setup both the NameNode and the DataNode run on localhost):
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode
Start YARN (also on localhost):
start-yarn.sh
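To confirm the daemons came up, the JDK's jps command should list NameNode, DataNode, ResourceManager, and NodeManager (each prefixed with its process id):
jps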
hadoop fs -mkdir -p /wordcount/input
hadoop fs -put *.txt /wordcount/input
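You can check that all three files landed in HDFS before submitting the job:
hadoop fs -ls /wordcount/input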
WordcountMapper.java

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // key is the byte offset of the line in the file; value is the line itself
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            // emit <word, 1> for every word on the line
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
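For example, for the input line "hello java" this mapper emits <hello,1> and <java,1>; no counting happens here, the summing is left entirely to the reducer.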
WordcountReducer.java

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // sum up the 1s emitted by the mappers for this word
        int count = 0;
        Iterator<IntWritable> iterator = values.iterator();
        while (iterator.hasNext()) {
            IntWritable value = iterator.next();
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
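The explicit Iterator works, but since values is an Iterable it can equally be consumed with an enhanced for loop; a stylistic alternative with the same behavior:

for (IntWritable value : values) {
    count += value.get();  // add one 1 per occurrence of the word
}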
JobSubmitterLinuxToYarn.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobSubmitterLinuxToYarn {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // conf.set("mapreduce.framework.name", "yarn"); // submit the job to YARN; the default is local
        Job job = Job.getInstance(conf);
        job.setJarByClass(JobSubmitterLinuxToYarn.class);
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
        FileOutputFormat.setOutputPath(job, new Path("/wordcount/output"));
        job.setNumReduceTasks(3);
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
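Because of job.setNumReduceTasks(3), the output directory will hold three files, part-r-00000 through part-r-00002, each containing the counts for the keys hash-partitioned to that reducer. And since summing 1s is associative and commutative, the reducer class could also be reused as a combiner to pre-aggregate on the map side, a one-line optional optimization:

job.setCombinerClass(WordcountReducer.class);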
1.1 Upload the code to the server
1.2 Compile and package it
[root@node1 java_jar]# javac *.java -cp $(hadoop classpath)
[root@node1 java_jar]# vi MANIFEST.MF
# Add the following line:
Main-Class: JobSubmitterLinuxToYarn
[root@node1 java_jar]# jar -cvfm mr.jar MANIFEST.MF *.class
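Strictly speaking the Main-Class entry is only needed when running the jar without naming a class; the hadoop jar command below passes JobSubmitterLinuxToYarn explicitly, so either way works.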
[root@node1 java_jar]# ls
[root@node1 java_jar]# hadoop jar mr.jar JobSubmitterLinuxToYarn
[root@node1 java_jar]# hadoop fs -ls /wordcount/output
[root@node1 java_jar]# hadoop fs -cat /wordcount/output/par*
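With the three sample files above (including the "word hadoop" line in c.txt), the counts spread across the three part files should add up to:

hadoop 4
hello 11
java 1
python 4
spark 3
word 1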
If the job is scheduled on YARN, you can open http://node1:8088/cluster (the ResourceManager web UI) to follow its progress, in addition to the console output.