# Source code for swh.vault.to_disk
# Copyright (C) 2016-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import collections
import concurrent
import os
from typing import Any, Dict, Optional
from swh.model import hashutil
from swh.model.from_disk import DentryPerms, mode_to_perms
from swh.objstorage.interface import ObjStorageInterface, objid_from_dict
from swh.storage.interface import StorageInterface
# Placeholder bytes written in place of a file's data when the content cannot
# be delivered; get_filtered_file_content() picks one based on the entry status.

# Used when the entry has no status at all (status is None).
MISSING_MESSAGE = (
    b"This content is missing from the Software Heritage archive "
    b"(or from the mirror used while retrieving it)."
)
# Used for entries with the "absent" status, and for "visible" entries whose
# data lookup returned None.
SKIPPED_MESSAGE = (
    b"This content has not been retrieved in the "
    b"Software Heritage archive due to its size."
)
# Used for entries with the "hidden" status.
HIDDEN_MESSAGE = b"This content is hidden."
# [docs]
def get_filtered_file_content(
    storage: StorageInterface,
    file_data: Dict[str, Any],
    objstorage: Optional[ObjStorageInterface] = None,
) -> Dict[str, Any]:
    """Retrieve the file specified by file_data and apply filters for skipped
    and missing content.

    Args:
        storage: the storage from which to retrieve the objects
        file_data: a file entry as returned by directory_ls()
        objstorage: optional object storage; when given, the data of visible
            contents is read from it instead of from ``storage``

    Returns:
        A new dict holding the entries of file_data plus a 'content' key that
        points to the file content in bytes.
        The contents can be replaced by a specific message to indicate that
        they could not be retrieved (either due to privacy policy or because
        their sizes were too big for us to archive it).

    Raises:
        AssertionError: if the entry status is not one of "visible", "absent",
            "hidden" or None.
    """
    status = file_data["status"]
    content: bytes
    if status == "visible":
        hashes = objid_from_dict(file_data)
        data: Optional[bytes]
        if objstorage is not None:
            data = objstorage.get(hashes)
        else:
            data = storage.content_get_data(hashes)
        # A visible content whose data lookup yields None gets the same
        # placeholder as an "absent" one.
        content = SKIPPED_MESSAGE if data is None else data
    elif status == "absent":
        content = SKIPPED_MESSAGE
    elif status == "hidden":
        content = HIDDEN_MESSAGE
    elif status is None:
        content = MISSING_MESSAGE
    else:
        # Raised explicitly rather than through ``assert False``: assert
        # statements are stripped under ``python -O``, which would let the
        # function fall through to an UnboundLocalError on ``content``.
        raise AssertionError(
            f"unexpected status {status!r} "
            f"for content {hashutil.hash_to_hex(file_data['target'])}"
        )
    # Keys of file_data are unpacked last, so they take precedence over the
    # 'content' key should the entry already carry one.
    return {"content": content, **file_data}
# [docs]
class DirectoryBuilder:
    """Reconstructs the on-disk representation of a directory in the storage."""
    def __init__(
        self,
        storage: StorageInterface,
        root: bytes,
        dir_id: bytes,
        thread_pool_size: int = 10,
        objstorage: Optional[ObjStorageInterface] = None,
    ):
        """Initialize the directory builder.
        Args:
            storage: the storage object
            root: the path where the directory should be reconstructed
            dir_id: the identifier of the directory in the storage
        """
        self.storage = storage
        self.root = root
        self.dir_id = dir_id
        self.thread_pool_size = thread_pool_size
        self.objstorage = objstorage
[docs]
    def build(self) -> None:
        """Perform the reconstruction of the directory in the given root."""
        def file_fetcher(file_data: Dict[str, Any]) -> None:
            file_data = get_filtered_file_content(
                self.storage, file_data, self.objstorage
            )
            path = os.path.join(self.root, file_data["path"])
            self._create_file(path, file_data["content"], file_data["perms"])
        executor = concurrent.futures.ThreadPoolExecutor(self.thread_pool_size)
        futures = []
        os.makedirs(self.root, exist_ok=True)
        queue = collections.deque([(b"", self.dir_id)])
        while queue:
            path, dir_id = queue.popleft()
            dir_entries = self.storage.directory_ls(dir_id)
            for dir_entry in dir_entries:
                dir_entry["path"] = os.path.join(path, dir_entry["name"])
                match dir_entry["type"]:
                    case "dir":
                        self._create_tree(dir_entry)
                        queue.append((dir_entry["path"], dir_entry["target"]))
                    case "rev":
                        self._create_revision(dir_entry)
                    case "file":
                        futures.append(executor.submit(file_fetcher, dir_entry))
                    case _:
                        raise ValueError(
                            f"Unsupported directory entry type {dir_entry['type']} for "
                            f"{dir_entry['name']:r} in directory swh:1:dir:{dir_id.hex()}"
                        )
        concurrent.futures.wait(futures) 
    def _create_tree(self, directory: Dict[str, Any]) -> None:
        """Create a directory tree from root for the given path."""
        os.makedirs(os.path.join(self.root, directory["path"]), exist_ok=True)
    def _create_revision(self, rev_data: Dict[str, Any]) -> None:
        """Create the revision in the tree as a broken symlink to the target
        identifier."""
        os.makedirs(os.path.join(self.root, rev_data["path"]), exist_ok=True)
    def _create_file(
        self, path: bytes, content: bytes, mode: int = DentryPerms.content
    ) -> None:
        """Create the given file and fill it with content."""
        perms = mode_to_perms(mode)
        if perms == DentryPerms.symlink:
            os.symlink(content, path)
        else:
            with open(path, "wb") as f:
                f.write(content)
            os.chmod(path, perms.value)