When getRow inside custom filter, it fails to read row afterward

Hi!

I’m currently working on extend custom filter.

Called SeqFilter, that is filter multivalue with it’s array index and value.

If express sql it would be like “arr[0] = ‘xxx’”

I implemented custom filter by reading row by dimensionHandler runtime,

and compare row.get(2) with xxx’s dictionay

see below

@Override
public ValueMatcher makeMatcher(ColumnSelectorFactory factory) {
    return new ValueMatcher() {
        @Override
        public boolean matches() {
            // Caching
            if(ds == null) {
                ds = factory.makeDimensionSelector(DefaultDimensionSpec.of("arr"));
                final IdLookup idLookup = ds.idLookup();
                dictVal = idLookup.lookupId("xxx");
            }

            final IndexedInts row = ds.getRow();
            System.out.println(row.size());
            if(row.size() > 2) {
                return row.get(2) == dictVal;
            }

            return false;
        }
    };
}

But exception occured undexpectably, which is saying

Exception in thread “main” io.druid.java.util.common.IAE: Index[10794] >= size[139]

at io.druid.segment.data.GenericIndexed.checkIndex(GenericIndexed.java:231)

at io.druid.segment.data.GenericIndexed.access$200(GenericIndexed.java:72)

at io.druid.segment.data.GenericIndexed$4.get(GenericIndexed.java:459)

at io.druid.segment.data.CompressedVSizeIntsIndexedSupplier$CompressedVSizeIndexedInts.loadBuffer(CompressedVSizeIntsIndexedSupplier.java:390)

at io.druid.segment.data.CompressedVSizeIntsIndexedSupplier$CompressedVSizeIndexedInts.get(CompressedVSizeIntsIndexedSupplier.java:351)

at io.druid.segment.CompressedVSizeIndexedSupplier$CompressedVSizeIndexed$1.get(CompressedVSizeIndexedSupplier.java:197)

I digging some code and long time debugging,

it turns out as soon as ds.getRow method called abnormal exception be thrown.

I can’t not understand why this happen…

Question 1. Can’t i get row runtime and compare value?

Question 2. Can i get some advise about extend SeqQuery that i working on?

Thanks.

Below is my full code

MultiValueReadTest.java

