4. MapReduce-1

MR 相关概念

  1. Job(作业) : 一个MR程序称为一个Job
  2. MRAppMaster(MR任务的主节点): 一个Job在运行时,会先启动一个进程,这个进程为 MRAppMaster。负责 Job 中执行状态的监控,容错,和 RM 申请资源,提交 Task 等。
  3. Task(任务): Task是一个进程,负责某项计算。
  4. Map(Map阶段): Map 是 MapReduce 程序运行的第一个阶段。Map阶段的目的是将输入的数据,进行切分。将一个大数据,切分为若干小部分。切分后,每个部分称为1片(split),每片数据会交给一个Task(进程)进行计算,负责 Map 阶段的 Task 称为 MapTask。在一个 MR 程序的 Map 阶段,会启动N(取决于切片数,多少个切片就会启动多少个 MapTask)个 MapTask。每个 MapTask 是并行运行。
  5. Reduce(Reduce阶段): Reduce 是MapReduce 程序运行的第二个阶段(最后一个阶段),Reduce 阶段的目的是将 Map 阶段,每个 MapTask 计算后的结果进行合并汇总,得到最终结果。Reduce阶段是可选的,不一定有。负责 Reduce 阶段的 Task 称为ReduceTask。一个Job可以通过设置,启动N个ReduceTask,这些ReduceTask也是并行运行,每个ReduceTask最终都会产生一个结果。

MR 相关组件

  1. Mapper: map 阶段核心的处理逻辑
  2. Reducer: reduce 阶段核心的处理逻辑
  3. InputFormat: 输入格式。MR 程序必须指定一个输入目录,一个输出目录,InputFormat 代表输入目录中文件的格式。如果是普通文件,可以使用FileInputFormat。如果是SequeceFile(hadoop提供的一种文件格式),可以使用 SequnceFileInputFormat,如果处理的数据在数据库中,需要使用 DBInputFormat。
  4. RecordReader: 记录读取器。RecordReader 负责从输入格式中,读取数据,读取后封装为一组记录(k-v)。
  5. OutPutFormat: 输出格式。OutPutFormat 代表 MR 处理后的结果,要以什么样的文件格式写出。将结果写出到一个普通文件中,可以使用 FileOutputFormat,将结果写出到数据库中,可以使用 DBOutPutFormat,将结果写出到 SequeceFil e中,可以使用 SequnceFileOutputFormat。
  6. RecordWriter: 记录写出器。将处理的结果以什么样的格式写出到输出文件中。
  7. Partitioner: 分区器。负责在 Mapper 将数据写出时,为每组 keyout-valueout 打上标记,进行分区。一个ReduceTask只会处理一个分区的数据。

MR 流程

  1. InputFormat 调用 RecordReader,从输入目录的文件中,读取一组数据,封装为 keyin-valuein 对象
  2. 将封装好的 key-value,交给 Mapper.map() ——>将处理的结果写出 keyout-valueout
  3. ReduceTask 启动 Reducer,使用 Reducer.reduce() 处理 Mapper写出的 keyout-valueout,
  4. OutPutFormat 调用 RecordWriter,将 Reducer 处理后的 keyout-valueout 写出到文件

Map阶段(MapTask): 切片(Split) —– 读取数据(Read) —– 交给Mapper处理(Map) —– 分区和排序(sort)
Reduce阶段(ReduceTask): 拷贝数据(copy) —– 排序(sort) —– 合并(reduce) —– 写出(write)

MR 编程

MR的编程只需要将自定义的组件和系统默认组件进行组合,组合之后运行即可,步骤:

  1. Map 阶段的核心处理逻辑需要编写在 Mapper 中
  2. Reduce 阶段的核心处理逻辑需要编写在 Reducer 中
  3. 将编写的 Mapper 和 Reducer 进行组合,组合成一个 Job
  4. 对 Job 进行设置,设置后运行

wordcount

InputFormat 的实现类很多

InputFormat 的作用:

  1. 验证输入目录中的文件格式,是否符合当前 Job 的要求
  2. 生成切片,每个切片都会交给一个 MapTask 处理
  3. 提供 RecordReader,由 RecordReader 从切片中读取记录,交给 Mapper 处理

