Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(source): enhance parquet file source #19221

Merged
merged 13 commits into from
Nov 12, 2024
63 changes: 50 additions & 13 deletions e2e_test/s3/fs_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,14 @@ def gen_data(file_num, item_num_per_file):
'test_bytea': pa.scalar(b'\xDe00BeEf', type=pa.binary()),
'test_date': pa.scalar(datetime.now().date(), type=pa.date32()),
'test_time': pa.scalar(datetime.now().time(), type=pa.time64('us')),
'test_timestamp': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us')),
'test_timestamptz': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('us', tz='+00:00')),
'test_timestamp_s': pa.scalar(datetime.now().timestamp(), type=pa.timestamp('s')),
'test_timestamp_ms': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('ms')),
'test_timestamp_us': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us')),
'test_timestamp_ns': pa.scalar(datetime.now().timestamp() * 1000000000, type=pa.timestamp('ns')),
'test_timestamptz_s': pa.scalar(datetime.now().timestamp(), type=pa.timestamp('s', tz='+00:00')),
'test_timestamptz_ms': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('ms', tz='+00:00')),
'test_timestamptz_us': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us', tz='+00:00')),
'test_timestamptz_ns': pa.scalar(datetime.now().timestamp() * 1000000000, type=pa.timestamp('ns', tz='+00:00')),
} for item_id in range(item_num_per_file)]
for file_id in range(file_num)
]
Expand Down Expand Up @@ -60,8 +66,15 @@ def _table():
test_bytea bytea,
test_date date,
test_time time,
test_timestamp timestamp,
test_timestamptz timestamptz,
test_timestamp_s timestamp,
test_timestamp_ms timestamp,
test_timestamp_us timestamp,
test_timestamp_ns timestamp,
test_timestamptz_s timestamptz,
test_timestamptz_ms timestamptz,
test_timestamptz_us timestamptz,
test_timestamptz_ns timestamptz

) WITH (
connector = 's3',
match_pattern = '*.parquet',
Expand Down Expand Up @@ -128,8 +141,14 @@ def _table():
test_bytea,
test_date,
test_time,
test_timestamp,
test_timestamptz
test_timestamp_s,
test_timestamp_ms,
test_timestamp_us,
test_timestamp_ns,
test_timestamptz_s,
test_timestamptz_ms,
test_timestamptz_us,
test_timestamptz_ns
from {_table()} WITH (
connector = 's3',
match_pattern = '*.parquet',
Expand All @@ -147,7 +166,7 @@ def _table():
print('Sink into s3 in parquet encode...')
# Execute a SELECT statement
cur.execute(f'''CREATE TABLE test_parquet_sink_table(
id bigint primary key,
id bigint primary key,\
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
name TEXT,
sex bigint,
mark bigint,
Expand All @@ -158,8 +177,14 @@ def _table():
test_bytea bytea,
test_date date,
test_time time,
test_timestamp timestamp,
test_timestamptz timestamptz,
test_timestamp_s timestamp,
test_timestamp_ms timestamp,
test_timestamp_us timestamp,
test_timestamp_ns timestamp,
test_timestamptz_s timestamptz,
test_timestamptz_ms timestamptz,
test_timestamptz_us timestamptz,
test_timestamptz_ns timestamptz
) WITH (
connector = 's3',
match_pattern = 'test_parquet_sink/*.parquet',
Expand Down Expand Up @@ -196,8 +221,14 @@ def _table():
test_bytea,
test_date,
test_time,
test_timestamp,
test_timestamptz
test_timestamp_s,
test_timestamp_ms,
test_timestamp_us,
test_timestamp_ns,
test_timestamptz_s,
test_timestamptz_ms,
test_timestamptz_us,
test_timestamptz_ns
from {_table()} WITH (
connector = 's3',
match_pattern = '*.parquet',
Expand Down Expand Up @@ -226,8 +257,14 @@ def _table():
test_bytea bytea,
test_date date,
test_time time,
test_timestamp timestamp,
test_timestamptz timestamptz,
test_timestamp_s timestamp,
test_timestamp_ms timestamp,
test_timestamp_us timestamp,
test_timestamp_ns timestamp,
test_timestamptz_s timestamptz,
test_timestamptz_ms timestamptz,
test_timestamptz_us timestamptz,
test_timestamptz_ns timestamptz
) WITH (
connector = 's3',
match_pattern = 'test_json_sink/*.json',
Expand Down
Loading