scandi_reddit.build

Builds a Scandinavian Reddit dataset.

View Source
"""Builds a Scandinavian Reddit dataset."""

import json
import logging
import subprocess
from multiprocessing import cpu_count
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional

import pandas as pd
import zstandard
from datasets.arrow_dataset import Dataset
from joblib import Parallel, delayed
from nlp_dedup import Deduper
from tqdm.auto import tqdm

from scandi_reddit.postprocess import postprocess

from .download import download_reddit_file
from .language_filter import filter_comment

# Set up logging
logger = logging.getLogger(__name__)


def build_reddit_dataset(
    overwrite: bool = False,
    n_jobs: int = -2,
    starting_year: int = 2005,
    starting_month: int = 1,
    skip_download: bool = False,
    hub_repo_id: Optional[str] = None,
) -> None:
    """Build a Scandinavian Reddit dataset.

    Args:
        overwrite (bool, optional):
            Whether to overwrite existing files. Defaults to False.
        n_jobs (int, optional):
            The number of jobs to run in parallel. Can be set to a negative number to
            use all but that number of cores. Defaults to -2.
        starting_year (int, optional):
            The year to start downloading from. Defaults to 2005.
        starting_month (int, optional):
            The month to start downloading from. Defaults to 1.
        skip_download (bool, optional):
            Whether to skip downloading the files. If this is set then the "data/raw"
            directory must contain the files "reddit-da.jsonl", "reddit-no.jsonl",
            "reddit-sv.jsonl" and "reddit-is.jsonl". Defaults to False.
        hub_repo_id (Optional[str], optional):
            The ID of the Hugging Face Hub repository to upload the dataset to. If
            this is set then the dataset will be uploaded to the Hugging Face Hub.
            If None then the dataset will not be uploaded. Defaults to None.
    """
    # Set up paths to data directories
    raw_data_dir = Path("data") / "raw"
    processed_data_dir = Path("data") / "processed"
    final_data_dir = Path("data") / "final"

    # Create language mapping
    language_mapping = {
        "da": "Danish",
        "sv": "Swedish",
        "no": "Norwegian",
        "is": "Icelandic",
    }

    # Set up the output files
    output_paths = {
        lang: processed_data_dir / f"reddit-{lang}.jsonl"
        for lang in language_mapping.keys()
    }

    # Ensure `n_jobs` is non-negative
    if n_jobs < 0:
        n_jobs = cpu_count() + n_jobs + 1

    # Remove the previous files if `overwrite` is set
    if overwrite:
        for path in output_paths.values():
            path.unlink(missing_ok=True)

    # Replace starting year and month by the newest file present in the raw data
    # folder, if any
    existing_files = list(raw_data_dir.glob("RC_*.zst"))
    for file in existing_files:
        year = int(file.stem.split("_")[1].split("-")[0])
        month = int(file.stem.split("_")[1].split("-")[1])
        starting_year = max(starting_year, year)
        starting_month = max(starting_month, month)

    # Download the Reddit dumps and apply the language filter
    if not skip_download:

        logger.info(f"Fetching Reddit comments using {n_jobs} jobs in parallel.")

        for year in range(starting_year, 2030):
            for month in range(starting_month, 13):

                # Download the file
                input_path = download_reddit_file(year=year, month=month)

                # If the download failed then skip to the next month
                if not input_path.exists():
                    continue

                # Extract the comments from the file
                extract_comments_from_file(
                    input_path=input_path,
                    output_paths=output_paths,
                    n_jobs=n_jobs,
                )

                # Delete the input file again
                input_path.unlink()

            # Set the starting month to 1
            starting_month = 1

    # Post-process the files
    for lang, path in output_paths.items():
        logger.info(f"Post-processing the {language_mapping[lang]} corpus.")
        postprocess(path=path, suffix="-postprocessed")

    # Initialise the Deduper
    deduper = Deduper(
        split_method="word_ngram",
        num_minhashes=128,
        ngram_size=5,
        similarity_threshold=0.8,
        batch_size=1_000_000,
        n_jobs=n_jobs,
        random_seed=4242,
        store_config_to_disk=True,
        store_mask_to_disk=True,
        store_lsh_cache_to_disk=False,
        store_corpus_to_disk=False,
    )

    # Create the corpus generator
    def build_corpus() -> Generator[str, None, None]:
        for path in output_paths.values():
            path_processed = path.parent / f"{path.stem}-postprocessed.jsonl"
            with path_processed.open() as f:
                for line in f:
                    line = json.loads(line)
                    yield line["doc"]  # type: ignore[index]

    # Count the lines in the corpus
    num_docs = 0
    for path in output_paths.values():
        proc = subprocess.Popen(["wc", "-l", str(path)], stdout=subprocess.PIPE)
        num_docs += int(proc.communicate()[0].decode().split()[0])

    # Deduplicate the files
    deduper.deduplicate(
        corpus=build_corpus(),
        output_dir=processed_data_dir / "deduplicated",
        num_docs=num_docs,
        overwrite=True,
    )

    # Load the deduplication mask
    mask_path = processed_data_dir / "deduplicated" / "mask.jsonl"
    with mask_path.open() as f:
        mask = [json.loads(line) for line in f]

    # Load all the deduplicated files
    all_records: List[Dict[str, Any]] = list()
    idx: int = 0
    for path in output_paths.values():
        path_processed = path.parent / f"{path.stem}-postprocessed.jsonl"
        with path_processed.open() as f:
            for line in f:
                if not mask[idx]["duplicate"]:
                    record = json.loads(line)
                    all_records.append(record)
                idx += 1

    # Convert the records to a Hugging Face dataset
    df = pd.DataFrame.from_records(all_records)
    dataset = Dataset.from_pandas(df)

    # Save the dataset to disk
    dataset.save_to_disk(str(final_data_dir / "scandireddit"))

    # Push the dataset to the Hugging Face Hub
    if hub_repo_id is not None:
        dataset.push_to_hub(hub_repo_id)


