Kafka indexing service

Hello,
I am trying to test the new Kafka indexing service released in druid-0.9.1-rc1.
I have set up a Druid cluster on a test machine and submitted the supervisor spec shown below.
The overlord console shows that a new task is created approximately every 30 seconds. Each task runs for a few seconds and exits with status FAILED, then a new one is created, and so on.
I have checked the logs of the overlord, the middleManager, and each peon task, but none of them contains an error. The task logs under baseTaskDir/{taskID} are empty, too.
Can you please have a look at the spec and tell me if I’m doing something wrong?
$ cat druid/kafka-supervisor.json
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "content20stats_stage",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "csv",
        "columns": [
          "timestamp", "a_attrs", "a_boxes_ctr_id", "a_boxes_id", "a_scrolls",
          "n_boximpression", "n_breakpoint", "n_click", "n_doc_type", "n_fbcomment",
          "n_fblike", "n_fbshare", "n_gplus", "n_impression", "n_info",
          "n_mappa", "n_searchno", "n_staytime", "n_twcount", "s_area",
          "s_box", "s_cat1", "s_cat2", "s_cat3", "s_dest_id",
          "s_doc_id", "s_domain", "s_link_type", "s_pag_id", "s_page",
          "s_ref_host", "s_ref_path", "s_search", "s_source", "s_ua"
        ],
        "dimensionsSpec": {
          "dimensions": [
            "a_attrs", "a_boxes_ctr_id", "a_boxes_id", "a_scrolls",
            "n_boximpression", "n_breakpoint", "n_click", "n_doc_type", "n_fbcomment",
            "n_fblike", "n_fbshare", "n_gplus", "n_impression", "n_info",
            "n_mappa", "n_searchno", "n_staytime", "n_twcount", "s_area",
            "s_box", "s_cat1", "s_cat2", "s_cat3", "s_dest_id",
            "s_doc_id", "s_domain", "s_link_type", "s_pag_id", "s_page",
            "s_ref_host", "s_ref_path", "s_search", "s_source", "s_ua"
          ]
        },
        "listDelimiter": ";",
        "timestampSpec": { "column": "timestamp", "format": "millis" }
      }
    },
    "granularitySpec": { "queryGranularity": "MINUTE", "segmentGranularity": "HOUR" },
    "metricsSpec": [
      { "name": "count", "type": "count" },
      { "fieldName": "n_impression", "name": "impressions", "type": "longSum" },
      { "fieldName": "n_click", "name": "clicks", "type": "longSum" },
      { "fieldName": "n_boximpression", "name": "boximpressions", "type": "longSum" },
      { "fieldName": "n_staytime", "name": "totstaytime", "type": "longSum" },
      { "fieldName": "n_fblike", "name": "fblike", "type": "longSum" },
      { "fieldName": "n_fbshare", "name": "fbshare", "type": "longSum" },
      { "fieldName": "n_fbcomment", "name": "fbcomment", "type": "longSum" },
      { "fieldName": "n_twcount", "name": "twcount", "type": "longSum" },
      { "fieldName": "n_searchno", "name": "searchres", "type": "longSum" }
    ]
  },
  "ioConfig": {
    "topic": "event",
    "consumerProperties": { "bootstrap.servers": "ip_server1:9092,ip_server2:9092" },
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H"
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": "100000",
    "intermediatePersistPeriod": "PT10M",
    "reportParseExceptions": true
  }
}

Here are the runtime.properties of overlord and middleManager:

$ cat druid/middleManager/runtime.properties
druid.service=middleManager
druid.port=8083

# Number of tasks per middleManager

#leave default, which is Number of available processors - 1
#druid.worker.capacity=3

# Task launch parameters

druid.indexer.runner.javaOpts=-server -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Dlog-name=peon
druid.indexer.task.baseTaskDir=/data/tmp/druid/task

# HTTP server threads

druid.server.http.numThreads=25

# Processing threads and buffers

druid.processing.buffer.sizeBytes=536870912
druid.processing.numThreads=2

# Hadoop indexing

druid.indexer.task.hadoopWorkingPath=/data/tmp/druid/hadoop-tmp
druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.3.0"]
druid.indexer.task.restoreTasksOnRestart=true
$ cat druid/overlord/runtime.properties
druid.port=8084
druid.service=overlord

druid.indexer.queue.startDelay=PT30S

druid.indexer.runner.type=remote
druid.indexer.storage.type=metadata

Thanks,
Tommaso

Hey Tommaso,

Could you post one of the task logs so we can get some idea of why the tasks are failing?

Hi David,
There are no errors in the task logs, only some warnings about multiple bindings for slf4j.
Here is the content of one task log; the others are all the same:
$ cat index_kafka_content20stats_stage_730164bf3b2917a_hekkgpdm.log
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/tommaso/share/druid-0.9.1-rc1/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/tommaso/share/druid-0.9.1-rc1/lib/log4j-slf4j-impl-2.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
log4j:WARN No appenders could be found for logger (io.druid.guice.PropertiesModule).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

If I enable debug-level logging, I see this stack trace in the log of the peon JVM. I don’t know whether it is relevant, but I am posting it just in case…

2016-06-01T12:58:56,254 CONFIG [main] com.sun.jersey.server.impl.ejb.EJBComponentProviderFactoryInitilizer - The EJB interceptor binding API is not available. JAX-RS EJB support is disabled.
javax.naming.NoInitialContextException: Need to specify class name in environment or system property, or as an applet parameter, or in an application resource file: java.naming.factory.initial
at javax.naming.spi.NamingManager.getInitialContext(NamingManager.java:662) ~[?:1.8.0_72-internal]
at javax.naming.InitialContext.getDefaultInitCtx(InitialContext.java:313) ~[?:1.8.0_72-internal]
at javax.naming.InitialContext.getURLOrDefaultInitCtx(InitialContext.java:350) ~[?:1.8.0_72-internal]
at javax.naming.InitialContext.lookup(InitialContext.java:417) ~[?:1.8.0_72-internal]
at com.sun.jersey.server.impl.ejb.EJBComponentProviderFactoryInitilizer.initialize(EJBComponentProviderFactoryInitilizer.java:64) [jersey-servlet-1.19.jar:1.19]
at com.sun.jersey.spi.container.servlet.WebComponent.configure(WebComponent.java:570) [jersey-servlet-1.19.jar:1.19]
at com.sun.jersey.spi.container.servlet.ServletContainer$InternalWebComponent.configure(ServletContainer.java:332) [jersey-servlet-1.19.jar:1.19]
at com.sun.jersey.spi.container.servlet.WebComponent.load(WebComponent.java:604) [jersey-servlet-1.19.jar:1.19]
at com.sun.jersey.spi.container.servlet.WebComponent.init(WebComponent.java:207) [jersey-servlet-1.19.jar:1.19]
at com.sun.jersey.spi.container.servlet.ServletContainer.init(ServletContainer.java:394) [jersey-servlet-1.19.jar:1.19]
at com.sun.jersey.spi.container.servlet.ServletContainer.init(ServletContainer.java:577) [jersey-servlet-1.19.jar:1.19]
at javax.servlet.GenericServlet.init(GenericServlet.java:244) [javax.servlet-api-3.1.0.jar:3.1.0]
at com.google.inject.servlet.ServletDefinition.init(ServletDefinition.java:119) [guice-servlet-4.0-beta.jar:?]
at com.google.inject.servlet.ManagedServletPipeline.init(ManagedServletPipeline.java:84) [guice-servlet-4.0-beta.jar:?]
at com.google.inject.servlet.ManagedFilterPipeline.initPipeline(ManagedFilterPipeline.java:104) [guice-servlet-4.0-beta.jar:?]
at com.google.inject.servlet.GuiceFilter.init(GuiceFilter.java:224) [guice-servlet-4.0-beta.jar:?]
at org.eclipse.jetty.servlet.FilterHolder.initialize(FilterHolder.java:138) [jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.servlet.ServletHandler.initialize(ServletHandler.java:852) [jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:298) [jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:741) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68) [jetty-util-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132) [jetty-util-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114) [jetty-util-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68) [jetty-util-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132) [jetty-util-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.server.Server.start(Server.java:387) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114) [jetty-util-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.server.Server.doStart(Server.java:354) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68) [jetty-util-9.2.5.v20141112.jar:9.2.5.v20141112]
at io.druid.server.initialization.jetty.JettyServerModule$1.start(JettyServerModule.java:204) [druid-server-0.9.1-rc1.jar:0.9.1-rc1]
at com.metamx.common.lifecycle.Lifecycle.start(Lifecycle.java:259) [java-util-0.27.9.jar:?]
at io.druid.guice.LifecycleModule$2.start(LifecycleModule.java:155) [druid-api-0.9.1-rc1.jar:0.9.1-rc1]
at io.druid.cli.GuiceRunnable.initLifecycle(GuiceRunnable.java:91) [druid-services-0.9.1-rc1.jar:0.9.1-rc1]
at io.druid.cli.CliPeon.run(CliPeon.java:274) [druid-services-0.9.1-rc1.jar:0.9.1-rc1]
at io.druid.cli.Main.main(Main.java:105) [druid-services-0.9.1-rc1.jar:0.9.1-rc1]
2016-06-01T12:58:56,259 CONFIG [main] com.sun.jersey.server.impl.cdi.CDIComponentProviderFactoryInitializer - The CDI BeanManager is not available at java:comp/BeanManager
javax.naming.NoInitialContextException: Need to specify class name in environment or system property, or as an applet parameter, or in an application resource file: java.naming.factory.initial
at javax.naming.spi.NamingManager.getInitialContext(NamingManager.java:662) ~[?:1.8.0_72-internal]
at javax.naming.InitialContext.getDefaultInitCtx(InitialContext.java:313) ~[?:1.8.0_72-internal]
at javax.naming.InitialContext.getURLOrDefaultInitCtx(InitialContext.java:350) ~[?:1.8.0_72-internal]
at javax.naming.InitialContext.lookup(InitialContext.java:417) ~[?:1.8.0_72-internal]
at com.sun.jersey.server.impl.cdi.CDIComponentProviderFactoryInitializer.lookupInJndi(CDIComponentProviderFactoryInitializer.java:110) [jersey-servlet-1.19.jar:1.19]
at com.sun.jersey.server.impl.cdi.CDIComponentProviderFactoryInitializer.lookup(CDIComponentProviderFactoryInitializer.java:83) [jersey-servlet-1.19.jar:1.19]
at com.sun.jersey.server.impl.cdi.CDIComponentProviderFactoryInitializer.initialize(CDIComponentProviderFactoryInitializer.java:70) [jersey-servlet-1.19.jar:1.19]
at com.sun.jersey.spi.container.servlet.WebComponent.configure(WebComponent.java:572) [jersey-servlet-1.19.jar:1.19]
at com.sun.jersey.spi.container.servlet.ServletContainer$InternalWebComponent.configure(ServletContainer.java:332) [jersey-servlet-1.19.jar:1.19]
at com.sun.jersey.spi.container.servlet.WebComponent.load(WebComponent.java:604) [jersey-servlet-1.19.jar:1.19]
at com.sun.jersey.spi.container.servlet.WebComponent.init(WebComponent.java:207) [jersey-servlet-1.19.jar:1.19]
at com.sun.jersey.spi.container.servlet.ServletContainer.init(ServletContainer.java:394) [jersey-servlet-1.19.jar:1.19]
at com.sun.jersey.spi.container.servlet.ServletContainer.init(ServletContainer.java:577) [jersey-servlet-1.19.jar:1.19]
at javax.servlet.GenericServlet.init(GenericServlet.java:244) [javax.servlet-api-3.1.0.jar:3.1.0]
at com.google.inject.servlet.ServletDefinition.init(ServletDefinition.java:119) [guice-servlet-4.0-beta.jar:?]
at com.google.inject.servlet.ManagedServletPipeline.init(ManagedServletPipeline.java:84) [guice-servlet-4.0-beta.jar:?]
at com.google.inject.servlet.ManagedFilterPipeline.initPipeline(ManagedFilterPipeline.java:104) [guice-servlet-4.0-beta.jar:?]
at com.google.inject.servlet.GuiceFilter.init(GuiceFilter.java:224) [guice-servlet-4.0-beta.jar:?]
at org.eclipse.jetty.servlet.FilterHolder.initialize(FilterHolder.java:138) [jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.servlet.ServletHandler.initialize(ServletHandler.java:852) [jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:298) [jetty-servlet-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:741) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68) [jetty-util-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132) [jetty-util-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114) [jetty-util-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68) [jetty-util-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:132) [jetty-util-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.server.Server.start(Server.java:387) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:114) [jetty-util-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:61) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.server.Server.doStart(Server.java:354) [jetty-server-9.2.5.v20141112.jar:9.2.5.v20141112]
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68) [jetty-util-9.2.5.v20141112.jar:9.2.5.v20141112]
at io.druid.server.initialization.jetty.JettyServerModule$1.start(JettyServerModule.java:204) [druid-server-0.9.1-rc1.jar:0.9.1-rc1]
at com.metamx.common.lifecycle.Lifecycle.start(Lifecycle.java:259) [java-util-0.27.9.jar:?]
at io.druid.guice.LifecycleModule$2.start(LifecycleModule.java:155) [druid-api-0.9.1-rc1.jar:0.9.1-rc1]
at io.druid.cli.GuiceRunnable.initLifecycle(GuiceRunnable.java:91) [druid-services-0.9.1-rc1.jar:0.9.1-rc1]
at io.druid.cli.CliPeon.run(CliPeon.java:274) [druid-services-0.9.1-rc1.jar:0.9.1-rc1]
at io.druid.cli.Main.main(Main.java:105) [druid-services-0.9.1-rc1.jar:0.9.1-rc1]

Regards,
Tommaso

Hey Tommaso,

I tried out your spec and it worked fine for me. The only noteworthy thing is that you’ve enabled reportParseExceptions, which is great, but be aware that it can cause some pretty ugly loops if any bad data comes in from Kafka. The supervisor creates a task that reads the bad data and throws an exception because it can’t parse it; the supervisor then notices that the task has died and creates a new task, which reads the same event that killed the first one, so it also dies, and so on.
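
To make that loop concrete, here is a toy simulation in Python. It is not Druid code: run_task and supervise are hypothetical names, integer parsing stands in for row parsing, and the model assumes that a failed task commits no offsets, so its replacement starts from the same position. It also assumes that with reportParseExceptions disabled an unparseable row is skipped instead of killing the task.

```python
def run_task(records, start_offset, report_parse_exceptions):
    """Toy model of one indexing task; returns (status, committed_offset)."""
    offset = start_offset
    while offset < len(records):
        try:
            int(records[offset])  # stand-in for parsing one row
        except ValueError:
            if report_parse_exceptions:
                # Assumption: a failed task commits nothing, so the
                # committed offset is still where this task started.
                return "FAILED", start_offset
            # Otherwise the bad row is skipped and reading continues.
        offset += 1
    return "SUCCESS", offset


def supervise(records, report_parse_exceptions, max_attempts=5):
    """Toy supervisor: replace a failed task, resuming at the committed offset."""
    offset = 0
    history = []
    for _ in range(max_attempts):
        status, offset = run_task(records, offset, report_parse_exceptions)
        history.append((status, offset))
        if status == "SUCCESS":
            break
    return history


records = ["1", "2", "oops", "4"]
print(supervise(records, report_parse_exceptions=True))
print(supervise(records, report_parse_exceptions=False))
```

With the flag enabled, every attempt in this model fails and the committed offset never moves; with it disabled, the first task runs to the end of the input.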

There are probably errors occurring in your tasks, but the tasks aren’t logging their output properly, so we don’t know exactly what’s going on. This line points to a log4j config issue:

log4j:WARN No appenders could be found for logger (io.druid.guice.PropertiesModule).

Once we get some proper logging we can figure out what’s causing the task to fail.

Hi David,
Correct me if I am wrong, but it seems that 0.9.1-rc1 ships with two bindings for slf4j (log4j and log4j2). Is this correct?

I removed slf4j-log4j12-1.6.1.jar, which corresponds to the log4j binding, from druid-0.9.1-rc1/lib.
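
For anyone who wants to find such duplicates without reading the SLF4J warning, here is a minimal Python sketch. It relies only on what the warning itself shows, namely that each binding jar contains org/slf4j/impl/StaticLoggerBinder.class. The function name find_slf4j_bindings is made up for this example, and it only looks at *.jar files directly inside the given directory.

```python
import zipfile
from pathlib import Path

# Class file named in the "Found binding in [...]" lines of the SLF4J warning.
BINDER_ENTRY = "org/slf4j/impl/StaticLoggerBinder.class"


def find_slf4j_bindings(lib_dir):
    """Return the sorted names of jars in lib_dir that contain an SLF4J binding."""
    found = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as zf:
                if BINDER_ENTRY in zf.namelist():
                    found.append(jar.name)
        except zipfile.BadZipFile:
            # Not a readable archive; ignore it.
            continue
    return found
```

Calling find_slf4j_bindings on the distribution's lib directory should list every jar that provides a binding; more than one entry means SLF4J will print the multiple-bindings warning.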

The warning about multiple bindings is now gone from the log files, but I find another warning (the same warning appears in the task logs and on standard error when I launch the JVM for the various components):

log4j:WARN No appenders could be found for logger (org.jboss.logging).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

The task logs are still empty, apart from this warning.

Anyway, something interesting now appears in the peon logs:

2016-06-03T08:35:04,980 ERROR [task-runner-0-priority-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[KafkaIndexTask{id=index_kafka_content20stats_stage_730164bf3b2917a_iiimiiah, type=index_kafka, dataSource=content20stats_stage}]
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading field 'topic': java.nio.BufferUnderflowException
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71) ~[?:?]
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:464) ~[?:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279) ~[?:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303) ~[?:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197) ~[?:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187) ~[?:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:877) ~[?:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829) ~[?:?]
at io.druid.indexing.kafka.KafkaIndexTask$2.call(KafkaIndexTask.java:373) ~[?:?]
at io.druid.indexing.kafka.KafkaIndexTask$2.call(KafkaIndexTask.java:368) ~[?:?]
at com.metamx.common.RetryUtils.retry(RetryUtils.java:60) ~[java-util-0.27.9.jar:?]
at com.metamx.common.RetryUtils.retry(RetryUtils.java:78) ~[java-util-0.27.9.jar:?]
at io.druid.indexing.kafka.KafkaIndexTask.run(KafkaIndexTask.java:366) ~[?:?]
at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:436) [druid-indexing-service-0.9.1-rc1.jar:0.9.1-rc1]
at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:408) [druid-indexing-service-0.9.1-rc1.jar:0.9.1-rc1]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_72-internal]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_72-internal]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_72-internal]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_72-internal]

Thanks for your help,
Tommaso

Hello,
For your information, I was able to eliminate the log4j warning by also removing log4j-1.2.16.jar from the lib directory (in addition to slf4j-log4j12-1.6.1.jar, as I mentioned in the previous message).

The error that I mentioned before is of course still generated by the peons every time a new task is started.

Bye,
Tommaso

Hey Tommaso,

Ah okay, the Kafka index task uses the new Kafka 0.9 consumer which is unfortunately incompatible with Kafka 0.8.x and earlier brokers. Kafka introduced a protocol change in 0.9 which is why the consumer is throwing those errors. Upgrade your brokers to 0.9 or later and those errors should go away.
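
As a rough illustration of why a protocol mismatch surfaces as a buffer underflow, here is a toy Python sketch. It does not use Kafka's actual wire format: the two layouts, the throttle_ms field, and the function names are all invented for the example. The point is only that a reader expecting a newer layout misinterprets bytes written in an older one and runs off the end of the buffer.

```python
import struct


class Reader:
    """Sequential reader that raises when asked for more bytes than remain."""

    def __init__(self, buf):
        self.buf = buf
        self.pos = 0

    def take(self, n, field):
        if n < 0 or self.pos + n > len(self.buf):
            raise ValueError(f"Error reading field '{field}': buffer underflow")
        chunk = self.buf[self.pos:self.pos + n]
        self.pos += n
        return chunk


def encode_v0(topic):
    """Hypothetical old layout: 2-byte length, then the topic name."""
    raw = topic.encode("utf-8")
    return struct.pack(">h", len(raw)) + raw


def decode_v0(buf):
    r = Reader(buf)
    (length,) = struct.unpack(">h", r.take(2, "topic"))
    return r.take(length, "topic").decode("utf-8")


def decode_v1(buf):
    """Hypothetical new layout: 4-byte throttle_ms, then length-prefixed topic."""
    r = Reader(buf)
    (throttle_ms,) = struct.unpack(">i", r.take(4, "throttle_ms"))
    (length,) = struct.unpack(">h", r.take(2, "topic"))
    return throttle_ms, r.take(length, "topic").decode("utf-8")


message = encode_v0("event")
print(decode_v0(message))      # matching versions decode cleanly
try:
    decode_v1(message)         # newer reader, older bytes
except ValueError as err:
    print(err)
```

In this sketch the newer reader consumes part of the topic name as a length prefix, gets a nonsensical length, and fails while reading the topic field, which is loosely analogous to the SchemaException in the peon log.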

I don’t think we mentioned this anywhere in the docs and it feels like something worthwhile to mention so I’ll look into this! Let me know if this helps.

Thanks David,
I will let you know when I am able to test it. It will take some time because I was getting the data from a Kafka server in a production environment, and I cannot upgrade it so easily.

As a side note, don’t you think it is an error that the 0.9.1-rc1 distribution includes slf4j-log4j12-1.6.1.jar and log4j-1.2.16.jar in the lib directory? Is there something I am not understanding about the logging system?

Thanks again for your invaluable help.

regards,
Tommaso

Hey Tommaso,

Cool, let us know how things go.

As for the slf4j bindings, I think you’re correct. It looks like we pulled in log4j accidentally when we upgraded to ZK 3.4.8. I put out a PR to exclude the dependency: https://github.com/druid-io/druid/pull/3075. Thanks!