# Copyright (C) 2020-2024  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Classes representing tables in the Cassandra database.
They are very close to classes found in swh.model.model, but most of
them are subtly different:
* Large objects are split into other classes (eg. RevisionRow has no
  'parents' field, because parents are stored in a different table,
  represented by RevisionParentRow)
* They have a "cols" field, which returns the list of column names
  of the table
* They only use types that map directly to Cassandra's schema (ie. no enums)
Therefore, this model doesn't reuse swh.model.model, except for types
that can be mapped to UDTs (Person and TimestampWithTimezone).
Fields may have :func:`dataclasses metadata <dataclasses.field>` keys ``fk``
if the existence of a corresponding row in a different table is almost guaranteed
(up to loaders not crashing and eventual-consistency settling down) and
``points_to`` if they are a Merkle-DAG link to another object (which is more likely
to be missing).
This is used by :func:`swh.storage.cassandra.diagram.dot_diagram`.
"""
import dataclasses
import datetime
from typing import (
    TYPE_CHECKING,
    Any,
    ClassVar,
    Dict,
    List,
    Optional,
    Tuple,
    Type,
    TypeVar,
    cast,
)

if TYPE_CHECKING:
    from _typeshed import DataclassInstance

from cassandra.util import Date

from swh.model.model import Person, TimestampWithTimezone

MAGIC_NULL_PK = b"<null>"
"""
NULLs (or all-empty blobs) are not allowed in primary keys; instead we use a
special value that can't possibly be a valid hash.
"""
T = TypeVar("T", bound="BaseRow")


def content_index_table_name(algo: str, skipped_content: bool) -> str:
    """Given an algorithm name, returns the name of one of the 'content_by_*'
    and 'skipped_content_by_*' tables that serve as index for the 'content'
    and 'skipped_content' tables based on this algorithm's hashes.
    For now it is a simple substitution, but future versions may append a version
    number to it, if needed for schema updates."""
    if skipped_content:
        return f"skipped_content_by_{algo}"
    else:
        return f"content_by_{algo}" 


class BaseRow:
    TABLE: ClassVar[str]
    PARTITION_KEY: ClassVar[Tuple[str, ...]]
    CLUSTERING_KEY: ClassVar[Tuple[str, ...]] = ()

    @classmethod
    def denullify_clustering_key(cls, ck: Tuple) -> Tuple:
        """If this class has a Optional fields used as a clustering key, this replaces
        such values from the given clustering key so it is suitable for sorting purposes
        """
        return ck 

    @classmethod
    def from_dict(cls: Type[T], d: Dict[str, Any]) -> T:
        return cls(**d)

    @classmethod
    def cols(cls) -> List[str]:
        return [
            field.name for field in dataclasses.fields(cast("DataclassInstance", cls))
        ]

    def to_dict(self) -> Dict[str, Any]:
        return dataclasses.asdict(cast("DataclassInstance", self))
 

@dataclasses.dataclass
class MigrationRow(BaseRow):
    TABLE = "migration"
    PARTITION_KEY = ("id",)
    id: str
    dependencies: set[str]
    min_read_version: str
    status: str
    """``pending``/``running``/``completed``""" 


@dataclasses.dataclass
class ContentRow(BaseRow):
    TABLE = "content"
    PARTITION_KEY: ClassVar[Tuple[str, ...]] = ("sha256",)
    CLUSTERING_KEY = (
        "sha1",
        "sha1_git",
        "blake2s256",
    )
    sha1: bytes
    sha1_git: bytes
    sha256: bytes
    blake2s256: bytes
    length: int
    ctime: Optional[datetime.datetime]
    """creation time, i.e. time of (first) injection into the storage"""
    status: str 
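

# Illustration (hypothetical helper, inert at import time): how the BaseRow
# machinery behaves for a concrete row class such as ContentRow.
def _example_content_row_roundtrip() -> None:
    row = ContentRow(
        sha1=b"\x01" * 20,
        sha1_git=b"\x02" * 20,
        sha256=b"\x03" * 32,
        blake2s256=b"\x04" * 32,
        length=42,
        ctime=None,
        status="visible",
    )
    # cols() lists column names in field-declaration order.
    assert ContentRow.cols() == [
        "sha1",
        "sha1_git",
        "sha256",
        "blake2s256",
        "length",
        "ctime",
        "status",
    ]
    # to_dict()/from_dict() round-trip through a column-name mapping.
    assert ContentRow.from_dict(row.to_dict()) == row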


@dataclasses.dataclass
class SkippedContentRow(BaseRow):
    TABLE = "skipped_content"
    PARTITION_KEY = ("sha1", "sha1_git", "sha256", "blake2s256")
    sha1: Optional[bytes]
    sha1_git: Optional[bytes]
    sha256: Optional[bytes]
    blake2s256: Optional[bytes]
    length: Optional[int]
    ctime: Optional[datetime.datetime]
    """creation time, i.e. time of (first) injection into the storage"""
    status: str
    reason: str
    origin: str

    @classmethod
    def denullify_clustering_key(cls, ck: Tuple) -> Tuple:
        return tuple(MAGIC_NULL_PK if v is None else v for v in ck)

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "SkippedContentRow":
        d = d.copy()
        for k in ("sha1", "sha1_git", "sha256", "blake2s256"):
            if d[k] == MAGIC_NULL_PK:
                d[k] = None
        return super().from_dict(d)
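

# Illustration (hypothetical helper): how MAGIC_NULL_PK stands in for unknown
# hashes inside the primary key, and is stripped back to None by from_dict().
def _example_skipped_content_nulls() -> None:
    stored = {
        "sha1": b"\x01" * 20,
        "sha1_git": MAGIC_NULL_PK,  # hash was unknown when the row was written
        "sha256": b"\x03" * 32,
        "blake2s256": MAGIC_NULL_PK,
        "length": None,
        "ctime": None,
        "status": "absent",
        "reason": "too large",
        "origin": "https://example.org/repo",
    }
    row = SkippedContentRow.from_dict(stored)
    assert row.sha1_git is None and row.blake2s256 is None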
 

@dataclasses.dataclass
class DirectoryRow(BaseRow):
    TABLE = "directory"
    PARTITION_KEY = ("id",)
    id: bytes
    raw_manifest: Optional[bytes]
    """NULL if the object can be rebuilt from (sorted) entries"""


@dataclasses.dataclass
class DirectoryEntryRow(BaseRow):
    TABLE = "directory_entry"
    PARTITION_KEY = ("directory_id",)
    CLUSTERING_KEY = ("name",)
    directory_id: bytes = dataclasses.field(metadata={"fk": ["directory.id"]})
    name: bytes
    """path name, relative to containing dir"""
    target: bytes = dataclasses.field(
        metadata={
            "points_to": [
                "content.sha1_git",
                "skipped_content.sha1_git",
                "directory.id",
                "revision.id",
            ]
        }
    )
    perms: int
    """unix-like permissions"""
    type: str
    """target type""" 


@dataclasses.dataclass
class RevisionRow(BaseRow):
    TABLE = "revision"
    PARTITION_KEY = ("id",)
    id: bytes
    date: Optional[TimestampWithTimezone]
    committer_date: Optional[TimestampWithTimezone]
    type: str
    directory: bytes = dataclasses.field(metadata={"points_to": ["directory.id"]})
    """source code "root" directory"""
    message: bytes
    author: Person
    committer: Person
    synthetic: bool
    """true iff revision has been created by Software Heritage"""
    metadata: str
    """extra metadata as JSON (tarball checksums, etc.)"""
    extra_headers: dict
    """extra commit information as (tuple(key, value), ...)"""
    raw_manifest: Optional[bytes]
    """NULL if the object can be rebuilt from other cells and revision_parent."""


@dataclasses.dataclass
class RevisionParentRow(BaseRow):
    TABLE = "revision_parent"
    PARTITION_KEY = ("id",)
    CLUSTERING_KEY = ("parent_rank",)
    id: bytes = dataclasses.field(metadata={"fk": ["revision.id"]})
    parent_rank: int
    """parent position in merge commits, 0-based"""
    parent_id: bytes = dataclasses.field(metadata={"points_to": ["revision.id"]}) 


@dataclasses.dataclass
class ReleaseRow(BaseRow):
    TABLE = "release"
    PARTITION_KEY = ("id",)
    id: bytes
    target_type: str
    target: bytes = dataclasses.field(
        metadata={
            "points_to": [
                "content.sha1_git",
                "skipped_content.sha1_git",
                "directory.id",
                "revision.id",
            ]
        }
    )
    date: TimestampWithTimezone
    name: bytes
    message: bytes
    author: Person
    synthetic: bool
    """true iff release has been created by Software Heritage"""
    raw_manifest: Optional[bytes]
    """NULL if the object can be rebuilt from other cells"""


@dataclasses.dataclass
class SnapshotRow(BaseRow):
    TABLE = "snapshot"
    PARTITION_KEY = ("id",)
    id: bytes 


@dataclasses.dataclass
class SnapshotBranchRow(BaseRow):
    """
    For a given snapshot_id, branches are sorted by their name,
    allowing easy pagination.
    """
    TABLE = "snapshot_branch"
    PARTITION_KEY = ("snapshot_id",)
    CLUSTERING_KEY = ("name",)
    snapshot_id: bytes = dataclasses.field(metadata={"fk": ["snapshot.id"]})
    name: bytes
    target_type: Optional[str]
    target: Optional[bytes] = dataclasses.field(
        metadata={
            "points_to": [
                "content.sha1_git",
                "skipped_content.sha1_git",
                "revision.id",
                "release.id",
            ]
        }
    ) 
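

# Sketch, assuming a connected cassandra-driver ``session``; everything except
# the table and column names above is hypothetical. Because branches cluster
# on ``name``, fetching a page of branches after a given name is a simple
# range query on the clustering key.
def _example_branch_page(session, snapshot_id: bytes, after_name: bytes, limit: int):
    return session.execute(
        "SELECT name, target_type, target FROM snapshot_branch"
        " WHERE snapshot_id = %s AND name > %s LIMIT %s",
        (snapshot_id, after_name, limit),
    )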


@dataclasses.dataclass
class OriginVisitRow(BaseRow):
    TABLE = "origin_visit"
    PARTITION_KEY = ("origin",)
    CLUSTERING_KEY = ("visit",)
    origin: str = dataclasses.field(metadata={"fk": ["origin.url"]})
    visit: int
    date: datetime.datetime
    type: str 


@dataclasses.dataclass
class OriginVisitStatusRow(BaseRow):
    TABLE = "origin_visit_status"
    PARTITION_KEY = ("origin",)
    CLUSTERING_KEY = ("visit", "date")
    origin: str = dataclasses.field(metadata={"fk": ["origin_visit.origin"]})
    visit: int = dataclasses.field(metadata={"fk": ["origin_visit.visit"]})
    date: datetime.datetime
    type: str
    status: str
    metadata: str
    snapshot: bytes = dataclasses.field(metadata={"fk": ["snapshot.id"]})

    @classmethod
    def from_dict(cls: Type[T], d: Dict[str, Any]) -> T:
        return cls(**d)
 

@dataclasses.dataclass
class OriginRow(BaseRow):
    TABLE = "origin"
    PARTITION_KEY = ("sha1",)
    sha1: bytes
    url: str
    next_visit_id: int
    """
    We need integer visit ids for compatibility with the pgsql
    storage, so we're using lightweight transactions with this trick:
    https://stackoverflow.com/a/29391877/539465
    """ 


@dataclasses.dataclass
class ObjectCountRow(BaseRow):
    TABLE = "object_count"
    PARTITION_KEY = ("partition_key",)
    CLUSTERING_KEY = ("object_type",)
    partition_key: int
    object_type: str
    count: int 


@dataclasses.dataclass
class ExtIDRow(BaseRow):
    TABLE = "extid"
    PARTITION_KEY = ("extid_type", "extid")
    CLUSTERING_KEY = ("extid_version", "target_type", "target")
    extid_type: str
    extid: bytes
    extid_version: int
    target_type: str
    target: bytes 


@dataclasses.dataclass
class ExtIDByTargetRow(BaseRow):
    TABLE = "extid_by_target"
    PARTITION_KEY = ("target_type", "target")
    CLUSTERING_KEY = ("target_token",)
    target_type: str
    target: bytes = dataclasses.field(metadata={"fk": ["extid.target"]})
    target_token: int
    """value of token(pk) on the "primary" table""" 


@dataclasses.dataclass(frozen=True)
class ObjectReferenceRow(BaseRow):
    TABLE = "object_references"
    PARTITION_KEY = ("target_type", "target")
    CLUSTERING_KEY = ("source_type", "source")
    target_type: str
    target: bytes
    source_type: str
    source: bytes 


@dataclasses.dataclass(frozen=True)
class ObjectReferencesTableRow(BaseRow):
    TABLE = "object_references_table"
    PARTITION_KEY = ("pk",)
    CLUSTERING_KEY = ("name",)
    pk: int
    """always zero, puts everything in the same Cassandra partition for faster querying"""
    name: str
    year: int
    """ISO year."""
    week: int
    """ISO week."""
    start: Date
    end: Date
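

# Illustration (hypothetical helper and naming scheme): deriving a row for a
# given date using only the standard library; ``date.isocalendar()`` yields
# the ISO year, week, and weekday behind the ``year``/``week``/``start``/
# ``end`` columns.
def _example_week_row(day: datetime.date) -> ObjectReferencesTableRow:
    year, week, weekday = day.isocalendar()
    monday = day - datetime.timedelta(days=weekday - 1)  # ISO weeks start on Monday
    return ObjectReferencesTableRow(
        pk=0,  # always zero, as documented above
        name=f"object_references_{year:04d}w{week:02d}",  # hypothetical scheme
        year=year,
        week=week,
        start=Date(monday),
        end=Date(monday + datetime.timedelta(days=7)),
    )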