# Copyright (C) 2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import logging
import os
import textwrap
from typing import Optional
import click
from importlib_metadata import version
import requests
from swh.core import config
from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group
from swh.web.client.client import WebAPIClient
from .config import DEFAULT_CONFIG_PATH, SWH_API_ROOT, get_default_config
from .data import NoProvenanceAPIAccess, get_ignore_patterns_templates
from .setup_wizard import invoke_auth, run_setup, should_run_setup
[docs]
def get_exclude_templates_list_repr(width=0):
    """Build the list of exclusion template names shown in the CLI help.

    The template names returned by ``get_ignore_patterns_templates()`` are
    sorted and joined with ``", "``.

    Args:
        width: when strictly positive, the joined string is wrapped to that
            many columns with ``textwrap.fill``; otherwise it is returned on
            a single line.

    Returns:
        The formatted string of template names.
    """
    template_names = sorted(get_ignore_patterns_templates())
    joined_names = ", ".join(str(name) for name in template_names)
    if width > 0:
        return textwrap.fill(joined_names, width=width)
    return joined_names
# Help text of the repeatable ``--exclude-template`` option of the ``scan``
# command. It is built once, at import time, and embeds the sorted list of
# known template names wrapped to 40 columns.
EXCLUDE_TEMPLATES_HELP = f"""Repeatable option to exclude files and
directories using an exclusion template
(e.g., ``Python`` for common exclusion patterns
in a Python project).
Valid values are:
{get_exclude_templates_list_repr(40)}
"""
# Help text of the ``scanner`` command group (passed explicitly as ``help=``).
SCANNER_HELP = """Software Heritage Scanner tools
Scan a source code project to discover files and directories existing in the
Software Heritage archive.
"""
[docs]
def check_auth(ctx):
    """Check that a usable authentication token is configured.

    The function reads ``ctx.obj["config"]`` and ``ctx.obj["oidc_client"]``,
    looks up the token stored under ``keycloak_tokens`` for the client's realm
    and client id, and tries to refresh it:

    - on success, a green confirmation naming the server URL is echoed;
    - when no token is stored for this realm/client, a yellow warning and a
      login hint are echoed and the function returns normally;
    - when the refresh raises ``KeycloakError``, hints are echoed and the
      command is aborted through ``ctx.fail`` with the Keycloak error message.

    Args:
        ctx: click context whose ``obj`` mapping holds the ``config`` mapping
            and the ``oidc_client`` object.
    """
    assert "config" in ctx.obj
    assert "oidc_client" in ctx.obj
    # Named ``cfg`` so the imported ``swh.core.config`` module is not shadowed.
    cfg = ctx.obj["config"]
    oidc_client = ctx.obj["oidc_client"]
    realm_name = oidc_client.realm_name
    client_id = oidc_client.client_id
    login_hint = "Run `swh scanner login` to configure or verify authentication."

    # The token can be missing at any level (no ``keycloak_tokens`` section,
    # no entry for this realm, no entry for this client). All of these mean
    # "not authenticated" and must not crash with a KeyError.
    realm_tokens = (cfg.get("keycloak_tokens") or {}).get(realm_name) or {}
    auth_token = realm_tokens.get(client_id)

    if not auth_token:
        msg = "Warning: you are not authenticated with the Software Heritage API\n"
        msg += "Log in to get a higher rate-limit."
        click.echo(click.style(msg, fg="yellow"))
        click.echo(click.style(login_hint))
        return

    # Imported lazily: only needed when a token has to be verified.
    from swh.auth.keycloak import KeycloakError, keycloak_error_message

    # Ensure authentication token is valid: a successful refresh must yield an
    # access token.
    try:
        oidc_client.refresh_token(refresh_token=auth_token)["access_token"]
    except KeycloakError as ke:
        msg = "Error while verifying your authentication configuration."
        click.echo(click.style(msg, fg="yellow"))
        click.echo(click.style(login_hint))
        ctx.fail(keycloak_error_message(ke))
    else:
        # TODO: Display more OIDC information (username, realm, client_id)?
        msg = f'Authenticated to "{oidc_client.server_url}".'
        click.echo(click.style(msg, fg="green"))
