[Commoncrawl pipeline] Add load from commoncrawl component #269

Merged
@@ -0,0 +1,18 @@
FROM --platform=linux/amd64 python:3.8-slim

# System dependencies
RUN apt-get update && \
    apt-get upgrade -y && \
    apt-get install git -y

# Install requirements
COPY requirements.txt /
RUN pip3 install --no-cache-dir -r requirements.txt

# Set the working directory to the component folder
WORKDIR /component/src

# Copy over src-files
COPY src/ .

ENTRYPOINT ["python", "main.py"]
@@ -0,0 +1,18 @@
name: Load index file from commoncrawl
description: Component that loads a given index file from commoncrawl
image: ghcr.io/ml6team/load_from_commoncrawl:latest

produces:
  segment:
    fields:
      path:
        type: string

args:
  index_name:
    description: Name of index file on commoncrawl
    type: str
  n_segments_to_load:
    description: Number of segments to load from the commoncrawl index file
    type: int
    default: None
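
For context, a spec like this is usually wired into a Fondant pipeline from Python. The sketch below is purely illustrative: the pipeline name, base path, and component directory are placeholders, and the exact ComponentOp / Pipeline constructor arguments are assumptions about the Fondant API around the time of this PR.

from fondant.pipeline import ComponentOp, Pipeline

# Placeholder pipeline; the name and base_path are illustrative.
pipeline = Pipeline(pipeline_name="commoncrawl_pipeline", base_path="./artifacts")

# The load component defined by the spec above; CC-MAIN-2023-14 is one of the
# publicly available Common Crawl indexes.
load_from_commoncrawl = ComponentOp(
    component_dir="components/load_from_commoncrawl",
    arguments={
        "index_name": "CC-MAIN-2023-14",
        "n_segments_to_load": 10,
    },
)

pipeline.add_op(load_from_commoncrawl)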
@@ -0,0 +1,3 @@
boto3==1.26.158
fondant
pyarrow>=7.0
Contributor
I think pyarrow is installed with Fondant so no need to include it here

@@ -0,0 +1,71 @@
"""This component loads a dataset from CommonCrawl based on a given index."""
import gzip
import io
import logging
import typing as t

import boto3
import dask.dataframe as dd
import pandas as pd
from fondant.component import LoadComponent

logger = logging.getLogger(__name__)

S3_BASE_URL = "s3://commoncrawl/crawl-data"
Contributor
This is not being used anywhere

S3_COMMONCRAWL_BUCKET = "commoncrawl"


def fetch_warc_file_from_s3(s3_bucket: str, s3_key: str) -> io.BytesIO:
Contributor
Can you add a docstring for the arguments?

Would also prefer to change s3_key to bucket_path. Feels more intuitive.

Contributor Author
s3_key is the key of the object in the bucket, not the bucket path. Would object_key be a better name?

Contributor
My bad, I just read up on it and it seems to be AWS-specific notation (I'm still used to GCP). It's fine to leave it as is.

Contributor Author
ok great!

"""Fetches a WARC file from S3 and returns its content as a Dask DataFrame."""
logger.info(f"Fetching WARC file from S3: {s3_bucket}/{s3_key}...")

s3 = boto3.client("s3")
file_obj = io.BytesIO()
s3.download_fileobj(s3_bucket, s3_key, file_obj)
file_obj.seek(0)

return file_obj
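
A note on the client setup above: boto3.client("s3") relies on the default AWS credential chain. The commoncrawl bucket is public, so, as a sketch that is not part of this PR, the same download could also be done with unsigned requests when no credentials are configured:

import io

import boto3
from botocore import UNSIGNED
from botocore.config import Config

# Anonymous S3 client: unsigned requests are enough for the public commoncrawl bucket.
s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED))

file_obj = io.BytesIO()
s3.download_fileobj("commoncrawl", "crawl-data/CC-MAIN-2023-14/warc.paths.gz", file_obj)
file_obj.seek(0)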


def read_warc_paths_file(
    warc_file: io.BytesIO, n_segments_to_load: t.Optional[int] = None
) -> dd.DataFrame:
    """Reads a gzipped WARC paths file and returns the segment paths as a Dask DataFrame."""
    logger.info("Reading WARC paths file...")
    with gzip.open(warc_file, mode="rt") as f:
        warc_paths = [line.strip() for line in f]

    df = pd.DataFrame(warc_paths, columns=["warc_paths"])
    dask_df = dd.from_pandas(df, npartitions=1)
    dask_df = dask_df.rename(columns={"warc_paths": "segment_path"})

    if n_segments_to_load:
        # head() collects to a pandas DataFrame, so convert back to a Dask DataFrame
        dask_df = dask_df.head(n_segments_to_load)
        dask_df = dd.from_pandas(dask_df, npartitions=1)

    return dask_df


class LoadFromCommonCrawl(LoadComponent):
    def load(
        self, index_name: str, n_segments_to_load: t.Optional[int] = None
    ) -> dd.DataFrame:
        """Loads the segment paths of a CommonCrawl index as a Dask DataFrame.

        Args:
            index_name: Name of the index file on CommonCrawl.
            n_segments_to_load: Number of segments to load from the index file.
        """
        logger.info(f"Loading CommonCrawl index {index_name}...")
        warc_paths_file_key = f"crawl-data/{index_name}/warc.paths.gz"
        warc_paths_file_content = fetch_warc_file_from_s3(
            S3_COMMONCRAWL_BUCKET, warc_paths_file_key
        )

        warc_paths_df = read_warc_paths_file(
            warc_paths_file_content, n_segments_to_load
        )

        return warc_paths_df


if __name__ == "__main__":
    component = LoadFromCommonCrawl.from_args()
    component.run()
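
As a quick local sanity check, read_warc_paths_file can be exercised against an in-memory gzip file. This assumes the component's src/ folder is on the Python path; the segment paths below are made up:

import gzip
import io

from main import read_warc_paths_file

fake_paths = (
    b"crawl-data/CC-MAIN-2023-14/segments/0001/warc/file-00000.warc.gz\n"
    b"crawl-data/CC-MAIN-2023-14/segments/0001/warc/file-00001.warc.gz\n"
)
warc_file = io.BytesIO(gzip.compress(fake_paths))

segments = read_warc_paths_file(warc_file, n_segments_to_load=1)
assert list(segments.columns) == ["segment_path"]
print(segments.compute())  # a single row containing the first made-up segment path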