Source code for swh.scrubber.journal_checker
# Copyright (C) 2021-2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Reads all objects in a swh-storage instance and recomputes their checksums."""
import json
import logging
from typing import Any, Dict, List

import attr

from swh.journal.client import get_journal_client
from swh.journal.serializers import kafka_to_value
from swh.model import model

from .base_checker import BaseChecker
from .db import Datastore, ScrubberDb

logger = logging.getLogger(__name__)


def get_datastore(journal_cfg) -> Datastore:
    if journal_cfg.get("cls") == "kafka":
        datastore = Datastore(
            package="journal",
            cls="kafka",
            instance=json.dumps(
                {
                    "brokers": journal_cfg["brokers"],
                    "group_id": journal_cfg["group_id"],
                    "prefix": journal_cfg["prefix"],
                }
            ),
        )
    else:
        raise NotImplementedError(
            f"JournalChecker(journal_client_config={journal_cfg!r}).datastore()"
        )
    return datastore
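
# Usage sketch for get_datastore() (illustrative values only; the broker
# address, group id, and prefix below are hypothetical):
#
#     datastore = get_datastore(
#         {
#             "cls": "kafka",
#             "brokers": ["kafka1.example.org:9092"],
#             "group_id": "swh.scrubber.checker",
#             "prefix": "swh.journal.objects",
#         }
#     )
#     # datastore.instance now holds the brokers/group_id/prefix triple
#     # serialized as JSON, identifying the journal instance being scrubbed.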


class JournalChecker(BaseChecker):
    """Reads a chunk of a swh-storage database, recomputes checksums, and
    reports errors in a separate database."""
    def __init__(
        self, db: ScrubberDb, config_id: int, journal_client_config: Dict[str, Any]
    ):
        super().__init__(db=db, config_id=config_id)
        if self.config.check_references:
            raise ValueError(
                "The journal checker cannot check for references; please set "
                f"'check_references' to False in config entry {self.config_id}."
            )
        self.journal_client_config = journal_client_config.copy()
        if "object_types" in self.journal_client_config:
            raise ValueError(
                "The journal_client configuration entry should not define the "
                "object_types field; this is handled by the scrubber configuration entry"
            )
        self.journal_client_config["object_types"] = [self.object_type.name.lower()]
        self.journal_client = get_journal_client(
            **self.journal_client_config,
            # Remove the default deserializer, so that process_kafka_messages()
            # gets each message verbatim and can archive it with as few
            # modifications as possible.
            value_deserializer=lambda obj_type, msg: msg,
        )
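
    # Instantiation sketch (illustrative only: the connection string, config
    # id, and journal settings below are hypothetical; the scrubber config
    # entry referenced by ``config_id`` determines which object type is
    # checked):
    #
    #     db = ScrubberDb.connect("dbname=swh-scrubber")
    #     checker = JournalChecker(
    #         db=db,
    #         config_id=1,
    #         journal_client_config={
    #             "cls": "kafka",
    #             "brokers": ["kafka1.example.org:9092"],
    #             "group_id": "swh.scrubber.checker",
    #             "prefix": "swh.journal.objects",
    #         },
    #     )
    #     checker.run()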
    def run(self) -> None:
        """Runs a journal client with the given configuration.
        This method does not return, unless the client is configured with ``on_eof``
        parameter equals to ``EofBehavior.STOP`` (``stop`` in YAML).
        """
        self.journal_client.process(self.process_kafka_messages)
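
    # Journal client configuration sketch (hypothetical values); with
    # ``on_eof: stop`` the call to ``run()`` returns once the journal has
    # been fully consumed, instead of blocking for new messages:
    #
    #     journal:
    #         cls: kafka
    #         brokers:
    #             - kafka1.example.org:9092
    #         group_id: swh.scrubber.checker
    #         prefix: swh.journal.objects
    #         on_eof: stop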
    def process_kafka_messages(self, all_messages: Dict[str, List[bytes]]):
        for object_type, messages in all_messages.items():
            logger.debug("Processing %d %s objects", len(messages), object_type)
            # Resolve the model class, e.g. "directory" -> model.Directory
            cls = getattr(model, object_type.capitalize())
            for message in messages:
                if object_type == "directory":
                    d = kafka_to_value(message)
                    (
                        has_duplicate_dir_entries,
                        object_,
                    ) = cls.from_possibly_duplicated_entries(
                        entries=tuple(
                            map(model.DirectoryEntry.from_dict, d["entries"])
                        ),
                        raw_manifest=d.get("raw_manifest"),
                    )
                    # Keep the id as read from the journal, so the check below
                    # compares the recomputed hash against the stored one.
                    object_ = attr.evolve(object_, id=d["id"])
                    if has_duplicate_dir_entries:
                        self.statsd.increment(
                            "duplicate_directory_entries_total",
                            tags={"object_type": "directory"},
                        )
                else:
                    object_ = cls.from_dict(kafka_to_value(message))
                    has_duplicate_dir_entries = False
                real_id = object_.compute_hash()
                if object_.id != real_id or has_duplicate_dir_entries:
                    self.db.corrupt_object_add(object_.swhid(), self.config, message)
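

# Standalone illustration of the core check above (not part of the class): an
# object's intrinsic hash is recomputed from its fields and compared with the
# id recorded in the journal; for an intact object the two match:
#
#     release = model.Release.from_dict(kafka_to_value(message))
#     assert release.compute_hash() == release.id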