From df12db62f05b9bed3f070c4ebfadfff7e3fa6b91 Mon Sep 17 00:00:00 2001 From: Rachel Daniel Date: Wed, 21 Dec 2022 14:24:56 -0600 Subject: [PATCH 1/2] Support partial parsing --- dbt_server/services/dbt_service.py | 3 ++- dbt_server/services/filesystem_service.py | 17 ++++++++++++++++- dbt_server/state.py | 4 +++- dbt_server/views.py | 4 +++- 4 files changed, 24 insertions(+), 4 deletions(-) diff --git a/dbt_server/services/dbt_service.py b/dbt_server/services/dbt_service.py index 11108ed..fc47155 100644 --- a/dbt_server/services/dbt_service.py +++ b/dbt_server/services/dbt_service.py @@ -151,9 +151,10 @@ def parse_to_manifest(project_path, args): @tracer.wrap -def serialize_manifest(manifest, serialize_path): +def serialize_manifest(manifest, serialize_path, partial_parse_path): manifest_msgpack = dbt_serialize_manifest(manifest) filesystem_service.write_file(serialize_path, manifest_msgpack) + filesystem_service.write_file(partial_parse_path, manifest_msgpack) @tracer.wrap diff --git a/dbt_server/services/filesystem_service.py b/dbt_server/services/filesystem_service.py index 587a991..cf32cd5 100644 --- a/dbt_server/services/filesystem_service.py +++ b/dbt_server/services/filesystem_service.py @@ -4,6 +4,7 @@ from dbt_server.exceptions import StateNotFoundException from dbt_server import tracer +PARTIAL_PARSE_FILE = "partial_parse.msgpack" def get_working_dir(): return os.environ.get("__DBT_WORKING_DIR", "./working-dir") @@ -45,6 +46,12 @@ def write_file(path, contents): fh.write(contents) +@tracer.wrap +def copy_file(source_path, dest_path): + ensure_dir_exists(dest_path) + shutil.copyfile(source_path, dest_path) + + @tracer.wrap def read_serialized_manifest(path): try: @@ -55,7 +62,7 @@ def read_serialized_manifest(path): @tracer.wrap -def write_unparsed_manifest_to_disk(state_id, filedict): +def write_unparsed_manifest_to_disk(state_id, previous_state_id, filedict): root_path = get_root_path(state_id) if os.path.exists(root_path): shutil.rmtree(root_path) @@ -64,6 +71,14 @@ def write_unparsed_manifest_to_disk(state_id, filedict): path = get_path(state_id, filename) write_file(path, file_info.contents) + if previous_state_id and state_id != previous_state_id: + previous_partial_parse_path = get_path(previous_state_id, "target", PARTIAL_PARSE_FILE) + # TODO: We don't create a target folder-- will this change with click API? + new_partial_parse_path = get_path(state_id, "target", PARTIAL_PARSE_FILE) + if not os.path.exists(previous_partial_parse_path): + return + copy_file(previous_partial_parse_path, new_partial_parse_path) + @tracer.wrap def get_latest_state_id(state_id): diff --git a/dbt_server/state.py b/dbt_server/state.py index b46c346..05699a0 100644 --- a/dbt_server/state.py +++ b/dbt_server/state.py @@ -1,3 +1,4 @@ +import os from dbt_server.services import filesystem_service, dbt_service from dbt_server.exceptions import StateNotFoundException from dbt_server.logging import DBT_SERVER_EVENT_LOGGER as logger @@ -64,6 +65,7 @@ def __init__( self.root_path = filesystem_service.get_root_path(state_id) self.serialize_path = filesystem_service.get_path(state_id, "manifest.msgpack") + self.partial_parse_path = filesystem_service.get_path(state_id, "target", filesystem_service.PARTIAL_PARSE_FILE) @classmethod @tracer.wrap @@ -144,7 +146,7 @@ def load_state(cls, state_id, args=None): @tracer.wrap def serialize_manifest(self): logger.info(f"Serializing manifest to file system ({self.serialize_path})") - dbt_service.serialize_manifest(self.manifest, self.serialize_path) + dbt_service.serialize_manifest(self.manifest, self.serialize_path, self.partial_parse_path) self.manifest_size = filesystem_service.get_size(self.serialize_path) @tracer.wrap diff --git a/dbt_server/views.py b/dbt_server/views.py index 4b0b8a2..6661daa 100644 --- a/dbt_server/views.py +++ b/dbt_server/views.py @@ -259,9 +259,11 @@ async def ready(): return JSONResponse(status_code=200, content={}) + @app.post("/push") def push_unparsed_manifest(args: PushProjectArgs): # Parse / validate it + previous_state_id = filesystem_service.get_latest_state_id(None) state_id = filesystem_service.get_latest_state_id(args.state_id) size_in_files = len(args.body) @@ -274,7 +276,7 @@ def push_unparsed_manifest(args: PushProjectArgs): # Stupid example of reusing an existing manifest if not os.path.exists(path): reuse = False - filesystem_service.write_unparsed_manifest_to_disk(state_id, args.body) + filesystem_service.write_unparsed_manifest_to_disk(state_id, previous_state_id, args.body) # Write messagepack repr to disk # Return a key that the client can use to operate on it? From 8c3055dca0d83c5ec3f0265bc789f55703c6bef4 Mon Sep 17 00:00:00 2001 From: Rachel Daniel Date: Wed, 21 Dec 2022 14:32:37 -0600 Subject: [PATCH 2/2] Updates todo --- dbt_server/services/filesystem_service.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dbt_server/services/filesystem_service.py b/dbt_server/services/filesystem_service.py index cf32cd5..7806815 100644 --- a/dbt_server/services/filesystem_service.py +++ b/dbt_server/services/filesystem_service.py @@ -72,8 +72,10 @@ def write_unparsed_manifest_to_disk(state_id, previous_state_id, filedict): write_file(path, file_info.contents) if previous_state_id and state_id != previous_state_id: + # TODO: The target folder is usually created during command runs and won't exist on push/parse + # of a new state. It can also be named by env var or flag -- hardcoding as this will change + # with the click API work previous_partial_parse_path = get_path(previous_state_id, "target", PARTIAL_PARSE_FILE) - # TODO: We don't create a target folder-- will this change with click API? new_partial_parse_path = get_path(state_id, "target", PARTIAL_PARSE_FILE) if not os.path.exists(previous_partial_parse_path): return