Skip to content

Commit

Permalink
Initial support for BytesRefBlock to support WKB for Geometries
Browse files Browse the repository at this point in the history
This commit does not yet remove support for PointBlock, since we are investigating how to kabe multiple block types backing the same geometries.
  • Loading branch information
craigtaverner committed Dec 22, 2023
1 parent 261bf04 commit eb47224
Show file tree
Hide file tree
Showing 45 changed files with 562 additions and 254 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,15 @@ protected Object parseSourceValue(Object value) {
public BlockLoader blockLoader(BlockLoaderContext blContext) {
// TODO: If we have doc-values we have to use them, due to BlockSourceReader.columnAtATimeReader() returning null
if (blContext.forStats() && hasDocValues()) {
// LongsBlock only works for points, we need something else for shapes
return new BlockDocValuesReader.LongsBlockLoader(name());
}
// TODO: Enhance BlockLoaderContext with knowledge about preferring to load from source (see EsPhysicalOperationProviders)
return new BlockSourceReader.PointsBlockLoader(
// TODO: See if there is value in using the optimized PointsBlock which uses less memory and does fewer conversions
// return new BlockSourceReader.PointsBlockLoader(
// valueFetcher(blContext.sourcePaths(name()), nullValue, GeometryFormatterFactory.WKT)
// );
// GeometriesBlockLoader currently only supports points, and needs more work for shapes
return new BlockSourceReader.GeometriesBlockLoader(
valueFetcher(blContext.sourcePaths(name()), nullValue, GeometryFormatterFactory.WKT)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,11 @@ interface BlockFactory {
*/
PointBuilder points(int expectedCount);

/**
* Build a builder to load {@link SpatialPoint}s backed by WKB in BytesRefBlock.
*/
BytesRefBuilder geometries(int expectedCount);

/**
* Build a builder to load doubles as loaded from doc values.
* Doc values load doubles deduplicated and in sorted order.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
import org.elasticsearch.geometry.Geometry;
import org.elasticsearch.geometry.Point;
import org.elasticsearch.geometry.utils.GeometryValidator;
import org.elasticsearch.geometry.utils.WellKnownBinary;
import org.elasticsearch.geometry.utils.WellKnownText;
import org.elasticsearch.search.fetch.StoredFieldsSpec;

import java.io.IOException;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -154,6 +156,24 @@ public RowStrideReader rowStrideReader(LeafReaderContext context) {
}
}

public static class GeometriesBlockLoader extends SourceBlockLoader {
private final ValueFetcher fetcher;

public GeometriesBlockLoader(ValueFetcher fetcher) {
this.fetcher = fetcher;
}

@Override
public Builder builder(BlockFactory factory, int expectedCount) {
return factory.geometries(expectedCount);
}

@Override
public RowStrideReader rowStrideReader(LeafReaderContext context) {
return new Geometries(fetcher);
}
}

private static class BytesRefs extends BlockSourceReader {
BytesRef scratch = new BytesRef();

Expand Down Expand Up @@ -206,6 +226,41 @@ public String toString() {
}
}

private static class Geometries extends BlockSourceReader {

Geometries(ValueFetcher fetcher) {
super(fetcher);
}

@Override
protected void append(BlockLoader.Builder builder, Object v) {
if (v instanceof SpatialPoint point) {
BytesRef wkb = new BytesRef(WellKnownBinary.toWKB(new Point(point.getX(), point.getY()), ByteOrder.LITTLE_ENDIAN));
((BlockLoader.BytesRefBuilder) builder).appendBytesRef(wkb);
} else if (v instanceof String wkt) {
try {
// TODO: figure out why this is not already happening in the GeoPointFieldMapper
Geometry geometry = WellKnownText.fromWKT(GeometryValidator.NOOP, false, wkt);
if (geometry instanceof Point point) {
BytesRef wkb = new BytesRef(WellKnownBinary.toWKB(point, ByteOrder.LITTLE_ENDIAN));
((BlockLoader.BytesRefBuilder) builder).appendBytesRef(wkb);
} else {
throw new IllegalArgumentException("Cannot convert geometry into point:: " + geometry.type());
}
} catch (Exception e) {
throw new IllegalArgumentException("Failed to parse point geometry: " + e.getMessage(), e);
}
} else {
throw new IllegalArgumentException("Unsupported source type for point: " + v.getClass().getSimpleName());
}
}

@Override
public String toString() {
return "BlockSourceReader.Geometries";
}
}

public static class DoublesBlockLoader extends SourceBlockLoader {
private final ValueFetcher fetcher;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ public PointsBuilder appendPoint(SpatialPoint value) {
return new PointsBuilder();
}

@Override
public BlockLoader.BytesRefBuilder geometries(int expectedCount) {
class GeometriesBuilder extends TestBlock.Builder implements BlockLoader.BytesRefBuilder {
@Override
public GeometriesBuilder appendBytesRef(BytesRef value) {
add(BytesRef.deepCopyOf(value));
return this;
}
}
return new GeometriesBuilder();
}

@Override
public BlockLoader.DoubleBuilder doublesFromDocValues(int expectedCount) {
return doubles(expectedCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ public class Types {
static final ClassName SOURCE = ClassName.get("org.elasticsearch.xpack.ql.tree", "Source");

static final ClassName BYTES_REF = ClassName.get("org.apache.lucene.util", "BytesRef");

static final ClassName POINT = ClassName.get("org.elasticsearch.common.geo", "SpatialPoint");
static final ClassName GEOMETRY = ClassName.get("org.elasticsearch.geometry", "Geometry");

static final ClassName RELEASABLE = ClassName.get("org.elasticsearch.core", "Releasable");
static final ClassName RELEASABLES = ClassName.get("org.elasticsearch.core", "Releasables");
Expand All @@ -153,6 +155,11 @@ static ClassName blockType(TypeName elementType) {
return DOUBLE_BLOCK;
}
if (elementType.equals(POINT)) {
// TODO: Decide if we want to support backing POINT with LONG, POINT or BYTES_REF
// return POINT_BLOCK;
return BYTES_REF_BLOCK;
}
if (elementType.equals(GEOMETRY)) {
return POINT_BLOCK;
}
throw new IllegalArgumentException("unknown block type for [" + elementType + "]");
Expand All @@ -175,7 +182,12 @@ static ClassName blockType(String elementType) {
return DOUBLE_BLOCK;
}
if (elementType.equalsIgnoreCase("POINT")) {
return POINT_BLOCK;
// TODO: Decide if we want to support backing POINT with LONG, POINT or BYTES_REF
// return POINT_BLOCK;
return BYTES_REF_BLOCK;
}
if (elementType.equalsIgnoreCase("GEOMETRY")) {
return BYTES_REF_BLOCK;
}
throw new IllegalArgumentException("unknown vector type for [" + elementType + "]");
}
Expand All @@ -197,7 +209,12 @@ static ClassName vectorType(TypeName elementType) {
return DOUBLE_VECTOR;
}
if (elementType.equals(POINT)) {
return POINT_VECTOR;
// TODO: Decide if we want to support backing POINT with LONG, POINT or BYTES_REF
// return POINT_VECTOR;
return BYTES_REF_VECTOR;
}
if (elementType.equals(GEOMETRY)) {
return BYTES_REF_VECTOR;
}
throw new IllegalArgumentException("unknown vector type for [" + elementType + "]");
}
Expand All @@ -219,7 +236,12 @@ static ClassName vectorType(String elementType) {
return DOUBLE_VECTOR;
}
if (elementType.equalsIgnoreCase("POINT")) {
return POINT_VECTOR;
// TODO: Decide if we want to support backing POINT with LONG, POINT or BYTES_REF
// return POINT_VECTOR;
return BYTES_REF_VECTOR;
}
if (elementType.equalsIgnoreCase("GEOMETRY")) {
return BYTES_REF_VECTOR;
}
throw new IllegalArgumentException("unknown vector type for [" + elementType + "]");
}
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/esql/compute/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
requires org.apache.logging.log4j;
requires org.elasticsearch.logging;
requires org.elasticsearch.tdigest;
requires org.elasticsearch.geo;

exports org.elasticsearch.compute;
exports org.elasticsearch.compute.aggregation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
import org.elasticsearch.common.geo.SpatialPoint;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.geometry.Geometry;
import org.elasticsearch.geometry.Point;
import org.elasticsearch.geometry.utils.WellKnownBinary;

import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
Expand Down Expand Up @@ -208,14 +212,25 @@ public static void appendValue(Block.Builder builder, Object val, ElementType ty
switch (type) {
case LONG -> ((LongBlock.Builder) builder).appendLong((Long) val);
case INT -> ((IntBlock.Builder) builder).appendInt((Integer) val);
case BYTES_REF -> ((BytesRefBlock.Builder) builder).appendBytesRef(toBytesRef(val));
case BYTES_REF -> ((BytesRefBlock.Builder) builder).appendBytesRef(spatialToBytesRef(val));
case DOUBLE -> ((DoubleBlock.Builder) builder).appendDouble((Double) val);
case BOOLEAN -> ((BooleanBlock.Builder) builder).appendBoolean((Boolean) val);
case POINT -> ((PointBlock.Builder) builder).appendPoint((SpatialPoint) val);
default -> throw new UnsupportedOperationException("unsupported element type [" + type + "]");
}
}

private static BytesRef spatialToBytesRef(Object val) {
// TODO: if the cluster is running an older version that does not support this, should we throw and error here
if (val instanceof SpatialPoint point) {
return new BytesRef(WellKnownBinary.toWKB(new Point(point.getX(), point.getY()), ByteOrder.LITTLE_ENDIAN));
}
if (val instanceof Geometry geometry) {
return new BytesRef(WellKnownBinary.toWKB(geometry, ByteOrder.LITTLE_ENDIAN));
}
return toBytesRef(val);
}

public static Block constantBlock(BlockFactory blockFactory, Object val, int size) {
if (val == null) {
return Block.constantNullBlock(size);
Expand All @@ -229,7 +244,7 @@ private static Block constantBlock(BlockFactory blockFactory, ElementType type,
case NULL -> Block.constantNullBlock(size);
case LONG -> LongBlock.newConstantBlockWith((long) val, size, blockFactory);
case INT -> IntBlock.newConstantBlockWith((int) val, size, blockFactory);
case BYTES_REF -> BytesRefBlock.newConstantBlockWith(toBytesRef(val), size, blockFactory);
case BYTES_REF -> BytesRefBlock.newConstantBlockWith(spatialToBytesRef(val), size, blockFactory);
case DOUBLE -> DoubleBlock.newConstantBlockWith((double) val, size, blockFactory);
case BOOLEAN -> BooleanBlock.newConstantBlockWith((boolean) val, size, blockFactory);
case POINT -> PointBlock.newConstantBlockWith((SpatialPoint) val, size, blockFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.geo.SpatialPoint;
import org.elasticsearch.geometry.Geometry;

/**
* The type of elements in {@link Block} and {@link Vector}
Expand Down Expand Up @@ -77,7 +78,11 @@ public static ElementType fromJava(Class<?> type) {
} else if (type == String.class || type == BytesRef.class) {
elementType = BYTES_REF;
} else if (SpatialPoint.class.isAssignableFrom(type)) {
elementType = POINT;
// TODO: Consider cases where we want to back this with POINT (for efficiency) or LONG (for doc-values)
// elementType = LONG;
elementType = BYTES_REF;
} else if (Geometry.class.isAssignableFrom(type)) {
elementType = BYTES_REF;
} else if (type == Boolean.class) {
elementType = BOOLEAN;
} else if (type == null || type == Void.class) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,11 @@ public BlockLoader.PointBuilder points(int expectedCount) {
return factory.newPointBlockBuilder(expectedCount);
}

@Override
public BlockLoader.BytesRefBuilder geometries(int expectedCount) {
return factory.newBytesRefBlockBuilder(expectedCount);
}

@Override
public BlockLoader.DoubleBuilder doublesFromDocValues(int expectedCount) {
return factory.newDoubleBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ public interface TopNEncoder {
* An encoder for semver versions.
*/
VersionTopNEncoder VERSION = new VersionTopNEncoder();
/**
* An encoder for UTF-8 text.
*/
WKBTopNEncoder WKB = new WKBTopNEncoder();

/**
* Placeholder encoder for unsupported data types.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator.topn;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;
import org.elasticsearch.geometry.Geometry;
import org.elasticsearch.geometry.utils.GeometryValidator;
import org.elasticsearch.geometry.utils.WellKnownBinary;
import org.elasticsearch.geometry.utils.WellKnownText;

import java.nio.ByteOrder;

/**
* Encodes WKB bytes[].
* Since WKB can contain bytes with zero value, which are used as terminator bytes, we need to encode differently.
* Our initial implementation is to re-write to WKT and encode with UTF8TopNEncoder.
* This is likely very inefficient.
* We cannot use the UTF8TopNEncoder as is, because it removes the continuation byte, which could be a valid value in WKB.
*/
final class WKBTopNEncoder extends SortableTopNEncoder {
@Override
public int encodeBytesRef(BytesRef value, BreakingBytesRefBuilder bytesRefBuilder) {
Geometry geometry = WellKnownBinary.fromWKB(GeometryValidator.NOOP, false, value.bytes, value.offset, value.length);
String wkt = WellKnownText.toWKT(geometry);
return UTF8.encodeBytesRef(BytesRefs.toBytesRef(wkt), bytesRefBuilder);
}

@Override
public BytesRef decodeBytesRef(BytesRef bytes, BytesRef scratch) {
String wkt = BytesRefs.toString(UTF8.decodeBytesRef(bytes, scratch));
try {
Geometry geometry = WellKnownText.fromWKT(GeometryValidator.NOOP, false, wkt);
byte[] wkb = WellKnownBinary.toWKB(geometry, ByteOrder.LITTLE_ENDIAN);
scratch.bytes = wkb;
scratch.offset = 0;
scratch.length = wkb.length;
return scratch;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public String toString() {
return "WKBTopNEncoder";
}

@Override
public TopNEncoder toSortable() {
return this;
}

@Override
public TopNEncoder toUnsortable() {
return this;
}
}
Loading

0 comments on commit eb47224

Please sign in to comment.