Source code for swh.scheduler.api.server
# Copyright (C) 2018-2022  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import os
from typing import Dict
from swh.core import config
from swh.core.api import JSONFormatter, MsgpackFormatter, RPCServerApp
from swh.core.api import encode_data_server as encode_data
from swh.core.api import error_handler, negotiate
from swh.scheduler import get_scheduler
from swh.scheduler.exc import SchedulerException
from swh.scheduler.interface import SchedulerInterface
from .serializers import DECODERS, ENCODERS
scheduler = None
[docs]
def get_global_scheduler():
    global scheduler
    if not scheduler:
        scheduler = get_scheduler(**app.config["scheduler"])
    return scheduler 
[docs]
class SchedulerServerApp(RPCServerApp):
    extra_type_decoders = DECODERS
    extra_type_encoders = ENCODERS 
app = SchedulerServerApp(
    __name__, backend_class=SchedulerInterface, backend_factory=get_global_scheduler
)
[docs]
@app.errorhandler(SchedulerException)
def argument_error_handler(exception):
    return error_handler(exception, encode_data, status_code=400) 
[docs]
@app.errorhandler(Exception)
def my_error_handler(exception):
    return error_handler(exception, encode_data) 
[docs]
def has_no_empty_params(rule):
    return len(rule.defaults or ()) >= len(rule.arguments or ()) 
[docs]
@app.route("/")
def index():
    return """<html>
<head><title>Software Heritage scheduler RPC server</title></head>
<body>
<p>You have reached the
<a href="https://www.softwareheritage.org/">Software Heritage</a>
scheduler RPC server.<br />
See its
<a href="https://docs.softwareheritage.org/devel/swh-scheduler/">documentation
and API</a> for more information</p>
</body>
</html>""" 
[docs]
@app.route("/site-map")
@negotiate(MsgpackFormatter)
@negotiate(JSONFormatter)
def site_map():
    links = []
    for rule in app.url_map.iter_rules():
        if has_no_empty_params(rule) and hasattr(SchedulerInterface, rule.endpoint):
            links.append(
                dict(
                    rule=rule.rule,
                    description=getattr(SchedulerInterface, rule.endpoint).__doc__,
                )
            )
    # links is now a list of url, endpoint tuples
    return links 
[docs]
def load_and_check_config(config_path: str, type: str = "postgresql") -> Dict:
    """Check the minimal configuration is set to run the api or raise an
       error explanation.
    Args:
        config_path: Configuration file path to load
        type: Configuration type, for 'postgresql' type (the default), more checks are
              done.
    Raises:
        Error if the setup is not as expected
    Returns:
        configuration as a dict
    """
    if not config_path:
        raise EnvironmentError("Configuration file must be defined")
    if not os.path.exists(config_path):
        raise FileNotFoundError(f"Configuration file {config_path} does not exist")
    cfg = config.read(config_path)
    vcfg = cfg.get("scheduler")
    if not vcfg:
        raise KeyError("Missing '%scheduler' configuration")
    if type == "postgresql":
        cls = vcfg.get("cls")
        if cls not in ("local", "postgresql"):
            raise ValueError(
                "The scheduler backend can only be started with a 'postgresql' "
                "configuration"
            )
        db = vcfg.get("db")
        if not db:
            raise KeyError("Invalid configuration; missing 'db' config entry")
    return cfg 
api_cfg = None
[docs]
def make_app_from_configfile():
    """Run the WSGI app from the webserver, loading the configuration from
    a configuration file.
    SWH_CONFIG_FILENAME environment variable defines the
    configuration path to load.
    """
    global api_cfg
    if not api_cfg:
        config_path = os.environ.get("SWH_CONFIG_FILENAME")
        api_cfg = load_and_check_config(config_path)
        app.config.update(api_cfg)
    handler = logging.StreamHandler()
    app.logger.addHandler(handler)
    return app 
if __name__ == "__main__":
    print('Please use the "swh-scheduler api-server" command')