# Source code for swh.storage.metrics
# Copyright (C) 2019  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from contextlib import contextmanager
from functools import wraps
import logging
from time import monotonic
from types import TracebackType
from typing import Dict, Optional, Type
from swh.core.statsd import statsd
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Counter name used by send_metric for two-part "object_type:operation" metrics.
OPERATIONS_METRIC = "swh_storage_operations_total"
# Counter name template used by send_metric for three-part
# "object_type:operation:unit" metrics; ``{unit}`` is filled with the third part.
OPERATIONS_UNIT_METRIC = "swh_storage_operations_{unit}_total"
# Timing metric name used by the ``timed`` decorator (tagged with the endpoint).
DURATION_METRIC = "swh_storage_request_duration_seconds"
# [docs]
def timed(f):
    """Decorate ``f`` so that every call runs inside ``statsd.timed``.

    The timing is reported under ``DURATION_METRIC`` with an ``endpoint`` tag
    set to the name of the decorated function.  The wrapped function's return
    value is passed through unchanged.
    """

    @wraps(f)
    def timed_call(*args, **kwargs):
        endpoint_tags = {"endpoint": f.__name__}
        with statsd.timed(DURATION_METRIC, tags=endpoint_tags):
            result = f(*args, **kwargs)
        return result

    return timed_call
# [docs]
class DifferentialTimer:
    """Compute differential timing metrics and send them to StatsD.

    The DifferentialTimer sends ``metric`` to statsd, computed as the
    difference between the time spent within the whole ``with`` block and the
    time spent within any sub-blocks wrapped in the :meth:`inner` context
    manager.

    If an exception is raised, the metric is still sent when the context
    manager exits, with an extra ``inner_exc`` tag (when the exception was
    raised by an inner-timed section) or ``outer_exc`` tag, whose value is the
    name of the exception class that was raised.  The exception itself is not
    suppressed.

    Arguments:
      metric: name of the timing metric that is sent to StatsD when the context
        manager exits
      tags: tags to attach to the metric when it is sent

    Example:
      To generate a ``metric_seconds`` timed metric, recording the overhead of
      running ``run_pre_processing`` and ``run_post_processing``, use:

      .. code-block:: python

          with DifferentialTimer("metric_seconds") as t:
              run_pre_processing()
              with t.inner():
                  run_inner_method()
              run_post_processing()
    """

    def __init__(self, metric: str, tags: Optional[Dict[str, str]] = None) -> None:
        # Set by __enter__; None means the timer has not been started yet.
        self.start_ts: Optional[float] = None
        # Cumulative time (in seconds) spent inside `inner()` blocks.
        self.inner_elapsed: float = 0.0
        # Which section raised: "outer" unless an `inner()` block saw an
        # exception.
        # NOTE(review): this is never switched back to "outer", so an inner
        # exception that is handled inside the outer block still makes a later
        # outer exception be tagged as ``inner_exc``.
        self.exc_source = "outer"
        self.metric = metric
        self.tags = tags or {}

    def __enter__(self) -> "DifferentialTimer":
        # NOTE(review): only the start timestamp is (re)set here; re-entering
        # the same instance keeps the previously accumulated inner time.
        self.start_ts = monotonic()
        return self

    @contextmanager
    def inner(self):
        """Add the duration of this block to the inner elapsed time counter.

        Any exception escaping the block marks the timer's exception source as
        ``"inner"`` and is re-raised unchanged.
        """
        start = monotonic()
        try:
            yield
        except BaseException:
            self.exc_source = "inner"
            raise
        finally:
            # Account for the time spent even when the block raised.
            self.inner_elapsed += monotonic() - start

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        _exc_value: Optional[BaseException],
        _traceback: Optional[TracebackType],
    ) -> None:
        """Send the (total - inner) duration to statsd; never suppress errors."""
        if self.start_ts is None:
            logger.warning("DifferentialTimer exited without being started")
            return

        total_elapsed = monotonic() - self.start_ts

        extra_tags = {}
        if exc_type:
            extra_tags[f"{self.exc_source}_exc"] = exc_type.__name__

        # A clean exit with no inner time recorded most likely means the
        # caller forgot to wrap anything in `inner()`.
        if self.inner_elapsed == 0 and not exc_type:
            logger.warning("DifferentialTimer didn't record calls to `inner`?")

        # Statsd expects timings in milliseconds
        value = (total_elapsed - self.inner_elapsed) * 1000
        statsd.timing(
            self.metric,
            value,
            tags={**self.tags, **extra_tags},
        )
# [docs]
def send_metric(metric: str, count: int, method_name: str) -> bool:
    """Send statsd metric with count for method `method_name`

    If count is 0, the metric is discarded.  If the metric is not
    parseable, the metric is discarded with a log message.

    A metric name has either two colon-separated parts
    (``object_type:operation``, sent as ``OPERATIONS_METRIC``) or three
    (``object_type:operation:unit``, sent as ``OPERATIONS_UNIT_METRIC``
    formatted with the unit).

    Args:
        metric: Metric's name (e.g content:add, content:add:bytes)
        count: Associated value for the metric
        method_name: Method's name, sent as the ``endpoint`` tag

    Returns:
        True if the metric was sent to statsd, False if it was discarded

    """
    if count == 0:
        return False

    parts = metric.split(":")
    if len(parts) == 2:
        object_type, operation = parts
        metric_name = OPERATIONS_METRIC
    elif len(parts) == 3:
        object_type, operation, unit = parts
        metric_name = OPERATIONS_UNIT_METRIC.format(unit=unit)
    else:
        # Use the module logger (not the root logger) with lazy formatting,
        # consistently with the rest of this module.
        logger.warning("Skipping unknown metric {%s: %s}", metric, count)
        return False

    statsd.increment(
        metric_name,
        count,
        tags={
            "endpoint": method_name,
            "object_type": object_type,
            "operation": operation,
        },
    )
    return True
# [docs]
def process_metrics(f):
    """Decorate ``f`` to report the counters found in its return value.

    The decorated function is expected to return a mapping of metric name to
    count; each entry is forwarded to ``send_metric`` along with the name of
    the decorated function, and the mapping is then returned unchanged.
    """

    @wraps(f)
    def reporting_call(*args, **kwargs):
        summary = f(*args, **kwargs)
        for name, value in summary.items():
            send_metric(metric=name, count=value, method_name=f.__name__)
        return summary

    return reporting_call