def extract_comments_from_file(
    input_path: Path,
    output_paths: dict[str, Path],
    n_jobs: int,
) -> None:
    """Extract comments from a Reddit file.

    Args:
        input_path (Path):
            The path to the input file.
        output_paths (dict[str, Path]):
            The paths to the output files.
        n_jobs (int):
            The number of jobs to run in parallel.
    """
    # Open the file
    f = input_path.open("rb")

    # Open up the output files
    output_files = {
        lang: output_file.open("a") for lang, output_file in output_paths.items()
    }

    # Create a decompressor
    decompressor = zstandard.ZstdDecompressor(max_window_size=2**31)

    # Create a stream reader
    stream_reader = decompressor.stream_reader(f)

    # Initialise the buffer
    buffer: str = ""

    # Create progress bar, with unit being millions
    progress_bar = tqdm(
        desc=f"Processing comments from {input_path.name}",
        unit_scale=True,
    )

    # Infinite loop, break when we reach the end of the file
    while True:

        # Load a batch of data, break if it cannot be loaded
        try:
            batch = stream_reader.read(1_000_000_000)
        except zstandard.ZstdError:
            logger.debug("Could not load batch.")
            break

        # Decode the batch, skip if it cannot be decoded
        try:
            batch = batch.decode()
        except UnicodeDecodeError:
            logger.debug(f"Could not decode batch from {input_path.name}")
            continue

        # Break if we reached the end of the file
        if not batch:
            logger.debug(f"Reached end of file {input_path.name}")
            break

        # Add the buffer
        batch = buffer + batch

        # Split the batch into individual comments
        comments = batch.splitlines()

        # Process the comments in parallel
        with Parallel(n_jobs=n_jobs) as parallel:
            records = parallel(
                delayed(filter_comment)(comment) for comment in comments[:-1]
            )

        # If `records` is None then skip to the next file
        if records is None:
            logger.debug(f"No records found in {input_path.name}")
            continue

        # Iterate over the records, writing them to the output files
        for item in records:

            # Skip if the record is None
            if item is None:
                progress_bar.update()
                continue

            # Unpack the record
            record, lang = item

            # Write the record to the correct file
            if lang in output_files:
                output_files[lang].write(record + "\n")

            # Up the progress bar
            progress_bar.update()

        # Update the buffer
        buffer = comments[-1]

    # Close the progress bar
    progress_bar.close()

    # Close the output files
    for output_file in output_files.values():
        output_file.close()

    # Close the file
    f.close()
