swh.storage.replay module
- class swh.storage.replay.ModelObjectDeserializer(validate: bool = True, raise_on_error: bool = False, reporter: Callable[[str, bytes], None] | None = None, known_mismatched_hashes: Tuple[Tuple[str, bytes, bytes]] | None = None)
- Bases: object
- A swh.journal object deserializer that checks object validity and reports invalid objects.
- The deserializer directly produces BaseModel objects from journal object representations.
- If validation is activated and the object is hashable, it checks whether the computed hash matches the identifier of the object.
- If the object is invalid and a ‘reporter’ function is given, it is called with two arguments:

      reporter(object_id, journal_msg)

  where ‘object_id’ is a string representation of the object identifier (from the journal message) and ‘journal_msg’ is the raw message (bytes) retrieved from the journal.
- If ‘raise_on_error’ is True, a ‘StorageArgumentException’ is raised when an invalid object is found.
- If ‘known_mismatched_hashes’ is given, it must be a tuple of triplets (object_type, object_id, expected_id) listing objects that store an invalid hash (object_id) instead of the computed expected_id but must not be discarded (i.e. they should still be replicated by the replayer despite being invalid).
- Typical usage (‘reporter_cb’ and the journal configuration ‘cfg’ are supplied by the caller):

      from swh.journal.client import get_journal_client
      from swh.storage.replay import ModelObjectDeserializer

      deserializer = ModelObjectDeserializer(validate=True, reporter=reporter_cb)
      client = get_journal_client(
          cls="kafka", value_deserializer=deserializer, **cfg)
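The order of checks described for the deserializer can be modeled with a small, self-contained toy. This is a sketch under stated assumptions, not the swh.storage implementation: objects are plain dicts with hypothetical "id" and "data" fields, the "computed hash" is simply sha1(data) rather than the swh.model identifier scheme, ToyDeserializer and its convert signature are hypothetical, the "object_type:hex_id" string passed to the reporter is an assumed format, and StorageArgumentException is a local stand-in. Valid objects pass, objects listed in known_mismatched_hashes are kept, and any other mismatch is reported and then either dropped or raised.

```python
import hashlib
from typing import Callable, Dict, Optional, Tuple


class StorageArgumentException(Exception):
    """Local stand-in for swh.storage's StorageArgumentException."""


Reporter = Callable[[str, bytes], None]
Obj = Dict[str, bytes]  # hypothetical object shape: {"id": ..., "data": ...}


class ToyDeserializer:
    """Toy model of the checks described for ModelObjectDeserializer."""

    def __init__(
        self,
        validate: bool = True,
        raise_on_error: bool = False,
        reporter: Optional[Reporter] = None,
        known_mismatched_hashes: Tuple[Tuple[str, bytes, bytes], ...] = (),
    ) -> None:
        self.validate = validate
        self.raise_on_error = raise_on_error
        self.reporter = reporter
        # Index the triplets as (object_type, stored_id) -> expected_id.
        self.known: Dict[Tuple[str, bytes], bytes] = {
            (otype, oid): expected
            for otype, oid, expected in known_mismatched_hashes
        }

    def convert(self, object_type: str, obj: Obj, journal_msg: bytes) -> Optional[Obj]:
        if not self.validate:
            return obj
        computed = hashlib.sha1(obj["data"]).digest()  # toy hash, not swh.model
        if computed == obj["id"]:
            return obj
        if self.known.get((object_type, obj["id"])) == computed:
            # Known bad identifier: keep the object so it is still replicated.
            return obj
        object_id = f"{object_type}:{obj['id'].hex()}"  # assumed format
        if self.reporter is not None:
            self.reporter(object_id, journal_msg)
        if self.raise_on_error:
            raise StorageArgumentException(f"invalid object {object_id}")
        return None  # the invalid object is dropped
```

Keying the known mismatches by (object_type, stored_id) and comparing against the recomputed hash means an entry only whitelists the exact bad identifier it names.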
- swh.storage.replay.process_replay_objects(all_objects: Dict[str, List[BaseModel]], *, storage: StorageInterface) → None
- swh.storage.replay.collision_aware_content_add(contents: List[ContentType], content_add_fn: Callable[[List[ContentType]], Dict[str, int]]) → Dict[str, int]
- Add contents to storage. If a hash collision is detected, an error is logged and the remaining non-colliding contents are still added to the storage.
- Parameters:
- contents – List of contents or skipped contents to add to storage
- content_add_fn – Storage callable that adds a list of contents and returns a summary (Dict[str, int])
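The collision handling described above can be sketched as a loop that logs the collision, drops the colliding contents, and retries with the rest. This is a minimal illustration under assumptions, not the library's code: HashCollision here is a local stand-in carrying a single colliding sha1, contents are dicts with hypothetical "sha1" and "data" keys, and FakeStorage is a hypothetical in-memory store whose add method rejects the whole batch when any content collides.

```python
import logging
from typing import Callable, Dict, List

logger = logging.getLogger(__name__)

Content = Dict[str, bytes]  # hypothetical shape: {"sha1": ..., "data": ...}


class HashCollision(Exception):
    """Local stand-in for a storage hash-collision error."""

    def __init__(self, colliding_sha1: bytes) -> None:
        super().__init__(colliding_sha1)
        self.colliding_sha1 = colliding_sha1


def collision_aware_add(
    contents: List[Content],
    add_fn: Callable[[List[Content]], Dict[str, int]],
) -> Dict[str, int]:
    """Add contents, logging and skipping any that trigger a hash collision."""
    remaining = list(contents)
    while True:
        try:
            return add_fn(remaining)
        except HashCollision as exc:
            logger.error("Hash collision on sha1 %s, skipping it", exc.colliding_sha1.hex())
            kept = [c for c in remaining if c["sha1"] != exc.colliding_sha1]
            if len(kept) == len(remaining):
                raise  # the batch did not cause this collision; avoid looping forever
            remaining = kept


class FakeStorage:
    """Hypothetical in-memory store that rejects a whole batch on collision."""

    def __init__(self) -> None:
        self.objects: Dict[bytes, bytes] = {}

    def content_add(self, contents: List[Content]) -> Dict[str, int]:
        for c in contents:
            existing = self.objects.get(c["sha1"])
            if existing is not None and existing != c["data"]:
                raise HashCollision(c["sha1"])
        for c in contents:
            self.objects[c["sha1"]] = c["data"]
        return {"content:add": len(contents)}


storage = FakeStorage()
storage.objects[b"a"] = b"old"  # pre-existing object under the toy hash b"a"
summary = collision_aware_add(
    [{"sha1": b"a", "data": b"new"}, {"sha1": b"b", "data": b"x"}],
    storage.content_add,
)
```

Re-raising when no content in the batch matches the reported hash keeps the retry loop from spinning forever on a collision the batch did not cause.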