# Source code for swh.journal.serializers
# Copyright (C) 2016-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from enum import Enum
from typing import Any, BinaryIO, Union
import msgpack
from swh.model.model import KeyType
[docs]
class MsgpackExtTypeCodes(Enum):
    """Extension type codes used to tag integers packed as msgpack ``ExtType``.

    The code records the sign of the integer; the payload written next to it
    holds the absolute value as big-endian bytes.
    """

    # payload is the magnitude of a positive integer
    LONG_INT = 1
    # payload is the magnitude of an integer that must be negated on decoding
    LONG_NEG_INT = 2 
# this has been copied from swh.core.api.serializer
# TODO refactor swh.core to make this function available
def _msgpack_encode_longint(value):
    """Wrap an integer into a msgpack extension type.

    msgpack does not handle integers that need more than 64 bits, which we
    occasionally have to serialize. The sign is carried by the extension
    code and the payload is the absolute value as big-endian bytes.

    Args:
        value: the integer to encode

    Returns:
        a ``msgpack.ExtType`` tagged with ``LONG_INT`` for strictly positive
        values and ``LONG_NEG_INT`` otherwise
    """
    is_positive = value > 0
    if is_positive:
        ext_code = MsgpackExtTypeCodes.LONG_INT.value
        magnitude = value
    else:
        ext_code = MsgpackExtTypeCodes.LONG_NEG_INT.value
        magnitude = -value
    # round the bit length up to a whole number of bytes
    byte_count = (magnitude.bit_length() + 7) // 8
    payload = int.to_bytes(magnitude, byte_count, "big")
    return msgpack.ExtType(ext_code, payload)
[docs]
def msgpack_ext_encode_types(obj):
    """Encode objects that need a msgpack extension type.

    This is the function given as ``default`` to ``msgpack.packb`` in
    :func:`value_to_kafka`.

    Args:
        obj: the object to convert

    Returns:
        the extension-type encoding of ``obj`` when it is an integer,
        ``obj`` itself otherwise
    """
    return _msgpack_encode_longint(obj) if isinstance(obj, int) else obj
[docs]
def msgpack_ext_hook(code, data):
    """Decode the msgpack extension types defined in this module.

    Args:
        code: the extension type code read from the stream
        data: the raw payload, the big-endian bytes of the integer magnitude

    Returns:
        the decoded integer, negated when ``code`` is ``LONG_NEG_INT``

    Raises:
        ValueError: if ``code`` is not one of :class:`MsgpackExtTypeCodes`
    """
    is_positive = code == MsgpackExtTypeCodes.LONG_INT.value
    is_negative = (
        not is_positive and code == MsgpackExtTypeCodes.LONG_NEG_INT.value
    )
    if not (is_positive or is_negative):
        raise ValueError("Unknown msgpack extended code %s" % code)
    magnitude = int.from_bytes(data, "big")
    return magnitude if is_positive else -magnitude
# for backward compatibility
[docs]
def decode_types_bw(obj):
    """Decode the legacy dict-based representation of datetimes.

    A dict whose keys are exactly ``b"d"`` and ``b"swhtype"``, with
    ``b"swhtype"`` set to ``"datetime"``, is replaced by the datetime parsed
    from the ISO-format string stored under ``b"d"``.

    Args:
        obj: a dict produced by the msgpack unpacker

    Returns:
        the parsed :class:`datetime.datetime` for a legacy datetime dict,
        ``obj`` unchanged otherwise
    """
    legacy_keys = {b"d", b"swhtype"}
    if set(obj.keys()) != legacy_keys:
        return obj
    if obj[b"swhtype"] != "datetime":
        return obj
    return datetime.datetime.fromisoformat(obj[b"d"])
[docs]
def stringify_key_item(k: str, v: Union[str, bytes]) -> str:
    """Render one value of a dict key as a string.

    Args:
        k: name of the item in the key dict
        v: value of the item

    Returns:
        ``v`` itself when it is already a string; otherwise the UTF-8
        decoding of ``v`` for the ``"url"`` item, and its hexadecimal
        representation for any other item
    """
    if not isinstance(v, str):
        return v.decode("utf-8") if k == "url" else v.hex()
    return v
[docs]
def pprint_key(key: KeyType) -> str:
    """Build a human-readable rendering of a kafka key.

    Args:
        key: the key to render

    Returns:
        for a dict, its items formatted as ``{name: value, ...}`` with each
        value passed through :func:`stringify_key_item`; for bytes, their
        hexadecimal representation; anything else is returned as is
    """
    if isinstance(key, bytes):
        return key.hex()
    if not isinstance(key, dict):
        return key
    rendered_items = [
        f"{name}: {stringify_key_item(name, item)}" for name, item in key.items()
    ]
    return "{%s}" % ", ".join(rendered_items)
[docs]
def key_to_kafka(key: KeyType) -> bytes:
    """Serialize a key with msgpack in a predictable way.

    Args:
        key: the key to serialize, possibly a dict

    Returns:
        the packed key; dict items are sorted before packing so that equal
        dicts produce the same bytes whatever their insertion order
    """
    packer = msgpack.Packer(use_bin_type=True)
    if not isinstance(key, dict):
        return packer.pack(key)
    ordered_pairs = sorted(key.items())
    return packer.pack_map_pairs(ordered_pairs)
[docs]
def kafka_to_key(kafka_key: bytes) -> KeyType:
    """Turn a msgpack-serialized kafka key back into a Python object.

    Args:
        kafka_key: the packed key

    Returns:
        the unpacked key
    """
    decoded_key = msgpack.loads(kafka_key, raw=False)
    return decoded_key
[docs]
def value_to_kafka(value: Any) -> bytes:
    """Pack a value with msgpack for storage in kafka.

    Args:
        value: the data to serialize

    Returns:
        the packed value
    """
    packing_options = {
        "use_bin_type": True,
        # encode datetime as msgpack.Timestamp
        "datetime": True,
        "default": msgpack_ext_encode_types,
    }
    return msgpack.packb(value, **packing_options)
[docs]
def kafka_to_value(kafka_value: bytes) -> Any:
    """Unpack a msgpack-serialized value read from kafka.

    Args:
        kafka_value: the packed value

    Returns:
        the unpacked data, after applying :func:`decode_types_bw` to decoded
        dicts and :func:`msgpack_ext_hook` to extension types
    """
    unpacking_options = {
        "raw": False,
        "object_hook": decode_types_bw,
        "ext_hook": msgpack_ext_hook,
        "strict_map_key": False,
        # convert Timestamp in datetime objects (tz UTC)
        "timestamp": 3,
    }
    return msgpack.unpackb(kafka_value, **unpacking_options)
[docs]
def kafka_stream_to_value(file_like: BinaryIO) -> msgpack.Unpacker:
    """Build a streaming deserializer for values stored in kafka.

    Args:
        file_like: binary stream the packed values are read from

    Returns:
        a ``msgpack.Unpacker`` reading from ``file_like``, applying
        :func:`decode_types_bw` to decoded dicts and
        :func:`msgpack_ext_hook` to extension types
    """
    unpacker_options = {
        "raw": False,
        "object_hook": decode_types_bw,
        "ext_hook": msgpack_ext_hook,
        "strict_map_key": False,
        "use_list": False,
        # convert Timestamp in datetime objects (tz UTC)
        "timestamp": 3,
    }
    return msgpack.Unpacker(file_like, **unpacker_options)