
Solving the large number of residual under-replicated blocks after aborting a Hadoop decommission operation


Preface

If you do day-to-day maintenance on a Hadoop cluster, you have certainly gone through node decommissioning more than once. For example, as business grows quickly and cluster resources gradually run short, the normal practice is to add machines and scale out linearly. Of course, as those machines age, problems show up: disks break, some machines occasionally cannot be reached over the network, and so on. At that point you have to remove those machines from the cluster; you cannot let them linger just to avoid a small momentary loss. Abnormal machines like these drag down the running efficiency of the whole cluster, because tasks run across interconnected machines: if your task has not finished, mine has to wait. The theme of this article is exactly this kind of work, the operations involved in taking a node offline.


What the node decommission operation means

Let me first explain a slightly more technical term: taking a node offline. The corresponding word is Decommission, which means removing a node from the cluster without affecting the cluster as a whole. Obviously, a decommission inevitably costs the cluster the compute resources of that one node, but the most important word in Hadoop decommissioning is still "data". How to guarantee that the data on the node being decommissioned is completely transferred to other machines is the key point. So during a DataNode decommission, the main work is re-replicating its blocks, backing the data up to the HDFS default of 3 replicas. Once that data has been copied, the node's state becomes Decommissioned; at that point the DataNode can be stopped and the machine can be shut down and removed for good.
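As a quick aside (this snippet is my own illustration, not code from this article): the per-node admin state mentioned above can be observed from a Java client through the public HDFS API, assuming an HDFS client classpath and an fs.defaultFS that points at the target cluster.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

public class AdminStateProbe {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumes fs.defaultFS is configured to point at the target HDFS cluster.
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
    for (DatanodeInfo dn : dfs.getDataNodeStats()) {
      // Prints NORMAL, DECOMMISSION_INPROGRESS or DECOMMISSIONED for each node.
      System.out.println(dn.getHostName() + " : " + dn.getAdminState());
    }
  }
}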


A large number of residual replication blocks after aborting a decommission

The previous section was really just groundwork; the decommission operation itself is the focus here. When you decommission an ordinary machine, the process will of course go smoothly. But after you have already started a decommission, consider the following two situations:

1. You find that the operation was a mistake: node nodeA should not have been added to the dfs.exclude file.

2. You receive an instruction that this node is only temporarily offline and should be returned to normal service.

When either of the above happens, your first reaction is to remove the node from the exclude file and run dfsadmin -refreshNodes again, and you will be happy to see that the node state does indeed return to In Service. But if you look more carefully, you will find that the underReplicatedBlocks number on the NameNode home page has not dropped at all; it is still stuck at the value it had when the decommission was aborted. These blocks waiting to be replicated are basically the blocks stored on the node that was being decommissioned, as shown below:


The replicas clearly already exist on HDFS. Once the decommissioning node is restored, the NameNode obviously no longer needs this large batch of replication requests, yet it will keep spending time processing these pending replication blocks, only to find over and over again that the blocks already have enough replicas. When there is a huge number of pending replication blocks, this is simply a disaster for the NameNode and can even directly affect its handling of normal requests. OK, from this you can see it is definitely not a small problem. A complete solution will be given later in this article; before that, to broaden the picture a little, let me introduce a similar scenario that also produces a large number of residual blocks.
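Incidentally, the same underReplicatedBlocks counter that the NameNode home page shows can also be read from the NameNode's JMX servlet, which makes it easy to watch whether the number really drops. Below is a minimal sketch of my own (the host and port are assumptions; substitute your NameNode HTTP address), not code from this article.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

public class UnderReplicatedBlocksProbe {
  public static void main(String[] args) throws Exception {
    // The FSNamesystem bean exposes the UnderReplicatedBlocks attribute.
    URL url = new URL(
        "http://namenode-host:50070/jmx?qry=Hadoop:service=NameNode,name=FSNamesystem");
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(url.openStream(), "UTF-8"))) {
      String line;
      while ((line = in.readLine()) != null) {
        // Print only the line that carries the counter we care about.
        if (line.contains("UnderReplicatedBlocks")) {
          System.out.println(line.trim());
        }
      }
    }
  }
}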


A similar scenario: a Dead Node "resurrects"

The other scenario that produces a large number of pending replication blocks is the Dead Node. When a DataNode has not reported a heartbeat for a long time, beyond the heartbeat expiry timeout, it is marked as a Dead Node. Once a Dead Node appears, its blocks are also replicated on a large scale in order to restore the replication factor, which is similar to the Decommission case. But there is one major difference: when the Dead Node is restarted, the residual pending replication count drops back to the normal value it had before the node died (students who do not believe this can run the experiment themselves). Two scenarios, a similar phenomenon, different results; the Dead Node recovery behavior is exactly the result we want to see. So why does the recovery of a Dead Node make the under-replicated block count drop, while aborting a decommission does not? The only way to answer this is to look into the source code; guessing will never solve the problem. Once we find the answer, it will help us solve the problem of the large number of residual replication blocks left by an aborted decommission.
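For reference, the "heartbeat timeout" above follows the usual HDFS expiry rule of 2 * dfs.namenode.heartbeat.recheck-interval + 10 * dfs.heartbeat.interval. The little calculation below is my own illustration using the common default values, not code from this article; adjust the two values to your own configuration.

public class DeadNodeTimeout {
  public static void main(String[] args) {
    long heartbeatRecheckIntervalMs = 5 * 60 * 1000; // dfs.namenode.heartbeat.recheck-interval default
    long heartbeatIntervalSec = 3;                   // dfs.heartbeat.interval default
    long expireMs = 2 * heartbeatRecheckIntervalMs + 10 * heartbeatIntervalSec * 1000;
    // With the defaults this prints about 630 seconds (10.5 minutes).
    System.out.println("A DataNode is marked dead after about " + expireMs / 1000 + " seconds");
  }
}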


Dead Node resurrection: how the pending blocks are removed

This article revolves around the pending replication block count, so finding exactly which variable in the HDFS code holds it, and which class object controls it, is the critical first step. The answer lies in FSNamesystem; the code is as follows:

@Override // FSNamesystemMBean
@Metric
public long getUnderReplicatedBlocks() {
  return blockManager.getUnderReplicatedBlocksCount();
}
Going a step further, into BlockManager:

/** Used by metrics */
public long getUnderReplicatedBlocksCount() {
  return underReplicatedBlocksCount;
}
So who assigns this variable?

void updateState() {
  pendingReplicationBlocksCount = pendingReplications.size();
  underReplicatedBlocksCount = neededReplications.size();
  corruptReplicaBlocksCount = corruptReplicas.size();
}
The middle line is the one we care about, and the variable name already explains its meaning: neededReplications, blocks that still need to be replicated. OK, we can infer that after a DataNode restarts, something must call a remove-like operation on neededReplications so that its size shrinks. Now think about what happens right after a DataNode restarts: it registers with the NameNode again, then starts sending heartbeats, and along with the heartbeats it reports its blocks. That block report is obviously a great opportunity for the removal to happen. Of course, this is only a conjecture for now, so let us go straight into the code and verify it. BPServiceActor#offerService is the loop method tied to the heartbeat:

/**
 * Main loop for each BP thread. Run until shutdown,
 * forever calling remote NameNode functions.
 */
private void offerService() throws Exception {
  LOG.info("For namenode " + nnAddr + " using"
      + " DELETEREPORT_INTERVAL of " + dnConf.deleteReportInterval + " msec "
      + " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec"
      + " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec"
      + " Initial delay: " + dnConf.initialBlockReportDelay + "msec"
      + "; heartBeatInterval=" + dnConf.heartBeatInterval);

  //
  // Now loop for a long time....
  //
  while (shouldRun()) {
    try {
      final long startTime = scheduler.monotonicNow();

      //
      // Every so often, send heartbeat or block-report
      //
      final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
      if (sendHeartbeat) {
        ...
      }
      if (sendImmediateIBR ||
          (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
        reportReceivedDeletedBlocks();
        lastDeletedReport = startTime;
      }

      List<DatanodeCommand> cmds = blockReport();
      processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
      ...
In this loop you can see that a blockReport() is issued, and that the feedback commands the NameNode returns to the DataNode are then processed. Entering the blockReport method:

/**
 * Report the list of blocks to the Namenode
 * @return DatanodeCommands returned by the NN. May be null.
 * @throws IOException
 */
List<DatanodeCommand> blockReport() throws IOException {
  ...

  // Send the reports to the NN.
  int numReportsSent = 0;
  int numRPCs = 0;
  boolean success = false;
  long brSendStartTime = monotonicNow();
  long reportId = generateUniqueBlockReportId();
  try {
    if (totalBlockCount < dnConf.blockReportSplitThreshold) {
      // Below split threshold, send all reports in a single message.
      DatanodeCommand cmd = bpNamenode.blockReport(
          bpRegistration, bpos.getBlockPoolId(), reports,
          new BlockReportContext(1, 0, reportId));
      ...
Here you can see that the DataNode really does report its blocks to the NameNode. On the NameNode side, the corresponding RPC server method is:

@Override // DatanodeProtocol
public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
    String poolId, StorageBlockReport[] reports,
    BlockReportContext context) throws IOException {
  checkNNStartup();
  verifyRequest(nodeReg);
  ...
  //
  // BlockManager.processReport accumulates information of prior calls
  // for the same node and storage, so the value returned by the last
  // call of this loop is the final updated value for noStaleStorages.
  //
  noStaleStorages = bm.processReport(nodeReg, reports[r].getStorage(),
      blocks, context, (r == reports.length - 1));
  metrics.incrStorageBlockReportOps();
}
...
There are still a few more layers of methods here, from processReport down to addStoredBlock. Let me give directly the method that finally performs the remove action:

/**
 * Modify (block-->datanode) map. Remove block from set of
 * needed replications if this takes care of the problem.
 * @return the block that is stored in blockMap.
 */
private Block addStoredBlock(final BlockInfoContiguous block,
    DatanodeStorageInfo storageInfo,
    DatanodeDescriptor delNodeHint,
    boolean logEveryBlock)
    throws IOException {
  ...
  // handle underReplication/overReplication
  short fileReplication = bc.getBlockReplication();
  if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) {
    neededReplications.remove(storedBlock, numCurrentReplica,
        num.decommissionedReplicas(), fileReplication);
  } else {
  ...
Here the replication need of each reported block is re-evaluated: if a block no longer needs extra replicas, it is removed from the neededReplications object, which is exactly why the pending replication count drops. In fact this is all thanks to the DataNode's re-registration, which makes it report all of its blocks to the NameNode again. A node that merely goes from the decommissioning state back to normal service does not re-register, and blocks that have not been modified are not reported again, which is why the two scenarios end so differently.

How the decommission operation works

This part can be regarded as the watershed of the article: the first half gradually traced the phenomenon back to its root cause, while the second half uses the underlying principles to solve the problem. To solve the problem of the large number of residual blocks left by an aborted decommission, we first need to understand how decommissioning works. As we all know, decommission-related actions are all triggered by the -refreshNodes command, which corresponds to the following method:

/**
 * 1. Added to hosts  --> no further work needed here.
 * 2. Removed from hosts --> mark AdminState as decommissioned.
 * 3. Added to exclude --> start decommission.
 * 4. Removed from exclude --> stop decommission.
 */
private void refreshDatanodes() {
  for (DatanodeDescriptor node : datanodeMap.values()) {
    // Check if not include.
    if (!hostFileManager.isIncluded(node)) {
      node.setDisallowed(true); // case 2.
    } else {
      if (hostFileManager.isExcluded(node)) {
        decomManager.startDecommission(node); // case 3.
      } else {
        decomManager.stopDecommission(node); // case 4.
      }
    }
  }
}
There are two parts we care about here: one is starting the decommission, the other is stopping (aborting) it.

Starting decommission

After a decommission starts, how do the blocks waiting to be replicated get added into the neededReplications object?

/**
 * Start decommissioning the specified datanode.
 * @param node
 */
@VisibleForTesting
public void startDecommission(DatanodeDescriptor node) {
  if (!node.isDecommissionInProgress()) {
    if (!node.isAlive) {
      LOG.info("Dead node {} is decommissioned immediately.", node);
      node.setDecommissioned();
    } else if (!node.isDecommissioned()) {
      for (DatanodeStorageInfo storage : node.getStorageInfos()) {
        LOG.info("Starting decommission of {} {} with {} blocks",
            node, storage, storage.numBlocks());
      }
      // Update DN stats maintained by HeartbeatManager
      hbManager.startDecommission(node);
      node.decommissioningStatus.setStartTime(monotonicNow());
      pendingNodes.add(node);
    }
  }
  ...
In the last line of this code, the node is added to the pendingNodes list. If you have studied the DecommissionManager class before, you will know that it has a dedicated monitor thread that watches whether nodes being decommissioned have finished:

/**
 * Checks to see if datanodes have finished decommissioning.
 * <p/>
 * Since this is done while holding the namesystem lock,
 * the amount of work per monitor tick is limited.
 */
private class Monitor implements Runnable {
  ...

  @Override
  public void run() {
    if (!namesystem.isRunning()) {
      LOG.info("Namesystem is not running, skipping decommissioning checks"
          + ".");
      return;
    }
    ...
In the run method, two operations are performed:

@Override
public void run() {
  ...
  try {
    processPendingNodes();
    check();
  } finally {
    namesystem.writeUnlock();
  }
  if (numBlocksChecked + numNodesChecked > 0) {
    LOG.info("Checked {} blocks and {} nodes this tick", numBlocksChecked,
        numNodesChecked);
  }
}
The job of processPendingNodes is to gradually move the nodes that were previously added to pendingNodes into the set of nodes being tracked for decommission.

/**
 * Pop datanodes off the pending list and into decomNodeBlocks,
 * subject to the maxConcurrentTrackedNodes limit.
 */
private void processPendingNodes() {
  while (!pendingNodes.isEmpty() &&
      (maxConcurrentTrackedNodes == 0 ||
       decomNodeBlocks.size() < maxConcurrentTrackedNodes)) {
    decomNodeBlocks.put(pendingNodes.poll(), null);
  }
}
Then the check method is the one that really scans the blocks and decides whether there are still blocks with an insufficient number of replicas:

private void check() {
  final Iterator<Map.Entry<DatanodeDescriptor, AbstractList<BlockInfoContiguous>>>
      it = new CyclicIteration<>(decomNodeBlocks, iterkey).iterator();
  final LinkedList<DatanodeDescriptor> toRemove = new LinkedList<>();

  while (it.hasNext()
      && !exceededNumBlocksPerCheck()
      && !exceededNumNodesPerCheck()) {
    ...
    if (blocks == null) {
      // This is a newly added datanode, run through its list to schedule
      // under-replicated blocks for replication and collect the blocks
      // that are insufficiently replicated for further tracking
      LOG.debug("Newly-added node {}, doing full scan to find "
          + "insufficiently-replicated blocks.", dn);
      blocks = handleInsufficientlyReplicated(dn);
      decomNodeBlocks.put(dn, blocks);
      fullScan = true;
    } else {
      // This is a known datanode, check if its # of insufficiently
      // replicated blocks has dropped to zero and if it can be decommed
      LOG.debug("Processing decommission-in-progress node {}", dn);
      pruneSufficientlyReplicated(dn, blocks);
    }
    ...
The decision logic inside check() is fairly involved; roughly, it keeps checking whether blocks with an insufficient number of replicas still exist, continues monitoring the node until that number drops to zero, and then removes the node from the decommissioning set. Interested readers can dig into it on their own. Inside the handleInsufficientlyReplicated path, blocks whose replica count is insufficient are added into neededReplications:

private void processBlocksForDecomInternal(
    final DatanodeDescriptor datanode,
    final Iterator<BlockInfoContiguous> it,
    final List<BlockInfoContiguous> insufficientlyReplicated,
    boolean pruneSufficientlyReplicated) {
  ...
  while (it.hasNext()) {
    ...
    final NumberReplicas num = blockManager.countNodes(block);
    final int liveReplicas = num.liveReplicas();
    final int curReplicas = liveReplicas;

    // Schedule under-replicated blocks for replication if not already
    // pending
    if (blockManager.isNeededReplication(block, bc.getBlockReplication(),
        liveReplicas)) {
      if (!blockManager.neededReplications.contains(block) &&
          blockManager.pendingReplications.getNumReplicas(block) == 0 &&
          namesystem.isPopulatingReplQueues()) {
        // Process these blocks only when active NN is out of safe mode.
        blockManager.neededReplications.add(block,
            curReplicas,
            num.decommissionedReplicas(),
            bc.getBlockReplication());
      }
    }
    ...
  }
This is exactly why the number of blocks waiting to be replicated jumps up suddenly.

Stopping decommission

So what does stopping a decommission actually do? At least one thing we can already be sure of: it does not remove the blocks that originally belonged to the decommissioning node from the neededReplications object.

/**
 * Stop decommissioning the specified datanode.
 * @param node
 */
void stopDecommission(DatanodeDescriptor node) {
  if (node.isDecommissionInProgress() || node.isDecommissioned()) {
    LOG.info("Stopping decommissioning of node {}", node);
    // Update DN stats maintained by HeartbeatManager
    hbManager.stopDecommission(node);
    // Over-replicated blocks will be detected and processed when
    // the dead node comes back and sends in a full block report.
    // The original blocks in decomNodes will be removed from
    // neededReplications if the node was decommission-in-progress.
    if (node.isAlive) {
      blockManager.processOverReplicatedBlocksOnReCommission(node);
    }
    // Remove from tracking in DecommissionManager
    pendingNodes.remove(node);
    decomNodeBlocks.remove(node);
  } else {
    LOG.trace("stopDecommission: Node {} is not decommission in progress " +
        "or decommissioned, nothing to do.", node);
  }
}
There is not much going on here. You can see that it only removes the node from tracking, namely from decomNodeBlocks, and also cleans up the extra replicas, that is, the replicas that were copied while the node was being decommissioned. Indeed, the removal action we need is missing, so we need to add such a method right after processOverReplicatedBlocksOnReCommission.


The solution: removing residual replication blocks after aborting a decommission

First, define a new method in the DecommissionManager class, as follows:

private void removeNeededReplicatedBlocksInDecomNodes(
    final DatanodeDescriptor datanode) {
  final Iterator<BlockInfoContiguous> it = datanode.getBlockIterator();

  while (it.hasNext()) {
    final BlockInfoContiguous block = it.next();
    BlockCollection bc = blockManager.blocksMap.getBlockCollection(block);
    if (bc == null) {
      // Orphan block, will be invalidated eventually. Skip.
      continue;
    }

    final NumberReplicas num = blockManager.countNodes(block);
    final int liveReplicas = num.liveReplicas();
    final int curReplicas = liveReplicas;

    if (!blockManager.isNeededReplication(block, bc.getBlockReplication(),
        liveReplicas)) {
      blockManager.neededReplications.remove(block, curReplicas,
          num.decommissionedReplicas(), bc.getBlockReplication());
    }
  }
}
The logic is simple: for each block on the node, check whether it still needs more replicas; if not, remove it from neededReplications. The node passed in is the node that was being decommissioned, so this has a bit of the flavor of a re-registration. Then hook it into the stopDecommission operation; the modified code looks like this:

/**
 * Stop decommissioning the specified datanode.
 * @param node
 */
void stopDecommission(DatanodeDescriptor node) {
  if (node.isDecommissionInProgress() || node.isDecommissioned()) {
    LOG.info("Stopping decommissioning of node {}", node);
    AdminStates adminState = node.getAdminState();
    // Update DN stats maintained by HeartbeatManager
    hbManager.stopDecommission(node);
    // Over-replicated blocks will be detected and processed when
    // the dead node comes back and sends in a full block report.
    // The original blocks in decomNodes will be removed from
    // neededReplications if the node was decommission-in-progress.
    if (node.isAlive) {
      blockManager.processOverReplicatedBlocksOnReCommission(node);

      if (adminState == AdminStates.DECOMMISSION_INPROGRESS) {
        removeNeededReplicatedBlocksInDecomNodes(node);
      }
    }
    // Remove from tracking in DecommissionManager
    pendingNodes.remove(node);
    decomNodeBlocks.remove(node);
  } else {
    LOG.trace("stopDecommission: Node {} is not decommission in progress " +
        "or decommissioned, nothing to do.", node);
  }
}
The node's admin state needs to be checked here, because if the node has already reached the Decommissioned state, its pending blocks have essentially all been replicated already and the removal no longer means much. A unit test is attached below, and it passes:

@Test
public void testDecommissionRemovingNeededReplicatedBlocks()
    throws IOException, InterruptedException {
  int underReplicatedBlocksNum;
  int neededReplicatedBlocksNum;
  int sleepIntervalTime = 5000;
  int numNamenodes = 1;
  int numDatanodes = 2;
  // Set the replication factor equal to the number of datanodes,
  // so the number of blocks on each node equals the number of blocks of the file
  int replicas = numDatanodes;
  conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, replicas);
  startCluster(numNamenodes, numDatanodes, conf);

  ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList =
      new ArrayList<ArrayList<DatanodeInfo>>(numNamenodes);
  for (int i = 0; i < numNamenodes; i++) {
    namenodeDecomList.add(i, new ArrayList<DatanodeInfo>(numDatanodes));
  }

  // Calculate the total number of blocks of the file
  neededReplicatedBlocksNum = (int) Math.ceil(1.0 * fileSize / blockSize);
  Path file = new Path("testDecommission.dat");
  for (int iteration = 0; iteration < numDatanodes - 1; iteration++) {
    // Start decommissioning one namenode at a time
    for (int i = 0; i < numNamenodes; i++) {
      FileSystem fileSys = cluster.getFileSystem(i);
      FSNamesystem ns = cluster.getNamesystem(i);
      BlockManager blockManager = ns.getBlockManager();

      writeFile(fileSys, file, replicas);

      DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
      DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);

      ArrayList<String> decommissionedNodes = new ArrayList<String>();
      decommissionedNodes.add(info[0].getXferAddr());
      writeConfigFile(excludeFile, decommissionedNodes);
      refreshNodes(cluster.getNamesystem(i), conf);
      // Return the datanode descriptor for the given datanode.
      NameNodeAdapter.getDatanode(cluster.getNamesystem(i), info[0]);

      // Sleep some time to let the DecommissionManager Monitor thread scan
      // the blocks
      Thread.sleep(sleepIntervalTime);
      underReplicatedBlocksNum =
          (int) blockManager.getUnderReplicatedNotMissingBlocks();
      assertEquals(neededReplicatedBlocksNum, underReplicatedBlocksNum);

      // Remove decommissionedNodes from the exclude file,
      // the blocks in neededReplications should be removed
      decommissionedNodes.clear();
      writeConfigFile(excludeFile, decommissionedNodes);
      refreshNodes(cluster.getNamesystem(i), conf);

      underReplicatedBlocksNum =
          (int) blockManager.getUnderReplicatedNotMissingBlocks();
      assertEquals(0, underReplicatedBlocksNum);

      cleanupFile(fileSys, file);
    }
  }

  // Restart the cluster and ensure decommissioned datanodes
  // are allowed to register with the namenode
  cluster.shutdown();
  startCluster(numNamenodes, numDatanodes, conf);
  cluster.shutdown();
}
OK, the solution is no more than a few dozen lines of code, but writing those few lines requires understanding a whole mechanism and the principles around it, which is not as simple as it looks. I hope everyone takes something away from this. I have submitted this issue to the open source community as HDFS-9685.


Related links

Issue link: https://issues.apache.org/jira/browse/HDFS-9685

Patch GitHub link: https://github.com/linyiqun/open-source-patch/blob/master/hdfs/HDFS-9685/HDFS-9685.001.patch

