首页 百科知识 源代码分析(四零)

源代码分析(四零)

时间:2023-09-22 百科知识 版权反馈
【摘要】:有了上面的基础,我们可以来解剖DFSOutputStream了。同时,append方法调用时,Client已经知道了最后一个Block的信息和文件的一些信息,如FileStatus中包含的Block大小,文件权限位等等。createBlockOutputStream用于建立到第一个DataNode的连接,它的声明如下:privatebooleancreateBlockOutputStreamnodes是所有接收数据的DataNode列表,client就是客户端名称,recoveryFlag指示是否是为错误恢复建立的连接。如果出错,记录出错的DataNode在nodes中的位置,设置errorIndex并返回false。

有了上面的基础,我们可以来解剖DFSOutputStream了。先看构造函数:

    privateDFSOutputStream(String src, longblockSize, Progressable progress,

        intbytesPerChecksum) throwsIOException

 

    DFSOutputStream(String src, FsPermissionmasked, boolean overwrite,

        shortreplication, long blockSize,Progressable progress,

        intbuffersize, intbytesPerChecksum) throwsIOException

 

    DFSOutputStream(String src, intbuffersize, Progressable progress,

        LocatedBlock lastBlock, FileStatusstat,

        intbytesPerChecksum) throwsIOException {

这些构造函数的参数主要有:文件名src;进度回调函数progress(预留接口,目前未使用);数据块大小blockSize;Block副本数replication;每个校验chunk的大小bytesPerChecksum;文件权限masked;是否覆盖原文件标记overwrite;文件状态信息stat;文件的最后一个Block信息lastBlock;buffersize(?未见引用)。

后面两个构造函数会调用第一个构造函数,这个函数会调用父类的构造函数,并设置对象的src,blockSize,progress和checksum属性。

第二个构造函数会调用namenode.create方法,在文件空间中建立文件,并启动DataStreamer,它被DFSClient的create方法调用。第三个构造函数被DFSClient的append方法调用,显然,这种情况比价复杂,文件拥有一些数据块,添加数据往往添加在最后的数据块上。同时,append方法调用时,Client已经知道了最后一个Block的信息和文件的一些信息,如FileStatus中包含的Block大小,文件权限位等等。结合这些信息,构造函数需要计算并设置一些对象成员变量的值,并试图从可能的错误中恢复(调用processDatanodeError),最后启动DataStreamer。

我们先看正常流程,前面已经分析过,通过FSOutputSummer,HDFS客户端能将流转换成package,这个包是通过writeChunk,发送出去的,下面是它们的调用关系。


mhtml:file://I:\技术文章下载\2010-4-23整理到notebook\Hadoop汇总2010-4-22\Hadoop源码分析\Hadoop源代码分析(四零)%20-%20-%20JavaEye技术网站.mht!http://caibinbupt.javaeye.com/upload/attachment/70140/02d33797-7324-3a51-998f-0e22191f2653.jpg
 

在检查完一系列的状态以后,writeChunk先等待,直到dataQueue中未发送的包小于门限值。如果现在没有可用的Packet对象,则创建一个Packet对象,往Packet中写数据,包括校验值和数据。如果数据包被写满,那么,将它放入发送队列dataQueue中。writeChunk的过程比较简单,这里的写入,也只是把数据写到本地队列,等待DataStreamer发送,没有实际写到DataNode上。

createBlockOutputStream用于建立到第一个DataNode的连接,它的声明如下:

privatebooleancreateBlockOutputStream(DatanodeInfo[] nodes, String client,

                    booleanrecoveryFlag)

nodes是所有接收数据的DataNode列表,client就是客户端名称,recoveryFlag指示是否是为错误恢复建立的连接。createBlockOutputStream很简单,打开到第一个DataNode的连接,然后发送下面格式的数据包,并等待来自DataNode的Ack。如果出错,记录出错的DataNode在nodes中的位置,设置errorIndex并返回false。

mhtml:file://I:\技术文章下载\2010-4-23整理到notebook\Hadoop汇总2010-4-22\Hadoop源码分析\Hadoop源代码分析(四零)%20-%20-%20JavaEye技术网站.mht!http://www.javaeye.com/upload/attachment/55868/7acaf42f-11aa-3c06-84e1-70583b43f621.jpg

 

当recoveryFlag指示为真时,意味着这次写是一次恢复操作,对于DataNode来说,这意味着为写准备的临时文件(在tmp目录中)可能已经存在,需要进行一些特殊处理,具体请看FSDataset的实现。

当Client写数据需要一个新的Block的时候,可以调用nextBlockOutputStream方法。

    privateDatanodeInfo[] nextBlockOutputStream(String client) throwsIOException

这个方法的实现很简单,首先调用locateFollowingBlock(包含了重试和出错处理),通过namenode.addBlock获取一个新的数据块,返回的是DatanodeInfo列表,有了这个列表,就可以建立写数据的pipe了。下一个大动作就是调用上面的createBlockOutputStream,建立到DataNode的连接了。

有了上面的准备,我们来分析processDatanodeError,它的主要流程是:

l           参数检查;

l           关闭可能还打开着的blockStream和blockReplyStream;

l           将未收到应答的数据块(在ackQueue中)挪到dataQueue中;

l           循环执行:

1.      计算目前还活着的DataNode列表;

2.      选择一个主DataNode,通过DataNode RPC的recoverBlock方法启动它上面的恢复过程;

3.      处理可能的出错;

4.      处理恢复后Block可能的变化(如Stamp变化);

5.      调用createBlockOutputStream到DataNode的连接。

l           启动ResponseProcessor。

这个过程涉及了DataNode上的recoverBlock方法和createBlockOutputStream中可能的Block恢复,是一个相当耗资源的方法,当系统出错的概率比较小,而且数据块上能恢复的数据很多(平均32M),还是值得这样做的。

写的流程就分析到着,接下来我们来看流的关闭,这个过程也涉及了一系列的方法,它们的调用关系如下:


mhtml:file://I:\技术文章下载\2010-4-23整理到notebook\Hadoop汇总2010-4-22\Hadoop源码分析\Hadoop源代码分析(四零)%20-%20-%20JavaEye技术网站.mht!http://caibinbupt.javaeye.com/upload/attachment/70139/a824de45-9f3d-329c-bd06-f7007e741af6.jpg
 

 

flushInternal会一直等待到发送队列(包括可能的currentPacket)和应答队列都为空,这意味着数据都被DataNode顺利接收。

sync作用和UNIX的sync类似,将写入数据持久化。它首先调用父类的flushBuffer方法,将可能还没拷贝到DFSOutputStream的数据拷贝回来,然后调用flushInternal,等待所有的数据都写完。然后调用namenode.fsync,持久化命名空间上的数据。

closeInternal比较复杂一点,它首先调用父类的flushBuffer方法,将可能还没拷贝到DFSOutputStream的数据拷贝回来,然后调用flushInternal,等待所有的数据都写完。接着结束两个工作线程,关闭socket,最后调用amenode.complete,通知NameNode结束一次写操作。close方法先调用closeInternal,然后再本地的leasechecker中移除对应的信息。


免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