5. MapReduce-2

MapTask 工作机制

MR 核心阶段:map,sort,copy,sort,reduce

MapTask 分为两个阶段,分别是 map 和 sort。在执行 context.write(keyout-valueout) 之前都属于 map 阶段,然后如果该 job 有 ReduceTask,那么在进行 sort 排序。

在执行 context.write() 并不是直接将 key-value 写出,而是先攒到一个缓存区 MapOutPutBuffer 中。每个记录在进入缓冲区时,先调用 Partitioner(分区器)为记录计算一个区号。例如以单词统计为例,数据进入缓冲区后格式可能如下,index 为索引,partition为分区号,keystart,valuestart 分别表示 key 的起始位置和 value 的起始位置,key 为统计的单词,value 单词出现的个数。

1
2
3
4
index    partition  keystart  valuestart   key      value
0 1 0 6 hadoop 1
1 1 7 11 hive 1
2 0 xx xx spark 1

缓存区有两个线程,一个为收集线程,收集线程负责将 Mapper 写出的 key-value 收集到缓冲区。第二个为溢写线程,溢写线程会在缓冲区已经收集了 80% 空间的数据时【缓冲区大小默认为 100M,80% 即 80M】,被唤醒,唤醒后负责将缓冲区收集的数据溢写到磁盘。一旦缓冲区满足溢写条件,先对缓冲区的所有数据,进行一次排序,利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号 Partition 进行排序,然后按照 key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照 key 有序。排序时,只排索引(记录有序的索引的顺序),不移动数据。达到溢写条件后,按照分区,进行溢写,每次溢写生成一个临时文件 spillx.out【x为分区号】。溢写多次,生成多个临时文件。当所有的数据全部被溢写结束,最后一批数据不满足溢写条件会执行一次 flush。

写结束后,会对所有的溢写片段执行一次 merge (将多个临时文件合并成一个最终结果)操作。合并时,将所有临时文件同一个分区的数据进行汇总,汇总后再排序【归并排序】,最后合并为一个文件,这个文件每个分区中的 key-value 都是有序的。

ReduceTask 工作机制

  1. Copy 阶段:ReduceTask 从各个MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
  2. Merge 阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
  3. Sort 阶段:按照 MapReduce 语义,用户编写 reduce() 函数输入数据是按 key 进行聚集的一组数据。为了将key 相同的数据聚在一起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
  4. Reduce阶段:reduce() 函数将计算结果写到HDFS上。

Partition分区

默认的分区策略是根据 key 的 hashCode 对 ResultTasks 个数取模得到的。可以从配置中 mapreduce.job.partitioner.class 参数来设置分区器,如果没有设置,就使用 HashPartitioner 作为分区器。分区的数量和 ReduceTask 的数量一致,一个 ReduceTask 对应着一个分区,所以如果想设置多个分区,那么就需要设置 ReduceTask 的数量。

分区案例

将统计结果按照手机归属地不同省份输出到不同文件中(分区)。期望输出数据,手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。

PartitionerFlowBeanDriver 完整代码如下,需要在里面设置 ReduceTask 的数量和自定义使用的分区器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package com.yanrs.mr.Partitioner;

import com.yanrs.mr.flowbean.FlowBean;
import com.yanrs.mr.flowbean.FlowBeanMapper;
import com.yanrs.mr.flowbean.FlowBeanReducer;
import com.yanrs.mr.wordcount.WCMapper;
import com.yanrs.mr.wordcount.WCReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
* 启动这个进程,那么就会运行该 job
*/
public class PartitionerFlowBeanDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
// 获取文件系统
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop10:9000");

FileSystem fileSystem = FileSystem.get(conf);

// 设置输入目录和输出目录
Path inputPath = new Path("/mrinput/flowbean");
Path outPath = new Path("/mroutput/partition");
// 输出目录存在就删除
if(fileSystem.exists(outPath)){
fileSystem.delete(outPath, true);
}

// 创建 Job
Job job = Job.getInstance(conf);

// 设置 ReduceTask 的数量为 5
job.setNumReduceTasks(5);
// 设置使用自定义分区器
job.setPartitionerClass(MyPatitioner.class);

// 设置 job 名称
job.setJobName("PartitionerFlowBean");

// 设置job运行的 Mapper,Reducer
job.setMapperClass(PartitionerFlowBeanMapper.class);
job.setReducerClass(PartitionerFlowBeanReducer.class);