@swh_cli_group.group(
    name="scanner",
    context_settings=CONTEXT_SETTINGS,
    help=SCANNER_HELP,
)
@click.option(
    "-C",
    "--config-file",
    type=click.Path(dir_okay=False, path_type=str),
    help=f"Configuration file path. [default:{DEFAULT_CONFIG_PATH}]",
    envvar="SWH_CONFIG_FILENAME",
    show_default=False,
)
@click.version_option(version=version("swh.scanner"), prog_name="swh.scanner")
@click.pass_context
def scanner(ctx: click.Context, config_file: Optional[str]):
    # Entry point of the ``scanner`` command group. It records the
    # configuration file path in ``ctx.obj`` and, unless the setup is going to
    # run, loads the authentication/configuration objects that the
    # sub-commands read from ``ctx.obj``.
    ctx.ensure_object(dict)
    if not config_file:
        config_file = DEFAULT_CONFIG_PATH
    ctx.obj["config_file"] = config_file

    # Scanner default configuration, merged below with the loaded one.
    scanner_defaults = get_default_config()

    # The setup does its own auth and config setup: nothing more to do here
    # when it is the invoked sub-command or when it still has to run.
    if ctx.invoked_subcommand == "setup" or should_run_setup():
        return

    # Invoke the auth CLI command to get an OIDC client. It loads the
    # configuration file, if any, and populates ``ctx.obj["config"]``.
    invoke_auth(ctx, config_file=config_file)
    loaded_config = ctx.obj["config"]
    assert loaded_config

    # Merge scanner defaults with the loaded configuration object.
    ctx.obj["config"] = config.merge_configs(scanner_defaults, loaded_config)
    assert ctx.obj["oidc_client"]
@scanner.command(name="login")
@click.option("--username", "username", default=None, help="OpenID username")
@click.option(
    "--token",
    "token",
    default=None,
    help="A valid OpenId connect token to authenticate to",
)
@click.pass_context
def login(ctx, username: str, token: str):
    """Authentication configuration guide for Swh Api services.
    Helps in verifying authentication credentials
    """
    from swh.auth.cli import auth_config

    # ``username`` and ``token`` are not read here directly: ``ctx.forward``
    # hands the current command parameters over to ``auth_config``.
    if not should_run_setup():
        ctx.forward(auth_config)
        return

    # The setup has not been run yet: go through it instead of forwarding to
    # the auth command.
    run_setup(ctx)
    click.echo("")  # Separate setup and command a little more
