HDFS源码分析(5):datanode数据块的读与写

March 28, 2012 / block, datanode, HDFS, read, source, write

前提

Hadoop版本:hadoop-0.20.2

概述

现在已经知道datanode是通过DataXceiver来处理客户端和其它datanode的请求,在分析DataXceiver时已经对除数据块的读与写之外的操作进行了说明,本文主要分析比较复杂而且非常重要的两个操作:读与写。对于用户而言,HDFS用得最多的两个操作就是写和读文件,而且在大部分情况下,是一次写入,多次读取,满足高吞吐量需求而非低延迟,除去客户端与namenode的协商,剩下的部分主要是客户端直接与datanode通信(数据流的头部在上篇文章中已介绍),发送或接收数据,而这些数据在datanode如何接收并写入磁盘、如何从磁盘读出并发送出去就是本文所要介绍的内容。

DataChecksum

无论是读数据还是写数据,都会涉及到checksum,我们先来看看DataChecksum的结构,该类位于org.apache.hadoop.util这个包下,有以下几个主要属性:

DataChecksum的header有5个字节,其中type占1个字节,bytesPerChecksum占4个字节。

DataChecksum有如下几类方法:

BlockMetadataHeader

一直在说一个块有数据文件和元数据文件,有了上边对checksum的分析,下面我们来揭开datanode上管理数据块元数据的BlockMetadataHeader的面纱,数据块无数据的最大部分是块的CRC,这部分与namenode与块相关的功能无关。

有两个属性:

那么元数据文件的header总共有7个字节,元数据文件的结构大概如下所示:

 +---------------------------------------------------+
 |     2 byte version       |   1 byte checksum type |
 +---------------------------------------------------+
 | 4 byte bytesPerChecksum  |   4 byte checksum      |
 +---------------------------------------------------+
 |   Sequence of checksums  |
 +--------------------------+

BlockMetadataHeader提供以下几类方法:

BlockSender

从BlockSender这个名字我们就能够知道它的作用是用于发送块文件,首先,我们来看看其重要的属性:

下面,我们来看看其构造方法,其定义如下:

 BlockSender(Block block, long startOffset, long length,
     boolean corruptChecksumOk, boolean chunkOffsetOK,
     boolean verifyChecksum, DataNode datanode, String clientTraceFmt);

参数有很多:

初始化的过程如下:

首先,我们来看看sendBlock方法,其定义如下:

 long sendBlock(DataOutputStream out, OutputStream baseStream,
     BlockTransferThrottler throttler) throws IOException;

其中:

sendBlock的处理流程是这样的:

到此我们知道发送的块数据如下所示:

 +-----------------------------------------------------+
 | 1 byte checksum type     | 4 byte bytesPerChecksum  |
 +-----------------------------------------------------+
 |       8 byte offset if chunkOffsetOK=true           |
 +-----------------------------------------------------+
 | Sequence of data PACKETs |         4 byte 0         |
 +-----------------------------------------------------+

接下来,我们来看看sendChunks的处理流程,sendChunks的功能是发送一个packet,具体发送的chunk数由参数maxChunks指定:

由以上分析,我们可知packet的结构如下:

 +-----------------------------------------------------+
 | 4 byte packet length (excluding packet header)      |
 +-----------------------------------------------------+
 | 8 byte offset in the block | 8 byte sequence number |
 +-----------------------------------------------------+
 | 1 byte isLastPacketInBlock                          |
 +-----------------------------------------------------+
 | 4 byte Length of actual data                        |
 +-----------------------------------------------------+
 | x byte checksum data. x is defined below            |
 +-----------------------------------------------------+
 | actual data ......                                  |
 +-----------------------------------------------------+

其中x是根据以下表达式计算出来的:

 x = (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM * 
       CHECKSUM_SIZE

BlockReceiver

BlockReceiver主要作用是接收块文件,首先,我们来看看其重要的属性:

下面,我们来看看其构造方法,其定义如下:

 BlockReceiver(Block block, DataInputStream in, String inAddr,
     String myAddr, boolean isRecovery, String clientName,
     DatanodeInfo srcDataNode, DataNode datanode) throws IOException {

参数也有不少:

初始化的过程如下:

BlockReceiver这个类比较复杂,有一千行左右代码,我们以客户端写文件为例来说明其处理过程,如下图所示:

从上图可以看出数据被分成64KB的packet从客户端沿着pipeline逐一发送到所有的datanode,到达最后一个datanode后,应答信息ACK从最后一个datanode沿着pipeline送回客户端,客户端收到ACK就能够知道数据是否发送成功。对于每个datanode,其职责是接收数据包并将数据包发送到其下游datanode,收到ACK后,对ACK进行加工后发送给上游的datanode或client。如果是拷贝块数据操作,是不需要发送应答包的,过程比上图要简单,只需要把数据从一个datanode发送到另一个datanode。

那么,可以将下面的内容分成接收数据和发送应答包两部分,首先,我们来看看接收数据的入口receiveBlock方法:

  void receiveBlock(
      DataOutputStream mirrOut, // output to next datanode
      DataInputStream mirrIn,   // input from next datanode
      DataOutputStream replyOut,  // output to previous datanode
      String mirrAddr, BlockTransferThrottler throttlerArg,
      int numTargets) throws IOException;

先来分析其参数:

处理的过程如下面的流程图所示:

在这个过程中,需要注意的是setBlockPosition这个方法,如果块文件之前已经finalize了,并且isRecovery为false或者offsetInBlock超过块的大小,那么会抛异常。前边已经讲到每个块文件会被分割成多个chunk,然后对每个chunk做checksum,在这里,如果offsetInBlock不与chunk的边界对齐,那么需要先读出offsetInBlock所位于chunk在offsetInBlock之前数据的checksum,再更新接收到的数据,这样才能确保checksum的正确性。

下面就来看看发送应答包是怎么回事,相关的类有PacketResponder、Packet和PipelineAck,PipelineAck是接口org.apache.hadoop.hdfs.protocol.DataTransferProtocol的内部静态类。先来看看简单的Packet,纯粹就是一个数据结构,有两个属性:

PipelineAck封装了应答的内容,我们来看看其属性:

一个ACK的内容如下所示:

 +-----------------------------------------------------+
 |    8 byte seqno       | Sequence of 2 byte replies  |
 +-----------------------------------------------------+

如何判断一个ACK是否是成功呢?很简单,只要replies中有值不为OP_STATUS_SUCCESS,那么就不成功。

好了,只剩下一个PacketResponder了,先看其属性:

PacketResponder的处理有两种不同的方式:numTargets=0,说明这是pipeline的最后一个datanode;有下游datanode。

先来看看最后一个datanode是如何处理每个packet的:

如果不是最后一个datanode又是如何处理的:

后记

文中若有错误或疏漏之处,烦请批评指正。