# Copyright (C) 2017-2025  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from itertools import chain
import logging
import os
import re
from shutil import get_unpack_formats
import tarfile
import tempfile
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse
from xml.etree import ElementTree
import zipfile
import requests
import sentry_sdk
from swh.core import config
from swh.deposit.client import PrivateApiDepositClient
from swh.deposit.config import DEPOSIT_STATUS_REJECTED, DEPOSIT_STATUS_VERIFIED
from swh.deposit.loader.checks import check_metadata
logger = logging.getLogger(__name__)
MANDATORY_ARCHIVE_UNREADABLE = (
    "At least one of its associated archives is not readable"  # noqa
)
MANDATORY_ARCHIVE_INVALID = (
    "Mandatory archive is invalid (i.e contains only one archive)"  # noqa
)
MANDATORY_ARCHIVE_UNSUPPORTED = "Mandatory archive type is not supported"
MANDATORY_ARCHIVE_MISSING = "Deposit without archive is rejected"
ARCHIVE_EXTENSIONS = [
    "zip",
    "tar",
    "tar.gz",
    "xz",
    "tar.xz",
    "bz2",
    "tar.bz2",
    "Z",
    "tar.Z",
    "tgz",
    "7z",
]
PATTERN_ARCHIVE_EXTENSION = re.compile(r".*\.(%s)$" % "|".join(ARCHIVE_EXTENSIONS))
def _check_archive(archive_url: str) -> Tuple[bool, Optional[str]]:
    """Check that a deposit associated archive is ok:
    - readable
    - supported archive format
    - valid content: the archive does not contain a single archive file
    If any of those checks are not ok, return the corresponding
    failing check.
    Args:
        archive_path (DepositRequest): Archive to check
    Returns:
        (True, None) if archive is check compliant, (False,
        <detail-error>) otherwise.
    """
    parsed_archive_url = urlparse(archive_url)
    archive_name = os.path.basename(parsed_archive_url.path)
    if not known_archive_format(archive_name):
        return False, MANDATORY_ARCHIVE_UNSUPPORTED
    try:
        response = requests.get(archive_url, stream=True)
        with tempfile.TemporaryDirectory() as tmpdir:
            archive_path = os.path.join(tmpdir, archive_name)
            with open(archive_path, "wb") as archive_fp:
                for chunk in response.iter_content(chunk_size=10 * 1024 * 1024):
                    archive_fp.write(chunk)
            with open(archive_path, "rb") as archive_fp:
                try:
                    with zipfile.ZipFile(archive_fp) as zip_fp:
                        files = zip_fp.namelist()
                except Exception:
                    try:
                        # rewind since the first tryout reading may have moved the
                        # cursor
                        archive_fp.seek(0)
                        with tarfile.open(fileobj=archive_fp) as tar_fp:
                            files = tar_fp.getnames()
                    except Exception:
                        return False, MANDATORY_ARCHIVE_UNSUPPORTED
    except Exception:
        return False, MANDATORY_ARCHIVE_UNREADABLE
    if len(files) > 1:
        return True, None
    element = files[0]
    if PATTERN_ARCHIVE_EXTENSION.match(element):
        # archive in archive!
        return False, MANDATORY_ARCHIVE_INVALID
    return True, None
def _check_deposit_archives(
    archive_urls: List[str],
) -> Tuple[bool, Optional[Dict]]:
    """Given a deposit, check each deposit request of type archive.
    Args:
        The deposit to check archives for
    Returns
        tuple (status, details): True, None if all archives
        are ok, (False, <detailed-error>) otherwise.
    """
    if len(archive_urls) == 0:  # no associated archive is refused
        return False, {
            "archive": [
                {
                    "summary": MANDATORY_ARCHIVE_MISSING,
                }
            ]
        }
    errors = []
    for archive_url in archive_urls:
        check, error_message = _check_archive(archive_url)
        if not check:
            errors.append({"summary": error_message})
    if not errors:
        return True, None
    return False, {"archive": errors}
[docs]
class DepositChecker:
    """Deposit checker implementation.
    Trigger deposit's checks through the private api.
    """
    def __init__(self):
        self.config: Dict[str, Any] = config.load_from_envvar()
        self.client = PrivateApiDepositClient(config=self.config["deposit"])
[docs]
    def check(self, collection: str, deposit_id: str) -> Dict[str, Any]:
        status = None
        deposit_upload_urls = f"/{deposit_id}/upload-urls/"
        logger.debug("deposit-upload-urls: %s", deposit_upload_urls)
        details_dict: Dict = {}
        try:
            raw_metadata = self.client.metadata_get(f"/{deposit_id}/meta/").get(
                "raw_metadata"
            )
            # will check each deposit's associated request (both of type
            # archive and metadata) for errors
            archive_urls = self.client.do("GET", deposit_upload_urls).json()
            logger.debug("deposit-upload-urls result: %s", archive_urls)
            archives_status_ok, details = _check_deposit_archives(archive_urls)
            if not archives_status_ok:
                assert details is not None
                details_dict.update(details)
            if raw_metadata is None:
                metadata_status_ok = False
                details_dict["metadata"] = [{"summary": "Missing Atom document"}]
            else:
                metadata_tree = ElementTree.fromstring(raw_metadata)
                metadata_status_ok, details = check_metadata(metadata_tree)
                # Ensure in case of error, we do have the rejection details
                assert metadata_status_ok or (
                    not metadata_status_ok and details is not None
                )
                # we can have warnings even if checks are ok (e.g. missing suggested field)
                details_dict.update(details or {})
            deposit_status_ok = archives_status_ok and metadata_status_ok
            # if any details_dict arose, the deposit is rejected
            status = (
                DEPOSIT_STATUS_VERIFIED
                if deposit_status_ok
                else DEPOSIT_STATUS_REJECTED
            )
            self.client.status_update(
                f"/{deposit_id}/update/", status=status, status_detail=details_dict
            )
            status = "eventful" if status == DEPOSIT_STATUS_VERIFIED else "failed"
        except Exception as e:
            sentry_sdk.capture_exception()
            status = "failed"
            details_dict["exception"] = f"{e.__class__.__name__}: {str(e)}"
        logger.debug("Check status: %s", status)
        return {"status": status, "status_detail": details_dict}