# Source code for swh.journal.writer
# Copyright (C) 2019-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import sys
from typing import Any, BinaryIO, Dict, Type
import warnings
from .interface import JournalWriterInterface
[docs]
def model_object_dict_sanitizer(
    object_type: str, object_dict: Dict[str, Any]
) -> Dict[str, Any]:
    """Return a sanitized shallow copy of ``object_dict``.

    For objects of type ``"content"`` the ``"data"`` key is removed from the
    copy (when present); every other object type is copied unchanged. The
    dictionary passed in is never modified.

    Args:
        object_type: name of the object type the dictionary represents.
        object_dict: dictionary form of the object.

    Returns:
        A new dictionary holding the same values as ``object_dict``, minus
        ``"data"`` for ``"content"`` objects.
    """
    # Shallow copy, so that dropping a key does not affect the caller's dict.
    sanitized = object_dict.copy()
    if object_type == "content":
        sanitized.pop("data", None)
    return sanitized
[docs]
def get_journal_writer(cls, **kwargs) -> JournalWriterInterface:
    """Instantiate the journal writer implementation selected by ``cls``.

    Args:
        cls: name of the writer implementation: ``"memory"``, ``"kafka"`` or
            ``"stream"``. The deprecated alias ``"inmemory"`` is still
            accepted and handled as ``"memory"``.
        **kwargs: keyword arguments forwarded to the writer constructor.
            ``value_sanitizer`` defaults to
            :func:`model_object_dict_sanitizer` when not given. For the
            ``"stream"`` writer, ``output_stream`` is required and may be
            ``"-"`` (standard output, opened in binary mode), a file path
            (``str`` or ``bytes``, opened for binary writing), or any other
            object, which is passed through unchanged. The deprecated form
            ``args={...}`` is still accepted; when present, the content of
            ``args`` replaces every other keyword argument.

    Returns:
        The newly constructed journal writer.

    Raises:
        ValueError: if ``cls`` does not name a known writer implementation.
    """
    if "args" in kwargs:
        warnings.warn(
            'Explicit "args" key is deprecated, use keys directly instead.',
            DeprecationWarning,
            stacklevel=2,
        )
        # Work on a shallow copy: the default sanitizer and the opened stream
        # set below must not leak into the mapping owned by the caller.
        kwargs = dict(kwargs["args"])
    kwargs.setdefault("value_sanitizer", model_object_dict_sanitizer)

    if cls == "inmemory":  # FIXME: Remove inmemory in due time
        warnings.warn(
            "cls = 'inmemory' is deprecated, use 'memory' instead",
            DeprecationWarning,
            stacklevel=2,
        )
        cls = "memory"

    # File opened by this function from a path, if any; it is closed again
    # when the writer constructor fails so the handle is not leaked.
    stream_to_close = None

    # Implementations are imported lazily so that only the selected backend's
    # module is loaded.
    JournalWriter: Type[JournalWriterInterface]
    if cls == "memory":
        from .inmemory import InMemoryJournalWriter

        JournalWriter = InMemoryJournalWriter
    elif cls == "kafka":
        from .kafka import KafkaJournalWriter

        JournalWriter = KafkaJournalWriter
    elif cls == "stream":
        from .stream import StreamJournalWriter

        JournalWriter = StreamJournalWriter

        assert (
            "output_stream" in kwargs
        ), 'the "stream" journal writer requires an "output_stream" argument'
        target = kwargs["output_stream"]
        outstream: BinaryIO
        if target in ("-", b"-"):
            # Binary view over stdout; closefd=False keeps the process-wide
            # descriptor open when this wrapper gets closed.
            outstream = os.fdopen(sys.stdout.fileno(), "wb", closefd=False)
        elif isinstance(target, (str, bytes)):
            outstream = stream_to_close = open(target, "wb")
        else:
            outstream = target
        kwargs["output_stream"] = outstream
    else:
        raise ValueError("Unknown journal writer class `%s`" % cls)

    try:
        return JournalWriter(**kwargs)
    except BaseException:
        if stream_to_close is not None:
            stream_to_close.close()
        raise