Indexing performance issues

Hi guys,

I’m trying to understand how I can tune Druid indexing properties to achieve high throughput with the indexing service.

I’m using Samza + Tranquility to push events into the Druid indexing service.

I’m producing at a constant rate of ~12K events/s to two partitions with a replication factor of 1 (2 indexing tasks in total).

I don’t think my dimensions have high cardinality, and I have verified that Samza is not the bottleneck.

Attached to this message you will find examples of my events (which are related to traffic and network behaviour), the log from one of the indexing tasks and a few screenshots from jconsole.

Also, you can find attached the properties from my middleManager nodes. I have two middleManagers, each of which can spawn four tasks. Each task has a 7 GB heap.
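For context, the relevant settings look roughly like this (the heap size and worker capacity are as described above, and the processing values are the ones that show up in the task log below; everything else here is only a sketch of my setup, the attached file has the real values):

druid.worker.capacity=4
druid.indexer.task.baseTaskDir=/tmp/persistent/task
druid.indexer.runner.javaOpts=-server -Xmx7g
druid.processing.numThreads=3
druid.processing.buffer.sizeBytes=510101964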

With this configuration, I can’t even get close to 12K events/s indexed by druid.

In this case, from what I can understand from the jconsole plots, I think that GC is the bottleneck. A lot of GCs are occurring, which decreases the overall throughput of the task.

I’m also seeing a few ‘to-space overflow’ collections with G1GC, which makes things even worse.

From this point, I think I have three options:

1- Increase the heap for each druid indexing task

2- Decrease the maxRowsInMemory of my task

3- Increase the number of partitions

I’ve tried option number two: instead of a maxRowsInMemory of 500,000, I decreased it to 220,000.

With that change, the task’s garbage collections decrease, but the number of intermediate persists increases a lot.

When my task finishes (my segment granularity is 1 hour) I end up with around 100 intermediate persists on disk.

The task then spends a LOT of time merging them, and eventually my middleManagers’ worker capacity fills up with tasks that are still merging.
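As a rough sanity check (assuming the task keeps up with about half of the 12K events/s and there is little to no rollup), the numbers do line up:

~12,000 events/s ÷ 2 partitions ≈ 6,000 rows/s per task
6,000 rows/s × 3,600 s ≈ 21.6M rows per 1-hour segment
21.6M ÷ 500,000 maxRowsInMemory ≈ 43 persists
21.6M ÷ 220,000 maxRowsInMemory ≈ 98 persists

So lowering maxRowsInMemory roughly doubles the number of spills that have to be merged at the end of the hour.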

My questions right now are:

  • What is the ideal time between intermediate persists? Should I optimize for that metric and, once achieved, just increase my heap to avoid GCs?

  • I don’t use autoscaling, and my worker capacity is limited. In that case, what do you think works better: many partitions with small-heap tasks, or fewer partitions with big-heap tasks?

Having said that, I would like to ask for general guidelines to improve the behaviour of this scenario.

Any ideas? Are there any Druid properties I didn’t mention that might be relevant?

Thank you very much,

Carlos

middleManagerProperties.txt (2.57 KB)

msgs.txt (97.8 KB)

taskLog.txt (242 KB)

Hi Carlos,

I think you’re basically on the right track. The idea with ingestion tuning is:

  1. Have enough partitions such that none of them are bottlenecked on CPU (each partition has a single ingestion thread, and it can get overwhelmed)

  2. After that, divide RAM equally amongst all the partitions. You want to leave some amount for compute buffers (# query threads * compute buffer size), some amount for page cache for the disk spills (probably 500MB-1GB), then use the rest for the JVM heap (see the example budget after this list).

  3. Set maxRowsInMemory as high as you can without leading to too many GCs. The exact number is a bit fuzzy and depends on what your data looks like. This will limit the amount of disk spilling. Fwiw, we use G1GC and generally see less than 1% of our time being spent in GC.

  4. If that’s not enough, faster disks or more RAM would help.
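To make the RAM split concrete, here’s a purely illustrative budget (the node size and peon count are hypothetical, not a recommendation): say a middleManager with 64 GB of RAM runs 4 peons, each with druid.processing.numThreads=3 and a 512 MB compute buffer.

compute buffers: 4 peons × 3 threads × 512 MB ≈ 6 GB
page cache for disk spills: ~1 GB per peon ≈ 4 GB
OS + middleManager overhead ≈ a few GB
remainder for heaps: ~50 GB ≈ up to -Xmx12g per peon

In practice you might cap the heaps lower than that and leave the extra for disk cache.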

Hi Gian, thanks for your tips.

Let me ask you a few more questions about how persists work.

I took the following log fragment from one of my tasks. Note that here I’m using the CMS collector instead of G1, with a 4 GB heap.

2015-04-13T11:07:39,332 INFO [rb_flow-incremental-persist] io.druid.segment.IndexMerger - Starting persist for interval[2015-04-13T11:00:00.000Z/2015-04-13T11:08:00.000Z], rows[500,000]
2015-04-13T11:07:41,698 INFO [rb_flow-incremental-persist] io.druid.guice.PropertiesModule - Loading properties from runtime.properties
2015-04-13T11:07:41,710 INFO [rb_flow-incremental-persist] org.skife.config.ConfigurationObjectFactory - Assigning value [510101964] for [druid.processing.buffer.sizeBytes] on [io.druid.query.DruidProcessingConfig#intermediateComputeSizeBytes()]
2015-04-13T11:07:41,711 INFO [rb_flow-incremental-persist] org.skife.config.ConfigurationObjectFactory - Assigning value [3] for [druid.processing.numThreads] on [io.druid.query.DruidProcessingConfig#getNumThreads()]
2015-04-13T11:07:41,711 INFO [rb_flow-incremental-persist] org.skife.config.ConfigurationObjectFactory - Using method itself for [{base_path}.columnCache.sizeBytes] on [io.druid.query.DruidProcessingConfig#columnCacheSizeBytes()]
2015-04-13T11:07:41,711 INFO [rb_flow-incremental-persist] org.skife.config.ConfigurationObjectFactory - Assigning default value [processing-%s] for [{base_path}.formatString] on [com.metamx.common.concurrent.ExecutorServiceConfig#getFormatString()]
2015-04-13T11:07:41,719 INFO [rb_flow-incremental-persist] io.druid.guice.JsonConfigurator - Loaded class[interface io.druid.segment.data.BitmapSerdeFactory] from props[druid.processing.bitmap.] as [io.druid.segment.data.BitmapSerde$DefaultBitmapSerdeFactory@381a9b4d]
2015-04-13T11:07:41,720 INFO [rb_flow-incremental-persist] io.druid.segment.IndexMerger - outDir[/tmp/persistent/task/index_realtime_rb_flow_2015-04-13T11:00:00.000Z_1_0/work/persist/rb_flow/2015-04-13T11:00:00.000Z_2015-04-13T12:00:00.000Z/0/v8-tmp] completed index.drd in 48 millis.
2015-04-13T11:07:41,747 INFO [rb_flow-incremental-persist] io.druid.guice.PropertiesModule - Loading properties from runtime.properties
2015-04-13T11:07:41,764 INFO [rb_flow-incremental-persist] io.druid.guice.JsonConfigurator - Loaded class[interface io.druid.segment.data.BitmapSerdeFactory] from props[druid.processing.bitmap.] as [io.druid.segment.data.BitmapSerde$DefaultBitmapSerdeFactory@73feb5a0]
2015-04-13T11:07:41,901 INFO [rb_flow-incremental-persist] io.druid.segment.IndexMerger - outDir[/tmp/persistent/task/index_realtime_rb_flow_2015-04-13T11:00:00.000Z_1_0/work/persist/rb_flow/2015-04-13T11:00:00.000Z_2015-04-13T12:00:00.000Z/0/v8-tmp] completed dim conversions in 181 millis.
761.521: [GC 761.521: [ParNew: 1198080K->133120K(1198080K), 0.3946020 secs] 3599524K->2696918K(4061184K), 0.3947480 secs] [Times: user=1.16 sys=0.08, real=0.39 secs]
761.916: [GC [1 CMS-initial-mark: 2563798K(2863104K)] 2698994K(4061184K), 0.1591610 secs] [Times: user=0.16 sys=0.00, real=0.16 secs]
762.076: [CMS-concurrent-mark-start]
763.835: [GC 763.835: [ParNew (promotion failed): 1198080K->1198080K(1198080K), 24.3957030 secs]788.231: [CMS790.226: [CMS-concurrent-mark: 3.734/28.150 secs] [Times: user=38.55 sys=8.06, real=28.15 secs]
(concurrent mode failure): 2678329K->2456874K(2863104K), 10.6634380 secs] 3761878K->2456874K(4061184K), [CMS Perm : 55554K->55535K(83968K)], 35.0595580 secs] [Times: user=42.02 sys=7.97, real=35.05 secs]
2015-04-13T11:08:20,275 INFO [main-SendThread(samza01:2181)] org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 39963ms for sessionid 0x14c980518e206d5, closing socket connection and attempting reconnect

As you can see, these logs are written when a persist occurs. It then loads configuration, including the compute buffer size that you mentioned.

Then GC kicks in and I get a ‘promotion failed’ followed by a ‘concurrent mode failure’, probably because my heap is not large enough, and my application stops for around 30-40 seconds.

At that point even the ZooKeeper connection drops because of a timeout.

I’m trying to understand how this process works, so let me ask you a few questions:

  • I thought that the compute buffer (druid.processing.buffer.sizeBytes) was used only for queries. Is it also used during persists, as those logs suggest?

  • From what I understood from your last reply, that buffer is allocated off-heap, so why does my heap fill up when a persist occurs? Does that make sense?

  • Is increasing the heap size the only solution here?

Thanks,

Carlos

BTW, let me add one more thing.

Very often my tasks don’t complete successfully, but they don’t fail either.

They just stall in the ‘running’ state, which causes newer tasks to queue up as ‘pending’.

When I open the log from one of the stalled tasks, these are the last lines:

2015-04-13T12:20:51,306 INFO [rb_flow-2015-04-13T11:00:00.000Z-persist-n-merge] io.druid.storage.s3.S3DataSegmentPusher - Uploading [/tmp/persistent/task/index_realtime_rb_flow_2015-04-13T11:00:00.000Z_1_0/work/persist/rb_flow/2015-04-13T11:00:00.000Z_2015-04-13T12:00:00.000Z/merged] to S3
2015-04-13T12:20:51,320 INFO [rb_flow-2015-04-13T11:00:00.000Z-persist-n-merge] com.metamx.common.CompressionUtils - Adding file[/tmp/persistent/task/index_realtime_rb_flow_2015-04-13T11:00:00.000Z_1_0/work/persist/rb_flow/2015-04-13T11:00:00.000Z_2015-04-13T12:00:00.000Z/merged/meta.smoosh] with size[1,000]. Total size so far[0]
2015-04-13T12:20:51,321 INFO [rb_flow-2015-04-13T11:00:00.000Z-persist-n-merge] com.metamx.common.CompressionUtils - Adding file[/tmp/persistent/task/index_realtime_rb_flow_2015-04-13T11:00:00.000Z_1_0/work/persist/rb_flow/2015-04-13T11:00:00.000Z_2015-04-13T12:00:00.000Z/merged/version.bin] with size[4]. Total size so far[1,000]
2015-04-13T12:20:51,322 INFO [rb_flow-2015-04-13T11:00:00.000Z-persist-n-merge] com.metamx.common.CompressionUtils - Adding file[/tmp/persistent/task/index_realtime_rb_flow_2015-04-13T11:00:00.000Z_1_0/work/persist/rb_flow/2015-04-13T11:00:00.000Z_2015-04-13T12:00:00.000Z/merged/00000.smoosh] with size[113,598,025]. Total size so far[1,004]
2015-04-13T12:20:57,696 INFO [rb_flow-2015-04-13T11:00:00.000Z-persist-n-merge] io.druid.storage.s3.S3DataSegmentPusher - Pushing S3Object [key=rbdata/rb_flow/2015-04-13T11:00:00.000Z_2015-04-13T12:00:00.000Z/2015-04-13T11:00:00.378Z/1/index.zip, bucket=redborder, lastModified=null, dataInputStream=null, Metadata={x-amz-acl=bucket-owner-full-control, Content-Length=18639057, Content-MD5=OPYUciJ3UsRPQCIcnWEYdA==, md5-hash=38f61472227752c44f40221c9d611874, x-amz-content-sha256=69a9523bb1b63f0525799de6a6512bbaa9b2d313f5371e5a71d2fb4dd035f5d2, Content-Type=application/zip}].
2015-04-13T12:20:59,163 INFO [rb_flow-2015-04-13T11:00:00.000Z-persist-n-merge] io.druid.storage.s3.S3DataSegmentPusher - Pushing S3Object [key=rbdata/rb_flow/2015-04-13T11:00:00.000Z_2015-04-13T12:00:00.000Z/2015-04-13T11:00:00.378Z/1/descriptor.json, bucket=redborder, lastModified=null, dataInputStream=null, Metadata={x-amz-acl=bucket-owner-full-control, Content-Length=701, Content-MD5=iwtWtE1kX/SKBN9iPu7Phg==, md5-hash=8b0b56b44d645ff48a04df623eeecf86, x-amz-content-sha256=c34f872961150ce62f889042cfb9a61995dbf16d64bb308c61459fc3060611cd, Content-Type=application/octet-stream}]
2015-04-13T12:20:59,187 INFO [rb_flow-2015-04-13T11:00:00.000Z-persist-n-merge] io.druid.storage.s3.S3DataSegmentPusher - Deleting zipped index File[/tmp/druid7270834366079247476index.zip]
2015-04-13T12:20:59,195 INFO [rb_flow-2015-04-13T11:00:00.000Z-persist-n-merge] io.druid.storage.s3.S3DataSegmentPusher - Deleting descriptor file[/tmp/druid7391375022124383805descriptor.json]
2015-04-13T12:20:59,205 INFO [rb_flow-2015-04-13T11:00:00.000Z-persist-n-merge] io.druid.indexing.common.actions.RemoteTaskActionClient - Performing action for task[index_realtime_rb_flow_2015-04-13T11:00:00.000Z_1_0]: SegmentInsertAction{segments=[DataSegment{size=113599029, shardSpec=LinearShardSpec{partitionNum=1}, metrics=[events, sum_bytes, sum_pkts, sum_rssi, sum_dl_score, clients, wireless_stations], dimensions=[application_id_name, biflow_direction, client_building, client_campus, client_floor, client_latlong, client_mac, client_rssi, client_rssi_num, direction, dot11_status, dst, dst_net_name, engine_id_name, ip_protocol_version, l4_proto, sensor_name, src, src_net_name, src_vlan, wireless_id, wireless_station], version=‘2015-04-13T11:00:00.378Z’, loadSpec={type=s3_zip, bucket=redborder, key=rbdata/rb_flow/2015-04-13T11:00:00.000Z_2015-04-13T12:00:00.000Z/2015-04-13T11:00:00.378Z/1/index.zip}, interval=2015-04-13T11:00:00.000Z/2015-04-13T12:00:00.000Z, dataSource=‘rb_flow’, binaryVersion=‘9’}]}

What could be the reason? What’s going on at that exact moment?

Thanks,

Carlos

Hi Carlos,

In response to your questions,

  • The compute buffer size is indeed only used in queries. Realtime nodes load the property because they do serve queries.

  • During persists, some extra memory is used on-heap in addition to off-heap. Most of the extra on-heap memory is due to the fact that you now have two indexes on heap: the one being persisted (which is now frozen), and the current one (which was created fresh when the persist started). To limit the amount of on-heap memory used during persists, it’s a good idea to set maxPendingPersists = 0. This is also the default.

  • To reduce heap usage, you could also lower maxRowsInMemory or lower maxPendingPersists (if it’s set above 0); a sample tuningConfig with these settings is sketched after this list.
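For reference, these knobs live in the task’s tuningConfig, and Tranquility exposes equivalent settings. A minimal sketch (the values here are placeholders, not recommendations):

"tuningConfig" : {
  "type" : "realtime",
  "maxRowsInMemory" : 500000,
  "intermediatePersistPeriod" : "PT10M",
  "maxPendingPersists" : 0
}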

The last line of that log is the task registering a new segment with the overlord. This is usually a really fast operation. If it’s taking a long time, it probably means something is wrong with the overlord or with network connectivity. Taking a thread dump of the task should make it clear if it’s actually stuck there, or if it’s moved on and is just doing something else that doesn’t involve a log message.
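Something like this will capture a dump of the peon’s JVM (use jps -lm or ps to find the task’s pid; the output path is just an example):

jstack -l <task pid> > /tmp/task-threads.txt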

I think that in this case the problem was that my historical nodes had no capacity left for the segment. Does that make sense? Could a task get stuck in the ‘running’ state for that reason?

In fact, whenever I find a task stalled in that state, the last log line is pretty random. It doesn’t seem to get stuck at any particular point or log line. Sometimes I check the same task log just a few minutes later and it is still writing to the file, but nothing catches my attention in particular. I will try to get a thread dump the next time it happens and get back to you.

Thanks,
Carlos

Yeah, that does make sense. If your historical nodes are full, handoff can’t happen, because historicals cannot load new data. This will delay the shutdown of realtime tasks, since they will wait for handoff to occur before shutting down.

Hi Gian,

I’ve been taking a look at the production configuration that you guys have published at http://druid.io/docs/0.7.1.1/Production-Cluster-Configuration.html

In the middle manager section I can see that you have 9 peons with 3g heap each, 2 threads per peon and a compute buffer size of about 500m.

As I understand it, that means you’re using (500 MB × 2 × 9) + (3 GB × 9) of memory. That’s a total of roughly 36 GB, plus some memory for disk cache, on a node with 244 GB of RAM.

Why is that? Do you share the machine with other processes, or is there another reason?

Thanks,

Carlos

We actually devote about 60GB of memory to each middle manager in production. The 60 - 36 = 24GB is mostly useful for extra disk cache.

Ok Gian, thanks for your help.

I will describe how I finally improved the performance of the indexing process based on what you said, just in case it is useful to other people.

I ended up using a 4 GB heap for each task and the G1 collector, since I was getting promotion failures with CMS (probably caused by heap fragmentation).
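For reference, the peon JVM options ended up looking roughly like this (only the 4g heap and G1 are my actual settings; the GC-logging flags are just something I’d suggest adding to watch the collector’s behaviour):

druid.indexer.runner.javaOpts=-server -Xmx4g -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps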

I set maxPendingPersists to 0 in Tranquility and started producing at the highest rate I could.

The idea in this scenario is to set maxRowsInMemory as high as you can, as long as the JVM does not spend too much time doing garbage collection.

A higher maxRowsInMemory results in fewer intermediate persists, which ultimately increases the number of events/s that you can ingest.
If you see lots of old-generation collections, you should probably reduce maxRowsInMemory or increase the heap. Lots of young-generation collections is common and reasonable as long as they don’t pause your application for too long.

As Gian said, spending around 1% of the time in GC looks like a reasonable result.
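A simple way to check that figure (jconsole’s memory tab works too) is jstat; the GCT column is the total seconds spent in GC, which you can compare against the JVM uptime:

jstat -gcutil <task pid> 5000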

FWIW, in our case 300,000 rows worked best, but as I understand it, that depends on the data being indexed.

We improved from 4k events/s to 10k events/s per indexing task, which I think is pretty good at this point, although it could probably get better with further tuning.

Regards,

Carlos

That’s great to hear! Thanks for taking the time to write up your findings.

I’m curious to know how you measure the ingestion rate. Is it from the ingestion/events/processed metric?

Yes, you can use metrics that are periodically emitted to determine the ingestion rate.
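For example, with the logging emitter the event counts show up periodically in the task logs (a minimal sketch; druid.emitter=http with a recipient base URL can ship them to a metrics collector instead):

druid.emitter=logging
druid.monitoring.emissionPeriod=PT1M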