Source code for swh.storage.algos.discovery
from typing import Iterable, List
from swh.model import discovery, model
from swh.model.model import Sha1Git
from swh.storage.interface import StorageInterface
[docs]
class DiscoveryStorageConnection(discovery.ArchiveDiscoveryInterface):
    """Archive discovery backend that answers its queries through a storage object.

    The objects handed to the constructor are kept on the instance unchanged,
    alongside the storage that the ``*_missing`` query methods call into.
    """

    def __init__(
        self,
        contents: List[model.Content],
        skipped_contents: List[model.SkippedContent],
        directories: List[model.Directory],
        swh_storage: StorageInterface,
    ) -> None:
        """Keep references to the given objects and to the storage to query.

        Args:
            contents: content objects, exposed as ``self.contents``
            skipped_contents: skipped content objects, exposed as
                ``self.skipped_contents``
            directories: directory objects, exposed as ``self.directories``
            swh_storage: storage used for every "missing" lookup, exposed as
                ``self.storage``
        """
        # Targets are bound left to right, i.e. in the same order as three
        # separate assignment statements would bind them.
        self.contents, self.skipped_contents, self.directories = (
            contents,
            skipped_contents,
            directories,
        )
        self.storage = swh_storage
[docs]
    def content_missing(self, contents: List[Sha1Git]) -> Iterable[Sha1Git]:
        """List content missing from the archive, looked up by ``sha1_git``.

        Args:
            contents: ``sha1_git`` identifiers of the contents to check

        Returns:
            The result of ``content_missing_per_sha1_git`` on the configured
            storage, passed through unchanged.
        """
        lookup = self.storage.content_missing_per_sha1_git
        return lookup(contents)
[docs]
    def skipped_content_missing(
        self, skipped_contents: List[Sha1Git]
    ) -> Iterable[Sha1Git]:
        """List skipped content missing from the archive, by ``sha1_git``.

        The storage is queried with one dict per identifier; the other hash
        keys (``sha1``, ``sha256``, ``blake2s256``) are present but ``None``.

        Args:
            skipped_contents: ``sha1_git`` identifiers of the skipped contents
                to check

        Returns:
            A generator over the ``sha1_git`` value of each entry returned by
            the storage's ``skipped_content_missing``.
        """
        queries = []
        for sha1_git in skipped_contents:
            queries.append(
                {
                    "sha1_git": sha1_git,
                    "sha1": None,
                    "sha256": None,
                    "blake2s256": None,
                }
            )
        # The storage call happens here, before the generator is consumed;
        # only the extraction of "sha1_git" from each entry is lazy.
        missing_entries = self.storage.skipped_content_missing(queries)
        return (entry["sha1_git"] for entry in missing_entries)
[docs]
    def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]:
        """List directories missing from the archive, by ``sha1_git``.

        Args:
            directories: ``sha1_git`` identifiers of the directories to check

        Returns:
            The result of ``directory_missing`` on the configured storage,
            passed through unchanged.
        """
        lookup = self.storage.directory_missing
        return lookup(directories)