InputFormat 中的 List<InputSplit> getSplits 方法的功能就是切片。ecordReader<K,V> createRecordReader 的功能是创建 RecordReader。默认 Hadoop 使用的是 TextInputFormat,而 TextInputFormat 创建的 RecordReader 是 LineRecordReader。所以 Hadoop 默认的 InputFormat 使用 TextInputFormat,默认是 Reader 使用 LineRecordReader。

本地模式

WCMapper 完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.yanrs.mr.wordcount;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
* KEYIN, VALUEIN: mapper 输入的 key-value 类型,由当前 JOb 的 InputFormat的 RecordReader 决定
* KEYOUT, VALUEOUT:mapper 输出的 key-value 类型
*/
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

private Text outKey = new Text();
private IntWritable outValue = new IntWritable(1);

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// key 是行号,value 是一行的文本内容
System.out.println("keyin: " + key + " valuein: " + value);
// 将文本内容进行拆分,得到一个个单词组成的数组
String[] words = value.toString().split("\t");
// 遍历数组,并输出,输出格式为(单词,1)
for (String word:words) {
outKey.set(word);
context.write(outKey, outValue);
}
}
}

WCReducer 完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.yanrs.mr.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
* KEYIN,VALUEIN: Mapper 的输出做为这里的输入
* KEYOUT,VALUEOUT: 自定义,因为这个 MR 程序是统计单词出现的频率,所以这里类型为 Text, IntWritable
*/
public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

private IntWritable outValue = new IntWritable();

//reduce 方法一次处理一组数据,key(单词) 相同的数据是一组
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
// 遍历每个 key(单词) ,让相同的 key(单词) 的值进行累加
for (IntWritable value:values) {
sum+=value.get();
}

outValue.set(sum);
// 将结果写出,key 是单词,outValue 是累加的次数
context.write(key, outValue);
}
}

WCDriver 完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.yanrs.mr.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
* 启动这个进程,那么就会运行该 job
*/
public class WCDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
// 获取文件系统
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop10:9000");
FileSystem fileSystem = FileSystem.get(conf);

// 设置输入目录和输出目录
Path inputPath = new Path("/wcinput");
Path outPath = new Path("/mroutput");
// 输出目录存在就删除
if(fileSystem.exists(outPath)){
fileSystem.delete(outPath, true);
}

// 创建 Job
Job job = Job.getInstance(conf);

// 设置 job 名称
job.setJobName("wordcount");

// 设置job运行的 Mapper,Reducer
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);

// 设置 Mapper,Reducer 的输出 key 和 value 类型。
// job 需要根据 Mapper,Reducer 输出的 key value 类型准备序列化器,通过序列化器对输出的 key value 进行序列化和反序列化
// 如果 Mapper,Reducer 输出的 key 和 value 类型一致,那么可以像下面一样直接设置 job 的最终输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// 设置输入输出目录
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outPath);

// 运行 Job 并打印日志信息
job.waitForCompletion(true);
}
}

直接在 idea 中运行 WCDriver 的 main 方法即可。上面设置连接的是 Hadoop10 的文件系统,但是是在本地运行的。

代码地址

yarn 上运行

WCMapper 完整代码,同上

WCReducer 完整代码,同上

在 yarn 上运行,需要指定运行方式为 yarn,且指定 resourcemanager 的地址

1
2
3
// 设置在 yarn 上运行
conf.set("mapreduce.framework.name", "yarn");
conf.set("yarn.resourcemanager.hostname", "hadoop11");

还需要设置 job 所在的 jar 包

1
2
3
// yarn 运行时候还需要设置 job 所在的 jar 包
job.setJarByClass(WCDriver.class);
// 或者使用

将代码打包,上传到 hadoop 上,使用 hadoop jar 命令运行

1
hadoop jar mapreduce-test-1.0-SNAPSHOT.jar com.yanrs.mr.wordcount.WCDriver

WCDriver 完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.yanrs.mr.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
* 启动这个进程,那么就会运行该 job
*/
public class WCDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
// 获取文件系统
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop10:9000");
// 设置在 yarn 上运行
conf.set("mapreduce.framework.name", "yarn");
conf.set("yarn.resourcemanager.hostname", "hadoop11");

