# Copyright (C) 2021  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import logging
import os
import time
from typing import Dict, List, Optional, Tuple
import requests
logger = logging.getLogger(__name__)
[docs]
class History:
    """Manage the historical data of the counters"""
    def __init__(
        self,
        prometheus_host: str,
        prometheus_port: int,
        live_data_start: int,
        cache_base_directory: str,
        interval: str = "12h",
        prometheus_collection: str = "swh_archive_object_total",
        query_range_uri="/api/v1/query_range",
        labels: Dict[str, str] = {},
    ):
        self.prometheus_host = prometheus_host
        self.prometheus_port = prometheus_port
        self.cache_base_directory = cache_base_directory
        self.live_data_start = live_data_start
        self.interval = interval
        self.prometheus_collection = prometheus_collection
        self.query_range_uri = query_range_uri
        self.labels = labels
    def _validate_filename(self, filename: str):
        if "/" in str(filename):
            raise ValueError("filename must not contain path information")
    def _compute_url(
        self,
        object: str,
        end: int,
    ) -> Tuple[str, Dict[str, str]]:
        """Compute the api url to request data from, specific to a label.
        Args:
            object: object_type/label data (ex: content, revision, ...)
            end: retrieve the data until this date (timestamp)
        Returns:
            The api url to fetch the label's data
        """
        labels = self.labels.copy()
        labels["object_type"] = object
        formated_labels = ",".join([f'{k}="{v}"' for k, v in labels.items()])
        url = (
            f"http://{self.prometheus_host}:{self.prometheus_port}/"
            f"{self.query_range_uri}"
        )
        params = {
            "query": f"sum(max by (object_type)"
            f"({self.prometheus_collection}{{{formated_labels}}}))",
            "start": f"{self.live_data_start}",
            "end": f"{end}",
            "step": f"{self.interval}",
        }
        return (url, params)
[docs]
    def get_history(self, cache_file: str) -> Dict:
        self._validate_filename(cache_file)
        path = f"{self.cache_base_directory}/{cache_file}"
        with open(path, "r") as f:
            return json.load(f) 
    def _adapt_format(self, item: List) -> List:
        """Javascript expects timestamps to be in milliseconds
        and counter values as floats
        Args
            item: List of 2 elements, timestamp and counter
        Return:
            Normalized tuple (timestamp in js expected time, counter as float)
        """
        timestamp = int(item[0])
        counter_value = item[1]
        return [timestamp * 1000, float(counter_value)]
    def _get_timestamp_history(
        self,
        object: str,
    ) -> List:
        """Return the live values of an object"""
        result = []
        now = int(time.time())
        (url, params) = self._compute_url(
            object=object,
            end=now,
        )
        response = requests.get(url, params)
        if response.ok:
            data = response.json()
            # data answer format:
            # {"status":"success","data":{"result":[{"values":[[1544586427,"5375557897"]...  # noqa
            # Prometheus-provided data has to be adapted to js expectations
            result = [
                self._adapt_format(i) for i in data["data"]["result"][0]["values"]
            ]
        return result
[docs]
    def refresh_history(
        self,
        cache_file: str,
        objects: List[str],
        static_file: Optional[str] = None,
    ):
        self._validate_filename(cache_file)
        if static_file is not None:
            static_data = self.get_history(static_file)
        else:
            static_data = {}
        # for live content, we retrieve existing data and merges with the new one
        live_data = {}
        for object in objects:
            prometheus_data = self._get_timestamp_history(object=object)
            live_data[object] = static_data.get(object, []) + prometheus_data
        target_file = f"{self.cache_base_directory}/{cache_file}"
        tmp_file = f"{target_file}.tmp"
        with open(tmp_file, "w") as f:
            f.write(json.dumps(live_data))
        os.rename(tmp_file, target_file)