Measuring throughput of Tranquility Core API

Hi -

Can you help me understand how to measure the throughput of the Tranquility Core API?

Here is the sender code -

```java
sender.send(obj).addEventListener(new FutureEventListener<BoxedUnit>() {
    @Override
    public void onSuccess(BoxedUnit value) {
        logger.info("TranquilitySender.sendDataToDruid() :: Sent message: " + obj);
    }

    @Override
    public void onFailure(Throwable e) {
        if (e instanceof MessageDroppedException) {
            //e.printStackTrace();
            logger.warn("TranquilitySender.sendDataToDruid() :: Dropped message: %s", e);
        } else {
            //e.printStackTrace();
            logger.error("TranquilitySender.sendDataToDruid() :: Failed to send message: %s", e);
        }
    }
});
```

One way to capture this is to keep a running counter, update it in either the onSuccess or onFailure callback, and record the time before and after sending.
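Roughly what I have in mind, as an untested sketch (the class and field names below are made up):

```java
import java.util.concurrent.atomic.AtomicLong;

import com.twitter.util.FutureEventListener;

import scala.runtime.BoxedUnit;

public class ThroughputCounters {
    // Listeners may fire on Tranquility's own threads, so the counters must be thread-safe.
    public static final AtomicLong successCount = new AtomicLong();
    public static final AtomicLong failureCount = new AtomicLong();

    // One shared listener that only bumps counters; attach it to every send() future.
    public static final FutureEventListener<BoxedUnit> countingListener = new FutureEventListener<BoxedUnit>() {
        @Override
        public void onSuccess(BoxedUnit value) {
            successCount.incrementAndGet();
        }

        @Override
        public void onFailure(Throwable e) {
            failureCount.incrementAndGet();
        }
    };
}
```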

PS: Feel free to share sample code if you already went through the same exercise!

Thoughts?

Anyone?

Incrementing a counter in onSuccess/onFailure sounds good to me. When you’re done sending all your objects, call `sender.flush()`, which will block until they’re all done. After that call returns, your counters should all be up to date.
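Putting that together, the measurement could look roughly like this (a sketch only, reusing the `ThroughputCounters` class from the earlier snippet; the `Tranquilizer` import assumes the com.metamx Tranquility core artifact):

```java
import java.util.List;
import java.util.Map;

import com.metamx.tranquility.tranquilizer.Tranquilizer;

public class ThroughputMeasurement {
    // Sketch: send a batch, flush, then read the counters for a msgs/sec number.
    public static void sendAndMeasure(Tranquilizer<Map<String, Object>> sender,
                                      List<Map<String, Object>> metricList) {
        long start = System.currentTimeMillis();

        for (Map<String, Object> obj : metricList) {
            sender.send(obj).addEventListener(ThroughputCounters.countingListener);
        }

        // Blocks until every message handed to send() has succeeded or failed
        // and all listeners have run, so the counters are final afterwards.
        sender.flush();

        long elapsedMs = Math.max(System.currentTimeMillis() - start, 1);
        long ok = ThroughputCounters.successCount.get();
        long failed = ThroughputCounters.failureCount.get();
        System.out.println("Sent " + ok + " ok / " + failed + " failed in " + elapsedMs
                + " ms (" + (ok * 1000L / elapsedMs) + " msgs/sec)");
    }
}
```

The resulting number is just successful messages divided by wall-clock time for the whole batch, so it includes the time Tranquility spends batching and handing off to Druid.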

I understand flush() forces the existing batch to be flushed and waits for the result. But is calling flush() absolutely required, or is it just for me to get my throughput numbers?

Here is what I have today: the data in Kafka is not in JSON or any other format accepted by Druid; it is in an arbitrary format like List<Event<Map<String,?>>>. So I have a consumer that reads the messages from Kafka, converts them into a List<Map<String, Object>>, and passes that straight to the sendDataToDruid() method shown below. In sendDataToDruid(), I iterate through the list and pass each individual map to Druid via the Tranquility API. Often my list contains only a single map. Because Tranquility batches the messages internally, I am not doing anything special, but I do have the ability to run multiple instances of my consumer. So again, in this case, do I need to call flush() to get updated counts? (See the sketch after the code below.)

```java
public static void sendDataToDruid(String senderName, List<Map<String, Object>> metricList)
        throws DBException, DataException, IllegalAccessException, IllegalArgumentException,
        InvocationTargetException, IOException {
    long startTime = System.currentTimeMillis();
    logger.info("TranquilitySender.sendDataToDruid() :: start ");
    Tranquilizer<Map<String, Object>> sender = TranquilitySenderRegistry.getInstance().getSender(senderName);

    if (sender == null) {
        return;
    }

    try {
        // sender.start();
        for (Map<String, Object> obj : metricList) {
            // Asynchronously send event to Druid:
            sender.send(obj).addEventListener(new FutureEventListener<BoxedUnit>() {
                @Override
                public void onSuccess(BoxedUnit value) {
                    logger.info("TranquilitySender.sendDataToDruid() :: Sent message: " + obj);
                }

                @Override
                public void onFailure(Throwable e) {
                    if (e instanceof MessageDroppedException) {
                        //e.printStackTrace();
                        logger.warn("TranquilitySender.sendDataToDruid() :: Dropped message: %s", e);
                    } else {
                        //e.printStackTrace();
                        logger.error("TranquilitySender.sendDataToDruid() :: Failed to send message: %s", e);
                    }
                }
            });
        }
    } catch (Exception e) {
        logger.error("TranquilitySender.sendDataToDruid() :: error pushing data into druid");
        e.printStackTrace();
    }
    logger.info("TranquilitySender.sendDataToDruid() :: ended in " + (System.currentTimeMillis() - startTime) + " ms");
}
```
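If flush() does turn out to be necessary, I assume it would slot in right after the loop, something like this untested sketch (error handling omitted; it reuses the counting listener and the other classes from the snippets above):

```java
// Sketch only: a variant of the method above with a flush() after the loop,
// so all listeners have fired (and any counters are current) before the timing log.
public static void sendDataToDruidWithFlush(String senderName, List<Map<String, Object>> metricList) {
    long startTime = System.currentTimeMillis();
    Tranquilizer<Map<String, Object>> sender = TranquilitySenderRegistry.getInstance().getSender(senderName);
    if (sender == null) {
        return;
    }

    for (Map<String, Object> obj : metricList) {
        sender.send(obj).addEventListener(ThroughputCounters.countingListener);
    }

    // Without this, messages Tranquility is still batching internally may not
    // have resolved yet, so per-call counts could lag behind.
    sender.flush();

    logger.info("TranquilitySender.sendDataToDruid() :: ended in "
            + (System.currentTimeMillis() - startTime) + " ms");
}
```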

Gian - Thoughts?

All `flush` does is guarantee that all messages you have passed to `send` have actually succeeded or failed, and have had their listeners called. So, if you have some other way of confirming that, then you don’t need flush (perhaps you are retaining references to the futures and awaiting on them, or perhaps you are counting down a latch in your listeners).
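For reference, the latch variant mentioned above could look roughly like this (a sketch only; the class and method names are made up, and the imports assume the com.metamx Tranquility core artifact):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import com.metamx.tranquility.tranquilizer.Tranquilizer;
import com.twitter.util.FutureEventListener;

import scala.runtime.BoxedUnit;

public class LatchExample {
    // Sketch of the latch alternative: count down once per message, in either
    // callback, then await instead of calling flush().
    public static void sendBatch(Tranquilizer<Map<String, Object>> sender,
                                 List<Map<String, Object>> metricList) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(metricList.size());

        for (final Map<String, Object> obj : metricList) {
            sender.send(obj).addEventListener(new FutureEventListener<BoxedUnit>() {
                @Override
                public void onSuccess(BoxedUnit value) {
                    latch.countDown();
                }

                @Override
                public void onFailure(Throwable e) {
                    latch.countDown();
                }
            });
        }

        // Zero means every future has succeeded or failed and every listener has run.
        // Note: with internal batching this can take until Tranquility decides to
        // send the batch, unless flush() is called elsewhere.
        latch.await();
    }
}
```

Counting down in both callbacks means the await completes once every message has a definite outcome, whether it was indexed or dropped.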