FileSystem fileSystem = FileSystem.get(conf);

// 设置输入目录和输出目录
Path inputPath = new Path("/wcinput");
Path outPath = new Path("/mroutput");
// 输出目录存在就删除
if(fileSystem.exists(outPath)){
fileSystem.delete(outPath, true);
}

// 创建 Job
Job job = Job.getInstance(conf);

// yarn 运行时候还需要设置 job 所在的 jar 包
job.setJarByClass(WCDriver.class);
// 或者使用
// job.setJar("mapreduce-test-1.0-SNAPSHOT.jar");

// 设置 job 名称
job.setJobName("wordcount");

// 设置job运行的 Mapper,Reducer
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);

// 设置 Mapper,Reducer 的输出 key 和 value 类型。
// job 需要根据 Mapper,Reducer 输出的 key value 类型准备序列化器,通过序列化器对输出的 key value 进行序列化和反序列化
// 如果 Mapper,Reducer 输出的 key 和 value 类型一致,那么可以像下面一样直接设置 job 的最终输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// 设置输入输出目录
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outPath);

// 运行 Job 并打印日志信息
job.waitForCompletion(true);
}
}

代码地址

自定义 Bean

数据格式如上所示,需要统计每个手机消耗的上行,下行,总流量信息

FlowBeanMapper 代码如下,mapper 输入参数 key 为行号,value 为一行的文本。mapper 输出参数 key 手机号,value 为 bean 对象(对象中分别有上行,下行,总流量三个属性)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.yanrs.mr.flowbean;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
* mapper 输入参数 key 为行号,value 为一行的文本
* mapper 输出参数 key 手机号,value bean 对象(对象中分别有上行,下行,总流量三个属性)
*/
public class FlowBeanMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
private Text outKey = new Text();
private FlowBean flowBean = new FlowBean();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// key 为序列号,value 为每行的内容
String[] words = value.toString().split("\t");

// 封装手机号
outKey.set(words[1]);
// 上行流量
flowBean.setUpFlow(Long.parseLong(words[words.length - 3]));
// 下行流量
flowBean.setDownFlow(Long.parseLong(words[words.length - 2]));
context.write(outKey, flowBean);
}
}

FlowBean 为实体类,有三个属性,需要实现 hadoop 的序列化方法。需要重写 write(称为序列化) 和 readFields(称为反序列化) 方法。并且反序列化和序列化的顺序要一致,并且提供属性的 get,set 方法,空参构造,toString 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package com.yanrs.mr.flowbean;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


public class FlowBean implements Writable {
private long upFlow;
private long downFlow;
private long sumFlow;

/**
* 序列化, 在写出属性时,如果属性为引用数据类型,那么属性不能为 null
* @param dataOutput
* @throws IOException
*/
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}

/**
* 反序列化,反序列化和序列化的顺序要一致
* @param dataInput
* @throws IOException
*/
@Override
public void readFields(DataInput dataInput) throws IOException {
upFlow = dataInput.readLong();
downFlow = dataInput.readLong();
sumFlow = dataInput.readLong();
}

public long getUpFlow() {
return upFlow;
}

public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}

public long getDownFlow() {
return downFlow;
}

public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}

public long getSumFlow() {
return sumFlow;
}

public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}

public FlowBean() {
}

@Override
public String toString() {
return "FlowBean{" +
"upFlow=" + upFlow +
", downFlow=" + downFlow +
", sumFlow=" + sumFlow +
'}';
}
}

FlowBeanReducer 处理 FlowBeanMapper 输出的数据,所以输入 key 和 value 的类型分别为 Text 和 FlowBean。输出也为 Text, FlowBean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.yanrs.mr.flowbean;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
* 输入 key 和 value 的类型分别为 Text 和 FlowBean
*
*/
public class FlowBeanReducer extends Reducer <Text, FlowBean, Text, FlowBean>{

private FlowBean outValue = new FlowBean();

@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
// 累加每个手机号的上行流量和下行流量,并计算总流量
long sumUpFlow = 0;
long sumDownFlow = 0;

for (FlowBean flowBean: values) {
sumUpFlow += flowBean.getUpFlow();
sumDownFlow += flowBean.getDownFlow();
}

// 将值封装进入 FlowBean 中
outValue.setDownFlow(sumDownFlow);
outValue.setUpFlow(sumUpFlow);
outValue.setSumFlow(sumDownFlow + sumUpFlow);

context.write(key, outValue);
}
}