#   def build_reddit_dataset( overwrite: bool = False, n_jobs: int = -2, starting_year: int = 2005, starting_month: int = 1, skip_download: bool = False, hub_repo_id: Optional[str] = None ) -> None:
View Source
def build_reddit_dataset(
    overwrite: bool = False,
    n_jobs: int = -2,
    starting_year: int = 2005,
    starting_month: int = 1,
    skip_download: bool = False,
    hub_repo_id: Optional[str] = None,
) -> None:
    """Build a Scandinavian Reddit dataset.

    Args:
        overwrite (bool, optional):
            Whether to overwrite existing files. Defaults to False.
        n_jobs (int, optional):
            The number of jobs to run in parallel. Can be set to a negative number to
            use all but that number of cores. Defaults to -2.
        starting_year (int, optional):
            The year to start downloading from. Defaults to 2005.
        starting_month (int, optional):
            The month to start downloading from. Defaults to 1.
        skip_download (bool, optional):
            Whether to skip downloading the files. If this is set then the "data/raw"
            directory must contain the files "reddit-da.jsonl", "reddit-no.jsonl",
            "reddit-sv.jsonl" and "reddit-is.jsonl". Defaults to False.
        hub_repo_id (Optional[str], optional):
            The ID of the Hugging Face Hub repository to upload the dataset to. If
            this is set then the dataset will be uploaded to the Hugging Face Hub.
            If None then the dataset will not be uploaded. Defaults to None.
    """
    # Set up paths to data directories
    raw_data_dir = Path("data") / "raw"
    processed_data_dir = Path("data") / "processed"
    final_data_dir = Path("data") / "final"

    # Create language mapping
    language_mapping = {
        "da": "Danish",
        "sv": "Swedish",
        "no": "Norwegian",
        "is": "Icelandic",
    }

    # Set up the output files
    output_paths = {
        lang: processed_data_dir / f"reddit-{lang}.jsonl"
        for lang in language_mapping.keys()
    }

    # Ensure `n_jobs` is non-negative
    if n_jobs < 0:
        n_jobs = cpu_count() + n_jobs + 1

    # Remove the previous files if `overwrite` is set
    if overwrite:
        for path in output_paths.values():
            path.unlink(missing_ok=True)

    # Replace starting year and month by the newest file present in the raw data
    # folder, if any
    existing_files = list(raw_data_dir.glob("RC_*.zst"))
    for file in existing_files:
        year = int(file.stem.split("_")[1].split("-")[0])
        month = int(file.stem.split("_")[1].split("-")[1])
        starting_year = max(starting_year, year)
        starting_month = max(starting_month, month)

    # Download the Reddit dumps and apply the language filter
    if not skip_download:

        logger.info(f"Fetching Reddit comments using {n_jobs} jobs in parallel.")

        for year in range(starting_year, 2030):
            for month in range(starting_month, 13):

                # Download the file
                input_path = download_reddit_file(year=year, month=month)

                # If the download failed then skip to the next month
                if not input_path.exists():
                    continue

                # Extract the comments from the file
                extract_comments_from_file(
                    input_path=input_path,
                    output_paths=output_paths,
                    n_jobs=n_jobs,
                )

                # Delete the input file again
                input_path.unlink()

            # Set the starting month to 1
            starting_month = 1

    # Post-process the files
    for lang, path in output_paths.items():
        logger.info(f"Post-processing the {language_mapping[lang]} corpus.")
        postprocess(path=path, suffix="-postprocessed")

    # Initialise the Deduper
    deduper = Deduper(
        split_method="word_ngram",
        num_minhashes=128,
        ngram_size=5,
        similarity_threshold=0.8,
        batch_size=1_000_000,
        n_jobs=n_jobs,
        random_seed=4242,
        store_config_to_disk=True,
        store_mask_to_disk=True,
        store_lsh_cache_to_disk=False,
        store_corpus_to_disk=False,
    )

    # Create the corpus generator
    def build_corpus() -> Generator[str, None, None]:
        for path in output_paths.values():
            path_processed = path.parent / f"{path.stem}-postprocessed.jsonl"
            with path_processed.open() as f:
                for line in f:
                    line = json.loads(line)
                    yield line["doc"]  # type: ignore[index]

    # Count the lines in the corpus
    num_docs = 0
    for path in output_paths.values():
        proc = subprocess.Popen(["wc", "-l", str(path)], stdout=subprocess.PIPE)
        num_docs += int(proc.communicate()[0].decode().split()[0])

    # Deduplicate the files
    deduper.deduplicate(
        corpus=build_corpus(),
        output_dir=processed_data_dir / "deduplicated",
        num_docs=num_docs,
        overwrite=True,
    )

    # Load the deduplication mask
    mask_path = processed_data_dir / "deduplicated" / "mask.jsonl"
    with mask_path.open() as f:
        mask = [json.loads(line) for line in f]

    # Load all the deduplicated files
    all_records: List[Dict[str, Any]] = list()
    idx: int = 0
    for path in output_paths.values():
        path_processed = path.parent / f"{path.stem}-postprocessed.jsonl"
        with path_processed.open() as f:
            for line in f:
                if not mask[idx]["duplicate"]:
                    record = json.loads(line)
                    all_records.append(record)
                idx += 1

    # Convert the records to a Hugging Face dataset
    df = pd.DataFrame.from_records(all_records)
    dataset = Dataset.from_pandas(df)

    # Save the dataset to disk
    dataset.save_to_disk(str(final_data_dir / "scandireddit"))

    # Push the dataset to the Hugging Face Hub
    if hub_repo_id is not None:
        dataset.push_to_hub(hub_repo_id)

