Skip to content

Commit

Permalink
[bugfix](iceberg)fix datetime conversion error and data path error (a…
Browse files Browse the repository at this point in the history
…pache#35708)

## Proposed changes
Issue apache#31442

<!--Describe your changes.-->

1. The seventh parameter of `ZonedDateTime.of` is expressed in nanoseconds,
so the microsecond value must be multiplied by 1000 before it is passed in.
2. When writing to a non-partitioned iceberg table, the data path had an
extra trailing slash.
  • Loading branch information
wuwenchi authored and seawinde committed Jun 5, 2024
1 parent 5d10564 commit 36720b2
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 9 deletions.
17 changes: 13 additions & 4 deletions be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,10 +349,19 @@ std::shared_ptr<VIcebergPartitionWriter> VIcebergTableWriter::_create_partition_
}
const std::string& output_path = iceberg_table_sink.output_path;

auto write_path = fmt::format("{}/{}", output_path, partition_path);
auto original_write_path =
fmt::format("{}/{}", iceberg_table_sink.original_output_path, partition_path);
auto target_path = fmt::format("{}/{}", output_path, partition_path);
std::string write_path;
std::string original_write_path;
std::string target_path;
if (partition_path.empty()) {
original_write_path = iceberg_table_sink.original_output_path;
target_path = output_path;
write_path = output_path;
} else {
original_write_path =
fmt::format("{}/{}", iceberg_table_sink.original_output_path, partition_path);
target_path = fmt::format("{}/{}", output_path, partition_path);
write_path = fmt::format("{}/{}", output_path, partition_path);
}

