Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support partial parsing #151

Merged
merged 2 commits into from
Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dbt_server/services/dbt_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,10 @@ def parse_to_manifest(project_path, args):


@tracer.wrap
def serialize_manifest(manifest, serialize_path):
def serialize_manifest(manifest, serialize_path, partial_parse_path):
manifest_msgpack = dbt_serialize_manifest(manifest)
filesystem_service.write_file(serialize_path, manifest_msgpack)
filesystem_service.write_file(partial_parse_path, manifest_msgpack)


@tracer.wrap
Expand Down
19 changes: 18 additions & 1 deletion dbt_server/services/filesystem_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from dbt_server.exceptions import StateNotFoundException
from dbt_server import tracer

PARTIAL_PARSE_FILE = "partial_parse.msgpack"

def get_working_dir():
return os.environ.get("__DBT_WORKING_DIR", "./working-dir")
Expand Down Expand Up @@ -45,6 +46,12 @@ def write_file(path, contents):
fh.write(contents)


@tracer.wrap
def copy_file(source_path, dest_path):
ensure_dir_exists(dest_path)
shutil.copyfile(source_path, dest_path)


@tracer.wrap
def read_serialized_manifest(path):
try:
Expand All @@ -55,7 +62,7 @@ def read_serialized_manifest(path):


@tracer.wrap
def write_unparsed_manifest_to_disk(state_id, filedict):
def write_unparsed_manifest_to_disk(state_id, previous_state_id, filedict):
root_path = get_root_path(state_id)
if os.path.exists(root_path):
shutil.rmtree(root_path)
Expand All @@ -64,6 +71,16 @@ def write_unparsed_manifest_to_disk(state_id, filedict):
path = get_path(state_id, filename)
write_file(path, file_info.contents)

if previous_state_id and state_id != previous_state_id:
# TODO: The target folder is usually created during command runs and won't exist on push/parse
# of a new state. It can also be named by env var or flag -- hardcoding as this will change
# with the click API work
previous_partial_parse_path = get_path(previous_state_id, "target", PARTIAL_PARSE_FILE)
new_partial_parse_path = get_path(state_id, "target", PARTIAL_PARSE_FILE)
if not os.path.exists(previous_partial_parse_path):
return
copy_file(previous_partial_parse_path, new_partial_parse_path)


@tracer.wrap
def get_latest_state_id(state_id):
Expand Down
4 changes: 3 additions & 1 deletion dbt_server/state.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from dbt_server.services import filesystem_service, dbt_service
from dbt_server.exceptions import StateNotFoundException
from dbt_server.logging import DBT_SERVER_EVENT_LOGGER as logger
Expand Down Expand Up @@ -64,6 +65,7 @@ def __init__(

self.root_path = filesystem_service.get_root_path(state_id)
self.serialize_path = filesystem_service.get_path(state_id, "manifest.msgpack")
self.partial_parse_path = filesystem_service.get_path(state_id, "target", filesystem_service.PARTIAL_PARSE_FILE)

@classmethod
@tracer.wrap
Expand Down Expand Up @@ -144,7 +146,7 @@ def load_state(cls, state_id, args=None):
@tracer.wrap
def serialize_manifest(self):
logger.info(f"Serializing manifest to file system ({self.serialize_path})")
dbt_service.serialize_manifest(self.manifest, self.serialize_path)
dbt_service.serialize_manifest(self.manifest, self.serialize_path, self.partial_parse_path)
self.manifest_size = filesystem_service.get_size(self.serialize_path)

@tracer.wrap
Expand Down
4 changes: 3 additions & 1 deletion dbt_server/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,11 @@ async def ready():
return JSONResponse(status_code=200, content={})



@app.post("/push")
def push_unparsed_manifest(args: PushProjectArgs):
# Parse / validate it
previous_state_id = filesystem_service.get_latest_state_id(None)
state_id = filesystem_service.get_latest_state_id(args.state_id)

size_in_files = len(args.body)
Expand All @@ -274,7 +276,7 @@ def push_unparsed_manifest(args: PushProjectArgs):
# Stupid example of reusing an existing manifest
if not os.path.exists(path):
reuse = False
filesystem_service.write_unparsed_manifest_to_disk(state_id, args.body)
filesystem_service.write_unparsed_manifest_to_disk(state_id, previous_state_id, args.body)

# Write messagepack repr to disk
# Return a key that the client can use to operate on it?
Expand Down