Connecting to Druid from Spark via JDBC

Hi all,

I am interested in querying Druid from Spark. I know there is a separate project for this (https://github.com/SparklineData/spark-druid-olap), but I was curious whether the new JDBC support might be a better-supported option.

I am wholly unfamiliar with the Avatica driver and unclear as to which class is the proper entry point.

I have tried:

val druidDf = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:avatica:remote:url=http://mydruidbroker:8082/druid/v2/sql/avatica/", "dbtable" -> "mydruidtable", "driver" -> "org.apache.calcite.avatica.remote.Driver", "fetchSize" -> "10000")).load()

But this gives me an UnsupportedOperationException.

I tried changing the driver to org.apache.calcite.avatica.UnregisteredDriver but this gives me:

java.lang.IllegalAccessException: Class org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$ can not access a member of class org.apache.calcite.avatica.UnregisteredDriver with modifiers "protected"

I presume this is because the constructor is protected.

If someone can point me in the correct direction I would greatly appreciate it.

Thanks,

Do you have a message or stack trace for the UnsupportedOperationException? That’d help.

The Spark docs have a troubleshooting step that talks about getting a classloader set up, which may or may not be related (https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases):
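The relevant bit, paraphrasing those docs: the JDBC driver class must be visible to the primordial class loader on the client session and on all executors, which is why the driver jar usually has to be passed with --driver-class-path rather than only --jars.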

My apologies for the delay. It appears to be a problem using prepared statements:

val dw2 = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:avatica:remote:url=http://jarvis-druid-query002:8082/druid/v2/sql/avatica/", "dbtable" -> "sor_business_events_all", "driver" -> "org.apache.calcite.avatica.remote.Driver", "fetchSize" -> "10000")).load()

java.lang.UnsupportedOperationException
  at org.apache.calcite.avatica.AvaticaConnection.prepareStatement(AvaticaConnection.java:275)
  at org.apache.calcite.avatica.AvaticaConnection.prepareStatement(AvaticaConnection.java:121)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:122)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:91)
  at org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:57)
  at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
  at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
  at $iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
  at $iwC$$iwC$$iwC.<init>(<console>:38)
  at $iwC$$iwC.<init>(<console>:40)
  at $iwC.<init>(<console>:42)
  at <init>(<console>:44)
  at .<init>(<console>:48)
  at .<clinit>(<console>)
  at .<init>(<console>:7)
  at .<clinit>(<console>)
  at $print(<console>)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1045)
  at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1326)
  at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:821)
  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:852)
  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:800)
  at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
  at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
  at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
  at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
  at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
  at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
  at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
  at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
  at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
  at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
  at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
  at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1064)
  at org.apache.spark.repl.Main$.main(Main.scala:31)
  at org.apache.spark.repl.Main.main(Main.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Hmm, looks like something is missing in Avatica. Is it possible to get Spark to avoid using prepared statements?

The Calcite folks may also be able to help with this, maybe they can shed some light on why the method isn’t implemented.
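For anyone who needs a workaround in the meantime: Avatica's remote driver does implement plain statements, so you should be able to sidestep Spark's prepareStatement call by querying Druid with a bare java.sql.Statement. A minimal sketch, reusing the broker URL and table name from the first post:

import java.sql.DriverManager

// Usually auto-registered via the ServiceLoader, but harmless to be explicit.
Class.forName("org.apache.calcite.avatica.remote.Driver")

// The plain Statement path avoids AvaticaConnection.prepareStatement entirely.
val conn = DriverManager.getConnection(
  "jdbc:avatica:remote:url=http://mydruidbroker:8082/druid/v2/sql/avatica/")
try {
  val stmt = conn.createStatement()
  val rs = stmt.executeQuery("SELECT COUNT(*) FROM mydruidtable")
  while (rs.next()) println(rs.getLong(1))
  rs.close()
  stmt.close()
} finally {
  conn.close()
}

This doesn't give you a DataFrame, of course; it just confirms the Avatica endpoint is queryable without prepared statements.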

Thanks Gian. I have opened a ticket with the Avatica/Calcite folks.

–Ben

Hi guys,

Does anybody know where we should copy avatica.jar in Druid?

How do we connect Druid to Tableau or Grafana (as a SQL datasource)?

Thank you so much,

There is a Druid plugin for Grafana; you can use that.

On Monday, June 11, 2018 at 5:32:42 PM UTC+8, lethuy…@gmail.com wrote:

This is a problem for me too. Did you solve it?

This works for me:

val dw2 = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/", "dbtable" -> "Fire_Department_Calls_for_service", "driver" -> "org.apache.calcite.avatica.remote.Driver", "fetchSize" -> "10000")).load()

dw2: org.apache.spark.sql.DataFrame = [ALS Unit: string, Address: string … 43 more fields]

I ran the Spark shell like this: ./spark-shell --driver-class-path …/…/avatica-1.12.0.jar --jars …/…/avatica-1.12.0.jar
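From there the usual DataFrame operations work; for example (the Address column is taken from the schema printed above):

dw2.printSchema()
dw2.select("Address").show(5)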

vijay