VIcebergPartitionWriter::WriteInfo write_info = {
std::move(write_path), std::move(original_write_path), std::move(target_path),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -967,12 +967,25 @@ public static DateLiteral read(DataInput in) throws IOException {
}

/**
 * Converts this literal to a Unix timestamp, interpreting its date/time fields in the given time zone.
 *
 * @param timeZone time zone in which the literal's fields are interpreted
 * @return the result of {@code Timestamp.getTime()} for that instant (milliseconds since the epoch)
 */
public long unixTimestamp(TimeZone timeZone) {
ZonedDateTime zonedDateTime = ZonedDateTime.of((int) year, (int) month, (int) day, (int) hour,
(int) minute, (int) second, (int) microsecond, ZoneId.of(timeZone.getID()));
Timestamp timestamp = Timestamp.from(zonedDateTime.toInstant());
Timestamp timestamp = getTimestamp(timeZone);
return timestamp.getTime();
}

/**
 * Builds a {@link Timestamp} from this literal's date/time fields, interpreted in the given time zone.
 *
 * <p>The fractional part is held in the {@code microsecond} field, while the seventh argument of
 * {@code ZonedDateTime.of} is nano-of-second, hence the multiplication by 1000.
 *
 * @param timeZone time zone in which the literal's fields are interpreted
 * @return the timestamp for the corresponding instant
 */
private Timestamp getTimestamp(TimeZone timeZone) {
    // TimeZone#toZoneId is the supported TimeZone -> ZoneId conversion; unlike ZoneId.of(getID())
    // it also resolves legacy TimeZone IDs that ZoneId.of(String) may reject.
    ZoneId zoneId = timeZone.toZoneId();
    // microsecond -> nano-of-second
    int nanoOfSecond = (int) microsecond * 1000;
    ZonedDateTime zonedDateTime = ZonedDateTime.of((int) year, (int) month, (int) day, (int) hour,
            (int) minute, (int) second, nanoOfSecond, zoneId);
    return Timestamp.from(zonedDateTime.toInstant());
}

/**
 * Unix timestamp of this literal with millisecond precision.
 * Delegates to {@link #unixTimestamp(TimeZone)}.
 *
 * @param timeZone time zone in which the literal's fields are interpreted
 * @return whatever {@code unixTimestamp(timeZone)} returns
 */
public long getUnixTimestampWithMillisecond(TimeZone timeZone) {
    final long epochMillis = unixTimestamp(timeZone);
    return epochMillis;
}

/**
 * Unix timestamp of this literal with microsecond precision.
 *
 * @param timeZone time zone in which the literal's fields are interpreted
 * @return the millisecond timestamp scaled by 1000, plus the microseconds that lie below
 *         millisecond resolution
 */
public long getUnixTimestampWithMicroseconds(TimeZone timeZone) {
    final Timestamp ts = getTimestamp(timeZone);
    final long epochMillis = ts.getTime();
    // getNanos() is the fractional second in nanoseconds; keep only the microsecond digits
    // that the millisecond value does not already cover.
    final int microsWithinMilli = (ts.getNanos() / 1000) % 1000;
    return epochMillis * 1000 + microsWithinMilli;
}

/**
 * Tells whether a format string contains at least one character that is a member of
 * {@code TIME_PART_SET}.
 *
 * @param format the format string to scan
 * @return {@code true} as soon as one matching character is found, {@code false} otherwise
 */
public static boolean hasTimePart(String format) {
    for (int idx = 0; idx < format.length(); idx++) {
        if (TIME_PART_SET.contains(format.charAt(idx))) {
            return true;
        }
    }
    return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ public Integer initialValue() {
return 0;
}
};
static long MILLIS_TO_NANO_TIME = 1000;
// https://iceberg.apache.org/spec/#schemas-and-data-types
// All time and timestamp values are stored with microsecond precision
private static final int ICEBERG_DATETIME_SCALE_MS = 6;
Expand Down Expand Up @@ -320,7 +319,7 @@ public static Object extractDorisLiteral(org.apache.iceberg.types.Type icebergTy
case DATE:
return dateLiteral.getStringValue();
case TIMESTAMP:
return dateLiteral.unixTimestamp(TimeUtils.getTimeZone()) * MILLIS_TO_NANO_TIME;
return dateLiteral.getUnixTimestampWithMicroseconds(TimeUtils.getTimeZone());
default:
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import org.junit.Assert;
import org.junit.Test;

import java.time.ZoneOffset;
import java.util.TimeZone;

public class DateLiteralTest {

@Test
Expand Down Expand Up @@ -414,4 +417,16 @@ public void testCheckRangeForDateV2() {
}
Assert.assertFalse(hasException);
}

/**
 * Checks the sub-second part returned by the millisecond and microsecond timestamp getters
 * for a DATETIMEV2 literal with six fractional digits, evaluated in UTC.
 */
@Test
public void testUnixTimestampWithMilliMicroSecond() throws AnalysisException {
    final TimeZone utc = TimeZone.getTimeZone(ZoneOffset.UTC);
    final String text = "2020-12-13 12:13:14.123456";
    final Type literalType = Type.DATETIMEV2;
    final DateLiteral dateLiteral = new DateLiteral(text, literalType);

    // Millisecond getter: only the first three fractional digits survive.
    final long epochMillis = dateLiteral.getUnixTimestampWithMillisecond(utc);
    Assert.assertEquals(123, epochMillis % 1000);

    // Microsecond getter: all six fractional digits are kept.
    final long epochMicros = dateLiteral.getUnixTimestampWithMicroseconds(utc);
    Assert.assertEquals(123456, epochMicros % 1000000);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !qt01 --
1 2024-05-30T20:34:56
2 2024-05-30T20:34:56.100
3 2024-05-30T20:34:56.120
4 2024-05-30T20:34:56.123
5 2024-05-30T20:34:56.123400
6 2024-05-30T20:34:56.123450
7 2024-05-30T20:34:56.123456

-- !qt02 --
1 2024-05-30T20:34:56

-- !qt03 --
2 2024-05-30T20:34:56.100

-- !qt04 --
2 2024-05-30T20:34:56.100

-- !qt05 --
2 2024-05-30T20:34:56.100

-- !qt06 --
4 2024-05-30T20:34:56.123

-- !qt07 --
4 2024-05-30T20:34:56.123

-- !qt08 --
5 2024-05-30T20:34:56.123400

-- !qt09 --
7 2024-05-30T20:34:56.123456

-- !qt10 --
1 2024-05-30T20:34:56
2 2024-05-30T20:34:56.100

-- !qt11 --
4 2024-05-30T20:34:56.123
5 2024-05-30T20:34:56.123400
6 2024-05-30T20:34:56.123450
7 2024-05-30T20:34:56.123456

-- !qt12 --
1 2024-05-30T20:34:56
2 2024-05-30T20:34:56.100

-- !qt13 --
4 2024-05-30T20:34:56.123
5 2024-05-30T20:34:56.123400
6 2024-05-30T20:34:56.123450
7 2024-05-30T20:34:56.123456

Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression suite for filtering an Iceberg table on a datetime column with literals of
// varying fractional-second precision. The body runs only when the "enableIcebergTest"
// config entry equals "true" (case-insensitive).
suite("test_iceberg_filter", "p0,external,doris,external_docker,external_docker_doris") {
String enabled = context.config.otherConfigs.get("enableIcebergTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
try {
// Connection settings for the Iceberg REST catalog and its S3 endpoint come from the test config.
String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
String catalog_name = "test_iceberg_filter"

// Recreate the catalog from scratch so the suite does not depend on earlier runs.
sql """drop catalog if exists ${catalog_name}"""
sql """CREATE CATALOG ${catalog_name} PROPERTIES (
'type'='iceberg',
'iceberg.catalog.type'='rest',
'uri' = 'http://${externalEnvIp}:${rest_port}',
"s3.access_key" = "admin",
"s3.secret_key" = "password",
"s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
"s3.region" = "us-east-1"
);"""

// The database is named after the catalog.
sql """ switch ${catalog_name} """
sql """ create database if not exists ${catalog_name} """
sql """ use ${catalog_name} """

// Seven rows sharing the same second, with 0 to 6 fractional digits respectively.
String tb_ts_filter = "tb_ts_filter";
sql """ drop table if exists ${tb_ts_filter} """
sql """ create table ${tb_ts_filter} (id int, ts datetime)"""
sql """ insert into ${tb_ts_filter} values (1, '2024-05-30 20:34:56') """
sql """ insert into ${tb_ts_filter} values (2, '2024-05-30 20:34:56.1') """
sql """ insert into ${tb_ts_filter} values (3, '2024-05-30 20:34:56.12') """
sql """ insert into ${tb_ts_filter} values (4, '2024-05-30 20:34:56.123') """
sql """ insert into ${tb_ts_filter} values (5, '2024-05-30 20:34:56.1234') """
sql """ insert into ${tb_ts_filter} values (6, '2024-05-30 20:34:56.12345') """
sql """ insert into ${tb_ts_filter} values (7, '2024-05-30 20:34:56.123456') """

// Full scan, then equality predicates. qt03-qt05 and qt06-qt07 use literals that differ
// only in trailing zeros.
qt_qt01 """ select * from ${tb_ts_filter} order by id """
qt_qt02 """ select * from ${tb_ts_filter} where ts = '2024-05-30 20:34:56' order by id """
qt_qt03 """ select * from ${tb_ts_filter} where ts = '2024-05-30 20:34:56.1' order by id """
qt_qt04 """ select * from ${tb_ts_filter} where ts = '2024-05-30 20:34:56.10' order by id """
qt_qt05 """ select * from ${tb_ts_filter} where ts = '2024-05-30 20:34:56.100' order by id """
qt_qt06 """ select * from ${tb_ts_filter} where ts = '2024-05-30 20:34:56.123' order by id """
qt_qt07 """ select * from ${tb_ts_filter} where ts = '2024-05-30 20:34:56.1230' order by id """
qt_qt08 """ select * from ${tb_ts_filter} where ts = '2024-05-30 20:34:56.123400' order by id """
qt_qt09 """ select * from ${tb_ts_filter} where ts = '2024-05-30 20:34:56.123456' order by id """

// Range predicates around the .12 boundary, with and without trailing zeros in the literal.
qt_qt10 """ select * from ${tb_ts_filter} where ts < '2024-05-30 20:34:56.12' order by id """
qt_qt11 """ select * from ${tb_ts_filter} where ts > '2024-05-30 20:34:56.12' order by id """
qt_qt12 """ select * from ${tb_ts_filter} where ts < '2024-05-30 20:34:56.1200' order by id """
qt_qt13 """ select * from ${tb_ts_filter} where ts > '2024-05-30 20:34:56.1200' order by id """

// The explain checks below (expected inputSplitNum per predicate) are disabled pending the TODO.
// TODO support filter
// explain {
// sql("select * from ${tb_ts_filter} where ts < '2024-05-30 20:34:56'")
// contains "inputSplitNum=0"
// }
// explain {
// sql("select * from ${tb_ts_filter} where ts < '2024-05-30 20:34:56.12'")
// contains "inputSplitNum=1"
// }
// explain {
// sql("select * from ${tb_ts_filter} where ts > '2024-05-30 20:34:56.1234'")
// contains "inputSplitNum=2"
// }
// explain {
// sql("select * from ${tb_ts_filter} where ts > '2024-05-30 20:34:56.0'")
// contains "inputSplitNum=1"
// }
// explain {
// sql("select * from ${tb_ts_filter} where ts = '2024-05-30 20:34:56.123456'")
// contains "inputSplitNum=1"
// }
// explain {
// sql("select * from ${tb_ts_filter} where ts < '2024-05-30 20:34:56.123456'")
// contains "inputSplitNum=5"
// }
// explain {
// sql("select * from ${tb_ts_filter} where ts > '2024-05-30 20:34:56.123456'")
// contains "inputSplitNum=0"
// }

// No cleanup is performed here: the finally block is empty.
} finally {
}
}
}

0 comments on commit 36720b2

Please sign in to comment.