Apache Spark, Amazon S3 and Apache Mesos

The hardest problem with having nice technologies at your service is making them collaborate.

Amazon S3

Let’s begin with Amazon S3. As the name suggests, it is a Simple Storage Service with only one purpose: to give you unlimited cloud-based storage. If you access S3 from EC2 instances (or, in general, from Amazon services), you don’t pay anything besides the cost of hosting the data; on the other hand, if you download the data outside the Amazon network, you will also pay for the outbound data transfer. This piece of tech is quite easy to understand: files on the cloud.

Apache Spark and Mesos

Apache Spark is an engine for big data processing, with built-in modules for streaming, SQL, machine learning and graph processing. In practical terms, it is an implementation of the MapReduce concept, extended to support interactive queries and stream processing. At the time I am writing this article, the reference version is Apache Spark 1.5.1.

Apache Mesos, instead, is a cluster manager, especially useful for running heterogeneous tasks. There are many discussions comparing Mesos to YARN, so I will not go into detail here.

How does Spark run on Mesos?

There is already an official guide that thoroughly explains how to configure Spark to work with Mesos, but here I want to linger on how a Spark application is actually executed on Mesos, in practical terms. The Spark configuration is essentially split in two files: conf/spark-defaults.conf and conf/spark-env.sh. The first file contains the default values that are passed only to spark-submit; the second one is sourced when running Spark programs in general. If we read the documentation, we find that we have to set the spark.executor.uri in the following way.
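A sketch via conf/spark-env.sh (assuming that is where you set it; the tarball URL is just a placeholder for wherever you host the Spark distribution, and the libmesos path depends on your Mesos installation):

    export MESOS_NATIVE_JAVA_LIBRARY=/usr/local/lib/libmesos.so
    export SPARK_EXECUTOR_URI=https://s3.amazonaws.com/your-bucket/spark-1.5.1-bin-hadoop2.6.tgz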

Or just add the following line to the spark-defaults.conf file:
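For instance (again, the URI is a placeholder):

    spark.executor.uri  https://s3.amazonaws.com/your-bucket/spark-1.5.1-bin-hadoop2.6.tgz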

So what happens when we run our Spark application on Mesos? Well, Mesos does not really know what Spark is (to a certain extent), thus it wants to “start” Spark on all the slaves and let Spark handle the rest. Mesos does not know where Spark is either, therefore it will fetch the Spark executor from the given URI, uncompress the tarball and run Spark.

However, setting spark.executor.uri is not the only way to run Spark via Mesos. There is another option: setting the spark.mesos.executor.home parameter. In this case, we have to choose a local path (local to all the nodes) in which Spark has already been installed. Therefore, the same path must exist and be valid on all the nodes in the cluster. Each slave will then just execute Spark from the given path.
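For example, in spark-defaults.conf (the path is only an example; use wherever Spark is actually installed on every node):

    spark.mesos.executor.home  /opt/spark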

How can I configure Amazon S3?

Now that we have a working Mesos+Spark cluster and we got the basics, we have to configure Amazon S3. Around this topic there is a lot of confusion on the Internet about the different protocols (s3, s3n, s3a), which one to use and how. You can read the S3 wiki by Apache to understand the differences between the protocols. The bottom line is this: use only s3a. On one condition, though: use the s3a protocol ONLY with Hadoop 2.7.1 or higher! I really mean this. If you do not have Hadoop 2.7.1 installed, then stop reading and upgrade it right away. I mean it. Long story short: the s3a protocol was added in Hadoop 2.6.0, so, in practice, you can use it with that version. However, there it is far from stable (and I wish they had never released it). It cost my company more than one week of investigations and slowdowns to figure this out. Anyway, let’s go back to the real topic. At this point you may wonder: let’s just use s3a://bucketname/mydata and I am set, right? No. Unfortunately, Spark with Hadoop 2.6.0 includes neither the s3a filesystem nor the Amazon AWS SDK.

Nevertheless, do not lose hope! We can still inject the jar files we need during Spark’s execution. The best way to do that is to add the following two lines to spark-defaults.conf (the use of SPARK_CLASSPATH is deprecated):
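A sketch of what those two lines can look like, assuming a Hadoop 2.7.1 installation (the jar names below are the ones bundled with that release; adjust the versions to your own installation):

    spark.driver.extraClassPath    /path/to/hadoop/share/hadoop/tools/lib/hadoop-aws-2.7.1.jar:/path/to/hadoop/share/hadoop/tools/lib/aws-java-sdk-1.7.4.jar
    spark.executor.extraClassPath  /path/to/hadoop/share/hadoop/tools/lib/hadoop-aws-2.7.1.jar:/path/to/hadoop/share/hadoop/tools/lib/aws-java-sdk-1.7.4.jar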

You have, of course, to substitute /path/to/hadoop/ with the real path in which you installed Hadoop on your system. Again, an important observation: this path is local to all the nodes in the cluster. Therefore, it must exist on all the machines.

Ok, now you have the libraries, the filesystem and the protocols, but you are still missing one key element: the access and secret keys. In order to access Amazon S3, you need both to authenticate to the server. On the Internet there are thousands of different comments, suggestions and methods on how to do that. Of course, you can export those keys in the execution scripts, but this is rather stupid: you will have to adapt each and every script to export those keys before executing a job. The best way is the following: create an hdfs-site.xml and place it in Spark’s conf directory. The file can look like the following:
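A minimal sketch (the two values are obviously placeholders for your own AWS credentials):

    <?xml version="1.0"?>
    <configuration>
      <property>
        <name>fs.s3a.access.key</name>
        <value>YOUR_ACCESS_KEY</value>
      </property>
      <property>
        <name>fs.s3a.secret.key</name>
        <value>YOUR_SECRET_KEY</value>
      </property>
    </configuration>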

This hdfs-site.xml must reside on all the nodes. Now the tricky part is the following: if you read the initial part of this post, you will remember that Mesos can run Spark in two ways: executing it from the Spark executor URI or executing it from a given local path. If you opted for the first way, then you will have to repack Spark’s tarball and place the hdfs-site.xml in its conf folder. If, instead, you have chosen the second option, you will have to copy the hdfs-site.xml to Spark’s conf folder on all the nodes.

Tiptop! We are good to go!
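At this point a job can read directly from S3 with an s3a:// URL. A minimal PySpark sketch (bucket name and path are placeholders):

    # Minimal sketch: count the lines of a dataset stored on S3 via s3a.
    # "my-bucket" and "mydata" are placeholders for your own bucket and prefix.
    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setAppName("s3a-read-example")
    sc = SparkContext(conf=conf)

    # textFile accepts globs, so several objects can be read at once
    lines = sc.textFile("s3a://my-bucket/mydata/*.gz")
    print(lines.count())

    sc.stop()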

We are almost done

At this point we have the keys, protocols, libraries and everything we need. If you start your application now, it will most likely work. Still, this is not over yet. There are a couple of default settings which are slowing down your processing or even jeopardising the stability of your jobs. The most subtle one is spark.hadoop.fs.s3a.connection.maximum which is, by default, set to 15. This translates to the following: if you have more than 15 cores, you will only have 15 connections open at the same time. Therefore, all the other N-15 cores are not fetching data. Even worse, if you are processing large unsplittable files (e.g., GZIP), then you will get timeout errors. Why? Well, because Spark wants to access all those files, but there will be only 15 connections actually working. If the GZIP files are big enough (let’s say a couple of GBs), then the other to-be-opened connections will time out, compromising the whole job. The best way is to set this value to the maximum number of cores in your cluster (or just to MAX_INT to cut to the chase).
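For example, in spark-defaults.conf (2147483647 is just Integer.MAX_VALUE; the total number of cores in your cluster works just as well):

    spark.hadoop.fs.s3a.connection.maximum  2147483647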

Known bugs

There is one small bug which I really do not know how to fix: sometimes it happens that a job hangs with one task being “stale” forever. This means that the job neither crashes nor proceeds. The only solution I have found is to set spark.speculation=true, which recovers from this stale situation. However, be careful: if you are writing to a database then you may end up with multiple insertions, because speculation mode re-executes slow tasks and keeps the result of the one that terminates first (you can tune the other speculation settings to try to mitigate this phenomenon).
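A sketch of what that can look like in spark-defaults.conf (the quantile and multiplier values are only illustrative tweaks of the speculation settings mentioned above, not recommendations):

    spark.speculation             true
    spark.speculation.quantile    0.90
    spark.speculation.multiplier  3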

October 29, 2015