SQL Firehose

Hi, I’ve been trying to get the SQL Firehose working, but I keep running into this error:

```
2019-01-25T05:52:07,444 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.common.task.IndexTask - Encountered exception in DETERMINE_PARTITIONS.
java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to java.nio.ByteBuffer
    at org.apache.druid.segment.transform.TransformingStringInputRowParser.parseBatch(TransformingStringInputRowParser.java:31) ~[druid-processing-0.13.0-incubating.jar:0.13.0-incubating]
    at org.apache.druid.data.input.impl.SqlFirehose.nextRow(SqlFirehose.java:67) ~[druid-api-0.13.0-incubating.jar:0.13.0-incubating]
    at org.apache.druid.indexing.common.task.IndexTask.collectIntervalsAndShardSpecs(IndexTask.java:768) ~[druid-indexing-service-0.13.0-incubating.jar:0.13.0-incubating]
    at org.apache.druid.indexing.common.task.IndexTask.createShardSpecsFromInput(IndexTask.java:694) ~[druid-indexing-service-0.13.0-incubating.jar:0.13.0-incubating]
    at org.apache.druid.indexing.common.task.IndexTask.determineShardSpecs(IndexTask.java:629) ~[druid-indexing-service-0.13.0-incubating.jar:0.13.0-incubating]
    at org.apache.druid.indexing.common.task.IndexTask.run(IndexTask.java:436) [druid-indexing-service-0.13.0-incubating.jar:0.13.0-incubating]
    at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:421) [druid-indexing-service-0.13.0-incubating.jar:0.13.0-incubating]
    at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:393) [druid-indexing-service-0.13.0-incubating.jar:0.13.0-incubating]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_191]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_191]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_191]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]
```

I’ve been trying this against a simple MySQL database, tweaking the settings back and forth to find the right way of doing things, but no luck.

I haven’t been able to find a complete example of the SQL firehose in use, so I’m a bit stumped!

(It looks like SqlFirehose is passing the wrong data structure into the parseBatch method.)

Has anyone used this successfully?

Hey Nathan,

I was able to get this working successfully. Could you post your ingestion spec here?

Thanks,

Atul

Thanks Atul!

I’ve posted my spec below. I’m fairly sure the query runs OK, but I think I’m missing something in the parser.

For clarity, I’ve included the error below with better formatting

```json
{
    "type": "index",
    "spec": {
        "dataSchema": {
            "dataSource": "sql-test",
            "parser": {
                "type": "string",
                "parseSpec": {
                    "format": "timeAndDims",
                    "timestampSpec": {
                        "column": "UpdateDate",
                        "format": "auto"
                    },
                    "dimensionsSpec": {
                        "dimensions": ["ID", "f_name", "l_name", "UpdateDate"],
                        "dimensionExclusions": [],
                        "spatialDimensions": []
                    },
                    "columns": ["ID", "f_name", "l_name", "UpdateDate"]
                }
            },
            "granularitySpec": {
                "segmentGranularity": "DAY",
                "queryGranularity": "NONE",
                "rollup": "false"
            }
        },
        "ioConfig": {
            "type": "index",
            "firehose": {
                "type": "sql",
                "database": {
                    "type": "mysql",
                    "connectorConfig": {
                        "connectURI": "jdbc:mysql://server/database",
                        "user": "username",
                        "password": "password"
                    }
                },
                "sqls": ["SELECT ID, f_name, l_name, UpdateDate FROM Sample"]
            }
        }
    }
}
```

```
2019-01-28T23:45:03,658 INFO [task-runner-0-priority-0] org.apache.druid.data.input.impl.prefetch.CacheManager - Object[SELECT ID, f_name, l_name, UpdateDate FROM Sample] is cached. Current cached bytes is [140]
2019-01-28T23:45:03,662 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.common.task.IndexTask - Encountered exception in DETERMINE_PARTITIONS.
java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to java.nio.ByteBuffer
    at org.apache.druid.segment.transform.TransformingStringInputRowParser.parseBatch(TransformingStringInputRowParser.java:31) ~[druid-processing-0.13.0-incubating.jar:0.13.0-incubating]
    at org.apache.druid.data.input.impl.SqlFirehose.nextRow(SqlFirehose.java:67) ~[druid-api-0.13.0-incubating.jar:0.13.0-incubating]
    at org.apache.druid.indexing.common.task.IndexTask.collectIntervalsAndShardSpecs(IndexTask.java:768) ~[druid-indexing-service-0.13.0-incubating.jar:0.13.0-incubating]
    at org.apache.druid.indexing.common.task.IndexTask.createShardSpecsFromInput(IndexTask.java:694) ~[druid-indexing-service-0.13.0-incubating.jar:0.13.0-incubating]
    at org.apache.druid.indexing.common.task.IndexTask.determineShardSpecs(IndexTask.java:629) ~[druid-indexing-service-0.13.0-incubating.jar:0.13.0-incubating]
    at org.apache.druid.indexing.common.task.IndexTask.run(IndexTask.java:436) [druid-indexing-service-0.13.0-incubating.jar:0.13.0-incubating]
    at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:421) [druid-indexing-service-0.13.0-incubating.jar:0.13.0-incubating]
    at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:393) [druid-indexing-service-0.13.0-incubating.jar:0.13.0-incubating]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_191]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_191]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_191]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]
```

Changing the parser type from "string" to "map" worked. That matches the stack trace: SqlFirehose hands each row to the parser as a `LinkedHashMap`, while the string parser tries to cast its input to a `ByteBuffer`, hence the `ClassCastException`.
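For anyone hitting the same error, the working parser block would presumably look like the sketch below: it is the parser section from the spec above with only the `type` field changed to `"map"` (and the empty array fields dropped for brevity).

```json
"parser": {
    "type": "map",
    "parseSpec": {
        "format": "timeAndDims",
        "timestampSpec": {
            "column": "UpdateDate",
            "format": "auto"
        },
        "dimensionsSpec": {
            "dimensions": ["ID", "f_name", "l_name", "UpdateDate"]
        },
        "columns": ["ID", "f_name", "l_name", "UpdateDate"]
    }
}
```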