// Skip above lines

        try {
            final ObjectMapper om = kdyInjector.getInstance(Key.get(ObjectMapper.class, Json.class));
            final IndexIO indexIO = kdyInjector.getInstance(IndexIO.class);
            final QueryableIndex index = indexIO.loadIndex(new File("/tmp/druid"));

            final QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(index);
            final TestFilter f1 = new TestFilter();
            final Sequence<Cursor> cursors = adapter.makeCursors(
                    f1,
                    index.getDataInterval().withChronology(ISOChronology.getInstanceUTC()),
                    VirtualColumns.EMPTY,
                    Granularities.ALL,
                    false
            );

            final List<String> columnNames = Arrays.asList("arr");
            final Sequence<Cursor> sequence = Sequences.map(cursors, new Function<Cursor, Cursor>() {
                @Override
                public Cursor apply(Cursor cursor) {
                    System.out.println("Reading, cursor -> " + cursor);
                    return cursor;
                }
            });

            Cursor cursor = sequence.accumulate(
                    null,
                    new Accumulator<Cursor, Cursor>() {
                        @Override
                        public Cursor accumulate(Cursor NOT_USING, Cursor in) {
                            System.out.println(in);
                            return in;
                        }
                    }
            );

            final List<ObjectColumnSelector> selectors = Lists.newArrayList();
            for(final String columnName : columnNames) {
                selectors.add(makeSelector(columnName, index.getColumn(columnName), cursor));
            }

            int count = 0;
            while(!cursor.isDone()) {
                for(final ObjectColumnSelector selector : selectors) {
                    final Object tmp = selector.get();
                    if(tmp != null) {
                        System.out.println("--------------------");
                        System.out.println(tmp);
                        System.out.println(tmp.getClass());
                        System.out.println("--------------------");
                    }
                }
                cursor.advance();
                count++;
            }

            System.out.println("count -> " + count);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static ObjectColumnSelector makeSelector(
            final String columnName,
            final Column column,
            final ColumnSelectorFactory columnSelectorFactory
    )
    {
        final ObjectColumnSelector selector;

        if (column.getDictionaryEncoding() != null) {
            // Special case for dimensions -> always wrap multi-value in arrays
            final DimensionSelector dimensionSelector = columnSelectorFactory.makeDimensionSelector(
                    new DefaultDimensionSpec(columnName, columnName)
            );

            if (column.getDictionaryEncoding().hasMultipleValues()) {
                return new ObjectColumnSelector<List>()
                {
                    @Override
                    public Class<List> classOfObject()
                    {
                        return List.class;
                    }

                    @Override
                    public List<String> get()
                    {
                        final IndexedInts row = dimensionSelector.getRow();

                        if(row.size() > idx) {
                            final int got = row.get(idx);
                            if(got == dictVal) {
                                final List<String> retVal = Lists.newArrayList();
                                for (int i = 0; i < row.size(); i++) {
                                    retVal.add(dimensionSelector.lookupName(row.get(i)));
                                }

                                return retVal;
                            }
                        }

                        return null;
                    }
                };
            } else {
                return new ObjectColumnSelector<String>()
                {
                    @Override
                    public Class<String> classOfObject()
                    {
                        return String.class;
                    }

                    @Override
                    public String get()
                    {
                        final IndexedInts row = dimensionSelector.getRow();
                        return row.size() == 0 ? null : dimensionSelector.lookupName(row.get(0));
                    }
                };
            }
        } else {
            final ObjectColumnSelector maybeSelector = columnSelectorFactory.makeObjectColumnSelector(columnName);
            if (maybeSelector != null) {
                selector = maybeSelector;
            } else {
                // Selector failed to create (unrecognized column type?)
                selector = new ObjectColumnSelector()
                {
                    @Override
                    public Class classOfObject()
                    {
                        return Object.class;
                    }

                    @Override
                    public Object get()
                    {
                        return null;
                    }
                };
            }
        }

        return selector;
    }

TestFilter


public class TestFilter implements Filter {
    private ImmutableBitmap bitmap = null;
    private int dictVal = -1;
    private DimensionSelector ds = null;

    @Override
    public ImmutableBitmap getBitmapIndex(BitmapIndexSelector selector) {
        System.out.println("TestFilter.getBitmapIndex, " + selector);
        return null;
    }

    @Override
    public double estimateSelectivity(BitmapIndexSelector indexSelector) {
        System.out.println("TestFilter.estimateSelectivity, " + indexSelector);
        return 0;
    }

    @Override
    public ValueMatcher makeMatcher(ColumnSelectorFactory factory) {
        System.out.println("TestFilter.makeMatcher, " + factory);

        return new ValueMatcher() {
            @Override
            public boolean matches() {
                // Caching
                if(ds == null) {
                    ds = factory.makeDimensionSelector(DefaultDimensionSpec.of("arr"));
                    final IdLookup idLookup = ds.idLookup();
                    dictVal = idLookup.lookupId("xxx");
                }

                final IndexedInts row = ds.getRow();
                System.out.println(row.size());
                if(row.size() > 2) {
                    return row.get(2) == dictVal;
                }

                return false;
            }
        };
    }

    @Override
    public boolean supportsBitmapIndex(BitmapIndexSelector selector) {
        // It's post filter
        System.out.println("TestFilter.supportsBitmapIndex, " + selector);
        return false;
    }

    @Override
    public boolean supportsSelectivityEstimation(ColumnSelector columnSelector, BitmapIndexSelector indexSelector) {
        System.out.println("TestFilter.supportsSelectivityEstimation, " + columnSelector + ", " + indexSelector);
        return false;
    }
}