Lambda Architecture

I’ve been researching lambda architectures (http://lambda-architecture.net/) and I’ve been tasked with setting up a production environment for such an architecture. I’ve decided to use an existing RabbitMQ message broker as an event source, which forwards events into Apache Storm (for the real-time part) and Apache Hadoop (for the batch part). To index both parts, Druid seemed like the best choice available.

I’ve set up a pseudo-distributed Hadoop cluster and I’m trying to connect Druid to it, but I have some questions. First of all, here’s my Druid _common configuration:

druid.storage.type=hdfs

druid.storage.storageDirectory=/tmp/hadoop-atnogcloud/dfs/data/

So, I’m guessing the path I put here is where my Hadoop installation stores its data (which defaults to the path “/tmp/hadoop-/dfs/data”). However, I ran some examples from your Twitter feed and, when I turn on my historical node and query it, it reads the data previously stored on the local FS. 1) Why? Shouldn’t it read data from Hadoop and ignore the local copy, since we’ve changed the storage location and restarted the node? Or are locally cached files never reset?

Next, I’ve followed http://druid.io/docs/latest/Batch-ingestion.html and I’m using the HadoopDruidIndexer to index Hadoop data into my Druid cluster. According to the tutorial, I don’t need the indexing service running if I use this, so I should have no need for overlords, middle managers or peons. 2) But then, where does this run? Since I have only one machine, all services run there and I execute this there as well, but in a distributed cluster, which machine runs this? A dedicated one? A historical one?

As far as I understand it, this indexer loads data from Hadoop and indexes it so that Druid can understand the data. In the specFile, I have two paths: the inputSpec one and the segmentOutputPath. According to their descriptions, the inputSpec path is “A String of input paths indicating where the raw data is located” and the segmentOutputPath is “the path to dump segments into”. 3) These are all in HDFS, right? Something like “/tmp/hadoop-atnogcloud/dfs/data/raw/” and “/tmp/hadoop-atnogcloud/dfs/data/patched/”. Does Druid handle all the issues of knowing which data has been processed, which hasn’t, etc.?

I’ve only recently come into contact with these technologies and I’m feeling overwhelmed by all the new concepts. Any guidance would be appreciated!

Dave

Hi David, please see inline.

I’ve been researching lambda architectures (http://lambda-architecture.net/) and I’ve been tasked with setting up a production environment for such an architecture. I’ve decided to use an existing RabbitMQ message broker as an event source, which forwards events into Apache Storm (for the real-time part) and Apache Hadoop (for the batch part). To index both parts, Druid seemed like the best choice available.

I’ve set up a pseudo-distributed Hadoop cluster and I’m trying to connect Druid to it, but I have some questions. First of all, here’s my Druid _common configuration:

druid.storage.type=hdfs

druid.storage.storageDirectory=/tmp/hadoop-atnogcloud/dfs/data/

So, I’m guessing the path I put here is where my Hadoop installation stores its data (which defaults to the path “/tmp/hadoop-/dfs/data”). However, I ran some examples from your Twitter feed and, when I turn on my historical node and query it, it reads the data previously stored on the local FS. 1) Why? Shouldn’t it read data from Hadoop and ignore the local copy, since we’ve changed the storage location and restarted the node? Or are locally cached files never reset?

If you want to use HDFS as deep storage, make sure to also include the HDFS module in your common configuration extensions. Otherwise, things will default to using the local FS.
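
For example, the common runtime.properties would end up looking something along these lines; the exact extension coordinate, version and property names depend on your Druid release, so treat this as a sketch rather than a drop-in config:

# Load the HDFS deep storage extension (coordinate/version are illustrative)
druid.extensions.coordinates=["io.druid.extensions:druid-hdfs-storage:<druid-version>"]

# Deep storage settings; a fully qualified HDFS URI avoids silently falling back to the local FS
druid.storage.type=hdfs
druid.storage.storageDirectory=hdfs://<namenode-host>:<port>/path/for/segments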

Next, I’ve followed http://druid.io/docs/latest/Batch-ingestion.html and I’m using the HadoopDruidIndexer to index Hadoop data into my Druid cluster. According to the tutorial, I don’t need the indexing service running if I use this, so I should have no need for overlords, middle managers or peons. 2) But then, where does this run? Since I have only one machine, all services run there and I execute this there as well, but in a distributed cluster, which machine runs this? A dedicated one? A historical one?

The standalone Hadoop indexer acts as a driver and submits MapReduce jobs to a remote Hadoop cluster. If you include the Hadoop configs in the classpath of the driver, you should be able to tell the driver where to send the jobs.
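
Concretely, the Hadoop client configuration directory (core-site.xml and friends) just needs to sit on the driver’s classpath next to the Druid jars; the paths below are illustrative assumptions for a single-machine setup:

# The Hadoop configs on the classpath tell the driver which cluster to submit the MapReduce jobs to
java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 \
  -classpath "lib/*:/usr/local/hadoop/etc/hadoop/" \
  io.druid.cli.Main index hadoop specFile.json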

As far as I understand it, this indexer loads data from Hadoop and indexes it so that Druid can understand the data. In the specFile, I have two paths: the inputSpec one and the segmentOutputPath. According to their descriptions, the inputSpec path is “A String of input paths indicating where the raw data is located” and the segmentOutputPath is “the path to dump segments into”. 3) These are all in HDFS, right? Something like “/tmp/hadoop-atnogcloud/dfs/data/raw/” and “/tmp/hadoop-atnogcloud/dfs/data/patched/”. Does Druid handle all the issues of knowing which data has been processed, which hasn’t, etc.?

These are HDFS paths, yes. Druid takes the raw data and indexes it to create segments. Druid will take care of knowing which segments have been created from the raw data and which haven’t.
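
If you want to see that bookkeeping, published segments end up as rows in the metadata store’s segment table (druid_segments by default), so you can inspect them directly. A rough MySQL query, with column names from memory that may vary slightly between versions:

-- List the segments Druid has published, one row per segment
SELECT id, dataSource, `start`, `end`, version, used
FROM druid_segments;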

I’ve only recently come into contact with these technologies and I’m feeling overwhelmed by all the new concepts. Any guidance would be appreciated!

Lambda architectures can definitely be a lot to take in. We’ll try to help as much as possible. I recently wrote up some literature on the subject: http://static.druid.io/docs/radstack.pdf. It hasn’t been peer reviewed yet, but it can provide some more info on how to architect a lambda architecture stack.

If you want to use HDFS as deep storage, make sure to also include the HDFS module in your common configuration extensions. Otherwise, things will default to using the local FS.

Ok, I have included HDFS in my extensions list, thanks for that!

The standalone Hadoop indexer acts as a driver and submits MapReduce jobs to a remote Hadoop cluster. If you include the Hadoop configs in the classpath of the driver, you should be able to tell the driver where to send the jobs.

Ok, that makes sense.

These are HDFS paths, yes. Druid takes the raw data and indexes it to create segments. Druid will take care of knowing which segments have been created from the raw data and which haven’t.

Right. The “druid.storage.storageDirectory” in the _common configurations should be the patched data directory, right?

Lambda architectures can definitely be a lot to take in. We’ll try to help as much as possible. I recently wrote up some literature on the subject: http://static.druid.io/docs/radstack.pdf. It hasn’t been peer reviewed yet, but it can provide some more info on how to architect a lambda architecture stack.

I’ll check it out, it seems very interesting!

fullLog.txt (20.7 KB)

Inline.


Right. The “druid.storage.storageDirectory” in the _common configurations should be the patched data directory, right?

Yeah, in general we try to define Druid’s external dependencies in the common configs.
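
For instance, the metadata store settings can sit in the _common properties next to the deep storage ones; the connection values below are taken from your indexing spec further down, while the property names assume a 0.7.x-style config and may differ in other versions:

# Metadata storage (MySQL), shared by all Druid nodes
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd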

Now I have another problem, running the HadoopDruidIndexer. I have inserted a wiki.json file into HDFS, in the directory /raw, as may be seen from this listing:

atnogcloud@localhost$ hadoop fs -ls /raw

Found 1 items

-rw-r--r-- 1 atnogcloud supergroup 1675 2015-04-16 20:11 /raw/wiki.json

When I run a MapReduce program (word count) on the raw folder, everything works fine:

hadoop jar /usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount /raw /output

The results are stored in the output folder and they match the expected results.
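
As a quick sanity check, the reducer output can be inspected directly (part-r-00000 is the usual reducer output file name, so the exact name here is an assumption):

# Peek at the word-count results stored in HDFS
hadoop fs -cat /output/part-r-00000 | head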

My problem shows up when I run the indexer, with this line:

java -Xmx256m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath lib/*:/usr/local/hadoop/etc/hadoop/ io.druid.cli.Main index hadoop specFile.json

My specFile.json is the one in http://druid.io/docs/latest/Batch-ingestion.html but with these modifications:

"ioConfig" : {
  "type" : "hadoop",
  "inputSpec" : {
    "type" : "static",
    "paths" : "/raw/wiki.json"
  },
  "metadataUpdateSpec" : {
    "type" : "mysql",
    "connectURI" : "jdbc:mysql://localhost:3306/druid",
    "password" : "diurd",
    "segmentTable" : "druid_segments",
    "user" : "druid"
  },
  "segmentOutputPath" : "/patched"
}

Nothing really fancy changed, just my metadata and the Hadoop paths. You can see that "paths" : "/raw/wiki.json" matches the file that exists in HDFS, as shown by the previous ls command.

When I run this command, however, I get the following error:

2015-04-17T10:30:03,353 WARN [main] org.apache.hadoop.security.UserGroupInformation - PriviledgedActionException as:atnogcloud (auth:SIMPLE) cause:org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: file:/raw/wiki.json

Full log attached.

Why does this happen? Have I got some configuration wrong? Do I need to change anything in Hadoop’s configs to get this working?

The paths are in the wrong format; as the error shows, they are being resolved against the local filesystem (file:/raw/wiki.json). They need to be fully qualified HDFS URIs:

"paths" : "hdfs://localhost:9000/raw/wiki.json"

"segmentOutputPath" : "hdfs://localhost:9000/patched"
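
Putting that together, the corrected ioConfig would look roughly like this, with everything else in the spec unchanged:

"ioConfig" : {
  "type" : "hadoop",
  "inputSpec" : {
    "type" : "static",
    "paths" : "hdfs://localhost:9000/raw/wiki.json"
  },
  "metadataUpdateSpec" : {
    "type" : "mysql",
    "connectURI" : "jdbc:mysql://localhost:3306/druid",
    "password" : "diurd",
    "segmentTable" : "druid_segments",
    "user" : "druid"
  },
  "segmentOutputPath" : "hdfs://localhost:9000/patched"
}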

I have also had some issues with my JAR dependencies.

When I first ran the indexer, I had a “mysql module not loaded” sort of error, which I fixed by manually accessing the Maven repository, downloading the JAR and inserting it into my classpath. Shouldn’t this happen automatically?

I also had to download the MySQL JDBC JAR and insert it into the classpath.
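
Roughly, that manual workaround looks like this (the driver version and the lib/ destination shown here are illustrative assumptions):

# Fetch the MySQL JDBC driver from Maven Central and drop it where -classpath lib/* will pick it up
curl -O https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.34/mysql-connector-java-5.1.34.jar
cp mysql-connector-java-5.1.34.jar lib/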

There is also another issue: all my logs show that Druid uses hadoop-2.3.0 JARs, but I’m using 2.6.0. Where do I change that, or is that not something I should worry about? Could this be related to why my HadoopDruidIndexer doesn’t work?

I deployed 2.3.0 and still had the same issue; the source of the problem was the path format, as explained above. For the time being, I’ll continue using Hadoop 2.3.0, just so no version issues become significant.