diff --git a/document_ai_warehouse/common/src/common/utils/document_ai_utils.py b/document_ai_warehouse/common/src/common/utils/document_ai_utils.py
index 0e0b4d9cc..ff18a3478 100644
--- a/document_ai_warehouse/common/src/common/utils/document_ai_utils.py
+++ b/document_ai_warehouse/common/src/common/utils/document_ai_utils.py
@@ -3,9 +3,9 @@ import time
 
 from typing import Any, Dict, List, Optional
 
+from common.utils.helper import split_uri_2_bucket_prefix
 from common.utils.logging_handler import Logger
 from common.utils.storage_utils import read_binary_object
-from common.utils.helper import split_uri_2_bucket_prefix
 from google.api_core.client_options import ClientOptions
 from google.api_core.exceptions import InternalServerError
 from google.api_core.exceptions import RetryError
@@ -54,11 +54,11 @@ def get_processor(self, processor_id: str):
         return client.get_processor(request=request)
 
     def process_file_from_gcs(
-            self,
-            processor_id: str,
-            bucket_name: str,
-            file_path: str,
-            mime_type: str = "application/pdf",
+        self,
+        processor_id: str,
+        bucket_name: str,
+        file_path: str,
+        mime_type: str = "application/pdf",
     ) -> documentai.Document:
         client = self.get_docai_client()
         parent = self.get_parent()
