Main Factors of Data Ingestion (S3)

I have quite a large set of Parquet files stored in an S3 dataset that I want to ingest into Druid. The files sum up to about 1.5 billion rows.
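For context, the ingestion is a native batch (index_parallel) job reading the Parquet files straight from S3; the ioConfig is along these lines (the bucket and prefix here are placeholders, not my real paths):

```
"ioConfig": {
  "type": "index_parallel",
  "inputSource": {
    "type": "s3",
    "prefixes": ["s3://my-bucket/my-parquet-dataset/"]
  },
  "inputFormat": {
    "type": "parquet"
  }
}
```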

Now, my problem is that the ingestion process takes forever; I am already 3 hours in and it is still not done.

What could I tweak in my cluster to make the ingestion process faster?

My cluster setup has 2x c3.8xlarge (32 cores / 60 GB RAM) as the Historical/MiddleManager servers.

I made some changes in the config file, but alas, it’s still a bit slower than I anticipated.

Here are my current indexer configs:

druid_indexer_runner_javaOptsArray=["-server", "-Xmx12g", "-Xms1g", "-XX:MaxDirectMemorySize=6g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]

druid_indexer_fork_property_druid_processing_buffer_sizeBytes=1000000000

druid_indexer_fork_property_druid_processing_numMergeBuffers=2

druid_indexer_fork_property_druid_processing_numThreads=2

druid.worker.capacity=3

One thing to note about my ingestion script is that I increased maxBytesInMemory to around 11 GB. This somehow made my ingestion a bit faster according to the logs, since they report a higher number of rows (around 40k) per spill to disk.

Is it correct to assume that maxBytesInMemory depends on the indexer heap size (-Xmx)? And does it work by collecting 11 GB worth of input data and aggregating it before spilling to disk?
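For reference, this is roughly where that sits in my tuningConfig (other settings trimmed; maxRowsInMemory is left at what I believe is the default):

```
"tuningConfig": {
  "type": "index_parallel",
  "maxRowsInMemory": 1000000,
  "maxBytesInMemory": 11000000000
}
```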

I would appreciate it if someone could guide me on what I should do to optimize my ingestion process.

I’ve also read that splitHintSpec might help here, since by default each task only processes around 500 MB worth of input data. (https://druid.apache.org/docs/latest/ingestion/native-batch.html#split-hint-spec)

I find it weird, though, that despite that configuration each sub-task takes far too long to process 500 MB worth of Parquet data. If I increase the splitHintSpec limit, would it make a difference?
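If I'm reading the docs right, bumping it would look something like this in the tuningConfig (the 2 GB figure is just a value I'm considering, not something I've tested):

```
"tuningConfig": {
  "type": "index_parallel",
  "splitHintSpec": {
    "type": "maxSize",
    "maxSplitSize": 2000000000
  }
}
```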

You have set the worker capacity to 3. Is that tweak because of your memory? You basically have only 6 workers across the two nodes, where you could potentially have 30 ((32 - 2) / 2 = 15 on each node, assuming you want an even split of cores between the MiddleManagers and Historicals).
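As a rough sketch (the numbers are illustrative only, not a tested sizing; each peon's heap plus direct memory still has to fit alongside the Historical within the 60 GB), that could look something like this on each node:

```
# same environment-style properties you are already using; values are illustrative
druid_worker_capacity=15
# one processing thread per peon and a smaller buffer keep direct memory per peon modest
druid_indexer_fork_property_druid_processing_numThreads=1
druid_indexer_fork_property_druid_processing_numMergeBuffers=2
druid_indexer_fork_property_druid_processing_buffer_sizeBytes=100000000
druid_indexer_runner_javaOptsArray=["-server", "-Xms1g", "-Xmx1g", "-XX:MaxDirectMemorySize=1g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8"]
```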

I would recommend splitting historical and middle manager nodes and scaling up middle managers as you need.

This is a good idea. Would this mean I have to set 1 thread per worker and adjust my heap and direct memory sizing accordingly, right?

Thanks for this tip!

Another question I have is about this log line:

**Flushed in-memory data for segment[my_dataset_2018-02-05T00:00:00.000Z_2018-02-06T00:00:00.000Z_2020-05-09T11:41:30.626Z] spill[1312] to disk in [93] ms (653 rows).**

Does this mean that the worker was only able to spill 653 rows from the input file? Or was it able to roll up / aggregate data (worth maxBytesInMemory) and produce 653 AGGREGATED rows?

I ended up with 20 workers since I needed to account for direct memory errors. I increased each worker to 6 GB of direct memory.

I am not familiar with Java memory consumption, though. Is it safe to assume that direct memory isn't something each worker takes up from the outset? Because with this setup, it might take up the whole RAM of each node.

Am I correct that it is just the “allowable” direct memory they can consume in case they need it?

Yeah, that’s correct. I strongly encourage going through the configuration reference guide. It lays out all the parameters you need to tune your JVM. Typically, as a rule of thumb, set aside one core for ZooKeeper and work with the remaining cores.
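For sizing the cap itself, the configuration reference gives roughly this relationship (worked through here with your current fork properties):

```
# direct memory a peon may need for its processing buffers, roughly:
#   MaxDirectMemorySize >= druid.processing.buffer.sizeBytes * (druid.processing.numThreads + druid.processing.numMergeBuffers + 1)
# with buffer.sizeBytes = 1,000,000,000, numThreads = 2, numMergeBuffers = 2:
#   1,000,000,000 * (2 + 2 + 1) = ~5 GB, so the 6 GB cap per worker clears it
```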