Re-Indexing Taking Time: Need Improvement

Hi Druid Community,

We are trying to set up Druid for our project.

We have a setup where raw events flow into one datasource through the Kafka indexing service, with segmentGranularity set to FIFTEEN_MINUTE. The Kafka topic has 3 partitions and there are 3 tasks running, one per partition.

While re-indexing to merge the segments and roll up the FIFTEEN_MINUTE raw data, the task takes almost 45-50 minutes to complete. I'm attaching the re-indexing task spec here; please let me know if anything could be changed or tried to get better results.

I have around 10 crore (100 million) rows in each 15-minute segment, with 10-12 columns including dimensions and metrics.

Thanks in advance.

Regards,

Arpan Khagram

+91 8308993200

re-indexing_task.txt (1.02 KB)

Hey Arpan,

The IndexTask (“type”:“index”) is known to not be particularly performant, and is usually not recommended except for indexing small amounts of data. The best option for re-indexing is to run a Hadoop batch job with a dataSource inputSpec as described here: http://druid.io/docs/latest/ingestion/update-existing-data.html. Ideally you’ll have access to an external Hadoop cluster, but even if you don’t, you can try the Hadoop job anyway and it’ll run using a LocalJobRunner which should still be more efficient than the IndexTask.
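For reference, the key part of such a re-indexing spec is the dataSource inputSpec inside the Hadoop task's ioConfig, roughly like the fragment below (the dataSource name and interval are placeholders for your own values, and the rest of the spec, dataSchema and so on, is omitted here):

```json
"ioConfig": {
  "type": "hadoop",
  "inputSpec": {
    "type": "dataSource",
    "ingestionSpec": {
      "dataSource": "your_datasource",
      "intervals": ["2016-12-01T00:00:00.000Z/2016-12-01T01:00:00.000Z"]
    }
  }
}
```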

The second best option would be patching in this PR here: https://github.com/druid-io/druid/pull/3611 which improves the performance of the IndexTask. However, if you’re fairly new to Druid and not comfortable building from master, I would give the Hadoop LocalJobRunner a try first to see if it works well enough for you.

Thanks a lot David, will try and let you know the results :slight_smile:

Hi David,

One more thing I wanted help with: I want to understand the exact purpose and significance of the segment cache. I am quite confused after reading multiple documents and threads on it. Which of these is correct?

  1. Are segments in the segment cache those that have not yet been loaded into deep storage?

  2. Are they segments that were pulled out of deep storage to serve queries?

  3. Or does Druid simply keep all segments on local storage on the historical nodes by default?

Also, what do pending segments signify?

Thanks in advance.

Regards,

Arpan Khagram

Mobile: +91 8308993200

Hey Arpan,

Segment cache is closest to option #2. Historical nodes do not pull segments from deep storage in response to queries, but serve queries using the segments that have already been previously pulled into the segment cache. Hence your historicals should be sized (using druid.server.maxSize) such that all the segments for your dataSource can be contained by the segment caches of your historicals, otherwise your queries will be missing data.

The pending segments mechanism is currently only used by the Kafka indexing service, and it’s used to ensure that when replication is used (multiple tasks doing the same thing on different nodes for redundancy) these replica tasks generate the same sequence of segments with the same identifiers deterministically.

Thanks David for the quick response :slight_smile:

With respect to the same question, how are multiple historical nodes supposed to work with respect to the segment cache? If I have configured 2 historicals, are both of them supposed to have the same dataset in their segment caches? How does having multiple historical nodes in my cluster help me?

Thanks in advance.

Regards,

Arpan Khagram

Mobile: +91 8308993200

Hey Arpan,

Having multiple historical nodes is the way that Druid scales and achieves query parallelization. The design overview here is a good starting reference: http://druid.io/docs/latest/design/design.html.

The Druid coordinator is the service responsible for distributing segments across multiple historical nodes. It has internal algorithms to determine the optimal segment distribution to maximize query parallelization and does this automatically. In the general case, the segment caches of your historical nodes will not contain the same segments.

Hi David, Community,

Thanks for the reply :slight_smile: A few more things:

  1. Is 400-500 MB the recommended size for the complete segment or for each partition? What if I have a 2-3 GB segment but each partition is only 80-90 MB? Is this fine to have, or should I change anything?

  2. What exactly does segment loading mean? I want to understand the complete data flow for the Kafka indexing service.

I can see segments getting loaded to deep storage first and then coming into the segment cache. Is this the correct understanding?

  3. I have configured 2 historical nodes and I can see both have the same segments in their segment caches. I understood from your statement below and the Druid docs that it should load balance:

In the general case, the segment caches of your historical nodes will not contain the same segments.

Are there any specific coordinator settings required to load balance between multiple historical nodes?

Regards,

Arpan Khagram

+91 8308993200

Responses inline.

Hi David, Community,

Thanks for the reply :slight_smile: A few more things:

  1. Is 400-500 MB the recommended size for the complete segment or for each partition? What if I have a 2-3 GB segment but each partition is only 80-90 MB? Is this fine to have, or should I change anything?

That is the recommended size for each partition. You could try adjusting your segmentGranularity a bit to see if you can get slightly larger segments; if you’re using fifteen minutes right now, you could try hourly.
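For example, in the dataSchema of your supervisor spec, the change would be something like the following (the queryGranularity shown here is only an illustration; keep whatever rollup granularity you use today):

```json
"granularitySpec": {
  "type": "uniform",
  "segmentGranularity": "HOUR",
  "queryGranularity": "FIFTEEN_MINUTE"
}
```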

  2. What exactly does segment loading mean? I want to understand the complete data flow for the Kafka indexing service.

Here is a high-level overview of the flow of the KafkaIndexTask:

  • The task reads from the Kafka partitions and generates a series of in-memory partitions (one for each Kafka partition / segmentGranularity interval). When maxRowsInMemory is reached, these partitions are spilled to disk. Both the in-memory and spilled partitions are available for query requests. (maxRowsInMemory and taskDuration are set in the supervisor spec; see the fragment after this list.)
  • When taskDuration is reached, the task stops reading from Kafka and merges the spilled partitions together into a final set of partitions.
  • This final set of partitions is pushed into deep storage (S3, HDFS, NFS) and the segment descriptors are written into the metadata table. The task waits for handoff.
  • The coordinator periodically checks the metadata table for new, unserved segments, and when it detects these segments, it instructs the historical nodes to load the segments according to a cost balancing algorithm. The historical nodes do this by pulling the segment from deep storage into their local segment cache, and memory mapping the file for reading.
  • Once the historical nodes have loaded all the segments, the indexing task is notified that it can stop serving queries for the segments it generated since the historicals will now take over that responsibility. It’s now done and the process terminates.
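For reference, here is roughly where those two settings live in a Kafka supervisor spec (the topic name, broker address, and values below are placeholders rather than recommendations; the dataSchema is omitted):

```json
"tuningConfig": {
  "type": "kafka",
  "maxRowsInMemory": 75000
},
"ioConfig": {
  "topic": "your_topic",
  "taskCount": 3,
  "replicas": 1,
  "taskDuration": "PT1H",
  "consumerProperties": { "bootstrap.servers": "kafka-broker:9092" }
}
```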

If you’re interested in more details about how the Kafka indexing service works, you can take a look at this post: https://imply.io/post/2016/07/05/exactly-once-streaming-ingestion.html

I can see segments getting loaded first on Deep Storage and then it comes to segment-cache - is this correct understanding ?

Yes.

  3. I have configured 2 historical nodes and I can see both have the same segments in their segment caches. I understood from your statement below and the Druid docs that it should load balance:

In the general case, the segment caches of your historical nodes will not contain the same segments.

Ah, your setup is the exception to the general case :slight_smile: The default replication factor (defined by load rules: http://druid.io/docs/0.9.1.1/operations/rule-configuration.html) is 2, so if you have 2 historical nodes, each node will load the full set of segments to achieve the desired level of redundancy. You can try modifying the load rule to set a replication factor of 1, and the segments should then get split between the two nodes.
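As a sketch, the default rule could be replaced with something like this (assuming you haven't defined any tiers yet, so both historicals are in _default_tier):

```json
[
  {
    "type": "loadForever",
    "tieredReplicants": {
      "_default_tier": 1
    }
  }
]
```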

Hi David,

Thanks a lot for your valuable inputs. I was able to load balance the segments between the 2 historical nodes by setting up the rule now. Your post is very informative, and now I clearly understand the data flow for the Kafka indexing service.

I had one more question regarding the server max size, druid.server.maxSize, for the historical node.

What I understood, and could see, is that this parameter relates to how much disk space we want to allocate to the historical node, including the segment cache, and that generally we should prefer the same size for both druid.server.maxSize and the segment cache size.

But when I look at the Druid documentation, I get confused about whether it is disk space or the memory allocation for the historical node. Snapshot from the Druid documentation:

Server maxSize sets the maximum cumulative segment size (in bytes) that a node can hold. Changing this parameter will affect performance by controlling the memory/disk ratio on a node. Setting this parameter to a value greater than the total memory capacity on a node may cause disk paging to occur. This paging time introduces a query latency delay.

What is server maxSize?

I am also having query performance issues, and I need to understand maxSize and the segment cache properly in order to tune them :) Kindly let me know if you want to see any of my other config files to check whether they are set up correctly for good query performance.

Regards,

Arpan Khagram

+91 8308993200

Hey Arpan,

An example would probably be helpful here. Let’s say that after starting up all your processes and allocating the necessary memory for them (JVM heaps, direct byte buffers, overhead memory for OS and processes), you have 60 GB of remaining available memory. Let’s say you also have 120 GB of available disk space:

The way that Druid reads segments is that it maps the files stored in the segment cache (i.e. disk) into whatever memory is available on the machine and then reads them from memory; this is handled automatically for us by the OS. When we try to read a segment that has not been mapped to memory, the OS will page the file from disk into memory which will incur a latency penalty (and will also result in the eviction of data that was previously sitting in memory). Hence, in this scenario:

  • The maximum amount of segment data (druid.server.maxSize) you can set is 120 GB (since that’s the max available disk). If you set 120 GB but only have 60 GB of available memory, at any given time half of your data will be in memory, meaning that if you run a query that has to hit every segment, 60 GB of data will have to be paged in during the query.

  • If you set druid.server.maxSize to 60GB or less, all the segments on that historical will be mapped into memory and disk paging will not occur since there’s no more data that might have to be paged in. Hence your queries will return much faster.

How you set this parameter is your cost/performance tradeoff for historicals.

Regarding performance tuning in general, it’s a pretty big subject. Typically a good starting place is to enable Druid metrics and learn how to use them to identify where your bottleneck is. This page contains information on metrics: http://druid.io/docs/0.9.1.1/operations/metrics.html and enabling them is described on the common configuration page.

Hi David,

I wanted to check whether we can force a specific mapping of segments to historical nodes (date wise), so that older segments go to nodes that have a higher druid.server.maxSize.

Example: is there a way I can keep the current (latest) 5 days of data on a few historical nodes and data older than 5 days on other historical nodes?

Reason: most of my queries will be over the most recent 4-5 days of data, and I can manage to get servers (VMs) with large RAM so that those queries are faster. It's OK for me if querying older data takes a little more time (I can keep older data on separate historical nodes with less RAM). I have a requirement to keep 28 days of data, and it is becoming difficult to get enough large servers to accommodate all of it.

Also, is there any way I can force some amount of data (say, the last 1/2/3 days) to be permanently present in memory for querying?

Thanks in advance.

Regards,

Arpan Khagram

+91 8308993200

Hey Arpan,

What you’re trying to do can be accomplished using load rules (http://druid.io/docs/0.9.1.1/operations/rule-configuration.html) along with server tiers. You would have a load rule that loads the latest 5 days of data into a hot tier composed of historical nodes with more RAM, then a load rule that loads the latest 28 days of data into a cold tier that keeps more data on disk, then a drop forever rule to drop data older than 28 days. Rules are evaluated in sequence until one matches so the first 5 days would go to the hot tier, the next 23 days of data would go into the cold tier, and older data would be dropped.
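As a sketch, the rule set for your datasource might look something like this (the tier names "hot" and "cold" come from druid.server.tier on your historicals, and the replicant counts here are just examples):

```json
[
  { "type": "loadByPeriod", "period": "P5D",  "tieredReplicants": { "hot": 1 } },
  { "type": "loadByPeriod", "period": "P28D", "tieredReplicants": { "cold": 1 } },
  { "type": "dropForever" }
]
```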

Likewise, you could implement keeping the latest 1/2/3 days in-memory by using another tier that loads the latest x days and ensuring that the machines have enough memory to keep all their segments in memory.

Hi David,

As suggested by you, I am now trying to do the same thing by configuring one hot tier and one cold tier (each with a single server), but my segments are getting dropped and are not getting loaded onto the historical nodes.

I am repeatedly getting the logs below in the coordinator logs (the Kafka indexing tasks are completing successfully, and there are no exceptions in the historical or MiddleManager logs):

2016-12-01T14:15:44,527 INFO [Coordinator-Exec--0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"alerts","timestamp":"2016-12-01T14:15:44.527Z","service":"druid/coordinator","host":"172.25.182.168:8081","severity":"component-failure","description":"Unable to find matching rules!","data":{"class":"io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner","segmentsWithMissingRulesCount":6,"segmentsWithMissingRules":["telemetry_2016-12-01T13:00:00.000Z_2016-12-01T14:00:00.000Z_2016-12-01T13:00:23.658Z_2","telemetry_2016-12-01T13:00:00.000Z_2016-12-01T14:00:00.000Z_2016-12-01T13:00:23.658Z_1","telemetry_2016-12-01T13:00:00.000Z_2016-12-01T14:00:00.000Z_2016-12-01T13:00:23.658Z","telemetry_2016-12-01T12:00:00.000Z_2016-12-01T13:00:00.000Z_2016-12-01T12:00:23.627Z_5","telemetry_2016-12-01T12:00:00.000Z_2016-12-01T13:00:00.000Z_2016-12-01T12:00:23.627Z_4","telemetry_2016-12-01T12:00:00.000Z_2016-12-01T13:00:00.000Z_2016-12-01T12:00:23.627Z_3"]}}]

2016-12-01T14:15:44,527 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorBalancer - [cold]: One or fewer servers found. Cannot balance.

2016-12-01T14:15:44,527 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorBalancer - [hot]: One or fewer servers found. Cannot balance.

2016-12-01T14:15:44,527 INFO [Coordinator-Exec--0] com.metamx.emitter.core.LoggingEmitter - Event [{"feed":"metrics","timestamp":"2016-12-01T14:15:44.527Z","service":"druid/coordinator","host":"172.25.182.168:8081","metric":"segment/overShadowed/count","value":0}]

2016-12-01T14:15:44,527 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorLogger - Load Queues:

2016-12-01T14:15:44,527 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorLogger - Server[172.25.182.169:8083, historical, cold] has 0 left to load, 0 left to drop, 0 bytes queued, 0 bytes served.

2016-12-01T14:15:44,527 INFO [Coordinator-Exec--0] io.druid.server.coordinator.helper.DruidCoordinatorLogger - Server[172.25.182.158:8083, historical, hot] has 0 left to load, 0 left to drop, 0 bytes queued, 0 bytes served.

Initially I tried configuring 3 rules (a P1D load on hot, a P3D load on cold, and a drop forever), but the segments were dropped both with and without the rules.

Can you please advise?

Regards,

Arpan Khagram

+91 8308993200

Hey Arpan,

I’ve attached some screenshots of what it should look like. Note that the period rules are “from now”, i.e. P1D is “segments in the past 24 hours” and not “the most recent day of data”.

Also note that all the rules have to be entered at the same time. Some in the past have tried to configure rules by first setting a P1D hot tier rule and saving it, and then setting another P3D cold tier and saving it, and then setting a drop forever and saving it. What happens in this case is that the latest version of the ruleset (which would be just the drop forever) is applied and all the segments get unloaded from the historicals.

Thanks David, I have got it working in a similar manner now.

So it means that all the hosts/historical nodes for which we keep the same tier name will be part of the same tier, e.g. if I keep 2 nodes with druid.server.tier=hot and 2 nodes with druid.server.tier=cold, then both the hot and cold tiers will have 2 nodes each, right?

Regards,

Arpan Khagram

Yes, that is correct.

Thanks David.