// 设置 Mapper,Reducer 的输出 key 和 value 类型。
// job 需要根据 Mapper,Reducer 输出的 key value 类型准备序列化器,通过序列化器对输出的 key value 进行序列化和反序列化
// 如果 Mapper,Reducer 输出的 key 和 value 类型一致,那么可以像下面一样直接设置 job 的最终输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(PartitionerFlowBean.class);

// 设置输入输出目录
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outPath);

// 运行 Job 并打印日志信息
job.waitForCompletion(true);
}
}

MyPatitioner 完整代码如下,接收到每个 key 之后,根据需求将不同的 key 划分到不同的 Partitioner 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.yanrs.mr.Partitioner;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
* KEY , VALUE 是 Mapper 输出的 key,value 的类型
*/
public class MyPatitioner extends Partitioner<Text, PartitionerFlowBean> {

// getPartition 方法就是计算分区,numPartitions 为总的分区数,也就是 ReduceTask 的数量
@Override
public int getPartition(Text text, PartitionerFlowBean partitionerFlowBean, int numPartitions) {
int partitionNum = 0;
// 获取手机号的前三位
String suffix = text.toString().substring(0, 3);
switch(suffix){
case "136":
partitionNum = 1; // 手机号如果是 136 开头,那么就分到 1 分区
break;
case "137":
partitionNum = 2; // 手机号如果是 137 开头,那么就分到 2 分区
break;
case "138":
partitionNum = 3; // 手机号如果是 138 开头,那么就分到 3 分区
break;
case "139":
partitionNum = 4; // 手机号如果是 139 开头,那么就分到 4 分区
break;
}
return partitionNum;
}
}

运行之后可以看到结果文件为 5 份,136 开头的号码在 part-r-00001 中,137 开头的号码在 part-r-00002 中,138 开头的号码在 part-r-00003 中,139 开头的号码在 part-r-00004 中,其余的在 part-r-00000 中。

完整代码

排序

自定义比较器的两种方法

1. 定义 Mappper 输出的key,让 key 实现 WritableComparable, 实现 CompareTo()

2. 自定义类时,继承 WriableComparator 或实现 RawCompartor,使用时设置 mapreduce.job.output.key.comparator.class=自定义的类

排序案例

Mapper 输出 key 为内置类型

对消耗的总流量进行升序排序。在之前 flowbean 案例的结果的基础上,对用户手机号所消耗的总流量升序排序。之前 flowbean 结果如下。

1
2
3
4
5
6
13470253144	FlowBean{upFlow=180, downFlow=180, sumFlow=360}
13509468723 FlowBean{upFlow=7335, downFlow=110349, sumFlow=117684}
13560439638 FlowBean{upFlow=918, downFlow=4938, sumFlow=5856}
13568436656 FlowBean{upFlow=3597, downFlow=25635, sumFlow=29232}
13590439668 FlowBean{upFlow=1116, downFlow=954, sumFlow=2070}
...

因为之前案例手机号是 key ,所以输出结果默认是按照手机号 key 进行排序的。即需要注意的一点是,排序只针对 key 进行 什么是 key,那么就对这个字段进行排序。

现在的需求的是根据总流量排序,所以要将总流量做为 key 。上述结果文件总流量可以通过 = 拆分,然后去除 } 获取到。Sort1Mapper 代码如下,这里需要注意的是 mapper 的输出,因为排序只针对 mapper 输出的 key 排序,所以这里 key 是总流量的大小,即类型为 LongWritable【内置类型】,value 为手机号。还需要注意如何拆分字符串能获取到总流量和手机号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.yanrs.mr.sort1;


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Sort1Mapper extends Mapper<LongWritable, Text, LongWritable, Text>{
private LongWritable outKey = new LongWritable();
private Text outValue = new Text();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split("=");
// 封装总流量为 key
outKey.set(Long.parseLong(words[3].replace("}", "")));
// 封装手机号为 value
outValue.set(words[0].split("\t")[0]);
context.write(outKey, outValue);
}
}

Sort1Reducer 中需要注意的是,将 mapper 的输出结果进行顺序对换,即 reducer 的输出 key 为手机号,输出 value 为排序好的总流量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.yanrs.mr.sort1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Sort1Reducer extends Reducer <LongWritable, Text, Text, LongWritable>{
@Override
protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value: values) {
// 这里写出数据的时候,还是将手机号放在前面,排序好的总流量放在后面
context.write(value, key);
}
}
}

Sort1Driver 中需要注意设置 mapper 的输出类型和 reducer 的输出类型,因为现在两者不一致了,所以需要单独设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.yanrs.mr.sort1;

