Hadoop MapReduce Quick Start

Requirements

Turn every line of text into (word, 1) key-value pairs.

Group the k,v pairs that share the same word and aggregate them: sum up all the v values.

For example

Create one or more files with content in the following format:

hello java
hello python
...

After processing, the result looks like this:

hello 2
java 1
python 1
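To see what the job has to do, note that the same computation on a single machine is just a hash map from word to count; MapReduce splits that work into a map phase (emit (word, 1) per word) and a reduce phase (sum per word) so it can run in parallel across a cluster. A minimal single-machine sketch in plain Java, no Hadoop involved (the class name LocalWordCount is just for illustration):

import java.util.HashMap;
import java.util.Map;

public class LocalWordCount {
    public static void main(String[] args) {
        String[] lines = {"hello java", "hello python"};
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {    // the "map" step: one (word, 1) per word
                counts.merge(word, 1, Integer::sum); // the "reduce" step: sum the 1s per word
            }
        }
        // prints: hello 2, java 1, python 1 (order may vary)
        counts.forEach((word, count) -> System.out.println(word + " " + count));
    }
}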

Preparation

Prepare the data:

a.txt

hello java

hello python

hello hadoop

hello spark

b.txt

hello hadoop

hello python

hello python

hello python

c.txt

hello hadoop

word hadoop

hello spark

hello spark

Start Hadoop (in my setup the NameNode and the DataNode both run on localhost):

hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode

Start YARN (also on localhost):

start-yarn.sh

Create an input directory in HDFS and upload a.txt, b.txt and c.txt:

hadoop fs -mkdir -p /wordcount/input
hadoop fs -put *txt /wordcount/input
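As an aside, the same directory creation and upload can also be done from Java with the HDFS FileSystem API. A rough sketch, assuming core-site.xml (with fs.defaultFS pointing at the NameNode) is on the classpath and a.txt, b.txt, c.txt sit in the working directory; the class name UploadInput is made up for this example:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadInput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();        // reads fs.defaultFS from core-site.xml
        FileSystem fs = FileSystem.get(conf);
        Path input = new Path("/wordcount/input");
        fs.mkdirs(input);                                // like: hadoop fs -mkdir -p /wordcount/input
        for (String name : new String[]{"a.txt", "b.txt", "c.txt"}) {
            fs.copyFromLocalFile(new Path(name), input); // like: hadoop fs -put <file> /wordcount/input
        }
        fs.close();
    }
}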

The map task class. Its map method is called once for every input line and turns a line such as "hello java" into the pairs (hello,1) (java,1).

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // key is the byte offset of the line in the file, value is the line itself
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            // emit (word, 1) for every word on the line
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
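One caveat about the mapper: line.split(" ") yields empty strings when a line contains consecutive spaces or tabs, and each of those would then be counted as a "word". The sample files above are fine, but if your input is not strictly single-space separated, a slightly more defensive map method (my own variant, a drop-in replacement for the one above) could look like this:

@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    // split on any run of whitespace and skip empty tokens
    for (String word : value.toString().split("\\s+")) {
        if (!word.isEmpty()) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}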

The reduce task class. It receives the map output grouped by key and aggregates it:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // the framework groups all (word, 1) pairs with the same word into one reduce call
        int count = 0;
        Iterator<IntWritable> iterator = values.iterator();
        while (iterator.hasNext()) {
            IntWritable value = iterator.next();
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}

The driver program. As written, the job runs with the local framework (Hadoop's in-process local runner); uncomment the marked line to submit it to YARN instead.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobSubmitterLinuxToYarn {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // conf.set("mapreduce.framework.name", "yarn"); // uncomment to submit the job to YARN; the default is local
        Job job = Job.getInstance(conf);
        job.setJarByClass(JobSubmitterLinuxToYarn.class);
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
        FileOutputFormat.setOutputPath(job, new Path("/wordcount/output"));
        job.setNumReduceTasks(3); // three reducers -> three output files (part-r-00000 .. part-r-00002)
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
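Two optional tweaks worth considering in the driver, placed before job.waitForCompletion(true). Because summing is associative, the reducer can also be registered as a combiner, which pre-aggregates map output and shrinks the data shuffled to the reducers. And since MapReduce fails if the output directory already exists, deleting a leftover /wordcount/output up front makes reruns painless. A sketch of the extra lines (FileSystem needs an import of org.apache.hadoop.fs.FileSystem):

// let the reducer double as a combiner: partial sums are computed on the map side
job.setCombinerClass(WordcountReducer.class);

// delete a leftover output directory so a rerun does not fail with "output already exists"
FileSystem fs = FileSystem.get(conf);
Path output = new Path("/wordcount/output");
if (fs.exists(output)) {
    fs.delete(output, true); // true = delete recursively
}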

Compile and package the .java files on Linux

1.1 Upload the code to the server.

1.2 Compile and package it:

[root@node1 java_jar]# javac -cp $(hadoop classpath) *.java
[root@node1 java_jar]# vi MANIFEST.MF
# add the following line
Main-Class: JobSubmitterLinuxToYarn
[root@node1 java_jar]# jar -cvfm mr.jar MANIFEST.MF *.class
[root@node1 java_jar]# ls

Run the job:

[root@node1 java_jar]# hadoop jar mr.jar JobSubmitterLinuxToYarn

Console output (not shown here).

Check the output directory in HDFS:

[root@node1 java_jar]# hadoop fs -ls /wordcount/output
[root@node1 java_jar]# hadoop fs -cat /wordcount/output/par*

If you submitted the job to YARN, you can open http://node1:8088/cluster to watch it run.

Console output (not shown here).
