Kafka offset out of range exceptions

Hi all,

I am getting errors trying to run a Kafka indexing task, related to the clients having incorrect offsets. I tried adding "auto.offset.reset" to the consumerProperties, but that value is explicitly set to none in the code (https://github.com/druid-io/druid/blob/master/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java#L802).
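For illustration, the consumerProperties fragment I tried looked roughly like this (the bootstrap.servers value here is a placeholder for my actual brokers):

```json
"ioConfig": {
  "topic": "raw_shopkick_pylons_weblog_avro_v1",
  "consumerProperties": {
    "bootstrap.servers": "kafka01.example.com:9092",
    "auto.offset.reset": "earliest"
  }
}
```

As noted above, the "auto.offset.reset" setting appears to be ignored since the task overrides it to "none".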

org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {raw_shopkick_pylons_weblog_avro_v1-0=2968580}


I'm not sure what the resolution would be.

Where are the offsets stored? I have been wiping my environment repeatedly, trying to start over from scratch; however, it is possible that I have not been wiping the offsets. On each node I have been clearing the Druid ZooKeeper node, dropping my MySQL database, and deleting my local data directory.

Thanks,

--Ben

Hey Ben,

We force auto.offset.reset to none as part of providing the exactly-once ingestion guarantee. If we allowed Kafka to jump to the earliest/latest offset whenever the offset following the last one read by the previous task was no longer available, it would appear to the user that we had read all the offsets exactly once, when in fact we had missed a bunch because they were evicted from Kafka before we could read them.

The way to clear the offset is to remove the associated entry from the druid_dataSource table.

In the future, we plan to add a convenience API to make it easier to reset the committed offset, see: https://github.com/druid-io/druid/issues/3195

Hi,

I have a similar problem.

Hey Antoine,

In the metadata database, there should be a table named druid_dataSource (unless you’ve renamed it in config). That table contains an entry for each datasource which is used to track the last committed Kafka offset. If your Kafka offsets no longer follow sequentially because the Kafka logs were purged or Kafka expired old events, you’ll need to delete this entry so that the indexing service doesn’t try to enforce offset continuity between tasks.
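A minimal sketch of that cleanup against MySQL, assuming the default table name and a hypothetical datasource name (stop the supervisor and its tasks before doing this):

```sql
-- List which datasources have committed-offset entries.
SELECT dataSource FROM druid_dataSource;

-- Remove the entry for the affected datasource so the next task
-- no longer tries to enforce offset continuity from the stale offsets.
DELETE FROM druid_dataSource WHERE dataSource = 'my_datasource';
```

Substitute your own datasource name; the exact column layout may differ between Druid versions, so inspect the table before deleting.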

Hey David,
I followed your instructions but I am getting an error.

com.metamx.common.ISE: Unable to grant lock to inactive Task [index_kafka_ad_statistic_hourly_0bdf47af19ad1e0_ljofknan]

It seems one task was locked and the other tasks can't start.

```
2016-12-01 02:54:56,040 WARN o.e.j.s.ServletHandler [qtp1637000661-167] /druid/indexer/v1/action
com.metamx.common.ISE: Unable to grant lock to inactive Task [index_kafka_ad_statistic_hourly_0bdf47af19ad1e0_ljofknan]
    at io.druid.indexing.overlord.TaskLockbox.tryLock(TaskLockbox.java:229) ~[druid-indexing-service-0.9.1.1.jar:0.9.1.1]
    at io.druid.indexing.overlord.TaskLockbox.tryLock(TaskLockbox.java:206) ~[druid-indexing-service-0.9.1.1.jar:0.9.1.1]
    at io.druid.indexing.common.actions.SegmentAllocateAction.perform(SegmentAllocateAction.java:206) ~[druid-indexing-service-0.9.1.1.jar:0.9.1.1]
    at io.druid.indexing.common.actions.SegmentAllocateAction.perform(SegmentAllocateAction.java:57) ~[druid-indexing-service-0.9.1.1.jar:0.9.1.1]
    at io.druid.indexing.common.actions.LocalTaskActionClient.submit(LocalTaskActionClient.java:64) ~[druid-indexing-service-0.9.1.1.jar:0.9.1.1]
    at io.druid.indexing.overlord.http.OverlordResource$3.apply(OverlordResource.java:326) ~[druid-indexing-service-0.9.1.1.jar:0.9.1.1]
    at io.druid.indexing.overlord.http.OverlordResource$3.apply(OverlordResource.java:315) ~[druid-indexing-service-0.9.1.1.jar:0.9.1.1]
    at io.druid.indexing.overlord.http.OverlordResource.asLeaderWith(OverlordResource.java:658) ~[druid-indexing-service-0.9.1.1.jar:0.9.1.1]
    at io.druid.indexing.overlord.http.OverlordResource.doAction(OverlordResource.java:312) ~[druid-indexing-service-0.9.1.1.jar:0.9.1.1]
    at sun.reflect.GeneratedMethodAccessor78.invoke(Unknown Source) ~[?:?]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_77]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_77]
    at com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$ResponseOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:205) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:302) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.uri.rules.ResourceClassRule.accept(ResourceClassRule.java:108) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1542) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1473) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1419) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1409) ~[jersey-server-1.19.jar:1.19]
    at com.sun.jersey.spi.container.servlet.WebComponent.service(WebComponent.java:409) ~[jersey-servlet-1.19.jar:1.19]
    at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:558) ~[jersey-servlet-1.19.jar:1.19]
    at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:733) ~[jersey-servlet-1.19.jar:1.19]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790) ~[javax.servlet-api-3.1.0.jar:3.1.0]
    at com.google.inject.servlet.ServletDefinition.doServiceImpl(ServletDefinition.java:278) ~[guice-servlet-4.0-beta.jar:?]
    at com.google.inject.servlet.ServletDefinition.doService(ServletDefinition.java:268) ~[guice-servlet-4.0-beta.jar:?]
    at com.google.inject.servlet.ServletDefinition.service(ServletDefinition.java:180) ~[guice-servlet-4.0-beta.jar:?]
    at com.google.inject.servlet.ManagedServletPipeline.service(ManagedServletPipeline.java:93) ~[guice-servlet-4.0-beta.jar:?]
    at com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:120) ~[guice-servlet-4.0-beta.jar:?]
    at com.google.inject.servlet.GuiceFilter$1.call(GuiceFilter.java:132) ~[guice-servlet-4.0-beta.jar:?]
    at com.google.inject.servlet.GuiceFilter$1.call(GuiceFilter.java:129) ~[guice-servlet-4.0-beta.jar:?]
    at com.google.inject.servlet.GuiceFilter$Context.call(GuiceFilter.java:206) ~[guice-servlet-4.0-beta.jar:?]
    at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:129) ~[guice-servlet-4.0-beta.jar:?]
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1652) ~[jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
    at io.druid.server.http.RedirectFilter.doFilter(RedirectFilter.java:71) ~[druid-server-0.9.1.1.jar:0.9.1.1]
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1652) ~[jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.servlets.UserAgentFilter.doFilter(UserAgentFilter.java:83) ~[jetty-servlets-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.servlets.GzipFilter.doFilter(GzipFilter.java:364) ~[jetty-servlets-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1652) ~[jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:585) [jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1125) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515) [jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1059) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:52) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.server.Server.handle(Server.java:497) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:310) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:248) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540) [jetty-io-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:620) [jetty-util-9.2.5.v20141112.jar:9.2.5.v20141112]
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:540) [jetty-util-9.2.5.v20141112.jar:9.2.5.v20141112]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_77]

2016-12-01 02:55:10,595 INFO i.d.i.c.a.LocalTaskActionClient [qtp1637000661-170] Performing action for task[index_kafka_ad_statistic_hourly_1810b821f94a3cd_acpjbmjb]: SegmentAllocateAction{dataSource='ad_statistic_hourly', timestamp=2016-12-01T01:00:00.000Z, queryGranularity=DurationGranularity{length=3600000, origin=0}, preferredSegmentGranularity=HOUR, sequenceName='index_kafka_ad_statistic_hourly_1810b821f94a3cd_6', previousSegmentId='ad_statistic_hourly_2016-12-01T00:00:00.000Z_2016-12-01T01:00:00.000Z_2016-12-01T00:18:37.340Z_6'}
```

Hey Chanh,

Is the task [index_kafka_ad_statistic_hourly_0bdf47af19ad1e0_ljofknan] still in the pending/waiting/running queues of your indexing service or has it completed? Are there other tasks waiting to run that don’t start even though your workers have capacity to run them?

Hi David,

Is the task [index_kafka_ad_statistic_hourly_0bdf47af19ad1e0_ljofknan] still in the pending/waiting/running queues of your indexing service or has it completed?

It is in the running tasks list.

Are there other tasks waiting to run that don’t start even though your workers have capacity to run them?

Capacity was available (50 slots total) for all tasks to run, but they were all stuck in the waiting queue (6 tasks).

The problem went away after I restarted the middle manager, coordinator, and overlord.

I attached the overlord logs.

Regards,

app-2016-12-01-01-1.log.gz (350 KB)

app-2016-12-01-02-1.log.gz (353 KB)

Hmm, that’s odd. Glad to hear that things are working again after a restart.

Let us know if the issue happens again. I’ve seen similar issues to this in the past but haven’t been able to nail down a consistent repro. I’ll spend some time testing it.