import com.yanrs.mr.flowbean.FlowBean;
import com.yanrs.mr.flowbean.FlowBeanMapper;
import com.yanrs.mr.flowbean.FlowBeanReducer;
import com.yanrs.mr.wordcount.WCMapper;
import com.yanrs.mr.wordcount.WCReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
* 启动这个进程,那么就会运行该 job
*/
public class Sort1Driver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
// 获取文件系统
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop10:9000");

FileSystem fileSystem = FileSystem.get(conf);

// 设置输入目录和输出目录
Path inputPath = new Path("/mroutput/flowbean/part-r-00000");
Path outPath = new Path("/mroutput/flowbean/sort1");
// 输出目录存在就删除
if(fileSystem.exists(outPath)){
fileSystem.delete(outPath, true);
}

// 创建 Job
Job job = Job.getInstance(conf);

// 设置 job 名称
job.setJobName("sort1");

// 设置job运行的 Mapper,Reducer
job.setMapperClass(Sort1Mapper.class);
job.setReducerClass(Sort1Reducer.class);

// 设置 Mapper,Reducer 的输出 key 和 value 类型。
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);

// 设置输入输出目录
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outPath);

// 运行 Job 并打印日志信息
job.waitForCompletion(true);
}
}

运行输出结果

1
2
3
4
5
6
7
8
13966251146	240
13729199489 240
13768778790 240
13846544121 264
13470253144 360
13956435636 1644
13590439668 2070
15959002129 2118

示例代码

继承 WriableComparator

对消耗的总流量进行降序排序。上面例子 对消耗的总流量进行升序排序 的例子中,因为 mapper 的输出是总流量,类型为 LongWritable,而且 LongWritable 实现了 WritableComparable 接口,并且有 CompareTo 方法,而且 CompareTo 方法是按照升序排序的,所以我在上述例子中使用的就是 LongWritable 实现的比较器,得到的是升序排序的结果。

对消耗的总流量进行降序排序的例子中,我们需要自己实现一个比较器,实现比较器有两种方式,这里采用自定义类继承 WriableComparator,使用时在 Driver 中设置自定义的比较器即可。

Sort2Mapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.yanrs.mr.sort2;


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Sort2Mapper extends Mapper<LongWritable, Text, LongWritable, Text>{
private LongWritable outKey = new LongWritable();
private Text outValue = new Text();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split("=");
// 封装总流量为 key
outKey.set(Long.parseLong(words[3].replace("}", "")));
// 封装手机号为 value
outValue.set(words[0].split("\t")[0]);
context.write(outKey, outValue);
}
}

Sort2Mapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.yanrs.mr.sort2;


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Sort2Mapper extends Mapper<LongWritable, Text, LongWritable, Text>{
private LongWritable outKey = new LongWritable();
private Text outValue = new Text();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split("=");
// 封装总流量为 key
outKey.set(Long.parseLong(words[3].replace("}", "")));
// 封装手机号为 value
outValue.set(words[0].split("\t")[0]);
context.write(outKey, outValue);
}
}

Sort2Driver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.yanrs.mr.sort2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URISyntaxException;

/**
* 启动这个进程,那么就会运行该 job
*/
public class Sort2Driver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
// 获取文件系统
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop10:9000");

FileSystem fileSystem = FileSystem.get(conf);

// 设置输入目录和输出目录
Path inputPath = new Path("/mroutput/flowbean/part-r-00000");
Path outPath = new Path("/mroutput/flowbean/sort2");
// 输出目录存在就删除
if(fileSystem.exists(outPath)){
fileSystem.delete(outPath, true);
}

// 创建 Job
Job job = Job.getInstance(conf);

// 设置 job 名称
job.setJobName("sort1");

// 设置使用自定义的比较器
job.setSortComparatorClass(MyDescComparator.class);

// 设置job运行的 Mapper,Reducer
job.setMapperClass(Sort2Mapper.class);
job.setReducerClass(Sort2Reducer.class);

// 设置 Mapper,Reducer 的输出 key 和 value 类型。
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);

// 设置输入输出目录
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outPath);

// 运行 Job 并打印日志信息
job.waitForCompletion(true);
}
}

MyDescComparator

1
2
3
4
5
6
7
8
9
10
11
12
13
package com.yanrs.mr.sort2;


import org.apache.hadoop.io.WritableComparator;

