Ingestion from S3 does not seem to be stuck (there are no errors), but it is not progressing

Hi All,

I have been trying to ingest 140GB of Parquet data from one S3 folder and it looks like it might be stuck. It has not errored out, but it has been running for over 12 hours and the task log keeps stating that it has spent 0% reading and 99% processing:

2022-06-25T15:51:50,233 INFO [task-runner-0-priority-0] org.apache.parquet.hadoop.InternalParquetRecordReader - at row 0. reading next block
2022-06-25T15:51:50,311 INFO [task-runner-0-priority-0] org.apache.hadoop.io.compress.CodecPool - Got brand-new decompressor [.gz]
2022-06-25T15:51:50,312 INFO [task-runner-0-priority-0] org.apache.parquet.hadoop.InternalParquetRecordReader - block read in memory in 79 ms. row count = 9110100
2022-06-25T15:52:42,143 INFO [task-runner-0-priority-0] org.apache.parquet.hadoop.InternalParquetRecordReader - Assembled and processed 9110100 records from 2 columns in 51827 ms: 175.77904 rec/ms, 351.55807 cell/ms
2022-06-25T15:52:42,143 INFO [task-runner-0-priority-0] org.apache.parquet.hadoop.InternalParquetRecordReader - time spent so far 0% reading (79 ms) and 99% processing (51827 ms)
2022-06-25T15:52:42,143 INFO [task-runner-0-priority-0] org.apache.parquet.hadoop.InternalParquetRecordReader - at row 9110100. reading next block
2022-06-25T15:52:42,222 INFO [task-runner-0-priority-0] org.apache.parquet.hadoop.InternalParquetRecordReader - block read in memory in 79 ms. row count = 8970100
2022-06-25T15:53:35,297 INFO [task-runner-0-priority-0] org.apache.parquet.hadoop.InternalParquetRecordReader - Assembled and processed 18080200 records from 2 columns in 104898 ms: 172.35982 rec/ms, 344.71964 cell/ms
2022-06-25T15:53:35,297 INFO [task-runner-0-priority-0] org.apache.parquet.hadoop.InternalParquetRecordReader - time spent so far 0% reading (158 ms) and 99% processing (104898 ms)
2022-06-25T15:53:35,297 INFO [task-runner-0-priority-0] org.apache.parquet.hadoop.InternalParquetRecordReader - at row 18080200. reading next block
2022-06-25T15:53:35,373 INFO [task-runner-0-priority-0] org.apache.parquet.hadoop.InternalParquetRecordReader - block read in memory in 76 ms. row count = 8670100
2022-06-25T15:54:30,084 INFO [task-runner-0-priority-0] org.apache.parquet.hadoop.InternalParquetRecordReader - Assembled and processed 26750300 records from 2 columns in 159604 ms: 167.6042 rec/ms, 335.2084 cell/ms
2022-06-25T15:54:30,084 INFO [task-runner-0-priority-0] org.apache.parquet.hadoop.InternalParquetRecordReader - time spent so far 0% reading (234 ms) and 99% processing (159604 ms)
2022-06-25T15:54:30,084 INFO [task-runner-0-priority-0] org.apache.parquet.hadoop.InternalParquetRecordReader - at row 26750300. reading next block
2022-06-25T15:54:30,161 INFO [task-runner-0-priority-0] org.apache.parquet.hadoop.InternalParquetRecordReader - block read in memory in 77 ms. row count = 8650100
2022-06-25T15:55:24,763 INFO [task-runner-0-priority-0] org.apache.parquet.hadoop.InternalParquetRecordReader - Assembled and processed 35400400 records from 2 columns in 214202 ms: 165.26643 rec/ms, 330.53287 cell/ms
2022-06-25T15:55:24,763 INFO [task-runner-0-priority-0] org.apache.parquet.hadoop.InternalParquetRecordReader - time spent so far 0% reading (311 ms) and 99% processing (214202 ms)
2022-06-25T15:55:24,763 INFO [task-runner-0-priority-0] org.apache.parquet.hadoop.InternalParquetRecordReader - at row 35400400. reading next block
2022-06-25T15:55:24,840 INFO [task-runner-0-priority-0] org.apache.parquet.hadoop.InternalParquetRecordReader - block read in memory in 77 ms. row count = 8610100
2022-06-25T15:56:19,362 INFO [task-runner-0-priority-0] org.apache.parquet.hadoop.InternalParquetRecordReader - Assembled and processed 44010500 records from 2 columns in 268720 ms: 163.77829 rec/ms, 327.55658 cell/ms

There are a lot of these Parquet files in that directory. If I ingest one or two of them directly using uris instead of the entire folder prefix, they ingest fine.
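For reference, the single-file test just swapped the prefixes for the uris form of the s3 input source, roughly like this (the object keys below are placeholders, not my actual file names):

"inputSource": {
  "type": "s3",
  "uris": [
    "s3://mybucket/dt=2022-05-26/part-00000.parquet",
    "s3://mybucket/dt=2022-05-26/part-00001.parquet"
  ]
}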

I am running a 4-node cluster with Druid 0.22.1 on EC2 with the recommended machine specs, i.e. 2x Data servers @ i3.4xlarge, 1 Master server @ m5.2xlarge, and 1 Query server @ m5.2xlarge.

This is the ingestion spec that I am using to try and ingest 1 day:

{
  "type": "index_parallel",
  "spec": {
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "s3",
        "prefixes": ["s3://mybucket/dt=2022-05-26/"]
      },
      "inputFormat": {
        "type": "parquet"
      }
    },
    "tuningConfig": {
      "type": "index_parallel",
      "partitionsSpec": {
        "type": "dynamic"
      }
    },
    "dataSchema": {
      "dataSource": "segments_rollup",
      "timestampSpec": {
        "column": "!!!_no_such_column_!!!",
        "missingValue": "2022-05-26T00:00:00Z"
      },
      "transformSpec": {},
      "metricsSpec" : [
        { "type" : "thetaSketch", "name" : "auid_sketch", "fieldName" : "audigent_id", "isInputThetaSketch": false, "size": 16384}
      ],
      "dimensionsSpec": {
        "dimensions": [
          "segment_id"
        ]
      },
      "granularitySpec": {
        "queryGranularity": "day",
        "rollup": true,
        "segmentGranularity": "day"
      }
    }
  }
}

I have the following in my configs:
MiddleManager runtime.properties:

druid.service=druid/middleManager
druid.plaintextPort=8091

# Number of tasks per middleManager
druid.worker.capacity=8

# Task launch parameters
druid.indexer.runner.javaOpts=-server -Xms40g -Xmx100g -XX:MaxDirectMemorySize=60g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
druid.indexer.task.baseTaskDir=var/druid/task

# HTTP server threads
druid.server.http.numThreads=60

# Processing threads and buffers on Peons
druid.indexer.fork.property.druid.processing.numMergeBuffers=2
druid.indexer.fork.property.druid.processing.buffer.sizeBytes=1GiB
druid.indexer.fork.property.druid.processing.numThreads=1

# Hadoop indexing
druid.indexer.task.hadoopWorkingPath=var/druid/hadoop-tmp

Indexer runtime.properties:

druid.service=druid/indexer
druid.plaintextPort=8091

# Number of tasks per indexer
druid.worker.capacity=4

# Task launch parameters
druid.indexer.task.baseTaskDir=var/druid/task

# HTTP server threads
druid.server.http.numThreads=60

# Processing threads and buffers on Indexer
druid.processing.numMergeBuffers=2
druid.processing.buffer.sizeBytes=1GiB
druid.processing.numThreads=4

# Hadoop indexing
druid.indexer.task.hadoopWorkingPath=var/druid/hadoop-tmp

Historical jvm.config:

-server
-Xms12g
-Xmx24g
-XX:MaxDirectMemorySize=13g
-XX:+ExitOnOutOfMemoryError
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

Historical runtime.properties:

druid.service=druid/historical
druid.plaintextPort=8083

# HTTP server threads
druid.server.http.numThreads=60

# Processing threads and buffers
druid.processing.buffer.sizeBytes=1GiB
druid.processing.numMergeBuffers=4
druid.processing.numThreads=7
druid.processing.tmpDir=var/druid/processing

# Segment storage
druid.segmentCache.locations=[{"path":"var/druid/segment-cache","maxSize":"300g"}]

# Query cache
druid.historical.cache.useCache=true
druid.historical.cache.populateCache=true
druid.cache.type=caffeine
druid.cache.sizeInBytes=1GiB

So I was able to remove the bottleneck by setting maxNumConcurrentSubTasks in the tuningConfig of my ingestion spec. By default it is set to 1, so I increased it to 4 and the ingestion completed within 4 hours. I further bumped it up to 100 and the ingestion finished within 1 hour. I also tuned down the Xmx and MaxDirectMemorySize for the MiddleManager peons after applying this tuning config.
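For anyone hitting the same thing, the only change needed in the spec above was in the tuningConfig, roughly like this (4 was just my first pick; the right value depends on how many task slots your workers have free):

"tuningConfig": {
  "type": "index_parallel",
  "maxNumConcurrentSubTasks": 4,
  "partitionsSpec": {
    "type": "dynamic"
  }
}

Since each sub-task runs as a regular task in a MiddleManager/Indexer slot, the effective parallelism is still capped by the total druid.worker.capacity across the cluster, so a very high value like 100 just means "use whatever slots are free".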


Looks like we need to get some better docs and tutorials on scaling ingestion… it keeps coming up…