Improve commoncrawl components (#403)
Improved commoncrawl download components for the license-free image use case.
RobbeSneyders authored Sep 13, 2023
1 parent 89e8cde commit eec81a5
Showing 13 changed files with 41 additions and 28 deletions.
@@ -1,6 +1,6 @@
-name: download warc component
-description: A component that downloads parts of the common crawl
-image: ghcr.io/ml6team/common_crawl_download_warc:cadb918
+name: Extract image licenses from warc
+description: A component that extracts images and their licenses from warc files
+image: ghcr.io/ml6team/extract_images_from_warc:d4619b5
 
 consumes:
   warc:
@@ -1,3 +1,4 @@
 trafilatura==1.6.1
 beautifulsoup4==4.12.2
-fastwarc
+fastwarc==0.14.5
+distributed==2023.8.1
@@ -2,18 +2,21 @@
 import logging
 import typing as t
 
+import dask
 import dask.dataframe as dd
 import pandas as pd
 from bs4 import BeautifulSoup
 from fondant.component import DaskTransformComponent
-from fastwarc.warc import ArchiveIterator, WarcRecordType
+from fastwarc import ArchiveIterator, StreamError, WarcRecordType
 
 from utils.download_utils import download_warc_file
 from utils.license_utils import get_license_type, get_license_location
 from utils.image_utils import get_images_from_soup, get_unique_images
 
 logger = logging.getLogger(__name__)
 
+dask.config.set(scheduler="processes")
+
 CC_BASE_URL = "http://data.commoncrawl.org"
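Note: dask.config.set(scheduler="processes") switches Dask from its default threaded scheduler to a multiprocessing one, so the CPU-bound WARC parsing in this component is not serialized on the GIL. A minimal sketch of what the setting changes (illustrative, not from this commit):

    import dask
    import dask.dataframe as dd
    import pandas as pd

    dask.config.set(scheduler="processes")  # run tasks in worker processes instead of threads

    ddf = dd.from_pandas(pd.DataFrame({"x": range(8)}), npartitions=4)
    # each partition is now computed in its own process, bypassing the GIL
    print(ddf["x"].map_partitions(lambda s: s * 2).compute())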


@@ -72,17 +75,20 @@ def filter_(record):
                 return False
             return True
 
-        for record in ArchiveIterator(
-            file,
-            record_types=WarcRecordType.response,
-            func_filter=filter_,
-        ):
-            url = record.headers.get("WARC-Target-URI")
-            content = record.reader.read().decode("utf-8", "replace")
-            if content:
-                image_info = self.get_image_info_from_webpage(url, content)
-                if image_info:
-                    images.extend(image_info)
+        try:
+            for record in ArchiveIterator(
+                file,
+                record_types=WarcRecordType.response,
+                func_filter=filter_,
+            ):
+                url = record.headers.get("WARC-Target-URI")
+                content = record.reader.read().decode("utf-8", "replace")
+                if content:
+                    image_info = self.get_image_info_from_webpage(url, content)
+                    if image_info:
+                        images.extend(image_info)
+        except StreamError as e:
+            logging.warning(e)
 
         return images
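The iteration is wrapped in try/except because fastwarc raises StreamError mid-iteration on truncated or corrupt archives, which would otherwise abort the whole partition. A standalone sketch of the same pattern, assuming a hypothetical local file example.warc.gz:

    from fastwarc import ArchiveIterator, StreamError, WarcRecordType

    def iter_html_responses(path):
        """Yield (url, body) pairs for HTTP response records in a WARC file."""
        with open(path, "rb") as f:
            try:
                for record in ArchiveIterator(f, record_types=WarcRecordType.response):
                    url = record.headers.get("WARC-Target-URI")
                    body = record.reader.read().decode("utf-8", "replace")
                    yield url, body
            except StreamError as exc:  # truncated archive: keep what was read so far
                print(f"stopped early: {exc}")

    for url, body in iter_html_responses("example.warc.gz"):
        print(url, len(body))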

@@ -97,8 +103,12 @@ def download_and_extract_warc(
         """
         logger.warning(f"Processing WARC file: {warc_file}...")
 
-        response = download_warc_file(warc_file)
-        return self.extract_images(response.raw)
+        try:
+            response = download_warc_file(warc_file)
+            return self.extract_images(response.raw)
+        except BaseException as e:
+            logging.warning(e)
+            return []
 
     def download_and_extract_dataframe(self, dataframe: pd.DataFrame) -> pd.DataFrame:
         """Download and extract all warc files in a dataframe."""
@@ -12,7 +12,7 @@


 def download_warc_file(
-    warc_file: str, retries: int = 3, backoff_factor: int = 5
+    warc_file: str, retries: int = 10, backoff_factor: int = 5
 ) -> requests.Response:
     """Downloads a WARC file using http requests.
     Args:
@@ -33,6 +33,6 @@ def download_warc_file(
         response = session.get(COMMONCRAWL_BASE_URL + warc_file, stream=True)
         response.raise_for_status()
         return response
-    except requests.exceptions.RequestException as e:
+    except Exception as e:
         logger.error(f"Error downloading WARC file: {e}")
         raise
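Widening the except clause from requests.exceptions.RequestException to Exception means any failure while streaming is logged before re-raising. The session above presumably carries the retry policy; a hedged sketch of the usual way retries and backoff_factor are wired into a requests session (the actual setup is outside the lines shown):

    import requests
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    def make_session(retries: int = 10, backoff_factor: int = 5) -> requests.Session:
        session = requests.Session()
        retry = Retry(
            total=retries,
            backoff_factor=backoff_factor,  # exponential backoff between attempts
            status_forcelist=[500, 502, 503, 504],
        )
        session.mount("http://", HTTPAdapter(max_retries=retry))
        session.mount("https://", HTTPAdapter(max_retries=retry))
        return session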
@@ -1,6 +1,6 @@
 name: Common crawl download component
 description: A component that downloads parts of the common crawl
-image: ghcr.io/ml6team/read_warc_paths:dev
+image: ghcr.io/ml6team/read_warc_paths:57404ff
 
 produces:
   warc:
@@ -41,4 +41,4 @@ def load(self) -> dd.DataFrame:
         if self.n_records_to_download is not None:
             df = df.head(self.n_records_to_download)
 
-        return dd.from_pandas(df, npartitions=len(df))
+        return dd.from_pandas(df, npartitions=len(df) // 100)
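Batching roughly 100 WARC paths per partition, instead of one per row, cuts Dask task and scheduling overhead at the cost of coarser parallelism. An illustrative sketch (note that len(df) // 100 is 0 for fewer than 100 rows, so this assumes reasonably large inputs):

    import dask.dataframe as dd
    import pandas as pd

    # made-up path strings, for illustration only
    df = pd.DataFrame({"warc": [f"segments/{i}.warc.gz" for i in range(500)]})
    ddf = dd.from_pandas(df, npartitions=len(df) // 100)
    print(ddf.npartitions)  # 5 partitions of ~100 paths each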
11 changes: 7 additions & 4 deletions examples/pipelines/commoncrawl/pipeline.py
@@ -14,14 +14,17 @@
 
 read_warc_paths_op = ComponentOp(
     component_dir="components/read_warc_paths",
-    arguments={"common_crawl_indices": ["CC-MAIN-2023-23"], "n_records_to_download": 1},
+    arguments={"common_crawl_indices": ["CC-MAIN-2023-06"]},
+    cache=False,
 )
 
-load_warc_files_op = ComponentOp(
-    component_dir="components/download_warc_files",
+extract_images_op = ComponentOp(
+    component_dir="components/extract_images_from_warc",
+    node_pool_label="node_pool",
+    node_pool_name="n2-standard-128-pool-3",
 )
 
 pipeline = Pipeline(pipeline_name=pipeline_name, base_path=PipelineConfigs.BASE_PATH)
 
 pipeline.add_op(read_warc_paths_op)
-pipeline.add_op(load_warc_files_op, dependencies=[read_warc_paths_op])
+pipeline.add_op(extract_images_op, dependencies=[read_warc_paths_op])
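For context, the general ComponentOp/Pipeline wiring pattern used in this file, as a minimal sketch with placeholder names and paths (not the actual pipeline configuration):

    from fondant.pipeline import ComponentOp, Pipeline

    pipeline = Pipeline(pipeline_name="my-pipeline", base_path="gs://my-bucket/artifacts")

    first_op = ComponentOp(
        component_dir="components/read_warc_paths",
        cache=False,  # always re-run instead of reusing cached results
    )
    second_op = ComponentOp(component_dir="components/extract_images_from_warc")

    pipeline.add_op(first_op)
    pipeline.add_op(second_op, dependencies=[first_op])  # runs on first_op's output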
2 changes: 1 addition & 1 deletion src/fondant/cli.py
@@ -344,7 +344,7 @@ def run(args):
     else:
         spec_ref = args.output_path
     logging.info(
-        "Found reference to un-compiled pipeline... compiling to {spec_ref}",
+        f"Found reference to un-compiled pipeline... compiling to {spec_ref}",
     )
     compiler = KubeFlowCompiler()
     compiler.compile(pipeline=pipeline, output_path=spec_ref)
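The one-character fix matters: without the f prefix, Python logs the literal text {spec_ref} rather than interpolating the variable. For example:

    spec_ref = "pipeline.yaml"
    print("compiling to {spec_ref}")   # -> compiling to {spec_ref}  (the old bug)
    print(f"compiling to {spec_ref}")  # -> compiling to pipeline.yaml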
1 change: 0 additions & 1 deletion src/fondant/data_io.py
@@ -261,7 +261,6 @@ def _create_write_task(
             schema=schema,
             overwrite=False,
             compute=False,
-            write_metadata_file=True,
         )
         logging.info(f"Creating write task for: {location}")
         return write_task
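Dropping write_metadata_file=True falls back to Dask's default for to_parquet, which in recent versions skips the global _metadata file; building that file aggregates footers from every partition and can bottleneck or fail on large writes. A hedged sketch, assuming the surrounding call is dask.dataframe.to_parquet with compute=False returning a deferred write task:

    import dask.dataframe as dd
    import pandas as pd

    ddf = dd.from_pandas(pd.DataFrame({"x": range(10)}), npartitions=2)
    write_task = ddf.to_parquet("out.parquet", overwrite=False, compute=False)
    write_task.compute()  # executes the deferred write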
