Performance issue: indexing task takes 2 passes over the dataset

Hello,
I have noticed that the indexing task reads the complete dataset twice when creating a non-partitioned segment:

- 1st pass: to get the data intervals

- 2nd pass: to actually create the segment

Is there a way to reduce this to a single pass?

Thanks,
Sandeep

IndexTask does the first pass to determine the number of shards. If you already know that the data is small enough and will never be sharded, you can set targetPartitionSize in the tuningConfig to -1.

This will avoid the first pass.
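For reference, a minimal sketch of that setting, assuming the 0.8.x index task's tuningConfig format (the rest of the task spec is omitted):

"tuningConfig": {
  "type": "index",
  "targetPartitionSize": -1
}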

Nishant,
The 1st pass is not related to targetPartitionSize; it is for getting the valid intervals. The targetPartitionSize-related pass is an extra one that I am already avoiding by using targetPartitionSize=-1. I have highlighted the 2 calls in the snippet below; getDataIntervals() does a full pass over the data.

final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
final int targetPartitionSize = ingestionSchema.getTuningConfig().getTargetPartitionSize();

final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final Set<DataSegment> segments = Sets.newHashSet();

// Call #1: getDataIntervals() does a full pass over the data to find the data intervals.
final Set<Interval> validIntervals = Sets.intersection(granularitySpec.bucketIntervals().get(), getDataIntervals());
if (validIntervals.isEmpty()) {
  throw new ISE("No valid data intervals found. Check your configs!");
}

for (final Interval bucket : validIntervals) {
  final List<ShardSpec> shardSpecs;
  if (targetPartitionSize > 0) {
    // Call #2: determinePartitions() does another full pass; avoided by targetPartitionSize=-1.
    shardSpecs = determinePartitions(bucket, targetPartitionSize, granularitySpec.getQueryGranularity());
  } else {
    int numShards = ingestionSchema.getTuningConfig().getNumShards();
    if (numShards > 0) {
      shardSpecs = Lists.newArrayList();
      for (int i = 0; i < numShards; i++) {
        shardSpecs.add(new HashBasedNumberedShardSpec(i, numShards, jsonMapper));
      }
    } else {
      shardSpecs = ImmutableList.<ShardSpec>of(new NoneShardSpec());
    }
  }

Thanks,

Sandeep

There are 3 phases:

  1. determine intervals

  2. determine partitions

  3. make segments

If you provide info about the intervals and the number of partitions, the first 2 steps will be skipped; see the sketch below.
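As a sketch, an index task spec along these lines supplies both up front (assuming the index task spec format; the dataSource, interval, and firehose details here are illustrative placeholders):

{
  "type": "index",
  "spec": {
    "dataSchema": {
      "dataSource": "example",
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "intervals": ["2016-03-01/2016-03-02"]
      }
    },
    "ioConfig": {
      "type": "index",
      "firehose": { "type": "..." }
    },
    "tuningConfig": {
      "type": "index",
      "targetPartitionSize": -1,
      "numShards": 1
    }
  }
}

With intervals given in the granularitySpec and numShards fixed in the tuningConfig, both of the first two steps should be skippable per the above.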

How do I provide info about intervals? I am using a custom firehose, and looking at the firehose interface I don't see a way to pass the time interval back to the framework.

You can only provide intervals and partitions as part of batch ingestion. I thought it was your batch ingestion that was slow? There's no way to provide information about intervals using firehoses. FWIW, in Druid 0.9.0, all the docs have been updated to recommend using Tranquility for realtime ingestion. With Tranquility you can specify the number of partitions. Also, with 0.9.0, segment building has been heavily optimized and should be faster.

Yes, it is batch ingestion. I am using the indexing service's index task with a custom firehose.

Unfortunately, 0.9.0 has the same limitations: IndexTask still uses the same steps, and the firehose interface has no methods for providing time intervals back to the framework.

Hi Sandeep, are you using the index task or the Hadoop index task? The index task is really meant for POCs, and the Hadoop index task is much better suited for production use. Do you have Hadoop in house?

Hi Fangjin, I am using the index task. We ingest in small batches from Mongo using a custom firehose.

Does the Hadoop index task support reading from a custom source?

Hey Sandeep,

The Hadoop task supports reading from any Hadoop InputFormat but not from firehoses.
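For what it's worth, the Hadoop task points at its input via an inputSpec in the ioConfig rather than a firehose; a minimal sketch, with an illustrative HDFS path:

"ioConfig": {
  "type": "hadoop",
  "inputSpec": {
    "type": "static",
    "paths": "hdfs://namenode:9000/example/data.json"
  }
}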

The Index Task has not been very heavily optimized, as it is not commonly used in production. The most common batch ingestion method in production is Hadoop indexing with a remote Hadoop cluster (not the local runner; although that works too, it is not common in production), and the most development effort has been focused on that case.

In other words, the Index Task could definitely use some love if you are willing to get your hands dirty :)

Hi Gian,

Online documentation at http://druid.io/docs/0.8.3/ingestion/batch-ingestion.html says this about the Indexing Service…

The indexing service was created such that external systems could programmatically interact with it and run periodic indexing tasks. Long-term, the indexing service is going to be the preferred method of ingesting data.

…and this for Hadoop…

The HadoopDruidIndexer runs hadoop jobs in order to separate and index data segments. It takes advantage of Hadoop as a job scheduling and distributed job execution platform. It is a simple method if you already have Hadoop running and don’t want to spend the time configuring and deploying the Indexing service just yet.

…but it looks like you are suggesting Hadoop over the Indexing Service. The concern I have now is how well the indexing service has been tested in production. Is performance optimization the only unaddressed issue with the Indexing Service, or might we discover other issues?

-Sandeep

Hey Sandeep,

There are two methods of using Hadoop indexing with Druid.

  1. Indexing service’s “Hadoop Index Task”.

  2. Standalone HadoopDruidIndexer.

Both of these use the same underlying code, and both are suitable for production. The only real difference is that the code is started up in different ways.
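For instance, method 1 wraps the same Hadoop spec in a task JSON submitted to the indexing service (a sketch; the inner spec is elided):

{
  "type": "index_hadoop",
  "spec": { ... }
}

while method 2 runs the standalone indexer directly against an equivalent spec file.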

The indexing service also has a plain “Index Task”; as far as I know, this is not commonly used in production. With the indexing service you can use a variety of different task types; some are more production-tested than others.

Hopefully this helps clarify things.

Thanks Gian! It’s clear now.