A quick tutorial for middleware products

Sunday, June 14, 2015

On June 14, 2015 by Kamlesh

Hadoop Distributed Filesystem (HDFS)


HDFS is built to support high-throughput, streaming reads and writes of extremely large files.
NAS and SAN systems offer centralized, low-latency access to either a block device or a filesystem on the order
of terabytes in size. They do not scale to meet the needs of thousands of machines pulling hundreds of gigabytes
of content all at one time.

Goals for HDFS

* Store millions of large files, each greater than tens of gigabytes, and filesystem sizes
  reaching tens of petabytes.
* Use a scale-out model based on inexpensive commodity servers with internal JBOD ("Just a bunch of disks")
  rather than RAID to achieve large-scale storage. Accomplish availability and high throughput through
  application-level replication of data.
* Optimize for large, streaming reads and writes rather than low-latency access to many small files.
  Batch performance is more important than interactive response times.
* Gracefully deal with component failures of machines and disks.
* Support the functionality and scale requirements of MapReduce processing.

HDFS Design

- HDFS is a userspace filesystem. It runs as an ordinary process on top of the OS and uses the local machine's
  filesystem to store its own files. HDFS is not a POSIX-compliant filesystem.
- HDFS is a distributed filesystem. Each machine in a cluster stores a subset of the data (blocks) that
  makes up the complete filesystem. Filesystem metadata is stored on a centralized server (NameNode),
  acting as a directory of block data and providing a global picture of the filesystem's state. The admin
  controls how many times each block is replicated across the cluster. The default replication factor is 3.
  Should the number of copies of a block drop below the configured replication factor,
  the filesystem automatically makes a new copy from one of the remaining replicas.
    Replication means that:
      - Multiple machine failures can be tolerated more easily.
      - Data can be read from the machine closest to the application on the network.
      - During processing any copy of the data can be used, giving the scheduler a better chance of finding
        available resources.
      - There is no need for an expensive, specialized storage system (e.g. RAID) to protect block data.
- HDFS has a much larger block size than most other filesystems. The default is 64 MB in Hadoop 1.x
  (128 MB in Hadoop 2.x), and some clusters go as high as 1 GB.
  Increasing the block size means data will be written in larger contiguous chunks on disk, which in turn
  means data can be written and read in larger sequential operations. This minimizes drive seek operations
  and results in better performance when doing large streaming I/O operations.
- On a datanode, each block is stored as two files. The block file holds the raw bytes of a portion of the file
  being stored; all block files start with the prefix blk_. The metadata file (with a .meta suffix) is made up
  of a header with version and type information, followed by a series of checksums for sections of the block.
- Files in HDFS are write-once: once a replica is written, it cannot be changed.
  Benefits of this include:
    - Removing the need for complex reasoning about the consistency between replicas.
    - Applications being able to read any of the available replicas when accessing a file.
- A file in HDFS that is smaller than a single block does not occupy a full block's worth of underlying storage.
  So if a file is 10MB but the block size is 64MB the block only occupies 10MB until more data is added.
- Different files in HDFS can use different block sizes. HDFS provides an API to specify the block size when you
  create a file: FileSystem.create(Path, overwrite, bufferSize, replication, blockSize, progress)
- Every datanode runs a block scanner that periodically verifies all the blocks stored on the datanode (every
  three weeks by default). This allows bad blocks to be detected and fixed before they are read by clients. The
  DataBlockScanner maintains a list of blocks to verify and scans them one by one for checksum errors.
- HDFS permits a client to read a file that is open for writing. When reading a file open for writing, the length of
  the last block still being written is unknown to the NameNode. The last incomplete block is not visible, and neither are
  any subsequent blocks that have yet to be completed. Only the blocks which were completed before opening the stream
  for reading are available. In other words, if the block being written is not finished yet, the reader can't see it.
- A HAR file (Hadoop Archive) groups small files together into a single Hadoop archive file.
  - Useful when there are already lots of small files in HDFS that need to be grouped together before running expensive jobs.
  - Archive creation is implemented as a MapReduce job.
  - Use a har:// URL to access each file in the archive and to view the archive as a folder.
  - Use a normal hdfs:// URL to access the actual content of the archive in HDFS. HARs are stored in HDFS as folders which
    contain a file with the concatenation of all the input files.
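
As a rough illustration of the block-size points above, the following Python sketch splits a file into blocks and
estimates the raw disk space used across replicas. The function names are made up for this example and the model
ignores the .meta checksum files; it is plain arithmetic, not HDFS code.

```python
MB = 1024 * 1024

def split_into_blocks(file_size, block_size=64 * MB):
    """Return the byte length of each block of a file of file_size bytes."""
    full, rest = divmod(file_size, block_size)
    sizes = [block_size] * full
    if rest:
        sizes.append(rest)  # the last block only occupies the bytes it holds
    return sizes

def raw_storage(file_size, replication=3, block_size=64 * MB):
    """Bytes consumed on datanode disks across all replicas (checksums ignored)."""
    return sum(split_into_blocks(file_size, block_size)) * replication

blocks = split_into_blocks(200 * MB)
print([b // MB for b in blocks])    # [64, 64, 64, 8]
print(raw_storage(10 * MB) // MB)   # 30: a 10 MB file at replication 3
```

Note that the 10 MB file costs 30 MB of raw disk, not 3 x 64 MB, because a partial block only uses the space it needs.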

Daemons that make up HDFS

There are three daemons that make up a standard HDFS cluster.
NameNode           - 1 per cluster. Filesystem metadata is stored on a centralized server, acting as a
                     directory of block data and providing a global picture of the filesystem's state.
Secondary NameNode - 1 per cluster. Performs internal NameNode transaction log checkpointing.
Datanode           - Many per cluster. Stores block data (contents of files).

NameNode

- Clients connect to the NameNode to perform filesystem operations
- Datanodes regularly report their status to the NameNode in a heartbeat. Heartbeats carry information about total storage capacity,
  fraction of storage in use, and the number of data transfers currently in progress. These statistics are used for the
  NameNode's block allocation and load balancing decisions.
- The NameNode does not directly send requests to DataNodes. It uses replies to heartbeats to send instructions to the DataNodes.
  The instructions include commands to replicate blocks to other nodes, remove local block replicas, re-register and send an
  immediate block report, and shut down the node.
- At any given time, the NameNode has a complete view of all datanodes in the cluster, their current health,
  and what blocks they have available.
- When a datanode starts up, and every hour thereafter, it sends a block report to the NameNode. A block report
  is simply a list of all blocks the datanode currently has on its disks. The NameNode keeps track of all the
  changes.
- The file-to-block mapping on the NameNode is stored on disk. The host-specific locations of the blocks are not
  recorded. DataNodes send out their block lists on startup and periodically after that. The NameNode uses
  these reports to see where all of its blocks are. This allows you to move disks from one datanode to another
  and not worry about a changing hostname or IP.
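
To make the block report idea concrete, here is a minimal Python sketch of a NameNode-like object that rebuilds block
locations purely from what datanodes report. The class and method names are hypothetical and the model leaves out
heartbeats, timeouts, and persistence entirely.

```python
from collections import defaultdict

class NameNodeView:
    """Toy model: block locations are rebuilt from datanode block reports."""

    def __init__(self):
        self.locations = defaultdict(set)   # block id -> ids of datanodes holding it

    def block_report(self, datanode_id, block_ids):
        # A report is the full list of blocks on that datanode, so forget
        # anything previously recorded for it before storing the new list.
        for holders in self.locations.values():
            holders.discard(datanode_id)
        for block_id in block_ids:
            self.locations[block_id].add(datanode_id)

    def under_replicated(self, replication=3):
        """Blocks that still have at least one replica but fewer than wanted."""
        return sorted(b for b, dns in self.locations.items()
                      if 0 < len(dns) < replication)

nn = NameNodeView()
nn.block_report("dn1", ["blk_1", "blk_2"])
nn.block_report("dn2", ["blk_1"])
nn.block_report("dn3", ["blk_1"])
print(nn.under_replicated())   # ['blk_2']
```

Because locations come only from reports, moving a disk to another datanode simply shows up in that node's next report.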


- The NameNode stores its filesystem metadata on local filesystem disks in a few different files, the two most
  important of which are fsimage and edits.
  - Fsimage contains a complete snapshot of the filesystem metadata including a serialized form of all the directory
    and file inodes in the filesystem. Each inode is an internal representation of a file or directory's metadata
    and contains such information as the file's replication level, modification and access times, access permissions,
    block size, and the blocks a file is made up of. For directories, the modification time, permissions, and quota
    metadata is stored.
  - The edits file (journal) contains only incremental modifications made to the metadata. It is a write-ahead log, which reduces I/O
    operations to sequential, append-only operations (in the context of the NameNode, since it serves directly from
    RAM), which avoids costly seek operations and yields better overall performance.
  - Upon NameNode startup, the fsimage file is loaded into RAM and any changes in the edits file are replayed,
    bringing the in-memory view of the filesystem up to date.
  - Each client-initiated transaction (copy, move, etc.) is recorded in the journal, and the journal file is flushed and synced before
    the acknowledgment is sent to the client. The NameNode updates its in-memory (RAM) representation of the filesystem metadata
    only after the edit log has been modified.
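
The startup sequence described above (load the snapshot, then replay the journal) can be sketched in a few lines of
Python. This is only an illustration: the real fsimage and edits files are binary formats, and the JSON records and
operation names used here are assumptions made for the example.

```python
import json

def replay(fsimage, edits):
    """Rebuild the in-memory namespace: copy the snapshot, then apply the journal in order."""
    namespace = dict(fsimage)   # path -> metadata
    for line in edits:
        op = json.loads(line)
        if op["op"] == "create":
            namespace[op["path"]] = {"replication": op["replication"]}
        elif op["op"] == "rename":
            namespace[op["dst"]] = namespace.pop(op["src"])
        elif op["op"] == "delete":
            namespace.pop(op["path"], None)
    return namespace

fsimage = {"/foo/bar.txt": {"replication": 3}}
edits = [
    json.dumps({"op": "create", "path": "/foo/new.txt", "replication": 2}),
    json.dumps({"op": "rename", "src": "/foo/bar.txt", "dst": "/foo/baz.txt"}),
    json.dumps({"op": "delete", "path": "/foo/new.txt"}),
]
namespace = replay(fsimage, edits)
print(sorted(namespace))   # ['/foo/baz.txt']
```

The longer the journal, the longer this replay takes, which is why the edits file is periodically folded back into the fsimage.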

- Starting a NameNode will bring it into service after it loads the fsimage, replays the transaction log, sees
  some percentage of blocks (minimally replicated) from the datanodes, and is stable for some additional amount of time.
  Until then the NameNode is in safe mode.
- This threshold defaults to 99.9% and can be changed with the dfs.safemode.threshold.pct parameter. The additional
  stable time is set with the dfs.safemode.extension parameter.
- During safe mode, replication of blocks is prohibited, but the NameNode does offer a read-only view of the filesystem to clients.
  The NameNode waits until all or a majority of DataNodes report their blocks. Depending on how the safe mode parameters
  are configured, the NameNode will stay in safe mode until a specific percentage of the blocks in the system is minimally replicated.
  Safe mode is exited when the minimal replication condition is reached, plus an extension time of 30 seconds. The minimal
  replication condition is when 99.9% of the blocks in the whole filesystem meet their minimum replication level (which defaults
  to one and is set by dfs.replication.min). To check whether the NameNode is in safe mode, run:
  hadoop dfsadmin -safemode get
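
The safe mode exit condition boils down to a simple check. The sketch below uses the 99.9% threshold and 30 second
extension quoted above; the function and parameter names are invented for illustration and this is not the NameNode's
actual code.

```python
def can_leave_safe_mode(total_blocks, min_replicated_blocks, seconds_stable,
                        threshold=0.999, extension_s=30):
    """True once enough blocks are minimally replicated and the extension has elapsed.

    seconds_stable is the time since the threshold was first reached.
    """
    if total_blocks == 0:
        return True   # assumption: an empty filesystem has nothing to wait for
    fraction = min_replicated_blocks / total_blocks
    return fraction >= threshold and seconds_stable >= extension_s

print(can_leave_safe_mode(1_000_000, 999_500, seconds_stable=30))    # True
print(can_leave_safe_mode(1_000_000, 998_000, seconds_stable=60))    # False, below 99.9%
print(can_leave_safe_mode(1_000_000, 1_000_000, seconds_stable=10))  # False, extension not over
```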

- NameNode filesystem metadata is served entirely from RAM. This makes it fast, but limits the amount of
  metadata a box can handle. As a rule of thumb, 1 million blocks occupy roughly 1 GB of heap.
- Hadoop's default strategy is for the NameNode to place the first replica on the same node as the client (for
  clients running outside the cluster, a node is chosen at random, although the system
  tries not to pick nodes that are too full or too busy). The second replica is placed on a
  different rack from the first (off-rack), chosen at random. The third replica is placed on
  the same rack as the second, but on a different node chosen at random. Further replicas
  are placed on random nodes on the cluster, although the system tries to avoid placing
  too many replicas on the same rack.
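
The heap rule of thumb above (roughly 1 GB per million blocks) makes it easy to see why many small files are
expensive for the NameNode. The following Python sketch is back-of-the-envelope arithmetic only; the function name is
hypothetical and it counts blocks alone, ignoring the per-file and per-directory objects that also live in the heap.

```python
def estimated_namenode_heap_gb(num_files, avg_file_size_mb, block_size_mb=64,
                               gb_per_million_blocks=1.0):
    """Rough heap estimate from the block count alone."""
    blocks_per_file = max(1, -(-avg_file_size_mb // block_size_mb))  # ceiling division
    total_blocks = num_files * blocks_per_file
    return total_blocks / 1_000_000 * gb_per_million_blocks

# The same 64 TB of data, stored two different ways.
small = estimated_namenode_heap_gb(num_files=67_108_864, avg_file_size_mb=1)
large = estimated_namenode_heap_gb(num_files=65_536, avg_file_size_mb=1024)
print(round(small, 1), round(large, 1))
```

Stored as 1 MB files the estimate is roughly 67 GB of heap; stored as 1 GB files it is roughly 1 GB.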

Secondary NameNode

- IT IS NOT A BACKUP FOR THE NameNode! The name is horrible, but it is what it is.
- The NameNode's edits file needs to be periodically applied to the fsimage file. The NameNode may not have the
  available resources (CPU or RAM) to do this while continuing to provide service to the cluster, so the
  secondary NameNode applies the updates from the edits file to the fsimage file and sends it back to the
  primary. This is known as the checkpointing process.
- The checkpoint file is never changed by the NameNode, only by the Secondary NameNode.
- Checkpointing occurs every 60 minutes by default or whenever the NameNode's edits file reaches 64 MB,
  whichever happens first. Newer versions of Hadoop use a defined number of transactions
  rather than file size to determine when to perform a checkpoint.
- If the secondary NameNode is not running at all, the edit log will grow significantly and slow the system down.
  Also, on restart the NameNode will stay in safe mode for an extended time, since it needs to combine the edit log and the
  current filesystem checkpoint image.
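
The "whichever happens first" rule for checkpoints can be written as a one-line predicate. This sketch uses the
60 minute and 64 MB defaults mentioned above; the function name and arguments are assumptions for illustration only.

```python
def checkpoint_due(seconds_since_last, edits_bytes,
                   period_s=60 * 60, max_edits_bytes=64 * 1024 * 1024):
    """A checkpoint is due when either limit is hit, whichever comes first."""
    return seconds_since_last >= period_s or edits_bytes >= max_edits_bytes

print(checkpoint_due(seconds_since_last=600, edits_bytes=10 * 1024 * 1024))   # False
print(checkpoint_due(seconds_since_last=600, edits_bytes=64 * 1024 * 1024))   # True, size limit
print(checkpoint_due(seconds_since_last=3600, edits_bytes=0))                 # True, time limit
```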

Datanode

- The daemon responsible for storing and retrieving block data (chunks of a file) is called the Datanode (DN).
- The datanode has direct local access to one or more disks (commonly called data disks) in a server, on which
  it is permitted to store block data.
- Pointing the datanode at new disks in existing servers, or adding new servers with more disks, increases the amount
  of storage in the cluster.
- Block data is streamed to and from datanodes directly, so bandwidth is not limited by a single node.
- Datanodes regularly report their status to the NameNode in a heartbeat.
- When a datanode starts up, and every hour thereafter, it sends a block report to the NameNode. A block report
  is simply a list of all blocks the datanode currently has on its disks. The NameNode keeps track of all the
  changes.

Process of reading a file

Reading a file in HDFS called /foo/bar.txt:
1. The client uses a Hadoop client program to make the request.
2. The client program reads the cluster config file on the local machine, which tells it where the NameNode
   is located. This has to be configured ahead of time.
3. The client contacts the NameNode and requests the file it would like to read.
4. The client is validated by username or by a strong authentication mechanism like Kerberos.
5. Once the client is validated, the request is checked against the owner and permissions of the file.
6. If the file exists and the user has access to it, the NameNode responds with the first block ID and
   a list of datanodes on which a copy of the block can be found, sorted by their distance to the client (reader).
7. The client now contacts the most appropriate datanode directly and reads the block data it needs. This process
   repeats until all blocks in the file have been read or the client closes the file stream.

- If the datanode dies while the file is being read, the client library will automatically attempt to read another replica of the
  data from another datanode. If all replicas are unavailable, the read operation fails and the client receives an exception.
- If the information returned by the NameNode about block locations is outdated by the time the client attempts to contact a
  datanode, a retry will occur if there are other replicas; otherwise the read will fail.
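
The replica fallback behaviour on the read path can be sketched as a loop over the datanodes in the order the
NameNode returned them. This is a simplified Python model, not the real client library: the exception class, the
fetch callback, and the datanode names are all hypothetical.

```python
class ReplicaUnavailable(Exception):
    """Raised when no datanode could serve the block."""

def read_block(block_id, datanodes_by_distance, fetch):
    """Try each replica, closest first; fail only if every one is unavailable."""
    tried = []
    for datanode in datanodes_by_distance:
        try:
            return fetch(datanode, block_id)
        except ConnectionError:
            tried.append(datanode)   # dead node or stale location: move to the next replica
    raise ReplicaUnavailable(f"no live replica for {block_id}, tried {tried}")

def fake_fetch(datanode, block_id):
    # Stand-in for the network read; dn1 is pretending to be down.
    if datanode == "dn1":
        raise ConnectionError("dn1 is down")
    return f"{block_id}@{datanode}".encode()

print(read_block("blk_42", ["dn1", "dn2", "dn3"], fake_fetch))   # b'blk_42@dn2'
```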

Process of writing a file

Writing a new file to HDFS called /foo/babar.txt:
1. The client uses a Hadoop client program to make the request.
2. The client program reads the cluster config file on the local machine, which tells it where the NameNode
   is located. This has to be configured ahead of time.
3. A request is sent to the NameNode to create the file metadata.
4. The client is validated by username or by a strong authentication mechanism like Kerberos.
5. If the user has the necessary permissions to do so, the metadata entry for the new file is made. However, it
   initially has no associated blocks.
6. The NameNode responds to the client and indicates the open request was successful and that it may now begin writing data.
7. The client starts breaking up the file into pieces (packets, not TCP ones), queues them in memory, and starts a data stream
   from this queue.
8. The client contacts the NameNode requesting a set of datanodes to which replicas of the next block should be written.
9. The NameNode responds, and the client's data packets are then streamed to the first datanode, which writes the data to disk
   and forwards it to the next datanode, which writes to its disk, and so on. This is called a replication pipeline. Each datanode in the
   replication pipeline acknowledges each packet as it's successfully written.
10. The client application maintains a list of packets for which acknowledgments have not yet been received. When it receives
    an acknowledgment, it knows the data has been written to all nodes in the pipeline. This process of writing packets to the pipeline
    continues until the block size is reached, at which point the client goes back to the NameNode for the next set of datanodes
    to write to.
11. Eventually, the client indicates it's finished sending data by closing the stream, which flushes any remaining packets out to
    disk and updates the NameNode to indicate the file is now complete.
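
The data queue, the list of unacknowledged packets, and the recovery from a failed datanode can be modelled in a
short Python sketch. It is deliberately simplified: only one packet is in flight at a time, datanodes are in-memory
objects, and the class, function, and packet names are all invented for the example.

```python
from collections import deque

class DataNode:
    """In-memory stand-in for a datanode; fail_on names a packet that makes it crash."""

    def __init__(self, name, fail_on=None):
        self.name, self.fail_on, self.disk = name, fail_on, []

    def write(self, packet):
        if packet == self.fail_on:
            raise IOError(f"{self.name} failed")
        if packet not in self.disk:   # re-sent packets must not be stored twice
            self.disk.append(packet)

def write_block(packets, pipeline):
    """Stream packets through the pipeline; return the datanodes still healthy at the end."""
    data_queue = deque(packets)   # packets waiting to be sent
    ack_queue = deque()           # packets sent but not yet acknowledged
    pipeline = list(pipeline)
    while data_queue:
        if not pipeline:
            raise IOError("no datanodes left in the pipeline")
        packet = data_queue.popleft()
        ack_queue.append(packet)
        try:
            for node in pipeline:   # each datanode writes, then passes the packet on
                node.write(packet)
        except IOError:
            pipeline.remove(node)                        # drop the failed datanode
            data_queue.extendleft(reversed(ack_queue))   # re-queue unacknowledged packets
            ack_queue.clear()
            continue
        ack_queue.popleft()   # every datanode wrote the packet, so it is acknowledged
    return pipeline

dn1, dn2, dn3 = DataNode("dn1"), DataNode("dn2", fail_on="p2"), DataNode("dn3")
healthy = write_block(["p1", "p2", "p3"], [dn1, dn2, dn3])
print([n.name for n in healthy], dn3.disk)   # ['dn1', 'dn3'] ['p1', 'p2', 'p3']
```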

- If a datanode in the pipeline fails to write, the pipeline is immediately closed and all packets that had been sent since
  the last acknowledgment are pushed back into the queue to be written, so that any datanodes past the failed node in the
  pipeline will receive the data. The current block is given a new ID on the remaining healthy datanodes. This is done so that,
  should the failed datanode return, the abandoned block will appear to not belong to any file and be discarded automatically.
  A new replication pipeline containing the remaining datanodes is opened and the write resumes.
- When a new block is created, HDFS places the first replica on the node where the writer is located. The second and the third
  replicas are placed on two different nodes in a different rack. The rest are placed on random nodes, with the restrictions that no
  more than one replica is placed on any one node and no more than two replicas are placed in the same rack, if possible.
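
A minimal Python sketch of this placement rule follows. It assumes the writer runs on a cluster node, that there are
at least two racks, and that the chosen remote rack has at least two nodes. It ignores how full or busy nodes are and
does not enforce the two-per-rack limit for replicas beyond the third. The function name and topology layout are
hypothetical.

```python
import random

def place_replicas(writer, topology, replication=3, rng=random):
    """topology maps rack name -> list of node names; returns the nodes chosen for one block."""
    rack_of = {node: rack for rack, nodes in topology.items() for node in nodes}
    first = writer                                             # replica 1: the writer's own node
    remote_rack = rng.choice([r for r in topology if r != rack_of[first]])
    second = rng.choice(topology[remote_rack])                 # replica 2: a node on another rack
    third = rng.choice([n for n in topology[remote_rack] if n != second])  # replica 3: same rack as 2
    chosen = [first, second, third]
    extras = [n for n in rack_of if n not in chosen]           # further replicas: random other nodes
    rng.shuffle(extras)
    return (chosen + extras)[:replication]

topology = {"rack1": ["a", "b"], "rack2": ["c", "d"], "rack3": ["e", "f"]}
print(place_replicas("a", topology))
```

With this layout a single rack failure can never take out all three replicas, while two of the three copies still
share a rack so only one cross-rack transfer is needed in the pipeline.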

NameNode High Availability

- NameNode high availability (or HA) is deployed as an active/passive (standby) pair of NameNodes. The edits write-ahead log
  needs to be available to both NameNodes, and is therefore stored on a shared storage device. Early HA releases required an
  NFS filer as the shared storage; later Hadoop 2.x releases can use the Quorum Journal Manager instead. As the active
  NameNode writes to the edits log, the standby NameNode is constantly replaying transactions to ensure it is up to date
  and ready to take over in the case of failure.
- Datanodes are also aware of both NameNodes in an HA configuration and send block reports to both servers.
- A high-availability pair of NameNodes can be configured for manual or automatic failover. The default is manual failover.
- In a manual failover, a command must be sent to effect a state transition from one NameNode to the other.
- In automatic failover, each NameNode runs an additional process called a "failover controller" that monitors the health of the
  NameNode process and coordinates state transitions.
  - A graceful failover is initiated by the admin.
  - A nongraceful failover is the result of a fault detected in the active NameNode process.
- It's impossible to know whether a NameNode has relinquished active status or is simply inaccessible from the standby.
  The system can therefore use a series of increasingly drastic fencing techniques to ensure the failed node (which could
  still think it's active) is actually stopped: tell it to stop via RPC, or send an IPMI reboot message to the failed host.
- When running in HA mode the standby NameNode takes over the role of the secondary NameNode. There is no separate secondary
  NameNode process in an HA cluster, only a pair of NameNode processes. Most sites repurpose their secondary NameNode machine to be
  a second NameNode.
- A manual failover from the active NN to the standby NN looks like this: hdfs haadmin -failover hadoop1 hadoop2
  This moves hadoop1 out of the active role (fencing it if necessary) and brings up hadoop2 as the active NN.
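
The idea of escalating fencing methods can be sketched as trying each method in order until one reports success.
This Python sketch is a toy model; the method names and callbacks are hypothetical and do not correspond to Hadoop's
real fencing configuration.

```python
def fence(node, methods):
    """Try fencing methods from least to most drastic; return the name of the one that worked."""
    for name, method in methods:
        if method(node):
            return name
    # If the old active cannot be fenced, it is not safe to promote the standby.
    raise RuntimeError(f"could not fence {node}; refusing to fail over")

methods = [
    ("rpc_stop", lambda node: False),     # pretend the polite RPC request got no answer
    ("ipmi_reboot", lambda node: True),   # pretend the power-cycle succeeded
]
print(fence("hadoop1", methods))   # ipmi_reboot
```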

NameNode Federation

- Federation helps overcome the limit of how much metadata the NameNode can store in memory by splitting it up across multiple
  NameNodes. This gives us one logical namespace from a bunch of different NameNodes. It is similar to the Linux filesystem,
  where many different devices can be mounted at different points but still form one tree under a single root /.
- Each datanode has a block pool for each namespace. While blocks from different pools are stored on the same disks (there is no
  physical separation), they are logically exclusive. Each datanode sends heartbeats and block reports to each NameNode.
- NameNodes do not communicate with one another, and the failure of one does not affect the others.
- Clients view the namespace via an API implementation called ViewFS. This maps slices of the filesystem to the proper NameNode.
  It is configured on the client side via the local core-site.xml file.
- Federation does not support overlapping mount points as of right now.
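
Conceptually, the client-side mount table works like a prefix lookup from a path to the NameNode that owns that slice
of the namespace. The Python sketch below illustrates the idea only; it is not the ViewFS implementation, and the
hostnames and mount points are made up.

```python
def resolve(path, mount_table):
    """Map a client-visible path to the URI on the NameNode that owns it."""
    for mount, target in mount_table.items():
        if path == mount or path.startswith(mount + "/"):
            return target + path[len(mount):]
    raise FileNotFoundError(f"no mount point covers {path}")

# Hypothetical mount table: non-overlapping mount points, one NameNode each.
mount_table = {
    "/user": "hdfs://nn1.example.com:8020/user",
    "/data": "hdfs://nn2.example.com:8020/data",
}
print(resolve("/user/dude/a.txt", mount_table))   # hdfs://nn1.example.com:8020/user/dude/a.txt
print(resolve("/data", mount_table))              # hdfs://nn2.example.com:8020/data
```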

Clients

- Clients can read and write data to HDFS using different tools and APIs.
- Clients can be on the same physical machines as any of the Hadoop daemons, or they can be on a host separate from the cluster.
- Clients that regularly run on one datanode can cause that node to become unbalanced because of the block placement policy.
  The NameNode will assign the local machine as the destination for the first replica when an HDFS client is running on
  a datanode. This causes more blocks to be kept on the local datanode than on others. To help fix this, run the balancer:
  hadoop balancer -threshold N (where N is the percentage of disk utilization within which datanodes should be of one another)
  The balancer can be killed at any time without any repercussions. Apache Hadoop users can use the start-balancer.sh script.
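
The threshold test the balancer applies can be illustrated with a small Python sketch: a datanode counts as balanced
when its disk utilization is within N percentage points of the cluster-wide utilization. The function name and the
sample numbers are made up, and the sketch only classifies nodes; it does not move any blocks.

```python
def out_of_balance(nodes, threshold_pct=10.0):
    """nodes maps datanode name -> (used_bytes, capacity_bytes)."""
    total_used = sum(used for used, _ in nodes.values())
    total_capacity = sum(capacity for _, capacity in nodes.values())
    cluster_util = 100.0 * total_used / total_capacity
    result = {}
    for name, (used, capacity) in nodes.items():
        util = 100.0 * used / capacity
        if abs(util - cluster_util) > threshold_pct:
            result[name] = "over" if util > cluster_util else "under"
    return result

nodes = {"dn1": (90, 100), "dn2": (40, 100), "dn3": (50, 100)}   # cluster utilization: 60%
print(out_of_balance(nodes))   # {'dn1': 'over', 'dn2': 'under'}
```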

Commands

- Hadoop comes with a number of command-line tools that enable basic filesystem operations.
- HDFS commands are subcommands of the hadoop command-line utility.
# Display basic usage information
hadoop fs
# List files in a dir. Uses the fs.default.name value in the core-site.xml file if full URL syntax is not used.
hadoop fs -ls /user/dude
# or
hadoop fs -ls hdfs://NameNode.blah.com:8020/home/dude
# Upload a file with -put or -copyFromLocal, which copies the file from the local filesystem
hadoop fs -put /etc/resolv.conf /user/dude/
# Download a file from HDFS using -get or -copyToLocal
hadoop fs -get /user/dude/resolv.conf ./
# Set the replication factor for a file, or for a dir of files with -R
hadoop fs -setrep -R 5 /user/dude/rep5/
# Run fsck on the files we set the rep factor on and see if it looks correct
hadoop fsck /user/dude/rep5 -files -blocks -locations


MapReduce 

- There are two versions of MapReduce in the ecosystem right now: v1 and v2.
- V1 is the original MapReduce that uses tasktracker and jobtracker daemons.
- V2 is called YARN. YARN splits up the two major functionalities of the JobTracker, resource management
  and job scheduling/monitoring, into separate daemons: the ResourceManager, ApplicationMaster, and NodeManager.

MapReduce v1

- Developers write jobs (code) that consist of a map function and a reduce function, along with job
  configuration information that controls various aspects of its execution.
- Jobs are broken up into tasks, the tasks are scheduled to run on machines, each task's health is
  monitored, and in case of any failures the necessary tasks are retried.
- The TaskTracker is a process that runs on slave (data) nodes and is responsible for instantiating and
  monitoring individual map and reduce tasks. It starts separate JVM processes to do the actual work
  (called Task Instances).
- A MapReduce job is made up of four distinct stages, executed in order: client job submission, map task
  execution, shuffle and sort, and reduce task execution.
- The MapReduce framework provides a set of APIs for submitting jobs and interacting with the cluster.
- A job is made up of code written by a developer against the MapReduce APIs and the configuration which
  specifies things such as the input and output datasets.
- The jobtracker process (running on another host in the cluster) is responsible for accepting the job
  submissions, scheduling tasks to run on worker nodes, and providing administrative functions such as
  worker health and task progress monitoring to the cluster.
- Speculative execution helps to offset slow workers. The jobtracker
  will create multiple instances of the same task, take the result of the first one to
  finish, and kill the remaining instances.
- Job submission occurs over the network from any machine. It does not have to be one from
  the cluster.
- There is one jobtracker per MapReduce cluster. If it dies, all running jobs fail, so put it on a
  reliable server.
- Tasktrackers inform the jobtracker as to their current health and status by way of regular heartbeats.
  Each heartbeat contains the total number of map and reduce task slots available, the number occupied,
  and detailed information about any currently executing tasks. After a configured period of no
  heartbeats the tasktracker is considered dead.
- When a job is submitted to the jobtracker, information about each task that makes up the job is stored in memory.
  After the job completes, this information is retained for a configurable window of time or until a specified number
  of jobs have been executed. On an active cluster where many jobs, each with many tasks, are running, this
  information can consume a considerable amount of RAM. Because of this, monitoring jobtracker memory
  utilization is critical.
- The act of deciding which tasks of a job should be executed on which worker nodes is referred to as task
  scheduling. The scheduler decides when tasks get executed and in what order.
- There is one tasktracker process on each worker node. It accepts task assignments from the jobtracker,
  instantiates the user code, executes those tasks locally, and reports progress back to the jobtracker periodically.
- Tasktracker and datanode processes run on the same machines, which makes each node both a compute
  node and a storage node.
- Each tasktracker is configured with a specific number of map and reduce task slots that indicate how many of each
  type of task it is capable of executing in parallel.
- Tasktrackers allow more map tasks than reduce tasks to execute in parallel because they consume resources
  differently.
- Upon receiving a task assignment from the jobtracker, the tasktracker executes an attempt of the task in a
  separate process. A task is the logical unit of work, while a task attempt is a specific, physical instance
  of that task being executed. Attempts may fail, but each task in a job will have at least 1 attempt.
  Communication between the task attempt (usually called the child, or child process) and the tasktracker is
  maintained via an RPC connection over the loopback interface called the umbilical protocol.
- The tasktracker uses a list of user-specified directories to hold the intermediate map output and reducer input
  during job execution. These user-specified directories are kept on the local machine's filesystem.
- When a failure is detected by the tasktracker, it is reported to the jobtracker in the next heartbeat. The
  task is rescheduled. If enough tasks from the same job fail on the same tasktracker, the node is added
  to a job-level blacklist. If multiple tasks from different jobs repeatedly fail on a specific tasktracker,
  the tasktracker in question is added to a global blacklist for 24 hours.
- If the tasktracker daemon or the entire worker node is lost, the jobtracker, after a configurable
  amount of time with no heartbeats, will consider the tasktracker dead along with any tasks it was assigned.
  Tasks are rescheduled on other nodes.
- If the jobtracker fails, all jobs will eventually fail. This is a single point of failure in Hadoop.
- A benefit of MapReduce is data locality: the ability to execute computation on the same
  machine where the data being processed is stored. This helps remove the "Store Effect" of all machines
  smashing a SAN for large datasets, and bogging down the network.
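
The map, shuffle and sort, and reduce stages can be shown with a single-process word count in Python. This is only a
sketch of the programming model: a real job is written against the Hadoop MapReduce APIs and runs as many tasks
across the cluster, and the function names here are invented for the example.

```python
from collections import defaultdict

def map_fn(line):
    """Map: emit an intermediate (word, 1) pair for every word in the input record."""
    for word in line.split():
        yield word.lower(), 1

def reduce_fn(word, counts):
    """Reduce: combine all values seen for one key."""
    return word, sum(counts)

def run_job(lines):
    intermediate = [pair for line in lines for pair in map_fn(line)]
    groups = defaultdict(list)          # shuffle: group values by key
    for key, value in intermediate:
        groups[key].append(value)
    return [reduce_fn(key, groups[key]) for key in sorted(groups)]   # sort, then reduce per key

result = run_job(["the quick fox", "The lazy dog", "the end"])
print(result)
# [('dog', 1), ('end', 1), ('fox', 1), ('lazy', 1), ('quick', 1), ('the', 3)]
```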


MapReduce v2 (YARN)

- YARN splits up the two major functionalities of the JobTracker, resource management and job scheduling/monitoring,
  into separate daemons: the ResourceManager, ApplicationMaster, and NodeManager.
- Resource management (handled by the JobTracker in v1) is now done by a daemon called the ResourceManager (RM). It is
  responsible for creating and allocating resources to multiple applications. Each application is an individual
  MapReduce job. The ResourceManager daemon is still centralized (global) on one machine. It assumes the responsibility
  of negotiating a container in which to start the ApplicationMaster and then launches the ApplicationMaster.
  On successful container allocations, the ApplicationMaster launches the container by providing the container launch
  specification to the NodeManager. These containers can be launched on any node in the cluster that runs a NodeManager.
- The NodeManager (NM) is another daemon that runs on each worker node in the cluster. It runs in place of the traditional
  tasktracker. The NodeManager launches any type of process, dictated by the application, in an application container.
  It also manages the ApplicationMaster daemon.
- Jobs (applications) are now managed and executed by the per-application ApplicationMaster (AM) daemon. This daemon
  can run on any node of the cluster. It is tasked with negotiating resources from the ResourceManager and working with
  the NodeManager(s) to execute and monitor the tasks. Jobs are now isolated from each other and are decentralized.
  Every application has its own instance of an ApplicationMaster. The AM is essentially user code and it is not to be
  trusted. It is not run at a privileged level.
- An application can make specific resource requests via the ApplicationMaster to satisfy its resource needs.
  The Scheduler responds to a resource request by granting a container, which satisfies the requirements laid out
  by the ApplicationMaster in the initial ResourceRequest. The ApplicationMaster has to take the container and present
  it to the NodeManager managing the host on which the container was allocated, in order to use the resources for launching its tasks.
- MRv2 maintains API compatibility with the previous stable release (hadoop-0.20.205). This means that all MapReduce jobs
  should still run unchanged on top of MRv2 with just a recompile.
- Don't run MRv1 and YARN on the same set of nodes at the same time. It is not supported.
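
The request-and-grant cycle between an ApplicationMaster and the ResourceManager can be sketched with a toy
allocator in Python. It tracks memory only and uses a simple first-fit rule; the class, its methods, and the node
names are hypothetical and do not reflect the real YARN scheduler or its API.

```python
class ResourceManager:
    """Toy allocator: grants memory-only containers on the first node with room."""

    def __init__(self, node_memory_mb):
        self.free = dict(node_memory_mb)   # NodeManager name -> free memory in MB

    def allocate(self, memory_mb):
        """Return a container description, or None if no node can satisfy the request."""
        for node, free in self.free.items():
            if free >= memory_mb:
                self.free[node] = free - memory_mb
                return {"node": node, "memory_mb": memory_mb}
        return None

rm = ResourceManager({"nm1": 4096, "nm2": 2048})
am_container = rm.allocate(1024)                       # container for the ApplicationMaster
task_containers = [rm.allocate(2048) for _ in range(3)]  # the AM then asks for task containers
print(am_container["node"], [c and c["node"] for c in task_containers])
# nm1 ['nm1', 'nm2', None]
```

The third request is refused because neither node has 2048 MB left, which is the point at which a real application
would wait for running containers to finish.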


