# Copyright (C) 2015-2018  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Module in charge of hashing function definitions. This is the base
module use to compute swh's hashes.
Only a subset of hashing algorithms is supported as defined in the
ALGORITHMS set. Any provided algorithms not in that list will result
in a ValueError explaining the error.
This module defines a MultiHash class to ease the softwareheritage
hashing algorithms computation. This allows to compute hashes from
file object, path, data using a similar interface as what the standard
hashlib module provides.
Basic usage examples:
- file object: MultiHash.from_file(
                 file_object, hash_names=DEFAULT_ALGORITHMS).digest()
- path (filepath): MultiHash.from_path(b'foo').hexdigest()
- data (bytes): MultiHash.from_data(b'foo').bytehexdigest()
"Complex" usage, defining a swh hashlib instance first:
- To compute length, integrate the length to the set of algorithms to
  compute, for example:
  .. code-block:: python
     h = MultiHash(hash_names=set({'length'}).union(DEFAULT_ALGORITHMS))
     with open(filepath, 'rb') as f:
         h.update(f.read(HASH_BLOCK_SIZE))
     hashes = h.digest()  # returns a dict of {hash_algo_name: hash_in_bytes}
- Write alongside computing hashing algorithms (from a stream), example:
  .. code-block:: python
     h = MultiHash(length=length)
     with open(filepath, 'wb') as f:
         for chunk in r.iter_content():  # r a stream of sort
             h.update(chunk)
             f.write(chunk)
     hashes = h.hexdigest()  # returns a dict of {hash_algo_name: hash_in_hex}
