[druid-user] Number of segments per historical node?

Hi Pritesh,

I ran into this problem as well and it took me quite a while to figure out what was going on. Linux limits the number of memory-mapped areas per process (vm.max_map_count, 65530 by default). When you hit the limit you get out-of-memory errors that are very unhelpful for diagnosing the problem. I was hitting it because the Kafka Indexing Service produces many small segment files. Having that many small files isn't ideal for performance either, so I created a Hadoop indexing task that runs over the existing index and re-compacts it; instead of having on the order of 800 files per day I now have 1 or 2. If for some reason you really do have that many segments of a size you are happy with allocated to your historicals, then you can increase the vm.max_map_count setting (http://ask.systutorials.com/1969/maximum-number-of-mmap-ed-ranges-and-how-to-set-it-on-linux), or you need to spin up more Historical nodes.
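If you do go the route of raising the limit, a minimal sketch looks like the following (assuming a typical Linux host with root access; the value 262144 and the sysctl.d file name are just examples, pick something appropriate for your segment count):

    # Check the current per-process memory-map limit
    sysctl vm.max_map_count

    # Raise it for the running kernel
    sudo sysctl -w vm.max_map_count=262144

    # Persist the change across reboots
    echo "vm.max_map_count=262144" | sudo tee -a /etc/sysctl.d/99-druid.conf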

Regards,

–Ben

Hi Pritesh,

ulimit is something else - that is the number of file handles that a process can have open. It is separate from memory mapped areas.

If you are using the Kafka indexing service to ingest data you are almost assuredly creating a bunch of really small shards and small segments (if you are using batch ingestion then compaction is not going to help you). Our process, for example, may create on the order of 20 different segments, each with 5 to 300+ shards, even though we segment by day. You can see this in the coordinator console: click on your data source, view the timeline, and look at the number of segments and how many shards there are. If you aren't seeing exactly 1 segment of 1 shard for each hour, then you could compact all the shards and segments into a single one (again, assuming that the resulting file size is within an acceptable range).

The documentation is somewhat poor in this area, unfortunately. Basically it is a Hadoop indexing task. What we do is run a compaction task on data that is 2+ days old (because we have late-arriving data and don't want the real-time ingestion to conflict with the batch job); we also use day granularity. The task reads from the existing index and rebuilds it, which it can do more efficiently now that all the data is present. Here is the spec we use:

{
  "type": "index_hadoop",
  "spec": {
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "dataSource",
        "ingestionSpec": {
          "dataSource": "<your_data_source_name>",
          "intervals": [
            "2016-10-01T00:00:00Z/2016-10-02T00:00:00Z"
          ],
          "granularity": "day"
        }
      }
    },
    "tuningConfig": {
      "type": "hadoop",
      "partitionsSpec": {
        "targetPartitionSize": 5000000
      },
      "jobProperties": {
        "mapreduce.job.user.classpath.first": true,
        "mapreduce.map.memory.mb": 4096,
        "mapreduce.reduce.memory.mb": 16384,
        "mapred.job.queue.name": "druid"
      },
      "forceExtendableShardSpecs": true
    },
    "dataSchema": {
      "dataSource": "<your_data_source_name>",
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "day",
        "queryGranularity": "none",
        "intervals": [
          "2016-10-01T00:00:00Z/2016-10-02T00:00:00Z"
        ]
      },
      "parser": {
        "type": "parquet",
        "parseSpec": {
          "format": "timeAndDims",
          "timestampSpec": {
            "column": "<your_timestamp_column>",
            "format": "auto"
          },
          "dimensionsSpec": {
            "dimensions": [
              <your_dimensions_here>
            ]
          }
        }
      },
      "metricsSpec": [
        <your_metrics_here>
      ]
    }
  }
}
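To run it, you submit the spec to the Overlord's task endpoint; something like the following, assuming the default Overlord port and a spec saved as compaction-task.json (both just examples):

    curl -X POST -H 'Content-Type: application/json' \
         -d @compaction-task.json \
         http://<overlord_host>:8090/druid/indexer/v1/task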

Hope this helps,

–Ben

There is a simpler solution. Coordinator has a setting by which you can ask it to auto-merge small segments together. Look into coordinator configuration.
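If memory serves, the relevant knobs look something like the sketch below; the values shown are the defaults from memory, so verify the exact names against the coordinator configuration docs for your version:

    # coordinator runtime.properties: enable the merge duty
    druid.coordinator.merge.on=true

    # coordinator dynamic configuration (POST to /druid/coordinator/v1/config)
    {
      "mergeBytesLimit": 524288000,
      "mergeSegmentsLimit": 100
    }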

Hi Aseem,

How do you control which segments the Coordinator will auto-merge? I see only a setting for how often it will run. I do not see a way to specify the time window of segments that it should consider, and if it starts running over recent segments it will run into locking issues with my real-time kafka indexing tasks which receive late arriving data.

Thanks,

–Ben

Hi,

Has the coordinator auto-merge feature been updated to merge segments based on NumberedShardSpec shards?

According to this thread it doesn't (or didn't?) support them, which makes it incompatible with the segments produced by the Kafka indexing service:
https://groups.google.com/forum/#!topic/druid-user/4t6qEq06Zy4

The index_hadoop task should work but needs to be scheduled externally.

Hey guys,

The coordinator auto merge still doesn’t handle extendable shard specs and only operates on unsharded segments. There is a PR to support automated compaction of arbitrary segments using Hadoop here: https://github.com/druid-io/druid/pull/1998 , although progress on the PR seems to have stalled at the moment. Eventually, we will implement support for automated compaction without requiring Hadoop.

Ben,

Wanted to thank you for the response. Not sure why my thread keeps getting deleted. Something is up with google groups…
The vm.max_map_count change definitely fixed it for us. Still looking into the other comments.

Thanks,

This makes sense.

We have an architecture where Kafka to Druid is basically just for our real-time view. Our historical reporting is processed from Kafka => S3 => Druid/Cassandra.
It is simpler for me to just delete the older, irrelevant segments from the real-time database.

I have it on my to-do list to just create a rule for dropping; I was reading somewhere that Druid has a feature for scheduled segment deletion?

Thanks,

Scheduled segment deletion utilizes the rule mechanism to determine when segments should be marked as ‘unused’. See the druid.coordinator.kill.* properties described here: http://druid.io/docs/0.9.2/configuration/coordinator.html
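As a rough sketch (the rule types and property names are in the 0.9.2 docs linked above; the periods, replicant count, and tier name here are just examples), you could keep only the most recent month in the real-time datasource and let the coordinator's kill duty permanently remove segments once they are marked unused:

    # Retention rules for the datasource,
    # e.g. POST to /druid/coordinator/v1/rules/<your_data_source_name>
    [
      { "type": "loadByPeriod", "period": "P1M", "tieredReplicants": { "_default_tier": 2 } },
      { "type": "dropForever" }
    ]

    # Coordinator runtime.properties: enable the kill task for unused segments
    druid.coordinator.kill.on=true
    druid.coordinator.kill.period=P1D
    druid.coordinator.kill.durationToRetain=P7D
    druid.coordinator.kill.maxSegments=1000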