Parquet Native Batch Ingestion is Too Slow (Druid 0.18.0)

Hi All,

I already asked the same questions in this thread but my configuration evolved so much that I now have a different cluster tuning.

Also, most of the Druid parquet problems I found on the net were asked years ago.

The Basic Cluster Tuning page doesn't help me much either, since it arbitrarily gives you some values to set without explaining what they do.

Apologies in advance if this is considered as spam.

The Problem:

Ingesting Parquet files stored in S3 using index_parallel native batch ingestion is quite slow.

Details:

  • I want to ingest 8k+ Parquet files, totaling around 1.5 billion rows

  • I want to segment this by DAY, and each day might have a million rows across 7-8 high-cardinality dimensions

  • Some of the single_phase_sub_tasks fail due to GC overhead limit errors or because they have exceeded their direct memory

  • Successful single_phase_sub_tasks take around 2 hours to finish, but each one only ingests 24-30 Parquet files according to its payload

My Cluster Setup:

  • 3x c3.8xlarge spot instances as MiddleManager/Historical nodes

  • 1x r5.4xlarge as the Router, Coordinator, and Broker node

Some insights from the previous thread:

  • I initially configured the MM/Historical nodes to have a 3-worker capacity; looking at the indexer fork JVM properties, I had allocated too much heap and direct memory, which is what forced me to set it at 3

  • It was noted that there could potentially be 15 workers per MM node, but this is not feasible given the heap size and direct memory a single_phase_sub_task needs to finish successfully

My Current Indexer configuration:

druid_indexer_runner_javaOptsArray=["-server", "-Xmx1g", "-Xms1g", "-XX:MaxDirectMemorySize=4g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]

druid_indexer_fork_property_druid_processing_buffer_sizeBytes=100000000

druid_indexer_fork_property_druid_processing_numMergeBuffers=2

druid_indexer_fork_property_druid_processing_numThreads=1

druid_worker_capacity=9

You can see that I increased the direct memory to 4GB, but I am still encountering GC overhead limit errors with this setup. I set the worker capacity to 9 because each worker is allowed 5GB (1GB heap + 4GB direct memory) and I can only allot 45GB out of the 60GB of RAM per node.

My Questions:

  • Are there tuningConfig or partitioning settings that I should tweak?
  • How can I speed up the ingestion process? The setup above only lets each worker build up about 600 rows before spilling to disk (due to 1/6 of 100MB)
  • Is my cluster still too small for the data I want to ingest?

I would really appreciate detailed answers that explain what each JVM setting does (I am not really familiar with how the JVM works).

I already spent countless hours reading the configuration reference and basic cluster tuning page to no avail.

Sorry for demanding too much, but I have already spent hours of trial and error just to get the cluster working for our use case.

Update:

After spending countless hours trying to ingest Parquet datasets, I gave up and just regenerated the data in CSV output.

It now works fine, with ~30 minutes spent on each single_phase_sub_task.

I guess Druid’s parquet ingestion process isn’t optimized yet.

Hi Von,

thank you for the detailed report. AFAIK, the only difference between ingesting Parquet and CSV is that ingesting Parquet requires the task to fetch each file from S3 to local disk before starting ingestion, whereas the task can read CSV files directly from S3. I don't think this can make such a big difference in ingestion performance unless your local disk is somehow very slow.

Flushed in-memory data for segment[my_dataset_2018-02-05T00:00:00.000Z_2018-02-06T00:00:00.000Z_2020-05-09T11:41:30.626Z] spill[1312] to disk in [93] ms (653 rows).

Looking at the previous thread, this seems suspicious. 653 rows per spill is pretty low and frequent spills can make ingestion slow. Do you still see such small row counts per spill in the task logs?

Regarding OOM, there are roughly two cases that can cause it. One is that maxRowsInMemory or maxBytesInMemory is set higher than the JVM max memory allows. Another is that the task created too many partial segments which are spilled to disk. When the task publishes segments, it first merges spilled partial segments if they belong to the same segment. The merge needs more memory as it merges more partial segments. If you still see the above logs about a small number of rows per spill, it could mean that the task spilled a lot of partial segments which need to be merged later. Can you check the task logs to see whether this is what happened?
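For reference, both of those limits are set in the tuningConfig of the index_parallel spec. A minimal fragment as an illustration (the numbers are only placeholders, not tuned recommendations from this thread; the main point is to keep maxBytesInMemory comfortably below the task heap):

```
{
  "tuningConfig": {
    "type": "index_parallel",
    "maxRowsInMemory": 1000000,
    "maxBytesInMemory": 300000000
  }
}
```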

Thanks,

Jihoon

the only difference between ingesting Parquet and CSV is that ingesting Parquet requires the task to fetch each file from S3 to local disk before starting ingestion, whereas the task can read CSV files directly from S3.

Is it possible that compression and how the data is structured have something to do with this? I used Spark with GZIP compression to generate both the CSV and Parquet versions.

Between the 2 formats, CSV still has better ingestion speed and performance despite having the same number of rows per spill (~600).

Each CSV ingestion task finishes successfully in around 15-30 minutes, while the Parquet tasks fail after about an hour. Just imagine how frustrating that is when testing ingestion using the Parquet version, haha.

653 rows per spill is pretty low and frequent spills can make ingestion slow.

If I set maxBytesInMemory higher than the default (1/6 of the JVM memory), the number of rows per spill increases.

Given that I allocated 1GB of heap per task, setting maxBytesInMemory to 700MB increased the rows per spill to around 2k.

The merge needs more memory as it merges more partial segments. If you still see the above logs about a small number of rows per spill, it could mean that the task spilled a lot of partial segments which need to be merged later. Can you check the task logs to see whether this is what happened?

That's weird. As described, given the same cluster configuration (and ~600 rows per spill), the ingestion succeeds using the CSV version of the data. So I am not quite sure whether the memory errors I am encountering have something to do with the number of partial segments to merge.

Each CSV ingestion task finishes successfully in around 15-30 minutes, while the Parquet tasks fail after about an hour. Just imagine how frustrating that is when testing ingestion using the Parquet version, haha.

I agree. It is really not easy to understand what's happening behind the scenes. Maybe we could improve some log messages for better debugging…

Anyway, I think this is a side effect of the better compression ratio of the Parquet format. In Druid 0.18.0, native batch ingestion supports a size-based split hint spec (https://druid.apache.org/docs/latest/ingestion/native-batch.html#size-based-split-hint-spec) which assigns input files to sub tasks based on their total size. Since Parquet supports efficient compression due to its columnar nature, I guess each input file is smaller when it's written in Parquet format compared to CSV. This means each sub task will likely be assigned more rows to process (since the split hint spec is based on the total size of input files), which may require different settings than for CSV. I guess your ingestion could go better if you set maxSplitSize to something lower than the default (500 MB) in your splitHintSpec. I'm not sure what the optimal number would be, but you may start with 100 MB.
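As a rough sketch of where that setting would go (the splitHintSpec lives in the index_parallel tuningConfig; 104857600 bytes is just the 100 MB starting point suggested above, not a tuned value):

```
{
  "tuningConfig": {
    "type": "index_parallel",
    "splitHintSpec": {
      "type": "maxSize",
      "maxSplitSize": 104857600
    }
  }
}
```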

Also, for your JVM memory configuration for tasks, I would recommend increasing the max heap size and reducing the max direct memory size. The direct memory is mostly used for realtime query processing on stream data; if you use only batch ingestion, 1 or 2 GB of direct memory should be enough. However, the task usually needs more heap memory for creating and merging segments. I would say a 2 GB max heap would be a good start, and you can adjust it based on your observations.
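Applied to the javaOptsArray shown earlier in the thread, that suggestion would look roughly like this (2g heap and 2g direct memory are only starting points to adjust from observation; at 9 workers, ~4GB per worker still fits within the 45GB budget mentioned above):

```
druid_indexer_runner_javaOptsArray=["-server", "-Xmx2g", "-Xms2g", "-XX:MaxDirectMemorySize=2g", "-Duser.timezone=UTC", "-Dfile.encoding=UTF-8", "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"]
```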

Thanks for the insights Jihoon! Will try to allocate more memory for heap and see if it works :slight_smile: