Another Druid noob here!
Context
I am running a Druid cluster on an Azure machine by following the clustered deployment instructions in the documentation (Clustered deployment). I have been following the Druid documentation for almost everything.
I made three copies of the extracted folder and am treating one as the master server and the other two as the data server and query server respectively. I am able to ingest data from non-Kerberised HDFS (I haven't tried Kerberised HDFS yet) and non-Kerberised Kafka, so up to this point everything was working fine!
Experiment
When I tried to ingest data from Kerberised Kafka, I could not find any proper documentation. From a Google search I got an idea of the approach, and I followed these steps:
- Step 1: Obtained a keytab and a principal for accessing Kafka.
- Step 2: Installed the Kerberos client on my Azure machine and ran the kinit command to obtain a Kerberos TGT.
- Step 3: Prepared a JAAS file with the following contents:
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/home/user/kafka.service.keytab"
    principal="user@EXAMPLE.COM"
};
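As background on the error below: the JAAS grammar expects each login-module entry to end with a semicolon after its last option, before the closing brace of the entry. A rough Python sketch of that structural check (a hypothetical helper for illustration, not the JDK's actual parser):

```python
def check_jaas(text):
    """Return a list of (line_no, message) problems found in a JAAS file.

    Very rough sketch of one JAAS rule: inside an entry, the last option
    must be terminated by ';' before the closing '}'.
    """
    problems = []
    in_entry = False
    entry_terminated = False
    for i, line in enumerate(text.splitlines(), start=1):
        stripped = line.strip()
        if not stripped or stripped.startswith("//"):
            continue
        if stripped.endswith("{"):          # start of an entry, e.g. KafkaClient {
            in_entry = True
            entry_terminated = False
            continue
        if stripped.startswith("}"):        # end of the entry
            if in_entry and not entry_terminated:
                problems.append((i, "entry closed before the terminating ';' "
                                    "after the last option"))
            in_entry = False
            continue
        if in_entry and stripped.endswith(";"):
            entry_terminated = True
    return problems

good = '''KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  principal="user@EXAMPLE.COM";
};'''

bad = good.replace('EXAMPLE.COM";', 'EXAMPLE.COM"')  # drop the trailing ';'

print(check_jaas(good))  # → []
print(check_jaas(bad))   # flags the '};' line
```

This is only a sanity check for the file's shape; the real parser (here IBM's com.ibm.security.auth.login.ConfigFile) enforces the full grammar.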
I then configured the jvm.config file for every process (coordinator, overlord, middle manager, historical, router, broker) in all three server folders by adding this line:
-Djava.security.auth.login.config=/home/rtrs/client_jaas.conf
I also added this line, pointing to the Kerberos configuration file:
-Djava.security.krb5.conf=/etc/krb5.conf
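The repetitive edit across all the jvm.config files can be scripted. This is only a sketch; the conf-root layout passed in is an assumption about this particular install, and the two flag values are the ones from this post:

```python
from pathlib import Path

# The two JVM flags described above (paths are the ones from this setup).
FLAGS = [
    "-Djava.security.auth.login.config=/home/rtrs/client_jaas.conf",
    "-Djava.security.krb5.conf=/etc/krb5.conf",
]

def patch_jvm_configs(conf_root):
    """Append any missing FLAGS to every jvm.config under conf_root.

    Idempotent: a second run finds nothing missing and changes no files.
    Returns the list of files that were modified.
    """
    patched = []
    for cfg in sorted(Path(conf_root).glob("**/jvm.config")):
        lines = cfg.read_text().splitlines()
        missing = [f for f in FLAGS if f not in lines]
        if missing:
            cfg.write_text("\n".join(lines + missing) + "\n")
            patched.append(cfg)
    return patched
```

Each Druid service reads its own jvm.config at startup, which is why the flags have to land in every process directory on all three servers.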
Now, when I enter the following details in the Kafka ingestion UI in Druid:
- Bootstrap servers: BROKER_NODE:PORT
- Topic: test_topic
- Consumer properties: { "bootstrap.servers": "10.144.102.128:6667", "security.protocol": "SASL_PLAINTEXT" }
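For comparison, a Kerberised Kafka consumer usually also needs sasl.kerberos.service.name set to the service part of the broker's Kerberos principal; the value "kafka" below is an assumption about this cluster, not something taken from the setup above:

{
  "bootstrap.servers": "10.144.102.128:6667",
  "security.protocol": "SASL_PLAINTEXT",
  "sasl.kerberos.service.name": "kafka"
}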
I am getting the following error:
Logs
[KafkaSupervisor-new-data-source] ERROR org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Exception starting SeekableStreamSupervisor[new-data-source]: {class=org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor, exceptionType=class org.apache.kafka.common.KafkaException, exceptionMessage=Failed to construct kafka consumer}
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823) ~[?:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:667) ~[?:?]
at org.apache.druid.indexing.kafka.KafkaRecordSupplier.getKafkaConsumer(KafkaRecordSupplier.java:248) ~[?:?]
at org.apache.druid.indexing.kafka.KafkaRecordSupplier.<init>(KafkaRecordSupplier.java:63) ~[?:?]
at org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor.setupRecordSupplier(KafkaSupervisor.java:126) ~[?:?]
at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.tryInit(SeekableStreamSupervisor.java:733) ~[druid-indexing-service-0.20.1.jar:0.20.1]
at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.lambda$null$0(SeekableStreamSupervisor.java:617) ~[druid-indexing-service-0.20.1.jar:0.20.1]
at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor$$Lambda$134.000000000009C350.perform(Unknown Source) ~[?:?]
at org.apache.druid.java.util.common.RetryUtils.retry(RetryUtils.java:87) ~[druid-core-0.20.1.jar:0.20.1]
at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.lambda$start$2(SeekableStreamSupervisor.java:615) ~[druid-indexing-service-0.20.1.jar:0.20.1]
at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor$$Lambda$133.00000000101824B0.run(Unknown Source) ~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:522) [?:1.8.0]
at java.util.concurrent.FutureTask.run(FutureTask.java:277) [?:1.8.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153) [?:1.8.0]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:1.8.0]
at java.lang.Thread.run(Thread.java:785) [?:?]
Caused by: java.lang.SecurityException: Configuration Error:
Line 6: expected [option key]
at com.ibm.security.auth.login.ConfigFile.<init>(ConfigFile.java:102) ~[?:1.8.0]
at java.lang.J9VMInternals.newInstanceImpl(Native Method) ~[?:?]
at java.lang.Class.newInstance(Class.java:1899) ~[?:?]
at javax.security.auth.login.Configuration$2.run(Configuration.java:246) ~[?:1.8.0]
at javax.security.auth.login.Configuration$2.run(Configuration.java:238) ~[?:1.8.0]
at java.security.AccessController.doPrivileged(AccessController.java:650) ~[?:1.8.0]
at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:238) ~[?:1.8.0]
at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:114) ~[?:?]
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98) ~[?:?]
at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84) ~[?:?]
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:134) ~[?:?]
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73) ~[?:?]
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105) ~[?:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:740) ~[?:?]
... 15 more
Caused by: java.io.IOException: Configuration Error:
Line 6: expected [option key]
at com.ibm.security.auth.login.ConfigFile.match(ConfigFile.java:560) ~[?:1.8.0]
at com.ibm.security.auth.login.ConfigFile.parseLoginEntry(ConfigFile.java:446) ~[?:1.8.0]
at com.ibm.security.auth.login.ConfigFile.readConfig(ConfigFile.java:391) ~[?:1.8.0]
at com.ibm.security.auth.login.ConfigFile.init(ConfigFile.java:291) ~[?:1.8.0]
at com.ibm.security.auth.login.ConfigFile.init(ConfigFile.java:211) ~[?:1.8.0]
at com.ibm.security.auth.login.ConfigFile.<init>(ConfigFile.java:99) ~[?:1.8.0]
at java.lang.J9VMInternals.newInstanceImpl(Native Method) ~[?:?]
at java.lang.Class.newInstance(Class.java:1899) ~[?:?]
at javax.security.auth.login.Configuration$2.run(Configuration.java:246) ~[?:1.8.0]
at javax.security.auth.login.Configuration$2.run(Configuration.java:238) ~[?:1.8.0]
at java.security.AccessController.doPrivileged(AccessController.java:650) ~[?:1.8.0]
at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:238) ~[?:1.8.0]
at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:114) ~[?:?]
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98) ~[?:?]
at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84) ~[?:?]
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:134) ~[?:?]
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73) ~[?:?]
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105) ~[?:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:740) ~[?:?]
... 15 more
09:25:09.035 [KafkaSupervisor-new-data-source] ERROR org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Failed to initialize after 20 retries, aborting. Please resubmit the supervisor spec to restart this supervisor [KafkaSupervisor-new-data-source]: {class=org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor}
I am sorry for describing my problem as a story, but I am not sure where I went wrong.
Any help will be appreciated. Please let me know if I am following the wrong approach or doing something wrong somewhere.
References
I have been following these two links, as Kerberised Kafka ingestion is not covered in the Druid documentation.