From cf5f765d5c436bf120374b9a1917f61cc960e227 Mon Sep 17 00:00:00 2001
From: dingyufei
Date: Mon, 2 Dec 2024 12:28:35 +0800
Subject: [PATCH] support datetimev2 convert to timestamp (#200)

---
 .../doris/spark/cfg/ConfigurationOptions.java |   3 +
 .../doris/spark/serialization/RowBatch.java   |  31 +++++-
 .../spark/rdd/ScalaADBCValueReader.scala      |   2 +-
 .../doris/spark/rdd/ScalaValueReader.scala    |   4 +-
 .../apache/doris/spark/sql/SchemaUtils.scala  |  14 +--
 .../spark/serialization/TestRowBatch.java     | 101 +++++++++++++++++-
 .../doris/spark/sql/TestSchemaUtils.scala     |   6 +-
 7 files changed, 142 insertions(+), 19 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 3d601421..12432d3c 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -34,6 +34,9 @@ public interface ConfigurationOptions {
     String TABLE_IDENTIFIER = "table.identifier";
     String DORIS_TABLE_IDENTIFIER = "doris.table.identifier";
     String DORIS_READ_FIELD = "doris.read.field";
+    // To convert the datetimev2 to timestamp
+    String DORIS_READ_DATETIMEV2_AS_TIMESTAMP_ENABLED = "doris.read.datetimev2.as.timestamp.enabled";
+    boolean DORIS_READ_DATETIMEV2_AS_TIMESTAMP_ENABLED_DEFAULT = false;
     String DORIS_FILTER_QUERY = "doris.filter.query";
     String DORIS_FILTER_QUERY_IN_MAX_COUNT = "doris.filter.query.in.max.count";
     int DORIS_FILTER_QUERY_IN_VALUE_UPPER_LIMIT = 10000;
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
index ed61b594..dc5d5306 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
@@ -45,6 +45,8 @@
 import org.apache.arrow.vector.types.Types;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.doris.sdk.thrift.TScanBatchResult;
+import org.apache.doris.spark.cfg.ConfigurationOptions;
+import org.apache.doris.spark.cfg.Settings;
 import org.apache.doris.spark.exception.DorisException;
 import org.apache.doris.spark.rest.models.Schema;
 import org.apache.doris.spark.util.IPUtils;
@@ -59,6 +61,7 @@
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
 import java.sql.Date;
+import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -83,6 +86,7 @@ public class RowBatch {
     private final ArrowReader arrowReader;
     private final Schema schema;
     private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault();
+    private boolean datetimev2AsTimestampEnabled = false;
 
     private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder()
             .appendPattern("yyyy-MM-dd HH:mm:ss")
@@ -104,7 +108,13 @@ public class RowBatch {
     private List<FieldVector> fieldVectors;
 
     public RowBatch(TScanBatchResult nextResult, Schema schema) throws DorisException {
+        this(nextResult, schema, null);
+    }
+
+    public RowBatch(TScanBatchResult nextResult, Schema schema, Settings settings) throws DorisException {
+        if (settings != null) {
+            this.datetimev2AsTimestampEnabled = settings.getBooleanProperty(ConfigurationOptions.DORIS_READ_DATETIMEV2_AS_TIMESTAMP_ENABLED);
+        }
         this.rootAllocator = new RootAllocator(Integer.MAX_VALUE);
         this.arrowReader = new ArrowStreamReader(new ByteArrayInputStream(nextResult.getRows()), rootAllocator);
         this.schema = schema;
@@ -123,8 +133,10 @@ public RowBatch(TScanBatchResult nextResult, Schema schema) throws DorisExceptio
         }
     }
 
-    public RowBatch(ArrowReader reader, Schema schema) throws DorisException {
-
+    public RowBatch(ArrowReader reader, Schema schema, Settings settings) throws DorisException {
+        if (settings != null) {
+            this.datetimev2AsTimestampEnabled = settings.getBooleanProperty(ConfigurationOptions.DORIS_READ_DATETIMEV2_AS_TIMESTAMP_ENABLED);
+        }
         this.arrowReader = reader;
         this.schema = schema;
@@ -391,7 +403,11 @@ public void convertArrowToRowBatch() throws DorisException {
                                 continue;
                             }
                             String value = new String(varCharVector.get(rowIndex), StandardCharsets.UTF_8);
-                            addValueToRow(rowIndex, value);
+                            if (datetimev2AsTimestampEnabled) {
+                                addValueToRow(rowIndex, Timestamp.valueOf(value));
+                            } else {
+                                addValueToRow(rowIndex, value);
+                            }
                         }
                     } else if (curFieldVector instanceof TimeStampVector) {
                         TimeStampVector timeStampVector = (TimeStampVector) curFieldVector;
@@ -401,8 +417,13 @@ public void convertArrowToRowBatch() throws DorisException {
                                 continue;
                             }
                             LocalDateTime dateTime = getDateTime(rowIndex, timeStampVector);
-                            String formatted = DATE_TIME_FORMATTER.format(dateTime);
-                            addValueToRow(rowIndex, formatted);
+                            String formatted = dateTimeV2Formatter.format(dateTime);
+                            if (datetimev2AsTimestampEnabled) {
+                                addValueToRow(rowIndex, Timestamp.valueOf(formatted));
+                            } else {
+                                addValueToRow(rowIndex, formatted);
+                            }
+
                         }
                     } else {
                         String errMsg = String.format("Unsupported type for DATETIMEV2, minorType %s, class is %s",
                                 mt.name(), curFieldVector.getClass());
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaADBCValueReader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaADBCValueReader.scala
index 3cd3da0f..7eaf08db 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaADBCValueReader.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaADBCValueReader.scala
@@ -88,7 +88,7 @@ class ScalaADBCValueReader(partition: PartitionDefinition, settings: Settings) e
     if (!eos.get && (rowBatch == null || !rowBatch.hasNext)) {
       eos.set(!arrowReader.loadNextBatch())
       if (!eos.get) {
-        rowBatch = new RowBatch(arrowReader, schema)
+        rowBatch = new RowBatch(arrowReader, schema, settings)
       }
     }
     !eos.get
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
index 8f518bde..962417af 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
@@ -147,7 +147,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) exten
           val nextResult = lockClient(_.getNext(nextBatchParams))
           eos.set(nextResult.isEos)
           if (!eos.get) {
-            val rowBatch = new RowBatch(nextResult, schema)
+            val rowBatch = new RowBatch(nextResult, schema, settings)
             offset += rowBatch.getReadRowCount
             rowBatch.close()
             rowBatchBlockingQueue.put(rowBatch)
@@ -205,7 +205,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) exten
         val nextResult = lockClient(_.getNext(nextBatchParams))
         eos.set(nextResult.isEos)
         if (!eos.get) {
-          rowBatch = new RowBatch(nextResult, schema)
+          rowBatch = new RowBatch(nextResult, schema, settings)
         }
       }
       hasNext = !eos.get
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index e21c6f25..2d47e4d4 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -21,7 +21,7 @@ import com.fasterxml.jackson.databind.json.JsonMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.commons.lang3.StringUtils
 import org.apache.doris.sdk.thrift.{TPrimitiveType, TScanColumnDesc}
-import org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD
+import org.apache.doris.spark.cfg.ConfigurationOptions._
 import org.apache.doris.spark.cfg.Settings
 import org.apache.doris.spark.exception.DorisException
 import org.apache.doris.spark.rest.RestService
@@ -55,7 +55,9 @@ private[spark] object SchemaUtils {
     val hllColumns = schema.getProperties.filter(_.getType.equalsIgnoreCase("HLL")).map(_.getName).mkString(",")
     cfg.setProperty(DORIS_HLL_COLUMNS, hllColumns)
     val dorisReadField = cfg.getProperty(DORIS_READ_FIELD)
-    convertToStruct(schema, dorisReadField)
+    val datetimev2AsTimestampEnabled = cfg.getBooleanProperty(DORIS_READ_DATETIMEV2_AS_TIMESTAMP_ENABLED,
+      DORIS_READ_DATETIMEV2_AS_TIMESTAMP_ENABLED_DEFAULT)
+    convertToStruct(schema, dorisReadField, datetimev2AsTimestampEnabled)
   }
 
   /**
@@ -74,7 +76,7 @@ private[spark] object SchemaUtils {
    * @param schema inner schema
    * @return Spark Catalyst StructType
    */
-  def convertToStruct(schema: Schema, dorisReadFields: String): StructType = {
+  def convertToStruct(schema: Schema, dorisReadFields: String, datetimev2AsTimestampEnabled: Boolean): StructType = {
     val fieldList = if (dorisReadFields != null && dorisReadFields.nonEmpty) {
       dorisReadFields.split(",")
     } else {
@@ -85,7 +87,7 @@ private[spark] object SchemaUtils {
       .map(f =>
         DataTypes.createStructField(
           f.getName,
-          getCatalystType(f.getType, f.getPrecision, f.getScale),
+          getCatalystType(f.getType, f.getPrecision, f.getScale, datetimev2AsTimestampEnabled),
           true
         )
       )
@@ -100,7 +102,7 @@ private[spark] object SchemaUtils {
    * @param scale decimal scale
    * @return Spark Catalyst type
   */
-  def getCatalystType(dorisType: String, precision: Int, scale: Int): DataType = {
+  def getCatalystType(dorisType: String, precision: Int, scale: Int, datetimev2AsTimestampEnabled: Boolean = false): DataType = {
     dorisType match {
       case "NULL_TYPE" => DataTypes.NullType
       case "BOOLEAN" => DataTypes.BooleanType
@@ -113,7 +115,7 @@ private[spark] object SchemaUtils {
       case "DATE" => DataTypes.DateType
       case "DATEV2" => DataTypes.DateType
       case "DATETIME" => DataTypes.StringType
-      case "DATETIMEV2" => DataTypes.StringType
+      case "DATETIMEV2" => if (datetimev2AsTimestampEnabled) DataTypes.TimestampType else DataTypes.StringType
       case "BINARY" => DataTypes.BinaryType
       case "DECIMAL" => DecimalType(precision, scale)
       case "CHAR" => DataTypes.StringType
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
index 20387e45..cc05cc55 100644
--- a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
@@ -58,6 +58,8 @@
 import org.apache.doris.sdk.thrift.TScanBatchResult;
 import org.apache.doris.sdk.thrift.TStatus;
 import org.apache.doris.sdk.thrift.TStatusCode;
+import org.apache.doris.spark.cfg.ConfigurationOptions;
+import org.apache.doris.spark.cfg.PropertiesSettings;
 import org.apache.doris.spark.exception.DorisException;
 import org.apache.doris.spark.rest.RestService;
 import org.apache.doris.spark.rest.models.Schema;
@@ -76,13 +78,11 @@
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
 import java.sql.Date;
+import java.sql.Timestamp;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.NoSuchElementException;
+import java.util.*;
 
 import static org.hamcrest.core.StringStartsWith.startsWith;
 import static org.junit.Assert.assertEquals;
@@ -1367,5 +1367,98 @@ public void timestampTypeNotMatch() throws IOException, DorisException {
         thrown.expectMessage(startsWith("Unsupported type for DATETIMEV2"));
         new RowBatch(scanBatchResult, schema);
     }
+
+    @Test
+    public void testDateTimeV2AsTimestamp() throws IOException, DorisException {
+
+        ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+        childrenBuilder.add(new Field("k1", FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND,
+                null)), null));
+        childrenBuilder.add(new Field("k2", FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND,
+                null)), null));
+
+        VectorSchemaRoot root = VectorSchemaRoot.create(
+                new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null),
+                new RootAllocator(Integer.MAX_VALUE));
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(
+                root,
+                new DictionaryProvider.MapDictionaryProvider(),
+                outputStream);
+
+        arrowStreamWriter.start();
+        root.setRowCount(3);
+
+        LocalDateTime localDateTime = LocalDateTime.of(2024, 11, 29,
+                0, 0, 0, 123456000);
+        long second = localDateTime.atZone(ZoneId.systemDefault()).toEpochSecond();
+        int nano = localDateTime.getNano();
+
+        FieldVector vector = root.getVector("k1");
+        TimeStampMicroVector datetimeV2Vector = (TimeStampMicroVector) vector;
+        datetimeV2Vector.setInitialCapacity(3);
+        datetimeV2Vector.allocateNew();
+        datetimeV2Vector.setIndexDefined(0);
+        datetimeV2Vector.setSafe(0, second);
+        datetimeV2Vector.setIndexDefined(1);
+        datetimeV2Vector.setSafe(1, second * 1000 + nano / 1000000);
+        datetimeV2Vector.setIndexDefined(2);
+        datetimeV2Vector.setSafe(2, second * 1000000 + nano / 1000);
+        vector.setValueCount(3);
+
+        vector = root.getVector("k2");
+        TimeStampMilliVector milliVector = (TimeStampMilliVector) vector;
+        milliVector.setIndexDefined(3);
+        milliVector.allocateNew();
+        milliVector.setIndexDefined(0);
+        milliVector.setSafe(0, 1732809600000L); // 2024-11-29 00:00:00
+        milliVector.setIndexDefined(1);
+        milliVector.setSafe(1, 1732809600123L);
+        milliVector.setIndexDefined(2);
+        milliVector.setSafe(2, 1732809600999L);
+        vector.setValueCount(3);
+
+        arrowStreamWriter.writeBatch();
+
+        arrowStreamWriter.end();
+        arrowStreamWriter.close();
+
+        TStatus status = new TStatus();
+        status.setStatusCode(TStatusCode.OK);
+        TScanBatchResult scanBatchResult = new TScanBatchResult();
+        scanBatchResult.setStatus(status);
+        scanBatchResult.setEos(false);
+        scanBatchResult.setRows(outputStream.toByteArray());
+
+        String schemaStr = "{\"properties\":[" +
+                "{\"type\":\"DATETIMEV2\",\"name\":\"k1\",\"comment\":\"\"}" +
+                ",{\"type\":\"DATETIMEV2\",\"name\":\"k2\",\"comment\":\"\"}" +
+                "], \"status\":200}";
+
+        Schema schema = RestService.parseSchema(schemaStr, logger);
+        Properties properties = new Properties();
+        properties.setProperty(ConfigurationOptions.DORIS_READ_DATETIMEV2_AS_TIMESTAMP_ENABLED, "true");
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema, new PropertiesSettings(properties));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow0 = rowBatch.next();
+        Assert.assertEquals(Timestamp.valueOf("2024-11-29 00:00:00"), actualRow0.get(0));
+        Assert.assertEquals(Timestamp.valueOf("2024-11-29 00:00:00.0"), actualRow0.get(1));
+
+        List<Object> actualRow1 = rowBatch.next();
+        Assert.assertEquals(Timestamp.valueOf("2024-11-29 00:00:00.123"), actualRow1.get(0));
+        Assert.assertEquals(Timestamp.valueOf("2024-11-29 00:00:00.123"), actualRow1.get(1));
+
+        List<Object> actualRow2 = rowBatch.next();
+        Assert.assertEquals(Timestamp.valueOf("2024-11-29 00:00:00.123456"), actualRow2.get(0));
+        Assert.assertEquals(Timestamp.valueOf("2024-11-29 00:00:00.999"), actualRow2.get(1));
+
+        Assert.assertFalse(rowBatch.hasNext());
+        thrown.expect(NoSuchElementException.class);
+        thrown.expectMessage(startsWith("Get row offset:"));
+        rowBatch.next();
+
+    }
 }
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
index 5da7534b..e608d8a8 100644
--- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
@@ -37,14 +37,17 @@ class TestSchemaUtils extends ExpectedExceptionTest {
     schema.setStatus(200)
     val k1 = new Field("k1", "TINYINT", "", 0, 0, "")
     val k5 = new Field("k5", "BIGINT", "", 0, 0, "")
+    val k7 = new Field("k7", "DATETIMEV2", "", 0, 0, "")
     schema.put(k1)
     schema.put(k5)
+    schema.put(k7)
 
     var fields = List[StructField]()
     fields :+= DataTypes.createStructField("k1", DataTypes.ByteType, true)
     fields :+= DataTypes.createStructField("k5", DataTypes.LongType, true)
+    fields :+= DataTypes.createStructField("k7", DataTypes.TimestampType, true)
     val expected = DataTypes.createStructType(fields.asJava)
-    Assert.assertEquals(expected, SchemaUtils.convertToStruct(schema, "k1,k5"))
+    Assert.assertEquals(expected, SchemaUtils.convertToStruct(schema, "k1,k5,k6,k7", true))
   }
 
   @Test
@@ -60,6 +63,7 @@ class TestSchemaUtils extends ExpectedExceptionTest {
     Assert.assertEquals(DataTypes.DateType, SchemaUtils.getCatalystType("DATE", 0, 0))
     Assert.assertEquals(DataTypes.StringType, SchemaUtils.getCatalystType("DATETIME", 0, 0))
     Assert.assertEquals(DataTypes.StringType, SchemaUtils.getCatalystType("DATETIMEV2", 0, 0))
+    Assert.assertEquals(DataTypes.TimestampType, SchemaUtils.getCatalystType("DATETIMEV2", 0, 0, true))
    Assert.assertEquals(DataTypes.BinaryType, SchemaUtils.getCatalystType("BINARY", 0, 0))
     Assert.assertEquals(DecimalType(9, 3), SchemaUtils.getCatalystType("DECIMAL", 9, 3))
     Assert.assertEquals(DataTypes.StringType, SchemaUtils.getCatalystType("CHAR", 0, 0))