
Hadoop Distributed Filesystem notes

HDFS Concepts

Blocks

  • 128 MB by default
    • HDFS blocks are large compared to disk blocks, and the reason is to minimize the cost
      of seeks.
  • Having a block abstraction for a distributed filesystem brings several benefits
    • A file can be larger than any single disk in the network.
    • Simplifies the storage subsystem.
    • Provides fault tolerance and availability: each block is replicated to a small
      number of physically separate machines.
  • % hdfs fsck / -files -blocks lists the blocks that make up each file in the
    filesystem (a Java sketch that reads the same block metadata follows this list)
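
For comparison, here is a minimal Java sketch that reads the same block metadata through the FileSystem API. It assumes a reachable HDFS configured as the default filesystem and takes the file path as its only argument; the class name is just an example.

// Sketch: print the block size and block locations of one HDFS file.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ShowBlocks {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration()); // default filesystem
    Path file = new Path(args[0]);
    FileStatus stat = fs.getFileStatus(file);
    System.out.println("block size: " + stat.getBlockSize());
    // One BlockLocation per block, with the datanodes holding a replica
    for (BlockLocation loc : fs.getFileBlockLocations(stat, 0, stat.getLen())) {
      System.out.println(loc.getOffset() + "+" + loc.getLength()
          + " on " + String.join(",", loc.getHosts()));
    }
  }
}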

Namenodes and Datanodes

  • An HDFS cluster has two types of nodes operating in a master-worker pattern
    • namenode (the master)
    • datanodes (workers)
  • Without the namenode, the filesystem cannot be used
    • For this reason, it is important to make the namenode resilient to failure
    • The first way is to back up the files that make up the persistent state of the filesystem
      metadata.
    • It is also possible to run a secondary namenode, whose main role is to periodically
      merge the namespace image with the edit log

Block Caching

  • For frequently accessed files the blocks may be explicitly cached in the datanode’s memory, in an off-heap block cache. By default, a block is cached in only one datanode’s memory.
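
A minimal sketch of doing the same thing programmatically, assuming the Hadoop 2 caching API on DistributedFileSystem; the pool name and file path below are made-up examples, and cache pools and directives can equally be managed from the command line with hdfs cacheadmin.

// Sketch only: pin a file's blocks in the datanodes' off-heap block cache.
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;

public class CacheHotFile {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DistributedFileSystem dfs =
        (DistributedFileSystem) FileSystem.get(URI.create("hdfs://localhost/"), conf);
    dfs.addCachePool(new CachePoolInfo("hot-pool"));        // create a cache pool (example name)
    dfs.addCacheDirective(new CacheDirectiveInfo.Builder()  // cache one file in that pool
        .setPath(new Path("/user/tom/quangle.txt"))         // example path
        .setPool("hot-pool")
        .build());
  }
}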

HDFS Federation

  • one namenode might manage all the files rooted under /user, say, and a second
    namenode might handle files under /share.
    • namespace volume: the metadata for the namespace managed by that namenode
    • block pool: the blocks for all the files in that namespace
  • namenodes do not communicate with one another

HDFS High Availability

  • The new namenode is not able to serve requests until it has
    • loaded its namespace image into memory
    • replayed its edit log
    • received enough block reports from the datanodes to leave safe mode.
  • On large clusters with many files and blocks, the time it takes for a namenode to start from cold can be 30 minutes or more.
  • Hadoop 2 remedied this situation by adding support for HDFS high availability (HA).
    • there are a pair of namenodes in an active-standby configuration. In the event of the failure of the active namenode, the standby takes over its duties to continue servicing client requests without a significant interruption.
  • There are two choices for the highly available shared storage for the edit log
    • NFS filer
    • quorum journal manager (QJM), the recommended choice for most HDFS installations
  • The actual observed failover time will be longer in practice (around a minute or so)
  • The transition from the active namenode to the standby is managed by a new entity in
    the system called the failover controller
    • default implementation uses ZooKeeper to ensure that only one namenode is active.
    • The QJM only allows one namenode to write to the edit log at one time

The Command-Line Interface

Basic Filesystem Operations

  • copying a file from the local filesystem to HDFS

      % hadoop fs -copyFromLocal input/docs/quangle.txt \
      hdfs://localhost/user/tom/quangle.txt
  • copy the file back to the local filesystem and check whether it’s the same

      % hadoop fs -copyToLocal quangle.txt quangle.copy.txt
      % md5 input/docs/quangle.txt quangle.copy.txt
MD5 (input/docs/quangle.txt) = e7891a2627cf263a079fb0f18256ffb2
      MD5 (quangle.copy.txt) = e7891a2627cf263a079fb0f18256ffb2
  • create a directory first just to see how it is displayed in the listing

      % hadoop fs -mkdir books
      % hadoop fs -ls .
      Found 2 items
      drwxr-xr-x - tom supergroup 0 2014-10-04 13:22 books
      -rw-r--r-- 1 tom supergroup 119 2014-10-04 13:21 quangle.txt

The Java Interface

Reading Data from a Hadoop URL

Example. Displaying files from a Hadoop filesystem on standard output using a URLStreamHandler

// cc URLCat Displays files from a Hadoop filesystem on standard output using a URLStreamHandler
import java.io.InputStream;
import java.net.URL;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;

// vv URLCat
public class URLCat {

  static {
    URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
  }

  public static void main(String[] args) throws Exception {
    InputStream in = null;
    try {
      in = new URL(args[0]).openStream();
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
    }
  }
}
// ^^ URLCat

There’s a little bit more work required to make Java recognize Hadoop’s hdfs URL scheme. This is achieved by calling the setURLStreamHandlerFactory() method on URL with an instance of FsUrlStreamHandlerFactory. This method can be called only once per JVM, so it is typically executed in a static block.

Here’s a sample run:

% export HADOOP_CLASSPATH=hadoop-examples.jar
% hadoop URLCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

Reading Data Using the FileSystem API

Example. Displaying files from a Hadoop filesystem on standard output by using the FileSystem directly

// cc FileSystemCat Displays files from a Hadoop filesystem on standard output by using the FileSystem directly
import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// vv FileSystemCat
public class FileSystemCat {
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    InputStream in = null;
    try {
      in = fs.open(new Path(uri));
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
    }
  }
}
// ^^ FileSystemCat

The program runs as follows:

% hadoop FileSystemCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
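
The FileSystem instance above comes from one of several static factory methods; a short sketch of the common variants follows (the URI and username are examples only).

// Sketch: the usual ways to obtain a FileSystem instance.
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;

public class GetFileSystems {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Default filesystem, taken from fs.defaultFS in the configuration
    FileSystem fs1 = FileSystem.get(conf);
    // Filesystem chosen by the URI scheme and authority
    FileSystem fs2 = FileSystem.get(URI.create("hdfs://localhost/"), conf);
    // Same, but acting as the given user (example username)
    FileSystem fs3 = FileSystem.get(URI.create("hdfs://localhost/"), conf, "tom");
    // Convenience method for the local filesystem
    LocalFileSystem local = FileSystem.getLocal(conf);
    System.out.println(fs1.getUri() + " " + fs2.getUri() + " "
        + fs3.getUri() + " " + local.getUri());
  }
}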

FSDataInputStream

Example. Displaying files from a Hadoop filesystem on standard output twice, by using seek()

// cc FileSystemDoubleCat Displays files from a Hadoop filesystem on standard output twice, by using seek
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// vv FileSystemDoubleCat
public class FileSystemDoubleCat {
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    FSDataInputStream in = null;
    try {
      in = fs.open(new Path(uri));
      IOUtils.copyBytes(in, System.out, 4096, false);
      in.seek(0); // go back to the start of the file
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
    }
  }
}
// ^^ FileSystemDoubleCat

Here’s the result of running it on a small file:

% hadoop FileSystemDoubleCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
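
FSDataInputStream also implements PositionedReadable, so a chunk can be read from an arbitrary offset without moving the stream's current position. A minimal sketch (the offset and buffer size are arbitrary choices for illustration):

// Sketch: read a chunk from a given offset with a positioned read.
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class PositionedReadCat {
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    FileSystem fs = FileSystem.get(URI.create(uri), new Configuration());
    FSDataInputStream in = null;
    try {
      in = fs.open(new Path(uri));
      byte[] buffer = new byte[32];
      // Fill the buffer starting at byte 16 of the file; the stream's own
      // position is left unchanged, so a later read() carries on from 0.
      in.readFully(16, buffer);
      System.out.write(buffer);
      System.out.println();
    } finally {
      IOUtils.closeStream(in);
    }
  }
}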

Writing Data

Example. Copying a local file to a Hadoop filesystem

// cc FileCopyWithProgress Copies a local file to a Hadoop filesystem, and shows progress
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

// vv FileCopyWithProgress
public class FileCopyWithProgress {
  public static void main(String[] args) throws Exception {
    String localSrc = args[0];
    String dst = args[1];

    InputStream in = new BufferedInputStream(new FileInputStream(localSrc));

    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(dst), conf);
    OutputStream out = fs.create(new Path(dst), new Progressable() {
      public void progress() {
        System.out.print(".");
      }
    });

    IOUtils.copyBytes(in, out, 4096, true);
  }
}
// ^^ FileCopyWithProgress

Typical usage:

% hadoop FileCopyWithProgress input/docs/1400-8.txt \
  hdfs://localhost/user/tom/1400-8.txt
.................
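
FileSystem also has an append() method for adding to the end of an existing file; it is an optional operation that not every Hadoop filesystem implements. A minimal sketch, using an example path:

// Sketch: append a line to an existing HDFS file (example path), assuming
// the underlying filesystem supports append.
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AppendToFile {
  public static void main(String[] args) throws Exception {
    String uri = "hdfs://localhost/user/tom/quangle.txt"; // example
    FileSystem fs = FileSystem.get(URI.create(uri), new Configuration());
    FSDataOutputStream out = fs.append(new Path(uri));
    try {
      out.write("One more line\n".getBytes("UTF-8"));
    } finally {
      out.close();
    }
  }
}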

Querying the Filesystem

The FileStatus class encapsulates filesystem metadata for files and directories, including file length, block size, replication, modification time, ownership, and permission information.

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertThat;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class ShowFileStatusTest {

  private MiniDFSCluster cluster; // use an in-process HDFS cluster for testing
  private FileSystem fs;

  @Before
  public void setUp() throws IOException {
    Configuration conf = new Configuration();
    if (System.getProperty("test.build.data") == null) {
      System.setProperty("test.build.data", "/tmp");
    }
    cluster = new MiniDFSCluster.Builder(conf).build();
    fs = cluster.getFileSystem();
    OutputStream out = fs.create(new Path("/dir/file"));
    out.write("content".getBytes("UTF-8"));
    out.close();
  }

  @After
  public void tearDown() throws IOException {
    if (fs != null) { fs.close(); }
    if (cluster != null) { cluster.shutdown(); }
  }

  @Test(expected = FileNotFoundException.class)
  public void throwsFileNotFoundForNonExistentFile() throws IOException {
    fs.getFileStatus(new Path("no-such-file"));
  }

  @Test
  public void fileStatusForFile() throws IOException {
    Path file = new Path("/dir/file");
    FileStatus stat = fs.getFileStatus(file);
    assertThat(stat.getPath().toUri().getPath(), is("/dir/file"));
    assertThat(stat.isDirectory(), is(false));
    assertThat(stat.getLen(), is(7L));
    assertThat(stat.getModificationTime(),
        is(lessThanOrEqualTo(System.currentTimeMillis())));
    assertThat(stat.getReplication(), is((short) 1));
    assertThat(stat.getBlockSize(), is(128 * 1024 * 1024L));
    assertThat(stat.getOwner(), is(System.getProperty("user.name")));
    assertThat(stat.getGroup(), is("supergroup"));
    assertThat(stat.getPermission().toString(), is("rw-r--r--"));
  }

  @Test
  public void fileStatusForDirectory() throws IOException {
    Path dir = new Path("/dir");
    FileStatus stat = fs.getFileStatus(dir);
    assertThat(stat.getPath().toUri().getPath(), is("/dir"));
    assertThat(stat.isDirectory(), is(true));
    assertThat(stat.getLen(), is(0L));
    assertThat(stat.getModificationTime(),
        is(lessThanOrEqualTo(System.currentTimeMillis())));
    assertThat(stat.getReplication(), is((short) 0));
    assertThat(stat.getBlockSize(), is(0L));
    assertThat(stat.getOwner(), is(System.getProperty("user.name")));
    assertThat(stat.getGroup(), is("supergroup"));
    assertThat(stat.getPermission().toString(), is("rwxr-xr-x"));
  }
}

Listing files

Example. Showing the file statuses for a collection of paths in a Hadoop filesystem

// cc ListStatus Shows the file statuses for a collection of paths in a Hadoop filesystem
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

// vv ListStatus
public class ListStatus {
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);

    Path[] paths = new Path[args.length];
    for (int i = 0; i < paths.length; i++) {
      paths[i] = new Path(args[i]);
    }

    FileStatus[] status = fs.listStatus(paths);
    Path[] listedPaths = FileUtil.stat2Paths(status);
    for (Path p : listedPaths) {
      System.out.println(p);
    }
  }
}
// ^^ ListStatus

We can use this program to find the union of directory listings for a collection of paths:

% hadoop ListStatus hdfs://localhost/ hdfs://localhost/user/tom
hdfs://localhost/user
hdfs://localhost/user/tom/books
hdfs://localhost/user/tom/quangle.txt
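
listStatus() also has overloads that take a PathFilter, which lets the listing be restricted programmatically; a small sketch that keeps only paths ending in .txt (the suffix is just an example):

// Sketch: list a directory but keep only paths ending in ".txt".
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class ListTextFiles {
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    FileSystem fs = FileSystem.get(URI.create(uri), new Configuration());
    FileStatus[] status = fs.listStatus(new Path(uri), new PathFilter() {
      @Override
      public boolean accept(Path path) {
        return path.getName().endsWith(".txt");
      }
    });
    for (FileStatus stat : status) {
      System.out.println(stat.getPath());
    }
  }
}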

Data Flow

Anatomy of a File Read

[figure: a client reading data from HDFS]

Network Topology and Hadoop

[figure: network distance in Hadoop]

For example, with a node named /d1/r1/n1 (node n1 on rack r1 in data center d1): distance(/d1/r1/n1, /d1/r1/n1) = 0 (the same node), distance(/d1/r1/n1, /d1/r1/n2) = 2 (different nodes on the same rack), distance(/d1/r1/n1, /d1/r2/n3) = 4 (nodes on different racks in the same data center), and distance(/d1/r1/n1, /d2/r3/n4) = 6 (nodes in different data centers). Mathematically inclined readers will notice that this is an example of a distance metric.

Anatomy of a File Write

[figure: a client writing data to HDFS]

A typical replica pipeline:

[figure: a typical replica pipeline]

Coherency Model

After creating a file, it is visible in the filesystem namespace, as expected:

Path p = new Path("p");
fs.create(p);
assertThat(fs.exists(p), is(true));

However, any content written to the file is not guaranteed to be visible, even if the stream is flushed. So, the file appears to have a length of zero:

Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
assertThat(fs.getFileStatus(p).getLen(), is(0L));

HDFS provides a way to force all buffers to be flushed to the datanodes via the hflush() method on FSDataOutputStream. After a successful return from hflush(), HDFS guarantees that the data written up to that point in the file has reached all the datanodes in the write pipeline and is visible to all new readers:

Path p = new Path("p");
FSDataOutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.hflush();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));

Note that hflush() does not guarantee that the datanodes have written the data to disk, only that it’s in the datanodes’ memory (so in the event of a data center power outage, for example, data could be lost). For this stronger guarantee, use hsync() instead. hsync() behaves like the fsync() system call in POSIX; for comparison, when writing a local file with the standard Java API, the content is only guaranteed to be visible after flushing the stream and synchronizing the file descriptor:

FileOutputStream out = new FileOutputStream(localFile);
out.write("content".getBytes("UTF-8"));
out.flush(); // flush to operating system
out.getFD().sync(); // sync to disk
assertThat(localFile.length(), is(((long) "content".length())));

Closing a file in HDFS performs an implicit hflush(), too:

Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.close();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));

Parallel Copying with distcp

One use for distcp is as an efficient replacement for hadoop fs -cp. For example, you can copy one file to another with:

% hadoop distcp file1 file2

You can also copy directories:

% hadoop distcp dir1 dir2