06 February 2017

Writing Data To HDFS From Java

When you want to write a file into HDFS, things are quite different from writing to a local file system. Writing to a file on any file system is an operation that can fail, but with HDFS there are many more potential problems than with a local file, so your code should be designed to handle failures.

At a very high level, when you want to open a file for write on HDFS, these are the steps the client must go through:

  1. Contact the namenode and tell it you want to create file /foo/bar

  2. Assuming you have the relevant permissions, the Namenode will reply with the list of datanodes to write the data to.

  3. The client will then open a TCP connection to the first datanode to establish a write pipeline

  4. The first part of this write pipeline involves starting a thread within the datanode process (called an xciever). This thread will open a local file to store the data into, and it will also make a TCP connection to the next datanode in the pipeline, which will start up a similar xciever thread. Assuming a replication factor of 3, this second datanode will open a connection to the third and final datanode.

  5. The client will start writing data to the first datanode, which will save it to disk and forward to the second datanode, which will also write to disk and forward to the third.

  6. After the client has written the blocksize of data (128MB generally), this pipeline will be closed and the client will ask the namenode where to write the next block, and this process will repeat until the file is closed.

There are a few not so obvious things to consider here:

  1. The TCP connection from the client to the first datanode and then the chain onto the second and third node will remain open indefinitely, assuming the client process stays alive, the file is not closed and no routers or firewalls are involved to drop the TCP connection.

  2. While the file is still open, there is a thread, an open file and a TCP socket tied up on all three datanodes. So if you have a very large number of open files (either for read or write) against a small number of datanodes, the xciever thread limit (dfs.datanode.max.xcievers, renamed dfs.datanode.max.transfer.threads in later Hadoop releases) and the OS open file limits may need increasing.
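
To get a feel for the numbers involved, here is a minimal sketch of the arithmetic behind the points above. The class and method names are hypothetical, and the per-datanode figure assumes write pipelines are spread perfectly evenly across the datanodes, which a real cluster will not guarantee.

```java
final class PipelineMath {
    // Blocks (and therefore write pipelines) needed for a file:
    // ceiling division of the file size by the block size.
    static long blocksFor(long fileSizeBytes, long blockSizeBytes) {
        if (fileSizeBytes < 0 || blockSizeBytes <= 0) {
            throw new IllegalArgumentException("file size must be >= 0 and block size > 0");
        }
        return (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes;
    }

    // Rough average number of xciever threads held on each datanode by files
    // that are open for write. ASSUMPTION: pipelines are evenly distributed.
    static long avgXcieversPerDatanode(long openFiles, int replication, int datanodes) {
        if (datanodes <= 0) {
            throw new IllegalArgumentException("need at least one datanode");
        }
        long total = openFiles * replication; // one thread per replica per open file
        return (total + datanodes - 1) / datanodes;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        System.out.println("Blocks for a 300MB file: " + blocksFor(300 * mb, 128 * mb));
        System.out.println("Avg xcievers per datanode: " + avgXcieversPerDatanode(10_000, 3, 5));
    }
}
```

Comparing the second figure against the configured xciever limit gives a quick indication of whether a workload with many open files is likely to run into it.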

Locking

HDFS is an append-only file system: you cannot seek to a position in a file and perform writes in random locations. For that reason it only makes sense for a single process to have a file open for writing at any one time. To enforce this, the Namenode grants a lease to the process which opens the file for writing. If a second process attempts to open the same file for append or writing, an error like the following will be returned:

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to create file [/user/vagrant/testwriter] for [DFSClient_NONMAPREDUCE_-642075008_1] for client [192.168.33.6], because this file is already being created by [DFSClient_NONMAPREDUCE_-608672003_1] on [192.168.33.6]

When a client is writing a file, it is responsible for renewing the lease periodically. Even if a client has many files open, it only requires a single lease for all of them. The namenode tracks all the current leases, and if a client does not renew its lease within 1 minute (the soft limit), another process can forcibly take over the lease and open the file. After 1 hour of no updates from the client (the hard limit), the namenode assumes the client has died and closes the file automatically. Luckily, the HDFS client code takes care of lease renewal, and end users of the API don't need to worry about it, but it is important to be aware of it.
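
The two timeouts can be pictured as a small state model. This is only an illustrative sketch, not the namenode's implementation: the class name, the state names and the treatment of the exact boundary (>=) are assumptions, while the 1 minute and 1 hour values are the ones described above.

```java
import java.time.Duration;

final class LeaseModel {
    enum State {
        ACTIVE,        // lease renewed recently, nobody else may open the file for write
        SOFT_EXPIRED,  // another client may forcibly take over the lease
        HARD_EXPIRED   // the namenode treats the client as dead and closes the file
    }

    static final Duration SOFT_LIMIT = Duration.ofMinutes(1);
    static final Duration HARD_LIMIT = Duration.ofHours(1);

    // Classify a lease by the time elapsed since the client last renewed it.
    static State classify(Duration sinceLastRenewal) {
        if (sinceLastRenewal.compareTo(HARD_LIMIT) >= 0) {
            return State.HARD_EXPIRED;
        }
        if (sinceLastRenewal.compareTo(SOFT_LIMIT) >= 0) {
            return State.SOFT_EXPIRED;
        }
        return State.ACTIVE;
    }

    public static void main(String[] args) {
        System.out.println(classify(Duration.ofSeconds(30)));
        System.out.println(classify(Duration.ofMinutes(5)));
        System.out.println(classify(Duration.ofHours(2)));
    }
}
```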

If your client process crashes, or exits in such a way that it does not close any HDFS files that are already open for writing, you may find that when the process is restarted it cannot reopen the file, giving the same error as above. Eventually (after about 1 hour) the namenode will close this file, but it may not be convenient to wait that long. To work around this, you can use the hdfs debug command to force the file closed:

$ hdfs debug recoverLease -path /user/vagrant/testwriter  -retries 5
recoverLease SUCCEEDED on /user/vagrant/testwriter

This blog post on the hdfs block recovery process gives a very good overview of the lease and what happens if all the block replicas on each node do not match.

Simple Java Code for Writing to HDFS

The following code snippet is all that is required to write to HDFS:

package com.sodonnel.Hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriter {
  public static void main(String[] args) throws IOException, InterruptedException {

    Configuration configuration = new Configuration();
    FileSystem hdfs = FileSystem.get( configuration );
    Path file = new Path("/user/vagrant/testwriter");

    FSDataOutputStream os;

    if ( hdfs.exists( file )) {
      //hdfs.delete( file, true );
      os = hdfs.append( file ); 
    } else {
      os = hdfs.create( file );
    }
    // Note that writeChars writes each character as a 2 byte
    // value, so this 25 character string adds 50 bytes to the file
    os.writeChars("this is a string to write");

    os.close();
    hdfs.close();
   }
}
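
Since FSDataOutputStream extends java.io.DataOutputStream, the effect of writeChars on the file size can be checked without a cluster. The following sketch (the class and method names are made up for illustration) writes the same string into an in-memory buffer twice, once with writeChars and once as UTF-8 bytes, and compares the byte counts.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class WriteCharsSize {
    // Bytes produced by DataOutputStream.writeChars: 2 bytes per char.
    static int bytesViaWriteChars(String s) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeChars(s);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    // Bytes produced when the string is encoded as UTF-8 and written raw.
    static int bytesViaUtf8(String s) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.write(s.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        String text = "this is a string to write";
        System.out.println("writeChars: " + bytesViaWriteChars(text) + " bytes");
        System.out.println("UTF-8:      " + bytesViaUtf8(text) + " bytes");
    }
}
```

For the 25 character string used above, writeChars produces 50 bytes while the UTF-8 encoding produces 25, so writing the encoded bytes is usually the better choice for plain text files.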

To run it, you can compile the code into a JAR, set the classpath to include the Hadoop jars and run it like a normal Java program:

export CLASSPATH=$(hadoop classpath):HdfsWriter-1.0-SNAPSHOT.jar:.
java com.sodonnel.Hadoop.HdfsWriter
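
The snippet above lets any IOException escape from main. A long running writer will usually want to retry operations such as opening the file, for example when the previous lease has not yet been released. The helper below is one possible sketch of that idea using only the JDK; the names RetryingOpen, IoAction and withRetries are hypothetical and not part of the Hadoop API.

```java
import java.io.IOException;

final class RetryingOpen {
    // An operation that may fail with an IOException, such as opening a file.
    @FunctionalInterface
    interface IoAction<T> {
        T run() throws IOException;
    }

    // Runs the action up to maxAttempts times, pausing between failed attempts.
    // If every attempt fails, the last IOException is rethrown.
    static <T> T withRetries(int maxAttempts, long pauseMillis, IoAction<T> action)
            throws IOException, InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.run();
            } catch (IOException e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(pauseMillis);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated operation that fails twice and then succeeds.
        String result = withRetries(3, 10, () -> {
            if (++calls[0] < 3) {
                throw new IOException("simulated failure " + calls[0]);
            }
            return "opened";
        });
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In the writer above, the call to hdfs.append( file ) could be wrapped as withRetries(5, 1000, () -> hdfs.append(file)), with the attempt count and pause chosen to suit the application.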

Current File Size

One side effect of how HDFS works is that the normal hadoop fs -ls command cannot really tell you how large a file currently is while it is being written. The namenode decides which datanodes will receive the blocks, but it is not involved in tracking the data written to them, and the namenode is only updated periodically. After poking through the DFSClient source and running some tests, there appear to be 3 scenarios where the namenode gets an update on the file size:

  1. When the file is closed
  2. When a block is filled and a new one is created, the namenode size will be incremented by the blocksize. This means that the file will look like it is growing in block sized chunks.
  3. The first time a sync / hflush operation is called on a block, it updates the size in the namenode too. If you write 1MB, and then sync it, the Namenode will report the size as 1MB. If you then write another 1MB and sync again, the Namenode will still report 1MB. Assuming a 128MB blocksize, the next update to size on the Namenode will be at 128MB, then 129MB, then 256MB and so on.
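
The three rules can be captured in a small model, which makes it easier to predict what size the namenode will report. This is a simplified sketch of the behaviour described above, not code taken from DFSClient: the class name is hypothetical, it assumes a sync on an empty block does nothing, and it ignores the small delay before a new block is actually created.

```java
final class VisibleSizeModel {
    private final long blockSize;
    private long written;          // bytes the client has written so far
    private long visible;          // size the namenode reports in this model
    private boolean syncedInBlock; // has the current block already been synced?

    VisibleSizeModel(long blockSize) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be positive");
        }
        this.blockSize = blockSize;
    }

    void write(long bytes) {
        if (bytes < 0) {
            throw new IllegalArgumentException("bytes must be >= 0");
        }
        long blocksBefore = written / blockSize;
        written += bytes;
        long blocksAfter = written / blockSize;
        if (blocksAfter > blocksBefore) {
            // Rule 2: a block was filled, so the size jumps to a block multiple.
            visible = blocksAfter * blockSize;
            syncedInBlock = false;
        }
    }

    void sync() {
        boolean blockHasData = written % blockSize != 0;
        if (blockHasData && !syncedInBlock) {
            // Rule 3: only the first sync on a block updates the namenode.
            visible = written;
            syncedInBlock = true;
        }
    }

    void close() {
        // Rule 1: closing the file always publishes the final size.
        visible = written;
    }

    long visibleSize() {
        return visible;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        VisibleSizeModel file = new VisibleSizeModel(128 * mb);
        file.write(mb);
        file.sync();
        System.out.println("After first sync:  " + file.visibleSize() / mb + " MB");
        file.write(mb);
        file.sync();
        System.out.println("After second sync: " + file.visibleSize() / mb + " MB");
    }
}
```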

You can see this process in action with a simple test program:

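import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriter {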
  public static void main(String[] args) throws IOException, InterruptedException {

    Configuration configuration = new Configuration();
    FileSystem hdfs = FileSystem.get( configuration );
    Path file = new Path("/user/vagrant/testwrites");

    // Number of 2 byte writes that add up to 1MB (524288 * 2 = 1048576 bytes)
    int ONE_MB = 524288;

    FSDataOutputStream os;

    if ( hdfs.exists( file )) {
      hdfs.delete( file, true );
    }
    // Create a file with a rep factor of 1 and blocksize of 1MB
    os = hdfs.create( file, true, 2, (short)1, (long)1048576 );

    // Block size is 1MB, 524288 is 0.5MB, each write is 2 bytes
    // so 524288 writes will be 1MB, so in this case we will write 3MB
    // into 3 blocks in total
    for(int i = 0; i < 3*ONE_MB; i++) {
      os.writeChars("a");

      // Sync every 256KB, but not at the 1MB boundaries
      if ( i > 0 && (i+1) % (ONE_MB/4) == 0 && (i+1) % ONE_MB != 0 ) {
        System.out.println("SYNC");
        os.hsync();
      } 
      // Print file size every 64KB to see where the size changes
      if (i > 0 && (i+1) % (ONE_MB / 16) == 0) {
        FileStatus[] files = hdfs.listStatus(file);
        System.out.println("Data written: "+ (i+1)*2/1024.0/1024.0 +" MB; Current file size: "+ files[0].getLen()/1024.0/1024.0 +" MB");
      } 
    }
    os.close();
    FileStatus[] files = hdfs.listStatus(file);
    System.out.println("Size on closing: "+ files[0].getLen()/1024.0/1024.0 +" MB");
    hdfs.close();
   }
}

This program creates a file with a 1MB block size, and then writes a single 2 byte character over and over to create a file exactly 3MB and 3 blocks in length. After writing each 64KB, we print out the bytes written and query the Namenode for the size of the file. Additionally, every 256KB, we perform a file sync unless we have written a whole number of MB, which is a block boundary. The output looks like this:

Data written: 0.0625 MB; Current file size: 0.0 MB
Data written: 0.125 MB; Current file size: 0.0 MB
Data written: 0.1875 MB; Current file size: 0.0 MB
SYNC
Data written: 0.25 MB; Current file size: 0.25 MB
Data written: 0.3125 MB; Current file size: 0.25 MB
Data written: 0.375 MB; Current file size: 0.25 MB
Data written: 0.4375 MB; Current file size: 0.25 MB
SYNC
Data written: 0.5 MB; Current file size: 0.25 MB
Data written: 0.5625 MB; Current file size: 0.25 MB
Data written: 0.625 MB; Current file size: 0.25 MB
Data written: 0.6875 MB; Current file size: 0.25 MB
SYNC
Data written: 0.75 MB; Current file size: 0.25 MB
Data written: 0.8125 MB; Current file size: 0.25 MB
Data written: 0.875 MB; Current file size: 0.25 MB
Data written: 0.9375 MB; Current file size: 0.25 MB
Data written: 1.0 MB; Current file size: 0.25 MB
Data written: 1.0625 MB; Current file size: 1.0 MB
Data written: 1.125 MB; Current file size: 1.0 MB
Data written: 1.1875 MB; Current file size: 1.0 MB
SYNC
Data written: 1.25 MB; Current file size: 1.25 MB
Data written: 1.3125 MB; Current file size: 1.25 MB
Data written: 1.375 MB; Current file size: 1.25 MB
Data written: 1.4375 MB; Current file size: 1.25 MB
SYNC
Data written: 1.5 MB; Current file size: 1.25 MB
Data written: 1.5625 MB; Current file size: 1.25 MB
Data written: 1.625 MB; Current file size: 1.25 MB
Data written: 1.6875 MB; Current file size: 1.25 MB
SYNC
Data written: 1.75 MB; Current file size: 1.25 MB
Data written: 1.8125 MB; Current file size: 1.25 MB
Data written: 1.875 MB; Current file size: 1.25 MB
Data written: 1.9375 MB; Current file size: 1.25 MB
Data written: 2.0 MB; Current file size: 1.25 MB
Data written: 2.0625 MB; Current file size: 2.0 MB
Data written: 2.125 MB; Current file size: 2.0 MB
Data written: 2.1875 MB; Current file size: 2.0 MB
SYNC
Data written: 2.25 MB; Current file size: 2.25 MB
Data written: 2.3125 MB; Current file size: 2.25 MB
Data written: 2.375 MB; Current file size: 2.25 MB
Data written: 2.4375 MB; Current file size: 2.25 MB
SYNC
Data written: 2.5 MB; Current file size: 2.25 MB
Data written: 2.5625 MB; Current file size: 2.25 MB
Data written: 2.625 MB; Current file size: 2.25 MB
Data written: 2.6875 MB; Current file size: 2.25 MB
SYNC
Data written: 2.75 MB; Current file size: 2.25 MB
Data written: 2.8125 MB; Current file size: 2.25 MB
Data written: 2.875 MB; Current file size: 2.25 MB
Data written: 2.9375 MB; Current file size: 2.25 MB
Data written: 3.0 MB; Current file size: 2.25 MB
Size on closing: 3.0 MB

We can see that the file looks like it has zero bytes until a sync is performed at 256KB. Then, despite two further syncs, it stays at 256KB, showing that further syncs against the same block do not update the Namenode size. Then it updates to 1MB at slightly over 1MB written. The namenode size is actually updated when the new block is created, but you may have to write up to 64KB over the block size before that happens due to the default packet size of 64KB in DFSClient.java. Then the same pattern repeats for the second and third block.

One thing that sometimes confuses users is that if you perform a get on the file while it is being written, the size of the file pulled from HDFS will be the actual size of the file at the time it was pulled, and not the size reported by the namenode.

When Datanodes Fail

If you have a long running process writing data to HDFS, you need to be concerned with what happens when a datanode fails. In general each piece of data written to HDFS is persisted onto 3 datanodes in a write pipeline, as the default replication factor is 3, but it is possible to override this to a lower value if you wish.

With the default replication factor, if one datanode in the pipeline fails, the write should continue unaffected, and the client will produce a WARN message like the following:

17/02/04 04:19:00 WARN hdfs.DFSClient: DFSOutputStream ResponseProcessor exception  for block BP-1223970327-10.17.81.191-1483542430690:blk_1073767685_26861
java.io.IOException: Bad response ERROR for block BP-1223970327-10.17.81.191-1483542430690:blk_1073767685_26861 from datanode DatanodeInfoWithStorage[10.17.81.194:50010,DS-a1e673dd-4d4f-4c27-b437-2a61657e2c97,DISK]
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:1022)
17/02/04 04:19:00 WARN hdfs.DFSClient: Error Recovery for block BP-1223970327-10.17.81.191-1483542430690:blk_1073767685_26861 in pipeline DatanodeInfoWithStorage[10.17.81.193:50010,DS-7049acbc-93ae-4574-9f39-2c6a9a0e81ac,DISK], DatanodeInfoWithStorage[10.17.81.194:50010,DS-a1e673dd-4d4f-4c27-b437-2a61657e2c97,DISK]: bad datanode DatanodeInfoWithStorage[10.17.81.194:50010,DS-a1e673dd-4d4f-4c27-b437-2a61657e2c97,DISK]

The write can survive two of the datanodes failing, but if the third also fails, you will get an error like the following and the write will fail:

17/02/04 04:39:27 ERROR hdfs.DFSClient: Failed to close inode 42255
java.io.IOException: All datanodes DatanodeInfoWithStorage[10.17.81.193:50010,DS-7049acbc-93ae-4574-9f39-2c6a9a0e81ac,DISK] are bad. Aborting...
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1386)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:1147)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:632)

Remember that writes are on a block by block basis, so if you get an issue where a couple of the datanodes fail and you are left with only one, that is only the case until the next block is started. Assuming there are still enough live nodes in the cluster, a new write pipeline will then be established containing 3 datanodes once again.

If you are writing a file with a replication factor of 3, and one or two of the datanodes fail, then there will be only two or one complete replicas of that block on the cluster. The namenode will notice this and create new replicas quite quickly, but until this happens there is a higher than usual risk of data loss on that file.

Replacing Failed Nodes

The HDFS client code has some options to handle datanode failures during writes in different ways, known as the "DataNode Replacement Policy". This blog post has some good information about the replacement policies. Assuming you are writing a file with the default replication factor, the general case is that if 1 datanode fails, the writes will continue with just 2 datanodes. However, if another fails, then the client will attempt to replace one of the failed nodes. If there are not enough datanodes left in the cluster to do the replacement, then the write will still continue. There are options to make this more strict, so that the failed node must always be replaced or an exception will be thrown.
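
The decision described above can be written down as a small predicate. The rule used here is my reading of how hdfs-default.xml describes the DEFAULT value of dfs.client.block.write.replace-datanode-on-failure.policy, so treat it as an assumption and check it against the documentation for your Hadoop version. The class and method names are hypothetical.

```java
final class ReplacementPolicySketch {
    // r: replication factor of the file
    // n: datanodes still alive in the write pipeline (at least 1)
    // hflushedOrAppended: whether the block has been hflushed or is being appended to
    static boolean shouldAddDatanode(int r, int n, boolean hflushedOrAppended) {
        if (n < 1) {
            throw new IllegalArgumentException("with no live datanodes the write fails");
        }
        if (r < 3) {
            return false;                  // small replication factors never replace
        }
        if (r / 2 >= n) {
            return true;                   // floor(r/2) >= n: too few nodes remain
        }
        return r > n && hflushedOrAppended; // stricter once data has been flushed
    }

    public static void main(String[] args) {
        System.out.println("r=3, 2 left, not flushed: " + shouldAddDatanode(3, 2, false));
        System.out.println("r=3, 1 left, not flushed: " + shouldAddDatanode(3, 1, false));
        System.out.println("r=3, 2 left, flushed:     " + shouldAddDatanode(3, 2, true));
    }
}
```

With a replication factor of 3 and no flush, this gives the behaviour described above: no replacement after the first failure, and a replacement attempt after the second.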

Replication Factor 1

If you create a file with replication factor 1, and the only datanode in the write pipeline crashes, then the file write will fail, either with one of the errors shown above, or one like the following:

17/02/04 12:02:44 ERROR hdfs.DFSClient: Failed to close inode 17733
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/vagrant/testwrites could only be replicated to 0 nodes instead of minReplication (=1).  There are 1 datanode(s) running and 1 node(s) are excluded in this operation.
    at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1622)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3325)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:679)
    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:214)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:489)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)

There isn't really much else that can happen in this scenario, as the datanode that failed will have a partially written block on it, and it will not be possible to replicate it onto another node if the original data source is no longer present.

Writing files with a replication factor of 1 can be faster, but there is a much higher risk of data loss, both during the write and later when the file needs to be read. Therefore only temporary files or files that can be easily reconstructed from another source should use a replication factor of 1.