public class MyDescComparator extends WritableComparator {
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
long thisValue = readLong(b1, s1);
long thatValue = readLong(b2, s2);
return thisValue < thatValue ? 1 : (thisValue == thatValue ? 0 : -1);
}
}

运行输出结果

1
2
3
4
5
6
13509468723	117684
13975057813 59301
13568436656 29232
13736230513 27162
15043685818 7197
.....

示例代码

Mapper 输出 key 为自定义类型

对消耗的总流量进行降序排序。自定义 key 的时候需要实现 WritableComparable 接口,而不是以前的 Writable 接口。实现 WritableComparable 后重写 compareTo 方法,在里面实现要比较的逻辑即可。

FlowBeanSort3Mapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.yanrs.mr.sort3;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


public class FlowBeanSort3Mapper extends Mapper<LongWritable, Text, FlowBeanSort3, Text>{
private Text outValue = new Text();
private FlowBeanSort3 outKey = new FlowBeanSort3();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// key 为序列号,value 为每行的内容
String[] words = value.toString().split("\t");
// 上行流量
outKey.setUpFlow(Long.parseLong(words[words.length - 3]));
// 下行流量
outKey.setDownFlow(Long.parseLong(words[words.length - 2]));
outKey.setSumFlow(Long.parseLong(words[words.length - 2]) + Long.parseLong(words[words.length - 3]));
// 封装手机号
outValue.set(words[1]);
context.write(outKey, outValue);
}
}

FlowBeanSort3Reducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.yanrs.mr.sort3;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowBeanSort3Reducer extends Reducer <FlowBeanSort3, Text, Text, FlowBeanSort3>{
@Override
protected void reduce(FlowBeanSort3 key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value:values) {
context.write(value, key);
}
}
}

FlowBeanSort3Driver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.yanrs.mr.sort3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
/**
* 启动这个进程,那么就会运行该 job
*/
public class FlowBeanSort3Driver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 获取文件系统
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop10:9000");

FileSystem fileSystem = FileSystem.get(conf);

// 设置输入目录和输出目录
Path inputPath = new Path("/mrinput/flowbean");
Path outPath = new Path("/mroutput/flowbean/sort3");
// 输出目录存在就删除
if(fileSystem.exists(outPath)){
fileSystem.delete(outPath, true);
}

// 创建 Job
Job job = Job.getInstance(conf);

// 设置 job 名称
job.setJobName("sort3");

// 设置job运行的 Mapper,Reducer
job.setMapperClass(FlowBeanSort3Mapper.class);
job.setReducerClass(FlowBeanSort3Reducer.class);

// 设置 Mapper,Reducer 的输出 key 和 value 类型。
job.setMapOutputKeyClass(FlowBeanSort3.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBeanSort3.class);

// 设置输入输出目录
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outPath);

// 运行 Job 并打印日志信息
job.waitForCompletion(true);
}
}

FlowBeanSort3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package com.yanrs.mr.sort3;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


public class FlowBeanSort3 implements WritableComparable<FlowBeanSort3> {
private long upFlow;
private long downFlow;
private long sumFlow;

/**
* 序列化, 在写出属性时,如果属性为引用数据类型,那么属性不能为 null
* @param dataOutput
* @throws IOException
*/
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}

/**
* 反序列化,反序列化和序列化的顺序要一致
* @param dataInput
* @throws IOException
*/
@Override
public void readFields(DataInput dataInput) throws IOException {
upFlow = dataInput.readLong();
downFlow = dataInput.readLong();
sumFlow = dataInput.readLong();
}

public long getUpFlow() {
return upFlow;
}

public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}

public long getDownFlow() {
return downFlow;
}

public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}

public long getSumFlow() {
return sumFlow;
}

public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}

public FlowBeanSort3() {
}

@Override
public String toString() {
return "PartitionerFlowBean{" +
"upFlow=" + upFlow +
", downFlow=" + downFlow +
", sumFlow=" + sumFlow +
'}';
}

@Override
public int compareTo(FlowBeanSort3 flowBeanSort3) {
return this.sumFlow -flowBeanSort3.getSumFlow() > 0?-1:1;
}
}

示例代码

实现 RawCompartor 接口

对消耗的总流量进行降序排序。实现 RawCompartor 接口后会重写两个 compare 方法,在一个方法中获取比较的对象,另一个方法中进行比较。

MyDescRawCompartor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.yanrs.mr.sort4;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;

import java.io.IOException;

