# Source code for swh.counters.kafka_client
# Copyright (C) 2021  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
from typing import Dict
from confluent_kafka import KafkaError
from swh.journal.client import EofBehavior, JournalClient, _error_cb
# [docs]
class KeyOrientedJournalClient(JournalClient):
    """Journal Client implementation which only decodes the message keys.

    This does not need to bother with the message deserialization (contrary
    to :class:`swh.journal.client.JournalClient`)

    Message values are still passed unparsed to ``worker_fn`` so it can
    deserialize and use them if needed.
    """

    def handle_messages(self, messages, worker_fn):
        """Group a batch of messages by object type and hand them to ``worker_fn``.

        The object type of a message is the last dot-separated component of
        its topic name. For each object type, messages are collected in a
        mapping from message key to raw (undecoded) message value; when the
        same key appears several times for one object type within the batch,
        only the last value seen is kept.

        Messages carrying an error are never passed to ``worker_fn``: a
        partition-EOF marker records the ``(topic, partition)`` pair in
        ``self.eof_reached``, any other error is forwarded to ``_error_cb``.
        Messages whose value is ``None`` are skipped.

        Args:
            messages: iterable of message objects exposing ``error()``,
                ``topic()``, ``partition()``, ``key()`` and ``value()``.
            worker_fn: callable invoked once, with a plain ``dict`` of
                ``{object_type: {key: value}}``, if at least one message
                was retained.

        Returns:
            A ``(nb_processed, at_eof)`` tuple. ``nb_processed`` is the number
            of retained messages (duplicated keys are each counted).
            ``at_eof`` is true when ``self.on_eof`` is ``EofBehavior.STOP`` and
            every partition currently assigned to the consumer is present in
            ``self.eof_reached``.
        """
        objects: Dict[str, Dict[bytes, bytes]] = defaultdict(dict)
        nb_processed = 0

        for message in messages:
            error = message.error()
            if error is not None:
                if error.code() == KafkaError._PARTITION_EOF:
                    self.eof_reached.add((message.topic(), message.partition()))
                else:
                    _error_cb(error)
                continue

            value = message.value()
            if value is None:
                # ignore message with no payload, these can be generated in tests
                continue

            nb_processed += 1
            object_type = message.topic().split(".")[-1]
            objects[object_type][message.key()] = value

        if objects:
            # Pass a plain dict so the worker does not get defaultdict's
            # create-on-lookup behaviour.
            worker_fn(dict(objects))
            # Commit only after the worker has returned for this batch.
            self.consumer.commit()

        at_eof = self.on_eof == EofBehavior.STOP and all(
            (tp.topic, tp.partition) in self.eof_reached
            for tp in self.consumer.assignment()
        )

        return nb_processed, at_eof