# Copyright (C) 2015-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import Counter
import json
from typing import Dict, Iterable, List, Optional, Tuple, Union
import warnings
import attr
import psycopg
import psycopg_pool
from swh.core.db.common import db_transaction
from swh.indexer.storage.interface import IndexerStorageInterface
from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.model.model import SHA1_SIZE
from swh.storage.exc import StorageDBError
from swh.storage.utils import get_partition_bounds_bytes
from . import converters
from .db import Db
from .exc import DuplicateId, IndexerStorageArgumentException
from .interface import PagedResult, Sha1
from .metrics import process_metrics, send_metric, timed
from .model import (
ContentLicenseRow,
ContentMetadataRow,
ContentMimetypeRow,
DirectoryIntrinsicMetadataRow,
OriginExtrinsicMetadataRow,
OriginIntrinsicMetadataRow,
)
from .writer import JournalWriter
INDEXER_CFG_KEY = "indexer_storage"
MAPPING_NAMES = ["cff", "codemeta", "gemspec", "maven", "npm", "pkg-info"]
def sanitize_json(doc):
"""Recursively replaces NUL characters, as postgresql does not allow
them in text fields."""
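
    A minimal illustrative doctest (``chr(0)`` stands for the NUL character):

    >>> sanitize_json({"name": "foo" + chr(0) + "bar", "tags": ["a" + chr(0)]})
    {'name': 'foobar', 'tags': ['a']}
    """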
if isinstance(doc, str):
return doc.replace("\x00", "")
elif not hasattr(doc, "__iter__"):
return doc
elif isinstance(doc, dict):
return {sanitize_json(k): sanitize_json(v) for (k, v) in doc.items()}
elif isinstance(doc, (list, tuple)):
return [sanitize_json(v) for v in doc]
else:
raise TypeError(f"Unexpected object type in sanitize_json: {doc}")
def get_indexer_storage(cls: str, **kwargs) -> IndexerStorageInterface:
"""Instantiate an indexer storage implementation of class `cls` with arguments
`kwargs`.
Args:
cls: indexer storage class (local, remote or memory)
kwargs: dictionary of arguments passed to the
indexer storage class constructor
Returns:
an instance of swh.indexer.storage
Raises:
ValueError if passed an unknown storage class.
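
    Example (illustrative; assumes the ``memory`` backend is registered)::

        storage = get_indexer_storage("memory")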
"""
from swh.core.config import get_swh_backend_module
if "args" in kwargs:
warnings.warn(
'Explicit "args" key is deprecated, use keys directly instead.',
DeprecationWarning,
)
kwargs = kwargs["args"]
_, BackendClass = get_swh_backend_module(INDEXER_CFG_KEY, cls)
assert BackendClass is not None
check_config = kwargs.pop("check_config", {})
idx_storage = BackendClass(**kwargs)
if check_config:
if not idx_storage.check_config(**check_config):
raise EnvironmentError("Indexer storage check config failed")
return idx_storage
def check_id_duplicates(data):
"""
    If any two row models in `data` have the same unique key, raises
    a :exc:`DuplicateId` error.
Values associated to the key must be hashable.
Args:
        data: list of row model objects to be inserted
>>> tool1 = {"name": "foo", "version": "1.2.3", "configuration": {}}
>>> tool2 = {"name": "foo", "version": "1.2.4", "configuration": {}}
>>> check_id_duplicates([
... ContentLicenseRow(id=b'foo', tool=tool1, license="GPL"),
... ContentLicenseRow(id=b'foo', tool=tool2, license="GPL"),
... ])
>>> check_id_duplicates([
... ContentLicenseRow(id=b'foo', tool=tool1, license="AGPL"),
... ContentLicenseRow(id=b'foo', tool=tool1, license="AGPL"),
... ])
Traceback (most recent call last):
...
swh.indexer.storage.exc.DuplicateId: [{'id': b'foo', 'license': 'AGPL', 'tool_configuration': '{}', 'tool_name': 'foo', 'tool_version': '1.2.3'}]
""" # noqa
counter = Counter(tuple(sorted(item.unique_key().items())) for item in data)
duplicates = [id_ for (id_, count) in counter.items() if count >= 2]
if duplicates:
raise DuplicateId(list(map(dict, duplicates)))
class IndexerStorage:
"""SWH Indexer Storage Datastore"""
current_version = 137
def __init__(self, db, min_pool_conns=1, max_pool_conns=10, journal_writer=None):
"""
Args:
            db: either a libpq connection string, or a psycopg connection
            min_pool_conns: minimum number of pooled connections, used when
              ``db`` is a connection string
            max_pool_conns: maximum number of pooled connections, used when
              ``db`` is a connection string
            journal_writer: configuration passed to
              `swh.journal.writer.get_journal_writer`
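
        Example (illustrative; the connection string is a placeholder)::

            storage = IndexerStorage(db="dbname=softwareheritage-indexer-dev")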
"""
self.journal_writer = JournalWriter(journal_writer)
try:
if isinstance(db, str):
self._pool = psycopg_pool.ConnectionPool(
conninfo=db,
min_size=min_pool_conns,
max_size=max_pool_conns,
)
self._db = None
else:
self._pool = None
self._db = Db(db)
except psycopg.OperationalError as e:
raise StorageDBError(e)
def get_db(self):
if self._db:
return self._db
return Db.from_pool(self._pool)
def put_db(self, db):
if db is not self._db:
db.put_conn()
def _join_indexer_configuration(self, entries, db, cur):
"""Replaces ``entry.indexer_configuration_id`` with a full tool dict
in ``entry.tool``."""
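        # Illustrative shape (values are made up): an entry with
        # indexer_configuration_id=42 comes back with indexer_configuration_id=None
        # and tool={"name": "nomos", "version": "3.1.0", "configuration": {}}.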
joined_entries = []
# usually, all the additions in a batch are from the same indexer,
# so this cache allows doing a single query for all the entries.
tool_cache = {}
for entry in entries:
# get the tool used to generate this addition
tool_id = entry.indexer_configuration_id
assert tool_id
if tool_id not in tool_cache:
tool_cache[tool_id] = dict(
self._tool_get_from_id(tool_id, db=db, cur=cur)
)
del tool_cache[tool_id]["id"]
entry = attr.evolve(
entry, tool=tool_cache[tool_id], indexer_configuration_id=None
)
joined_entries.append(entry)
return joined_entries
@timed
@db_transaction()
def check_config(self, *, check_write, db=None, cur=None):
# Check permissions on one of the tables
if check_write:
check = "INSERT"
else:
check = "SELECT"
cur.execute(
"select has_table_privilege(current_user, 'content_mimetype', %s)", # noqa
(check,),
)
return cur.fetchone()[0]
@timed
@db_transaction()
def content_mimetype_missing(
self, mimetypes: Iterable[Dict], db=None, cur=None
) -> List[Tuple[Sha1, int]]:
return [obj[0] for obj in db.content_mimetype_missing_from_list(mimetypes, cur)]
@timed
@db_transaction()
def get_partition(
self,
indexer_type: str,
indexer_configuration_id: int,
partition_id: int,
nb_partitions: int,
page_token: Optional[str] = None,
limit: int = 1000,
with_textual_data=False,
db=None,
cur=None,
) -> PagedResult[Sha1]:
"""Retrieve ids of content with `indexer_type` within within partition partition_id
bound by limit.
Args:
**indexer_type**: Type of data content to index (mimetype, etc...)
**indexer_configuration_id**: The tool used to index data
**partition_id**: index of the partition to fetch
**nb_partitions**: total number of partitions to split into
**page_token**: opaque token used for pagination
            **limit**: maximum number of results to return (defaults to 1000)
            **with_textual_data** (bool): only consider textual content (True)
              or all content (False, the default)
Raises:
            IndexerStorageArgumentException when:
            - limit is None
            - an unknown indexer_type is provided
Returns:
PagedResult of Sha1. If next_page_token is None, there is no more data to
fetch
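
        Example (illustrative pagination loop; ``storage``, ``tool_id`` and
        ``process`` are placeholders)::

            page_token = None
            while True:
                page = storage.get_partition(
                    "mimetype",
                    tool_id,
                    partition_id=0,
                    nb_partitions=16,
                    page_token=page_token,
                )
                process(page.results)
                if page.next_page_token is None:
                    break
                page_token = page.next_page_token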
"""
if limit is None:
raise IndexerStorageArgumentException("limit should not be None")
if indexer_type not in db.content_indexer_names:
err = f"Wrong type. Should be one of [{','.join(db.content_indexer_names)}]"
raise IndexerStorageArgumentException(err)
start, end = get_partition_bounds_bytes(partition_id, nb_partitions, SHA1_SIZE)
if page_token is not None:
start = hash_to_bytes(page_token)
if end is None:
end = b"\xff" * SHA1_SIZE
next_page_token: Optional[str] = None
ids = [
row[0]
for row in db.content_get_range(
indexer_type,
start,
end,
indexer_configuration_id,
limit=limit + 1,
with_textual_data=with_textual_data,
cur=cur,
)
]
if len(ids) >= limit:
next_page_token = hash_to_hex(ids[-1])
ids = ids[:limit]
assert len(ids) <= limit
return PagedResult(results=ids, next_page_token=next_page_token)
@timed
@db_transaction()
def content_mimetype_get_partition(
self,
indexer_configuration_id: int,
partition_id: int,
nb_partitions: int,
page_token: Optional[str] = None,
limit: int = 1000,
db=None,
cur=None,
) -> PagedResult[Sha1]:
return self.get_partition(
"mimetype",
indexer_configuration_id,
partition_id,
nb_partitions,
page_token=page_token,
limit=limit,
db=db,
cur=cur,
)
@timed
@process_metrics
@db_transaction()
def content_mimetype_add(
self,
mimetypes: List[ContentMimetypeRow],
db=None,
cur=None,
) -> Dict[str, int]:
mimetypes_with_tools = self._join_indexer_configuration(
mimetypes, db=db, cur=cur
)
check_id_duplicates(mimetypes_with_tools)
self.journal_writer.write_additions("content_mimetype", mimetypes_with_tools)
db.mktemp_content_mimetype(cur)
db.copy_to(
[m.to_dict() for m in mimetypes],
"tmp_content_mimetype",
["id", "mimetype", "encoding", "indexer_configuration_id"],
cur,
)
count = db.content_mimetype_add_from_temp(cur)
return {"content_mimetype:add": count}
@timed
@db_transaction()
def content_mimetype_get(
self, ids: Iterable[Sha1], db=None, cur=None
) -> List[ContentMimetypeRow]:
return [
ContentMimetypeRow.from_dict(
converters.db_to_mimetype(dict(zip(db.content_mimetype_cols, c)))
)
for c in db.content_mimetype_get_from_list(ids, cur)
]
@timed
@db_transaction()
def content_fossology_license_get(
self, ids: Iterable[Sha1], db=None, cur=None
) -> List[ContentLicenseRow]:
return [
ContentLicenseRow.from_dict(
converters.db_to_fossology_license(
dict(zip(db.content_fossology_license_cols, c))
)
)
for c in db.content_fossology_license_get_from_list(ids, cur)
]
@timed
@process_metrics
@db_transaction()
def content_fossology_license_add(
self,
licenses: List[ContentLicenseRow],
db=None,
cur=None,
) -> Dict[str, int]:
licenses_with_tools = self._join_indexer_configuration(licenses, db=db, cur=cur)
check_id_duplicates(licenses_with_tools)
self.journal_writer.write_additions(
"content_fossology_license", licenses_with_tools
)
db.mktemp_content_fossology_license(cur)
db.copy_to(
[license.to_dict() for license in licenses],
tblname="tmp_content_fossology_license",
columns=["id", "license", "indexer_configuration_id"],
cur=cur,
)
count = db.content_fossology_license_add_from_temp(cur)
return {"content_fossology_license:add": count}
@timed
@db_transaction()
def content_fossology_license_get_partition(
self,
indexer_configuration_id: int,
partition_id: int,
nb_partitions: int,
page_token: Optional[str] = None,
limit: int = 1000,
db=None,
cur=None,
) -> PagedResult[Sha1]:
return self.get_partition(
"fossology_license",
indexer_configuration_id,
partition_id,
nb_partitions,
page_token=page_token,
limit=limit,
with_textual_data=True,
db=db,
cur=cur,
)
@timed
@db_transaction()
def content_metadata_missing(
self, metadata: Iterable[Dict], db=None, cur=None
) -> List[Tuple[Sha1, int]]:
return [obj[0] for obj in db.content_metadata_missing_from_list(metadata, cur)]
@timed
@db_transaction()
def content_metadata_get(
self, ids: Iterable[Sha1], db=None, cur=None
) -> List[ContentMetadataRow]:
return [
ContentMetadataRow.from_dict(
converters.db_to_metadata(dict(zip(db.content_metadata_cols, c)))
)
for c in db.content_metadata_get_from_list(ids, cur)
]
@timed
@process_metrics
@db_transaction()
def content_metadata_add(
self,
metadata: List[ContentMetadataRow],
db=None,
cur=None,
) -> Dict[str, int]:
metadata_with_tools = self._join_indexer_configuration(metadata, db=db, cur=cur)
check_id_duplicates(metadata_with_tools)
self.journal_writer.write_additions("content_metadata", metadata_with_tools)
db.mktemp_content_metadata(cur)
rows = [m.to_dict() for m in metadata]
for row in rows:
row["metadata"] = sanitize_json(row["metadata"])
db.copy_to(
rows,
"tmp_content_metadata",
["id", "metadata", "indexer_configuration_id"],
cur,
)
count = db.content_metadata_add_from_temp(cur)
return {
"content_metadata:add": count,
}
@timed
@db_transaction()
def origin_intrinsic_metadata_search_fulltext(
self, conjunction: List[str], limit: int = 100, db=None, cur=None
) -> List[OriginIntrinsicMetadataRow]:
return [
OriginIntrinsicMetadataRow.from_dict(
converters.db_to_metadata(
dict(zip(db.origin_intrinsic_metadata_cols, c))
)
)
for c in db.origin_intrinsic_metadata_search_fulltext(
conjunction, limit=limit, cur=cur
)
]
@timed
@db_transaction()
def indexer_configuration_add(self, tools, db=None, cur=None):
db.mktemp_indexer_configuration(cur)
db.copy_to(
tools,
"tmp_indexer_configuration",
["tool_name", "tool_version", "tool_configuration"],
cur,
)
tools = db.indexer_configuration_add_from_temp(cur)
results = [dict(zip(db.indexer_configuration_cols, line)) for line in tools]
send_metric(
"indexer_configuration:add",
len(results),
method_name="indexer_configuration_add",
)
return results
@timed
@db_transaction()
def indexer_configuration_get(self, tool, db=None, cur=None):
tool_conf = tool["tool_configuration"]
if isinstance(tool_conf, dict):
tool_conf = json.dumps(tool_conf)
idx = db.indexer_configuration_get(
tool["tool_name"], tool["tool_version"], tool_conf
)
if not idx:
return None
return dict(zip(db.indexer_configuration_cols, idx))
@db_transaction()
def _tool_get_from_id(self, id_, db, cur):
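        """Return the tool row identified by ``id_`` as a dict with keys
        ``id``, ``name``, ``version`` and ``configuration``."""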
tool = dict(
zip(
db.indexer_configuration_cols,
db.indexer_configuration_get_from_id(id_, cur),
)
)
return {
"id": tool["id"],
"name": tool["tool_name"],
"version": tool["tool_version"],
"configuration": tool["tool_configuration"],
}