destination connector method elements input #1674
    def write_elements(
        self,
        elements: t.List[Element],
        filename: t.Optional[str] = None,
        indent: int = 4,
        encoding: str = "utf-8",
        *args,
        **kwargs,
    ) -> None:
        from fsspec import AbstractFileSystem, get_filesystem_class

        fs: AbstractFileSystem = get_filesystem_class(self.connector_config.protocol)(
            **self.connector_config.get_access_kwargs(),
        )

        logger.info(f"Writing content using filesystem: {type(fs).__name__}")

        s3_folder = self.connector_config.path

        s3_output_path = str(PurePath(s3_folder, filename)) if filename else s3_folder
        element_dict = convert_to_dict(elements)
        with tempfile.NamedTemporaryFile(mode="w+", encoding=encoding) as tmp_file:
            json.dump(element_dict, tmp_file, indent=indent)
            logger.debug(f"Uploading {tmp_file.name} -> {s3_output_path}")
            fs.put_file(lpath=tmp_file.name, rpath=s3_output_path)

    def write(self, docs: t.List[BaseIngestDoc]) -> None:
Were you unable to use the write/write_dict/write_elements pattern here, as you did for the others?
This one is a bit unique because I need to write the content to the local filesystem regardless, in order to upload it to s3 using the fs.put_file() pattern we've been using. I can look into whether that library supports writing from a byte stream to the destination, in which case maybe the write/write_dict/write_elements pattern could be used.
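A minimal sketch of the stream-based idea raised above, under stated assumptions: fsspec filesystems expose an `open(path, mode)` method that returns a file-like object, so the JSON could be dumped straight to the destination without a temporary file. The helper name `write_json_stream` and the `InMemoryFS` class are hypothetical; `InMemoryFS` is only a stdlib stand-in that models `open` so the example runs without fsspec or S3 credentials.

```python
import contextlib
import io
import json
import typing as t


class InMemoryFS:
    """Hypothetical stand-in for an fsspec filesystem; only `open` is modeled."""

    def __init__(self) -> None:
        self.files: t.Dict[str, str] = {}

    @contextlib.contextmanager
    def open(self, path: str, mode: str = "w", encoding: str = "utf-8"):
        buffer = io.StringIO()
        yield buffer
        # Persist the buffered text once the caller's `with` block exits.
        self.files[path] = buffer.getvalue()


def write_json_stream(
    fs: t.Any,
    json_list: t.List[t.Dict[str, t.Any]],
    path: str,
    indent: int = 4,
    encoding: str = "utf-8",
) -> None:
    """Dump JSON directly into a file object opened on the destination filesystem."""
    with fs.open(path, "w", encoding=encoding) as remote_file:
        json.dump(json_list, remote_file, indent=indent)


fs = InMemoryFS()
write_json_stream(fs, [{"text": "hello"}], "bucket/output.json")
```

With this shape, a `write_dict` method could hold the upload logic and `write_elements` could simply convert and delegate, which is the pattern the review asks about. Whether it behaves well for every fsspec backend is not verified here.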
    def write_elements(self, elements: t.List[Element], *args, **kwargs) -> None:
        elements_dict = convert_to_dict(elements)
        self.write_dict(json_list=elements_dict)
same as previous comment, can we make this a function defined on BaseIngestDoc?
7d154b1
to
d544c50
Compare
@ryannikolaidis given that there are only 3 implementations of this so far, and how varied those 3 are, it would make sense to wait until we have some more use cases before we look for generalizations in the code.
I have a hunch that once you get in and code, most implementations should fall into that write/write_dict/write_elements pattern, and I would definitely like to avoid another copy and paste across every single connector if there are patterns we can abstract (even if they are optionally overridden where needed or mixed in). Maybe worth taking a second to explore your comment on s3 as an option. Overall I think the code itself makes sense if you want to start moving forward; that's just my biggest caution here.
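One way the abstraction discussed above might look, as a rough sketch only: a base class provides a default `write_elements` that converts and delegates to a connector-specific `write_dict`, and connectors override it only where needed. `BaseDestination`, `ListDestination`, and the `Element` dataclass here are hypothetical names for illustration, and `convert_to_dict` is a simplified stand-in for the helper used in the diff, not the project's implementation.

```python
import abc
import typing as t
from dataclasses import asdict, dataclass


@dataclass
class Element:
    """Hypothetical stand-in for the library's Element type."""

    text: str
    type: str = "NarrativeText"


def convert_to_dict(elements: t.List[Element]) -> t.List[t.Dict[str, t.Any]]:
    """Simplified stand-in for the convert_to_dict helper used in the diff."""
    return [asdict(element) for element in elements]


class BaseDestination(abc.ABC):
    """Hypothetical base class holding the shared write_elements default."""

    @abc.abstractmethod
    def write_dict(self, json_list: t.List[t.Dict[str, t.Any]], *args, **kwargs) -> None:
        """Connector-specific write of already-serialized elements."""

    def write_elements(self, elements: t.List[Element], *args, **kwargs) -> None:
        # Shared default: convert once, then delegate to the connector's write_dict.
        self.write_dict(convert_to_dict(elements), *args, **kwargs)


class ListDestination(BaseDestination):
    """Toy connector that collects records in memory instead of uploading them."""

    def __init__(self) -> None:
        self.records: t.List[t.Dict[str, t.Any]] = []

    def write_dict(self, json_list: t.List[t.Dict[str, t.Any]], *args, **kwargs) -> None:
        self.records.extend(json_list)


dest = ListDestination()
dest.write_elements([Element(text="hello")])
```

A connector with unusual needs, such as the s3 one above, could still override `write_elements` directly while the rest inherit the default.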
21f16f7
to
88c07f3
Compare
lgtm! one request: make sure we have this included in the README destination connector steps when the doc PR lands
88c07f3
to
72a44e4
Compare
This reverts commit d843178.
72a44e4
to
ddc8e7c
Compare
Description
Ingest destination connectors: support for writing a raw list of elements
Along with the default write method used in the ingest pipeline to write the JSON content associated with the ingest docs, each destination connector can now also write a raw list of elements to the desired downstream location without an associated ingest doc.