之前的写/追加写流程中, 已经讲过写失败会触发块级别 的错误恢复, 那么今天接着说一下从HDFS2.X版本开始, 引入的异常磁盘自动摘除和热更换的功能, 以及这个功能使用后可能存在的一些问题, 系统的来学习一下Datanode是如何操作磁盘 的
0x00. 主题 HDFS关于选DN的操作在Namenode端, 而对磁盘的操作主要是在Datanode端, 我们先来看看DN端.
大体有以下几个关键问题, 因为本文涉及的内容稍有点多, 会拆分为两篇, 先看一下后面的章节.
1.1 Datanode如何识别磁盘 HDFS而言, 它并不直接 和任何块设备(磁盘卷)打交道, 不管是RAM/SSD/HDD, 都是通过手工在配置文件的dfs.datanode.data.dir
选项配置的, 如下所示:
1 2 3 4 5 <property > <name > dfs.datanode.data.dir</name > <value > [SSD]/data01/block,[DISK]/data02/block,[DISK]/data03/block</value > </property >
备注: 其中不同磁盘类型属于异构相关范畴, 在NN篇 会详述. 可简单了解
1.2 Datanode如何选取磁盘 简言之 , DN有两种磁盘选取策略, 默认的是轮询(1->2->3), 改进的是优选空盘的带权策略. (后面详述)
2.1 DN如何发现坏盘 DN会对配置的磁盘卷(volume )进行基本的检查, 包括目录存在 且可以正常读写 . 并且会定期检查 (待确定 ), 如果发现前面说的几个条件有任一不满足, 则认为此盘已坏, 自动把它”摘掉”, 但是不能读写是不是就一定是盘坏了呢? 这里之后再细说. 整个集群的坏盘信息也可以从HDFS的UI界面查看, 如下所示:
2.2 坏盘对Datanode的影响 HDFS配置里有一个dfs.datanode.failed.volumes.tolerated
项, 意为当前DN能够容忍的最大坏盘数, 默认值 是0也就是说挂一个盘DN就会停止工作 .
2.3 更换坏盘 在HDFS2.X之前的版本中, 如果存在坏盘, 那么只能先让这台DN下线, 更换/修复磁盘之后, 再重新启动, 在2.6之后的版本支持”热替换 “功能, 可直接修改#1
中的配置目录, 去掉坏盘对应的映射, 然后reconfig
重新加载配置文件, 之后再换掉坏的磁盘, 把映射加回去刷新一次就行了. (方便许多)
0x01. Datanode加载磁盘 DataNode类中包括了对磁盘相关设置的调用, 不过我们得把来龙去脉先理一下, 看看这些方法是如何被调用的.
首先DN启动后, 它会进行一系列的初始化. 其中一个重要初始化是BlockPoolManager
对象, 它的功能就是管理对应的BPOfferService
对象
而BPOfferService
这个对象会管理DN的一个核心工作线程BPServiceActor
,它负责以下四个主要功能:
与Namenode注册前的握手通信
将当前DN汇报(注册)给Namenode (1和2是否应该合并 –> 将当前DN注册到NN上)
定时给Namenode发送心跳
处理Namenode发来的指令
其中步骤1, 2包含初始化和注册DN的过程, 是磁盘初始化相关的点, 大体流程如下所示:
sequenceDiagram
participant A as BPServiceActor
participant B as BPOfferService
participant C as DataNode
%%participant D as FSVolume
participant E as NameNode
C->>B: 0.初始化bpos对象
activate B
B->>A: 初始化Actor线程
deactivate B
activate A
C->>+E: connectToNN()
E-->>-A: 1.获得NN-Proxy
A->>+E: .
E-->>-A: 2.获取Namespace信息
B->>+C: 3.1设置Namespace信息
C->>C: initBlockPool()
C->>-C: 检查磁盘健康
Note right of C: 重点: 初始化块池
A->>+B: 5.register()
B->>E: registerDatanode()
B-->>-A: 注册成功
deactivate A
然后我们梳理了整体的流程之后, 再来看看具体的几个磁盘检查点, 由于检查方式 有同步和异步两个选择, 二者的实际内容完全一样, 放一起说
0x01. 磁盘检查 1. 启动DN时的检查 DN第一次初始化块池时, 以及之后每隔一段时间 (存疑? ), 都会使用异步的 磁盘检查方法checkDiskError()
, 步骤也挺清晰:
1 2 3 4 5 6 7 8 9 10 11 12 public void checkDiskError () { Set<FsVolumeSpi> unhealthyVolumes; try { unhealthyVolumes = volumeChecker.checkAllVolumes(data); lastDiskErrorCheck = Time.monotonicNow(); } catch (InterruptedException e) { throw new IOException("Interrupted while running disk check" , e); } if (unhealthyVolumes.size() > 0 ) handleVolumeFailures(unhealthyVolumes); }
这里说的就是几个基本的检查项是否能通过, 如果不通过则会把当前目录(磁盘)加入异常volumes处理列表, 并会将坏盘信息上报NN , 并检查坏盘数是否超过配置, 超过则停止当前DN服务, 磁盘检查核心内容如下图所示:
2. 检查内容
然后我们看一下磁盘检查实现的细节, 整个框架是个异步体系, 不关心的同学可以直接跳到最后的check()
调用, 这里还是完整看看:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 public Set<FsVolumeSpi> checkAllVolumes (final FsDatasetSpi<? extends FsVolumeSpi> dataset) { final long gap = timer.monotonicNow() - lastAllVolumesCheck; if (gap < minDiskCheckGapMs) { numSkippedChecks.incrementAndGet(); return Collections.emptySet(); } final FsDatasetSpi.FsVolumeReferences references = dataset.getFsVolumeReferences(); if (references.size() == 0 ) return Collections.emptySet(); lastAllVolumesCheck = timer.monotonicNow(); final Set<FsVolumeSpi> healthyVolumes = new HashSet<>(); final Set<FsVolumeSpi> failedVolumes = new HashSet<>(); final Set<FsVolumeSpi> allVolumes = new HashSet<>(); final AtomicLong numVolumes = new AtomicLong(references.size()); final CountDownLatch latch = new CountDownLatch(1 ); for (int i = 0 ; i < references.size(); ++i) { final FsVolumeReference reference = references.getReference(i); Optional<ListenableFuture<VolumeCheckResult>> olf; olf = delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT); if (olf.isPresent()) { allVolumes.add(reference.getVolume()); Futures.addCallback(olf.get(), new ResultHandler(reference, healthyVolumes, failedVolumes, numVolumes, (ignored1, ignored2) -> latch.countDown())); } else { IOUtils.cleanup(null , reference); if (numVolumes.decrementAndGet() == 0 ) latch.countDown(); } } if (!latch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) { LOG.warn("checkAllVolumes timed out after xx ms" ); } numSyncDatasetChecks.incrementAndGet(); synchronized (this ) { return new HashSet<>(Sets.difference(allVolumes, healthyVolumes)); } } @Override public VolumeCheckResult check (CheckContext context) throws IOException { if (storageType != StorageType.PROVIDED) { DiskChecker.checkDir(context.localFileSystem, new Path(baseURI), context.expectedPermission); } return VolumeCheckResult.HEALTHY; }
上面说的磁盘检查, 基本只有DN自触发, 更多的情况, 发现磁盘问题, 其实是通过读写副本块 时手动被触发的, 那么也就是接下来说的异步的磁盘检查. 先来看看有哪些情况会直接触发:
FileIoProvider.onFailure(volume, xx)
方法: DN读写磁盘的工作类 , 当读写操作失败(抛各种异常)时, 会触发此方法进行磁盘检查.
FsDataSetImpl.validateBlockFile(bpid, blockId)
方法: 找副本块对应的文件, 如果副本块存在, 但文件不存在, 则认为磁盘有故障, 进行检查
简单说, 也就当DN端的读写磁盘发生异常时, 会触发异步的磁盘检查, 检查内容和同步检查一样, 处理方式也一样, 不再重复.
接下来我们来关注, 如果一块磁盘是健康的, 那么写数据时, DN会如何去选择.
0x02. 选盘策略 HDFS3中自带有两种 检查策略:
RoundRobinVolumeChoosingPolicy
(默认): 轮询策略
AvailableSpaceVolumeChoosingPolicy
(推荐): 空盘优先策略
参考下官方blog的图, 它们总体的对比如下: (蓝色是磁盘已有数据, 黄色是一个block)
大概理解意思之后, 我们再来逐一看看他们具体的实现细节, 尤其是第二个优选空盘策略
1. 轮询选择策略 这是HDFS一直以来默认的磁盘卷选择方式, 它的选法很简单, 当前盘是否还能满足写入要求, 满足就挨个盘去写: (假设仅一种存储类型, 异构 单独说)
从第一个磁盘卷开始, 获取它的实际可用空间 (怎么算的, 后续单说)
当前磁盘卷可用空间 > 待写入的副本块 大小 (默认也就 >128M
)
满足, 则选取当前磁盘卷, 结束
不满足, 记录当前磁盘可用值, 然后遍历下一个磁盘卷
遍历到任一磁盘卷满足, 则结束
如果遍历完所有都不满足, 则抛出异常
之后的操作从第二个磁盘卷开始遍历, 达到轮询写入的效果 (一个10个块的文件, 两块盘, 那么每块盘就正好写5个块)
来看看代码的细节, 顺便想一下这样的选盘策略弊端是什么:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 private V chooseVolume (int curVolumeIndex, List<V> volumes, long blockSize) throws IOException { int curVolume = curVolumes[curVolumeIndex] < volumes.size() ? curVolumes[curVolumeIndex] : 0 ; int startVolume = curVolume; long maxAvailable = 0 ; while (true ) { final V volume = volumes.get(curVolume); curVolume = (curVolume + 1 ) % volumes.size(); long availableVolumeSize = volume.getAvailable(); if (availableVolumeSize > blockSize) { curVolumes[curVolumeIndex] = curVolume; return volume; } if (availableVolumeSize > maxAvailable) { maxAvailable = availableVolumeSize; } if (curVolume == startVolume) { String err = "The volume with the most available space < the block size xx" throw new DiskOutOfSpaceException(err); } } }
2. 空盘优选策略 轮询策略的弊端很好设想, 它可能会造成短时间严重的磁盘使用比不均匀, 那么如果选择AvailableSpace
策略呢? 看看下面的流程图:
默认都充裕的定义是每个盘都多10G以上
空盘权重占比75% (3倍于将满盘)
graph TD
a(选择考量剩余空间策略) --> b --不是--> c(划分为充裕和紧张两类) -->d(根据充裕盘优选的原则) --> e(根据实际比例算出权重) --> f(使用轮询策略从中选盘)
b -.是.-> f
subgraph Part A
b
c
end
subgraph Part B
d
e
end
然后再看看具体的代码细节, 以及具体的计算方式, 还是比较好理解的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 private V doChooseVolume (final List<V> volumes, long replicaSize, String storageId) throws IOException { var volumesWithSpaces = new AvailableSpaceVolumeList(volumes); if (volumesWithSpaces.areAllVolumesWithinFreeSpaceThreshold()) { V volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize, storageId); return volume; } else { V volume; long mostAvailableAmongLowVolumes = volumesWithSpaces .getMostAvailableSpaceAmongVolumesWithLowAvailableSpace(); List<V> highAvailableVolumes = extractVolumesFromPairs( volumesWithSpaces.getVolumesWithHighAvailableSpace()); List<V> lowAvailableVolumes = extractVolumesFromPairs( volumesWithSpaces.getVolumesWithLowAvailableSpace()); float preferencePercentScaler = (highAvailableVolumes.size() * balancedPreferencePercent) + (lowAvailableVolumes.size() * (1 - balancedPreferencePercent)); float scaledPreferencePercent = (highAvailableVolumes.size() * balancedPreferencePercent) / preferencePercentScaler; if (mostAvailableAmongLowVolumes < replicaSize || random.nextFloat() < scaledPreferencePercent) { volume = roundRobinPolicyHighAvailable.chooseVolume( highAvailableVolumes, replicaSize, storageId); } else { volume = roundRobinPolicyLowAvailable.chooseVolume( lowAvailableVolumes, replicaSize, storageId); } return volume; } }
所以可以看出, 空盘优先的策略考虑更为全面, 如果磁盘都比较空闲, 它会自动退化为轮询, 如果出现倾斜(比如高负载集群我加了几块新盘), 它会优先写入空闲的新盘, 从而使磁盘使用更加均匀, 线上推荐使用此策略. 也可以减少盘满的概率, 并可与NN的优选空DN策略搭配.
0x03. 坏盘处理 1. 处理异常磁盘 紧接着上面的逻辑, 如果检查volumes
后发现异常, 那么进入的处理逻辑的大流程如下3个 :
FsDatasetImpl
先处理坏盘 (预处理?)
从DN维护的所有关系中移除异常磁盘卷
汇报坏盘信息给NN, 并判断是否需要停止此DN
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private void handleVolumeFailures (Set<FsVolumeSpi> unhealthyVolumes) { if (unhealthyVolumes.isEmpty()) return ; data.handleVolumeFailures(unhealthyVolumes); Set<StorageLocation> unhealthyLocations = new HashSet(); StringBuilder sb = new StringBuilder("DataNode failed volumes:" ); for (FsVolumeSpi vol : unhealthyVolumes) { unhealthyLocations.add(vol.getStorageLocation()); sb.append(vol.getStorageLocation()).append(";" ); } try { removeVolumes(unhealthyLocations, false ); } catch (IOException e) { LOG.warn("Error occurred when removing unhealthy storage dirs" ); } handleDiskError(sb.toString()); }
先来看第一步在FSVolumeList
中:
更新Map<StorageLocation, VolumeFailureInfo>
对象, 记录坏盘的基本信息(位置, 时间, 预估丢失的数据大小)
调用removeVolume
从volumeList中移除此目录
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 void handleVolumeFailures (Set<FsVolumeSpi> failedVolumes) { try (AutoCloseableLock lock = checkDirsLock.acquire()) { for (FsVolumeSpi vol : failedVolumes) { FsVolumeImpl fsv = (FsVolumeImpl) vol; addVolumeFailureInfo(fsv); removeVolume(fsv); } waitVolumeRemoved(5000 , checkDirsLockCondition); } } private void removeVolume (FsVolumeImpl target) { if (volumes.remove(target)) { if (blockScanner != null ) blockScanner.removeVolumeScanner(target); try { target.setClosed(); } catch (IOException e) { LOG.warn("Error occurs when waiting volume xx to close: " ); } target.shutdown(); volumesBeingRemoved.add(target); } }
通过第一步的移除操作, 已经把volumeList和一些准备关闭工作完成了, 下面的操作就是进一步移除其他所有关联
从FsDataset
中移除volumes和副本块信息
从DataStorage
中移除volumes信息
在内存中把dfs.datanode.data.dir
配置的磁盘卷更新
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private synchronized void removeVolumes (params..) throws IOException { if (storageLocations.isEmpty()) return ; IOException ioe = null ; data.removeVolumes(storageLocations, clearFailure); try { storage.removeVolumes(storageLocations); } catch (IOException e) { ioe = e; } for (Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext(); ) { StorageLocation loc = it.next(); if (storageLocations.contains(loc)) it.remove(); } getConf().set("dfs.datanode.data.dir" , Joiner.on("," ).join(dataDirs)); if (ioe != null ) throw ioe; }
那么分两个方法来单独看看这里的两个移除具体做了什么, 先看看FsDataSetImpl
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 @Override public void removeVolumes (storageLocsToRemove, boolean clearFailure) { var storageLocationsToRemove = new ArrayList<>(storageLocsToRemove); Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>(); List<String> storageToRemove = new ArrayList<>(); try (AutoCloseableLock lock = datasetLock.acquire()) { for (int idx = 0 ; idx < dataStorage.getNumStorageDirs(); idx++) { Storage.StorageDirectory sd = dataStorage.getStorageDir(idx); final StorageLocation sdLocation = sd.getStorageLocation(); if (storageLocationsToRemove.contains(sdLocation)) { asyncDiskService.removeVolume(sd.getStorageUuid()); volumes.removeVolume(sdLocation, clearFailure); volumes.waitVolumeRemoved(5000 , datasetLockCondition); for (String bpid : volumeMap.getBlockPoolList()) { List<ReplicaInfo> blocks = new ArrayList<>(); for (var it = volumeMap.replicas(bpid).iterator(); it.hasNext();) { ReplicaInfo block = it.next(); final var blockStorageLocation = block.getVolume().getStorageLocation(); if (blockStorageLocation.equals(sdLocation)) { blocks.add(block); it.remove(); } } blkToInvalidate.put(bpid, blocks); } storageToRemove.add(sd.getStorageUuid()); storageLocationsToRemove.remove(sdLocation); } } if (clearFailure) { for (StorageLocation storageLocToRemove : storageLocationsToRemove) { volumes.removeVolumeFailureInfo(storageLocToRemove); } } setupAsyncLazyPersistThreads(); } for (Map.Entry<String, List<ReplicaInfo>> entry : blkToInvalidate.entrySet()) { String bpid = entry.getKey(); List<ReplicaInfo> blocks = entry.getValue(); for (ReplicaInfo block : blocks) { invalidate(bpid, block); } } try (AutoCloseableLock lock = datasetLock.acquire()) { for (String storageUuid : storageToRemove) { storageMap.remove(storageUuid); } }}
再来看看DataStorage
的移除磁盘卷具体做了什么:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 synchronized void removeVolumes (storageLocations) throws IOException { if (storageLocations.isEmpty()) return ; StringBuilder errorMsgBuilder = new StringBuilder(); for (var it = getStorageDirs().iterator(); it.hasNext();) { StorageDirectory sd = it.next(); StorageLocation sdLocation = sd.getStorageLocation(); if (storageLocations.contains(sdLocation)) { for (Map.Entry<String, BlockPoolSliceStorage> entry : bpStorageMap.entrySet()) { String bpid = entry.getKey(); BlockPoolSliceStorage bpsStorage = entry.getValue(); File bpRoot = BlockPoolSliceStorage.getBpRoot(bpid, sd.getCurrentDir()); bpsStorage.remove(bpRoot.getAbsoluteFile()); } getStorageDirs().remove(sd); try { sd.unlock(); } catch (IOException e) { errorMsgBuilder.append("Failed to remove %s" , sd.getRoot()); } } } if (errorMsgBuilder.length() > 0 ) throw new IOException(errorMsgBuilder.toString()); }
至此, 整个DN启动后第一次, 和之后定时的同步 磁盘卷检查流程就全部说完了, 然后在下一章再单独说说另外一种常见触发异步 磁盘检查的可能.
3. 热刷新磁盘目录配置 主要的实现在DataNode类的reconfigurePropertyImpl
方法中, 简单说就是不重启DN的前提下可以热加载磁盘配置项, 从而达到平滑更换磁盘的效果.
refreshVolumes
(热刷新)
removeVolumes
(摘盘, 并从map中剔除之前记录异常的磁盘卷信息, 是为了NN更新么?)
1 2 bin/hdfs dfsadmin -reconfig datanode dnIP:RPC-port start
这里待补充, 整体来说如果搞明白了前面的, 这里思路就比较清晰了 (注意这里可能有坑, 所以线上勿随意使用, 以后会补充这里的阅读测试)
0x04. 后续 1. 对磁盘的IO操作异常是否应普遍触发检查? 之前觉得问题主要在异步磁盘检查这里, 觉得是很多IO相关操作抛出的任何异常, 都会触发磁盘检查, 一旦检查读写权限不足, 就会认为这个盘坏了进入摘盘逻辑.
那么这里根本问题是不是说细化一下IO操作抛出的异常处理方式, 让某些异常不去做磁盘检查呢? 感觉这个方式也不能解决根本问题, 只能一定程度缓解, 所以目前的结论是, 磁盘IO异常, 应触发检查 .
2. 磁盘卷容量已块满, 是否可以继续写入, 写入会导致失败被摘盘么? 满盘不会继续写入, 会换盘, 但是满盘不应被摘掉, 也应该正常可读, 详见下一篇的测试
小结:
这篇主要是分整体和细节介绍了HDFS_Datanode处理磁盘的逻辑, 下一篇 来说一下这里面藏有的一个小坑, 以及关于触发摘盘的一系列测试, 一些遗留问题也放在下一篇解答.
参考资料:
Datanode-disk-balancer(Cloudera-blog)
HDFS3.1.2官方源码