# Copyright (C) 2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
This module implements the inventory stage of the
:ref:`removal algorithm <alter_removal_algorithm>`.
"""
from contextlib import suppress
import itertools
import logging
from typing import (
    Any,
    Callable,
    Collection,
    Dict,
    Iterable,
    Iterator,
    List,
    NamedTuple,
    Optional,
    Set,
    Tuple,
)
from igraph import Vertex
from swh.core.api.classes import stream_results, stream_results_optional
from swh.graph.http_client import GraphArgumentException, RemoteGraphClient
from swh.model.model import Origin, Revision
from swh.model.swhids import ExtendedObjectType as ObjectType
from swh.model.swhids import ExtendedSWHID
from swh.storage.algos.origin import iter_origin_visit_statuses, iter_origin_visits
from swh.storage.algos.snapshot import snapshot_get_all_branches
from swh.storage.interface import StorageInterface
from .progressbar import ProgressBar, ProgressBarInit, no_progressbar
from .subgraph import Subgraph
from .utils import filter_objects_missing_from_storage
logger = logging.getLogger(__name__)
class RootsNotFound(Exception):
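    """Raised when some of the requested root objects cannot be found in storage."""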
    def __init__(self, swhids: Iterable[ExtendedSWHID]):
        self.swhids = sorted(swhids)
    def get_labels(self, requested: Collection[Origin | ExtendedSWHID]) -> List[str]:
        """Returns a list of either an origin URL if it can be found in requested and the
        SWHID otherwise."""
        return [
            next(
                (
                    x.url
                    for x in requested
                    if isinstance(x, Origin) and x.swhid() == swhid
                ),
                str(swhid),
            )
            for swhid in self.swhids
        ] 
 
class StuckInventoryException(Exception):
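    """Raised when several successive inventory iterations make no progress.

    ``swhids`` holds the objects that could not be completed.
    """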
    def __init__(self, swhids: List[ExtendedSWHID]):
        self.swhids = swhids 
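# Number of consecutive iterations with identical statistics after which the
# inventory is considered stuck and StuckInventoryException is raised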
ITERATIONS_BEFORE_FORFEIT = 6
class InventorySubgraph(Subgraph):
    """A subgraph holding an inventory of all candidates for removal
    When all references from a given node have been accounted for,
    the ``complete`` attribute is set to True.
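
    Example (a minimal sketch; ``origin_swhid`` stands for an actual
    :py:class:`~swh.model.swhids.ExtendedSWHID`)::

        subgraph = InventorySubgraph()
        # Vertices are added as incomplete until all their references are known
        subgraph.add_swhid(origin_swhid, complete=False)
        # Vertices still needing a lookup, ordered from origins to contents
        remaining = subgraph.select_incomplete()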
    """
    default_vertex_attributes: Dict[str, Any] = {"complete": False}
    def __init__(self, *args, **kwargs):
        """See :py:class:`Subgraph`"""
        super().__init__(*args, **kwargs)
        self["name"] = "Inventory"
        self["root_swhids"] = []
        if "complete" not in self.vs.attributes():
            self.vs["complete"] = [False for _ in self.vs]
    def select_incomplete(self) -> List[Vertex]:
        """Return vertices known to be incomplete ordered by object type from
        origins to contents."""
        # We want to order incomplete vertices from origin to content in order to
        # increase the chance to complete as many vertices as possible using swh.graph
        # traversal which follows the same direction.
        return self.select_ordered(complete_eq=False) 
    def dot_node_attributes(self, v: Vertex) -> List[str]:
        """Get a list of attributes in DOT format for the given vertex.
        On top of the default attributes, use a bold font for root objects, and
        draw vertices known to be incomplete in gray.
        :meta private:
        """
        attrs = super().dot_node_attributes(v)
        if v["swhid"] in self["root_swhids"]:
            attrs.append('fontname="times bold"')
        if not v["complete"]:
            attrs.append("color=gray")
        return attrs 
class Lister:
    """A class encapsulating our inventory algorithm.
    :meta private:
    """
    def __init__(
        self,
        storage: StorageInterface,
        graph_client: RemoteGraphClient,
        subgraph: InventorySubgraph,
        /,
        known_missing: Optional[Set[ExtendedSWHID]] = None,
        progressbar: Optional[ProgressBar] = None,
    ):
        self._subgraph = subgraph
        self._storage = storage
        self._graph_client = graph_client
        self._known_missing = known_missing or set()
        self._progressbar = progressbar
    @property
    def subgraph(self):
        return self._subgraph
    def inventory_candidates(self, root: ExtendedSWHID) -> None:
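        """Inventory every removal candidate reachable from ``root``.

        Alternates between swh.graph and swh.storage lookups until no
        incomplete vertex remains, raising :py:exc:`StuckInventoryException`
        if several iterations in a row make no progress.
        """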
        self._subgraph["root_swhids"].append(root)
        # Add our starting point as incomplete
        self._subgraph.add_swhid(root, complete=False)
        # Iterate until everything has been fetched
        logger.debug("inventory_candidates: added %s", root)
        stats_history: List[Tuple[int, int]] = []
        for remaining in self._iter_inventory_candidates():
            total = len(self._subgraph.vs)
            stats_history.append((total, remaining))
            # Did the last few iterations all report the same stats?
            if (
                len(stats_history) > ITERATIONS_BEFORE_FORFEIT
                and len(set(stats_history[-ITERATIONS_BEFORE_FORFEIT:])) == 1
            ):
                raise StuckInventoryException(
                    [vertex["swhid"] for vertex in self._subgraph.select_incomplete()]
                )
            if self._progressbar:
                self._progressbar.update(
                    1, current_item=ProgressBarItem(root, total, remaining)
                )
            logger.debug(
                "inventory_candidates: %4d SWHIDS known, %4d need to be looked up.",
                total,
                remaining,
            )
        if self._progressbar:
            self._progressbar.update(
                1, current_item=ProgressBarItem(None, len(self._subgraph.vs), 0)
            )
    def _fetch_candidates_using_graph(self, vertex) -> None:
        # We don’t expect all SWHIDs to be present in swh.graph
        with suppress(GraphArgumentException):
            self.add_edges_traversing_graph(vertex["swhid"])
    def _fetch_candidates_using_storage(self, vertex) -> None:
        self.add_edges_using_storage(vertex["swhid"])
    def _iter_inventory_candidates(self) -> Iterator[int]:
        # We alternate between retrieving from swh.graph (fast, but potentially
        # incomplete) and swh.storage (complete, but slow and one level at a time).
        # Note: SWHIDs discovered from swh.graph will not require further
        # fetching. Because objects are immutable, nothing could have been
        # added after the graph was exported. The sole exception is origin
        # objects. References from SWHIDs discovered from swh.storage do have
        # to be looked up, though. As there is a chance they can be found in
        # swh.graph, we’ll do that at our next iteration.
        fetchers = itertools.cycle(
            [self._fetch_candidates_using_graph, self._fetch_candidates_using_storage]
        )
        while to_fetch := [
            v
            for v in self._subgraph.select_incomplete()
            if v["swhid"] not in self._known_missing
        ]:
            yield len(to_fetch)
            fetch = next(fetchers)
            for vertex in to_fetch:
                # A fetcher might complete more than the given vertex, so this
                # one might already have been completed since we retrieved the
                # list of incomplete vertices
                if vertex["complete"]:
                    continue
                fetch(vertex)
    def add_edges_traversing_graph(self, start: ExtendedSWHID) -> None:
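        """Add to the subgraph every node and edge reachable from ``start``
        according to swh.graph, batching insertions for efficiency."""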
        # Mapping between SWHID string and integer vertex index
        swhid_vertices: Dict[str, int] = {}
        # SWHIDs that haven't been added to the subgraph yet
        pending_swhids: Set[str] = set()
        # All SWHID -> SWHID edges seen so far
        seen_edges: Set[Tuple[str, str]] = set()
        # Edges that still need to be added
        pending_edges: Set[Tuple[str, str]] = set()
        def add_swhid(swhid):
            """Record a swhid to be added to the subgraph"""
            if swhid in swhid_vertices:
                return
            pending_swhids.add(swhid)
        def add_edge(src, dst):
            """Record an edge to be added to the subgraph"""
            edge = (src, dst)
            if edge in seen_edges:
                return
            add_swhid(src)
            add_swhid(dst)
            pending_edges.add((src, dst))
            seen_edges.add((src, dst))
            # Flush once enough edges are pending, so they get added in bulk
            if len(pending_edges) > 10000:
                flush_edges()
        def flush_edges():
            """Add all the pending swhids, and all pending edges, to the subgraph."""
            # Check for swhids that are already in the subgraph and record them
            # in swhid_vertices
            found = {
                v["name"]: v.index
                for v in self._subgraph.vs.select(name_in=pending_swhids)
            }
            swhid_vertices.update(found)
            # Filter the really new swhids and insert them
            new_swhids = pending_swhids - found.keys()
            if new_swhids:
                added = self._subgraph.add_swhids(new_swhids)
                swhid_vertices.update(added)
            # Mark all non-origin nodes as completely visited
            for name in pending_swhids:
                if not name.startswith("swh:1:ori:"):
                    self._subgraph.vs[swhid_vertices[name]]["complete"] = True
            pending_swhids.clear()
            # Then, add the edges in bulk
            self._subgraph.add_edges(
                (swhid_vertices[src], swhid_vertices[dst]) for src, dst in pending_edges
            )
            pending_edges.clear()
        # We want everything except dir→rev edges which represent submodules.
        # See the relevant comment in `_add_edges_using_storage_for_directory` below.
        edge_restriction = "ori:*,snp:*,rel:*,rev:*,dir:dir,dir:cnt"
        # XXX: We should be able to pass a SWHID object to visit_edges()
        for src, dst in self._graph_client.visit_edges(
            str(start), edges=edge_restriction
        ):
            add_edge(src, dst)
        # Always manually flush the last batch of swhids/edges
        flush_edges()
    def add_edges_using_storage(self, source: ExtendedSWHID) -> None:
        _ADD_EDGES_USING_STORAGE_METHODS_PER_OBJECT_TYPE[source.object_type](
            self, source
        )
    def _add_edges_using_storage_for_content(self, source: ExtendedSWHID) -> None:
        _ = self._subgraph.add_swhid(source, complete=True)
        # Nothing is referenced by content objects so we have no edges to add
    def _add_edges_using_storage_for_directory(self, source: ExtendedSWHID) -> None:
        v_directory = self._subgraph.add_swhid(source, complete=True)
        entries = stream_results_optional(
            self._storage.directory_get_entries, source.object_id
        )
        if not entries:
            logger.warning("Directory %s not found", source)
            return
        for entry in entries:
            target = entry.swhid().to_extended()
            # References from directory to revision represent submodules. As of
            # April 2023, loaders will record the reference to a submodule, and
            # that’s it. They don’t handle loading the associated origin. If the
            # revision for a submodule is in the archive, it means that the
            # origin has been retrieved separately. The policy is thus that, to
            # remove the code in a submodule, one should remove the associated
            # origin. Long story short, revisions used as submodules (and their
            # references) are not considered as candidates for removal when
            # inventorying a directory.
            if target.object_type == ObjectType.REVISION:
                logger.debug(
                    "Ignored submodule %s (%s), listed in directory %s",
                    entry.name,
                    entry.swhid(),
                    source,
                )
                continue
            # Content objects are our leaves: as soon as we know they exist, we
            # know everything about them and can consider them complete.
            if target.object_type == ObjectType.CONTENT:
                v_target = self._subgraph.add_swhid(target, complete=True)
            else:
                v_target = self._subgraph.add_swhid(target)
            # It is possible to get the same target multiple times
            # for a single directory. For example if it contains multiple
            # files with the same content.
            self._subgraph.add_edge(v_directory, v_target, skip_duplicates=True)
    def _add_edges_using_storage_for_revision(self, source: ExtendedSWHID) -> None:
        # We limit the search so as not to retrieve too much from storage at once.
        # If needed, the oldest parent revisions will be retrieved in a later
        # iteration in any case.
        for rev_d in self._storage.revision_log([source.object_id], limit=100):
            if rev_d is None:
                continue
            elif isinstance(rev_d, Revision):
                revision = rev_d
            else:
                # TODO: remove this conditional once swh-storage fully migrated to
                # returning revision objects instead of dicts
                revision = Revision.from_dict(rev_d)
            revision_swhid = revision.swhid().to_extended()
            # We might already know about this revision from a previous revision
            # log, in which case we can skip it. We can’t skip further commits
            # though, as the history doesn’t have to be strictly linear.
            if self._subgraph.vs.select(name=str(revision_swhid), complete=True):
                continue
            # We can mark this revision as complete, as we get everything we
            # need about it here: its directory and its parents.
            v_revision = self._subgraph.add_swhid(revision_swhid, complete=True)
            v_directory = self._subgraph.add_swhid(revision.directory_swhid())
            self._subgraph.add_edge(v_revision, v_directory)
            # We create a set here because some revisions point to the same
            # parent multiple times.
            for parent_swhid in set(revision.parent_swhids()):
                v_parent = self._subgraph.add_swhid(parent_swhid)
                self._subgraph.add_edge(v_revision, v_parent)
    def _add_edges_using_storage_for_release(self, source: ExtendedSWHID) -> None:
        v_release = self._subgraph.add_swhid(source, complete=True)
        [release] = self._storage.release_get([source.object_id])
        if not release:
            logger.warning("Release %s not found", source)
            return
        v_target = self._subgraph.add_swhid(release.target_swhid())
        self._subgraph.add_edge(v_release, v_target)
    def _add_edges_using_storage_for_snapshot(self, source: ExtendedSWHID) -> None:
        v_snapshot = self._subgraph.add_swhid(source, complete=True)
        snapshot = snapshot_get_all_branches(self._storage, source.object_id)
        if not snapshot:
            logger.warning("Snapshot %s not found", source)
            return
        # We need to deduplicate targets as multiple branches can point to the
        # same head and we only need one edge for each target.
        # TODO: Better document aliases in swh-model. I had to look at mercurial
        #       loader to understand what they were useful for.
        # We also skip aliases here, as they refer to a branch name inside the
        # snapshot. As we inventory the references of every branch, the branch
        # pointed to by an alias will always be dealt with.
        target_swhids = {
            branch.swhid()
            for branch in snapshot.branches.values()
            if branch and branch.target_type.value != "alias"
        }
        for swhid in target_swhids:
            v_branch = self._subgraph.add_swhid(swhid)
            self._subgraph.add_edge(v_snapshot, v_branch)
    def _add_edges_using_storage_for_origin(self, source: ExtendedSWHID) -> None:
        v_source = self._subgraph.add_swhid(source, complete=True)
        [origin_d] = self._storage.origin_get_by_sha1([source.object_id])
        if not origin_d:
            raise ValueError(f"Origin “{source}” not found in storage")
        for visit in iter_origin_visits(self._storage, origin_d["url"]):
            assert visit.visit is not None  # make mypy happy
            for status in iter_origin_visit_statuses(
                self._storage, visit.origin, visit.visit
            ):
                snapshot_swhid = status.snapshot_swhid()
                if snapshot_swhid:
                    v_snapshot = self._subgraph.add_swhid(snapshot_swhid)
                    self._subgraph.add_edge(v_source, v_snapshot, skip_duplicates=True)
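# Dispatch table mapping each object type to the Lister method that lists its
# outgoing references using swh.storage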
_ADD_EDGES_USING_STORAGE_METHODS_PER_OBJECT_TYPE: Dict[
    ObjectType, Callable[[Lister, ExtendedSWHID], None]
] = {
    ObjectType.CONTENT: Lister._add_edges_using_storage_for_content,
    ObjectType.DIRECTORY: Lister._add_edges_using_storage_for_directory,
    ObjectType.REVISION: Lister._add_edges_using_storage_for_revision,
    ObjectType.RELEASE: Lister._add_edges_using_storage_for_release,
    ObjectType.SNAPSHOT: Lister._add_edges_using_storage_for_snapshot,
    ObjectType.ORIGIN: Lister._add_edges_using_storage_for_origin,
}
class ProgressBarItem(NamedTuple):
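    """Progress information: the root SWHID being inventoried (``None`` once
    done), the total number of objects found, and how many are left to look up."""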
    origin: Optional[ExtendedSWHID]
    total: int
    remaining: int
    def __str__(self) -> str:
        return (
            f"{self.origin or 'done'} "
            f"({self.total} objects found / {self.remaining} left to look up)"
        ) 
def _ensure_swhids_exist_in_storage(
    storage: StorageInterface, swhids: List[ExtendedSWHID]
) -> None:
    """Raise RootsNotFound if any of the given swhids cannot be found
    in the given storage."""
    wanted_swhids = set(swhids)
    existing_swhids = set(filter_objects_missing_from_storage(storage, wanted_swhids))
    diff = wanted_swhids - existing_swhids
    if diff:
        raise RootsNotFound(diff)
def make_inventory(
    storage,
    graph_client,
    swhids: List[ExtendedSWHID],
    known_missing: Optional[Set[ExtendedSWHID]] = None,
    progressbar: Optional[ProgressBarInit] = None,
) -> InventorySubgraph:
    """Inventory candidates for removal from the given set of SWHID.
    By querying the given `storage` and `graph_client`, create a subgraph
    with all objects belonging to the given set of SWHIDs (typically origins).
    The result should then used to verify which candidate can safely be
    removed.
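
    Example (a minimal sketch; assumes an already configured ``storage`` and
    ``graph_client``, and uses a hypothetical origin URL)::

        from swh.model.model import Origin

        origin = Origin(url="https://example.org/repo.git")
        subgraph = make_inventory(storage, graph_client, [origin.swhid()])
        # ``subgraph`` now holds every candidate reachable from the origin;
        # it can then be checked to see which objects are safe to remove.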
    """
    _ensure_swhids_exist_in_storage(storage, swhids)
    subgraph = InventorySubgraph()
    known_missing = known_missing or set()
    progressbar_init: ProgressBarInit = progressbar or no_progressbar
    bar: ProgressBar[ProgressBarItem]
    with progressbar_init(
        # Giving an infinite iterator is one of the rare ways
        # to get click.progressbar() to display a moving cursor.
        # In any case, we are not going to use its value as we
        # manually call `.update()` in `Lister.inventory_candidates()`.
        # But we still need to make mypy happy and give the right
        # type for the progressbar item.
        itertools.cycle([ProgressBarItem(swhids[0], 0, 0)]),
        label="Inventorying all reachable objects…",
        show_percent=False,
        show_pos=False,
        item_show_func=lambda s: str(s) if s else "",
    ) as bar:
        lister = Lister(
            storage,
            graph_client,
            subgraph,
            known_missing=known_missing,
            progressbar=bar,
        )
        for swhid in swhids:
            lister.inventory_candidates(swhid)
    return lister.subgraph