How to configure a amqp firehose for realtime ingestion

I’m trying to get the amqp firehose to work for realtime ingestion. but I’m a bit confused which type of ingestion task to use.

In the documentation of the firehoses, it states :

“Firehoses are used in native batch ingestion tasks, stream push tasks automatically created by Tranquility, and the stream-pull (deprecated) ingestion model.”

I don’t want batch ingestion. Tranquility can’t handle the flattenspec, and i assume I dont need this extra server. If I follow the link to steam-pull it states I should use the Kafka indexing service (posting a supervisor. but they don’t support firehoses).

I’m able to get a amqp connection when using a default index-task but, but the messages aren’t consumed. All other tries don’t even build the amqp connection.

If you are using RabbitMQ as your AMQP product, then you can use the following extension

https://druid.apache.org/docs/latest/development/extensions-contrib/rabbitmq.html

If not you have couple of options

  1. Build your own firehoses for your AQMP product - https://druid.apache.org/docs/latest/development/modules.html

  2. You can use tranquility as a Java API in your java application which in turn integrate with your AQMP product - https://github.com/druid-io/tranquility/blob/master/docs/core.md

Thanks for the fast reply Muthu,

But it was not the answer i was looking for. I installed the druid-rabbitmq extension. It does work (at least it creates the queue, and opens a channel). But I want to know what kind of ingestion task should be created to have it process the messages on the queue.

The manual states "“Firehoses are used in native batch ingestion tasks, stream push tasks automatically created by Tranquility, and the stream-pull (deprecated) ingestion model.”

If possible I don’t want to create a separate application for this (Tranquility). And I don’t want to used deprecated technologies “stream-pull (deprecated)” ). This leaves the “native batch ingestion task” but, is this applicable for realtime ingestion?

Oh okay got it. Here is an example for the ingestion spec that you can use.

“ioConfig”: {

 "type": "realtime",

 "firehose": {

   "type": "rabbitmq",

   "connection": {

     "host": "<IP_ADDRESS>",

     "port": "5672",

     "username": "guest",

  "password": "guest"

   },

   "config": {

     "exchange": "exchange",

     "queue": "queue",

     "routingKey": "queue",

     "durable": "false",

     "exclusive": "false",

     "autoDelete": "false",

     "maxRetries": "10",

     "retryIntervalSeconds": "1",

     "maxDurationSeconds": "300"

   }

 },

 "appendToExisting": false

}

``

Hope this helps.