Build a Scandinavian Reddit dataset.

Args
  • overwrite (bool, optional): Whether to overwrite existing files. Defaults to False.
  • n_jobs (int, optional): The number of jobs to run in parallel. Can be set to a negative number to use all but that number of cores. Defaults to -2.
  • starting_year (int, optional): The year to start downloading from. Defaults to 2005.
  • starting_month (int, optional): The month to start downloading from. Defaults to 1.
  • skip_download (bool, optional): Whether to skip downloading the files. If this is set then the "data/raw" directory must contain the files "reddit-da.jsonl", "reddit-no.jsonl", "reddit-sv.jsonl" and "reddit-is.jsonl". Defaults to False.
  • hub_repo_id (Optional[str], optional): The ID of the Hugging Face Hub repository to upload the dataset to. If this is set then the dataset will be uploaded to the Hugging Face Hub. If None then the dataset will not be uploaded. Defaults to None.
#   def extract_comments_from_file( input_path: pathlib.Path, output_paths: dict[str, pathlib.Path], n_jobs: int ) -> None:
View Source
def extract_comments_from_file(
    input_path: Path,
    output_paths: dict[str, Path],
    n_jobs: int,
) -> None:
    """Extract comments from a Reddit file.

    Args:
        input_path (Path):
            The path to the input file.
        output_paths (dict[str, Path]):
            The paths to the output files.
        n_jobs (int):
            The number of jobs to run in parallel.
    """
    # Open the file
    f = input_path.open("rb")

    # Open up the output files
    output_files = {
        lang: output_file.open("a") for lang, output_file in output_paths.items()
    }

    # Create a decompressor
    decompressor = zstandard.ZstdDecompressor(max_window_size=2**31)

    # Create a stream reader
    stream_reader = decompressor.stream_reader(f)

    # Initialise the buffer
    buffer: str = ""

    # Create progress bar, with unit being millions
    progress_bar = tqdm(
        desc=f"Processing comments from {input_path.name}",
        unit_scale=True,
    )

    # Infinite loop, break when we reach the end of the file
    while True:

        # Load a batch of data, break if it cannot be loaded
        try:
            batch = stream_reader.read(1_000_000_000)
        except zstandard.ZstdError:
            logger.debug("Could not load batch.")
            break

        # Decode the batch, skip if it cannot be decoded
        try:
            batch = batch.decode()
        except UnicodeDecodeError:
            logger.debug(f"Could not decode batch from {input_path.name}")
            continue

        # Break if we reached the end of the file
        if not batch:
            logger.debug(f"Reached end of file {input_path.name}")
            break

        # Add the buffer
        batch = buffer + batch

        # Split the batch into individual comments
        comments = batch.splitlines()

        # Process the comments in parallel
        with Parallel(n_jobs=n_jobs) as parallel:
            records = parallel(
                delayed(filter_comment)(comment) for comment in comments[:-1]
            )

        # If `records` is None then skip to the next file
        if records is None:
            logger.debug(f"No records found in {input_path.name}")
            continue

        # Iterate over the records, writing them to the output files
        for item in records:

            # Skip if the record is None
            if item is None:
                progress_bar.update()
                continue

            # Unpack the record
            record, lang = item

            # Write the record to the correct file
            if lang in output_files:
                output_files[lang].write(record + "\n")

            # Up the progress bar
            progress_bar.update()

        # Update the buffer
        buffer = comments[-1]

    # Close the progress bar
    progress_bar.close()

    # Close the output files
    for output_file in output_files.values():
        output_file.close()

    # Close the file
    f.close()

Extract comments from a Reddit file.

Args
  • input_path (Path): The path to the input file.
  • output_paths (dict[str, Path]): The paths to the output files.
  • n_jobs (int): The number of jobs to run in parallel.