FlowBeanDriver 中设置输入和输出目录,设置 MapperClass 和 ReducerClass。设置 Mapper,Reducer 的输出 key 和 value 类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.yanrs.mr.flowbean;

import com.yanrs.mr.wordcount.WCMapper;
import com.yanrs.mr.wordcount.WCReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
* 启动这个进程,那么就会运行该 job
*/
public class FlowBeanDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
// 获取文件系统
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop10:9000");

FileSystem fileSystem = FileSystem.get(conf);

// 设置输入目录和输出目录
Path inputPath = new Path("/mrinput/flowbean");
Path outPath = new Path("/mroutput/flowbean");
// 输出目录存在就删除
if(fileSystem.exists(outPath)){
fileSystem.delete(outPath, true);
}

// 创建 Job
Job job = Job.getInstance(conf);

// 设置 job 名称
job.setJobName("FlowBean");

// 设置job运行的 Mapper,Reducer
job.setMapperClass(FlowBeanMapper.class);
job.setReducerClass(FlowBeanReducer.class);

// 设置 Mapper,Reducer 的输出 key 和 value 类型。
// job 需要根据 Mapper,Reducer 输出的 key value 类型准备序列化器,通过序列化器对输出的 key value 进行序列化和反序列化
// 如果 Mapper,Reducer 输出的 key 和 value 类型一致,那么可以像下面一样直接设置 job 的最终输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);

// 设置输入输出目录
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outPath);

// 运行 Job 并打印日志信息
job.waitForCompletion(true);
}
}

因为没有配置在 yarn 上运行,所以直接 idea 运行即可。结果如下

1
2
3
4
5
6
7
8
13470253144	FlowBean{upFlow=180, downFlow=180, sumFlow=360}
13509468723 FlowBean{upFlow=7335, downFlow=110349, sumFlow=117684}
13560439638 FlowBean{upFlow=918, downFlow=4938, sumFlow=5856}
13568436656 FlowBean{upFlow=3597, downFlow=25635, sumFlow=29232}
13590439668 FlowBean{upFlow=1116, downFlow=954, sumFlow=2070}
13630577991 FlowBean{upFlow=6960, downFlow=690, sumFlow=7650}
13682846555 FlowBean{upFlow=1938, downFlow=2910, sumFlow=4848}
......

代码地址

默认的切片流程

片和块的关系

:在计算MR程序时,才会切片。在运行程序时,临时将文件从逻辑上划分为若干部分(所以只是逻辑上的切片,并不是真正的切分),使用的输入格式不同(不同的 InputFormat),切片的方式不同,切片的数量也不同。每片的数据最终也是以块的形式存储在 HDFS。

: 在向HDFS写文件时,文件中的内容以块为单位存储,块是实际的物理存在。

建议: 片大小最好等于块大小,将片大小设置和块大小一致,可以最大限度减少因为切片带来的磁盘IO和网络IO,MR计算框架速度慢的原因在于在执行MR时,会发生频繁的磁盘IO和网络IO。理论上来说:如果文件的数据量是一定的话,片越大,切片数量少,启动的 MapTask 少,Map 阶段运算慢,片越小,切片数量多,启动的MapTask多,Map阶段运算快。默认情况下片大小就是块大小,即文件的块大小默认为 128M,默认每片就是128M。MapTask的数量只取决于切片数,有多少切片就有多少个 MapTask

如果需要调节片大小 > 块大小:那么需要配置 mapreduce.input.fileinputformat.split.minsize > 128M

如果需要调节片大小 < 块大小:那么需要配置 mapreduce.input.fileinputformat.split.maxsize < 128M

