# Copyright (C) 2018-2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import io
import json
from django.conf import settings
from django.contrib.admin.views.decorators import staff_member_required
from django.core.exceptions import ObjectDoesNotExist
from django.core.management import call_command
from django.core.paginator import Paginator
from django.http import FileResponse, HttpResponse, JsonResponse
from django.shortcuts import render
from django.views.decorators.http import require_POST
from swh.web.save_code_now.models import (
    SaveAuthorizedOrigin,
    SaveOriginRequest,
    SaveUnauthorizedOrigin,
)
from swh.web.save_code_now.origin_save import (
    SAVE_REQUEST_PENDING,
    SAVE_REQUEST_REJECTED,
    create_save_origin_request,
)
from swh.web.utils import datatables_pagination_params
[docs]
@staff_member_required(view_func=None, login_url=settings.LOGIN_URL)
def admin_origin_save_requests(request):
    return render(request, "admin/origin-save-requests.html") 
[docs]
@staff_member_required(view_func=None, login_url=settings.LOGIN_URL)
def admin_origin_save_filters(request):
    return render(request, "admin/origin-save-filters.html") 
def _datatables_origin_urls_response(request, urls_query_set):
    search_value = request.GET.get("search[value]")
    if search_value:
        urls_query_set = urls_query_set.filter(url__icontains=search_value)
    column_order = request.GET.get("order[0][column]")
    field_order = request.GET.get(f"columns[{column_order}][name]", "id")
    order_dir = request.GET.get("order[0][dir]", "desc")
    if order_dir == "desc":
        field_order = "-" + field_order
    urls_query_set = urls_query_set.order_by(field_order)
    table_data = {}
    table_data["draw"] = int(request.GET.get("draw", 1))
    table_data["recordsTotal"] = urls_query_set.count()
    table_data["recordsFiltered"] = urls_query_set.count()
    length, page = datatables_pagination_params(request)
    paginator = Paginator(urls_query_set, length)
    urls_query_set = paginator.page(page).object_list
    table_data["data"] = [{"url": u.url} for u in urls_query_set]
    return JsonResponse(table_data)
[docs]
@staff_member_required(view_func=None, login_url=settings.LOGIN_URL)
def admin_origin_save_authorized_urls_list(request):
    authorized_urls = SaveAuthorizedOrigin.objects.all()
    return _datatables_origin_urls_response(request, authorized_urls) 
[docs]
@require_POST
@staff_member_required(view_func=None, login_url=settings.LOGIN_URL)
def admin_origin_save_add_authorized_url(request, origin_url):
    try:
        SaveAuthorizedOrigin.objects.get(url=origin_url)
    except ObjectDoesNotExist:
        # add the new authorized url
        SaveAuthorizedOrigin.objects.create(url=origin_url)
        # check if pending save requests with that url prefix exist
        pending_save_requests = SaveOriginRequest.objects.filter(
            origin_url__startswith=origin_url, status=SAVE_REQUEST_PENDING
        )
        # create origin save tasks for previously pending requests
        for psr in pending_save_requests:
            create_save_origin_request(psr.visit_type, psr.origin_url)
        status_code = 200
    else:
        status_code = 400
    return HttpResponse(status=status_code) 
[docs]
@require_POST
@staff_member_required(view_func=None, login_url=settings.LOGIN_URL)
def admin_origin_save_remove_authorized_url(request, origin_url):
    try:
        entry = SaveAuthorizedOrigin.objects.get(url=origin_url)
    except ObjectDoesNotExist:
        status_code = 404
    else:
        entry.delete()
        status_code = 200
    return HttpResponse(status=status_code) 
[docs]
@staff_member_required(view_func=None, login_url=settings.LOGIN_URL)
def admin_origin_save_unauthorized_urls_list(request):
    unauthorized_urls = SaveUnauthorizedOrigin.objects.all()
    return _datatables_origin_urls_response(request, unauthorized_urls) 
[docs]
@require_POST
@staff_member_required(view_func=None, login_url=settings.LOGIN_URL)
def admin_origin_save_add_unauthorized_url(request, origin_url):
    try:
        SaveUnauthorizedOrigin.objects.get(url=origin_url)
    except ObjectDoesNotExist:
        SaveUnauthorizedOrigin.objects.create(url=origin_url)
        # check if pending save requests with that url prefix exist
        pending_save_requests = SaveOriginRequest.objects.filter(
            origin_url__startswith=origin_url, status=SAVE_REQUEST_PENDING
        )
        # mark pending requests as rejected
        for psr in pending_save_requests:
            psr.status = SAVE_REQUEST_REJECTED
            psr.save()
        status_code = 200
    else:
        status_code = 400
    return HttpResponse(status=status_code) 
[docs]
@require_POST
@staff_member_required(view_func=None, login_url=settings.LOGIN_URL)
def admin_origin_save_remove_unauthorized_url(request, origin_url):
    try:
        entry = SaveUnauthorizedOrigin.objects.get(url=origin_url)
    except ObjectDoesNotExist:
        status_code = 404
    else:
        entry.delete()
        status_code = 200
    return HttpResponse(status=status_code) 
[docs]
@require_POST
@staff_member_required(view_func=None, login_url=settings.LOGIN_URL)
def admin_origin_save_request_accept(request, visit_type, origin_url):
    try:
        SaveAuthorizedOrigin.objects.get(url=origin_url)
    except ObjectDoesNotExist:
        SaveAuthorizedOrigin.objects.create(url=origin_url)
    create_save_origin_request(visit_type, origin_url)
    return HttpResponse(status=200) 
[docs]
@require_POST
@staff_member_required(view_func=None, login_url=settings.LOGIN_URL)
def admin_origin_save_request_reject(request, visit_type, origin_url):
    try:
        sor = SaveOriginRequest.objects.get(
            visit_type=visit_type, origin_url=origin_url, status=SAVE_REQUEST_PENDING
        )
    except ObjectDoesNotExist:
        status_code = 404
    else:
        status_code = 200
        sor.status = SAVE_REQUEST_REJECTED
        sor.note = json.loads(request.body).get("note")
        sor.save()
    return HttpResponse(status=status_code) 
[docs]
@require_POST
@staff_member_required(view_func=None, login_url=settings.LOGIN_URL)
def admin_origin_save_request_remove(request, sor_id):
    try:
        entry = SaveOriginRequest.objects.get(id=sor_id)
    except ObjectDoesNotExist:
        status_code = 404
    else:
        entry.delete()
        status_code = 200
    return HttpResponse(status=status_code) 
[docs]
@staff_member_required(view_func=None, login_url=settings.LOGIN_URL)
def admin_origin_save_requests_csv_dump(request):
    output = io.StringIO()
    call_command("dump_savecodenow_data", stdout=output)
    return FileResponse(
        io.BytesIO(output.getvalue().encode()),
        filename="save_code_now_requests.csv",
        content_type="application/csv",
        as_attachment=True,
    )