@scanner.command(name="scan")
@click.argument("root_path", default=".", type=click.Path(exists=True))
@click.option(
    "-u",
    "--api-url",
    default=None,
    metavar="API_URL",
    show_default=True,
    help="URL for the api request",
)
@click.option(
    "--exclude-template",
    "-t",
    "exclude_templates",
    metavar="EXCLUDE_TEMPLATES",
    multiple=True,
    help=EXCLUDE_TEMPLATES_HELP,
)
@click.option(
    "--exclude",
    "-x",
    "patterns",
    metavar="PATTERNS",
    multiple=True,
    help="Exclude directories using glob patterns \
    (e.g., ``*.git`` to exclude all .git directories)",
)
@click.option(
    "-f",
    "--output-format",
    "out_fmt",
    default="summary",
    show_default=True,
    type=click.Choice(["summary", "text", "json", "ndjson"], case_sensitive=False),
    help="The output format",
)
@click.option(
    "--web-ui/--no-web-ui",
    "interactive",
    is_flag=True,
    default=True,
    help="Launch the default graphical web browser to explore the results in a dashboard.",
)
@click.option(
    "--provenance",
    "provenance",
    is_flag=True,
    help="Also fetch provenance data (requires special permission from SWH).",
)
@click.option(
    "--debug-http",
    "debug_http",
    is_flag=True,
    help="Show debug information about the http request",
)
@click.option(
    "--disable-global-patterns",
    "disable_global_patterns",
    is_flag=True,
    help="Disable common and global exclusion patterns.",
)
@click.option(
    "--disable-vcs-patterns",
    "disable_vcs_patterns",
    is_flag=True,
    help="Disable vcs ignore detection for exclusion patterns",
)
@click.option(
    "-c",
    "--project-config-file",
    type=click.Path(dir_okay=False, path_type=str),
    help="Project Configuration file path.",
    show_default=False,
)
@click.option(
    "--provenance-concurrency",
    default=5,
    help="Number of concurrent connections to the web API.",
)
@click.option(
    "--provenance-batch-size",
    default=100,
    help="Batch size when querying the provenance API.",
)
@click.pass_context
def scan(
    ctx,
    root_path,
    api_url,
    exclude_templates,
    patterns,
    out_fmt,
    interactive,
    provenance,
    debug_http,
    disable_global_patterns,
    disable_vcs_patterns,
    project_config_file: Optional[str],
    provenance_concurrency,
    provenance_batch_size,
):
    """Scan a source code project to discover files and directories already
    present in the archive.
    The command opens by default an interactive dashboard after scanning. Can
    be disabled by the --no-web-ui flag.
    The command can provide different output using the --output-format option:\n
    \b
      summary: display a general summary of what the scanner found
      text: display the scan result as a text based tree-like view of all the
            file, using color to indicate the file status.
      json: write all collected data on standard output as JSON
      ndjson: write all collected data on standard output as Newline Delimited JSON
    Exclusion patterns can be set with the repeatable -x/--exclude option:\n
    \b
      pattern: glob pattern (e.g., ``*.git`` to exclude all .git directories)
    Common default exclusion patterns and exclusion patterns defined in your global
    SWH configuration file can be disabled using the --disable-global-patterns option.\n
    Version control system ignore files detection for exclusion (e.g. .gitignore,
    .hgignore, svn ignore file) can be disabled using the --disable-vcs-patterns option. \n
    """
    # Maintainer notes (the docstring above is the user-facing help text):
    #   1. run the first-time setup if needed;
    #   2. merge the per-project configuration, if any, into the configuration;
    #   3. apply the command line overrides (exclusions, API URL, debugging);
    #   4. check authentication when targeting the production API;
    #   5. run the scan, reporting progress on stderr;
    #   6. on a handled error, exit with status 2 (HTTP error) or 1 (no
    #      provenance access).
    # Imported lazily to keep the CLI start-up time low.
    from pathlib import Path

    import swh.scanner.data as data
    import swh.scanner.scanner as scanner

    if should_run_setup():
        run_setup(ctx)
        click.echo("")  # Separate setup and command a little more

    root_path = os.path.abspath(root_path)

    # merge global config with per project one if any
    if project_config_file:
        project_cfg_path = Path(project_config_file)
    else:
        project_cfg_path = Path(root_path) / "swh.scanner.project.yml"
    if project_cfg_path.exists():
        ctx.obj["config"] = config.merge_configs(
            ctx.obj["config"], config.read_raw_config(str(project_cfg_path))
        )
        # Exclude the per-project configuration file from the scan if it is
        # within the root path. Compare absolute paths component-wise: a plain
        # substring test misses files nested below the root and wrongly
        # matches sibling directories sharing a prefix.
        abs_cfg_path = Path(os.path.abspath(project_cfg_path))
        if Path(root_path) in abs_cfg_path.parents:
            ctx.obj["config"]["scanner"]["exclude"].append(str(project_cfg_path))

    scanner_cfg = ctx.obj["config"]["scanner"]

    # override config with command parameters if provided
    # NOTE(review): resetting ``exclude`` here also drops the project
    # configuration file exclusion added just above -- confirm this is intended.
    if disable_global_patterns:
        scanner_cfg["disable_global_patterns"] = disable_global_patterns
        scanner_cfg["exclude"] = []
    if disable_vcs_patterns:
        scanner_cfg["disable_vcs_patterns"] = disable_vcs_patterns
    # ``multiple=True`` options are empty tuples, never None, when not given;
    # only touch the configuration when there is something to add, and do not
    # assume the key is already present.
    if exclude_templates:
        scanner_cfg.setdefault("exclude_templates", []).extend(exclude_templates)

    # check that the exclude templates are valid
    if "exclude_templates" in scanner_cfg:
        known_templates = get_ignore_patterns_templates()
        for template in scanner_cfg["exclude_templates"]:
            if template not in known_templates:
                err_msg = f"Unknown exclusion template '{template}'. Use one of:\n"
                ctx.fail(
                    click.style(err_msg, fg="yellow")
                    + get_exclude_templates_list_repr()
                )

    if patterns:
        scanner_cfg.setdefault("exclude", []).extend(patterns)

    assert "url" in ctx.obj["config"]["web-api"]
    if api_url is not None:
        ctx.obj["config"]["web-api"]["url"] = api_url

    if debug_http:
        http_logger = logging.getLogger("swh.web.client.client")
        http_logger.setLevel(logging.DEBUG)

    # Check authentication only for production URL
    # TODO why do we do this?
    # TODO Should we remove the `swh scanner login` command in favor of the setup?
    if ctx.obj["config"]["web-api"]["url"] == SWH_API_ROOT:
        check_auth(ctx)

    root_path_fmt = click.format_filename(root_path)
    msg = f"Ready to scan {root_path_fmt}"
    click.echo(click.style(msg, fg="green"), err=True)

    class CLIProgress(scanner.Progress):
        """Progress reporter rewriting a single status line on stderr."""

        def __init__(
            self,
            step: scanner.Progress.Step,
            total: Optional[int] = None,
            web_client: Optional[WebAPIClient] = None,
        ):
            self._count = 0
            self._total = total
            self._web_client = web_client
            # Label displayed after the counter; depends on the scan step.
            if step == scanner.Progress.Step.DISK_SCAN:
                self._text = "local objects scanned"
            elif step == scanner.Progress.Step.KNOWN_DISCOVERY:
                self._text = "objects compared with the Software Heritage archive"
            elif step == scanner.Progress.Step.PROVENANCE:
                self._text = "provenance data fetched"

        def increment(self, count=1):
            """move the progress forward and refresh the output"""
            self._count += count
            self._display()

        def update(self, current_count, total=None):
            """set the current count and total, then refresh the output"""
            self._count = current_count
            self._total = total
            self._display()

        def _display(self):
            """refresh the output"""
            rate_limit = ""
            # ``_web_client`` may be None: getattr then falls back to 0.
            rate_limit_delay = getattr(self._web_client, "rate_limit_delay", 0)
            if rate_limit_delay > 0:
                requests_per_second = 1 / rate_limit_delay
                rate_limit = (
                    f" (rate limited: {requests_per_second:.2f} requests / seconds)"
                )
            # The leading "\r" rewrites the same terminal line on each refresh.
            if self._total is None:
                msg = f"\r{self._count} {self._text}{rate_limit}"
            else:
                msg = f"\r{self._count}/{self._total} {self._text}{rate_limit}"
            click.echo(msg, nl=False, err=True)

        def __enter__(self):
            return self

        def __exit__(self, *args, **kwargs):
            # Terminate the progress line.
            click.echo("", err=True)

    # Tune the provenance querying through the module level settings.
    data.MAX_WHEREARE_BATCH = provenance_batch_size
    data.MAX_CONCURRENT_PROVENANCE_QUERIES = provenance_concurrency

    exit_code = 0
    try:
        scanner.scan(
            ctx.obj["config"],
            root_path,
            out_fmt,
            interactive,
            provenance,
            debug_http,
            progress_class=CLIProgress,
        )
    except requests.HTTPError as exc:
        r = exc.response
        click.secho(
            "ERROR: Unexpected errors from the Software Heritage Archive:",
            fg="red",
        )
        click.secho(
            f"ERROR:     {r.url}",
            fg="red",
        )
        click.secho(
            f"ERROR:     {r.status_code} {r.reason}",
            fg="red",
        )
        exit_code = 2
    except NoProvenanceAPIAccess:
        msg = (
            "ERROR: Your account does not have permission to query the Provenance API\n"
        )
        msg += "(Contact the Software Heritage team to get such permission)"
        click.echo(click.style(msg, fg="red"))
        exit_code = 1

    # A value returned from a click callback is discarded in standalone mode
    # and the process exits with status 0; ``ctx.exit`` is what actually sets
    # the exit status (and is still returned to non-standalone callers).
    if exit_code:
        ctx.exit(exit_code)
@scanner.command("setup")
@click.pass_context
def setup_cmd(ctx: click.Context) -> None:
    """Get guided through setting up the scanner
    This interactive command gives a quick explanation of what the scanner is,
    and guides you through the optional authentication as well as the config
    options, then gives you a few examples for invocations.
    This setup will run the first time you run the `scan` command, but you
    may invoke it at anytime using `swh scanner setup`."""
    # The docstring above is the help text shown to users. The whole
    # interactive flow is delegated to ``run_setup``.
    run_setup(ctx)
[docs]
def main():
    """Run the ``scanner`` command group as a program entry point.

    Options can also be provided through environment variables prefixed with
    ``SWH_SCANNER``.
    """
    env_prefix = "SWH_SCANNER"
    return scanner(auto_envvar_prefix=env_prefix)
# Allow running this module directly as a script.
if __name__ == "__main__":
    main()