Kafka index task never checks taskDuration

Hi all,

We have been using the Kafka indexing service for a long time. Now some tasks are behaving very strangely: they never begin to hand off.

I checked the overlord log and found that it never checks taskDuration. The log does not include lines like “has run for”, but it does have “shutdown”.

Has anyone had the same problem? Please help me.

Thank you very much,

Xinxin

Hey Xinxin,

Can you post your overlord logs?

Hey Xinxin,

I’m sorry, I’m still confused what issue you’re seeing. In the logs you posted, I see a datasource ‘006’ which looks to have a taskDuration of 4 hours, and another datasource ‘001’ which has a taskDuration of 1 hour. For datasource ‘006’, 1 hour later, it allocates a new set of segments (because segmentGranularity is 1 hour) but it won’t begin handoff yet because taskDuration of 4 hours has not been reached yet.

I’m guessing that the confusion is the expectation that the task will handoff segments each hour and continue on, which is not what will happen. Tasks currently only hand off their segments at the end of their lifetime, which in the case of the KafkaIndexTask is at taskDuration.

I’m also guessing that the reason you’re trying to do this is to get around the spike in ingestion which happens during the transition between tasks which you posted about in another thread. How long are your indexing tasks taking to start up? The supervisor only generates a new set of tasks once it has told the previous set to stop reading (because it needs to know where to tell the new ones to start reading from) and it usually takes a few seconds for tasks to start up. Is this few seconds of startup time problematic?

Also, is there a particular reason you have 10 replicas? That is an extremely high number of replicas.

Hi David,

The problem seems to be that ScheduledExecutorService.scheduleAtFixedRate() does not handle exceptions in 0.9.2.

And in 0.9.3, the method handle() in class RunNotice throws Exception. Would a try/catch be better?

If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor. If any execution of this task takes longer than its period, then subsequent executions may start late, but will not concurrently execute.

scheduledExec.scheduleAtFixedRate(
    buildRunTask(),
    ioConfig.getStartDelay().getMillis(),
    Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS),
    TimeUnit.MILLISECONDS
);

Code in 0.9.3
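
To make that javadoc behaviour concrete, here is a minimal standalone sketch (my own illustration, not Druid code): once a periodic task scheduled with scheduleAtFixedRate() throws, the executor silently suppresses every later run.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedRateSuppressionDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();

        exec.scheduleAtFixedRate(
            () -> {
                int n = runs.incrementAndGet();
                System.out.println("run " + n);
                if (n == 3) {
                    // After this exception the executor never invokes the task again.
                    throw new RuntimeException("boom");
                }
            },
            0, 100, TimeUnit.MILLISECONDS
        );

        Thread.sleep(1000);                               // enough time for ~10 periods
        System.out.println("total runs: " + runs.get());  // prints 3, not ~10
        exec.shutdownNow();
    }
}

So if the periodic task itself did the supervisor's work and could throw, a single exception would stop all future runs, which would match the "no more supervisor logs" symptom.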

On Tuesday, December 6, 2016 at 3:20:01 AM UTC+8, David Lim wrote:

Hey David,

I’m sorry for posting the wrong log yesterday. We used to work around the problem of tasks not stopping by posting the supervisor spec, but posting the spec can cause another problem like https://groups.google.com/forum/#!topic/druid-user/jRwLhzNvzhs. So we changed the “006” taskDuration from 1 hour to 4 hours, reducing how often taskDuration needs to be checked.

The problem logs are as follows. Apart from allocating pending segments, the logs are all about queries from the broker until shutdown.

2016-11-28T21:34:24,270 INFO [KafkaSupervisor-006] io.druid.indexing.kafka.supervisor.KafkaSupervisor - Task group [0] has run for [PT3600S]

2016-11-28T21:34:24,271 INFO [KafkaIndexTaskClient-006-4] com.metamx.http.client.pool.ChannelResourceFactory - Generating: http://bi-druid-middlemanager13:8102

2016-11-28T21:34:24,278 INFO [KafkaSupervisor-006-Worker-0] io.druid.indexing.kafka.supervisor.KafkaSupervisor - Setting endOffsets for tasks in taskGroup [0] to {0=758329153, 1=758264055, 2=758347387, 3=758258938, 4=758342665, 5=758289665, 6=758341461, 7=758245075, 8=758406731, 9=758285758} and resuming

2016-11-28T21:34:24,285 INFO [KafkaSupervisor-006] io.druid.indexing.kafka.supervisor.KafkaSupervisor - Creating new task group [0] for partitions [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

2016-11-28T21:34:24,285 INFO [KafkaSupervisor-006] io.druid.indexing.kafka.supervisor.KafkaSupervisor - Number of tasks [0] does not match configured numReplicas [10] in task group [0], creating more tasks

[ The tasks in taskgroup “index_kafka_006_de67f5b958a75a3” started at 2016-11-28T21:34:24.285. ]

2016-11-28T21:34:24,285 INFO [KafkaSupervisor-006] io.druid.indexing.overlord.MetadataTaskStorage - Inserting task index_kafka_006_de67f5b958a75a3_mpjpkapn with status: TaskStatus{id=index_kafka_006_de67f5b958a75a3_mpjpkapn, status=RUNNING, duration=-1}

2016-11-28T21:34:24,286 INFO [KafkaSupervisor-006] io.druid.indexing.overlord.TaskLockbox - Adding task[index_kafka_006_de67f5b958a75a3_mpjpkapn] to activeTasks

2016-11-28T21:34:24,286 INFO [TaskQueue-Manager] io.druid.indexing.overlord.TaskQueue - Asking taskRunner to run: index_kafka_006_de67f5b958a75a3_mpjpkapn

2016-11-28T21:34:24,286 INFO [TaskQueue-Manager] io.druid.indexing.overlord.RemoteTaskRunner - Added pending task index_kafka_006_de67f5b958a75a3_mpjpkapn

…(adding task)

2016-11-28T21:36:44,134 INFO [Curator-PathChildrenCache-0] io.druid.indexing.overlord.MetadataTaskStorage - Deleting TaskLock with id[75891]: TaskLock{groupId=index_kafka_006, dataSource=006, interval=2016-11-28T21:00:00.000Z/2016-11-28T22:00:00.000Z, version=2016-11-28T13:00:00.909Z}

2016-11-28T21:36:44,135 INFO [Curator-PathChildrenCache-0] io.druid.indexing.overlord.MetadataTaskStorage - Updating task index_kafka_006_8531674e07f22fe_ofikbnnm to status: TaskStatus{id=index_kafka_006_8531674e07f22fe_ofikbnnm, status=SUCCESS, duration=3744754}

2016-11-28T21:36:44,135 INFO [Curator-PathChildrenCache-0] io.druid.indexing.overlord.TaskQueue - Task done: KafkaIndexTask{id=index_kafka_006_8531674e07f22fe_ofikbnnm, type=index_kafka, dataSource=006}

2016-11-28T21:36:44,135 INFO [Curator-PathChildrenCache-0] io.druid.indexing.overlord.TaskQueue - Task SUCCESS: KafkaIndexTask{id=index_kafka_006_8531674e07f22fe_ofikbnnm, type=index_kafka, dataSource=006} (3744754 run duration)

2016-11-28T21:36:44,135 INFO [Curator-PathChildrenCache-0] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_006_8531674e07f22fe_ofikbnnm] status changed to [SUCCESS].

2016-11-28T21:36:44,136 INFO [Curator-PathChildrenCache-0] io.druid.indexing.overlord.RemoteTaskRunner - Task[index_kafka_006_8531674e07f22fe_ofikbnnm] went bye bye.

(previous taskgroup “006_8531674e07f22fe” SUCCESS)

2016-11-28T21:37:08,480 INFO [KafkaIndexTaskClient-006-1] com.metamx.http.client.pool.ChannelResourceFactory - Generating: http://bi-druid-middlemanager15:8101

2016-11-28T21:37:08,480 INFO [KafkaIndexTaskClient-006-9] com.metamx.http.client.pool.ChannelResourceFactory - Generating: http://bi-druid-middlemanager04:8100

2016-11-28T21:37:08,480 INFO [KafkaIndexTaskClient-006-9] com.metamx.http.client.pool.ChannelResourceFactory - Generating: http://bi-druid-middlemanager04:8100

…(all about query from broker, no supervisor logs)

2016-11-28T22:00:00,000 INFO [KafkaIndexTaskClient-006-9] com.metamx.http.client.pool.ChannelResourceFactory - Generating: http://bi-druid-middlemanager04:8100

2016-11-28T22:00:00,050 INFO [qtp932312334-182] io.druid.indexing.common.actions.LocalTaskActionClient - Performing action for task[index_kafka_006_de67f5b958a75a3_ilealkhb]: SegmentAllocateAction{dataSource=‘006’, timestamp=2016-11-28T22:00:00.000Z, queryGranularity=NoneGranularity, preferredSegmentGranularity=HOUR, sequenceName=‘index_kafka_006_de67f5b958a75a3_1’, previousSegmentId=‘006_2016-11-28T21:00:00.000Z_2016-11-28T22:00:00.000Z_2016-11-28T13:00:00.909Z_18’}

…(allocate new pending segments)

2016-11-28T22:00:38,605 INFO [KafkaIndexTaskClient-006-5] com.metamx.http.client.pool.ChannelResourceFactory - Generating: http://bi-druid-middlemanager15:8101

…(all about query from broker, no supervisor logs)

2016-11-28T22:59:38,914 INFO [KafkaIndexTaskClient-006-6] com.metamx.http.client.pool.ChannelResourceFactory - Generating: http://bi-druid-middlemanager07:8101

2016-11-28T23:00:00,039 INFO [qtp932312334-191] io.druid.indexing.common.actions.LocalTaskActionClient - Performing action for task[index_kafka_006_de67f5b958a75a3_ajlmikgd]: SegmentAllocateAction{dataSource='006', timestamp=2016-11-28T23:00:00.000Z, queryGranularity=NoneGranularity, preferredSegmentGranularity=HOUR, sequenceName='index_kafka_006_de67f5b958a75a3_2', previousSegmentId='006_2016-11-28T22:00:00.000Z_2016-11-28T23:00:00.000Z_2016-11-28T14:00:01.391Z_1'}

…(allocate new pending segments)

So there is a loop.

while (true) {
    if (a new hour begins) {
        allocate pending segments;
    }
    respond to queries from the broker;  // no supervisor logs at all
}

The tasks did not finally shut down until we posted a supervisor spec.

2016-11-29T09:00:00,028 INFO [qtp932312334-170] io.druid.indexing.common.actions.LocalTaskActionClient - Performing action for task[index_kafka_006_de67f5b958a75a3_fapfmcol]: SegmentAllocateAction{dataSource='006', timestamp=2016-11-29T09:00:00.000Z, queryGranularity=NoneGranularity, preferredSegmentGranularity=HOUR, sequenceName='index_kafka_006_de67f5b958a75a3_3', previousSegmentId='006_2016-11-29T08:00:00.000Z_2016-11-29T09:00:00.000Z_2016-11-29T00:00:01.296Z_1'}

…(allocate pending segments)

2016-11-29T09:00:22,386 INFO [qtp932312334-178] io.druid.indexing.overlord.TaskLockbox - Task[index_kafka_006_de67f5b958a75a3_jnmckmgj] already present in TaskLock[index_kafka_006]

2016-11-29T09:00:22,574 INFO [qtp932312334-178] io.druid.metadata.IndexerSQLMetadataStorageCoordinator - Found existing pending segment [006_2016-11-29T09:00:00.000Z_2016-11-29T10:00:00.000Z_2016-11-29T01:00:01.276Z_9] for sequence[index_kafka_006_de67f5b958a75a3_4] (previous = [006_2016-11-29T08:00:00.000Z_2016-11-29T09:00:00.000Z_2016-11-29T00:00:01.296Z]) in DB

2016-11-29T09:00:42,060 INFO [KafkaIndexTaskClient-006-7] com.metamx.http.client.pool.ChannelResourceFactory - Generating: http://bi-druid-middlemanager15:8101

…(response query from broker, no supervisor logs)

2016-11-29T09:51:12,326 INFO [KafkaIndexTaskClient-006-7] com.metamx.http.client.pool.ChannelResourceFactory - Generating: http://bi-druid-middlemanager20:8100

2016-11-29T09:51:40,896 INFO [qtp932312334-164] io.druid.indexing.kafka.supervisor.KafkaSupervisor - Beginning shutdown of KafkaSupervisor[006] [because of posting supervisor spec]

2016-11-29T09:51:40,898 INFO [qtp932312334-164] io.druid.indexing.overlord.RemoteTaskRunner - Unregistered listener [KafkaSupervisor-006]

2016-11-29T09:51:42,326 INFO [KafkaIndexTaskClient-006-6] com.metamx.http.client.pool.ChannelResourceFactory - Generating: http://bi-druid-middlemanager04:8100

…(response query from broker)

2016-11-29T09:52:42,335 INFO [KafkaIndexTaskClient-006-2] com.metamx.http.client.pool.ChannelResourceFactory - Generating: http://bi-druid-middlemanager28:8100

2016-11-29T09:53:00,904 INFO [qtp932312334-164] io.druid.indexing.kafka.supervisor.KafkaSupervisor - KafkaSupervisor[006] has stopped

2016-11-29T09:53:00,904 WARN [KafkaSupervisor-006-Worker-0] io.druid.indexing.kafka.supervisor.KafkaSupervisor - Exception while stopping task

2016-11-29T09:53:00,905 ERROR [KafkaSupervisor-006] io.druid.indexing.kafka.supervisor.KafkaSupervisor - KafkaSupervisor[006] failed to handle notice: {class=io.druid.indexing.kafka.supervisor.KafkaSupervisor, exceptionType=class java.lang.InterruptedException, exceptionMessage=null, noticeClass=RunNotice}

2016-11-29T09:53:01,144 INFO [KafkaSupervisor-006] io.druid.indexing.kafka.supervisor.KafkaSupervisor - Task group [0] has run for [PT3600S]

2016-11-29T09:53:01,144 ERROR [KafkaSupervisor-006] io.druid.indexing.kafka.supervisor.KafkaSupervisor - KafkaSupervisor[006] failed to handle notice: {class=io.druid.indexing.kafka.supervisor.KafkaSupervisor, exceptionType=class java.util.concurrent.RejectedExecutionException, exceptionMessage=Task com.google.common.util.concurrent.ListenableFutureTask@48e82735 rejected from java.util.concurrent.ThreadPoolExecutor@5948e5f1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 47342], noticeClass=RunNotice}

2016-11-29T09:53:01,148 INFO [KafkaSupervisor-006] io.druid.indexing.kafka.supervisor.KafkaSupervisor - Task group [0] has run for [PT3600S] [obviously taskgroup "006_de67f5b958a75a3" has run for more than 12 hours since 2016-11-28T21:34:24]

2016-11-29T09:53:01,148 ERROR [KafkaSupervisor-006] io.druid.indexing.kafka.supervisor.KafkaSupervisor - KafkaSupervisor[006] failed to handle notice: {class=io.druid.indexing.kafka.supervisor.KafkaSupervisor, exceptionType=class java.util.concurrent.RejectedExecutionException, exceptionMessage=Task com.google.common.util.concurrent.ListenableFutureTask@51309da7 rejected from java.util.concurrent.ThreadPoolExecutor@5948e5f1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 47342], noticeClass=GracefulShutdownNotice}

2016-11-29T10:00:00,035 INFO [qtp932312334-221] io.druid.indexing.common.actions.LocalTaskActionClient - Performing action for task[index_kafka_006_de67f5b958a75a3_jnmckmgj]: SegmentAllocateAction{dataSource=‘006’, timestamp=2016-11-29T10:00:00.000Z, queryGranularity=NoneGranularity, preferredSegmentGranularity=HOUR, sequenceName=‘index_kafka_006_de67f5b958a75a3_5’, previousSegmentId=‘006_2016-11-29T09:00:00.000Z_2016-11-29T10:00:00.000Z_2016-11-29T01:00:01.276Z_8’}

…(allocate new pending segments)

2016-11-29T10:00:22,531 INFO [qtp932312334-210] io.druid.metadata.IndexerSQLMetadataStorageCoordinator - Found existing pending segment [006_2016-11-29T10:00:00.000Z_2016-11-29T11:00:00.000Z_2016-11-29T02:00:01.263Z_8] for sequence[index_kafka_006_de67f5b958a75a3_4] (previous = [006_2016-11-29T09:00:00.000Z_2016-11-29T10:00:00.000Z_2016-11-29T01:00:01.276Z_9]) in DB

2016-11-29T10:09:03,427 INFO [qtp932312334-193] io.druid.indexing.kafka.supervisor.KafkaSupervisor - Created worker pool with [1] threads for dataSource [006]

2016-11-29T10:09:03,427 INFO [qtp932312334-193] io.druid.indexing.kafka.supervisor.KafkaSupervisor - Created taskClient with dataSource[006] chatThreads[10] httpTimeout[PT10S] chatRetries[8]

2016-11-29T10:09:03,432 INFO [qtp932312334-193] io.druid.indexing.kafka.supervisor.KafkaSupervisor - Started KafkaSupervisor[006], first run in [PT5S], with spec: [KafkaSupervisorSpec{dataSchema=DataSchema{dataSource=‘006’, parser={parseSpec={dimensionsSpec={dimensionExclusions=, dimensions=[db, table, optype, area, type, driver_type, _status, channel, product_id, order_status, require_level, f_order_status, t_order_status, combo_type], spatialDimensions=}, format=json, timestampSpec={column=timestamp, format=auto}}, type=string}, aggregators=[CountAggregatorFactory{name=‘count’}, LongSumAggregatorFactory{fieldName=‘bouns’, name=‘bonus_sum’}, LongSumAggregatorFactory{fieldName=‘driver_start_distance’, name=‘driver_start_distance_sum’}], granularitySpec=io.druid.segment.indexing.granularity.UniformGranularitySpec@1c5396aa}, tuningConfig=KafkaSupervisorTuningConfig{maxRowsInMemory=75000, maxRowsPerSegment=5000000, intermediatePersistPeriod=PT10M, basePersistDirectory=/home/tmp/1480385343425-0, maxPendingPersists=0, indexSpec=io.druid.segment.IndexSpec@551286c0, buildV9Directly=false, reportParseExceptions=false, handoffConditionTimeout=0, workerThreads=null, chatThreads=null, chatRetries=8, httpTimeout=PT10S, shutdownTimeout=PT80S}, ioConfig=KafkaSupervisorIOConfig{topic=‘fmt’, replicas=10, taskCount=1, taskDuration=PT3600S, consumerProperties={bootstrap.servers=bi-samza00:9092,]}, startDelay=PT5S, period=PT30S, useEarliestOffset=false, completionTimeout=PT86400S, lateMessageRejectionPeriod=Optional.of(PT172800S)}}]

2016-11-29T10:09:08,433 INFO [KafkaSupervisor-006] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_internal_canal_delay_metrics_d04656280ee9bc3_ieilbhlc] location changed to [TaskLocation{host=‘bi-druid-middlemanager23’, port=8109}].

2016-11-29T10:09:08,433 INFO [KafkaSupervisor-006] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_internal_canal_delay_metrics_d04656280ee9bc3_pbemeljn] location changed to [TaskLocation{host=‘bi-druid-middlemanager04’, port=8110}].

2016-11-29T10:09:08,433 INFO [KafkaSupervisor-006] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_internal_druid_cache_metrics_44f95883f467ba5_dkhkomab] location changed to [TaskLocation{host=‘bi-druid-middlemanager25’, port=8108}].

2016-11-29T10:09:08,433 INFO [KafkaSupervisor-006] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_internal_druid_cache_metrics_44f95883f467ba5_ekkcpnak] location changed to [TaskLocation{host=‘bi-druid-middlemanager08’, port=8110}].

2016-11-29T10:09:08,433 INFO [KafkaSupervisor-006] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_kafka_internal_druid_customize_metrics_f77c3f1ce971560_epdlpmik] location changed to [TaskLocation{host=‘bi-druid-middlemanager11’, port=8109}].

The normal logs contain supervisor status reports in addition to responses to queries from the broker, but the problem logs did not have supervisor information like the following:

2016-11-28T08:10:22,423 INFO [KafkaSupervisor-006] io.druid.indexing.kafka.supervisor.KafkaSupervisor - {id=‘006’, generationTime=2016-11-28T00:10:22.423Z, payload={dataSource=‘006’, topic=‘fmt’, partitions=10, replicas=10, durationSeconds=3600, active=[{id=‘index_kafka_006_94c74d0ff899ba0_jljfkboc’, startTime=2016-11-27T23:33:18.648Z, remainingSeconds=1376}, {id=‘index_kafka_006_94c74d0ff899ba0_koimehmk’, startTime=2016-11-27T23:33:18.832Z, remainingSeconds=1376}, {id=‘index_kafka_006_94c74d0ff899ba0_ggocblgg’, startTime=2016-11-27T23:33:18.565Z, remainingSeconds=1376}, {id=‘index_kafka_006_94c74d0ff899ba0_oflnfcpa’, startTime=2016-11-27T23:33:18.906Z, remainingSeconds=1376}, {id=‘index_kafka_006_94c74d0ff899ba0_alhfoieg’, startTime=2016-11-27T23:33:18.677Z, remainingSeconds=1376}, {id=‘index_kafka_006_94c74d0ff899ba0_kblihdbo’, startTime=2016-11-27T23:33:18.581Z, remainingSeconds=1376}, {id=‘index_kafka_006_94c74d0ff899ba0_kknkhclb’, startTime=2016-11-27T23:33:18.629Z, remainingSeconds=1376}, {id=‘index_kafka_006_94c74d0ff899ba0_olaphijf’, startTime=2016-11-27T23:33:18.435Z, remainingSeconds=1376}, {id=‘index_kafka_006_94c74d0ff899ba0_oidphoof’, startTime=2016-11-27T23:33:18.768Z, remainingSeconds=1376}, {id=‘index_kafka_006_94c74d0ff899ba0_calgefmb’, startTime=2016-11-27T23:33:18.704Z, remainingSeconds=1376}], publishing=}}

The related data on druid_supervisor table are as follows.

556 | 006 | 2016-11-29T11:26:05.049Z | {“type”:“kafka”,“dataSchema”:{“dataSource”:“006”,“parser”:{“parseSpec”:{“dimensionsSpec”:{“dimensionExclusions”:,“dimensions”:[“db”,“table”,“optype”],“spatialDimensions”:},“format”:“json”,“timestampSpec”:{“column”:“timestamp”,“format”:“auto”}},“type”:“string”},“metricsSpec”:[{“type”:“count”,“name”:“count”}],“granularitySpec”:{“type”:“uniform”,“segmentGranularity”:“HOUR”,“queryGranularity”:{“type”:“none”},“rollup”:true,“intervals”:null}},“tuningConfig”:{“type”:“kafka”,“maxRowsInMemory”:75000,“maxRowsPerSegment”:5000000,“intermediatePersistPeriod”:“PT10M”,“basePersistDirectory”:"/home/tmp/1480389965004-0",“maxPendingPersists”:0,“indexSpec”:{“bitmap”:{“type”:“concise”},“dimensionCompression”:“lz4”,“metricCompression”:“lz4”,“longEncoding”:“longs”},“buildV9Directly”:false,“reportParseExceptions”:false,“handoffConditionTimeout”:0,“workerThreads”:null,“chatThreads”:null,“chatRetries”:8,“httpTimeout”:“PT10S”,“shutdownTimeout”:“PT80S”},“ioConfig”:{“topic”:“fmt”,“replicas”:10,“taskCount”:1,“taskDuration”:“PT14400S”,“consumerProperties”:{“bootstrap.servers”:“bi-samza00:9092”},“startDelay”:“PT5S”,“period”:“PT30S”,“useEarliestOffset”:false,“completionTimeout”:“PT86400S”,“lateMessageRejectionPeriod”:“PT172800S”}} |

555 | 006 | 2016-11-29T10:27:13.947Z | {“type”:“kafka”,“dataSchema”:{“dataSource”:“006”,“parser”:{“parseSpec”:{“dimensionsSpec”:{“dimensionExclusions”:,“dimensions”:[“db”,“table”,“optype”],“spatialDimensions”:},“format”:“json”,“timestampSpec”:{“column”:“timestamp”,“format”:“auto”}},“type”:“string”},“metricsSpec”:[{“type”:“count”,“name”:“count”}],“granularitySpec”:{“type”:“uniform”,“segmentGranularity”:“HOUR”,“queryGranularity”:{“type”:“none”},“rollup”:true,“intervals”:null}},“tuningConfig”:{“type”:“kafka”,“maxRowsInMemory”:75000,“maxRowsPerSegment”:5000000,“intermediatePersistPeriod”:“PT10M”,“basePersistDirectory”:"/home/tmp/1480386433947-0",“maxPendingPersists”:0,“indexSpec”:{“bitmap”:{“type”:“concise”},“dimensionCompression”:“lz4”,“metricCompression”:“lz4”,“longEncoding”:“longs”},“buildV9Directly”:false,“reportParseExceptions”:false,“handoffConditionTimeout”:0,“workerThreads”:null,“chatThreads”:null,“chatRetries”:8,“httpTimeout”:“PT10S”,“shutdownTimeout”:“PT80S”},“ioConfig”:{“topic”:“fmt”,“replicas”:10,“taskCount”:1,“taskDuration”:“PT3600S”,“consumerProperties”:{“bootstrap.servers”:“bi-samza00:9092”},“startDelay”:“PT5S”,“period”:“PT30S”,“useEarliestOffset”:false,“completionTimeout”:“PT86400S”,“lateMessageRejectionPeriod”:“PT172800S”}} |

554 | 006 | 2016-11-29T10:19:24.884Z | {“type”:“NoopSupervisorSpec”}


553 | 006 | 2016-11-29T10:09:03.426Z | {“type”:“kafka”,“dataSchema”:{“dataSource”:“006”,“parser”:{“parseSpec”:{“dimensionsSpec”:{“dimensionExclusions”:,“dimensions”:[“db”,“table”,“optype”],“spatialDimensions”:},“format”:“json”,“timestampSpec”:{“column”:“timestamp”,“format”:“auto”}},“type”:“string”},“metricsSpec”:[{“type”:“count”,“name”:“count”}],“granularitySpec”:{“type”:“uniform”,“segmentGranularity”:“HOUR”,“queryGranularity”:{“type”:“none”},“rollup”:true,“intervals”:null}},“tuningConfig”:{“type”:“kafka”,“maxRowsInMemory”:75000,“maxRowsPerSegment”:5000000,“intermediatePersistPeriod”:“PT10M”,“basePersistDirectory”:"/home/tmp/1480385343425-0",“maxPendingPersists”:0,“indexSpec”:{“bitmap”:{“type”:“concise”},“dimensionCompression”:“lz4”,“metricCompression”:“lz4”,“longEncoding”:“longs”},“buildV9Directly”:false,“reportParseExceptions”:false,“handoffConditionTimeout”:0,“workerThreads”:null,“chatThreads”:null,“chatRetries”:8,“httpTimeout”:“PT10S”,“shutdownTimeout”:“PT80S”},“ioConfig”:{“topic”:“fmt”,“replicas”:10,“taskCount”:1,“taskDuration”:“PT3600S”,“consumerProperties”:{“bootstrap.servers”:“bi-samza00:9092”},“startDelay”:“PT5S”,“period”:“PT30S”,“useEarliestOffset”:false,“completionTimeout”:“PT86400S”,“lateMessageRejectionPeriod”:“PT172800S”}} |

552 | 006 | 2016-11-29T09:51:40.895Z | {“type”:“NoopSupervisorSpec”}

And our “startDelay” = “PT5S”.

Datasources “001” and “006” consume the same data from the same Kafka topic. Our system does a lot of analysis on these two datasources, so query concurrency is very high; that is why there are 10 replicas. If 10 replicas is really too high, do we need to create more datasources instead of the replicas? What is a suitable number of replicas?

Thanks very very much,

Xinxin

On Tuesday, December 6, 2016 at 3:20:01 AM UTC+8, David Lim wrote:

Hey Xinxin,

Regarding RunNotice.handle() possibly throwing an exception, that shouldn’t be a concern for scheduleAtFixedRate(). If you trace the code, scheduleAtFixedRate() doesn’t invoke RunNotice.handle() directly; it runs the task returned by buildRunTask(), which has a single line: notices.add(new RunNotice()). The main run loop then calls notices.take() to pull the next notice from the queue, and it is this run loop that calls notice.handle() (and that call is inside a try-catch block).
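
For anyone following along, here is a simplified sketch of that pattern (my own paraphrase, not the actual Druid source). Because handle() is invoked from the run loop inside a try-catch, an exception there cannot suppress the scheduled task:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class NoticeLoopSketch {
    interface Notice { void handle() throws Exception; }

    static class RunNotice implements Notice {
        public void handle() throws Exception { /* check task durations, create tasks, ... */ }
    }

    private final BlockingQueue<Notice> notices = new LinkedBlockingDeque<>();
    private final ScheduledExecutorService scheduledExec = Executors.newSingleThreadScheduledExecutor();

    void start() {
        // The periodic task never throws: it only enqueues a notice.
        scheduledExec.scheduleAtFixedRate(() -> notices.add(new RunNotice()), 5_000, 30_000, TimeUnit.MILLISECONDS);

        // The main run loop handles notices; exceptions are caught here,
        // so they cannot suppress the scheduled task above.
        new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    notices.take().handle();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    // log and keep going
                }
            }
        }).start();
    }
}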

We encountered an issue in 0.9.2 where the default configuration can lead to worker thread pool exhaustion which can result in the supervisor locking up. Could you do me a favor and generate a jstack dump the next time it happens so I can look at what’s going on when the supervisor stops periodically running? I haven’t put in a fix for this issue yet but will soon.

If the issue is the same issue as the one I saw a few days ago, you can resolve it by setting chatThreads in the supervisor’s tuningConfig to a value greater than the max number of tasks you would have running concurrently (e.g if at peak you have 30 tasks running across all your middle managers, set chatThreads to at least 31).

Hey David,

Thanks a lot~

I finally found the try-catch block. And the next time it happens I will capture a jstack dump.

But I am still a little confused about chatThreads.

In the doc I find chatThreads defaults to min(10, taskCount * replicas). For datasource "006", taskCount = 1 and replicas = 10, so chatThreads = 10. Should I set it to 11? When the transition between tasks happens, there will be 20 tasks. Should I set it to 21?

However, you said chatThreads should be greater than the number of tasks running across ALL middle managers. Since I have around 100 datasources, counting the replicas there are around 500 tasks at peak. Should I set it to 501? That is extremely high. Besides, the overlord, which runs on a single machine, has about 100 supervisors.

On Wednesday, December 7, 2016 at 8:15:48 AM UTC+8, David Lim wrote:

Hey Xinxin,

Sorry, re-reading my post I realize I wasn’t clear. You should set chatThreads to the number of tasks that will be regularly monitored by that particular supervisor + 1, or in other words something greater than taskCount * replicas. In your case, I would try 11 and see if that helps.

Hey David,

Thank you so much.

I tried 11, and it works fine. It also fixed the problem of posting or shutting down the supervisor being very slow, which surprised me. I will continue to observe.

Thanks again,

Xinxin

On Wednesday, December 7, 2016 at 12:17:33 PM UTC+8, David Lim wrote:

Hey Xinxin,
I just wanted to give you an update - I mentioned the wrong configuration parameter in my response yesterday; the config that should be set to (taskCount * replicas + 1) is workerThreads, and not chatThreads. I submitted a PR to fix the issue that you can either track or create a patch from if you’re interested: https://github.com/druid-io/druid/pull/3760

Very sorry for the confusion! Please let me know if you encounter any more issues.

Hey David,

Thanks for reminding me~

For every datasource the taskCount is 1; we want the brokers to focus on queries rather than doing too much merging. We tried 2 datasources, changing their chatThreads and posting the supervisor spec 3 times. For one datasource, workerThreads was also changed from 1 to 2, and it works fine. But the other one shut down very slowly, and only after the shutdownTimeout elapsed did it finally shut down.

I found in the doc and code that workerThreads defaults to min(10, taskCount). You said it should be set to taskCount * replicas + 1, which would be 11 in my case. Is 10 a suitable upper limit, or can it be larger if the machine running the supervisor has better performance?

On Thursday, December 8, 2016 at 10:12:26 AM UTC+8, David Lim wrote:

Hey Xinxin,

Try running the supervisor with 11 workerThreads and see if the shutdown happens more quickly after that.

Yes, you can run with more workerThreads per supervisor, but I don’t believe you’ll see much performance improvement. Also be conscious of the max user process limit.

Hi everyone,
I have the same error in kafka index tasks.

java.util.concurrent.RejectedExecutionException: Got Interrupted while adding to the Queue


How did you guys solve the problem? And what is the problem?

Regards,

Chanh

Hey Chanh,

The problem is that the supervisor can block indefinitely while waiting for a thread from the worker thread pool to become available if the number of worker threads is too low. The solutions are either:

a) Set workerThreads in the supervisor’s tuningConfig to a value greater than the number of tasks the supervisor runs (for example if taskCount=3 and numReplicas=2, set it to 7)
b) Build the Kafka indexing service extension from source with the patch here: https://github.com/druid-io/druid/pull/3760
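
For illustration only (this is a generic example of thread-pool exhaustion, not the exact Druid code path), the kind of indefinite blocking described above can be reproduced like this: when the only threads of a saturated fixed-size pool are themselves waiting on work queued to that same pool, the waiters never wake up.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WorkerPoolExhaustionDemo {
    public static void main(String[] args) throws Exception {
        // A pool with a single worker thread, analogous to an undersized workerThreads setting.
        ExecutorService pool = Executors.newFixedThreadPool(1);

        Future<?> outer = pool.submit(() -> {
            // The outer job occupies the only worker thread and then waits for an
            // inner job submitted to the same pool. The inner job can never start
            // because no free thread exists, so get() below blocks forever.
            Future<String> inner = pool.submit(() -> "done");
            try {
                return inner.get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

        System.out.println(outer.get()); // never returns: classic pool-exhaustion deadlock
    }
}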