Source code for swh.graph.grpc_server
# Copyright (C) 2021-2023  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
A simple tool to start the swh-graph gRPC server in Rust.
"""
import logging
import os
import shlex
import shutil
import subprocess
import aiohttp.test_utils
import aiohttp.web
from swh.graph.config import check_config
logger = logging.getLogger(__name__)
[docs]
class ExecutableNotFound(EnvironmentError):
    """Raised when the ``swh-graph-grpc-serve`` executable cannot be located."""
[docs]
def build_rust_grpc_server_cmdline(**config):
    """Build the command line used to launch the Rust gRPC server.

    The keyword arguments are first passed through ``check_config``. The keys
    read afterwards are ``port`` (optional), ``rust_executable_dir``,
    ``masked_nodes`` (optional) and ``path``.

    The executable is looked up in ``rust_executable_dir`` first, then on the
    ``PATH`` via :func:`shutil.which`.

    Returns:
        A ``(cmd, port)`` tuple: ``cmd`` is the argument list suitable for
        :class:`subprocess.Popen`, and ``port`` is the port the server is told
        to bind to (a free port is picked when none is configured).

    Raises:
        ExecutableNotFound: if ``swh-graph-grpc-serve`` is found neither in
            ``rust_executable_dir`` nor on the ``PATH``.
    """
    logger.debug("Checking configuration and populating default values")
    config = check_config(config)

    port = config.pop("port", None)
    if port is None:
        port = aiohttp.test_utils.unused_port()
        logger.debug("Port not configured, using random port %s", port)

    executable = "swh-graph-grpc-serve"
    # os.path.join() adds a separator only when one is missing, so the lookup
    # works whether or not the configured directory ends with "/", and it also
    # accepts path-like objects (plain string concatenation did neither).
    grpc_path = os.path.join(config["rust_executable_dir"], executable)
    if not os.path.isfile(grpc_path):
        # Not in the configured directory: fall back to a PATH lookup.
        grpc_path = shutil.which(executable)
    if not grpc_path or not os.path.isfile(grpc_path):
        raise ExecutableNotFound("swh-graph-grpc-serve executable not found")

    cmd = [grpc_path]
    if config.get("masked_nodes"):
        # Coerce to str, like the dataset path below, so that every element of
        # the command line is a plain string.
        cmd.extend(["--masked-nodes", str(config["masked_nodes"])])
    logger.debug("Configuration: %r", config)
    cmd.extend(["--bind", f"[::]:{port}", str(config["path"])])
    # NOTE: this function only builds the command line; nothing has been
    # spawned yet when this message is printed.
    print(f"Started GRPC using dataset from {str(config['path'])}")
    return cmd, port
[docs]
def spawn_rust_grpc_server(**config):
    """Launch the Rust gRPC server as a child process.

    The command line and port come from ``build_rust_grpc_server_cmdline``,
    called with the same keyword arguments. The child inherits the current
    environment, adjusted as follows:

    - ``RUST_LOG`` defaults to ``debug,h2=info`` when ``target`` is ``"debug"``
      (an existing ``RUST_LOG`` value is left alone);
    - ``STATSD_HOST`` / ``STATSD_PORT`` are set from ``statsd_host`` /
      ``statsd_port`` when those keys are present.

    Returns:
        A ``(server, port)`` tuple, where ``server`` is the
        :class:`subprocess.Popen` handle of the spawned process.
    """
    argv, port = build_rust_grpc_server_cmdline(**config)
    print(argv)

    # shlex.join() only exists from Python 3.8 on, so quote each argument
    # individually and join them by hand.
    quoted_cmdline = " ".join(map(shlex.quote, argv))
    logger.info("Starting gRPC server: %s", quoted_cmdline)

    child_env = os.environ.copy()
    if config.get("target") == "debug":
        # h2 is very verbose at DEBUG level
        child_env.setdefault("RUST_LOG", "debug,h2=info")
    if "statsd_host" in config:
        child_env["STATSD_HOST"] = config["statsd_host"]
    if "statsd_port" in config:
        child_env["STATSD_PORT"] = str(config["statsd_port"])

    return subprocess.Popen(argv, env=child_env), port
[docs]
def stop_grpc_server(server: subprocess.Popen, timeout: int = 15):
    """Stop a gRPC server process, escalating to a kill if necessary.

    The process is first asked to terminate. If it has not exited within
    ``timeout`` seconds, it is killed and then waited for.

    Args:
        server: handle of the process to stop.
        timeout: number of seconds to wait for termination before killing
            the process.
    """
    server.terminate()
    try:
        server.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        logger.warning("Server did not terminate, sending kill signal...")
        server.kill()
        # Reap the killed child so it does not linger as a zombie and so that
        # its return code gets populated.
        server.wait()