Tranquility API - ConcurrentModificationException

I have a single Kafka Topic into which multiple types of messages are dropped and each of these message types belong to a different datasource.

Because I have couple of datasources, I am constructing a registry on start-up to cache tranquilizer senders.

Here is my code for TranquilitySenderRegistry - http://pastebin.com/GJV8R1kc

I have a Kafka Consumer that processes messages of these different type and spawns a new thread and passes the batch of messages to drop into Druid. Here is the consumer processor code -

private CompletionService pool = new ExecutorCompletionService(exec);
public void process(MMetadata<?, ?> mMetadata, List events) {
logger.debug(“Total # " + events.size() + " event(s)”);
try {
for (MapEvent event : events) {
String datasourceName = event.getHeaderAsString(MapEvent.HEADER
List<Map<String, ?>> evtBody = event.getBody();
pool.submit(new TranquilitySenderThread(datasourceName , evtBody));
}
} catch (Exception e) {
e.printStacktrace();
}
}
private class TranquilitySenderThread implements Callable {

    String datasourceName = null;

    List<Map<String, ?>> list = null;

    public TranquilitySenderThread(String datasourceName , List<Map<String, ?>> list) {

        this.datasourceName = datasourceName ;

        this.list= list;

    }

    @Override

    public Boolean call() throws Exception {

        TranquilitySender.sendDataToDruid(datasourceName , list);

        return true;

    }

}

``

And finally, my TranquilitySender.sendDateToDrui() method -

public static void sendDataToDruid(String datasourceName, List<Map<String, ?>> metricList)
throws DBException, DataException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, IOException {
long startTime = System.currentTimeMillis();
Tranquilizer<Map<String, Object>> sender = TranquilitySenderRegistry.getInstance().getSender(datasourceName);
if (sender == null)
return;
try {
for (Map obj : metricList) {
// Asynchronously send event to Druid:
sender.send(obj).addEventListener(new FutureEventListener() {
@Override
public void onSuccess(BoxedUnit value) {
logger.info(“Sent message: %s”, obj);
}

                @Override
                public void onFailure(Throwable e) {
                    if (e instanceof MessageDroppedException) {
                        e.printStackTrace();
                        logger.warn("Dropped message: %s", e);
                    } else {
                        e.printStackTrace();
                        logger.error("Failed to send message: %s", e);
                    }
                }
            });
        }
    } catch (Exception e) {
        logger.error("error pushing data into druid");
        e.printStackTrace();
    }finally {
        //sender.close();
    }
    logger.info("TranquilitySender.sendDataToDruid() :: ended in " + (System.currentTimeMillis() - startTime) + " ms");
}

``

When I run this code, I see -

java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
at java.util.HashMap$EntryIterator.next(HashMap.java:1463)
at java.util.HashMap$EntryIterator.next(HashMap.java:1461)
at java.util.AbstractMap.hashCode(AbstractMap.java:507)
at com.metamx.tranquility.partition.GenericTimeAndDimsPartitioner.partition(GenericTimeAndDimsPartitioner.scala:88)
at com.metamx.tranquility.beam.MessageHolder$$anonfun$partition$1.apply$mcI$sp(MessageHolder.scala:41)
at com.metamx.tranquility.beam.MessageHolder$$anonfun$partition$1.apply(MessageHolder.scala:41)
at com.metamx.tranquility.beam.MessageHolder$$anonfun$partition$1.apply(MessageHolder.scala:41)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189)
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91)
at com.metamx.tranquility.beam.MessageHolder.partition(MessageHolder.scala:41)
at com.metamx.tranquility.beam.MessageHolder$$anon$2.partition(MessageHolder.scala:71)
at com.metamx.tranquility.beam.MessageHolder$$anon$2.partition(MessageHolder.scala:69)
at com.metamx.tranquility.beam.MergingPartitioningBeam$$anonfun$1.apply(MergingPartitioningBeam.scala:38)
at com.metamx.tranquility.beam.MergingPartitioningBeam$$anonfun$1.apply(MergingPartitioningBeam.scala:37)
at scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:328)
at scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.groupBy(TraversableLike.scala:327)
at scala.collection.AbstractTraversable.groupBy(Traversable.scala:105)
at com.metamx.tranquility.beam.MergingPartitioningBeam.sendBatch(MergingPartitioningBeam.scala:37)
at com.metamx.tranquility.beam.ClusteredBeam$$anonfun$26$$anonfun$apply$15.apply(ClusteredBeam.scala:381)
at com.metamx.tranquility.beam.ClusteredBeam$$anonfun$26$$anonfun$apply$15.apply(ClusteredBeam.scala:379)
at com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:894)
at com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:893)
at com.twitter.util.Promise$Transformer.liftedTree1$1(Promise.scala:112)
at com.twitter.util.Promise$Transformer.k(Promise.scala:112)
at com.twitter.util.Promise$Transformer.apply(Promise.scala:122)
at com.twitter.util.Promise$Transformer.apply(Promise.scala:103)
at com.twitter.util.Promise$$anon$2.run(Promise.scala:743)
at com.twitter.concurrent.LocalScheduler$Activation.run(Scheduler.scala:178)
at com.twitter.concurrent.LocalScheduler$Activation.submit(Scheduler.scala:136)
at com.twitter.concurrent.LocalScheduler.submit(Scheduler.scala:207)
at com.twitter.concurrent.Scheduler$.submit(Scheduler.scala:92)
at com.twitter.util.Promise.continue(Promise.scala:741)
at com.twitter.util.Promise$Chained.continue(Promise.scala:222)
at com.twitter.util.Promise$Responder$class.transform(Promise.scala:187)
at com.twitter.util.Promise$Chained.transform(Promise.scala:194)
at com.twitter.util.Future.flatMap(Future.scala:893)
at com.metamx.tranquility.beam.ClusteredBeam$$anonfun$26.apply(ClusteredBeam.scala:378)
at com.metamx.tranquility.beam.ClusteredBeam$$anonfun$26.apply(ClusteredBeam.scala:374)
at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
at com.metamx.tranquility.beam.ClusteredBeam.sendBatch(ClusteredBeam.scala:374)
at com.metamx.tranquility.druid.DruidBeams$Builder$$anon$5.sendBatch(DruidBeams.scala:636)
at com.metamx.tranquility.beam.TransformingBeam.sendBatch(TransformingBeam.scala:28)
at com.metamx.tranquility.tranquilizer.Tranquilizer.com$metamx$tranquility$tranquilizer$Tranquilizer$$sendBuffer(Tranquilizer.scala:299)
at com.metamx.tranquility.tranquilizer.Tranquilizer$$anonfun$send$1.apply(Tranquilizer.scala:200)
at com.metamx.tranquility.tranquilizer.Tranquilizer$$anonfun$send$1.apply(Tranquilizer.scala:200)
at scala.Option.foreach(Option.scala:236)
at com.metamx.tranquility.tranquilizer.Tranquilizer.send(Tranquilizer.scala:200)
at com.priceline.druid.tranquility.sender.TranquilitySender.sendDataToDruid(TranquilitySender.java:41)
at com.priceline.druid.monitor.index.processor.KafkaBAMBatchProcessor$TranquilitySenderThread.call(KafkaBAMBatchProcessor.java:74)
at com.priceline.druid.monitor.index.processor.KafkaBAMBatchProcessor$TranquilitySenderThread.call(KafkaBAMBatchProcessor.java:1)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

``

What am I doing wrong?

  • For every batch of messages I send to the sender that pushes the messages to druid, do I need to call the
    sender.start()

``

  • and after the messages are sent, do I need to call

sender.close()

``

  • can the sender be re-used after closing?

Thanks!

Hey Jagadeesh,

You should only call start and stop/close once each. The sender isn’t usable after calling close – you should only do that when you’re done with it. But it is thread safe and so a single sender can be used by multiple threads. That ConcurrentModificationException looks like you’re modifying the maps after you pass them to Tranquility. Please avoid that, you should not modify things after passing them along.

Btw, since Tranquilizers are thread safe and asynchronous, you might not even need that separate thread pool for sending things. Unless there is something else it’s buying you, you should be find just calling send directly from your main thread.

Thanks Gian! This is helpful and I moved on to my next task :slight_smile: