Source code for swh.scanner.policy
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import itertools
from typing import Any, Callable, Iterable, List, Optional, no_type_check
from swh.model import discovery, from_disk
from swh.model.from_disk import model
from swh.model.model import Sha1Git
from swh.web.client.client import WebAPIClient
from .data import MerkleNodeInfo
[docs]
def source_size(source_tree: from_disk.Directory):
    """return the size of a source tree as the number of nodes it contains"""
    return sum(1 for n in source_tree.iter_tree(dedup=False)) 
[docs]
class Policy(metaclass=abc.ABCMeta):
    data: MerkleNodeInfo
    """information about contents and directories of the merkle tree"""
    source_tree: from_disk.Directory
    """representation of a source code project directory in the merkle tree"""
    def __init__(
        self,
        source_tree: from_disk.Directory,
        data: MerkleNodeInfo,
    ):
        self.source_tree = source_tree
        self.data = data
[docs]
    @abc.abstractmethod
    def run(
        self, client: WebAPIClient, update_info: Optional[Callable[[Any], None]] = None
    ):
        """Scan a source code project"""
        raise NotImplementedError("Must implement run method") 
    def _set_info(self, obj, known):
        self.data[obj.swhid()]["known"] = known 
[docs]
class WebAPIConnection(discovery.ArchiveDiscoveryInterface):
    """Use the web APIs to query the archive"""
    def __init__(
        self,
        contents: List[model.Content],
        skipped_contents: List[model.SkippedContent],
        directories: List[model.Directory],
        client: WebAPIClient,
    ) -> None:
        self.contents = contents
        self.skipped_contents = skipped_contents
        self.directories = directories
        self.client = client
        self.sha_to_swhid = {}
        self.swhid_to_sha = {}
        for content in contents:
            swhid = str(content.swhid())
            self.sha_to_swhid[content.sha1_git] = swhid
            self.swhid_to_sha[swhid] = content.sha1_git
        for directory in directories:
            swhid = str(directory.swhid())
            self.sha_to_swhid[directory.id] = swhid
            self.swhid_to_sha[swhid] = directory.id
[docs]
    def content_missing(self, contents: List[Sha1Git]) -> List[Sha1Git]:
        """List content missing from the archive by sha1"""
        return self._missing(contents) 
[docs]
    def skipped_content_missing(
        self, skipped_contents: List[Sha1Git]
    ) -> Iterable[Sha1Git]:
        """List skipped content missing from the archive by sha1"""
        # TODO what should we do about skipped contents?
        return skipped_contents 
[docs]
    def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]:
        """List directories missing from the archive by sha1"""
        return self._missing(directories) 
    def _missing(self, shas: List[Sha1Git]) -> List[Sha1Git]:
        # Ignore mypy complaining about string being passed, since `known`
        # transforms them to string immediately.
        res = self.client.known([self.sha_to_swhid[o] for o in shas])
        return [k.object_id for k, v in res.items() if not v["known"]] 
[docs]
class RandomDirSamplingPriority(Policy):
    """Check the Merkle tree querying random directories. Set all ancestors to
    unknown for unknown directories, otherwise set all descendants to known.
    Finally check all the remaining file contents.
    """
[docs]
    @no_type_check
    def run(
        self, client: WebAPIClient, update_info: Optional[Callable[[Any], None]] = None
    ):
        contents, skipped_contents, directories = from_disk.iter_directory(
            self.source_tree
        )
        # `filter_known_objects` only does filtering by random directory
        # sampling for now.
        # In the future, it could/will grow a parameter to choose/pass in a
        # different discovery implementation.
        # From this call site, we are relying on this behavior in order to
        # *actually* be a random directory sampling policy, but any change away
        # from under us in `filter_known_objects` should trigger a test failure.
        connection = WebAPIConnection(contents, skipped_contents, directories, client)
        if update_info is None:
            callback = self._set_info
        else:
            def callback(*args, **kwargs):
                self._set_info(*args, **kwargs)
                update_info(*args, **kwargs)
        get_unknowns = discovery.filter_known_objects(
            connection,
            update_info_callback=callback,
        )
        unknowns = set(itertools.chain(*get_unknowns))
        # double check the result
        for obj in itertools.chain(contents, skipped_contents, directories):
            assert self.data[obj.swhid()]["known"] == (obj not in unknowns)