Druid not reading from Kafka Topic

Hi,

I am new to Druid, so apologies in advance for the long email, but I wanted to make sure you have as much information as possible!

I am running version 0.15.0 and have created a topic in Kafka called druid to test ingestion with. In the attachment you can see two rows of sample data.

Using the supervisor spec (below), the submission is accepted and the supervisor shows a RUNNING state, but no tasks appear.

I have added the following to the common properties file so that our site has some basic authentication:

druid.auth.authenticatorChain=["basic"]
druid.auth.authenticator.basic.type=basic
druid.auth.authenticator.basic.initialAdminPassword=WejoAdmin
druid.auth.authenticator.basic.initialInternalClientPassword=Clientpwd
druid.auth.authenticator.basic.authorizerName=basic
druid.escalator.type=basic
druid.escalator.internalClientUsername=druid_system
druid.escalator.internalClientPassword=Clientpwd
druid.escalator.authorizerName=basic
druid.auth.authorizers=["basic"]
druid.auth.authorizer.basic.type=basic
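
With that in place, requests need credentials; for reference, I believe a status check along these lines works with the admin user created by initialAdminPassword (assuming the default Coordinator port, 8081):

curl -u admin:WejoAdmin http://localhost:8081/status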

I can see that there are messages in the Kafka topic (using kafka-console-consumer.sh), but they are just not being read by the supervisor.
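
For reference, the check was roughly the following (using one of our broker addresses; the path to the script depends on your Kafka install):

kafka-console-consumer.sh --bootstrap-server 10.205.6.220:9092 --topic druid --from-beginning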

I am running the single-server small configuration. I am seeing the following in the Coordinator logs:

2019-08-15T13:48:12,269 INFO [TaskQueue-StorageSync] org.apache.druid.indexing.overlord.TaskQueue - Synced 0 tasks from storage (0 tasks added, 0 tasks removed).
2019-08-15T13:48:12,942 WARN [DatabaseSegmentManager-Exec--0] org.apache.druid.metadata.SQLMetadataSegmentManager - No segments found in the database!
2019-08-15T13:48:13,123 INFO [qtp1408290972-164] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Posting ResetNotice
2019-08-15T13:48:13,851 INFO [DatabaseRuleManager-Exec--0] org.apache.druid.metadata.SQLMetadataRuleManager - Polled and found 1 rule(s) for 1 datasource(s)
2019-08-15T13:48:15,000 INFO [Coordinator-Exec--0] org.apache.druid.server.coordinator.DruidCoordinator - Metadata store not polled yet, skipping this run.
2019-08-15T13:48:20,000 INFO [Coordinator-Exec--0] org.apache.druid.server.coordinator.DruidCoordinator - Metadata store not polled yet, skipping this run.
2019-08-15T13:48:25,000 INFO [Coordinator-Exec--0] org.apache.druid.server.coordinator.DruidCoordinator - Metadata store not polled yet, skipping this run.
2019-08-15T13:48:30,001 INFO [Coordinator-Exec--0] org.apache.druid.server.coordinator.DruidCoordinator - Metadata store not polled yet, skipping this run.
2019-08-15T13:48:35,001 INFO [Coordinator-Exec--0] org.apache.druid.server.coordinator.DruidCoordinator - Metadata store not polled yet, skipping this run.

I have tried resetting the supervisor, but this also has no effect.
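
(I used the reset button in the console; I believe the equivalent Overlord API call is something like the following, assuming the default Overlord port and that the supervisor id matches our datasource name:)

curl -u admin:WejoAdmin -X POST http://localhost:8090/druid/indexer/v1/supervisor/test-stream-datasource/reset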

The job was created by clicking the "Submit supervisor" button in the console. If I instead try to add it under Tasks as raw JSON, I get the following error message:

Failed to submit task: Could not resolve type id 'kafka' into a subtype of [simple type, class org.apache.druid.indexing.common.task.Task]: known type ids = [Task, archive, compact, index, index_hadoop, index_kafka, index_parallel, index_realtime, index_realtime_appenderator, index_sub, kill, move, noop, restore] at [Source: HttpInputOverHTTP@689d33cc[c=1941,q=0,[0]=null,s=STREAM]; line: 1, column: 2]

The Broker, however, is showing the following error, hinting that basic authentication is the problem:

2019-08-14T13:15:11,567 ERROR [main] org.apache.druid.curator.discovery.ServerDiscoverySelector - No server instance found for [druid/coordinator]

2019-08-14T13:15:07,738 WARN [main] org.apache.druid.java.util.common.RetryUtils - Retrying (3 of 9) in 3,823ms.

org.apache.druid.java.util.common.IOE: No known server
at org.apache.druid.discovery.DruidLeaderClient.getCurrentKnownLeader(DruidLeaderClient.java:298) ~[druid-server-0.15.0-incubating.jar:0.15.0-incubating]
at org.apache.druid.discovery.DruidLeaderClient.makeRequest(DruidLeaderClient.java:131) ~[druid-server-0.15.0-incubating.jar:0.15.0-incubating]
at org.apache.druid.discovery.DruidLeaderClient.makeRequest(DruidLeaderClient.java:139) ~[druid-server-0.15.0-incubating.jar:0.15.0-incubating]
at org.apache.druid.security.basic.authentication.db.cache.CoordinatorPollingBasicAuthenticatorCacheManager.tryFetchUserMapFromCoordinator(CoordinatorPollingBasicAuthenticatorCacheManager.java:250) ~[?:?]
at org.apache.druid.security.basic.authentication.db.cache.CoordinatorPollingBasicAuthenticatorCacheManager.lambda$fetchUserMapFromCoordinator$1(CoordinatorPollingBasicAuthenticatorCacheManager.java:191) ~[?:?]
at org.apache.druid.java.util.common.RetryUtils.retry(RetryUtils.java:86) [druid-core-0.15.0-incubating.jar:0.15.0-incubating]
at org.apache.druid.java.util.common.RetryUtils.retry(RetryUtils.java:114) [druid-core-0.15.0-incubating.jar:0.15.0-incubating]
at org.apache.druid.java.util.common.RetryUtils.retry(RetryUtils.java:104) [druid-core-0.15.0-incubating.jar:0.15.0-incubating]
at org.apache.druid.security.basic.authentication.db.cache.CoordinatorPollingBasicAuthenticatorCacheManager.fetchUserMapFromCoordinator(CoordinatorPollingBasicAuthenticatorCacheManager.java:189) [druid-basic-security-0.15.0-incubating.jar:0.15.0-incubating]
at org.apache.druid.security.basic.authentication.db.cache.CoordinatorPollingBasicAuthenticatorCacheManager.initUserMaps(CoordinatorPollingBasicAuthenticatorCacheManager.java:282) [druid-basic-security-0.15.0-incubating.jar:0.15.0-incubating]
at org.apache.druid.security.basic.authentication.db.cache.CoordinatorPollingBasicAuthenticatorCacheManager.start(CoordinatorPollingBasicAuthenticatorCacheManager.java:107) [druid-basic-security-0.15.0-incubating.jar:0.15.0-incubating]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_222]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_222]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_222]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_222]
at org.apache.druid.java.util.common.lifecycle.Lifecycle$AnnotationBasedHandler.start(Lifecycle.java:443) [druid-core-0.15.0-incubating.jar:0.15.0-incubating]
at org.apache.druid.java.util.common.lifecycle.Lifecycle.start(Lifecycle.java:339) [druid-core-0.15.0-incubating.jar:0.15.0-incubating]
at org.apache.druid.guice.LifecycleModule$2.start(LifecycleModule.java:140) [druid-core-0.15.0-incubating.jar:0.15.0-incubating]
at org.apache.druid.cli.GuiceRunnable.initLifecycle(GuiceRunnable.java:106) [druid-services-0.15.0-incubating.jar:0.15.0-incubating]
at org.apache.druid.cli.ServerRunnable.run(ServerRunnable.java:57) [druid-services-0.15.0-incubating.jar:0.15.0-incubating]
at org.apache.druid.cli.Main.main(Main.java:118) [druid-services-0.15.0-incubating.jar:0.15.0-incubating]

Any ideas, please (and thanks in advance)?

Here is the supervisor spec, which should flatten out the JSON:

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "test-stream-datasource",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "flatternspec": {
          "useFieldDiscovery": true,
          "fields": [
            { "type": "path", "name": "odometer", "expr": "$.wejo.odometer" },
            { "type": "path", "name": "gForceY", "expr": "$wejo.dt.obj.gForceY" },
            { "type": "path", "name": "gForceX", "expr": "$wejo.dt.obj.gForceX" },
            { "type": "path", "name": "latitude", "expr": "$wejo.dt.obj.latitude" },
            { "type": "path", "name": "longitude", "expr": "$wejo.dt.obj.longitude" },
            { "type": "path", "name": "capturedTimestamp", "expr": "$wejo.dt.obj.ts" },
            { "type": "path", "name": "receivedTimesptamp", "expr": "$.wejo.receivedEpochUTC" },
            { "type": "path", "name": "speed", "expr": "$wejo.dt.obj.speed" },
            { "type": "path", "name": "tripid", "expr": "$wejo.dt.trip" },
            { "type": "path", "name": "cid", "expr": "$wejo.dt.cid" },
            { "type": "path", "name": "ignition_on", "expr": "$wejo.dt.IgnitionOnTime" },
            { "type": "path", "name": "ignition_off", "expr": "$wejo.dt.IgnitionOffTime" },
            { "type": "path", "name": "ignition_status", "expr": "$wejo.dt.IgnitionStatus" }
          ]
        },
        "timestampSpec": {
          "column": "receivedEpochUTC",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            "tripid",
            "cid",
            "longitude",
            "latitude"
          ]
        },
        "spatialDimensions": [
          {
            "dimName": "coordinates",
            "dims": [
              "latitude",
              "longitude"
            ]
          }
        ]
      }
    },
    "metricsSpec": [
      { "type": "count", "name": "count-tripids" },
      { "type": "doubleMin", "name": "speed_min", "fieldName": "speed", "expression": null },
      { "type": "doubleMax", "name": "speed_max", "fieldName": "speed", "expression": null },
      { "type": "doubleSum", "name": "odometer_sum", "fieldName": "odometer", "expression": null },
      { "type": "doubleSum", "name": "speed_sum", "fieldName": "speed", "expression": null }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": { "type": "none" },
      "rollup": false,
      "intervals": null
    },
    "transformSpec": {
      "filter": null,
      "transforms": []
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 1000000,
    "maxBytesInMemory": 0,
    "maxRowsPerSegment": 5000000,
    "maxTotalRows": null,
    "intermediatePersistPeriod": "PT10M",
    "basePersistDirectory": "/opt/druid/var/tmp/1565790821072-0",
    "maxPendingPersists": 0,
    "indexSpec": {
      "bitmap": { "type": "concise" },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs"
    },
    "buildV9Directly": true,
    "reportParseExceptions": false,
    "handoffConditionTimeout": 0,
    "resetOffsetAutomatically": false,
    "segmentWriteOutMediumFactory": null,
    "workerThreads": null,
    "chatThreads": null,
    "chatRetries": 8,
    "httpTimeout": "PT10S",
    "shutdownTimeout": "PT80S",
    "offsetFetchPeriod": "PT5S",
    "intermediateHandoffPeriod": "P2147483647D",
    "logParseExceptions": false,
    "maxParseExceptions": 2147483647,
    "maxSavedParseExceptions": 0,
    "skipSequenceNumberAvailabilityCheck": false
  },
  "ioConfig": {
    "topic": "druid",
    "replicas": 2,
    "taskCount": 1,
    "taskDuration": "PT600S",
    "consumerProperties": {
      "bootstrap.servers": "10.205.6.220:9092, 10.205.7.115, 10.205.8.148"
    },
    "pollTimeout": 100,
    "startDelay": "PT5S",
    "period": "PT30S",
    "useEarliestOffset": false,
    "completionTimeout": "PT1200S",
    "lateMessageRejectionPeriod": null,
    "earlyMessageRejectionPeriod": null,
    "stream": "druid",
    "useEarliestSequenceNumber": false
  },
  "context": null,
  "suspended": false
}

test.json (14.7 KB)

Hi Rafe, an error message like "Could not resolve type id 'kafka' into a subtype of ..." usually indicates that you did not include the Kafka extension in the load list when starting Druid. Can you double-check?
https://druid.apache.org/docs/latest/development/extensions-core/kafka-ingestion.html
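
The extension goes in druid.extensions.loadList in common.runtime.properties, something like the following (I have included druid-basic-security since you are using basic auth; your actual list may have other entries too):

druid.extensions.loadList=["druid-kafka-indexing-service", "druid-basic-security"]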

Thanks

Oh, by the way, the Broker plays no role in Kafka ingestion, so the error you showed is a separate, unrelated problem. For now, that Broker issue looks like a DNS/network problem to me.

First, thanks for your speedy response!

I have checked, and the entry exists as per the instructions. I have attached the config file for completeness, in case it helps with the diagnosis.

I am also unsure of the difference between submitting via the Supervisors view and the Task Manager, let alone why one is successful and the other isn't!
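
(As far as I can tell, the console posts to two different Overlord endpoints in the two cases, but I don't understand why one accepts the spec and the other rejects it:)

POST /druid/indexer/v1/supervisor   (Supervisors view)
POST /druid/indexer/v1/task         (Tasks view)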

Kind Regards

Rafe

common.runtime.properties.txt (5.01 KB)

Hi,

Does anyone have any ideas on how I might proceed?

thanks

Hey Rafe!

I noted that you have "flatternspec" rather than "flattenSpec" in your JSON. It may not really be your issue, but... you know... one bite of cake at a time... could you retry with that correction?
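
That is, something along these lines (showing just the first field from your own spec):

"flattenSpec": {
  "useFieldDiscovery": true,
  "fields": [
    { "type": "path", "name": "odometer", "expr": "$.wejo.odometer" }
  ]
}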

Rafe: I also noticed that in consumerProperties -> bootstrap.servers you haven't specified ports for a couple of the brokers, only the first in the list. This may be why it's returning errors about being unable to connect to the server.
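
Something like this, assuming all three brokers listen on the default port 9092:

"consumerProperties": {
  "bootstrap.servers": "10.205.6.220:9092,10.205.7.115:9092,10.205.8.148:9092"
}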

Thank you Peter for your diligence and speedy response!

I will give them both a go; I am sure you're on the mark!

Take Care.

Regards

Rafe