Kafka Indexing Service not working in Kerberized cluster

Hi Team,

I am using Druid 0.9.2 ( http://druid.io/docs/0.9.2/development/extensions-core/kafka-ingestion.html ) and trying to get the Kafka indexing service to work in a Kerberized cluster.

The Overlord UI shows the following status for the task:

{"task":"index_kafka_metrics-kafka_480f9eadc9905fa_ecjfikpb","status":{"id":"index_kafka_metrics-kafka_480f9eadc9905fa_ecjfikpb","status":"FAILED","duration":3931}}

When I check the log, I see the following error:

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:648) ~[?:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:542) ~[?:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:524) ~[?:?]
	at io.druid.indexing.kafka.KafkaIndexTask.newConsumer(KafkaIndexTask.java:882) ~[?:?]
	at io.druid.indexing.kafka.KafkaIndexTask.run(KafkaIndexTask.java:284) ~[?:?]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:436) [druid-indexing-service-0.9.2.2.6.2.0-205.jar:0.9.2.2.6.2.0-205]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:408) [druid-indexing-service-0.9.2.2.6.2.0-205.jar:0.9.2.2.6.2.0-205]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_112]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_112]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_112]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
	Suppressed: java.lang.NullPointerException
		at io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifier.close(CoordinatorBasedSegmentHandoffNotifier.java:164) ~[druid-server-0.9.2.2.6.2.0-205.jar:0.9.2.2.6.2.0-205]
		at io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver.close(FiniteAppenderatorDriver.java:301) ~[druid-server-0.9.2.2.6.2.0-205.jar:0.9.2.2.6.2.0-205]
		at io.druid.indexing.kafka.KafkaIndexTask.run(KafkaIndexTask.java:544) ~[?:?]
		at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:436) [druid-indexing-service-0.9.2.2.6.2.0-205.jar:0.9.2.2.6.2.0-205]
		at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:408) [druid-indexing-service-0.9.2.2.6.2.0-205.jar:0.9.2.2.6.2.0-205]
		at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_112]
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_112]
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_112]
		at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
	Suppressed: java.lang.NullPointerException
		at io.druid.segment.realtime.appenderator.AppenderatorImpl.shutdownExecutors(AppenderatorImpl.java:683) ~[druid-server-0.9.2.2.6.2.0-205.jar:0.9.2.2.6.2.0-205]
		at io.druid.segment.realtime.appenderator.AppenderatorImpl.close(AppenderatorImpl.java:611) ~[druid-server-0.9.2.2.6.2.0-205.jar:0.9.2.2.6.2.0-205]
		at io.druid.indexing.kafka.KafkaIndexTask.run(KafkaIndexTask.java:544) ~[?:?]
		at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:436) [druid-indexing-service-0.9.2.2.6.2.0-205.jar:0.9.2.2.6.2.0-205]
		at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:408) [druid-indexing-service-0.9.2.2.6.2.0-205.jar:0.9.2.2.6.2.0-205]
		at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_112]
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_112]
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_112]
		at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. Make sure -Djava.security.auth.login.config property passed to JVM and the client is configured to use a ticket cache (using the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using FQDN of the Kafka broker you are trying to connect to. not available to garner  authentication information from the user

I have added the following properties to druid.indexer.runner.javaOpts in the MiddleManager runtime.properties file:

-Djava.security.auth.login.config=/etc/kafka/conf/kafka_client_jaas.conf -Djava.security.krb5.conf=/etc/krb.conf -Djavax.security.auth.useSubjectCredsOnly=false -Djava.security.krb5.kdc=c1165-kdchost.druid-labs.com -Djava.security.krb5.realm=MYCORP.NET -Dsun.security.krb5.debug=true

I also added the following to the Overlord jvm.config file:

-Djava.security.auth.login.config=/etc/kafka/conf/kafka_client_jaas.conf

-Dsun.security.krb5.debug=true

Here is my kafka_client_jaas.conf file, which exists on all nodes:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=true
    renewTicket=true
    serviceName="kafka";
};

I can confirm that I have a valid Kerberos ticket and that the Kafka topic ACL is set to allow any consumer. I tried running as the kafka user and as another application user and got the same error. The MiddleManager and Overlord run on different nodes, and I tried submitting the job from the Overlord node. I also looked at https://groups.google.com/forum/#!topic/druid-user/W2SiPnNsy0U , which reports a similar issue but with a different error message.

Would appreciate any help here.

Hi Kashif,

Perhaps try upgrading Druid. It comes with a newer version of the Kafka client too and that update may be beneficial.

Also try double-checking that you have configured your keytab properly and that the principal you are trying to use is in the keytab.

Hi Gian,

I upgraded Druid to 0.10.1, and it seems to work now when I manually run kinit for the druid user. For example, I am submitting the Kafka supervisor indexing spec as User1, and User1 has a valid ticket. However, if the druid user doesn't have a valid Kerberos ticket (which by default it won't), the submission fails. When I sudo to the druid user, create a valid ticket, and then submit the job as User1, it works fine.

Any insight here? I have added -Djava.security.auth.login.config=/etc/kafka/conf/kafka_client_jaas.conf to the Indexer, Overlord, and MiddleManager JVM opts.

Thanks.

Instead of setting "useTicketCache=true" in the kafka_client_jaas.conf file, what happens if you try specifying the keytab/principal explicitly, like in the client example shown here?

http://kafka.apache.org/090/documentation.html#security_sasl_brokerconfig

- Jon

Thanks Jon, that helped.
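
For reference, a keytab-based kafka_client_jaas.conf along the lines Jon suggests might look like the sketch below. It follows the client example in the Kafka documentation linked above. The keytab path and the principal name are placeholders (assumptions, not values from this thread) and need to be replaced with the ones for the account that runs the Druid processes; the realm and serviceName are taken from the original post. Because the login comes from the keytab rather than from a ticket cache, it should no longer depend on someone having run kinit as the druid user.

```
// Sketch only: keyTab and principal below are hypothetical placeholders.
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/path/to/druid.keytab"
    principal="druid@MYCORP.NET"
    serviceName="kafka";
};
```

The keytab file would need to be present on every node that runs indexing tasks and readable by the OS user that runs the MiddleManager, since that is the process that ends up constructing the Kafka consumer.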