
refactor: unstructured ingest as a pipeline #1551

Merged · 52 commits merged into main from roman/ingest-pipeline · Oct 6, 2023

Conversation

@rbiseck3 rbiseck3 (Contributor) commented Sep 27, 2023

Description

As we add more and more steps to the pipeline (i.e. chunking, embedding, table manipulation), it helps to separate the responsibility of each of these into its own process, running each in parallel and using json files to share data across steps. This will also help guarantee the data is serializable if this code is used in an actual pipeline. Below is a flow diagram of the proposed changes. As part of this change:

  • A parent pipeline class is responsible for running each node; a node can optionally be run via multiprocessing if it supports it. Possible nodes at the moment:
    • Doc factory: creates all the ingest docs via the source connector.
    • Source: reads/downloads all of the content to process to the local filesystem, at the location set by the download_dir parameter.
    • Partition: runs partition on all of the downloaded content, in json format.
    • Any number of reformat nodes that modify the partitioned content. This can include chunking, embedding, etc.
    • Write: pushes the final json to the destination via the destination connector.
  • The pipeline relies on the ingest docs' information being available via their serialization. An optimization was introduced with IngestDocJsonMixin, which adds all the @property fields to the serialized json already being created via DataClassJsonMixin.
  • For all intermediate steps (partitioning, reformatting), the content is saved to a dedicated location on the local filesystem. Right now it's set to $HOME/.cache/unstructured/ingest/pipeline/STEP_NAME/.
  • Minor changes: it made sense to move some config parameters between the read and partition configs once the responsibility to download vs. partition the content was explicitly divided in the pipeline.
  • The pipeline class only requires the doc factory, source, and partition nodes, in keeping with the logic supported so far. All reformat nodes and the write node are optional.
  • Long term, there should also be changes to the base configs supported by the CLI to support pipeline-specific configs, but for now the existing ones are used to minimize changes in this PR.
  • A final step copies the final output to the location designated by the _output_filename value of the ingest doc.
  • Hashing occurs at each step by hashing that step's parameters (i.e. partition configs) along with the previous step's output, via the filename used. This allows a step's output to be reused if none of its parameters have changed and the content so far is the same (see the sketch after this list).
  • The only data that is shared and written to across processes is the dictionary of ingest json data. This dict is created as a multiprocessing.managers.DictProxy to make sure any interaction with it is behind a lock.
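For illustration, a minimal sketch of how the step hashing and the shared ingest-doc map described above could fit together. The get_step_hash helper, the config contents, and the file names here are hypothetical, not the exact names used in this PR:

```python
import hashlib
import json
import multiprocessing


def get_step_hash(step_config: dict, previous_filename: str) -> str:
    # Hash this step's parameters together with the previous step's output
    # filename, so unchanged params + unchanged upstream content map to the
    # same intermediate path and can be reused instead of reprocessed.
    payload = json.dumps(step_config, sort_keys=True) + previous_filename
    return hashlib.sha256(payload.encode()).hexdigest()[:32]


if __name__ == "__main__":
    # The only state written to across processes: a manager-backed dict
    # (DictProxy), so every read/write goes through the manager under a lock.
    manager = multiprocessing.Manager()
    ingest_docs_map = manager.dict()

    step_hash = get_step_hash({"strategy": "fast"}, "example-doc.json")
    ingest_docs_map[step_hash] = json.dumps({"filename": "example-doc.json"})
    print(step_hash, dict(ingest_docs_map))
```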

Minor refactors included:

  • Utility methods added to extract configs from the click options
  • Utility method to add common options to click commands.
  • All writers moved to the class-based approach, which extracts a lot of the common code so there's less copy-paste when new runners are added.
  • Use @property for source metadata on the base ingest doc, with logic to call update_source_metadata if the metadata is still None when it's fetched (see the sketch after this list).
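A minimal sketch of that lazy source-metadata property, assuming a simple SourceMetadata dataclass; the class and field names are illustrative rather than the exact ones in the codebase:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SourceMetadata:
    url: Optional[str] = None
    version: Optional[str] = None


class BaseIngestDoc:
    def __init__(self):
        self._source_metadata: Optional[SourceMetadata] = None

    def update_source_metadata(self) -> None:
        # In a real connector this would query the source system.
        self._source_metadata = SourceMetadata(url="example://doc", version="1")

    @property
    def source_metadata(self) -> SourceMetadata:
        # Lazily populate the metadata the first time it's fetched.
        if self._source_metadata is None:
            self.update_source_metadata()
        return self._source_metadata
```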

Additional bug fixes included:

  • Fsspec connectors were not serializable due to the ingest_doc_cls field. This was removed from the fields captured by the @dataclass decorator and is now set in a __post_init__ method (see the sketch after this list).
  • Various reddit connector params were missing. This connector doesn't have an explicit ingest test at the moment, so this was never caught.
  • The fsspec connector had the parent update_source_metadata misnamed as update_source_metadata_metadata, so it was never being called.
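A rough sketch of the __post_init__ pattern for the non-serializable ingest_doc_cls field; the connector and doc class names here are placeholders:

```python
from dataclasses import dataclass, field


class ExampleIngestDoc:
    """Stand-in for a connector-specific ingest doc class."""


@dataclass
class ExampleFsspecConnector:
    remote_url: str
    # Excluded from __init__/serialization; populated after construction instead.
    ingest_doc_cls: type = field(init=False, repr=False, default=None)

    def __post_init__(self):
        # Assign the class reference here so the fields that get serialized
        # stay json-friendly.
        self.ingest_doc_cls = ExampleIngestDoc


connector = ExampleFsspecConnector(remote_url="s3://bucket/path")
print(connector.ingest_doc_cls)
```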

Flow Diagram

[Flow diagram image: ingest_pipeline]

@rbiseck3 rbiseck3 force-pushed the roman/ingest-pipeline branch from b780f47 to 37813ce September 29, 2023 17:54
@rbiseck3 rbiseck3 marked this pull request as ready for review September 29, 2023 17:55
@rbiseck3 rbiseck3 force-pushed the roman/ingest-pipeline branch from 37813ce to 2df5fd3 September 29, 2023 18:20
@rbiseck3 rbiseck3 requested a review from badGarnet October 2, 2023 15:18
@rbiseck3 rbiseck3 added the enhancement (New feature or request) and ingest labels Oct 2, 2023
@badGarnet badGarnet (Collaborator) left a comment

some general comments:

  • can we switch to using an executor pool from concurrent.futures instead of a multiprocessing pool? The executor is better at avoiding thread locks than the pool. Given that we potentially run models with their own multiprocessing backend (e.g., the huggingface tokenizer uses a rust backend), we can get into trouble with thread locks easily (see the sketch after this list).
  • the rust backend from huggingface is also an example of why the pipeline setup needs care: there are nodes in this design that inherently do multi-threading/processing on their own. While the support_multiprocessing attribute can help identify them, it doesn't help when some should not be called AFTER or BEFORE python's multiprocessing: e.g., the python version hangs on encode_batch when run in a subprocess (huggingface/tokenizers#187).
  • how does this pipeline optimize a workflow where part of the process is on GPU and we can utilize the CPU for other tasks at the same time (e.g., one reformat node uses a GPU model and another runs a random forest model on CPU, and they don't depend on each other)? Maybe we want to think about async tasks, which would likely also favor a pool of concurrent executors.
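For reference, a minimal sketch of the suggested switch: running a node's per-document work through concurrent.futures.ProcessPoolExecutor instead of multiprocessing.Pool. The partition_one function is a stand-in for whatever the node actually does:

```python
from concurrent.futures import ProcessPoolExecutor


def partition_one(path: str) -> str:
    # Placeholder for the per-document work a pipeline node would do.
    return f"partitioned:{path}"


def run_node(paths, num_workers=4):
    # ProcessPoolExecutor gives the same map-style parallelism as
    # multiprocessing.Pool, and its futures API also integrates with asyncio
    # via loop.run_in_executor if async scheduling is added later.
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        return list(executor.map(partition_one, paths))


if __name__ == "__main__":
    print(run_node(["a.pdf", "b.pdf"]))
```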

@rbiseck3 rbiseck3 (Contributor, Author) commented Oct 2, 2023

@badGarnet valid points; I'm not sure if some of them should be scoped to their own effort. To limit how much is changed in this PR, the pool approach is what's already being used; this PR only splits it up into discrete steps run by a pipeline class.

@ryannikolaidis ryannikolaidis (Contributor)

> -output-dir... Should we still support this parameter? If so, should a step be added to copy the final json files to this location?

yes, I think this is a pretty reasonable expectation for the simple source-connector-only flow, which folks likely will still hit. Adding that final copy step makes sense to me.

@ryannikolaidis ryannikolaidis (Contributor)

> Does it make sense to hash ingest docs on download location? If a new pipeline was run with the same files but not the same parameters for the pipeline steps, this might not make sense. But we want to be able to pick up from a node in the pipeline to avoid reprocessing if we don't have to.

I know we hash the download location by url, but what do you mean by hash ingest docs? Where do IngestDocs themselves get hashed? Or is this in the new flow, i.e. a hash used for storing data from the intermediate nodes/stages? If it's the latter, then I agree: at a minimum the hash should include any parameters at that node/stage and all of the parameters from upstream nodes/stages.

@ryannikolaidis ryannikolaidis (Contributor)

> How do we force reprocessing of a particular step? Or would this apply to the whole pipeline if the --reprocess flag is passed into the CLI invocation, without further granularity?

I think we'll still want a way to let a user reprocess the whole pipeline. Maybe --reprocess still does that, and we add a new flag to specify reprocessing a specific stage? Presumably you can only specify one specific stage, since anything downstream of the earliest reprocessed node would also have to get reprocessed.
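Purely illustrative: one way such flags could be wired up with click. Neither --reprocess-from nor the stage names below exist in this PR; they just show the granularity being discussed:

```python
from typing import Optional

import click

STAGES = ["partition", "chunk", "embed", "write"]


@click.command()
@click.option("--reprocess", is_flag=True, help="Reprocess every stage of the pipeline.")
@click.option(
    "--reprocess-from",
    type=click.Choice(STAGES),
    default=None,
    help="Reprocess from this stage onward (everything downstream is also redone).",
)
def ingest(reprocess: bool, reprocess_from: Optional[str]):
    if reprocess:
        to_run = STAGES
    elif reprocess_from:
        to_run = STAGES[STAGES.index(reprocess_from):]
    else:
        to_run = []  # rely on cached results where hashes match
    click.echo(f"stages forced to rerun: {to_run}")


if __name__ == "__main__":
    ingest()
```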

@rbiseck3 rbiseck3 (Contributor, Author) commented Oct 2, 2023

@ryannikolaidis, regarding the hash, the pipeline context that gets passed around across nodes contains a

ingest_docs_map: dict

Which gets populated via

ingest_docs_map[get_ingest_doc_hash(doc)] = doc

Where doc is ingest_doc.to_json(). And the hash is derived from

hashed = hashlib.sha256(json_as_dict.get("filename").encode()).hexdigest()[:32]

This allows every stage to map the content it produces back to the original ingest doc class.
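Putting the snippets above together, a self-contained sketch of what get_ingest_doc_hash is described as doing; the example filename and map usage are illustrative:

```python
import hashlib
import json


def get_ingest_doc_hash(doc_json: str) -> str:
    # Hash the doc's filename (per the snippet above) to get a stable 32-char key.
    json_as_dict = json.loads(doc_json)
    return hashlib.sha256(json_as_dict.get("filename").encode()).hexdigest()[:32]


# Usage: keyed map from doc hash -> serialized ingest doc, shared across nodes.
doc = json.dumps({"filename": "/downloads/example.pdf"})
ingest_docs_map = {get_ingest_doc_hash(doc): doc}
print(ingest_docs_map)
```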

@ryannikolaidis ryannikolaidis (Contributor)

> [quoting rbiseck3's explanation of the hash and ingest_docs_map above]

got it. yea, then I agree, you definitely can't just rely on download location for this hash.

@rbiseck3 rbiseck3 force-pushed the roman/ingest-pipeline branch 5 times, most recently from 247958d to 2474bd2 October 5, 2023 22:07
CHANGELOG.md: review comments (outdated, resolved)
@ryannikolaidis ryannikolaidis (Contributor)

given how much this changes, probably worth a refresh to the README and supporting diagrams to describe the entire flow in the context of these changes.

@rbiseck3 rbiseck3 force-pushed the roman/ingest-pipeline branch from c9f9c31 to 4bb636b October 6, 2023 13:23
@rbiseck3 rbiseck3 (Contributor, Author) commented Oct 6, 2023

@ryannikolaidis, I added a new issue to update all documentation given this new approach. If I were to do that in the same PR, it would add a ton more changed files to the already large list of changes.

@rbiseck3 rbiseck3 linked an issue Oct 6, 2023 that may be closed by this pull request
@ryannikolaidis ryannikolaidis (Contributor)

> [quoting rbiseck3's reply above about the new documentation issue]

yep, all good. thanks!

@ryannikolaidis ryannikolaidis (Contributor) left a comment

looks good! thanks for all of the changes.

@ryannikolaidis ryannikolaidis changed the title from "roman/refactor ingest as pipeline" to "refactor: unstructured ingest as a pipeline" Oct 6, 2023
@ryannikolaidis ryannikolaidis added this pull request to the merge queue Oct 6, 2023
@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to failed status checks Oct 6, 2023
@ryannikolaidis ryannikolaidis added this pull request to the merge queue Oct 6, 2023
Merged via the queue into main with commit 2e1404e Oct 6, 2023
39 checks passed
@ryannikolaidis ryannikolaidis deleted the roman/ingest-pipeline branch October 6, 2023 19:44
Linked issue: refactor ingest process to run as pipeline