FileInputFormat的切片策略(默认)
  1. 获取当前输入目录中所有的文件
  2. 以文件为单位切片,如果文件为空文件,默认创建一个空的切片
  3. 如果文件不为空,尝试判断文件是否可切(不是压缩文件,都可切)
  4. 如果文件不可切,整个文件作为1片
  5. 如果文件可切,先获取片大小(默认等于块大小),循环判断 待切部分/ 片大小 > 1.1倍,如果大于先切去一片,再判断…
  6. 剩余部分整个作为1片

常见的输入格式

FileInputFormat 中有六个子类,下面总结一下常见的四个子类的切片策略和 RecordReader

TextInputFormat

TextInputFormat 常用于输入目录中全部是文本文件

切片策略: 默认的切片策略

RecordReader: LineRecordReader,一次处理一行,将一行内容的偏移量作为key,一行内容作为value,即 key 的类型为 LongWritable,value 的类型为 Text

上面的 wordcount 例子就是使用的默认的 TextInputFormat

NlineInputFormat

切片策略: 以文件为单位,读取配置中 mapreduce.input.lineinputformat.linespermap 参数(默认为1),每次这么多行切为一片。

RecordReader: LineRecordReader,一次处理一行,将一行内容的偏移量作为key,一行内容作为value,即 key 的类型为 LongWritable,value 的类型为 Text

NLMapper 完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.yanrs.mr.nline;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
* KEYIN, VALUEIN: mapper 输入的 key-value 类型,由当前 JOb 的 InputFormat的 RecordReader 决定
* KEYOUT, VALUEOUT:mapper 输出的 key-value 类型
*/
public class NLMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

private Text outKey = new Text();
private IntWritable outValue = new IntWritable(1);

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// key 是行号,value 是一行的文本内容
System.out.println("keyin: " + key + " valuein: " + value);
// 将文本内容进行拆分,得到一个个单词组成的数组
String[] words = value.toString().split("\t");
// 遍历数组,并输出,输出格式为(单词,1)
for (String word:words) {
outKey.set(word);
context.write(outKey, outValue);
}
}
}

NLReducer 完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.yanrs.mr.nline;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
* KEYIN,VALUEIN: Mapper 的输出做为这里的输入
* KEYOUT,VALUEOUT: 自定义,因为这个 MR 程序是统计单词出现的频率,所以这里类型为 Text, IntWritable
*/
public class NLReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

private IntWritable outValue = new IntWritable();

//reduce 方法一次处理一组数据,key(单词) 相同的数据是一组
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
// 遍历每个 key(单词) ,让相同的 key(单词) 的值进行累加
for (IntWritable value:values) {
sum+=value.get();
}

outValue.set(sum);
// 将结果写出,key 是单词,outValue 是累加的次数
context.write(key, outValue);
}
}

NLDriver 完整代码。在 Driver 中新增设置使用 NLineInputFormat。默认是一行切分为一片,如果需要设置可以在 conf 中设置 mapreduce.input.lineinputformat.linespermap 值即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.yanrs.mr.nline;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
* 启动这个进程,那么就会运行该 job
*/
public class NLDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
// 获取文件系统
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop10:9000");
// 设置几行为一片,默认一行一片
// conf.set("mapreduce.input.lineinputformat.linespermap", "2");

FileSystem fileSystem = FileSystem.get(conf);

// 设置输入目录和输出目录
Path inputPath = new Path("/mrinput/nline");
Path outPath = new Path("/mroutput/nline");
// 输出目录存在就删除
if(fileSystem.exists(outPath)){
fileSystem.delete(outPath, true);
}

// 创建 Job
Job job = Job.getInstance(conf);

// 设置使用 NLineInputFormat
job.setInputFormatClass(NLineInputFormat.class);

// 设置 job 名称
job.setJobName("nline");

// 设置job运行的 Mapper,Reducer
job.setMapperClass(NLMapper.class);
job.setReducerClass(NLReducer.class);

// 设置 Mapper,Reducer 的输出 key 和 value 类型。
// job 需要根据 Mapper,Reducer 输出的 key value 类型准备序列化器,通过序列化器对输出的 key value 进行序列化和反序列化
// 如果 Mapper,Reducer 输出的 key 和 value 类型一致,那么可以像下面一样直接设置 job 的最终输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// 设置输入输出目录
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outPath);

// 运行 Job 并打印日志信息
job.waitForCompletion(true);
}
}

