Hadoop, MapReduce and Spark – What’s it all about?

Hadoop is a collection of open-source technologies that facilitate highly distributed processing and storage of large data sets.

At its heart, the Hadoop project itself consists of only a handful of components. You can think of the following components as the foundations on which Hadoop accomplishes the “Hadoop magic” of highly distributed storage and processing:

  • Hadoop Common
  • HDFS (Hadoop Distributed File System)

And the following as the ways in which you can build data processing programs for Hadoop using MapReduce or other programming models:

  • Hadoop YARN
  • Hadoop MapReduce

The Hadoop ecosystem also includes many other projects designed to work well with Hadoop and its components. Apache Spark is one such project: it competes with MapReduce as a processing engine, but it can itself run on YARN and use HDFS, which is why it is considered part of the Hadoop ecosystem.

http://blog.newtechways.com/2017/10/apache-hadoop-ecosystem.html

Why should I care? What can I do with all of this?

So Hadoop and associated projects offer powerful “distributed processing and storage of datasets”. That’s great, but what can you actually do with that capability?

The answer is quite a lot. Here are some examples:

  • Store huge amounts of data

Kind of obvious, but pretty much everyone has data storage needs that can grow beyond what was first anticipated. HDFS is a distributed file system, so scaling it out is largely a case of adding more nodes to the cluster.

  • Transformation of unstructured data to a structured form

With the programming models available to Hadoop, transforming datasets is simply a case of writing an appropriate program. Because the data sits in HDFS and the work is distributed across the cluster, these aggregations and transformations run in parallel and at scale.

  • Build “data lakes”

A data lake is a large repository containing vast amounts of raw data, persisted in its native format until it is requested. Hadoop is a natural candidate for this sort of project – it can store huge amounts of raw data cheaply and transform it on demand.

What the Heck is MapReduce?


https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html

Historically, Hadoop’s most commonly used programming model has been MapReduce. MapReduce started life as proprietary Google technology, but the concepts behind it were given new life in several different forms, including the open-source implementation in the Apache Hadoop project.

MapReduce is a programming model that breaks the complex task of big data processing down into two easy-to-comprehend procedures: map and reduce.

  • Map procedures typically take the form of data filtering and sorting operations that are required before data reduction/summary operations.
  • Reduce procedures make up the final “summary operation” of the data processing workload and perform operations such as summation, averaging, counting and other similar combinatorial operations.

A data processing workload is often not just a single pair of procedures run over a single dataset. Hadoop’s MapReduce programming model has the concept of a Job, and Jobs can be chained or cascaded together so that the final output comes from a sequence of several MapReduce operations, or even from a combination of several datasets.
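
As a hedged sketch (not part of the project built later in this article), chaining two Jobs looks roughly like this. FirstMapper, FirstReducer, SecondMapper and SecondReducer are hypothetical classes used purely for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ChainedJobs
{
    public static void main(String[] args) throws Exception
    {
        Path input = new Path(args[0]);
        Path intermediate = new Path(args[1]);   // scratch directory between the two jobs
        Path output = new Path(args[2]);

        // First job: its output directory becomes the input of the second job
        // (output key/value classes and formats omitted for brevity)
        Job first = Job.getInstance(new Configuration(), "First pass");
        first.setJarByClass(ChainedJobs.class);
        first.setMapperClass(FirstMapper.class);
        first.setReducerClass(FirstReducer.class);
        FileInputFormat.setInputPaths(first, input);
        FileOutputFormat.setOutputPath(first, intermediate);

        // Only start the second job if the first one succeeded
        if (!first.waitForCompletion(true))
        {
            System.exit(1);
        }

        Job second = Job.getInstance(new Configuration(), "Second pass");
        second.setJarByClass(ChainedJobs.class);
        second.setMapperClass(SecondMapper.class);
        second.setReducerClass(SecondReducer.class);
        FileInputFormat.setInputPaths(second, intermediate);
        FileOutputFormat.setOutputPath(second, output);
        System.exit(second.waitForCompletion(true) ? 0 : 1);
    }
}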

What the Heck is Spark?

https://spark.apache.org/

Spark is a programming model and processing engine that is generally considered much easier to use than MapReduce. Among the benefits Spark has over MapReduce are fast, in-memory data processing (capable of near real-time work) and built-in machine learning with MLlib.

Spark programs can be written in several languages, with APIs available for Java, Scala, Python and R, plus Spark SQL (very similar to conventional SQL). While writing MapReduce programs in languages other than Java is possible (for example via Hadoop Streaming), it is not nearly as well supported as the multi-language APIs are in Spark.

Spark programs do not follow Hadoop’s MapReduce implementation as a design pattern, but they do share similar concepts. Spark programs often lean heavily on the concept of DataFrames – a distributed collection of data organized into named columns.
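
To give a feel for the DataFrame idea, here is a minimal sketch in Java of the kind of per-channel summation we will build with MapReduce later in this article. It is not part of the article’s project: the HDFS path and file name are illustrative only, and on the cluster we build below it would be submitted with spark-submit from the spark container.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DataFrameSketch
{
    public static void main(String[] args)
    {
        SparkSession spark = SparkSession.builder()
                .appName("Trending statistics with DataFrames")
                .getOrCreate();

        // Read the CSV into a DataFrame: a distributed collection organized into named columns
        // (the HDFS path and file name below are illustrative only)
        Dataset<Row> videos = spark.read()
                .option("header", "true")
                .option("inferSchema", "true")
                .csv("hdfs://namenode:8020/input/USvideos.csv");

        // Sum views, likes, dislikes and comment count per channel in a couple of lines
        videos.groupBy("channel_title")
                .sum("views", "likes", "dislikes", "comment_count")
                .show();

        spark.stop();
    }
}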

Picking a Dataset

Let’s get started: we’ll pick a large dataset, write a MapReduce program against it and see what is possible with its powers.

Kaggle is a great place to find freely available datasets, whether for experimenting with data processing workloads, playing with data science in general, or supporting academic research by generating new insights from the synthesis of different datasets. The datasets are aggregated or uploaded by volunteers and by governmental and non-governmental agencies.

You will also find great examples of “kernels” / “notebooks” – interactive, REPL-style (read-eval-print loop) Python notebooks showing step by step how a dataset can be processed for a specific purpose.

Purely at random, I’ve chosen a dataset of Trending YouTube Video Statistics.

We’ll use Hadoop to generate some high-level descriptive statistics and see how we get on. The data itself is in CSV format and has the following columns:

video_id,trending_date,title,channel_title,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description
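
For later reference, the columns our job will care about sit at the following zero-based positions, assuming the header order above (this constants class is purely illustrative and not part of the project):

// Zero-based column positions in each row of the trending videos CSV (illustrative only)
final class VideoColumns
{
    static final int CHANNEL_TITLE = 3;
    static final int VIEWS = 7;
    static final int LIKES = 8;
    static final int DISLIKES = 9;
    static final int COMMENT_COUNT = 10;
}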

Building our Hadoop Environment (with Docker-Compose)

Setting up a functional Hadoop environment by hand is time-consuming and tricky, but we’re definitely going to need one that contains all of the services required to run a Hadoop cluster.

One option we have is to run a Hadoop cluster in the cloud via AWS EMR or Google Cloud Dataproc. Those services let us spin up a Hadoop cluster with relative ease; however, they can still be pricey, especially for the Hadoop hobbyist. On top of that, since we want to get to know more about Hadoop, it’d be nice to run the Hadoop services locally rather than have them abstracted away.

Fortunately (borrowing heavily from a guide found here), we can construct a local Hadoop cluster (with Spark) using Docker-Compose.


https://docs.docker.com/compose/

Below, you’ll find a large docker-compose.yml containing definitions for the many services we’ll need for Hadoop, HDFS and to run our MapReduce job:

version: '2'
services:
  # HDFS NameNode: holds the filesystem metadata and coordinates the DataNodes
  namenode:
    image: uhopper/hadoop-namenode:2.8.1
    hostname: namenode
    container_name: namenode
    networks:
      - hadoop
    volumes:
      - /namenode:/hadoop/dfs/name
    environment:
      - CLUSTER_NAME=datanode1
      - HDFS_CONF_dfs_permissions=false
    ports:
      - 8020:8020
      - 50070:50070
      - 50470:50470

  # HDFS DataNode: stores the actual file blocks
  datanode1:
    image: uhopper/hadoop-datanode:2.8.1
    hostname: datanode1
    container_name: datanode1
    networks:
      - hadoop
    volumes:
      - /datanode1:/hadoop/dfs/data
    environment:
      - CORE_CONF_fs_defaultFS=hdfs://namenode:8020
      - HDFS_CONF_dfs_datanode_address=0.0.0.0:50010
      - HDFS_CONF_dfs_datanode_ipc_address=0.0.0.0:50020
      - HDFS_CONF_dfs_datanode_http_address=0.0.0.0:50075
    ports:
      - 50010:50010
      - 50020:50020
      - 50075:50075

  # YARN ResourceManager: allocates cluster resources and schedules jobs
  resourcemanager:
    image: uhopper/hadoop-resourcemanager:2.8.1
    hostname: resourcemanager
    container_name: resourcemanager
    networks:
      - hadoop
    environment:
      - CORE_CONF_fs_defaultFS=hdfs://namenode:8020
      - YARN_CONF_yarn_log___aggregation___enable=true
    ports:
      - 8030:8030
      - 8031:8031
      - 8032:8032
      - 8033:8033
      - 8088:8088

  # YARN NodeManager: runs the job containers; we also mount our JAR, scripts and data here
  nodemanager:
    image: uhopper/hadoop-nodemanager:2.8.1
    hostname: nodemanager
    container_name: nodemanager
    networks:
      - hadoop
    volumes:
      - ./target/:/opt/mapreduce/
      - ./bin/:/opt/scripts/
      - ./input/:/local-input/
      - ./output/:/local-output/
    environment:
      - CORE_CONF_fs_defaultFS=hdfs://namenode:8020
      - YARN_CONF_yarn_resourcemanager_hostname=resourcemanager
      - YARN_CONF_yarn_log___aggregation___enable=true
      - YARN_CONF_yarn_nodemanager_remote___app___log___dir=/app-logs
    ports:
      - 8040:8040
      - 8041:8041
      - 8042:8042

  # Spark client container, kept alive (via tail) so Spark jobs can be submitted to the cluster
  spark:
    image: uhopper/hadoop-spark:2.1.2_2.8.1
    hostname: spark
    container_name: spark
    networks:
      - hadoop
    environment:
      - CORE_CONF_fs_defaultFS=hdfs://namenode:8020
      - YARN_CONF_yarn_resourcemanager_hostname=resourcemanager
    command: tail -f /var/log/dmesg


networks:
  hadoop:

With the above, and in particular the volumes defined for the nodemanager container, we mount the directories containing our built MapReduce job JAR, helper scripts and input data into the container, ready for Hadoop to use.

Getting our Hadoop environment up and running is a simple case of running: docker-compose up

Writing our MapReduce Job

For this example, I’ve decided to create a very simple MapReduce job that outputs the sum of views, likes, dislikes and comment count by channel title for the data we have. MapReduce jobs typically consist of a Mapper class and a Reducer class, which extend:

Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

And:

Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
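
For this particular job, the type parameters resolve as shown below (deduced from the driver configuration and the unit tests further down, so treat it as illustrative): the mapper receives the byte offset of each line and the raw CSV text, and emits a channel title against a packed string of counts; the reducer then emits one finished CSV line per channel, with no separate value.

// Concrete signatures for this job (deduced from the Job configuration and tests below)
public class MapStatistics extends Mapper<LongWritable, Text, Text, Text> { /* ... */ }
public class ReduceStatistics extends Reducer<Text, Text, Text, NullWritable> { /* ... */ }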

The Mapper and Reducer classes then get plugged into a driver class with the main method and the Job configuration:

public static void main(String[] args) throws Exception
    {
        Job job = Job.getInstance();

        job.setJarByClass(MapReduceDataset.class);

        job.setJobName("Trending Statistics");

        // args[0] is the HDFS input directory, args[1] the HDFS output directory
        // (the output directory must not already exist when the job runs)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));


        job.setMapperClass(MapStatistics.class);
        job.setReducerClass(ReduceStatistics.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }

With the Hadoop boilerplate out of the way, let’s get on with writing some business logic for our Mapper and Reducer. For MapReduce there’s a handy utility, MRUnit, which allows you to write unit tests for your Mapper:

    @Before
    public void before()
    {
        MapStatistics mapper = new MapStatistics();
        mapDriver = new MapDriver<>();
        mapDriver.setMapper(mapper);
    }

    @Test
    public void map_withStandardExampleLine_writesAppropriateMappedEntryToContext()
    {
        mapDriver.withInput(new LongWritable(12345L), new Text("3s1rvMFUweQ,17.14.11,\"Taylor Swift: …Ready for It? (Live) - SNL\",\"Saturday Night Live\",24,2017-11-12T06:24:44.000Z,\"SNL\"|\"Saturday Night Live\"|\"SNL Season 43\"|\"Episode 1730\"|\"Tiffany Haddish\"|\"Taylor Swift\"|\"Taylor Swift Ready for It\"|\"s43\"|\"s43e5\"|\"episode 5\"|\"live\"|\"new york\"|\"comedy\"|\"sketch\"|\"funny\"|\"hilarious\"|\"late night\"|\"host\"|\"music\"|\"guest\"|\"laugh\"|\"impersonation\"|\"actor\"|\"improv\"|\"musician\"|\"comedian\"|\"actress\"|\"If Loving You Is Wrong\"|\"Oprah Winfrey\"|\"OWN\"|\"Girls Trip\"|\"The Carmichael Show\"|\"Keanu\"|\"Reputation\"|\"Look What You Made Me Do\"|\"ready for it?\",1053632,25561,2294,2757,https://i.ytimg.com/vi/3s1rvMFUweQ/default.jpg,False,False,False,\"Musical guest Taylor Swift performs …Ready for It? on Saturday Night Live.\\n\\n#SNL #SNL43\\n\\nGet more SNL: http://www.nbc.com/saturday-night-live\\nFull Episodes: http://www.nbc.com/saturday-night-liv...\\n\\nLike SNL: https://www.facebook.com/snl\\nFollow SNL: https://twitter.com/nbcsnl\\nSNL Tumblr: http://nbcsnl.tumblr.com/\\nSNL Instagram: http://instagram.com/nbcsnl \\nSNL Pinterest: http://www.pinterest.com/nbcsnl/\""));
        mapDriver.withOutput(new Text("\"Saturday Night Live\""), new Text("1053632-25561-2294-2757"));
        mapDriver.runTest();
    }

And for your Reducer:

    @Before
    public void before()
    {
        ReduceStatistics reducer = new ReduceStatistics();
        reduceDriver = new ReduceDriver<>();
        reduceDriver.setReducer(reducer);
    }

    @Test
    public void reduce_withStandardInput_reducesToStatistics()
    {
        reduceDriver.withInput(new Text("\"Saturday Night Live\""), Arrays.asList(
                new Text("1-2-3-4"),
                new Text("1-2-3-4")));
        reduceDriver.withOutput(new Text("\"Saturday Night Live\",2,4,6,8"), null);
        reduceDriver.runTest();
    }

After writing some failing unit tests for our MapReduce job, I was able to proceed with writing the mapper and reducer code.
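
The real MapStatistics and ReduceStatistics classes live in the project repo and may well differ from what follows, but as a rough sketch of the shape they take (and one that satisfies the tests above), something along these lines works. It relies on a simple quote-aware regex split rather than a proper CSV parser, so it assumes well-formed rows; each class would sit in its own file:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative sketch only: maps each CSV row to (channel_title, "views-likes-dislikes-comments")
public class MapStatistics extends Mapper<LongWritable, Text, Text, Text>
{
    // Split on commas that are not inside double quotes (not a full CSV parser)
    private static final String CSV_SPLIT = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)";

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String[] fields = value.toString().split(CSV_SPLIT, -1);
        if (fields.length < 11 || "video_id".equals(fields[0]))
        {
            return; // skip malformed rows and the header line
        }
        String counts = fields[7] + "-" + fields[8] + "-" + fields[9] + "-" + fields[10];
        context.write(new Text(fields[3]), new Text(counts));
    }
}

// Illustrative sketch only: sums the packed counts per channel into one CSV output line
public class ReduceStatistics extends Reducer<Text, Text, Text, NullWritable>
{
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
    {
        long views = 0, likes = 0, dislikes = 0, comments = 0;
        for (Text value : values)
        {
            String[] parts = value.toString().split("-");
            views += Long.parseLong(parts[0]);
            likes += Long.parseLong(parts[1]);
            dislikes += Long.parseLong(parts[2]);
            comments += Long.parseLong(parts[3]);
        }
        context.write(new Text(key + "," + views + "," + likes + "," + dislikes + "," + comments), NullWritable.get());
    }
}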

Note: I’m not going to explain everything in this article – the full example project can be found in the repo below:

The code isn’t the cleanest or the greatest, but that’s okay. The point of TDD is to write the tests you need for the functionality you need and refactor against them. We can always come back later to refactor and improve, but this not-very-elegant code is at least sufficient and readable.

Now that the Mapper and Reducer code are written, let’s run our example and look at the output we get.

In the project’s Makefile, I’ve defined simple rules that run all of the steps required: building the MapReduce job, bringing up a Hadoop cluster and executing the job:

CONTAINER_NAME=nodemanager

build-job:
	mvn clean package

hadoop-cluster:
	docker-compose down
	docker-compose up -d

run-map-reduce: build-job hadoop-cluster
	docker exec -it ${CONTAINER_NAME} /bin/bash "/opt/scripts/exec-hadoop-job.sh"

Executing our MapReduce Job

The bash script below (exec-hadoop-job.sh) gets mounted in the nodemanager container and prepares HDFS and Hadoop to execute our MapReduce job.

#!/bin/bash

function clear_existing_local_outputs() {
    rm -rf /local-output/*
}

function clear_hdfs_directories() {
  hadoop fs -rm -r /input
  hadoop fs -mkdir -p /input
  hadoop fs -rm -r /output
}

function run_hadoop_mapreduce_job() {
    JAR_NAME=$1
    CLASS_NAME=$2
    hadoop jar ${JAR_NAME} ${CLASS_NAME}  /input /output
}

function copy_input_from_local() {
    hadoop fs -copyFromLocal /local-input/* /input/
}

function copy_output_to_local() {
    hadoop fs -copyToLocal /output/* /local-output
}

cd $HADOOP_PREFIX
echo "Waiting for HDFS startup"
hdfs dfsadmin -safemode leave
clear_hdfs_directories
clear_existing_local_outputs
copy_input_from_local

run_hadoop_mapreduce_job /opt/mapreduce/MapReduceDataset.jar dev.andymacdonald.mapreduce.MapReduceDataset

copy_output_to_local

Let’s Run It!

Running make run-map-reduce builds the job, brings the cluster up and executes the MapReduce job inside the nodemanager container. Great! Looks like it did something.

Let’s check the output now the job is complete with cat output/part-r-00000. Each line of the output contains a channel title followed by its total views, likes, dislikes and comment count.

It worked!

Of course, it wasn’t all as easy and straightforward as that and there was a lot of trial and error along the way.

So far we have a means of easily spinning up a Hadoop cluster, we’ve written a simple MapReduce job, and we can see that it successfully sums up the views, likes, dislikes and comment count by channel title for our dataset.

That’s All For Now!

Thanks for reading! 😊 I hope you enjoyed the article and this quick introduction to Hadoop, MapReduce and Spark. Feel free to pull down the project I’ve put together and experiment with your own MapReduce jobs and datasets:

Next Time: Writing a Spark Job for our Hadoop Cluster

We now have a MapReduce example, so what about Spark?
