swh.journal.pytest_plugin module#
- swh.journal.pytest_plugin.ensure_lists(value: Any) Any[source]#
- >>> ensure_lists(["foo", 42]) ['foo', 42] >>> ensure_lists(("foo", 42)) ['foo', 42] >>> ensure_lists({"a": ["foo", 42]}) {'a': ['foo', 42]} >>> ensure_lists({"a": ("foo", 42)}) {'a': ['foo', 42]} 
- swh.journal.pytest_plugin.consume_messages(consumer, kafka_prefix, expected_messages)[source]#
- Consume expected_messages from the consumer; Sort them all into a consumed_objects dict 
- swh.journal.pytest_plugin.assert_all_objects_consumed(consumed_messages: Dict, exclude: Collection | None = None)[source]#
- Check whether all objects from TEST_OBJECTS have been consumed - exclude can be a list of object types for which we do not want to compare the values (eg. for anonymized object). 
- swh.journal.pytest_plugin.kafka_prefix()[source]#
- Pick a random prefix for kafka topics on each call 
- swh.journal.pytest_plugin.kafka_consumer_group(kafka_prefix: str)[source]#
- Pick a random consumer group for kafka consumers on each call 
- swh.journal.pytest_plugin.privileged_object_types()[source]#
- Set of object types to precreate privileged topics for. 
- swh.journal.pytest_plugin.kafka_server(kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str]) str[source]#
- A kafka server with existing topics - Unprivileged topics are built as - {kafka_prefix}.{object_type}with object_type from the- object_typeslist.- Privileged topics are built as - {kafka_prefix}_privileged.{object_type}with object_type from the- privileged_object_typeslist.
- swh.journal.pytest_plugin.kafka_server_base() Iterator[str][source]#
- Create a mock kafka cluster suitable for tests. - Yield a connection string. - Note: this is a generator to keep the mock broker alive during the whole test session.