Source code for swh.scrubber.utils
# Copyright (C) 2022  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Callable, Optional
import psycopg
from swh.model.swhids import CoreSWHID
from .db import CorruptObject, ScrubberDb
[docs]
def iter_corrupt_objects(
    db: ScrubberDb,
    start_object: CoreSWHID,
    end_object: CoreSWHID,
    origin_url: Optional[str],
    cb: Callable[[CorruptObject, psycopg.Cursor], None],
) -> None:
    """Fetches objects and calls ``cb`` on each of them.
    objects are fetched with an update lock, with the same transaction as ``cb``,
    which is automatically committed after ``cb`` runs."""
    while True:
        with db.cursor() as cur:
            if origin_url:
                corrupt_objects = list(
                    db.corrupt_object_grab_by_origin(
                        cur, origin_url, start_object, end_object
                    )
                )
            else:
                corrupt_objects = list(
                    db.corrupt_object_grab_by_id(cur, start_object, end_object)
                )
            if corrupt_objects and corrupt_objects[0].id == start_object:
                # TODO: don't needlessly fetch duplicate objects
                del corrupt_objects[0]
            if not corrupt_objects:
                # Nothing more to do
                break
            for corrupt_object in corrupt_objects:
                cb(corrupt_object, cur)
            db.conn.commit()  # XXX: is this redundant with db.conn.__exit__?
        start_object = corrupt_objects[-1].id