代码地址

KeyValueTextInputFormat

针对文本文件,使用分割字符,将每一行分割为 key 和 value,如果没有找到分隔符,当前行的内容作为 key,value 为空串。默认分隔符为 \t,可以通过参数 mapreduce.input.keyvaluelinerecordreader.key.value.separator 指定。

切片策略:默认的切片策略

RecordReader : key 和 value 的类型都是 Text

KVMapper 完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.yanrs.mr.keyvalue;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
* KEYIN, VALUEIN: mapper 输入的 key-value 类型,由当前 JOb 的 InputFormat的 RecordReader 决定
* KEYOUT, VALUEOUT:mapper 输出的 key-value 类型
*/
public class KVMapper extends Mapper<Text, Text, Text, IntWritable> {

private IntWritable outValue = new IntWritable(1);

@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
// key 是 * 之前的姓名,value 是计数1
context.write(key, outValue);
}
}

KVReducer 完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.yanrs.mr.keyvalue;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
* KEYIN,VALUEIN: Mapper 的输出做为这里的输入
* KEYOUT,VALUEOUT: 自定义,因为这个 MR 程序是统计单词出现的频率,所以这里类型为 Text, IntWritable
*/
public class KVReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

private IntWritable outValue = new IntWritable();

//reduce 方法一次处理一组数据,key(单词) 相同的数据是一组
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
// 遍历每个 key(单词) ,让相同的 key(单词) 的值进行累加
for (IntWritable value:values) {
sum+=value.get();
}

outValue.set(sum);
// 将结果写出,key 是单词,outValue 是累加的次数
context.write(key, outValue);
}
}

KVDriver 完整代码如下,需要设置使用 KeyValueTextInputFormat,并且需要设置分隔符,需要注意的是分隔符只是一个 byte 类型的数据,即便传入的是一个字符串,也只会读取第一个字符。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.yanrs.mr.keyvalue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
* 启动这个进程,那么就会运行该 job
*/
public class KVDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
// 获取文件系统
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop10:9000");
// 设置分隔符(需要注意的是分隔符只是一个 byte 类型的数据,即便传入的是一个字符串,也只会读取第一个字符)
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "*");

FileSystem fileSystem = FileSystem.get(conf);

// 设置输入目录和输出目录
Path inputPath = new Path("/mrinput/keyvalue");
Path outPath = new Path("/mroutput/keyvalue");
// 输出目录存在就删除
if(fileSystem.exists(outPath)){
fileSystem.delete(outPath, true);
}

// 创建 Job
Job job = Job.getInstance(conf);

// 设置使用 KeyValueTextInputFormat
job.setInputFormatClass(KeyValueTextInputFormat.class);

// 设置 job 名称
job.setJobName("keyvalue");

// 设置job运行的 Mapper,Reducer
job.setMapperClass(KVMapper.class);
job.setReducerClass(KVReducer.class);

// 设置 Mapper,Reducer 的输出 key 和 value 类型。
// job 需要根据 Mapper,Reducer 输出的 key value 类型准备序列化器,通过序列化器对输出的 key value 进行序列化和反序列化
// 如果 Mapper,Reducer 输出的 key 和 value 类型一致,那么可以像下面一样直接设置 job 的最终输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// 设置输入输出目录
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outPath);

// 运行 Job 并打印日志信息
job.waitForCompletion(true);
}
}

代码地址

CombineTextInputFormat

改变了传统的切片方式。将多个小文件,划分到一个切片中,适合小文件过多的场景。

切片策略: 先确定片的最大值 maxSize,maxSize 通过参数 mapreduce.input.fileinputformat.split.maxsize 设置。流程是以文件为单位,将每个文件划分为若干 part,如果文件的待切部分的大小小于等于 maxSize, 则整个待切部分作为1个 part,如果文件的待切部分的大小大于 maxsize 但是小于等于 2 maxSize, 那么将整个待切部分均匀的切分为2个 part。如果文件的待切部分的大小大于 2 maxSize, 那么先切去 maxSize 大小,得到 1个 part,剩余待切部分继续判断

RecordReader: LineRecordReader,一次处理一行,将一行内容的偏移量作为 key,一行内容作为 value,即 key 的类型为 LongWritable,value 的类型为 Text