"""
import binascii
import functools
import hashlib
from io import BytesIO
import os
from typing import Callable, Dict, Optional, Union
ALGORITHMS = set(
    ["sha1", "sha256", "sha1_git", "blake2s256", "blake2b512", "md5", "sha512"]
)
"""Hashing algorithms supported by this module"""
DEFAULT_ALGORITHMS = set(["sha1", "sha256", "sha1_git", "blake2s256"])
"""Algorithms computed by default when calling the functions from this module.
Subset of :const:`ALGORITHMS`.
"""
HASH_BLOCK_SIZE = 32768
"""Block size for streaming hash computations made in this module"""
_blake2_hash_cache: Dict[str, Callable] = {}
[docs]
class MultiHash:
    """Hashutil class to support multiple hashes computation.
    Args:
        hash_names (set): Set of hash algorithms (+ optionally length)
                          to compute hashes (cf. DEFAULT_ALGORITHMS)
        length (int): Length of the total sum of chunks to read
    If the length is provided as algorithm, the length is also
    computed and returned.
    """
    def __init__(self, hash_names=DEFAULT_ALGORITHMS, length=None):
        self.state = {}
        self.track_length = False
        for name in hash_names:
            if name == "length":
                self.state["length"] = 0
                self.track_length = True
            else:
                self.state[name] = _new_hash(name, length)
[docs]
    @classmethod
    def from_state(cls, state, track_length):
        ret = cls([])
        ret.state = state
        ret.track_length = track_length 
[docs]
    @classmethod
    def from_file(cls, fobj, hash_names=DEFAULT_ALGORITHMS, length=None):
        ret = cls(length=length, hash_names=hash_names)
        while True:
            chunk = fobj.read(HASH_BLOCK_SIZE)
            if not chunk:
                break
            ret.update(chunk)
        return ret 
[docs]
    @classmethod
    def from_path(cls, path, hash_names=DEFAULT_ALGORITHMS):
        length = os.path.getsize(path)
        with open(path, "rb") as f:
            ret = cls.from_file(f, hash_names=hash_names, length=length)
        return ret 
[docs]
    @classmethod
    def from_data(cls, data, hash_names=DEFAULT_ALGORITHMS):
        length = len(data)
        fobj = BytesIO(data)
        return cls.from_file(fobj, hash_names=hash_names, length=length) 
[docs]
    def update(self, chunk):
        for name, h in self.state.items():
            if name == "length":
                continue
            h.update(chunk)
        if self.track_length:
            self.state["length"] += len(chunk) 
[docs]
    def digest(self):
        return {
            name: h.digest() if name != "length" else h
            for name, h in self.state.items()
        } 
[docs]
    def hexdigest(self):
        return {
            name: h.hexdigest() if name != "length" else h
            for name, h in self.state.items()
        } 
[docs]
    def bytehexdigest(self):
        return {
            name: hash_to_bytehex(h.digest()) if name != "length" else h
            for name, h in self.state.items()
        } 
[docs]
    def copy(self):
        copied_state = {
            name: h.copy() if name != "length" else h for name, h in self.state.items()
        }
        return self.from_state(copied_state, self.track_length) 
 
def _new_blake2_hash(algo):
    """Return a function that initializes a blake2 hash."""
    if algo in _blake2_hash_cache:
        return _blake2_hash_cache[algo]()
    lalgo = algo.lower()
    if not lalgo.startswith("blake2"):
        raise ValueError("Algorithm %s is not a blake2 hash" % algo)
    blake_family = lalgo[:7]
    digest_size = None
    if lalgo[7:]:
        try:
            digest_size, remainder = divmod(int(lalgo[7:]), 8)
        except ValueError:
            raise ValueError("Unknown digest size for algo %s" % algo) from None
        if remainder:
            raise ValueError(
                "Digest size for algorithm %s must be a multiple of 8" % algo
            )
    blake2 = getattr(hashlib, blake_family)
    _blake2_hash_cache[algo] = lambda: blake2(digest_size=digest_size)
    return _blake2_hash_cache[algo]()
def _new_hashlib_hash(algo):
    """Initialize a digest object from hashlib.
    Handle the swh-specific names for the blake2-related algorithms
    """
    if algo.startswith("blake2"):
        return _new_blake2_hash(algo)
    else:
        return hashlib.new(algo)
def _new_hash(algo: str, length: Optional[int] = None):
    """Initialize a digest object (as returned by python's hashlib) for
    the requested algorithm. See the constant ALGORITHMS for the list
    of supported algorithms. If a git-specific hashing algorithm is
    requested (e.g., "sha1_git"), the hashing object will be pre-fed
    with the needed header; for this to work, length must be given.
    Args:
        algo (str): a hashing algorithm (one of ALGORITHMS)
        length (int): the length of the hashed payload (needed for
          git-specific algorithms)
    Returns:
        a hashutil.hash object
    Raises:
        ValueError if algo is unknown, or length is missing for a git-specific
        hash.
    """
    if algo not in ALGORITHMS:
        raise ValueError(
            "Unexpected hashing algorithm %s, expected one of %s"
            % (algo, ", ".join(sorted(ALGORITHMS)))
        )
    if algo.endswith("_git"):
        if length is None:
            raise ValueError("Missing length for git hashing algorithm")
        base_algo = algo[:-4]
        h = _new_hashlib_hash(base_algo)
        h.update(git_object_header("blob", length))
        return h
    return _new_hashlib_hash(algo)
[docs]
def hash_git_data(data, git_type, base_algo="sha1"):
    """Hash the given data as a git object of type git_type.
    Args:
        data: a bytes object
        git_type: the git object type
        base_algo: the base hashing algorithm used (default: sha1)
    Returns: a dict mapping each algorithm to a bytes digest
    Raises:
        ValueError if the git_type is unexpected.
    """
    h = _new_hashlib_hash(base_algo)
    h.update(git_object_header(git_type, len(data)))
    h.update(data)
    return h.digest() 
[docs]
@functools.lru_cache()
def hash_to_hex(hash: Union[str, bytes]) -> str:
    """Converts a hash (in hex or bytes form) to its hexadecimal ascii form
    Args:
      hash (str or bytes): a :class:`bytes` hash or a :class:`str` containing
        the hexadecimal form of the hash
    Returns:
      str: the hexadecimal form of the hash
    """
    if isinstance(hash, str):
        return hash
    return binascii.hexlify(hash).decode("ascii") 
[docs]
@functools.lru_cache()
def hash_to_bytehex(hash: bytes) -> bytes:
    """Converts a hash to its hexadecimal bytes representation
    Args:
      hash (bytes): a :class:`bytes` hash
    Returns:
      bytes: the hexadecimal form of the hash, as :class:`bytes`
    """
    return binascii.hexlify(hash) 
[docs]
@functools.lru_cache()
def hash_to_bytes(hash: Union[str, bytes]) -> bytes:
    """Converts a hash (in hex or bytes form) to its raw bytes form
    Args:
      hash (str or bytes): a :class:`bytes` hash or a :class:`str` containing
        the hexadecimal form of the hash
    Returns:
      bytes: the :class:`bytes` form of the hash
    """
    if isinstance(hash, bytes):
        return hash
    return bytes.fromhex(hash) 
[docs]
@functools.lru_cache()
def bytehex_to_hash(hex: bytes) -> bytes:
    """Converts a hexadecimal bytes representation of a hash to that hash
    Args:
      hash (bytes): a :class:`bytes` containing the hexadecimal form of the
        hash encoded in ascii
    Returns:
      bytes: the :class:`bytes` form of the hash
    """
    return hash_to_bytes(hex.decode())