swh.journal.client module
- class swh.journal.client.EofBehavior(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)
- Bases: Enum
- Possible behaviors when reaching the end of the log
 - CONTINUE = 'continue'
 - STOP = 'stop'
 - RESTART = 'restart'
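As an illustration of how a variant or a string might be coerced to one of these behaviors, here is a minimal sketch. The `EofBehavior` class below is a local stand-in that mirrors the documented variants so the example runs without `swh.journal` installed, and `normalize_on_eof` is a hypothetical helper, not part of the library. Matching strings case-insensitively is an assumption made for this sketch.

```python
from enum import Enum
from typing import Union


class EofBehavior(Enum):
    """Local stand-in mirroring the documented variants."""

    CONTINUE = "continue"
    STOP = "stop"
    RESTART = "restart"


def normalize_on_eof(on_eof: Union[EofBehavior, str, None]) -> EofBehavior:
    """Hypothetical helper: coerce an on_eof-style argument to an EofBehavior."""
    if on_eof is None:
        # Assumed default for this sketch: keep consuming.
        return EofBehavior.CONTINUE
    if isinstance(on_eof, EofBehavior):
        return on_eof
    # Values are the lowercase form of the names, so lowercasing the
    # string lets "STOP" and "stop" resolve to the same variant.
    # An unknown string raises ValueError.
    return EofBehavior(on_eof.lower())
```

With the real library, one would import `EofBehavior` from `swh.journal.client` instead of redefining it.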
 
- swh.journal.client.get_journal_client(cls: str, **kwargs: Any)
- Factory function to instantiate a journal client object.
- Currently, only the “kafka” journal client is supported.
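A call to the factory might look like the sketch below. It assumes that the keyword arguments are forwarded to the `JournalClient` constructor documented on this page. The broker address, group id, and object type names are placeholder values, and `build_client_config` and `make_client` are hypothetical helper names. The import is done lazily so the module loads even where `swh.journal` is not installed.

```python
from typing import Any, Dict


def build_client_config() -> Dict[str, Any]:
    """Keyword arguments for get_journal_client (all values are placeholders)."""
    return {
        "cls": "kafka",                          # only documented client type
        "brokers": ["localhost:9092"],           # hypothetical broker address
        "group_id": "example-consumer-group",    # hypothetical group id
        "object_types": ["origin", "revision"],  # example object types
        "batch_size": 200,                       # documented default
        "stop_after_objects": 1000,              # stop after 1000 messages
    }


def make_client():
    """Instantiate a client; requires swh.journal and a reachable broker."""
    # Imported here so that importing this module has no hard dependency.
    from swh.journal.client import get_journal_client

    return get_journal_client(**build_client_config())
```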
- class swh.journal.client.JournalClient(brokers: str | List[str], group_id: str, prefix: str | None = None, object_types: List[str] | None = None, privileged: bool = False, stop_after_objects: int | None = None, batch_size: int = 200, process_timeout: float | None = None, auto_offset_reset: str = 'earliest', stop_on_eof: bool | None = None, on_eof: EofBehavior | str | None = None, value_deserializer: Callable[[str, bytes], Any] | None = None, create_topics: bool = False, **kwargs)
- Bases: object
- A base client for the Software Heritage journal.
- The current implementation of the journal uses Apache Kafka brokers to publish messages under a given topic prefix, with each object type using a specific topic under that prefix. If the prefix argument is None (the default), the prefix ‘swh.journal.objects’ is used.
- Clients subscribe to events specific to each object type listed in the object_types argument (if unset, it defaults to all existing Kafka topics under the prefix).
- Clients can be sharded by setting group_id to a common value across instances. The journal will share the message throughput across the nodes sharing the same group_id.
- Messages are processed by the worker_fn callback passed to the process method, in batches of at most batch_size messages (defaults to 200).
- The objects passed to the worker_fn callback are the result of converting each Kafka message with the value_deserializer function. By default (if this argument is not given), it produces dicts (using the kafka_to_value function). The signature of the function is:
- value_deserializer(object_type: str, kafka_msg: bytes) -> Any
- If the value returned by value_deserializer is None, it is ignored and not passed to the worker_fn function.
- Parameters:
- stop_after_objects – If set, the processing stops after processing this number of messages in total. 
- on_eof – What to do when reaching the end of each partition (keep consuming, stop, or restart from earliest offsets); defaults to continuing. This can be either an EofBehavior variant or a string containing the name of one of the variants.
- stop_on_eof – (deprecated) equivalent to passing on_eof=EofBehavior.STOP
- auto_offset_reset – sets the behavior of the client when the consumer group initializes: 'earliest' (the default) processes all objects since the inception of the topics; ''
- create_topics – Create Kafka topics if they do not exist. Not for production: this flag should only be enabled in development environments.
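The value_deserializer contract described above (a function taking the object type and the raw message bytes, whose None results are dropped) can be sketched as follows. JSON is used here only as a stand-in encoding so the example is self-contained; it is not claimed to be the journal's wire format. `json_deserializer` and `build_batch` are hypothetical names, and grouping the batch by object type is an assumption based on the `Dict[str, List[dict]]` type of the worker_fn argument.

```python
import json
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple


def json_deserializer(object_type: str, kafka_msg: bytes) -> Optional[dict]:
    """Hypothetical deserializer: decode JSON, return None for bad messages."""
    try:
        value = json.loads(kafka_msg)
    except ValueError:
        # Covers both malformed JSON and undecodable bytes.
        return None
    # Only dict payloads are kept in this sketch.
    return value if isinstance(value, dict) else None


def build_batch(
    raw_messages: Iterable[Tuple[str, bytes]],
    value_deserializer: Callable[[str, bytes], Any],
) -> Dict[str, List[Any]]:
    """Simulate the filtering step: None values never reach the worker."""
    batch: Dict[str, List[Any]] = {}
    for object_type, payload in raw_messages:
        value = value_deserializer(object_type, payload)
        if value is None:
            continue  # ignored, as the documentation describes
        batch.setdefault(object_type, []).append(value)
    return batch
```

A function with this signature would be passed as `value_deserializer=json_deserializer` when constructing the client.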
 
 - Any other named argument is passed directly to KafkaConsumer().
 - subscribe()
- Subscribe to topics listed in self.subscription
- This can be overridden if you need, for instance, to manually assign partitions.
 - process(worker_fn: Callable[[Dict[str, List[dict]]], None])
- Polls Kafka for a batch of messages, and calls the worker_fn with these messages.
- Parameters:
- worker_fn – Function called with the messages as argument.
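A worker_fn matching the `Callable[[Dict[str, List[dict]]], None]` signature could look like the sketch below. It assumes the dictionary keys are object type names and the values are the deserialized objects of that type in the current batch. `ObjectCounter` is a hypothetical name used for illustration.

```python
from collections import Counter
from typing import Dict, List


class ObjectCounter:
    """Hypothetical worker: tallies how many objects of each type were seen."""

    def __init__(self) -> None:
        self.counts: Counter = Counter()

    def __call__(self, messages: Dict[str, List[dict]]) -> None:
        # One call corresponds to one batch of messages.
        for object_type, objects in messages.items():
            self.counts[object_type] += len(objects)


# Local demonstration with a hand-built batch (no Kafka involved).
counter = ObjectCounter()
counter({"origin": [{"url": "a"}, {"url": "b"}], "revision": [{"id": 1}]})
counter({"origin": [{"url": "c"}]})
```

With a real client, the same callable would be handed over as `client.process(counter)`.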