From cf82c2aa1d8aa4229a41503f7a0dbdc6e6e423e8 Mon Sep 17 00:00:00 2001 From: Niklas Schmidtmer Date: Thu, 22 Dec 2022 10:47:14 +0100 Subject: [PATCH 1/7] Adjust data retention reallocate strategy to use CrateDB 5.2 node attributes --- dags/data_retention_reallocate_dag.py | 15 ++++----------- include/data_retention_reallocate_tracking.sql | 1 - ..._retention_retrieve_reallocate_policies.sql | 18 ++++++++++++------ setup/data_retention_schema.sql | 9 --------- 4 files changed, 16 insertions(+), 27 deletions(-) delete mode 100644 include/data_retention_reallocate_tracking.sql diff --git a/dags/data_retention_reallocate_dag.py b/dags/data_retention_reallocate_dag.py index 3173df7..a97540a 100644 --- a/dags/data_retention_reallocate_dag.py +++ b/dags/data_retention_reallocate_dag.py @@ -5,8 +5,9 @@ Prerequisites ------------- -In CrateDB, tables for storing retention policies need to be created once manually. -See the file setup/data_retention_schema.sql in this repository. +- CrateDB 5.2.0 or later +- Tables for storing retention policies need to be created once manually in + CrateDB. See the file setup/data_retention_schema.sql in this repository. """ from pathlib import Path import pendulum @@ -58,18 +59,10 @@ def generate_sql_reallocate(policy): def data_retention_reallocate(): policies = map_policy.expand(policy=get_policies()) - reallocate = SQLExecuteQueryOperator.partial( + SQLExecuteQueryOperator.partial( task_id="reallocate_partitions", conn_id="cratedb_connection", ).expand(sql=generate_sql_reallocate.expand(policy=policies)) - track = SQLExecuteQueryOperator.partial( - task_id="add_tracking_information", - conn_id="cratedb_connection", - sql="data_retention_reallocate_tracking.sql", - ).expand(parameters=policies) - - reallocate >> track - data_retention_reallocate() diff --git a/include/data_retention_reallocate_tracking.sql b/include/data_retention_reallocate_tracking.sql deleted file mode 100644 index c10d363..0000000 --- a/include/data_retention_reallocate_tracking.sql +++ /dev/null @@ -1 +0,0 @@ -INSERT INTO doc.retention_policy_tracking VALUES (%(schema)s, %(table)s, 'reallocate', %(value)s) ON CONFLICT (table_name, table_schema, strategy) DO UPDATE SET last_partition_value = excluded.last_partition_value; diff --git a/include/data_retention_retrieve_reallocate_policies.sql b/include/data_retention_retrieve_reallocate_policies.sql index 938b9a0..08f61bd 100644 --- a/include/data_retention_retrieve_reallocate_policies.sql +++ b/include/data_retention_retrieve_reallocate_policies.sql @@ -1,3 +1,11 @@ +WITH partition_allocations AS ( + SELECT DISTINCT s.schema_name AS table_schema, + s.table_name, + s.partition_ident, + n.attributes + FROM sys.shards s + JOIN sys.nodes n ON s.node['id'] = n.id +) SELECT QUOTE_IDENT(p.table_schema), QUOTE_IDENT(p.table_name), QUOTE_IDENT(p.table_schema) || '.' 
|| QUOTE_IDENT(p.table_name), @@ -9,11 +17,9 @@ FROM information_schema.table_partitions p JOIN doc.retention_policies r ON p.table_schema = r.table_schema AND p.table_name = r.table_name AND p.values[r.partition_column] < %(day)s::TIMESTAMP - (r.retention_period || ' days')::INTERVAL -LEFT JOIN doc.retention_policy_tracking t ON t.table_schema = p.table_schema - AND t.table_name = p.table_name +JOIN partition_allocations a ON a.table_schema = p.table_schema + AND a.table_name = p.table_name + AND p.partition_ident = a.partition_ident + AND attributes[r.reallocation_attribute_name] <> r.reallocation_attribute_value WHERE r.strategy = 'reallocate' - AND ( - t.last_partition_value IS NULL - OR p.values[r.partition_column] > t.last_partition_value - ) ORDER BY 5 ASC; diff --git a/setup/data_retention_schema.sql b/setup/data_retention_schema.sql index 9a61c93..9fab393 100644 --- a/setup/data_retention_schema.sql +++ b/setup/data_retention_schema.sql @@ -10,12 +10,3 @@ CREATE TABLE IF NOT EXISTS doc.retention_policies ( PRIMARY KEY ("table_schema", "table_name", "strategy") ) CLUSTERED INTO 1 SHARDS; - -CREATE TABLE IF NOT EXISTS doc.retention_policy_tracking ( - "table_schema" TEXT, - "table_name" TEXT, - "strategy" TEXT, - "last_partition_value" TIMESTAMP WITH TIME ZONE NOT NULL, - PRIMARY KEY (table_schema, table_name, strategy) -) -CLUSTERED INTO 1 SHARDS; From 8e048d6852b9deb7e5d079188a751faf922ea1a1 Mon Sep 17 00:00:00 2001 From: Niklas Schmidtmer Date: Thu, 22 Dec 2022 21:36:55 +0100 Subject: [PATCH 2/7] Update development dependencies --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index f8a869f..467038a 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ python_requires=">=3.9", install_requires=["apache-airflow==2.5.0"], extras_require={ - "develop": ["pylint==2.15.7", "black==22.10.0"], + "develop": ["pylint==2.15.9", "black==22.12.0"], "testing": ["pytest==7.2.0"], }, ) From ec7355bb69a8b7b05fe5dee43ed363740bd6185b Mon Sep 17 00:00:00 2001 From: Niklas Schmidtmer Date: Fri, 23 Dec 2022 14:58:31 +0100 Subject: [PATCH 3/7] Simplify data retention DAGs by using dynamic task mapping on the `params` parameter This is a new upstream improvement that required a workaround earlier: https://github.com/apache/airflow/issues/24014 --- dags/data_retention_delete_dag.py | 26 ++++++++++---------------- dags/data_retention_reallocate_dag.py | 22 +++++++--------------- dags/data_retention_snapshot_dag.py | 25 +++++++++---------------- include/data_retention_delete.sql | 1 - include/data_retention_reallocate.sql | 1 - include/data_retention_snapshot.sql | 1 - 6 files changed, 26 insertions(+), 50 deletions(-) delete mode 100644 include/data_retention_delete.sql delete mode 100644 include/data_retention_reallocate.sql delete mode 100644 include/data_retention_snapshot.sql diff --git a/dags/data_retention_delete_dag.py b/dags/data_retention_delete_dag.py index db103cd..2342cec 100644 --- a/dags/data_retention_delete_dag.py +++ b/dags/data_retention_delete_dag.py @@ -15,18 +15,12 @@ from airflow.decorators import dag, task -@task -def generate_sql(policy): - """Generate DROP statment for a given partition""" - return ( - Path("include/data_retention_delete.sql") - .read_text(encoding="utf-8") - .format( - table_fqn=policy[0], - column=policy[1], - value=policy[2], - ) - ) +def map_policy(policy): + return { + "table_fqn": policy[0], + "column": policy[1], + "value": policy[2], + } @task @@ -35,7 +29,8 @@ def get_policies(ds=None): pg_hook = 
PostgresHook(postgres_conn_id="cratedb_connection") sql = Path("include/data_retention_retrieve_delete_policies.sql") return pg_hook.get_records( - sql=sql.read_text(encoding="utf-8"), parameters={"day": ds} + sql=sql.read_text(encoding="utf-8"), + parameters={"day": ds}, ) @@ -45,12 +40,11 @@ def get_policies(ds=None): catchup=False, ) def data_retention_delete(): - sql_statements = generate_sql.expand(policy=get_policies()) - SQLExecuteQueryOperator.partial( task_id="delete_partition", conn_id="cratedb_connection", - ).expand(sql=sql_statements) + sql="DELETE FROM {{params.table_fqn}} WHERE {{params.column}} = {{params.value}};", + ).expand(params=get_policies().map(map_policy)) data_retention_delete() diff --git a/dags/data_retention_reallocate_dag.py b/dags/data_retention_reallocate_dag.py index a97540a..476e451 100644 --- a/dags/data_retention_reallocate_dag.py +++ b/dags/data_retention_reallocate_dag.py @@ -22,11 +22,11 @@ def get_policies(ds=None): pg_hook = PostgresHook(postgres_conn_id="cratedb_connection") sql = Path("include/data_retention_retrieve_reallocate_policies.sql") return pg_hook.get_records( - sql=sql.read_text(encoding="utf-8"), parameters={"day": ds} + sql=sql.read_text(encoding="utf-8"), + parameters={"day": ds}, ) -@task def map_policy(policy): """Map index-based policy to readable dict structure""" return { @@ -40,16 +40,6 @@ def map_policy(policy): } -@task -def generate_sql_reallocate(policy): - """Generate SQL for reallocation""" - return ( - Path("include/data_retention_reallocate.sql") - .read_text(encoding="utf-8") - .format(**policy) - ) - - @dag( start_date=pendulum.datetime(2021, 11, 19, tz="UTC"), schedule="@daily", @@ -57,12 +47,14 @@ def generate_sql_reallocate(policy): template_searchpath=["include"], ) def data_retention_reallocate(): - policies = map_policy.expand(policy=get_policies()) - SQLExecuteQueryOperator.partial( task_id="reallocate_partitions", conn_id="cratedb_connection", - ).expand(sql=generate_sql_reallocate.expand(policy=policies)) + sql=""" + ALTER TABLE {{params.table_fqn}} PARTITION ({{params.column}} = {{params.value}}) + SET ("routing.allocation.require.{{params.attribute_name}}" = '{{params.attribute_value}}'); + """, + ).expand(params=get_policies().map(map_policy)) data_retention_reallocate() diff --git a/dags/data_retention_snapshot_dag.py b/dags/data_retention_snapshot_dag.py index 9d1f572..b447ced 100644 --- a/dags/data_retention_snapshot_dag.py +++ b/dags/data_retention_snapshot_dag.py @@ -25,7 +25,6 @@ def get_policies(ds=None): ) -@task def map_policy(policy): """Map index-based policy to readable dict structure""" return { @@ -38,35 +37,29 @@ def map_policy(policy): } -@task -def generate_sql(query_file, policy): - """Generate SQL statment for a given partition""" - return Path(query_file).read_text(encoding="utf-8").format(**policy) - - @dag( start_date=pendulum.datetime(2021, 11, 19, tz="UTC"), schedule="@daily", catchup=False, ) def data_retention_snapshot(): - policies = map_policy.expand(policy=get_policies()) - sql_statements_snapshot = generate_sql.partial( - query_file="include/data_retention_snapshot.sql" - ).expand(policy=policies) - sql_statements_delete = generate_sql.partial( - query_file="include/data_retention_delete.sql" - ).expand(policy=policies) + policies = get_policies().map(map_policy) reallocate = SQLExecuteQueryOperator.partial( task_id="snapshot_partitions", conn_id="cratedb_connection", - ).expand(sql=sql_statements_snapshot) + sql=""" + CREATE SNAPSHOT 
{{params.target_repository_name}}."{{params.schema}}.{{params.table}}-{{params.value}}" + TABLE {{params.table_fqn}} PARTITION ({{params.column}} = {{params.value}}) + WITH ("wait_for_completion" = true); + """, + ).expand(params=policies) delete = SQLExecuteQueryOperator.partial( task_id="delete_partitions", conn_id="cratedb_connection", - ).expand(sql=sql_statements_delete) + sql="DELETE FROM {{params.table_fqn}} WHERE {{params.column}} = {{params.value}};", + ).expand(params=policies) reallocate >> delete diff --git a/include/data_retention_delete.sql b/include/data_retention_delete.sql deleted file mode 100644 index d9f4005..0000000 --- a/include/data_retention_delete.sql +++ /dev/null @@ -1 +0,0 @@ -DELETE FROM {table_fqn} WHERE {column} = {value}; diff --git a/include/data_retention_reallocate.sql b/include/data_retention_reallocate.sql deleted file mode 100644 index 500de4f..0000000 --- a/include/data_retention_reallocate.sql +++ /dev/null @@ -1 +0,0 @@ -ALTER TABLE {table_fqn} PARTITION ({column} = {value}) SET ("routing.allocation.require.{attribute_name}" = '{attribute_value}'); diff --git a/include/data_retention_snapshot.sql b/include/data_retention_snapshot.sql deleted file mode 100644 index 69992ed..0000000 --- a/include/data_retention_snapshot.sql +++ /dev/null @@ -1 +0,0 @@ -CREATE SNAPSHOT {target_repository_name}."{schema}.{table}-{value}" TABLE {table_fqn} PARTITION ({column} = {value}) WITH ("wait_for_completion" = true); From 2adc5182abc3cd8918df7e8871efbb48b3d70b13 Mon Sep 17 00:00:00 2001 From: Niklas Schmidtmer Date: Tue, 27 Dec 2022 11:35:12 +0100 Subject: [PATCH 4/7] Simplify bash commands by using `env` parameter --- dags/nyc_taxi_dag.py | 58 ++++++++++++++++++++++++++------------------ 1 file changed, 35 insertions(+), 23 deletions(-) diff --git a/dags/nyc_taxi_dag.py b/dags/nyc_taxi_dag.py index 8ca83c0..5ab9aee 100644 --- a/dags/nyc_taxi_dag.py +++ b/dags/nyc_taxi_dag.py @@ -4,7 +4,7 @@ here https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page the data is also available in their public S3 Bucket s3://nyc-tlc/trip data/ or through the CDN described in the website above. -A detailed tutorial is available at TBD +A detailed tutorial is available at https://community.crate.io/t/tutorial-how-to-automate-the-import-of-parquet-files-using-airflow/1247 Prerequisites ------------- @@ -14,38 +14,44 @@ In the CrateDB schema "nyc_taxi", the tables "load_trips_staging" and "trips" need to be present before running the DAG. You can retrieve the CREATE TABLE statements from the file setup/taxi-schema.sql in this repository. 
- """ - -from pathlib import Path import pendulum from airflow.models import Variable from airflow.decorators import task, dag from airflow.models.baseoperator import chain - from airflow.operators.bash import BashOperator from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator from airflow.providers.amazon.aws.transfers.local_to_s3 import ( LocalFilesystemToS3Operator, ) +# The URL of the directory containing the Parquet files ORIGIN_PATH = Variable.get("ORIGIN_PATH", "test-path") +# Any local directory to which the Parquet files are temporarily downloaded to, such as /tmp DESTINATION_PATH = Variable.get("DESTINATION_PATH", "test-path") -S3_KEY = Variable.get("S3_KEY", "test-key") +# The name of an S3 bucket to which CSV files are temporarily uploaded to S3_BUCKET = Variable.get("S3_BUCKET", "test-bucket") +# AWS Access Key ID ACCESS_KEY_ID = Variable.get("ACCESS_KEY_ID", "access-key") +# AWS Secret Access Key SECRET_ACCESS_KEY = Variable.get("SECRET_ACCESS_KEY", "secret-key") +# Append trailing slash if missing +DESTINATION_PATH = ( + DESTINATION_PATH + "/" if not DESTINATION_PATH.endswith("/") else DESTINATION_PATH +) + # The configuration of the DAG was done based on the info shared by NYC TLC here: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page -# The documentation mentioned that the Parquet files are released @monthly since January 2009 +# The documentation mentioned that the Parquet files are released monthly since January 2009 @dag( dag_id="nyc-taxi-parquet", schedule="@monthly", start_date=pendulum.datetime(2009, 3, 1, tz="UTC"), catchup=True, + template_searchpath=["include"], ) def taskflow(): - @task(task_id="format_file_name") + @task def format_file_name(ds=None): # The files are released with 2 months of delay, therefore the .subtract(months=2) timestamp = pendulum.parse(ds) @@ -58,29 +64,31 @@ def format_file_name(ds=None): process_parquet = BashOperator( task_id="process_parquet", bash_command=""" - curl -o "${{params.DESTINATION_PATH}}{{ task_instance.xcom_pull(task_ids='format_file_name')}}.parquet" {{params.ORIGIN_PATH}}{{ task_instance.xcom_pull(task_ids='format_file_name')}}.parquet && - parquet-tools csv "${{params.DESTINATION_PATH}}{{ task_instance.xcom_pull(task_ids='format_file_name')}}.parquet" > "${{params.DESTINATION_PATH}}{{ task_instance.xcom_pull(task_ids='format_file_name')}}.csv" - """, - params={ - "ORIGIN_PATH": ORIGIN_PATH, - "DESTINATION_PATH": DESTINATION_PATH, + curl -o "${destination_path}${formatted_file_date}.parquet" "${origin_path}${formatted_file_date}.parquet" && + parquet-tools csv "${destination_path}${formatted_file_date}.parquet" > "${destination_path}${formatted_file_date}.csv" + """, + env={ + "origin_path": ORIGIN_PATH, + "destination_path": DESTINATION_PATH, + "formatted_file_date": formatted_file_date, }, ) copy_csv_to_s3 = LocalFilesystemToS3Operator( task_id="copy_csv_to_s3", filename=f"{DESTINATION_PATH}{formatted_file_date}.csv", - dest_key=f"{S3_KEY}{formatted_file_date}.csv", + dest_bucket=S3_BUCKET, + dest_key=f"{formatted_file_date}.csv", aws_conn_id="s3_conn", replace=True, ) copy_csv_staging = SQLExecuteQueryOperator( task_id="copy_csv_staging", - conn_id="cratedb_demo_connection", + conn_id="cratedb_connection", sql=f""" COPY nyc_taxi.load_trips_staging - FROM 's3://{ACCESS_KEY_ID}:{SECRET_ACCESS_KEY}@{S3_BUCKET}{formatted_file_date}.csv' + FROM 's3://{ACCESS_KEY_ID}:{SECRET_ACCESS_KEY}@{S3_BUCKET}/{formatted_file_date}.csv' WITH (format = 'csv', empty_string_as_null = true) RETURN 
SUMMARY; """, @@ -88,22 +96,26 @@ def format_file_name(ds=None): copy_staging_to_trips = SQLExecuteQueryOperator( task_id="copy_staging_to_trips", - conn_id="cratedb_demo_connection", - sql=Path("include/taxi-insert.sql").read_text(encoding="utf-8"), + conn_id="cratedb_connection", + sql="taxi-insert.sql", ) delete_staging = SQLExecuteQueryOperator( task_id="delete_staging", - conn_id="cratedb_demo_connection", + conn_id="cratedb_connection", sql="DELETE FROM nyc_taxi.load_trips_staging;", ) delete_local_parquet_csv = BashOperator( task_id="delete_local_parquet_csv", bash_command=""" - rm "${{params.DESTINATION_PATH}}{{ task_instance.xcom_pull(task_ids='format_file_name')}}.parquet" "${{params.DESTINATION_PATH}}{{ task_instance.xcom_pull(task_ids='format_file_name')}}.csv" - """, - params={"DESTINATION_PATH": DESTINATION_PATH}, + rm "${destination_path}${formatted_file_date}.parquet"; + rm "${destination_path}${formatted_file_date}.csv" + """, + env={ + "destination_path": DESTINATION_PATH, + "formatted_file_date": formatted_file_date, + }, ) chain( From 337ef74a201f5a3ab58b0e930cec4b978a618741 Mon Sep 17 00:00:00 2001 From: Niklas Schmidtmer Date: Tue, 27 Dec 2022 11:42:58 +0100 Subject: [PATCH 5/7] Use dynamic task mapping for SQL generation in Data Quality DAG --- dags/data_quality_checks_dag.py | 63 ++++++++++++++++----------------- 1 file changed, 31 insertions(+), 32 deletions(-) diff --git a/dags/data_quality_checks_dag.py b/dags/data_quality_checks_dag.py index 91ee282..1494712 100644 --- a/dags/data_quality_checks_dag.py +++ b/dags/data_quality_checks_dag.py @@ -20,7 +20,6 @@ ACCESS_KEY_ID= SECRET_ACCESS_KEY= """ - import os import pendulum from airflow.decorators import dag, task, task_group @@ -57,13 +56,15 @@ def slack_failure_notification(context): log_url = context.get("task_instance").log_url slack_msg = f""" :red_circle: Task Failed. 
-    *Task*: {task_id}
-    *Dag*: {dag_id}
+    *Task*: {task_id}
+    *DAG*: {dag_id}
     *Execution Time*: {exec_date}
-    *Log Url*: {log_url}
+    *Log URL*: {log_url}
     """
     failed_alert = SlackWebhookOperator(
-        task_id="slack_notification", http_conn_id="slack_webhook", message=slack_msg
+        task_id="slack_notification",
+        http_conn_id="slack_webhook",
+        message=slack_msg,
     )
     return failed_alert.execute(context=context)
@@ -78,22 +79,21 @@ def get_files_from_s3(bucket, prefix_value):
     return list(filter(lambda element: element.endswith(".csv"), paths))
-@task
-def get_import_statements(files):
-    statements = []
-    for path in files:
-        sql = f"""
-            COPY {TEMP_TABLE} FROM 's3://{ACCESS_KEY_ID}:{SECRET_ACCESS_KEY}@{S3_BUCKET}/{path}' WITH (format='csv');
-        """
-        statements.append(sql)
-    return statements
-
-
 @task
 def list_local_files(directory):
     return list(filter(lambda entry: entry.endswith(".csv"), os.listdir(directory)))
+def copy_file_kwargs(file):
+    return {
+        "temp_table": TEMP_TABLE,
+        "access_key_id": ACCESS_KEY_ID,
+        "secret_access_key": SECRET_ACCESS_KEY,
+        "s3_bucket": S3_BUCKET,
+        "path": file,
+    }
+
+
 def upload_kwargs(file):
     return {
         "filename": f"{FILE_DIR}/{file}",
@@ -181,22 +181,23 @@ def move_incoming_files(s3_files):
 def data_quality_checks():
     upload = upload_local_files()
     s3_files = get_files_from_s3(S3_BUCKET, INCOMING_DATA_PREFIX)
-    import_stmt = get_import_statements(s3_files)
+
+    # pylint: disable=E1101
     import_data = SQLExecuteQueryOperator.partial(
         task_id="import_data_to_cratedb",
         conn_id="cratedb_connection",
-    ).expand(sql=import_stmt)
+        sql="""
+            COPY {{params.temp_table}}
+            FROM 's3://{{params.access_key_id}}:{{params.secret_access_key}}@{{params.s3_bucket}}/{{params.path}}'
+            WITH (format = 'csv');
+        """,
+    ).expand(params=s3_files.map(copy_file_kwargs))
     refresh = SQLExecuteQueryOperator(
         task_id="refresh_table",
         conn_id="cratedb_connection",
-        sql="""
-            REFRESH TABLE {{params.temp_table}};
-        """,
-        params={
-            "temp_table": TEMP_TABLE,
-        },
+        sql="REFRESH TABLE {{params.temp_table}};",
+        params={"temp_table": TEMP_TABLE},
     )

     checks = home_data_checks()
@@ -204,18 +205,17 @@ def data_quality_checks():
     move_data = SQLExecuteQueryOperator(
         task_id="move_to_table",
         conn_id="cratedb_connection",
-        sql="""
-            INSERT INTO {{params.table}} SELECT * FROM {{params.temp_table}};
-        """,
-        params={"table": TABLE, "temp_table": TEMP_TABLE},
+        sql="INSERT INTO {{params.table}} SELECT * FROM {{params.temp_table}};",
+        params={
+            "table": TABLE,
+            "temp_table": TEMP_TABLE,
+        },
     )

     delete_data = SQLExecuteQueryOperator(
         task_id="delete_from_temp_table",
         conn_id="cratedb_connection",
-        sql="""
-            DELETE FROM {{params.temp_table}};
-        """,
+        sql="DELETE FROM {{params.temp_table}};",
         params={"temp_table": TEMP_TABLE},
         trigger_rule="all_done",
     )
@@ -231,7 +231,6 @@ def data_quality_checks():
     chain(
         upload,
         s3_files,
-        import_stmt,
         import_data,
         refresh,
         checks,

From a8fe45c8bb868145ddabdc5bc132b01acdbeaa4d Mon Sep 17 00:00:00 2001
From: Niklas Schmidtmer
Date: Tue, 27 Dec 2022 11:44:29 +0100
Subject: [PATCH 6/7] Define a primary key on all tables (except for staging tables)

---
 setup/smart_home_data.sql | 6 ++++--
 setup/taxi-schema.sql     | 3 ++-
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/setup/smart_home_data.sql b/setup/smart_home_data.sql
index 3474e62..93a735b 100644
--- a/setup/smart_home_data.sql
+++ b/setup/smart_home_data.sql
@@ -14,7 +14,8 @@ CREATE TABLE IF NOT EXISTS "iot"."smart_home_data" (
     "microwave" DOUBLE PRECISION,
     "living_room" DOUBLE PRECISION,
     "temperature" DOUBLE PRECISION,
-
"humidity" DOUBLE PRECISION + "humidity" DOUBLE PRECISION, + PRIMARY KEY ("time") ); CREATE TABLE IF NOT EXISTS "iot"."smart_home_data_temp" ( @@ -33,5 +34,6 @@ CREATE TABLE IF NOT EXISTS "iot"."smart_home_data_temp" ( "microwave" DOUBLE PRECISION, "living_room" DOUBLE PRECISION, "temperature" DOUBLE PRECISION, - "humidity" DOUBLE PRECISION + "humidity" DOUBLE PRECISION, + PRIMARY KEY ("time") ); \ No newline at end of file diff --git a/setup/taxi-schema.sql b/setup/taxi-schema.sql index 36c3b03..13e33b8 100644 --- a/setup/taxi-schema.sql +++ b/setup/taxi-schema.sql @@ -48,6 +48,7 @@ CREATE TABLE IF NOT EXISTS "nyc_taxi"."trips" ( "trip_type" INTEGER, "pickup_location_id" INTEGER, "dropoff_location_id" INTEGER, - "airport_fee" DOUBLE PRECISION + "airport_fee" DOUBLE PRECISION, + PRIMARY KEY ("id") ) PARTITIONED BY ("pickup_year"); From b0dcd0e2d102ab2cf57527c8f110968280a8e255 Mon Sep 17 00:00:00 2001 From: Niklas Schmidtmer Date: Tue, 27 Dec 2022 11:44:52 +0100 Subject: [PATCH 7/7] Add explicit `pendulum` dependency --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 08e788b..27dc9f3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,4 @@ beautifulsoup4==4.11.1 requests>=2.28.0 yfinance==0.1.87 parquet-tools +pendulum>=2,<3