diff --git a/examples/pipelines/commoncrawl/components/download_warc_files/Dockerfile b/examples/pipelines/commoncrawl/components/extract_images_from_warc/Dockerfile similarity index 100% rename from examples/pipelines/commoncrawl/components/download_warc_files/Dockerfile rename to examples/pipelines/commoncrawl/components/extract_images_from_warc/Dockerfile diff --git a/examples/pipelines/commoncrawl/components/download_warc_files/README.md b/examples/pipelines/commoncrawl/components/extract_images_from_warc/README.md similarity index 100% rename from examples/pipelines/commoncrawl/components/download_warc_files/README.md rename to examples/pipelines/commoncrawl/components/extract_images_from_warc/README.md diff --git a/examples/pipelines/commoncrawl/components/download_warc_files/fondant_component.yaml b/examples/pipelines/commoncrawl/components/extract_images_from_warc/fondant_component.yaml similarity index 71% rename from examples/pipelines/commoncrawl/components/download_warc_files/fondant_component.yaml rename to examples/pipelines/commoncrawl/components/extract_images_from_warc/fondant_component.yaml index ddc2171fa..01c8e38f4 100644 --- a/examples/pipelines/commoncrawl/components/download_warc_files/fondant_component.yaml +++ b/examples/pipelines/commoncrawl/components/extract_images_from_warc/fondant_component.yaml @@ -1,6 +1,6 @@ -name: download warc component -description: A component that downloads parts of the common crawl -image: ghcr.io/ml6team/common_crawl_download_warc:cadb918 +name: Extract image licenses from warc +description: A component that extracts images and their licenses from warc files +image: ghcr.io/ml6team/extract_images_from_warc:d4619b5 consumes: warc: diff --git a/examples/pipelines/commoncrawl/components/download_warc_files/requirements.txt b/examples/pipelines/commoncrawl/components/extract_images_from_warc/requirements.txt similarity index 51% rename from examples/pipelines/commoncrawl/components/download_warc_files/requirements.txt rename to examples/pipelines/commoncrawl/components/extract_images_from_warc/requirements.txt index da66ef09e..5d984b293 100644 --- a/examples/pipelines/commoncrawl/components/download_warc_files/requirements.txt +++ b/examples/pipelines/commoncrawl/components/extract_images_from_warc/requirements.txt @@ -1,3 +1,4 @@ trafilatura==1.6.1 beautifulsoup4==4.12.2 -fastwarc +fastwarc==0.14.5 +distributed==2023.8.1 diff --git a/examples/pipelines/commoncrawl/components/download_warc_files/src/main.py b/examples/pipelines/commoncrawl/components/extract_images_from_warc/src/main.py similarity index 82% rename from examples/pipelines/commoncrawl/components/download_warc_files/src/main.py rename to examples/pipelines/commoncrawl/components/extract_images_from_warc/src/main.py index 1755b8876..29f4160ed 100644 --- a/examples/pipelines/commoncrawl/components/download_warc_files/src/main.py +++ b/examples/pipelines/commoncrawl/components/extract_images_from_warc/src/main.py @@ -2,11 +2,12 @@ import logging import typing as t +import dask import dask.dataframe as dd import pandas as pd from bs4 import BeautifulSoup from fondant.component import DaskTransformComponent -from fastwarc.warc import ArchiveIterator, WarcRecordType +from fastwarc import ArchiveIterator, StreamError, WarcRecordType from utils.download_utils import download_warc_file from utils.license_utils import get_license_type, get_license_location @@ -14,6 +15,8 @@ logger = logging.getLogger(__name__) +dask.config.set(scheduler="processes") + CC_BASE_URL = "http://data.commoncrawl.org" @@ -72,17 +75,20 @@ def filter_(record): return False return True - for record in ArchiveIterator( - file, - record_types=WarcRecordType.response, - func_filter=filter_, - ): - url = record.headers.get("WARC-Target-URI") - content = record.reader.read().decode("utf-8", "replace") - if content: - image_info = self.get_image_info_from_webpage(url, content) - if image_info: - images.extend(image_info) + try: + for record in ArchiveIterator( + file, + record_types=WarcRecordType.response, + func_filter=filter_, + ): + url = record.headers.get("WARC-Target-URI") + content = record.reader.read().decode("utf-8", "replace") + if content: + image_info = self.get_image_info_from_webpage(url, content) + if image_info: + images.extend(image_info) + except StreamError as e: + logging.warning(e) return images @@ -97,8 +103,12 @@ def download_and_extract_warc( """ logger.warning(f"Processing WARC file: {warc_file}...") - response = download_warc_file(warc_file) - return self.extract_images(response.raw) + try: + response = download_warc_file(warc_file) + return self.extract_images(response.raw) + except BaseException as e: + logging.warning(e) + return [] def download_and_extract_dataframe(self, dataframe: pd.DataFrame) -> pd.DataFrame: """Download and extract all warc files in a dataframe.""" diff --git a/examples/pipelines/commoncrawl/components/download_warc_files/src/utils/download_utils.py b/examples/pipelines/commoncrawl/components/extract_images_from_warc/src/utils/download_utils.py similarity index 89% rename from examples/pipelines/commoncrawl/components/download_warc_files/src/utils/download_utils.py rename to examples/pipelines/commoncrawl/components/extract_images_from_warc/src/utils/download_utils.py index 3bfdd8f0a..72058cad6 100644 --- a/examples/pipelines/commoncrawl/components/download_warc_files/src/utils/download_utils.py +++ b/examples/pipelines/commoncrawl/components/extract_images_from_warc/src/utils/download_utils.py @@ -12,7 +12,7 @@ def download_warc_file( - warc_file: str, retries: int = 3, backoff_factor: int = 5 + warc_file: str, retries: int = 10, backoff_factor: int = 5 ) -> requests.Response: """Downloads a WARC file using http requests. Args: @@ -33,6 +33,6 @@ def download_warc_file( response = session.get(COMMONCRAWL_BASE_URL + warc_file, stream=True) response.raise_for_status() return response - except requests.exceptions.RequestException as e: + except Exception as e: logger.error(f"Error downloading WARC file: {e}") raise diff --git a/examples/pipelines/commoncrawl/components/download_warc_files/src/utils/image_utils.py b/examples/pipelines/commoncrawl/components/extract_images_from_warc/src/utils/image_utils.py similarity index 100% rename from examples/pipelines/commoncrawl/components/download_warc_files/src/utils/image_utils.py rename to examples/pipelines/commoncrawl/components/extract_images_from_warc/src/utils/image_utils.py diff --git a/examples/pipelines/commoncrawl/components/download_warc_files/src/utils/license_utils.py b/examples/pipelines/commoncrawl/components/extract_images_from_warc/src/utils/license_utils.py similarity index 100% rename from examples/pipelines/commoncrawl/components/download_warc_files/src/utils/license_utils.py rename to examples/pipelines/commoncrawl/components/extract_images_from_warc/src/utils/license_utils.py diff --git a/examples/pipelines/commoncrawl/components/read_warc_paths/fondant_component.yaml b/examples/pipelines/commoncrawl/components/read_warc_paths/fondant_component.yaml index 2154bca91..229afb05e 100644 --- a/examples/pipelines/commoncrawl/components/read_warc_paths/fondant_component.yaml +++ b/examples/pipelines/commoncrawl/components/read_warc_paths/fondant_component.yaml @@ -1,6 +1,6 @@ name: Common crawl download component description: A component that downloads parts of the common crawl -image: ghcr.io/ml6team/read_warc_paths:dev +image: ghcr.io/ml6team/read_warc_paths:57404ff produces: warc: diff --git a/examples/pipelines/commoncrawl/components/read_warc_paths/src/main.py b/examples/pipelines/commoncrawl/components/read_warc_paths/src/main.py index 55419f6c5..7c642fba3 100644 --- a/examples/pipelines/commoncrawl/components/read_warc_paths/src/main.py +++ b/examples/pipelines/commoncrawl/components/read_warc_paths/src/main.py @@ -41,4 +41,4 @@ def load(self) -> dd.DataFrame: if self.n_records_to_download is not None: df = df.head(self.n_records_to_download) - return dd.from_pandas(df, npartitions=len(df)) + return dd.from_pandas(df, npartitions=len(df) // 100) diff --git a/examples/pipelines/commoncrawl/pipeline.py b/examples/pipelines/commoncrawl/pipeline.py index f74b9dbde..1cb7725a6 100644 --- a/examples/pipelines/commoncrawl/pipeline.py +++ b/examples/pipelines/commoncrawl/pipeline.py @@ -14,14 +14,17 @@ read_warc_paths_op = ComponentOp( component_dir="components/read_warc_paths", - arguments={"common_crawl_indices": ["CC-MAIN-2023-23"], "n_records_to_download": 1}, + arguments={"common_crawl_indices": ["CC-MAIN-2023-06"]}, + cache=False, ) -load_warc_files_op = ComponentOp( - component_dir="components/download_warc_files", +extract_images_op = ComponentOp( + component_dir="components/extract_images_from_warc", + node_pool_label="node_pool", + node_pool_name="n2-standard-128-pool-3", ) pipeline = Pipeline(pipeline_name=pipeline_name, base_path=PipelineConfigs.BASE_PATH) pipeline.add_op(read_warc_paths_op) -pipeline.add_op(load_warc_files_op, dependencies=[read_warc_paths_op]) +pipeline.add_op(extract_images_op, dependencies=[read_warc_paths_op]) diff --git a/src/fondant/cli.py b/src/fondant/cli.py index f0b32691d..946b6568d 100644 --- a/src/fondant/cli.py +++ b/src/fondant/cli.py @@ -344,7 +344,7 @@ def run(args): else: spec_ref = args.output_path logging.info( - "Found reference to un-compiled pipeline... compiling to {spec_ref}", + f"Found reference to un-compiled pipeline... compiling to {spec_ref}", ) compiler = KubeFlowCompiler() compiler.compile(pipeline=pipeline, output_path=spec_ref) diff --git a/src/fondant/data_io.py b/src/fondant/data_io.py index ef817577f..436570ae1 100644 --- a/src/fondant/data_io.py +++ b/src/fondant/data_io.py @@ -261,7 +261,6 @@ def _create_write_task( schema=schema, overwrite=False, compute=False, - write_metadata_file=True, ) logging.info(f"Creating write task for: {location}") return write_task