Source code for swh.storage.proxies.retry
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import traceback
from tenacity import RetryCallState, retry, stop_after_attempt, wait_random_exponential
from tenacity.wait import wait_base
from swh.core.api import TransientRemoteException
from swh.storage import StorageSpec, get_storage
from swh.storage.exc import NonRetryableException
from swh.storage.interface import StorageInterface
logger = logging.getLogger(__name__)
[docs]
def should_retry_adding(retry_state: RetryCallState) -> bool:
    """Retry if the error/exception is (probably) not about a caller error"""
    attempt = retry_state.outcome
    assert attempt
    if attempt.failed:
        error = attempt.exception()
        if isinstance(error, NonRetryableException):
            # Don't issue retries for persistent exceptions
            return False
        elif isinstance(error, (KeyboardInterrupt, SystemExit)):
            return False
        else:
            # Other exception
            module = getattr(error, "__module__", None)
            if module:
                error_name = error.__module__ + "." + error.__class__.__name__
            else:
                error_name = error.__class__.__name__
            logger.warning(
                "Retrying RPC call",
                exc_info=False,
                extra={
                    "swh_type": "storage_retry",
                    "swh_exception_type": error_name,
                    "swh_exception": traceback.format_exc(),
                },
            )
            return True
    else:
        # No exception
        return False 
[docs]
class wait_transient_exceptions(wait_base):
    """Wait longer when servers return HTTP 503."""
    def __init__(self, wait: float) -> None:
        self.wait = wait
    def __call__(self, retry_state: RetryCallState) -> float:
        attempt = retry_state.outcome
        assert attempt
        if attempt.failed and isinstance(attempt.exception(), TransientRemoteException):
            return self.wait
        else:
            return 0.0 
swh_retry = retry(
    retry=should_retry_adding,
    wait=wait_random_exponential(multiplier=1, max=10) + wait_transient_exceptions(10),
    stop=stop_after_attempt(3),
    reraise=True,
)
[docs]
def retry_function(storage, attribute_name):
    @swh_retry
    def newf(*args, **kwargs):
        return getattr(storage, attribute_name)(*args, **kwargs)
    return newf 
[docs]
class RetryingProxyStorage:
    """Storage implementation which retries adding objects when it specifically
    fails (hash collision, integrity error).
    """
    def __init__(self, storage: StorageSpec):
        self.storage: StorageInterface = get_storage(**storage)
        for attribute_name in dir(StorageInterface):
            if attribute_name.startswith("_"):
                continue
            attribute = getattr(self.storage, attribute_name)
            if hasattr(attribute, "__call__"):
                setattr(
                    self, attribute_name, retry_function(self.storage, attribute_name)
                )