CMMapper 完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.yanrs.mr.combine;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
* KEYIN, VALUEIN: mapper 输入的 key-value 类型,由当前 JOb 的 InputFormat的 RecordReader 决定
* KEYOUT, VALUEOUT:mapper 输出的 key-value 类型
*/
public class CMMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

private Text outKey = new Text();
private IntWritable outValue = new IntWritable(1);

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// key 是行号,value 是一行的文本内容
System.out.println("keyin: " + key + " valuein: " + value);
// 将文本内容进行拆分,得到一个个单词组成的数组
String[] words = value.toString().split("\t");
// 遍历数组,并输出,输出格式为(单词,1)
for (String word:words) {
outKey.set(word);
context.write(outKey, outValue);
}
}
}

CMReducer 完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.yanrs.mr.combine;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
* KEYIN,VALUEIN: Mapper 的输出做为这里的输入
* KEYOUT,VALUEOUT: 自定义,因为这个 MR 程序是统计单词出现的频率,所以这里类型为 Text, IntWritable
*/
public class CMReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

private IntWritable outValue = new IntWritable();

//reduce 方法一次处理一组数据,key(单词) 相同的数据是一组
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
// 遍历每个 key(单词) ,让相同的 key(单词) 的值进行累加
for (IntWritable value:values) {
sum+=value.get();
}

outValue.set(sum);
// 将结果写出,key 是单词,outValue 是累加的次数
context.write(key, outValue);
}
}

CMDriver 完整代码。需要设置多大文件切为一片,设置使用 CombineTextInputFormat

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package com.yanrs.mr.combine;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
* 启动这个进程,那么就会运行该 job
*/
public class CMDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
// 获取文件系统
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop10:9000");
// 设置多大文件切为一片
conf.set("mapreduce.input.fileinputformat.split.maxsize", "2048");

FileSystem fileSystem = FileSystem.get(conf);

// 设置输入目录和输出目录
Path inputPath = new Path("/mrinput/combine");
Path outPath = new Path("/mroutput/combine");
// 输出目录存在就删除
if(fileSystem.exists(outPath)){
fileSystem.delete(outPath, true);
}

// 创建 Job
Job job = Job.getInstance(conf);

// 设置使用 CombineTextInputFormat
job.setInputFormatClass(CombineTextInputFormat.class);

// 设置 job 名称
job.setJobName("combine");

// 设置job运行的 Mapper,Reducer
job.setMapperClass(CMMapper.class);
job.setReducerClass(CMReducer.class);

// 设置 Mapper,Reducer 的输出 key 和 value 类型。
// job 需要根据 Mapper,Reducer 输出的 key value 类型准备序列化器,通过序列化器对输出的 key value 进行序列化和反序列化
// 如果 Mapper,Reducer 输出的 key 和 value 类型一致,那么可以像下面一样直接设置 job 的最终输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// 设置输入输出目录
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outPath);

// 运行 Job 并打印日志信息
job.waitForCompletion(true);
}
}

代码地址

MR 核心阶段划分

MapTask 阶段

  1. map
  2. sort

RedcueTask 阶段

  1. copy
  2. sort
  3. reduce

shuffle 阶段

上面的 2-4 又称为 shuffle 阶段。Shuffle 阶段横跨 MapTask 和 RedcueTask,在MapTask端也有 Shuffle,在RedcueTask 也有 Shuffle。具体 Shuffle 阶段指 MapTask 的 map 方法运行之后到 RedcuceTask 的 reduce 方法运行之前。

总结

mapper 的输出,为 reducer 的输入,mapper 的输出由不同的 InputFormat 的 RecordReader 决定。

不同的 InputFormat 有着不同的切片策略,默认如果不设置,那么使用的是 TextInputFormat。

reduce 方法一次处理一组数据,key 相同的数据为一组。

mapper 和 reducer 的输出数据格式由自己根据需求来设置,可以是 hadoop 内置的类型,也可以自定义 bean。

如果要将编写好的程序在 yarn 上运行,那么需要配置 yarn 的地址,设置 job 所在的 jar 包,将程序打包为 jar 之后运行。