RabbitMQ realtime ingestion

Hi all,

I am trying to use RabbitMQ as a data stream for a realtime node to ingest, but I am getting a strange error and stack trace. Does anyone have an idea what might be causing this?

2015-11-05T00:39:09,501 INFO [chief-rabbitmq[0]] net.jodah.lyra.internal.ConnectionHandler - Created connection cxn-1 to amqp://127.0.0.1:5672/
2015-11-05T00:39:09,506 ERROR [chief-rabbitmq[0]] io.druid.segment.realtime.RealtimeManager - RuntimeException aborted realtime processing[rabbitmq]: {class=io.druid.segment.realtime.RealtimeManager, exceptionType=class java.lang.IllegalArgumentException, exceptionMessage=interface net.jodah.lyra.config.ConfigurableConnection is not visible from class loader}
java.lang.IllegalArgumentException: interface net.jodah.lyra.config.ConfigurableConnection is not visible from class loader
    at java.lang.reflect.Proxy$ProxyClassFactory.apply(Proxy.java:616) ~[?:1.7.0_79]
    at java.lang.reflect.Proxy$ProxyClassFactory.apply(Proxy.java:592) ~[?:1.7.0_79]
    at java.lang.reflect.WeakCache$Factory.get(WeakCache.java:244) ~[?:1.7.0_79]
    at java.lang.reflect.WeakCache.get(WeakCache.java:141) ~[?:1.7.0_79]
    at java.lang.reflect.Proxy.getProxyClass0(Proxy.java:455) ~[?:1.7.0_79]
    at java.lang.reflect.Proxy.newProxyInstance(Proxy.java:738) ~[?:1.7.0_79]
    at net.jodah.lyra.Connections.create(Connections.java:66) ~[?:?]
    at io.druid.firehose.rabbitmq.RabbitMQFirehoseFactory.connect(RabbitMQFirehoseFactory.java:154) ~[?:?]
    at io.druid.firehose.rabbitmq.RabbitMQFirehoseFactory.connect(RabbitMQFirehoseFactory.java:100) ~[?:?]
    at io.druid.segment.realtime.FireDepartment.connect(FireDepartment.java:97) ~[druid-server-0.8.1.jar:0.8.1]
    at io.druid.segment.realtime.RealtimeManager$FireChief.initFirehose(RealtimeManager.java:203) ~[druid-server-0.8.1.jar:0.8.1]
    at io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeManager.java:247) [druid-server-0.8.1.jar:0.8.1]

I’ve got the “io.druid.extensions:druid-rabbitmq:0.8.1” extension in my config file and other applications can read/write fine to my local RabbitMQ server.

Hmm, this looks like a classpath issue. Are you relying on Maven to download the RabbitMQ extension?
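For anyone hitting the same message: "is not visible from class loader" is what java.lang.reflect.Proxy throws when the class loader it is given cannot resolve the interface being proxied. Below is a minimal, self-contained sketch of that failure mode using only the JDK. ProxyVisibilityDemo and ExtensionApi are hypothetical names made up for illustration; this is not Druid or Lyra code, and it only assumes the situation is similar to a jar that one loader can see and another cannot.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class ProxyVisibilityDemo {
    /** Hypothetical stand-in for an interface that ships in an extension jar. */
    public interface ExtensionApi {
        String ping();
    }

    /** Returns true if a dynamic proxy for ExtensionApi can be created in the given loader. */
    static boolean canProxyFrom(ClassLoader loader) {
        InvocationHandler handler = (proxy, method, args) -> "pong";
        try {
            Proxy.newProxyInstance(loader, new Class<?>[] {ExtensionApi.class}, handler);
            return true;
        } catch (IllegalArgumentException e) {
            // Proxy rejects interfaces that the supplied loader cannot resolve by name.
            return false;
        }
    }

    public static void main(String[] args) {
        // The loader that defined ExtensionApi can always see it.
        ClassLoader own = ExtensionApi.class.getClassLoader();
        // The parent of the system loader only sees JDK classes, not application classes.
        ClassLoader jdkOnly = ClassLoader.getSystemClassLoader().getParent();

        System.out.println("defining loader can proxy: " + canProxyFrom(own));
        System.out.println("JDK-only loader can proxy: " + canProxyFrom(jdkOnly));
    }
}
```

The first check should succeed and the second should fail, which is the same shape as a library being present for one class loader but missing for the loader that creates the proxy. Making the jar visible to that loader is the usual way out.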

Hi Fangjin,

I’ve resolved the initial issue by downloading Lyra (net.jodah:lyra:0.3.1) and putting it into the libs directory along with amqp-client:3.2.1.

However, I am now running into another issue:

Exception in thread "chief-rabbitmq[0]" java.lang.RuntimeException: java.io.IOException
    at com.google.common.base.Throwables.propagate(Throwables.java:160)
    at io.druid.segment.realtime.RealtimeManager$FireChief.initFirehose(RealtimeManager.java:207)
    at io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeManager.java:247)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:825)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:840)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:61)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at net.jodah.lyra.internal.util.Reflection.invoke(Reflection.java:11)
    at net.jodah.lyra.internal.ChannelHandler$1.call(ChannelHandler.java:68)
    at net.jodah.lyra.internal.RetryableResource.callWithRetries(RetryableResource.java:58)
    at net.jodah.lyra.internal.ChannelHandler.invoke(ChannelHandler.java:57)
    at com.sun.proxy.$Proxy79.queueBind(Unknown Source)
    at io.druid.firehose.rabbitmq.RabbitMQFirehoseFactory.connect(RabbitMQFirehoseFactory.java:169)
    at io.druid.firehose.rabbitmq.RabbitMQFirehoseFactory.connect(RabbitMQFirehoseFactory.java:100)
    at io.druid.segment.realtime.FireDepartment.connect(FireDepartment.java:97)
    at io.druid.segment.realtime.RealtimeManager$FireChief.initFirehose(RealtimeManager.java:203)
    ... 1 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - operation not permitted on the default exchange, class-id=50, method-id=20), null, ""}
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 17 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - operation not permitted on the default exchange, class-id=50, method-id=20), null, ""}
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:474)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:551)
Nov 05, 2015 2:45:15 AM com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory getComponentProvider

I think this might be an auth problem with my setup that I need to resolve, though.

OK, so I was able to resolve the previous error by binding the queue I’m reading from to a non-default exchange. However, the RabbitMQ firehose doesn’t look like it is actually consuming anything off the queue… More on this tomorrow.
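For reference, the 403 above comes from the broker: RabbitMQ does not allow an explicit queueBind against the default exchange, which is the one whose name is the empty string. Here is a minimal sketch of a guard that could be run on an exchange name before it goes into a firehose spec. ExchangeNameCheck and the exchange name "druid-events" are hypothetical, it uses only the JDK, and it is not something Druid or the RabbitMQ client provides.

```java
public class ExchangeNameCheck {
    /** In AMQP 0-9-1 the default exchange is identified by the empty string. */
    static boolean isDefaultExchange(String exchange) {
        return exchange.isEmpty();
    }

    /** Returns the name unchanged if it can be bound to, otherwise throws. */
    static String requireBindable(String exchange) {
        if (exchange == null || isDefaultExchange(exchange)) {
            throw new IllegalArgumentException(
                "queues cannot be explicitly bound to the default exchange; use a named exchange");
        }
        return exchange;
    }

    public static void main(String[] args) {
        // A named exchange passes through unchanged.
        System.out.println(requireBindable("druid-events"));
        try {
            requireBindable("");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast on the client side like this only saves a round trip; the broker still enforces the rule either way.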

Hi Fangjin, so I tried using the indexing service instead of a realtime node, and I was able to get Druid to consume messages off of RabbitMQ. However, the messages are not being acked and do not become available for querying on the Druid side.

For this test I was running the indexing service, coordinator node, and broker node. Do I need to run a historical or realtime node as well? To me, the historical node only makes sense for serving indexed batch data, while realtime indexing is labelled as an alternative to using a realtime node. This leaves me confused about which nodes are required for realtime indexing, so any clarification would be helpful.

I have also attached the realtime indexing task spec and query files I am using.

Thanks!

query.json (335 Bytes)

realtime.spec (1.65 KB)

Hi Andrei, for testing alone you don’t need historicals if you are only looking at recent data. If you want to read more about realtime indexing, please see:
https://groups.google.com/forum/#!searchin/druid-development/fangjin$20yang$20"thoughts"/druid-development/aRMmNHQGdhI/muBGl0Xi_wgJ

We will deprecate realtime nodes sometime in the future.