Source code for swh.scheduler.pytest_plugin
# Copyright (C) 2020-2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import timedelta
from functools import partial
from importlib.metadata import entry_points
from celery.contrib.testing import worker
from celery.contrib.testing.app import TestApp, setup_default_app
import pytest
from pytest_postgresql import factories
from swh.core.db.db_utils import initialize_database_for_module
from swh.scheduler import get_scheduler
from swh.scheduler.backend import SchedulerBackend
from swh.scheduler.model import TaskType
# Celery tasks for testing purposes; the tasks themselves are defined
# in swh/scheduler/tests/tasks.py
TASK_NAMES = ["ping", "multiping", "add", "error", "echo"]
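# The pytest-postgresql factories below provide a PostgreSQL server process whose
# template database is loaded with the scheduler schema at the backend's current
# version, plus a per-test database connection fixture.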
scheduler_postgresql_proc = factories.postgresql_proc(
    load=[
        partial(
            initialize_database_for_module,
            modname="scheduler",
            version=SchedulerBackend.current_version,
        )
    ],
)
postgresql_scheduler = factories.postgresql("scheduler_postgresql_proc")
@pytest.fixture
def swh_scheduler_class():
    return "postgresql" 
@pytest.fixture
def swh_scheduler_config(postgresql_scheduler):
    return {
        "db": postgresql_scheduler.info.dsn,
    } 
@pytest.fixture
def swh_scheduler(swh_scheduler_class, swh_scheduler_config):
    scheduler = get_scheduler(swh_scheduler_class, **swh_scheduler_config)
    for taskname in TASK_NAMES:
        scheduler.create_task_type(
            TaskType(
                type=f"swh-test-{taskname}",
                description=f"The {taskname} testing task",
                backend_name=f"swh.scheduler.tests.tasks.{taskname}",
                default_interval=timedelta(days=1),
                min_interval=timedelta(hours=6),
                max_interval=timedelta(days=12),
            )
        )
    return scheduler 
# This alias makes it easy to instantiate a db-backed scheduler,
# e.g. for the RPC client/server test suite.
swh_db_scheduler = swh_scheduler
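# Illustrative usage (a sketch, not part of this plugin): a test module whose
# conftest declares `pytest_plugins = ["swh.scheduler.pytest_plugin"]` can
# request the fixture directly, for instance:
#
#     def test_task_types(swh_scheduler):
#         assert swh_scheduler.get_task_type("swh-test-ping") is not None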
@pytest.fixture(scope="session")
def swh_scheduler_celery_app():
    """Set up a Celery app as swh.scheduler and swh worker tests would expect it"""
    test_app = TestApp(
        set_as_current=True,
        enable_logging=True,
        task_cls="swh.scheduler.task:SWHTask",
        config={
            "accept_content": ["application/x-msgpack", "application/json"],
            "broker_url": "memory://guest@localhost//",
            "task_serializer": "msgpack",
            "result_serializer": "json",
        },
    )
    with setup_default_app(test_app, use_trap=False):
        from swh.scheduler.celery_backend import config
        config.app = test_app
        test_app.set_default()
        test_app.set_current()
        yield test_app 
@pytest.fixture(scope="session")
def swh_scheduler_celery_includes():
    """List of task modules that should be loaded by the swh_scheduler_celery_worker on
    startup."""
    task_modules = ["swh.scheduler.tests.tasks"]
    for entrypoint in entry_points().select(group="swh.workers"):
        task_modules.extend(entrypoint.load()().get("task_modules", []))
    return task_modules 
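# Illustrative registration (a sketch of what an external swh package may expose):
# an entry point in the "swh.workers" group names a callable returning a dict,
# e.g. {"task_modules": ["swh.somepackage.tasks"]} (hypothetical module name);
# only the "task_modules" key is consumed here.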
@pytest.fixture(scope="session")
def swh_scheduler_celery_worker(
    swh_scheduler_celery_app,
    swh_scheduler_celery_includes,
):
    """Spawn a worker"""
    for module in swh_scheduler_celery_includes:
        swh_scheduler_celery_app.loader.import_task_module(module)
    with worker.start_worker(swh_scheduler_celery_app, pool="solo") as w:
        yield w
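# Illustrative end-to-end use (a sketch, not part of this plugin): a test that
# requests both celery fixtures can send one of the test tasks loaded from
# swh.scheduler.tests.tasks and wait for its result, for instance:
#
#     def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker):
#         res = swh_scheduler_celery_app.send_task("swh.scheduler.tests.tasks.ping")
#         assert res.get(timeout=10)  # actual return value depends on the task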