Ingestion from kerberised Kafka

Another Druid noob here!

Context

I am running a Druid cluster on an Azure machine by following the Clustered deployment instructions in the documentation. I have been following the Druid documentation for almost everything.

I have made three copies of the extracted folder, treating one as the master server and the other two as the data server and query server respectively. I am able to ingest data from non-kerberised HDFS (haven’t tried kerberised HDFS yet) and non-kerberised Kafka, so everything was working fine up to this point!

Experiment

When I tried to ingest data from kerberised Kafka, I could not find any proper documentation. From a Google search I got a rough idea, and based on that I followed these steps:

  • Step 1 : Got a keytab and a principal for accessing Kafka.

  • Step 2 : Installed the Kerberos client on my Azure machine and ran the kinit command to obtain a Kerberos TGT.

  • Step 3 : Prepared a JAAS file with the following contents:

KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/home/user/kafka.service.keytab"
        principal="user@EXAMPLE.COM"
};

and then configured the jvm.config file for every process in all three server folders, i.e. Coordinator, Overlord, MiddleManager, Historical, Router, and Broker, by adding this line to jvm.config:

-Djava.security.auth.login.config=/home/rtrs/client_jaas.conf

I also added this line

-Djava.security.krb5.conf=/etc/krb5.conf

pointing to the Kerberos configuration file.
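As a sanity check for step 2, the TGT can be obtained non-interactively from the keytab (using the keytab path and principal from the JAAS file above), and klist should then show a valid ticket:

```shell
# Obtain a Kerberos TGT from the keytab, with no password prompt
kinit -kt /home/user/kafka.service.keytab user@EXAMPLE.COM

# Verify that the credential cache now holds a valid TGT
klist
```

These are the standard MIT Kerberos client commands; they only confirm that the keytab and KDC work, not the Kafka/JAAS wiring.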


Now, when I enter the following details in the Kafka ingestion UI in Druid:

  • Bootstrap Server : BROKER_NODE:PORT
  • Topic : test_topic
  • Consumer properties :
    { "bootstrap.servers": "10.144.102.128:6667", "security.protocol": "SASL_PLAINTEXT" }
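As an aside: for SASL_PLAINTEXT with Kerberos, a Kafka consumer usually also needs the SASL mechanism and the broker's Kerberos service name. A fuller consumer-properties sketch might look like the following, where the service name "kafka" is an assumption and must match the primary of the broker's principal:

```json
{
  "bootstrap.servers": "10.144.102.128:6667",
  "security.protocol": "SASL_PLAINTEXT",
  "sasl.mechanism": "GSSAPI",
  "sasl.kerberos.service.name": "kafka"
}
```

(sasl.mechanism defaults to GSSAPI in the Kafka client, so listing it is mostly for clarity.)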

I am getting the following error:

Logs

[KafkaSupervisor-new-data-source] ERROR org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Exception starting SeekableStreamSupervisor[new-data-source]: {class=org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor, exceptionType=class org.apache.kafka.common.KafkaException, exceptionMessage=Failed to construct kafka consumer}
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823) ~[?:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:667) ~[?:?]
	at org.apache.druid.indexing.kafka.KafkaRecordSupplier.getKafkaConsumer(KafkaRecordSupplier.java:248) ~[?:?]
	at org.apache.druid.indexing.kafka.KafkaRecordSupplier.<init>(KafkaRecordSupplier.java:63) ~[?:?]
	at org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor.setupRecordSupplier(KafkaSupervisor.java:126) ~[?:?]
	at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.tryInit(SeekableStreamSupervisor.java:733) ~[druid-indexing-service-0.20.1.jar:0.20.1]
	at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.lambda$null$0(SeekableStreamSupervisor.java:617) ~[druid-indexing-service-0.20.1.jar:0.20.1]
	at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor$$Lambda$134.000000000009C350.perform(Unknown Source) ~[?:?]
	at org.apache.druid.java.util.common.RetryUtils.retry(RetryUtils.java:87) ~[druid-core-0.20.1.jar:0.20.1]
	at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.lambda$start$2(SeekableStreamSupervisor.java:615) ~[druid-indexing-service-0.20.1.jar:0.20.1]
	at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor$$Lambda$133.00000000101824B0.run(Unknown Source) ~[?:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522) [?:1.8.0]
	at java.util.concurrent.FutureTask.run(FutureTask.java:277) [?:1.8.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153) [?:1.8.0]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:1.8.0]
	at java.lang.Thread.run(Thread.java:785) [?:?]
Caused by: java.lang.SecurityException: Configuration Error:
	Line 6: expected [option key]
	at com.ibm.security.auth.login.ConfigFile.<init>(ConfigFile.java:102) ~[?:1.8.0]
	at java.lang.J9VMInternals.newInstanceImpl(Native Method) ~[?:?]
	at java.lang.Class.newInstance(Class.java:1899) ~[?:?]
	at javax.security.auth.login.Configuration$2.run(Configuration.java:246) ~[?:1.8.0]
	at javax.security.auth.login.Configuration$2.run(Configuration.java:238) ~[?:1.8.0]
	at java.security.AccessController.doPrivileged(AccessController.java:650) ~[?:1.8.0]
	at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:238) ~[?:1.8.0]
	at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:114) ~[?:?]
	at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98) ~[?:?]
	at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84) ~[?:?]
	at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:134) ~[?:?]
	at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73) ~[?:?]
	at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105) ~[?:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:740) ~[?:?]
	... 15 more
Caused by: java.io.IOException: Configuration Error:
	Line 6: expected [option key]
	at com.ibm.security.auth.login.ConfigFile.match(ConfigFile.java:560) ~[?:1.8.0]
	at com.ibm.security.auth.login.ConfigFile.parseLoginEntry(ConfigFile.java:446) ~[?:1.8.0]
	at com.ibm.security.auth.login.ConfigFile.readConfig(ConfigFile.java:391) ~[?:1.8.0]
	at com.ibm.security.auth.login.ConfigFile.init(ConfigFile.java:291) ~[?:1.8.0]
	at com.ibm.security.auth.login.ConfigFile.init(ConfigFile.java:211) ~[?:1.8.0]
	at com.ibm.security.auth.login.ConfigFile.<init>(ConfigFile.java:99) ~[?:1.8.0]
	at java.lang.J9VMInternals.newInstanceImpl(Native Method) ~[?:?]
	at java.lang.Class.newInstance(Class.java:1899) ~[?:?]
	at javax.security.auth.login.Configuration$2.run(Configuration.java:246) ~[?:1.8.0]
	at javax.security.auth.login.Configuration$2.run(Configuration.java:238) ~[?:1.8.0]
	at java.security.AccessController.doPrivileged(AccessController.java:650) ~[?:1.8.0]
	at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:238) ~[?:1.8.0]
	at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:114) ~[?:?]
	at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98) ~[?:?]
	at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84) ~[?:?]
	at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:134) ~[?:?]
	at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73) ~[?:?]
	at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105) ~[?:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:740) ~[?:?]
	... 15 more
09:25:09.035 [KafkaSupervisor-new-data-source] ERROR org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Failed to initialize after 20 retries, aborting. Please resubmit the supervisor spec to restart this supervisor [KafkaSupervisor-new-data-source]: {class=org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor}

I am sorry for describing my problem as a story, but I am not sure where I went wrong.

Any help would be appreciated. Please let me know if I am following the wrong approach or doing something wrong somewhere.


References

I have been following these two links, as kerberised Kafka ingestion is not covered in the Druid documentation.

Relates to Apache Druid <0.20.1>

Hey @consentsam I only just spotted this - did you get an answer OK? It was such a well-constructed question haha!

Thank you for your reply @petermarshallio, but I have not got an answer yet and I am still stuck on this error.
Yes, I tried to frame the question so that any developer going through it would at least understand my problem. I did not want it to go unanswered because I had missed some details.

@petermarshallio Can you please guide me in ingesting data from kerberised Kafka? It would be very nice of you!

Thank you

I found this, where someone suggested that the JAAS file might have errors? I don’t know if this will help…
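For anyone landing here later: the "Line 6: expected [option key]" in the stack trace is the JAAS parser itself failing, and it matches the file shown in the question. The JAAS grammar requires a semicolon after the last option of an entry, before the closing brace, so the principal line needs to end with ";". A corrected version of the file would be:

```
KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="/home/user/kafka.service.keytab"
        principal="user@EXAMPLE.COM";
};
```

One further caveat: the stack trace (com.ibm.security.auth.login.ConfigFile, J9VMInternals) indicates an IBM JDK, which ships its own Krb5LoginModule under com.ibm.security.auth.module with different option names, so the Sun module and options above may need adjusting on that JVM.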