Source code for swh.journal.writer.interface
# Copyright (C) 2022  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Any, Dict, Iterable, Optional, TypeVar
from typing_extensions import Protocol, runtime_checkable
from swh.model.model import KeyType
TSelf = TypeVar("TSelf")
[docs]
class ValueProtocol(Protocol):
[docs]
    def anonymize(self: TSelf) -> Optional[TSelf]: ... 
[docs]
    def unique_key(self) -> KeyType: ... 
[docs]
    def to_dict(self) -> Dict[str, Any]: ... 
 
[docs]
@runtime_checkable
class JournalWriterInterface(Protocol):
[docs]
    def write_addition(self, object_type: str, object_: ValueProtocol) -> None:
        """Add a SWH object of type object_type in the journal."""
        ... 
[docs]
    def write_additions(
        self, object_type: str, objects: Iterable[ValueProtocol]
    ) -> None:
        """Add a list of SWH objects of type object_type in the journal."""
        ... 
[docs]
    def flush(self) -> None:
        """Flush the pending object additions in the backend, if any."""
        ...