Zookeeper 集群搭建
ZK 的写流程,客户端可以连接任意的 zkserver 实例,向 server 发送写请求命令,如果当前连接的 server 不是Leader,server 会将写命令发送给 Leader,Leader 将写操作命令广播到集群的其他节点,所有节点都执行写操作命令,一旦集群中半数以上的节点写数据成功,Leader 会响应当前 Server,让当前 Server 响应客户端,写操作完成。
集群搭建流程可参考之前 文章), 启动后效果如下:
Hadoop HA
之前搭建 hadoop 集群的时候,NN 和 RM 都只有一个节点,那么实现 hadoop 的 HA,必须保证在 NN 和 RM 故障时,采取容错机制,可以让集群继续使用。
HDFS HA
元数据同步过程
NN 的高可用中元数据的同步过程为,在 active【使用 active 状态来标记主节点,使用 standby 状态标记备用节点】的 NN 格式化后,将空白的 fsimage 文件拷贝到所有的 NN 的机器上,active 的 NN 在启动后,将 edits 文件中的内容发送给 Journalnode 进程,standby 状态的 NN 主动从 Journalnode 进程拷贝数据,保证元数据的同步。Journalnode 在设计时,采用 paxos 协议, Journalnode 适合在奇数台机器上启动,在 hadoop 中,要求至少需要3个 Journalnode 进程,如果开启了 hdfs 的 ha, 就不能再启动 2NN。在同一时刻,最多只能有一个 NN 作为主节点,对外提供服务,其余的 NN,都作为备用节点,不对外提供服务。
搭建过程
1. 修改 core-site.xml 中的 fs.defaultFS 地址
1 | <property> |
mycluster 是自定义的集群名称
2. 修改 hdfs-site.xml 文件,配置 N 个 NN 运行的主机和端口。配置 JournalNode
1 | <!-- 完全分布式集群名称 --> |
上面配置中配置了两个NN 节点,分别在 hadoop10 和 hadoop11 机器上,分别配置了两个节点的 RPC 地址和 HTTP 地址,配置了 Journal 服务所在的位置等。
启动过程
1. 在所有机器上启动 JournalNode
1 | sh xcall hadoop-daemons.sh start journalnode |
查看状态
1 | sh xcall jps |
2. 格式化 NN,将格式化后的 fsimage 文件同步到其他 NN 节点,启动所有 NN,将其中一个 NN 的状态转换为 active 状态
上面配置了两个 NN,格式化 hadoop10 上的 NN 并格式化
1 | hadoop namenode -format # 格式化 hadoop 10 上的 NN |
在 hadoop11 上同步 hadoop 10 上的数据,包括 fsimage 文件等
1 | hdfs namenode -bootstrapStandby # 在 hadoop11 上同步 hadoop 10 上的数据 |
启动 nn1 和 nn2 的 datanode
1 | sh xcall hadoop-daemons.sh start datanode |
访问 http://hadoop11:50070/ 和 http://hadoop10:50070/ 两个 nn 的 namenode web 地址,两者都显示为 standby
手动将某个 nn 节点修改为 active 状态
1 | hdfs haadmin -transitionToActive nn1 # 手动将 nn1 节点状态改为 active |
再次访问 http://hadoop10:50070/ 就能看到状态从 standby 变成了 active。
3. 文件上传测试
上传一个文件到 hdfs 中,因为只有 nn1 是 active 的,所以只有 nn1 提供服务。在 web 页面中只有 nn1 可以看到上传的文件信息。如果手动将 nn1 状态修改为 standby,将 nn2 状态修改为 active,那么就只有 nn2 可以看到上传的文件信息而 nn1 则不可以。
1 | hdfs haadmin -transitionToStandby nn1 # 将 nn1 切换为 Standby |
手动故障转移
在上面的过程中 nn2 已经成为了 active 的状态,现在手动杀死 nn2 的 namenode 进程。因为 nn2 是 actice,且已经被杀死了,所以现在是无法正常提供服务的。
1 | jps # 先获取 namenode 进程号 |
强制将 nn1 的状态修改为 Active
1 | hdfs haadmin -transitionToActive --forceactive nn1 # 强制将 nn1 修改为 Active |
修改成功后 hdfs 就能正常的提供服务了
自动故障转移
自动故障转移为 HDFS 部署增加了两个新组件:ZooKeeper 和 ZKFailoverController(ZKFC)进程。ZKFC 使用一个健康检查命令定期地 ping 与之在相同主机的 NameNode,只要该 NameNode 及时地回复健康状态,ZKFC认为该节点是健康的。如果该节点崩溃,冻结或进入不健康状态,健康监测器标识该节点为非健康的。并且其他 NameNode 的 ZKFC 进程在 ZooKeeper 抢夺分布式锁,抢到的则成为 Active 状态。
为了防止脑裂情况的发生,hadoop HDFS 提供了两种解决方法,一种是配置 ssh 发送 kill 命令,即其他机器会通过 ssh 的方式来给你发送 kill 命令,防止你是假死。另外一种是自己配置一个脚本,当自己的 ZKFC 进程检测到自己处于不健康的状态时,那么就调用最的脚本将自己杀死。
自动故障转移配置
在 hdfs-site.xml 中配置自动故障转移
1 | <property> |
在 core-site.xml 中配置 zk 的集群地址信息
1 | <property> |
分发修改的两个文件,然后启动 zk 服务
1 | sh xcall /opt/module/apache-zookeeper-3.6.1-bin/bin/zkServer.sh start # 启动 |
初始化 HA 在 Zookeeper 中状态(其实就是在 zk 中新增一个 znode 信息,下面存放 hadoop ha 的信息)
1 | hdfs zkfc -formatZK |
启动 hdfs 服务
1 | start-dfs.sh |
查看 http://hadoop11:50070/ 和 http://hadoop10:50070/ 发现 hadoop11 成为了active
模拟故障,将 hadoop11 namenode 进程杀死,然后发现 hadoop10 自动成为了 active 状态。
YARN HA
在 yarn-site.xml 中增加下面配置
1 | <!--启用resourcemanager ha--> |
上面配置中声明了两台 resourcemanager 的地址分别是 hadoop10 和 hadoop11。还配置了 zookeeper 集群的地址,配置了自动恢复等。
在 hadoop10 和 hadoop11 上启动 rm
1 | yarn-daemon.sh start resourcemanager # hadoop10 上执行 |
访问 http://hadoop10:8088 和 http://hadoop11:8088 会发现 http://hadoop11:8088 会跳转到 http://hadoop10:8088。所以 hadoop10 上的 Yarn 就变成了 active 对外服务。
将 hadoop10 上的 Yarn 进程杀死会发现只有访问 http://hadoop11:8088 才能成功,因为 http://hadoop11:8088 成为了 active。若再次将 hadoop10 上的 yarn 重新启动后,访问 hadoop10 会跳转到 hadoop11.
压缩
目的和原则
压缩的目的:压缩的目的是在 MR 运行期间,提高 MR 运行的效率,压缩可以减少 MR 运行期间的磁盘IO 和网络IO。
压缩的原则:IO 密集型,多用压缩。计算密集型,CPU 负载过重,少用压缩。
Hadoop 默认支持的压缩格式有 deflate, bzip2, gzip。需要额外安装的有 lzo, snappy。特点是 bzip2 压缩比最高,压缩速度最慢。snappy 压缩速度最快,压缩比凑合。deflate,gzip 折中。
常用配置
压缩常用配置项如下:
1 | io.compression.codecs: 代表整个Job运行期间,可以使用哪些压缩格式,配置这个参数后,配置的压缩格式会被自动初始化,默认值 deflate,gzip,bzip2 |
压缩场景
什么时候需要考虑压缩:
- Mapper 的输入: 主要考虑每个文件的大小,如果文件过大,需要使用可以切片的压缩格式。
- Reducer 的输出: reducer 的输出主要考虑,输出之后,是否需要下一个 Job 继续处理,如果需要被下个 Job 继续处理,且单个文件过大,也要使用可以切片的压缩格式。
- shuffle阶段:能加速即可
调度器
FIFO调度器
FIFO 调度器的特点就是单队列,所有的 Job 按照客户端提交的先后顺序,先到先服务。弊端是如果当前队列中有一个大的 Job,非常消耗资源,那么这个 Job 之后的其他 Job 都需要付额外的等待时间。造成集群的资源利用率不足。
容量调度器
容量调度器的本质是多个 FIFO 的队列组成,Hadoop 默认使用就是容量调度器。
特点是每个队列可以配置一定的容量,空闲的资源可以匀给其他队列临时使用。可以配置每个job使用的容量的限制,防止一个大的 job 独占所有资源。可以配置每个用户可以使用的容量限制,防止当个用户占用所有资源。
公平调度器
公平调度器的设置和容量调度器大致相同,也是多条队列,每天队列都可以设置一定的容量,每个 Job,用户可以设置容量。区别在于公平调度器在调度策略上,采用最大最小公平算法,来调度 Job,这个算法会保证同一个队列中,所有已经提交,未运行结束的 Job,获取到队列中的资源是平等的。
Hadoop的优化
小文件的优化
源头上处理,在上传到集群之前,提前处理小文件
小文件已经在 HDFS 存在,可以使用 hadoop archieve 进行归档
在运行 MR 时,可以使用 CombineTextInputFormat 将多个小文件规划到一个切片中
小文件过多,可以开启 JVM 重用
MR 的优化
合理设置 MapTask 和 ReduceTask 的数量
避免数据倾斜。如果 Map 端的数据发生倾斜,那么在切片时,注意每片数据尽量均匀,防止有些不可切片的数据。Reduce 端的数据倾斜,提前对数据进行抽样调查,统计出大致的分布范围,根据分布范围,合理编写Partitioner,让每个分区的数据尽量均衡。
优化磁盘 IO 和网络 IO。可以启用 combiner。启动压缩。调大 MapTask 缓冲区的大小,减少溢写次数。调大MapTask 中 merge 阶段一次合并的片段数,减少合并花费的时间。调大 reduceTask 中 shuffle 线程可以使用的内存,减少溢写次数。调大 reduceTask 中,input.buffer 的大小,提前缓存部分数据到 buffer 中。