Source code for swh.deposit.api.private.deposit_read
# Copyright (C) 2017-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from contextlib import contextmanager
import os
from pathlib import Path
import shutil
import tempfile
from typing import Any, Dict, Iterator, List, Optional, Tuple
from xml.etree import ElementTree
from rest_framework import status
from swh.core import tarball
from swh.deposit.api.common import APIGet
from swh.deposit.api.private import APIPrivateView, DepositReadMixin
from swh.deposit.config import ARCHIVE_TYPE, SWH_PERSON
from swh.deposit.models import Deposit
from swh.deposit.utils import NAMESPACES, normalize_date
from swh.model.hashutil import hash_to_hex
from swh.model.model import MetadataAuthorityType
from swh.model.swhids import CoreSWHID
[docs]
@contextmanager
def aggregate_tarballs(extraction_dir: str, archives: List) -> Iterator[str]:
    """Aggregate multiple tarballs into one and returns this new archive's
       path.
    Args:
        extraction_dir: Path to use for the tarballs computation
        archive_paths: Deposit's archive paths
    Returns:
        Tuple (directory to clean up, archive path (aggregated or not))
    """
    # rebuild one zip archive from (possibly) multiple ones
    os.makedirs(extraction_dir, 0o755, exist_ok=True)
    dir_path = tempfile.mkdtemp(prefix="swh.deposit-", dir=extraction_dir)
    # root folder to build an aggregated tarball
    aggregated_tarball_rootdir = os.path.join(dir_path, "aggregate")
    download_tarball_rootdir = os.path.join(dir_path, "download")
    # uncompress in a temporary location all client's deposit archives
    for archive in archives:
        with archive.open("rb") as archive_fp:
            try:
                # For storage which supports the path method access, let's retrieve it
                archive_path = archive.path
            except NotImplementedError:
                # otherwise for remote backend which do not support it, let's download
                # the tarball locally first
                tarball_path = Path(archive.name)
                tarball_path_dir = Path(download_tarball_rootdir) / tarball_path.parent
                tarball_path_dir.mkdir(0o755, parents=True, exist_ok=True)
                archive_path = str(tarball_path_dir / tarball_path.name)
                with open(archive_path, "wb") as f:
                    while chunk := archive_fp.read(10 * 1024 * 1024):
                        f.write(chunk)
        tarball.uncompress(archive_path, aggregated_tarball_rootdir)
    # Aggregate into one big tarball the multiple smaller ones
    temp_tarpath = shutil.make_archive(
        aggregated_tarball_rootdir, "tar", aggregated_tarball_rootdir
    )
    # can already clean up temporary directory
    shutil.rmtree(aggregated_tarball_rootdir)
    try:
        yield temp_tarpath
    finally:
        shutil.rmtree(dir_path)
[docs]
class APIReadArchives(APIPrivateView, APIGet, DepositReadMixin):
    """Dedicated class to read a deposit's raw archives content.
    Only GET is supported.
    """
    def __init__(self):
        super().__init__()
        self.extraction_dir = self.config["extraction_dir"]
        if not os.path.exists(self.extraction_dir):
            os.makedirs(self.extraction_dir)
[docs]
    def process_get(
        self, request, collection_name: str, deposit: Deposit
    ) -> Tuple[int, Any, str]:
        """Build a unique tarball from the multiple received and stream that
           content to the client.
        Args:
            request (Request):
            collection_name: Collection owning the deposit
            deposit: Deposit concerned by the reading
        Returns:
            Tuple status, stream of content, content-type
        """
        archives = [
            r.archive
            for r in self._deposit_requests(deposit, request_type=ARCHIVE_TYPE)
        ]
        return (
            status.HTTP_200_OK,
            aggregate_tarballs(self.extraction_dir, archives),
            "swh/generator",
        )
[docs]
class APIReadMetadata(APIPrivateView, APIGet, DepositReadMixin):
    """Class in charge of aggregating metadata on a deposit."""
    def _parse_dates(
        self, deposit: Deposit, metadata: ElementTree.Element
    ) -> Tuple[dict, dict]:
        """Normalize the date to use as a tuple of author date, committer date
           from the incoming metadata.
        Returns:
            Tuple of author date, committer date. Those dates are
            swh normalized.
        """
        commit_date_elt = metadata.find("codemeta:datePublished", namespaces=NAMESPACES)
        author_date_elt = metadata.find("codemeta:dateCreated", namespaces=NAMESPACES)
        author_date: Any
        commit_date: Any
        if author_date_elt is None and commit_date_elt is None:
            author_date = commit_date = deposit.complete_date
        elif commit_date_elt is None:
            author_date = commit_date = author_date_elt.text  # type: ignore
        elif author_date_elt is None:
            author_date = commit_date = commit_date_elt.text
        else:
            author_date = author_date_elt.text
            commit_date = commit_date_elt.text
        return (normalize_date(author_date), normalize_date(commit_date))
[docs]
    def metadata_read(self, deposit: Deposit) -> Dict[str, Any]:
        """Read and aggregate multiple deposit information into one unified dictionary.
        Args:
            deposit: Deposit to retrieve information from
        Returns:
            Dictionary of deposit information read by the deposit loader, with the
            following keys:
                **origin** (Dict): Information about the origin
                **raw_metadata** (str): List of raw metadata received for the
                  deposit
                **provider** (Dict): the metadata provider information about the
                  deposit client
                **tool** (Dict): the deposit information
                **deposit** (Dict): deposit information relevant to build the revision
                  (author_date, committer_date, etc...)
        """
        raw_metadata = self._metadata_get(deposit)
        author_date: Optional[dict]
        commit_date: Optional[dict]
        if raw_metadata:
            metadata_tree = ElementTree.fromstring(raw_metadata)
            author_date, commit_date = self._parse_dates(deposit, metadata_tree)
            release_notes_elements = metadata_tree.findall(
                "codemeta:releaseNotes", namespaces=NAMESPACES
            )
        else:
            author_date = commit_date = None
            release_notes_elements = []
        if deposit.parent and deposit.parent.swhid:
            parent_swhid = deposit.parent.swhid
            assert parent_swhid is not None
            swhid = CoreSWHID.from_string(parent_swhid)
            parent_revision = hash_to_hex(swhid.object_id)
            parents = [parent_revision]
        else:
            parents = []
        release_notes: Optional[str]
        if release_notes_elements:
            release_notes = "\n\n".join(
                element.text for element in release_notes_elements if element.text
            )
        else:
            release_notes = None
        return {
            "origin": {"type": "deposit", "url": deposit.origin_url},
            "provider": {
                "provider_name": deposit.client.last_name,
                "provider_url": deposit.client.provider_url,
                "provider_type": MetadataAuthorityType.DEPOSIT_CLIENT.value,
                "metadata": {},
            },
            "tool": self.tool,
            "raw_metadata": raw_metadata,
            "deposit": {
                "id": deposit.id,
                "client": deposit.client.username,
                "collection": deposit.collection.name,
                "author": SWH_PERSON,
                "author_date": author_date,
                "committer": SWH_PERSON,
                "committer_date": commit_date,
                "revision_parents": parents,
                "release_notes": release_notes,
            },
        }
[docs]
    def process_get(
        self, request, collection_name: str, deposit: Deposit
    ) -> Tuple[int, Dict, str]:
        data = self.metadata_read(deposit)
        return status.HTTP_200_OK, data if data else {}, "application/json"