# Copyright (C) 2017-2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timezone
from typing import List, Optional
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import (
    ListedOrigin,
    Lister,
    Task,
    TaskArguments,
    TaskPolicy,
    TaskPriority,
)
[docs]
def utcnow():
    """Return the current date and time as a timezone-aware UTC datetime."""
    now = datetime.now(timezone.utc)
    return now
[docs]
def get_task(task_name):
    """Look up a task on the swh-scheduler Celery application by name.

    Every module listed in the application's ``CELERY_IMPORTS`` setting is
    imported before the lookup (presumably so that the tasks they define
    are registered on the application).

    Args:
        task_name (str): fully qualified python name of the task (e.g.
            swh.loader.git.tasks.LoadDiskGitRepository)

    Returns:
        the task instance registered under ``task_name`` in ``app.tasks``
    """
    from swh.scheduler.celery_backend.config import app

    module_names = app.conf.CELERY_IMPORTS
    for module_name in module_names:
        __import__(module_name)
    registered_tasks = app.tasks
    return registered_tasks[task_name]
[docs]
def create_task(
    type: str,
    policy: TaskPolicy,
    *args,
    next_run: Optional[datetime] = None,
    priority: Optional[TaskPriority] = None,
    retries_left: Optional[int] = None,
    **kwargs,
) -> Task:
    """Create a task with type and policy, scheduled for as soon as
       possible.

    Args:
        type: Type of task as per swh-scheduler's db table task_type's
            column (Ex: load-git, check-deposit)
        policy: oneshot or recurring policy
        *args: positional arguments stored in the task's arguments
        next_run: optional date and time from which the task can be executed,
            use current time otherwise
        priority: optional priority set on the task
        retries_left: optional number of retries left set on the task
        **kwargs: keyword arguments stored in the task's arguments

    Returns:
        A :class:`Task` suitable for the task scheduling api
        (swh.scheduler.backend.create_tasks)
    """
    return Task(
        policy=policy,
        type=type,
        next_run=next_run if next_run is not None else utcnow(),
        # ``list(())`` is already ``[]`` and ``kwargs`` is always a fresh
        # dict, so no emptiness special-casing is needed.
        arguments=TaskArguments(args=list(args), kwargs=kwargs),
        priority=priority,
        retries_left=retries_left,
    )
[docs]
def create_origin_task(origin: ListedOrigin, lister: Lister) -> Task:
    """Build the loading task for a listed origin.

    The task type is ``load-<visit_type>``. It has no positional arguments.
    Its keyword arguments are the origin URL, the lister name and the lister
    instance name, followed by ``origin.extra_loader_arguments``. Because
    those extra arguments come last, they win on key collisions.

    Args:
        origin: the listed origin to build a task for
        lister: the lister of ``origin``; ``lister.id`` must equal
            ``origin.lister_id``

    Returns:
        a :class:`Task` whose ``next_run`` is the current UTC time

    Raises:
        ValueError: if ``origin.lister_id`` and ``lister.id`` differ
    """
    if origin.lister_id != lister.id:
        raise ValueError(
            "origin.lister_id and lister.id differ", origin.lister_id, lister.id
        )

    loader_kwargs = {
        "url": origin.url,
        "lister_name": lister.name,
        # any falsy instance name (e.g. "") is passed on as None
        "lister_instance_name": lister.instance_name or None,
        # unpacked last so loader-specific arguments may override the above
        **origin.extra_loader_arguments,
    }
    task_arguments = TaskArguments(args=[], kwargs=loader_kwargs)
    return Task(
        type=f"load-{origin.visit_type}",
        arguments=task_arguments,
        next_run=utcnow(),
    )
[docs]
def create_origin_tasks(
    origins: List[ListedOrigin], scheduler: SchedulerInterface
) -> List[Task]:
    """Return a loading task for each origin, in the same order.

    The listers of all origins are fetched from ``scheduler`` with a single
    ``get_listers_by_id`` call. Each task is then built with
    :func:`create_origin_task`.

    Args:
        origins: listed origins to create tasks for
        scheduler: scheduler used to look up the origins' listers

    Returns:
        one :class:`Task` per origin, in the order of ``origins``

    Raises:
        AssertionError: if the lister of some origin is unknown to
            ``scheduler``
    """
    if not origins:
        # nothing to look up: skip the scheduler round-trip entirely
        return []

    lister_ids = {o.lister_id for o in origins}
    listers = {
        lister.id: lister
        for lister in scheduler.get_listers_by_id(list(map(str, lister_ids)))
    }
    missing_lister_ids = lister_ids - set(listers)
    if missing_lister_ids:
        # Explicit raise rather than ``assert`` so the check still runs under
        # ``python -O``. AssertionError is kept for existing callers.
        raise AssertionError(f"Missing listers: {missing_lister_ids}")
    return [create_origin_task(o, listers[o.lister_id]) for o in origins]
[docs]
def create_oneshot_task(
    type: str, *args, next_run: Optional[datetime] = None, **kwargs
) -> Task:
    """Create a oneshot task scheduled for as soon as possible.

    This is :func:`create_task` called with the ``"oneshot"`` policy.

    Args:
        type: Type of oneshot task as per swh-scheduler's db
            table task_type's column (Ex: load-git, check-deposit)
        *args: positional arguments forwarded to :func:`create_task`
        next_run: optional date and time from which the task can be executed,
            use current time otherwise
        **kwargs: forwarded to :func:`create_task`. ``priority`` and
            ``retries_left`` bind to its keyword parameters of the same name;
            all other keys end up in the task's keyword arguments.

    Returns:
        A :class:`Task` for the task scheduling api
        (:func:`swh.scheduler.backend.create_tasks`)
    """
    return create_task(type, "oneshot", *args, next_run=next_run, **kwargs)