# Copyright (C) 2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import logging
import sys
import time
from typing import Any, Dict, Optional
import click
from . import cli
logger = logging.getLogger(__name__)
[docs]
def destination_from_pattern(ctx: click.Context, pattern: Optional[str]):
    """Get the celery destination pattern from host and type values"""
    if pattern is None:
        logger.debug("Matching all workers")
    elif "*" in pattern:
        ctx.obj["inspect"].pattern = pattern
        ctx.obj["inspect"].matcher = "glob"
        logger.debug("Using glob pattern %s", pattern)
    else:
        destination = pattern.split(",")
        ctx.obj["inspect"].destination = destination
        logger.debug("Using destinations %s", ", ".join(destination)) 
@cli.group("celery-monitor")
@click.option(
    "--timeout", type=float, default=3.0, help="Timeout for celery remote control"
)
@click.option("--pattern", help="Celery destination pattern", default=None)
@click.pass_context
def celery_monitor(ctx: click.Context, timeout: float, pattern: Optional[str]) -> None:
    """Monitoring of Celery"""
    from swh.scheduler.celery_backend.config import app
    ctx.obj["timeout"] = timeout
    ctx.obj["inspect"] = app.control.inspect(timeout=timeout)
    destination_from_pattern(ctx, pattern)
@celery_monitor.command("ping-workers")
@click.pass_context
def ping_workers(ctx: click.Context) -> None:
    """Check which workers respond to the celery remote control"""
    response_times = {}
    def ping_callback(response):
        rtt = time.monotonic() - ping_time
        for destination in response:
            logger.debug("Got ping response from %s: %r", destination, response)
            response_times[destination] = rtt
    ctx.obj["inspect"].callback = ping_callback
    ping_time = time.monotonic()
    ret = ctx.obj["inspect"].ping()
    if not ret:
        logger.info("No response in %f seconds", time.monotonic() - ping_time)
        ctx.exit(1)
    for destination in ret:
        logger.info(
            "Got response from %s in %f seconds",
            destination,
            response_times[destination],
        )
    ctx.exit(0)
@celery_monitor.command("list-running")
@click.option(
    "--format",
    help="Output format",
    default="pretty",
    type=click.Choice(["pretty", "csv"]),
)
@click.pass_context
def list_running(ctx: click.Context, format: str):
    """List running tasks on the lister workers"""
    from ast import literal_eval
    import csv
    from operator import itemgetter
    response_times = {}
    def active_callback(response):
        rtt = time.monotonic() - active_time
        for destination in response:
            response_times[destination] = rtt
    ctx.obj["inspect"].callback = active_callback
    active_time = time.monotonic()
    ret = ctx.obj["inspect"].active()
    if not ret:
        logger.info("No response in %f seconds", time.monotonic() - active_time)
        ctx.exit(1)
    def pretty_task_arguments(task: Dict[str, Any]) -> str:
        arg_list = []
        for arg in task["args"]:
            arg_list.append(repr(arg))
        for k, v in task["kwargs"].items():
            arg_list.append(f"{k}={v!r}")
        return f'{task["name"]}({", ".join(arg_list)})'
    def get_task_data(worker: str, task: Dict[str, Any]) -> Dict[str, Any]:
        duration = time.time() - task["time_start"]
        return {
            "worker": worker,
            "name": task["name"],
            "args": literal_eval(task["args"]),
            "kwargs": literal_eval(task["kwargs"]),
            "duration": duration,
            "worker_pid": task["worker_pid"],
        }
    if format == "csv":
        writer = csv.DictWriter(
            sys.stdout, ["worker", "name", "args", "kwargs", "duration", "worker_pid"]
        )
        writer.writeheader()
        def output(data: Dict[str, Any]):
            writer.writerow(data)
    elif format == "pretty":
        def output(data: Dict[str, Any]):
            print(
                f"{data['worker']}: {pretty_task_arguments(data)} "
                f"[for {data['duration']:f}s, pid={data['worker_pid']}]"
            )
    else:
        logger.error("Unknown format %s", format)
        ctx.exit(127)
    for worker, active in sorted(ret.items()):
        if not active:
            logger.info("%s: no active tasks", worker)
            continue
        for task in sorted(active, key=itemgetter("time_start")):
            output(get_task_data(worker, task))
    ctx.exit(0)