Support datetimev2 conversion to timestamp (apache#200)
dingyufei committed Dec 2, 2024
1 parent a9d5e25 commit cf5f765
Showing 7 changed files with 142 additions and 19 deletions.
@@ -34,6 +34,9 @@ public interface ConfigurationOptions {
String TABLE_IDENTIFIER = "table.identifier";
String DORIS_TABLE_IDENTIFIER = "doris.table.identifier";
String DORIS_READ_FIELD = "doris.read.field";
// Whether to convert datetimev2 values to timestamp when reading
String DORIS_READ_DATETIMEV2_AS_TIMESTAMP_ENABLED = "doris.read.datetimev2.as.timestamp.enabled";
boolean DORIS_READ_DATETIMEV2_AS_TIMESTAMP_ENABLED_DEFAULT = false;
String DORIS_FILTER_QUERY = "doris.filter.query";
String DORIS_FILTER_QUERY_IN_MAX_COUNT = "doris.filter.query.in.max.count";
int DORIS_FILTER_QUERY_IN_VALUE_UPPER_LIMIT = 10000;
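
For context, a minimal usage sketch of the new flag from the Spark side (not part of this commit; the FE endpoint, credentials, and table identifier are placeholder assumptions, and `spark` is an active SparkSession):

val df = spark.read
  .format("doris")
  .option("doris.fenodes", "127.0.0.1:8030")          // placeholder FE endpoint
  .option("doris.table.identifier", "example_db.tbl") // placeholder table
  .option("user", "root")                             // placeholder credentials
  .option("password", "")
  .option("doris.read.datetimev2.as.timestamp.enabled", "true")
  .load()
// With the flag on, DATETIMEV2 columns are read as Spark TimestampType instead of StringType.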
@@ -45,6 +45,8 @@
import org.apache.arrow.vector.types.Types;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.doris.sdk.thrift.TScanBatchResult;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.Settings;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.rest.models.Schema;
import org.apache.doris.spark.util.IPUtils;
@@ -59,6 +61,7 @@
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -83,6 +86,7 @@ public class RowBatch {
private final ArrowReader arrowReader;
private final Schema schema;
private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault();
private boolean datetimev2AsTimestampEnabled = false;

private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder()
.appendPattern("yyyy-MM-dd HH:mm:ss")
@@ -104,7 +108,13 @@ public class RowBatch {
private List<FieldVector> fieldVectors;

public RowBatch(TScanBatchResult nextResult, Schema schema) throws DorisException {
this(nextResult, schema, null);
}

public RowBatch(TScanBatchResult nextResult, Schema schema, Settings settings) throws DorisException {
if (settings != null) {
this.datetimev2AsTimestampEnabled = settings.getBooleanProperty(ConfigurationOptions.DORIS_READ_DATETIMEV2_AS_TIMESTAMP_ENABLED);
}
this.rootAllocator = new RootAllocator(Integer.MAX_VALUE);
this.arrowReader = new ArrowStreamReader(new ByteArrayInputStream(nextResult.getRows()), rootAllocator);
this.schema = schema;
@@ -123,8 +133,10 @@ public RowBatch(TScanBatchResult nextResult, Schema schema) throws DorisException

}

public RowBatch(ArrowReader reader, Schema schema) throws DorisException {

public RowBatch(ArrowReader reader, Schema schema, Settings settings) throws DorisException {
if (settings != null) {
this.datetimev2AsTimestampEnabled = settings.getBooleanProperty(ConfigurationOptions.DORIS_READ_DATETIMEV2_AS_TIMESTAMP_ENABLED);
}
this.arrowReader = reader;
this.schema = schema;
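
Not part of the commit, but a minimal sketch of driving the new constructor directly, mirroring what the new test below does (`nextResult` and `schema` are assumed to already be in scope; passing a null Settings keeps the flag at its default of false):

val props = new java.util.Properties()
props.setProperty(ConfigurationOptions.DORIS_READ_DATETIMEV2_AS_TIMESTAMP_ENABLED, "true")
val batch = new RowBatch(nextResult, schema, new PropertiesSettings(props))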

@@ -391,7 +403,11 @@ public void convertArrowToRowBatch() throws DorisException {
continue;
}
String value = new String(varCharVector.get(rowIndex), StandardCharsets.UTF_8);
addValueToRow(rowIndex, value);
if (datetimev2AsTimestampEnabled) {
addValueToRow(rowIndex, Timestamp.valueOf(value));
} else {
addValueToRow(rowIndex, value);
}
}
} else if (curFieldVector instanceof TimeStampVector) {
TimeStampVector timeStampVector = (TimeStampVector) curFieldVector;
@@ -401,8 +417,13 @@ public void convertArrowToRowBatch() throws DorisException {
continue;
}
LocalDateTime dateTime = getDateTime(rowIndex, timeStampVector);
String formatted = DATE_TIME_FORMATTER.format(dateTime);
addValueToRow(rowIndex, formatted);
String formatted = dateTimeV2Formatter.format(dateTime);
if (datetimev2AsTimestampEnabled) {
addValueToRow(rowIndex, Timestamp.valueOf(formatted));
} else {
addValueToRow(rowIndex, formatted);
}

}
} else {
String errMsg = String.format("Unsupported type for DATETIMEV2, minorType %s, class is %s", mt.name(), curFieldVector.getClass());
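
As a reference for the conversion branch above: java.sql.Timestamp.valueOf parses the JDBC escape format yyyy-[m]m-[d]d hh:mm:ss[.f...], which is exactly the shape the formatter emits, so fractional seconds survive the round trip. A small illustrative check:

// Illustrative only: formatter-style output feeds Timestamp.valueOf unchanged.
val ts = java.sql.Timestamp.valueOf("2024-11-29 00:00:00.123456")
assert(ts.getNanos == 123456000) // fraction preserved as nanoseconds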
@@ -88,7 +88,7 @@ class ScalaADBCValueReader(partition: PartitionDefinition, settings: Settings) e
if (!eos.get && (rowBatch == null || !rowBatch.hasNext)) {
eos.set(!arrowReader.loadNextBatch())
if (!eos.get) {
rowBatch = new RowBatch(arrowReader, schema)
rowBatch = new RowBatch(arrowReader, schema, settings)
}
}
!eos.get
@@ -147,7 +147,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) exten
val nextResult = lockClient(_.getNext(nextBatchParams))
eos.set(nextResult.isEos)
if (!eos.get) {
val rowBatch = new RowBatch(nextResult, schema)
val rowBatch = new RowBatch(nextResult, schema, settings)
offset += rowBatch.getReadRowCount
rowBatch.close()
rowBatchBlockingQueue.put(rowBatch)
@@ -205,7 +205,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) exten
val nextResult = lockClient(_.getNext(nextBatchParams))
eos.set(nextResult.isEos)
if (!eos.get) {
rowBatch = new RowBatch(nextResult, schema)
rowBatch = new RowBatch(nextResult, schema, settings)
}
}
hasNext = !eos.get
@@ -21,7 +21,7 @@ import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.commons.lang3.StringUtils
import org.apache.doris.sdk.thrift.{TPrimitiveType, TScanColumnDesc}
import org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD
import org.apache.doris.spark.cfg.ConfigurationOptions._
import org.apache.doris.spark.cfg.Settings
import org.apache.doris.spark.exception.DorisException
import org.apache.doris.spark.rest.RestService
@@ -55,7 +55,9 @@ private[spark] object SchemaUtils {
val hllColumns = schema.getProperties.filter(_.getType.equalsIgnoreCase("HLL")).map(_.getName).mkString(",")
cfg.setProperty(DORIS_HLL_COLUMNS, hllColumns)
val dorisReadField = cfg.getProperty(DORIS_READ_FIELD)
convertToStruct(schema, dorisReadField)
val datetimev2AsTimestampEnabled = cfg.getBooleanProperty(DORIS_READ_DATETIMEV2_AS_TIMESTAMP_ENABLED,
DORIS_READ_DATETIMEV2_AS_TIMESTAMP_ENABLED_DEFAULT)
convertToStruct(schema, dorisReadField, datetimev2AsTimestampEnabled)
}

/**
@@ -74,7 +76,7 @@
* @param schema inner schema
* @return Spark Catalyst StructType
*/
def convertToStruct(schema: Schema, dorisReadFields: String): StructType = {
def convertToStruct(schema: Schema, dorisReadFields: String, datetimev2AsTimestampEnabled: Boolean): StructType = {
val fieldList = if (dorisReadFields != null && dorisReadFields.nonEmpty) {
dorisReadFields.split(",")
} else {
@@ -85,7 +87,7 @@
.map(f =>
DataTypes.createStructField(
f.getName,
getCatalystType(f.getType, f.getPrecision, f.getScale),
getCatalystType(f.getType, f.getPrecision, f.getScale, datetimev2AsTimestampEnabled),
true
)
)
@@ -100,7 +102,7 @@
* @param scale decimal scale
* @return Spark Catalyst type
*/
def getCatalystType(dorisType: String, precision: Int, scale: Int): DataType = {
def getCatalystType(dorisType: String, precision: Int, scale: Int, datetimev2AsTimestampEnabled: Boolean = false): DataType = {
dorisType match {
case "NULL_TYPE" => DataTypes.NullType
case "BOOLEAN" => DataTypes.BooleanType
Expand All @@ -113,7 +115,7 @@ private[spark] object SchemaUtils {
case "DATE" => DataTypes.DateType
case "DATEV2" => DataTypes.DateType
case "DATETIME" => DataTypes.StringType
case "DATETIMEV2" => DataTypes.StringType
case "DATETIMEV2" => if (datetimev2AsTimestampEnabled) DataTypes.TimestampType else DataTypes.StringType
case "BINARY" => DataTypes.BinaryType
case "DECIMAL" => DecimalType(precision, scale)
case "CHAR" => DataTypes.StringType
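
A hedged sketch of what the new parameter changes (SchemaUtils is private[spark], so this only compiles inside the connector's package; the default path is untouched):

import org.apache.spark.sql.types.DataTypes
assert(SchemaUtils.getCatalystType("DATETIMEV2", 0, 0) == DataTypes.StringType) // default
assert(SchemaUtils.getCatalystType("DATETIMEV2", 0, 0, datetimev2AsTimestampEnabled = true) == DataTypes.TimestampType)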
@@ -58,6 +58,8 @@
import org.apache.doris.sdk.thrift.TScanBatchResult;
import org.apache.doris.sdk.thrift.TStatus;
import org.apache.doris.sdk.thrift.TStatusCode;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.PropertiesSettings;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.rest.RestService;
import org.apache.doris.spark.rest.models.Schema;
@@ -76,13 +78,11 @@
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.*;

import static org.hamcrest.core.StringStartsWith.startsWith;
import static org.junit.Assert.assertEquals;
@@ -1367,5 +1367,98 @@ public void timestampTypeNotMatch() throws IOException, DorisException {
thrown.expectMessage(startsWith("Unsupported type for DATETIMEV2"));
new RowBatch(scanBatchResult, schema);
}

@Test
public void testDateTimeV2AsTimestamp() throws IOException, DorisException {

ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
childrenBuilder.add(new Field("k1", FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND,
null)), null));
childrenBuilder.add(new Field("k2", FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND,
null)), null));

VectorSchemaRoot root = VectorSchemaRoot.create(
new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null),
new RootAllocator(Integer.MAX_VALUE));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(
root,
new DictionaryProvider.MapDictionaryProvider(),
outputStream);

arrowStreamWriter.start();
root.setRowCount(3);

LocalDateTime localDateTime = LocalDateTime.of(2024, 11, 29,
0, 0, 0, 123456000);
long second = localDateTime.atZone(ZoneId.systemDefault()).toEpochSecond();
int nano = localDateTime.getNano();

FieldVector vector = root.getVector("k1");
TimeStampMicroVector datetimeV2Vector = (TimeStampMicroVector) vector;
datetimeV2Vector.setInitialCapacity(3);
datetimeV2Vector.allocateNew();
datetimeV2Vector.setIndexDefined(0);
datetimeV2Vector.setSafe(0, second); // epoch seconds -> 2024-11-29 00:00:00
datetimeV2Vector.setIndexDefined(1);
datetimeV2Vector.setSafe(1, second * 1000 + nano / 1000000); // epoch milliseconds -> ...00:00:00.123
datetimeV2Vector.setIndexDefined(2);
datetimeV2Vector.setSafe(2, second * 1000000 + nano / 1000); // epoch microseconds -> ...00:00:00.123456
// The reader distinguishes seconds/millis/micros by the magnitude of the stored value.
vector.setValueCount(3);

vector = root.getVector("k2");
TimeStampMilliVector milliVector = (TimeStampMilliVector) vector;
milliVector.setInitialCapacity(3);
milliVector.allocateNew();
milliVector.setIndexDefined(0);
milliVector.setSafe(0, second * 1000L); // 2024-11-29 00:00:00, derived from `second` so the test is timezone-independent
milliVector.setIndexDefined(1);
milliVector.setSafe(1, second * 1000L + 123L); // ...00:00:00.123
milliVector.setIndexDefined(2);
milliVector.setSafe(2, second * 1000L + 999L); // ...00:00:00.999
vector.setValueCount(3);

arrowStreamWriter.writeBatch();

arrowStreamWriter.end();
arrowStreamWriter.close();

TStatus status = new TStatus();
status.setStatusCode(TStatusCode.OK);
TScanBatchResult scanBatchResult = new TScanBatchResult();
scanBatchResult.setStatus(status);
scanBatchResult.setEos(false);
scanBatchResult.setRows(outputStream.toByteArray());


String schemaStr = "{\"properties\":[" +
"{\"type\":\"DATETIMEV2\",\"name\":\"k1\",\"comment\":\"\"}" +
",{\"type\":\"DATETIMEV2\",\"name\":\"k2\",\"comment\":\"\"}" +
"], \"status\":200}";

Schema schema = RestService.parseSchema(schemaStr, logger);
Properties properties = new Properties();
properties.setProperty(ConfigurationOptions.DORIS_READ_DATETIMEV2_AS_TIMESTAMP_ENABLED, "true");
RowBatch rowBatch = new RowBatch(scanBatchResult, schema, new PropertiesSettings(properties));

Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow0 = rowBatch.next();
Assert.assertEquals(Timestamp.valueOf("2024-11-29 00:00:00"), actualRow0.get(0));
Assert.assertEquals(Timestamp.valueOf("2024-11-29 00:00:00.0"), actualRow0.get(1));


List<Object> actualRow1 = rowBatch.next();
Assert.assertEquals(Timestamp.valueOf("2024-11-29 00:00:00.123"), actualRow1.get(0));
Assert.assertEquals(Timestamp.valueOf("2024-11-29 00:00:00.123"), actualRow1.get(1));

List<Object> actualRow2 = rowBatch.next();
Assert.assertEquals(Timestamp.valueOf("2024-11-29 00:00:00.123456"), actualRow2.get(0));
Assert.assertEquals(Timestamp.valueOf("2024-11-29 00:00:00.999"), actualRow2.get(1));

Assert.assertFalse(rowBatch.hasNext());
thrown.expect(NoSuchElementException.class);
thrown.expectMessage(startsWith("Get row offset:"));
rowBatch.next();

}

}
@@ -37,14 +37,17 @@ class TestSchemaUtils extends ExpectedExceptionTest {
schema.setStatus(200)
val k1 = new Field("k1", "TINYINT", "", 0, 0, "")
val k5 = new Field("k5", "BIGINT", "", 0, 0, "")
val k7 = new Field("k7", "DATETIMEV2", "", 0, 0, "")
schema.put(k1)
schema.put(k5)
schema.put(k7)

var fields = List[StructField]()
fields :+= DataTypes.createStructField("k1", DataTypes.ByteType, true)
fields :+= DataTypes.createStructField("k5", DataTypes.LongType, true)
fields :+= DataTypes.createStructField("k7", DataTypes.TimestampType, true)
val expected = DataTypes.createStructType(fields.asJava)
Assert.assertEquals(expected, SchemaUtils.convertToStruct(schema, "k1,k5"))
Assert.assertEquals(expected, SchemaUtils.convertToStruct(schema, "k1,k5,k6,k7", true))
}

@Test
@@ -60,6 +63,7 @@
Assert.assertEquals(DataTypes.DateType, SchemaUtils.getCatalystType("DATE", 0, 0))
Assert.assertEquals(DataTypes.StringType, SchemaUtils.getCatalystType("DATETIME", 0, 0))
Assert.assertEquals(DataTypes.StringType, SchemaUtils.getCatalystType("DATETIMEV2", 0, 0))
Assert.assertEquals(DataTypes.TimestampType, SchemaUtils.getCatalystType("DATETIMEV2", 0, 0, true))
Assert.assertEquals(DataTypes.BinaryType, SchemaUtils.getCatalystType("BINARY", 0, 0))
Assert.assertEquals(DecimalType(9, 3), SchemaUtils.getCatalystType("DECIMAL", 9, 3))
Assert.assertEquals(DataTypes.StringType, SchemaUtils.getCatalystType("CHAR", 0, 0))
