swh.export.luigi module
Luigi tasks
This module contains Luigi tasks, an alternative to the CLI that can be composed with other tasks such as swh-graph’s.
File layout
Tasks in this module work on “export directories”, which have this layout:
swh_<date>[_<flavor>]/
    edges/
        origin/
        snapshot/
        ...
        stamps/
            origin
            snapshot
            ...
    orc/
        origin/
        snapshot/
        ...
        stamps/
            origin
            snapshot
            ...
    meta/
        export.json
Stamp files are written after the corresponding directories are fully written.
Their presence indicates that the corresponding directory was fully generated or copied.
This allows skipping work that was already done, while ignoring the partial output of interrupted jobs.
They are omitted after the initial export (i.e. when downloading to/from other machines).
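The stamp convention can be sketched as a small check that lists which tables of a format still need to be (re)generated. This is a hypothetical helper, not part of swh.export; it assumes only the layout above, i.e. that a table is complete once `<export_dir>/<format>/stamps/<object_type>` exists.

```python
from pathlib import Path
from typing import List


def missing_tables(export_dir: Path, fmt: str, object_types: List[str]) -> List[str]:
    """Return the object types of `fmt` that have no stamp file yet.

    Hypothetical helper: assumes <export_dir>/<fmt>/stamps/<object_type>
    is written only after <export_dir>/<fmt>/<object_type>/ is complete.
    """
    stamps_dir = export_dir / fmt / "stamps"
    # A table directory without its stamp may be the leftover of an
    # interrupted job, so it is reported as missing and would be redone.
    return [t for t in object_types if not (stamps_dir / t).is_file()]
```

With this design, a job killed halfway leaves a directory but no stamp, so a rerun redoes that table instead of trusting its partial content.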
meta/export.json contains information about the dataset, for provenance tracking.
For example:
{
    "flavor": "full",
    "export_start": "2022-11-08T11:00:54.998799+00:00",
    "export_end": "2022-11-08T11:05:53.105519+00:00",
    "brokers": [
        "broker1.journal.staging.swh.network:9093"
    ],
    "prefix": "swh.journal.objects",
    "formats": [
        "edges",
        "orc"
    ],
    "object_types": [
        "revision",
        "release",
        "snapshot",
        "origin_visit_status",
        "origin_visit",
        "origin"
    ],
    "privileged": false,
    "hostname": "desktop5",
    "tool": {
        "name": "swh.export",
        "version": "0.3.2"
    }
}
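A consumer of an export might read this file to check provenance before using the dataset. The sketch below is hypothetical: the choice of which keys count as required is an assumption, and only the field names come from the example above.

```python
import json
from datetime import datetime
from pathlib import Path
from typing import Any, Dict

# Assumption of this sketch: the fields a consumer insists on.
REQUIRED_KEYS = ("flavor", "export_start", "export_end", "object_types", "tool")


def read_export_meta(export_dir: Path) -> Dict[str, Any]:
    """Load <export_dir>/meta/export.json, failing if a provenance field is missing."""
    with open(export_dir / "meta" / "export.json", encoding="utf-8") as f:
        meta = json.load(f)
    missing = [k for k in REQUIRED_KEYS if k not in meta]
    if missing:
        raise ValueError(f"export.json lacks {missing}")
    return meta


def export_duration_seconds(meta: Dict[str, Any]) -> float:
    """Wall-clock duration of the export, from its two ISO 8601 timestamps."""
    start = datetime.fromisoformat(meta["export_start"])
    end = datetime.fromisoformat(meta["export_end"])
    return (end - start).total_seconds()
```

For the example file above, the duration works out to about 298 seconds, just under five minutes.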
object_types lists the “main tables” that were exported; it excludes relational
tables such as directory_entry.
Running all on staging
An easy way to run it (e.g. on the staging database) is to have these config files:
Then run this command, for example:
luigi --log-level INFO --local-scheduler --module swh.export.luigi RunExportAll --UploadExportToS3-local-export-path=/poolswh/softwareheritage/2022-11-09_staging/ --s3-export-path=s3://vlorentz-test2/vlorentz_2022-11-09_staging/ --athena-db-name=vlorentz_20221109_staging
Note that this example divides config options between luigi.cfg and the CLI arbitrarily,
for readability; they can be used interchangeably.
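To see why the two are interchangeable, the hypothetical helper below rewrites task-parameter flags as luigi.cfg sections. It relies on luigi's naming conventions as we understand them: `--Task-param-name=value` sets `param_name` on `Task`, while an unprefixed `--param-name=value` applies to the root task named on the command line. Core options such as `--log-level` and `--local-scheduler` are not task parameters and are deliberately not handled.

```python
import configparser
import io
from typing import Iterable


def flags_to_luigi_cfg(root_task: str, flags: Iterable[str]) -> str:
    """Rewrite luigi task-parameter flags as equivalent luigi.cfg text (sketch)."""
    cfg = configparser.ConfigParser(interpolation=None)
    for flag in flags:
        name, _, value = flag.lstrip("-").partition("=")
        head, _, rest = name.partition("-")
        if head[:1].isupper() and rest:
            # --Task-param-name=value: parameter of a named task
            section, param = head, rest
        else:
            # --param-name=value: parameter of the root task
            section, param = root_task, name
        if not cfg.has_section(section):
            cfg.add_section(section)
        # luigi.cfg keys conventionally use the Python name (underscores)
        cfg.set(section, param.replace("-", "_"), value)
    buf = io.StringIO()
    cfg.write(buf)
    return buf.getvalue()
```

Feeding it the three task-parameter flags of the command above yields an `[UploadExportToS3]` section with `local_export_path`, and a `[RunExportAll]` section with `s3_export_path` and `athena_db_name`.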
- class swh.export.luigi.ObjectType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)
Bases: Enum
- origin = 1
- origin_visit = 2
- origin_visit_status = 3
- snapshot = 4
- release = 5
- revision = 6
- directory = 7
- content = 8
- skipped_content = 9
- class swh.export.luigi.Format(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)
Bases: Enum
- edges = 1
- orc = 2
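Flags such as `--object-types=origin,origin_visit` and `--formats=edges` name members of these two enums. The sketch below mirrors the enums locally so it runs without swh.export, and parses a comma-separated value by member name. This is roughly what a list-of-enum parameter does; treating it as luigi's exact parsing logic would be an assumption.

```python
import enum
from typing import List, Type, TypeVar


# Local copies of the enums documented above, so the sketch is self-contained.
class ObjectType(enum.Enum):
    origin = 1
    origin_visit = 2
    origin_visit_status = 3
    snapshot = 4
    release = 5
    revision = 6
    directory = 7
    content = 8
    skipped_content = 9


class Format(enum.Enum):
    edges = 1
    orc = 2


E = TypeVar("E", bound=enum.Enum)


def parse_enum_list(enum_cls: Type[E], value: str, sep: str = ",") -> List[E]:
    """Map e.g. "origin,origin_visit" to enum members, looked up by name.

    Unknown names raise KeyError, so a typo in a flag fails early.
    """
    return [enum_cls[name.strip()] for name in value.split(sep) if name.strip()]
```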
- swh.export.luigi.merge_lists(lists: Iterator[List[T]]) -> List[T]
Returns a list made of all items of the arguments, without duplicates.
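One plausible way to implement this contract is shown below. It is a sketch, not the module's code: the documentation only promises "no duplicates", so keeping first-seen order and requiring hashable items are assumptions.

```python
from typing import Iterable, List, TypeVar

T = TypeVar("T")


def merge_lists(lists: Iterable[List[T]]) -> List[T]:
    """Concatenate lists, dropping duplicates (sketch; first occurrence wins)."""
    seen = set()
    result: List[T] = []
    for lst in lists:
        for item in lst:
            # Items must be hashable for the set-based membership test.
            if item not in seen:
                seen.add(item)
                result.append(item)
    return result
```

For example, `merge_lists(iter([[1, 2], [2, 3], [3, 1, 4]]))` gives `[1, 2, 3, 4]`.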
- class swh.export.luigi.PathParameter(is_dir: bool = False, is_file: bool = False, exists: bool = False, create: bool = False, **kwargs)
Bases: PathParameter
A parameter that is a local filesystem path.
If is_dir, is_file, or exists is True, then the existence of the path (and optionally its type) is checked.
If create is set, then is_dir must be True, and the directory is created if it does not already exist.
- Parameters:
is_dir – whether the path should be to a directory
is_file – whether the path should be to a file
exists – whether the path should already exist
create – whether the path should be created if it does not exist
is_dir and is_file are mutually exclusive. exists and create are mutually exclusive.
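The rules above can be restated as a plain function, which makes the interplay of the four flags easier to follow. This is a hypothetical re-statement, not the parameter's actual code; in particular, creating missing parent directories (`parents=True`) is an assumption.

```python
from pathlib import Path


def check_path(path: Path, is_dir: bool = False, is_file: bool = False,
               exists: bool = False, create: bool = False) -> Path:
    """Validate `path` following the documented PathParameter rules (sketch)."""
    # Flag combinations documented as invalid.
    if is_dir and is_file:
        raise ValueError("is_dir and is_file are mutually exclusive")
    if exists and create:
        raise ValueError("exists and create are mutually exclusive")
    if create and not is_dir:
        raise ValueError("create requires is_dir")
    if create:
        # parents=True is an assumption of this sketch.
        path.mkdir(parents=True, exist_ok=True)
    # Existence (and type) is only checked when one of these flags is set.
    if is_dir or is_file or exists:
        if not path.exists():
            raise ValueError(f"{path} does not exist")
        if is_dir and not path.is_dir():
            raise ValueError(f"{path} is not a directory")
        if is_file and not path.is_file():
            raise ValueError(f"{path} is not a file")
    return path
```

Creating before checking means `is_dir=True, create=True` always passes on a writable filesystem, which matches the documented behavior for export output directories.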
- class swh.export.luigi.S3PathParameter(*args, **kwargs)
Bases: Parameter
A parameter that strips trailing slashes.
- normalize(s)
Given a parsed parameter value, normalizes it.
The value can either be the result of parse(), the default value, or arguments passed into the task’s constructor by instantiation.
This is very implementation-defined, but can be used to validate/clamp valid values. For example, if you wanted to only accept even integers, and “correct” odd values down to the nearest even integer, you can implement normalize as
x // 2 * 2.
- class swh.export.luigi.FractionalFloatParameter(*args, **kwargs)
Bases: FloatParameter
A float parameter that must be between 0 and 1.
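Both parameter classes constrain or clean a value before a task uses it. The two functions below are hypothetical stand-ins for that behavior, written without luigi. Treating the bounds of FractionalFloatParameter as inclusive is an assumption.

```python
def normalize_s3_path(s: str) -> str:
    """Strip trailing slashes, as S3PathParameter is documented to do."""
    return s.rstrip("/")


def check_fraction(x: float) -> float:
    """Reject values outside [0, 1]; inclusive bounds are an assumption."""
    # The chained comparison is False for NaN too, so NaN is rejected.
    if not 0.0 <= x <= 1.0:
        raise ValueError(f"{x} is not between 0 and 1")
    return x
```

Stripping the slash up front lets code build sub-paths with a plain `f"{prefix}/orc/origin"` without producing `//` when the user typed the prefix with a trailing slash.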
- swh.export.luigi.stamps_paths(formats: List[Format], object_types: List[ObjectType]) -> List[str]
Returns a list of (local FS or S3) paths used to mark tables as successfully exported.
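Given the directory layout at the top of this page, such paths can be derived as a cross product of formats and object types. The hypothetical variant below also takes a base location. It is a sketch under that layout assumption, not the function's real output, which may use a different format (e.g. relative paths) or include other files.

```python
from typing import List


def stamp_paths(base: str, formats: List[str], object_types: List[str]) -> List[str]:
    """Hypothetical variant of stamps_paths() that prepends a base location.

    Assumes the <format>/stamps/<object_type> layout; joining with "/" works
    for both POSIX paths and s3:// URIs.
    """
    base = base.rstrip("/")
    return [f"{base}/{fmt}/stamps/{obj}" for fmt in formats for obj in object_types]
```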
- class swh.export.luigi.ExportGraph(*args, **kwargs)
Bases: Task
Exports the entire graph to the local filesystem.
Example invocation:
luigi --local-scheduler --module swh.export.luigi ExportGraph --config=graph.prod.yml --local-export-path=export/ --formats=edges
which is equivalent to this CLI call:
swh export --config-file graph.prod.yml graph export export/ --formats=edges
- export_id = <luigi.parameter.OptionalParameter object>
- formats = <luigi.parameter.EnumListParameter object>
- processes = <luigi.parameter.IntParameter object>
- object_types = <luigi.parameter.EnumListParameter object>
- class swh.export.luigi.UploadExportToS3(*args, **kwargs)
Bases: Task
Uploads a local dataset export to S3, creating the local export automatically if it does not exist.
Example invocation:
luigi --local-scheduler --module swh.export.luigi UploadExportToS3 --local-export-path=export/ --formats=edges --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08
- formats = <luigi.parameter.EnumListParameter object>
- object_types = <luigi.parameter.EnumListParameter object>
- requires() -> List[Task]
Returns an ExportGraph task that writes local files at the expected location.
- class swh.export.luigi.DownloadExportFromS3(*args, **kwargs)
Bases: Task
Downloads a local dataset export from S3.
This performs the inverse operation of UploadExportToS3.
Example invocation:
luigi --local-scheduler --module swh.export.luigi DownloadExportFromS3 --local-export-path=export/ --formats=edges --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08
- formats = <luigi.parameter.EnumListParameter object>
- object_types = <luigi.parameter.EnumListParameter object>
- parallelism = <luigi.parameter.IntParameter object>
- requires() -> List[Task]
Returns an ExportGraph task that writes local files at the expected location.
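Because downloading is the inverse of uploading, both tasks need a mapping between local files and S3 URIs. The sketch below assumes the simplest such mapping, where each file keeps its path relative to the export root under the S3 prefix. The helpers are hypothetical, perform no network I/O, and say nothing about how the real tasks treat stamp files.

```python
from pathlib import Path, PurePosixPath
from typing import Iterator, Tuple


def upload_plan(local_export_path: Path, s3_export_path: str) -> Iterator[Tuple[Path, str]]:
    """Yield (local file, S3 URI) pairs that mirror the local tree (sketch)."""
    prefix = s3_export_path.rstrip("/")
    for path in sorted(local_export_path.rglob("*")):
        if path.is_file():
            rel = path.relative_to(local_export_path).as_posix()
            yield path, f"{prefix}/{rel}"


def download_target(s3_uri: str, s3_export_path: str, local_export_path: Path) -> Path:
    """Inverse mapping: where a downloaded object would land locally (sketch)."""
    prefix = s3_export_path.rstrip("/") + "/"
    if not s3_uri.startswith(prefix):
        raise ValueError(f"{s3_uri} is not under {prefix}")
    return local_export_path / PurePosixPath(s3_uri[len(prefix):])
```

Keeping the two mappings exact inverses is what lets a dataset uploaded from one machine be downloaded into the same layout on another.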
- class swh.export.luigi.LocalExport(*args, **kwargs)
Bases: Task
Task that depends on a local dataset being present: either directly from ExportGraph or via DownloadExportFromS3.
- formats = <luigi.parameter.EnumListParameter object>
- object_types = <luigi.parameter.EnumListParameter object>
- export_task_type = <luigi.parameter.TaskParameter object>
- requires() -> List[Task]
Returns an instance of either ExportGraph or DownloadExportFromS3, depending on the value of export_task_type.
- class swh.export.luigi.AthenaDatabaseTarget(name: str, table_names: Set[str])
Bases: Target
Target for the existence of a database on Athena.
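A luigi Target signals completion through its `exists()` method. Given the constructor's `name` and `table_names`, one plausible completeness rule is "the database exists and contains every expected table". The stand-in below encodes that assumed rule, and takes the table listing as an injected callable instead of querying AWS, so it can be tried offline. It is hypothetical, not the class's implementation.

```python
from typing import Callable, Iterable, Optional, Set


class AthenaDatabaseTargetSketch:
    """Hypothetical stand-in for AthenaDatabaseTarget's existence check."""

    def __init__(self, name: str, table_names: Set[str],
                 list_tables: Callable[[str], Optional[Iterable[str]]]) -> None:
        self.name = name
        self.table_names = set(table_names)
        # Returns the tables of a database, or None if it does not exist.
        self._list_tables = list_tables

    def exists(self) -> bool:
        tables = self._list_tables(self.name)
        if tables is None:
            return False
        # Extra tables are tolerated; any missing one means "not done yet".
        return self.table_names <= set(tables)
```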
- class swh.export.luigi.CreateAthena(*args, **kwargs)
Bases: Task
Creates tables on AWS Athena pointing to a given graph dataset on S3.
Example invocation:
luigi --local-scheduler --module swh.export.luigi CreateAthena --ExportGraph-config=graph.staging.yml --athena-db-name=swh_20221108 --object-types=origin,origin_visit --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 --s3-athena-output-location=s3://softwareheritage/graph/tmp/athena
which is equivalent to this CLI call:
swh export athena create --database-name swh_20221108 --location-prefix s3://softwareheritage/graph/swh_2022-11-08 --output-location s3://softwareheritage/graph/tmp/athena --replace-tables
- object_types = <luigi.parameter.EnumListParameter object>
- s3_export_path = <swh.export.luigi.S3PathParameter object>
- s3_athena_output_location = <swh.export.luigi.S3PathParameter object>
- athena_db_name = <luigi.parameter.Parameter object>
- requires() -> List[Task]
Returns the corresponding UploadExportToS3 instance, with ORC as the only format.
- output() -> List[Target]
Returns an instance of AthenaDatabaseTarget.
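The two example invocations above imply a mapping from the task's parameters to the CLI's flags. The hypothetical helper below rebuilds the CLI call from those parameters. The mapping is read off the examples only, not from the task's source code.

```python
import shlex
from typing import List


def athena_cli_args(athena_db_name: str, s3_export_path: str,
                    s3_athena_output_location: str) -> List[str]:
    """Build the `swh export athena create` call matching CreateAthena's parameters."""
    return [
        "swh", "export", "athena", "create",
        "--database-name", athena_db_name,
        # rstrip mirrors S3PathParameter's trailing-slash normalization
        "--location-prefix", s3_export_path.rstrip("/"),
        "--output-location", s3_athena_output_location.rstrip("/"),
        "--replace-tables",
    ]


if __name__ == "__main__":
    print(shlex.join(athena_cli_args(
        "swh_20221108",
        "s3://softwareheritage/graph/swh_2022-11-08",
        "s3://softwareheritage/graph/tmp/athena",
    )))
```

Running it prints the same `swh export athena create ...` line as the documented CLI call.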
- class swh.export.luigi.RunExportAll(*args, **kwargs)
Bases: WrapperTask
Runs both the S3 and Athena exports.
Example invocation:
luigi --local-scheduler --module swh.export.luigi RunExportAll --ExportGraph-config=graph.staging.yml --ExportGraph-processes=12 --UploadExportToS3-local-export-path=/tmp/export_2022-11-08_staging/ --formats=edges --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 --athena-db-name=swh_20221108 --object-types=origin,origin_visit --s3-athena-output-location=s3://softwareheritage/graph/tmp/athena
- formats = <luigi.parameter.EnumListParameter object>
- object_types = <luigi.parameter.EnumListParameter object>
- s3_export_path = <swh.export.luigi.S3PathParameter object>
- s3_athena_output_location = <swh.export.luigi.S3PathParameter object>
- athena_db_name = <luigi.parameter.Parameter object>
- requires() -> List[Task]
Returns instances of CreateAthena and UploadExportToS3.
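The requires() relations documented on this page form a small dependency graph. The sketch below encodes them with the standard library's topological sorter to show an order in which the work can happen. It is an illustration only: luigi's scheduler decides the actual execution, and collapsing both upload instances into a single node is a simplification.

```python
from graphlib import TopologicalSorter

# Task -> tasks returned by its requires(), per the descriptions on this page.
# Simplification: CreateAthena requires an ORC-only UploadExportToS3, which
# may be a distinct instance from RunExportAll's own upload when formats
# also includes edges; both are collapsed into one node here.
REQUIRES = {
    "RunExportAll": {"CreateAthena", "UploadExportToS3"},
    "CreateAthena": {"UploadExportToS3"},
    "UploadExportToS3": {"ExportGraph"},
    "ExportGraph": set(),
}

# static_order() yields every task after all of its requirements.
run_order = list(TopologicalSorter(REQUIRES).static_order())
```

Here `run_order` is `ExportGraph`, `UploadExportToS3`, `CreateAthena`, then `RunExportAll`. As a WrapperTask, RunExportAll does no work of its own; it counts as complete once all of its requirements are.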