Source code for swh.scrubber.base_checker
# Copyright (C) 2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from abc import ABC, abstractmethod
from itertools import count, islice
import logging
from typing import Any, Dict, Iterable, Optional
import psycopg
import tenacity
from swh.core.statsd import Statsd
from swh.model import swhids
from .db import ConfigEntry, Datastore, ScrubberDb
logger = logging.getLogger(__name__)
[docs]
class BaseChecker(ABC):
    """Base Checker class wrapping common features."""
    def __init__(
        self,
        db: ScrubberDb,
        config_id: int,
    ):
        self.db = db
        self.config_id = config_id
        self._config: Optional[ConfigEntry] = None
        self._statsd: Optional[Statsd] = None
        self.statsd_constant_tags: Dict[str, Any] = {
            "object_type": self.object_type.name.lower(),
            "datastore_package": self.datastore.package,
            "datastore_cls": self.datastore.cls,
            "datastore_instance": self.datastore.instance,
        }
    @property
    def config(self) -> ConfigEntry:
        """Returns a :class:`ConfigEntry` instance containing checker configuration."""
        if self._config is None:
            self._config = self.db.config_get(self.config_id)
        assert self._config is not None
        return self._config
    @property
    def datastore(self) -> Datastore:
        """Returns a :class:`Datastore` instance representing the source of data
        being checked."""
        return self.config.datastore
    @property
    def statsd(self) -> Statsd:
        """Returns a :class:`Statsd` instance to send statsd metrics."""
        if self._statsd is None:
            self._statsd = Statsd(
                namespace="swh_scrubber",
                constant_tags=self.statsd_constant_tags,
            )
        return self._statsd
    @property
    def object_type(self) -> swhids.ObjectType:
        """Returns the type of object being checked."""
        return self.config.object_type
    @property
    def check_hashes(self) -> bool:
        return self.config.check_hashes
    @property
    def check_references(self) -> bool:
        return self.config.check_references
[docs]
    @abstractmethod
    def run(self) -> None:
        """Run the checker processing, derived classes must implement this method."""
        pass 
 
[docs]
class BasePartitionChecker(BaseChecker):
    """Base class for checkers processing partition of objects."""
    def __init__(
        self,
        db: ScrubberDb,
        config_id: int,
        limit: int = 0,
    ):
        super().__init__(db=db, config_id=config_id)
        self.limit = limit
        self.statsd_constant_tags["nb_partitions"] = self.nb_partitions
    @property
    def nb_partitions(self) -> int:
        """Returns the number of partitions set in configuration."""
        return self.config.nb_partitions
[docs]
    def run(self) -> None:
        """Runs on all objects of ``object_type`` in each partition between
        ``start_partition_id`` (inclusive) and ``end_partition_id`` (exclusive).
        """
        counter: Iterable[int] = count()
        if self.limit:
            counter = islice(counter, 0, self.limit)
        for _, partition_id in zip(
            counter, self.db.checked_partition_iter_next(self.config_id)
        ):
            logger.debug(
                "Processing %s partition %d/%d",
                self.object_type,
                partition_id,
                self.nb_partitions,
            )
            self._check_partition(self.object_type, partition_id)
            self.db.checked_partition_upsert(
                self.config_id,
                partition_id,
            ) 
    @tenacity.retry(
        retry=tenacity.retry_if_exception_type(psycopg.OperationalError),
        wait=tenacity.wait_random_exponential(min=10, max=180),
    )
    def _check_partition(
        self, object_type: swhids.ObjectType, partition_id: int
    ) -> None:
        "Retryable method checking objects in partition."
        return self.check_partition(object_type, partition_id)
[docs]
    @abstractmethod
    def check_partition(
        self, object_type: swhids.ObjectType, partition_id: int
    ) -> None:
        """Abstract method that derived classes must implement to check objects
        in partition."""
        pass