public class MyDescRawCompartor implements RawComparator<FlowBeanSort4> {
private FlowBeanSort4 key1 = new FlowBeanSort4();
private FlowBeanSort4 key2 = new FlowBeanSort4();
private DataInputBuffer buffer = new DataInputBuffer();
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {
this.buffer.reset(b1, s1, l1);
this.key1.readFields(this.buffer);
this.buffer.reset(b2, s2, l2);
this.key2.readFields(this.buffer);
this.buffer.reset((byte[])null, 0, 0);
} catch (IOException var8) {
throw new RuntimeException(var8);
}

return this.compare(this.key1, this.key2);
}

@Override
public int compare(FlowBeanSort4 o1, FlowBeanSort4 o2) {
return o1.getSumFlow() - o2.getSumFlow() > 0?-1:1;
}
}

示例代码

Combiner

Combiner 实际上本质是一个 Reducer 类,Conbiner 只有在设置了之后,才会运行。combiner 的作用是在shuffle 阶段对相同 key 的 key-value 进行提前合并,以便在传输中可以减少磁盘 IO 和网络 IO。

Combiner 和 Reducer 的区别

Reducer 是在 reduce 阶段调用,Combiner 是在 shuffle 阶段调用【既有可能在 MapTask 端,也可能在ReduceTask 端】。但本质都是 Reducer 类,作用都是对有相同 key 的 key-value 进行合并。

Combiner 使用条件

Combiner 用在+,- 操作的场景,不能用在 *,/ 操作的场景。使用 Combiner 必须保证不能影响处理逻辑和结果。

使用时在 Driver 中设置Combiner 类即可 job.setCombinerClass(Reducer 类.class);

调用时机

MapTask 端调用:

  1. 每次溢写前会调用 Combiner 对溢写的数据进行局部合并。
  2. 在merge时如果溢写的分区数 >=3,如果设置了 Combiner,Combiner 会再次对数据进行 Combine。

ReduceTask 端调用:

  1. shuffle 线程拷贝多个 MapTask 同一分区的数据,拷贝后执行 merge 和 sort, 如果数据量过大,需要将部分数据先合并排序后,溢写到磁盘。如果设置了Combiner,Combiner 会再次运行。

案例

在之前的 flowbean 案例上,只需要在 Driver 中添加配置 job.setCombinerClass(FlowBeanReducer.class); 即可。

未添加 combine 之前结果如下:

添加 combine 之后结果如下

分组

分组通过分组比较器,对进入reduce的key进行对比,key相同的分为一组,一次性进入Reducer,被调用reduce方法。

自定义分组比较器

用户可以自定义 key 的分组比较器,自定义的比较器必须是一个 RawComparator类型的类然后实现compareTo()方法。如果没有设置 key 的分组比较器,默认采取在 Map 阶段排序时,key 的比较器。

分组案例

样例数据如上所示,现在需要求出每一个订单中最贵的商品。思路:将订单数据分装为 bean 对象,然后将 bean 做为 mapper 的输出 key,并让 bean 实现 WritableComparable 接口,重写 compareTo 方法,compareTo 方法中先对 订单 id 进行排序,若订单 id 相同再对成交金额进行排序。这样数据就是按照订单中金额有序排序的了。

总结

分区

总的分区数取决于reduceTask的数量,一个Job要启动几个reduceTask,取决于期望产生几个分区,每个分区最后都会生成一个结果文件。

当 reduceTask>1,尝试获取用户设置的 Partionner,如果没有设置使用内置的 HashPartitoner。如果reduceTask<=1, 系统默认提供一个 Partionner,它会将所有记录都分到0号区。

排序

每次溢写前,使用快速排序最后merge时,使用归并排序

比较器

如果用户自定义了比较器,MR 就使用用户自定义的比较器(RawComparator 类型),如果用户没有自定义,那么Mapper 输出的 Key 需要实现 WriableComparable 接口系统会自动提供比较器。不管是自己提供比较器还是实现WriableComparable 接口,最后在比较时,都是在调用自己实现的CompareTo 方法。

执行流程

  1. Partitioner计算分区
  2. 满足溢写条件,对所有数据进行排序,排序时用比较器对比 key,每次溢写前的排序,默认使用的快排。如果设置了 Combiner,在溢写前,排好序的结果会先被 Combiner 进行 combine 再溢写。
  3. 2过程会发生 N 次
  4. 所有的溢写片段需要 merge 为一个总的文件,合并时,使用归并排序,对 key 进行排序。如果溢写片段数量超过 3,在溢写成一个最终的文件时,Combiner 再次调用,执行Combine,combine 后再溢写。