@@ -67,12 +67,8 @@ def process_file_from_gcs(
 
         document_content = read_binary_object(bucket_name, file_path)
 
-        document = documentai.RawDocument(
-            content=document_content, mime_type=mime_type
-        )
-        request = documentai.ProcessRequest(
-            raw_document=document, name=processor_name
-        )
+        document = documentai.RawDocument(content=document_content, mime_type=mime_type)
+        request = documentai.ProcessRequest(raw_document=document, name=processor_name)
 
         response = client.process_document(request)
 
@@ -103,11 +99,11 @@ def get_entity_key_value_pairs(docai_document):
         return fields
 
     def batch_extraction(
-            self,
-            processor_id: str,
-            input_uris: List[str],
-            gcs_output_bucket: str,
-            timeout=600,
+        self,
+        processor_id: str,
+        input_uris: List[str],
+        gcs_output_bucket: str,
+        timeout=600,
     ):
         if len(input_uris) == 0:
             return []
@@ -176,7 +172,9 @@ def batch_extraction(
                 f"batch_extraction - Batch Process Failed: {metadata.state_message}"
             )
 
-        documents: Dict[str, Any] = {}  # Contains per processed document, keys are path to original document
+        documents: Dict[
+            str, Any
+        ] = {}  # Contains per processed document, keys are path to original document
 
         # One process per Input Document
         for process in metadata.individual_process_statuses:
@@ -258,9 +256,9 @@ def merge_json_files(files):
 
 # Handling Nested labels for CDE processor
 def get_key_values_dic(
-        entity: documentai.Document.Entity,
-        document_entities: Dict[str, List[Any]],
-        parent_key: Optional[str] = None,
+    entity: documentai.Document.Entity,
+    document_entities: Dict[str, List[Any]],
+    parent_key: Optional[str] = None,
 ) -> None:
     # Fields detected. For a full list of fields for each processor see
     # the processor documentation:
@@ -272,8 +270,8 @@ def get_key_values_dic(
 
     if normalized_value:
         if (
-                isinstance(normalized_value, dict)
-                and "booleanValue" in normalized_value.keys()
+            isinstance(normalized_value, dict)
+            and "booleanValue" in normalized_value.keys()
         ):
             normalized_value = normalized_value.get("booleanValue")
         else:
diff --git a/document_ai_warehouse/common/src/common/utils/document_warehouse_utils.py b/document_ai_warehouse/common/src/common/utils/document_warehouse_utils.py
index 8c15ab839..8f39f86ad 100644
--- a/document_ai_warehouse/common/src/common/utils/document_warehouse_utils.py
+++ b/document_ai_warehouse/common/src/common/utils/document_warehouse_utils.py
@@ -248,13 +248,10 @@ def set_raw_document_file_type_from_mimetype(
 
     mime_to_dw_mime_enum = {
         "application/pdf": document.raw_document_file_type.RAW_DOCUMENT_FILE_TYPE_PDF,
-        "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
-            document.raw_document_file_type.RAW_DOCUMENT_FILE_TYPE_DOCX,
+        "application/vnd.openxmlformats-officedocument.wordprocessingml.document": document.raw_document_file_type.RAW_DOCUMENT_FILE_TYPE_DOCX,
         "text/plain": document.raw_document_file_type.RAW_DOCUMENT_FILE_TYPE_TEXT,
-        "application/vnd.openxmlformats-officedocument.presentationml.presentation":
-            document.raw_document_file_type.RAW_DOCUMENT_FILE_TYPE_PPTX,
-        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet":
-            document.raw_document_file_type.RAW_DOCUMENT_FILE_TYPE_XLSX,
+        "application/vnd.openxmlformats-officedocument.presentationml.presentation": document.raw_document_file_type.RAW_DOCUMENT_FILE_TYPE_PPTX,
+        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": document.raw_document_file_type.RAW_DOCUMENT_FILE_TYPE_XLSX,
     }
     if mime_type.lower() in mime_to_dw_mime_enum:
         document.raw_document_file_type = mime_to_dw_mime_enum[mime_type.lower()]
diff --git a/document_ai_warehouse/common/src/common/utils/logging_handler.py b/document_ai_warehouse/common/src/common/utils/logging_handler.py
index ffe7ecd1a..aafca78f2 100644
--- a/document_ai_warehouse/common/src/common/utils/logging_handler.py
+++ b/document_ai_warehouse/common/src/common/utils/logging_handler.py
@@ -13,8 +13,9 @@ See the License for the specific language governing permissions and
 limitations under the License.
 """
-import os
 import logging
+import os
+
 import google.cloud.logging_v2
 
 """class and methods for logs handling."""
 
diff --git a/document_ai_warehouse/common/src/common/utils/storage_utils.py b/document_ai_warehouse/common/src/common/utils/storage_utils.py
index 433559bd3..4dfc8afc7 100644
--- a/document_ai_warehouse/common/src/common/utils/storage_utils.py
+++ b/document_ai_warehouse/common/src/common/utils/storage_utils.py
@@ -22,7 +22,12 @@ def file_exists(bucket_name: str, file_name: str):
     return stats
 
 
-def write_gcs_blob(bucket_name: str, file_name: str, content_as_str: str, content_type: str = "text/plain"):
+def write_gcs_blob(
+    bucket_name: str,
+    file_name: str,
+    content_as_str: str,
+    content_type: str = "text/plain",
+):
     bucket = storage_client.get_bucket(bucket_name)
     gcs_file = bucket.blob(file_name)
     gcs_file.upload_from_string(content_as_str, content_type=content_type)
diff --git a/document_ai_warehouse/document_ai_warehouse_batch_ingestion/main.py b/document_ai_warehouse/document_ai_warehouse_batch_ingestion/main.py
index d7341c0b9..c663fee5e 100644
--- a/document_ai_warehouse/document_ai_warehouse_batch_ingestion/main.py
+++ b/document_ai_warehouse/document_ai_warehouse_batch_ingestion/main.py
@@ -2,7 +2,16 @@ import json
 import os
 import time
-from typing import List, Dict, Any, Set, Tuple, Optional
+from typing import Any, Dict, List, Optional, Set, Tuple
+
+from common.utils import helper
+from common.utils import storage_utils
+from common.utils.docai_warehouse_helper import get_key_value_pairs
+from common.utils.docai_warehouse_helper import get_metadata_properties
+from common.utils.document_ai_utils import DocumentaiUtils
+from common.utils.document_warehouse_utils import DocumentWarehouseUtils
+from common.utils.helper import is_date
+from common.utils.logging_handler import Logger
 
 from config import API_LOCATION
 from config import CALLER_USER
 from config import DOCAI_PROJECT_NUMBER
@@ -13,14 +22,6 @@ from google.api_core.exceptions import NotFound
 from google.cloud import contentwarehouse_v1
 from google.cloud import storage
 
-from common.utils import helper
-from common.utils import storage_utils
-from common.utils.docai_warehouse_helper import get_key_value_pairs
-from common.utils.docai_warehouse_helper import get_metadata_properties
-from common.utils.document_ai_utils import DocumentaiUtils
-from common.utils.document_warehouse_utils import DocumentWarehouseUtils
-from common.utils.helper import is_date
-from common.utils.logging_handler import Logger
 
 dw_utils = DocumentWarehouseUtils(
     project_number=DOCAI_WH_PROJECT_NUMBER, api_location=API_LOCATION
@@ -45,8 +46,10 @@ def get_schema(args: argparse.Namespace):
         f"CALLER_USER={CALLER_USER}"
     )
 
-    assert processor_id, "processor_id is not set as PROCESSOR_ID env variable and " \
-                         "is not provided as an input parameter (-p)"
+    assert processor_id, (
+        "processor_id is not set as PROCESSOR_ID env variable and "
+        "is not provided as an input parameter (-p)"
+    )
 
     assert GCS_OUTPUT_BUCKET, "GCS_OUTPUT_BUCKET not set"
     assert DOCAI_PROJECT_NUMBER, "DOCAI_PROJECT_NUMBER not set"
@@ -112,18 +115,27 @@ def batch_ingest(args: argparse.Namespace) -> None:
         f"CALLER_USER={CALLER_USER}"
    )
 
-    assert processor_id, "processor_id is not set as PROCESSOR_ID env variable and " \
-                         "is not provided as an input parameter (-p)"
+    assert processor_id, (
+        "processor_id is not set as PROCESSOR_ID env variable and "
+        "is not provided as an input parameter (-p)"
+    )
 
     assert GCS_OUTPUT_BUCKET, "GCS_OUTPUT_BUCKET not set"
     assert DOCAI_PROJECT_NUMBER, "DOCAI_PROJECT_NUMBER not set"
     assert DOCAI_WH_PROJECT_NUMBER, "DOCAI_WH_PROJECT_NUMBER not set"
 
     initial_start_time = time.time()
-    created_folders, files_to_parse, processed_files, processed_dirs, error_files = \
-        prepare_file_structure(dir_uri, folder_name, overwrite, flatten)
+    (
+        created_folders,
+        files_to_parse,
+        processed_files,
+        processed_dirs,
+        error_files,
+    ) = prepare_file_structure(dir_uri, folder_name, overwrite, flatten)
 
-    created_schemas, document_id_list = proces_documents(files_to_parse, schema_id, schema_name, processor_id, options)
+    created_schemas, document_id_list = proces_documents(
+        files_to_parse, schema_id, schema_name, processor_id, options
+    )
 
     process_time = time.time() - initial_start_time
     time_elapsed = round(process_time)
@@ -147,11 +159,12 @@ def batch_ingest(args: argparse.Namespace) -> None:
     )
 
 
-FUNCTION_MAP = {'batch_ingest': batch_ingest,
-                'get_schema': get_schema,
-                'upload_schema': upload_schema,
-                'delete_schema': delete_schema,
-                }
+FUNCTION_MAP = {
+    "batch_ingest": batch_ingest,
+    "get_schema": get_schema,
+    "upload_schema": upload_schema,
+    "delete_schema": delete_schema,
+}
 
 
 def main():
@@ -186,19 +199,17 @@ def get_args():
         """,
     )
 
-    args_parser.add_argument('command', choices=FUNCTION_MAP.keys())
+    args_parser.add_argument("command", choices=FUNCTION_MAP.keys())
     args_parser.add_argument(
         "-d",
         dest="dir_uri",
         help="Path to gs directory uri, containing data with PDF documents to be loaded. "
-             "All original structure of sub-folders will be preserved.",
+        "All original structure of sub-folders will be preserved.",
     )
     args_parser.add_argument(
         "-s", dest="schema_id", help="Optional existing schema_id."
     )
-    args_parser.add_argument(
-        "-p", dest="processor_id", help="Processor_ID."
-    )
+    args_parser.add_argument("-p", dest="processor_id", help="Processor_ID.")
     args_parser.add_argument(
         "-sn",
         dest="schema_name",
@@ -235,7 +246,7 @@ def get_args():
         "-n",
         dest="root_name",
         help="Name of the root folder inside DW for batch ingestion."
- " When skipped, will use the same name of the folder being loaded from.", + " When skipped, will use the same name of the folder being loaded from.", ) args_parser.add_argument( "-sns", @@ -255,11 +266,11 @@ def get_args(): def proces_documents( - files_to_parse: Dict[str, Any], - schema_id: str, - schema_name: str, - processor_id: str, - options: bool + files_to_parse: Dict[str, Any], + schema_id: str, + schema_name: str, + processor_id: str, + options: bool, ) -> Tuple[Set[str], List[str]]: created_schemas: Set[str] = set() document_id_list: List[str] = [] @@ -334,7 +345,6 @@ def prepare_file_structure( overwrite: bool, flatten: bool, ): - created_folders = [] files_to_parse = {} processed_files = [] @@ -541,7 +551,9 @@ def create_folder_schema(schema_path: str) -> str: return folder_schema_id -def create_folder(folder_schema_id: str, display_name: str, reference_id: str) -> Optional[str]: +def create_folder( + folder_schema_id: str, display_name: str, reference_id: str +) -> Optional[str]: reference_path = f"referenceId/{reference_id}" try: document = dw_utils.get_document(reference_path, CALLER_USER) diff --git a/toolbox-batch-processing/documentai-toolbox-batch-entity-extraction.ipynb b/toolbox-batch-processing/documentai-toolbox-batch-entity-extraction.ipynb index 118532a99..57a357d58 100644 --- a/toolbox-batch-processing/documentai-toolbox-batch-entity-extraction.ipynb +++ b/toolbox-batch-processing/documentai-toolbox-batch-entity-extraction.ipynb @@ -1,316 +1,316 @@ { - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "id": "1ouFwNhyEWPf", - "metadata": { - "id": "1ouFwNhyEWPf" - }, - "outputs": [], - "source": [ - "%pip install --upgrade google-cloud-documentai google-cloud-documentai-toolbox pandas --user" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "Sh1oolV7Mael", - "metadata": { - "id": "Sh1oolV7Mael" - }, - "outputs": [], - "source": [ - "!gcloud auth application-default login" - ] - }, - { - "cell_type": "code", - "execution_count": 6, - "id": "x9hTHkFrX_5N", - "metadata": { - "executionInfo": { - "elapsed": 257, - "status": "ok", - "timestamp": 1694541469217, - "user": { - "displayName": "", - "userId": "" - }, - "user_tz": 300 - }, - "id": "x9hTHkFrX_5N" - }, - "outputs": [], - "source": [ - "from typing import List, Optional\n", - "\n", - "# https://googleapis.dev/python/google-api-core/latest/client_options.html\n", - "from google.api_core.client_options import ClientOptions\n", - "\n", - "# https://cloud.google.com/python/docs/reference/documentai/latest\n", - "from google.cloud import documentai\n", - "\n", - "# https://cloud.google.com/document-ai/docs/toolbox\n", - "from google.cloud import documentai_toolbox\n", - "\n", - "import pandas as pd\n", - "from tabulate import tabulate" - ] - }, - { - "cell_type": "code", - "source": [ - "# TODO(developer): Fill these variables before running the sample.\n", - "project_id = \"document-ai-test-337818\"\n", - "location = \"us\" # Format is \"us\" or \"eu\"\n", - "processor_id = \"6ed36cb2acbc5389\" # Create processor before running sample\n", - "processor_version_id = \"193c6216c19e4b93\"\n", - "\n", - "gcs_input_uri = \"gs://cloud-samples-data/documentai/SampleDocuments/CONTRACT_PROCESSOR/\" # Format: `gs://bucket/directory/`\n", - "gcs_output_uri = \"gs://document-ai-test-bucket-2/genai_cde/\" # Must end with a trailing slash `/`. Format: `gs://bucket/directory/subdirectory/`\n", - "\n", - "batch_size = 1000\n", - "field_mask = \"text,entities,pages.pageNumber\" # Optional. 
The fields to return in the Document object." - ], - "metadata": { - "id": "7DSQUZl7wtY8" - }, - "id": "7DSQUZl7wtY8", - "execution_count": null, - "outputs": [] - }, - { - "cell_type": "markdown", - "source": [ - "## Batch Processing\n", - "\n", - "- Create batches of 1000 documents in Google Cloud Storage.\n", - "- Make a batch processing request for each batch.\n", - "- Get long-running operation ID for each request." - ], - "metadata": { - "id": "3Iaq7M5MvkqG" - }, - "id": "3Iaq7M5MvkqG" - }, - { - "cell_type": "code", - "execution_count": 5, - "id": "35856bf2-aa5e-436b-977a-9e5725b1a595", - "metadata": { - "executionInfo": { - "elapsed": 3, - "status": "ok", - "timestamp": 1694541463780, - "user": { - "displayName": "", - "userId": "" - }, - "user_tz": 300 - }, - "id": "35856bf2-aa5e-436b-977a-9e5725b1a595", - "trusted": true - }, - "outputs": [], - "source": [ - "def batch_process_toolbox(\n", - " project_id: str,\n", - " location: str,\n", - " processor_id: str,\n", - " processor_version_id: str,\n", - " gcs_input_uri: str,\n", - " gcs_output_uri: str,\n", - " batch_size: int,\n", - " field_mask: Optional[str] = None,\n", - " skip_human_review: bool = True\n", - ") -> List[str]:\n", - " client = documentai.DocumentProcessorServiceClient(\n", - " client_options=ClientOptions(\n", - " api_endpoint=f\"{location}-documentai.googleapis.com\"\n", - " )\n", - " )\n", - "\n", - " # The full resource name of the processor version, e.g.:\n", - " # projects/{project_id}/locations/{location}/processors/{processor_id}/processorVersions/{processor_version_id}\n", - " name = client.processor_version_path(\n", - " project_id, location, processor_id, processor_version_id\n", - " )\n", - "\n", - " # Cloud Storage URI for the Output Directory\n", - " output_config = documentai.DocumentOutputConfig(\n", - " gcs_output_config=documentai.DocumentOutputConfig.GcsOutputConfig(\n", - " gcs_uri=gcs_output_uri, field_mask=field_mask\n", - " )\n", - " )\n", - "\n", - " # Create batches of documents for processing\n", - " # https://cloud.google.com/python/docs/reference/documentai-toolbox/latest/google.cloud.documentai_toolbox.utilities.gcs_utilities\n", - " gcs_bucket_name, gcs_prefix = documentai_toolbox.gcs_utilities.split_gcs_uri(\n", - " gcs_input_uri\n", - " )\n", - " batches = documentai_toolbox.gcs_utilities.create_batches(\n", - " gcs_bucket_name, gcs_prefix, batch_size=batch_size\n", - " )\n", - "\n", - " operation_names: List[str] = []\n", - "\n", - " print(f\"{len(batches)} batches created.\")\n", - " for batch in batches:\n", - " print(f\"{len(batch.gcs_documents.documents)} files in batch.\")\n", - " print(batch.gcs_documents.documents)\n", - " request = documentai.BatchProcessRequest(\n", - " name=name,\n", - " input_documents=batch,\n", - " document_output_config=output_config,\n", - " skip_human_review=skip_human_review,\n", - " )\n", - "\n", - " # https://cloud.google.com/document-ai/docs/send-request?hl=en#async-processor\n", - " # `batch_process_documents()` returns a Long Running Operation (LRO)\n", - " operation = client.batch_process_documents(request)\n", - " # Operation Name Format: `projects/{project_id}/locations/{location}/operations/{operation_id}`\n", - " operation_names.append(operation.operation.name)\n", - "\n", - " return operation_names" - ] - }, - { - "cell_type": "markdown", - "source": [ - "## Retrieve results once processing is complete\n", - "\n", - "- Get output [`Document`](https://cloud.google.com/document-ai/docs/reference/rest/v1/Document) JSON files from 
`gcs_output_bucket` based on the Operation ID." - ], - "metadata": { - "id": "op0ZCWTIwDgR" - }, - "id": "op0ZCWTIwDgR" - }, - { - "cell_type": "code", - "source": [ - "def retrieve_results(\n", - " operation_names: List[str],\n", - ") -> List[documentai_toolbox.document.Document]:\n", - " # Can do this asynchronously to avoid blocking\n", - " all_documents: List[documentai_toolbox.document.Document] = []\n", - "\n", - " for operation in operation_names:\n", - " # https://cloud.google.com/document-ai/docs/long-running-operations\n", - " print(f\"Waiting for operation {operation}\")\n", - " documents: List = (\n", - " documentai_toolbox.document.Document.from_batch_process_operation(\n", - " location=location, operation_name=operation\n", - " )\n", - " )\n", - " all_documents.extend(documents)\n", - " return all_documents" - ], - "metadata": { - "id": "2-KpEaoRwEHv" - }, - "id": "2-KpEaoRwEHv", - "execution_count": null, - "outputs": [] + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "1ouFwNhyEWPf", + "metadata": { + "id": "1ouFwNhyEWPf" + }, + "outputs": [], + "source": [ + "%pip install --upgrade google-cloud-documentai google-cloud-documentai-toolbox pandas --user" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "Sh1oolV7Mael", + "metadata": { + "id": "Sh1oolV7Mael" + }, + "outputs": [], + "source": [ + "!gcloud auth application-default login" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "x9hTHkFrX_5N", + "metadata": { + "executionInfo": { + "elapsed": 257, + "status": "ok", + "timestamp": 1694541469217, + "user": { + "displayName": "", + "userId": "" + }, + "user_tz": 300 }, - { - "cell_type": "markdown", - "source": [ - "## Print results\n", - "\n", - "- Export extracted entities as dictionary\n", - "- Load into Pandas DataFrame\n", - "- Print Dataframe" - ], - "metadata": { - "id": "445FQsfrwc4N" - }, - "id": "445FQsfrwc4N" + "id": "x9hTHkFrX_5N" + }, + "outputs": [], + "source": [ + "from typing import List, Optional\n", + "\n", + "# https://googleapis.dev/python/google-api-core/latest/client_options.html\n", + "from google.api_core.client_options import ClientOptions\n", + "\n", + "# https://cloud.google.com/python/docs/reference/documentai/latest\n", + "from google.cloud import documentai\n", + "\n", + "# https://cloud.google.com/document-ai/docs/toolbox\n", + "from google.cloud import documentai_toolbox\n", + "\n", + "import pandas as pd\n", + "from tabulate import tabulate" + ] + }, + { + "cell_type": "code", + "source": [ + "# TODO(developer): Fill these variables before running the sample.\n", + "project_id = \"document-ai-test-337818\"\n", + "location = \"us\" # Format is \"us\" or \"eu\"\n", + "processor_id = \"6ed36cb2acbc5389\" # Create processor before running sample\n", + "processor_version_id = \"193c6216c19e4b93\"\n", + "\n", + "gcs_input_uri = \"gs://cloud-samples-data/documentai/SampleDocuments/CONTRACT_PROCESSOR/\" # Format: `gs://bucket/directory/`\n", + "gcs_output_uri = \"gs://document-ai-test-bucket-2/genai_cde/\" # Must end with a trailing slash `/`. Format: `gs://bucket/directory/subdirectory/`\n", + "\n", + "batch_size = 1000\n", + "field_mask = \"text,entities,pages.pageNumber\" # Optional. The fields to return in the Document object." 
+ ], + "metadata": { + "id": "7DSQUZl7wtY8" + }, + "id": "7DSQUZl7wtY8", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Batch Processing\n", + "\n", + "- Create batches of 1000 documents in Google Cloud Storage.\n", + "- Make a batch processing request for each batch.\n", + "- Get long-running operation ID for each request." + ], + "metadata": { + "id": "3Iaq7M5MvkqG" + }, + "id": "3Iaq7M5MvkqG" + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "35856bf2-aa5e-436b-977a-9e5725b1a595", + "metadata": { + "executionInfo": { + "elapsed": 3, + "status": "ok", + "timestamp": 1694541463780, + "user": { + "displayName": "", + "userId": "" + }, + "user_tz": 300 }, - { - "cell_type": "code", - "execution_count": 7, - "id": "KxVFCVNVLLwW", - "metadata": { - "colab": { - "base_uri": "https://localhost:8080/" - }, - "executionInfo": { - "elapsed": 2604, - "status": "ok", - "timestamp": 1694541481158, - "user": { - "displayName": "", - "userId": "" - }, - "user_tz": 300 - }, - "id": "KxVFCVNVLLwW", - "outputId": "2ada6f15-b774-4f55-fa73-b0e7064cd437" - }, - "outputs": [ - { - "output_type": "stream", - "name": "stdout", - "text": [ - "Waiting for operation projects/908687846511/locations/us/operations/10629941502076889990\n", - "+----+----------------------------------------------+------------------+----------------------------+------------------------------------+-----------------------+----------------+\n", - "| | contract_title | execution_date | jurisdiction_governed_by | legal_parties | renewal_term_length | term_length |\n", - "|----+----------------------------------------------+------------------+----------------------------+------------------------------------+-----------------------+----------------|\n", - "| 0 | WORLDWIDE LICENSE AND DISTRIBUTION AGREEMENT | August 6, 2015 | New York. 
| ['Cymbal Inc.', 'B-Cafetal, Inc.'] | five (5) year | Ten (10) years |\n", - "+----+----------------------------------------------+------------------+----------------------------+------------------------------------+-----------------------+----------------+\n" - ] - } - ], - "source": [ - "operation_names = batch_process_toolbox(\n", - " project_id,\n", - " location,\n", - " processor_id,\n", - " processor_version_id,\n", - " gcs_input_uri,\n", - " gcs_output_uri,\n", - " batch_size,\n", - " field_mask,\n", - ")\n", - "\n", - "documents = retrieve_results(operation_names)\n", - "\n", - "for document in documents:\n", - " # https://cloud.google.com/python/docs/reference/documentai-toolbox/latest/google.cloud.documentai_toolbox.wrappers.document.Document#google_cloud_documentai_toolbox_wrappers_document_Document_entities_to_dict\n", - " entities = document.entities_to_dict()\n", - " # Optional: Export to BQ\n", - " # job = document.entities_to_bigquery(dataset_name, table_name, project_id=project_id)\n", - "\n", - " df = pd.DataFrame([entities])\n", - "\n", - " print(tabulate(df, headers=\"keys\", tablefmt=\"psql\"))" - ] - } - ], - "metadata": { + "id": "35856bf2-aa5e-436b-977a-9e5725b1a595", + "trusted": true + }, + "outputs": [], + "source": [ + "def batch_process_toolbox(\n", + " project_id: str,\n", + " location: str,\n", + " processor_id: str,\n", + " processor_version_id: str,\n", + " gcs_input_uri: str,\n", + " gcs_output_uri: str,\n", + " batch_size: int,\n", + " field_mask: Optional[str] = None,\n", + " skip_human_review: bool = True,\n", + ") -> List[str]:\n", + " client = documentai.DocumentProcessorServiceClient(\n", + " client_options=ClientOptions(\n", + " api_endpoint=f\"{location}-documentai.googleapis.com\"\n", + " )\n", + " )\n", + "\n", + " # The full resource name of the processor version, e.g.:\n", + " # projects/{project_id}/locations/{location}/processors/{processor_id}/processorVersions/{processor_version_id}\n", + " name = client.processor_version_path(\n", + " project_id, location, processor_id, processor_version_id\n", + " )\n", + "\n", + " # Cloud Storage URI for the Output Directory\n", + " output_config = documentai.DocumentOutputConfig(\n", + " gcs_output_config=documentai.DocumentOutputConfig.GcsOutputConfig(\n", + " gcs_uri=gcs_output_uri, field_mask=field_mask\n", + " )\n", + " )\n", + "\n", + " # Create batches of documents for processing\n", + " # https://cloud.google.com/python/docs/reference/documentai-toolbox/latest/google.cloud.documentai_toolbox.utilities.gcs_utilities\n", + " gcs_bucket_name, gcs_prefix = documentai_toolbox.gcs_utilities.split_gcs_uri(\n", + " gcs_input_uri\n", + " )\n", + " batches = documentai_toolbox.gcs_utilities.create_batches(\n", + " gcs_bucket_name, gcs_prefix, batch_size=batch_size\n", + " )\n", + "\n", + " operation_names: List[str] = []\n", + "\n", + " print(f\"{len(batches)} batches created.\")\n", + " for batch in batches:\n", + " print(f\"{len(batch.gcs_documents.documents)} files in batch.\")\n", + " print(batch.gcs_documents.documents)\n", + " request = documentai.BatchProcessRequest(\n", + " name=name,\n", + " input_documents=batch,\n", + " document_output_config=output_config,\n", + " skip_human_review=skip_human_review,\n", + " )\n", + "\n", + " # https://cloud.google.com/document-ai/docs/send-request?hl=en#async-processor\n", + " # `batch_process_documents()` returns a Long Running Operation (LRO)\n", + " operation = client.batch_process_documents(request)\n", + " # Operation Name Format: 
`projects/{project_id}/locations/{location}/operations/{operation_id}`\n", + " operation_names.append(operation.operation.name)\n", + "\n", + " return operation_names" + ] + }, + { + "cell_type": "markdown", + "source": [ + "## Retrieve results once processing is complete\n", + "\n", + "- Get output [`Document`](https://cloud.google.com/document-ai/docs/reference/rest/v1/Document) JSON files from `gcs_output_bucket` based on the Operation ID." + ], + "metadata": { + "id": "op0ZCWTIwDgR" + }, + "id": "op0ZCWTIwDgR" + }, + { + "cell_type": "code", + "source": [ + "def retrieve_results(\n", + " operation_names: List[str],\n", + ") -> List[documentai_toolbox.document.Document]:\n", + " # Can do this asynchronously to avoid blocking\n", + " all_documents: List[documentai_toolbox.document.Document] = []\n", + "\n", + " for operation in operation_names:\n", + " # https://cloud.google.com/document-ai/docs/long-running-operations\n", + " print(f\"Waiting for operation {operation}\")\n", + " documents: List = (\n", + " documentai_toolbox.document.Document.from_batch_process_operation(\n", + " location=location, operation_name=operation\n", + " )\n", + " )\n", + " all_documents.extend(documents)\n", + " return all_documents" + ], + "metadata": { + "id": "2-KpEaoRwEHv" + }, + "id": "2-KpEaoRwEHv", + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "## Print results\n", + "\n", + "- Export extracted entities as dictionary\n", + "- Load into Pandas DataFrame\n", + "- Print Dataframe" + ], + "metadata": { + "id": "445FQsfrwc4N" + }, + "id": "445FQsfrwc4N" + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "KxVFCVNVLLwW", + "metadata": { "colab": { - "provenance": [] + "base_uri": "https://localhost:8080/" }, - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" + "executionInfo": { + "elapsed": 2604, + "status": "ok", + "timestamp": 1694541481158, + "user": { + "displayName": "", + "userId": "" + }, + "user_tz": 300 }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.10.4" + "id": "KxVFCVNVLLwW", + "outputId": "2ada6f15-b774-4f55-fa73-b0e7064cd437" + }, + "outputs": [ + { + "output_type": "stream", + "name": "stdout", + "text": [ + "Waiting for operation projects/908687846511/locations/us/operations/10629941502076889990\n", + "+----+----------------------------------------------+------------------+----------------------------+------------------------------------+-----------------------+----------------+\n", + "| | contract_title | execution_date | jurisdiction_governed_by | legal_parties | renewal_term_length | term_length |\n", + "|----+----------------------------------------------+------------------+----------------------------+------------------------------------+-----------------------+----------------|\n", + "| 0 | WORLDWIDE LICENSE AND DISTRIBUTION AGREEMENT | August 6, 2015 | New York. 
| ['Cymbal Inc.', 'B-Cafetal, Inc.'] | five (5) year | Ten (10) years |\n", + "+----+----------------------------------------------+------------------+----------------------------+------------------------------------+-----------------------+----------------+\n" + ] } + ], + "source": [ + "operation_names = batch_process_toolbox(\n", + " project_id,\n", + " location,\n", + " processor_id,\n", + " processor_version_id,\n", + " gcs_input_uri,\n", + " gcs_output_uri,\n", + " batch_size,\n", + " field_mask,\n", + ")\n", + "\n", + "documents = retrieve_results(operation_names)\n", + "\n", + "for document in documents:\n", + " # https://cloud.google.com/python/docs/reference/documentai-toolbox/latest/google.cloud.documentai_toolbox.wrappers.document.Document#google_cloud_documentai_toolbox_wrappers_document_Document_entities_to_dict\n", + " entities = document.entities_to_dict()\n", + " # Optional: Export to BQ\n", + " # job = document.entities_to_bigquery(dataset_name, table_name, project_id=project_id)\n", + "\n", + " df = pd.DataFrame([entities])\n", + "\n", + " print(tabulate(df, headers=\"keys\", tablefmt=\"psql\"))" + ] + } + ], + "metadata": { + "colab": { + "provenance": [] + }, + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" }, - "nbformat": 4, - "nbformat_minor": 5 + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.4" + } + }, + "nbformat": 4, + "nbformat_minor": 5 } \ No newline at end of file