swh.graph.luigi.compressed_graph module
Luigi tasks for compression
This module contains Luigi tasks that serve as an alternative to the CLI and can be composed with other tasks, such as those of swh-export.
It implements the task DAG described in Compression steps.
Unlike the CLI, these tasks require the graph to be named graph.
Filtering
The object_types parameter (--object-types on the CLI) specifies
the set of node types to read from the dataset export, and it defaults to
all types: ori,snp,rel,rev,dir,cnt.
Because the dataset export is keyed by edge sources, some objects whose types
are not in this set will still be present in the input dataset. For example,
when exporting ori,snp,rel,rev, the root Directory of every release and revision
will be present, though without its labels, as will a few Content objects
pointed to by some Releases.
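The effect described above can be sketched as a small set computation. This is an illustration, not code from swh.graph. EDGE_TARGETS is a hypothetical, simplified table of the node types that each type's outgoing edges may point to, and extra_node_types is a hypothetical helper.

```python
from __future__ import annotations

# Hypothetical, simplified table (not taken from swh.graph): for each node
# type, the node types its outgoing edges may point to.
EDGE_TARGETS: dict[str, set[str]] = {
    "ori": {"snp"},
    "snp": {"rel", "rev", "dir", "cnt"},
    "rel": {"rel", "rev", "dir", "cnt"},
    "rev": {"rev", "dir"},
    "dir": {"dir", "cnt", "rev"},
    "cnt": set(),
}


def extra_node_types(object_types: set[str]) -> set[str]:
    """Node types that appear in the input only as targets of selected edges.

    The export is keyed by edge sources, so every edge whose source type is
    selected is kept. Its target therefore shows up even when the target's
    own type was not selected.
    """
    targets: set[str] = set()
    for source_type in object_types:
        targets |= EDGE_TARGETS[source_type]
    return targets - object_types


print(sorted(extra_node_types({"ori", "snp", "rel", "rev"})))  # ['cnt', 'dir']
```

With ori,snp,rel,rev selected, the sketch reports dir and cnt as extra types. This matches the Directory and Content objects mentioned above.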
File layout
In addition to the files documented in Graph compression (e.g. graph.graph,
graph.mph, …), the tasks in this module produce the following directory structure:
base_dir/
    <date>[_<flavor>]/
        compressed/
            graph.graph
            graph.mph
            ...
            meta/
                export.json
                compression.json
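The directory structure above can be expressed as a pair of path helpers. This is a minimal sketch: compressed_dir and graph_file are hypothetical names, and "/srv/swh-graph" and "myflavor" are placeholder values, not paths or flavors used by swh.graph.

```python
from __future__ import annotations

from pathlib import PurePosixPath
from typing import Optional


def compressed_dir(base_dir: str, date: str, flavor: Optional[str] = None) -> PurePosixPath:
    """Return base_dir/<date>[_<flavor>]/compressed/ (hypothetical helper)."""
    dirname = date if flavor is None else f"{date}_{flavor}"
    return PurePosixPath(base_dir) / dirname / "compressed"


def graph_file(compressed: PurePosixPath, suffix: str, graph_name: str = "graph") -> PurePosixPath:
    """Graph files are named after the graph, followed by a suffix such as '.mph'."""
    return compressed / f"{graph_name}{suffix}"


compressed = compressed_dir("/srv/swh-graph", "2022-11-08", flavor="myflavor")
print(graph_file(compressed, ".mph"))  # /srv/swh-graph/2022-11-08_myflavor/compressed/graph.mph
print(compressed / "meta" / "compression.json")  # /srv/swh-graph/2022-11-08_myflavor/compressed/meta/compression.json
```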
meta/export.json is copied from the ORC dataset exported by
swh.export.luigi.
meta/compression.json contains information about the compression itself,
for provenance tracking.
For example:
[
    {
        "steps": null,
        "export_start": "2022-11-08T11:00:54.998799+00:00",
        "export_end": "2022-11-08T11:05:53.105519+00:00",
        "object_types": [
            "origin",
            "origin_visit"
        ],
        "hostname": "desktop5",
        "conf": {},
        "tool": {
            "name": "swh.graph",
            "version": "2.2.0"
        },
        "commands": [
            {
                "command": [
                    "bash",
                    "-c",
                    "java it.unimi.dsi.big.webgraph.labelling.BitStreamArcLabelledImmutableGraph --list ..."
                ],
                "cgroup": "/sys/fs/cgroup/user.slice/user-1002.slice/user@1002.service/app.slice/swh.graph@103038/bash@0",
                "cgroup_stats": {
                    "memory.events": "low 0\nhigh 0\nmax 0\noom 0\noom_kill 0\noom_group_kill 0",
                    "memory.events.local": "low 0\nhigh 0\nmax 0\noom 0\noom_kill 0\noom_group_kill 0",
                    "memory.swap.current": "0",
                    "memory.zswap.current": "0",
                    "memory.swap.events": "high 0\nmax 0\nfail 0",
                    "cpu.stat": "usage_usec 531350\nuser_usec 424286\nsystem_usec 107063\n...",
                    "memory.current": "614400",
                    "memory.stat": "anon 0\nfile 110592\nkernel 176128\nkernel_stack 0\n...",
                    "memory.numa_stat": "anon N0=0\nfile N0=110592\nkernel_stack N0=0\n...",
                    "memory.peak": "49258496"
                }
            }
        ]
    }
]
When the compression pipeline is run in separate steps, each of the steps is recorded as an object in the root list.
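A provenance file like the one above can be summarized with the standard library alone. The sketch below is a hypothetical reader, not part of swh.graph. It works on a trimmed copy of the example and assumes every record has export_start, export_end and commands. It also assumes memory.events uses cgroup's one "key value" pair per line. It reports the interval between the two timestamps, the largest memory.peak value and the total oom_kill count.

```python
from __future__ import annotations

import json
from datetime import datetime

# Trimmed copy of the example compression.json in this section (most fields omitted).
SAMPLE = r"""
[
  {
    "steps": null,
    "export_start": "2022-11-08T11:00:54.998799+00:00",
    "export_end": "2022-11-08T11:05:53.105519+00:00",
    "commands": [
      {
        "command": ["bash", "-c", "java ..."],
        "cgroup_stats": {
          "memory.events": "low 0\nhigh 0\nmax 0\noom 0\noom_kill 0\noom_group_kill 0",
          "memory.peak": "49258496"
        }
      }
    ]
  }
]
"""


def parse_flat_keyed(text: str) -> dict[str, int]:
    """Parse cgroup 'key value' lines (such as memory.events) into a dict."""
    return {key: int(value) for key, value in (line.split() for line in text.splitlines())}


def summarize(records: list[dict]) -> list[dict]:
    """One summary per root-list entry (one per step when steps are run separately)."""
    summaries = []
    for record in records:
        start = datetime.fromisoformat(record["export_start"])
        end = datetime.fromisoformat(record["export_end"])
        stats = [c["cgroup_stats"] for c in record["commands"] if "cgroup_stats" in c]
        summaries.append({
            "steps": record["steps"],
            "export_seconds": (end - start).total_seconds(),
            "max_memory_peak": max((int(s["memory.peak"]) for s in stats), default=0),
            "oom_kills": sum(parse_flat_keyed(s["memory.events"])["oom_kill"] for s in stats),
        })
    return summaries


for summary in summarize(json.loads(SAMPLE)):
    print(summary)
```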
S3 layout
As .bin files are meant to be accessed randomly, they are stored uncompressed on disk.
However, this is undesirable for at-rest, long-term storage such as S3, because
some of them are very sparse (e.g. graph.property.committer_timestamp.bin can be
quickly compressed from 300 GB to 1 GB).
Therefore, these files are stored on S3 compressed as .bin.zst, and need to be
decompressed when downloading.
The layout is otherwise the same as the file layout.
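The naming rule above can be sketched as a pair of inverse functions. This is an assumption-based illustration: s3_name and local_name are hypothetical helpers. They only rename files, and they touch nothing but names ending in .bin, as described above. The actual compression and decompression, for example with the zstd command-line tool, is not shown.

```python
BIN_SUFFIX = ".bin"
ZST_SUFFIX = ".zst"


def s3_name(local_file_name: str) -> str:
    """Name of a file on S3: .bin files are stored zstd-compressed as .bin.zst."""
    if local_file_name.endswith(BIN_SUFFIX):
        return local_file_name + ZST_SUFFIX
    return local_file_name


def local_name(s3_file_name: str) -> str:
    """Inverse of s3_name: .bin.zst files become .bin again once decompressed."""
    if s3_file_name.endswith(BIN_SUFFIX + ZST_SUFFIX):
        return s3_file_name[: -len(ZST_SUFFIX)]
    return s3_file_name


for name in ["graph.graph", "graph.property.committer_timestamp.bin", "meta/export.json"]:
    print(name, "->", s3_name(name))
```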
- class swh.graph.luigi.compressed_graph.ObjectTypesParameter(*args, **kwargs)
  - Bases: Parameter
  - A parameter type whose value is either * or a set of comma-separated object types (e.g. ori,snp,rel,rev,dir,cnt).
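The accepted values can be illustrated with a standalone parser. This is a hypothetical sketch, not the implementation of ObjectTypesParameter. In particular, returning a frozenset and rejecting unknown abbreviations with ValueError are assumptions.

```python
from __future__ import annotations

# Object type abbreviations listed on this page.
ALL_OBJECT_TYPES = ("ori", "snp", "rel", "rev", "dir", "cnt")


def parse_object_types(value: str) -> frozenset[str]:
    """Parse '*' (all types) or a comma-separated list such as 'ori,snp,rel,rev'."""
    if value.strip() == "*":
        return frozenset(ALL_OBJECT_TYPES)
    types = frozenset(part.strip() for part in value.split(",") if part.strip())
    unknown = types - frozenset(ALL_OBJECT_TYPES)
    if unknown:
        raise ValueError(f"unknown object types: {', '.join(sorted(unknown))}")
    return types


print(sorted(parse_object_types("ori,snp,rel,rev")))
```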
- class swh.graph.luigi.compressed_graph.ExtractNodes(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = -20
  - EXPORT_AS_INPUT: bool = True
    Whether this task should depend directly on LocalExport. If not, it is assumed to depend on it transitively via one of the tasks returned by requires().
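The EXPORT_AS_INPUT convention can be illustrated with plain classes. This is a hypothetical sketch: LocalExport here is an empty stand-in, and StepSketch, ExtractNodesSketch and MphSketch are invented names. The dependency of MphSketch on ExtractNodesSketch is an assumption made only for the example. A step with the flag lists the export next to its other requirements. A step without it reaches the export only through another step.

```python
class LocalExport:
    """Empty stand-in for the LocalExport task."""


class StepSketch:
    """Hypothetical base class mimicking the EXPORT_AS_INPUT convention."""

    EXPORT_AS_INPUT = False

    def requires(self) -> list:
        return []

    def dependencies(self) -> list:
        deps = list(self.requires())
        if self.EXPORT_AS_INPUT:
            # Depend directly on the export, in addition to any other step.
            deps.append(LocalExport())
        return deps


class ExtractNodesSketch(StepSketch):
    EXPORT_AS_INPUT = True  # reads the export directly


class MphSketch(StepSketch):
    # Reaches the export only transitively, through ExtractNodesSketch.
    def requires(self) -> list:
        return [ExtractNodesSketch()]


print([type(d).__name__ for d in MphSketch().dependencies()])
```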
 
- class swh.graph.luigi.compressed_graph.ExtractLabels(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = -10
  - EXPORT_AS_INPUT: bool = True
    Whether this task should depend directly on LocalExport. If not, it is assumed to depend on it transitively via one of the tasks returned by requires().
  - OUTPUT_FILES: Set[str] = {'.labels.csv.zst'}
    List of files which this task produces, without the graph name as prefix.
  - USES_ALL_CPU_THREADS: bool = True
    True on tasks that use all available CPU threads for their entire runtime. These tasks should be scheduled so that they do not run at the same time, because running them concurrently does not improve run time.
  - priority = -100
    Low priority, because it is not on the critical path.
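The scheduling constraint on USES_ALL_CPU_THREADS can be sketched with a single lock shared by all flagged tasks. This is a stdlib simulation under that assumption, not how swh.graph enforces it. In a Luigi deployment a similar effect is commonly obtained with task resources, and this page does not say which mechanism is used. The sketch records how many flagged tasks ever ran at once.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical: a single slot shared by every task flagged USES_ALL_CPU_THREADS.
all_cpu_slot = threading.Lock()

_counter_lock = threading.Lock()
running_heavy = 0
max_running_heavy = 0


def run_task(uses_all_cpu_threads: bool) -> None:
    global running_heavy, max_running_heavy
    if not uses_all_cpu_threads:
        time.sleep(0.01)  # ordinary tasks run freely
        return
    with all_cpu_slot:  # flagged tasks wait for each other
        with _counter_lock:
            running_heavy += 1
            max_running_heavy = max(max_running_heavy, running_heavy)
        time.sleep(0.01)  # stand-in for work that saturates every core
        with _counter_lock:
            running_heavy -= 1


flags = [True, True, False, True, False]
with ThreadPoolExecutor(max_workers=len(flags)) as pool:
    list(pool.map(run_task, flags))

print(max_running_heavy)  # 1
```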
 
- class swh.graph.luigi.compressed_graph.NodeStats(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 0
  - OUTPUT_FILES: Set[str] = {'.nodes.count.txt', '.nodes.stats.txt'}
    List of files which this task produces, without the graph name as prefix.
  - priority = 100
    High priority, to help the scheduler allocate resources.
 
- class swh.graph.luigi.compressed_graph.EdgeStats(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 3
  - EXPORT_AS_INPUT: bool = True
    Whether this task should depend directly on LocalExport. If not, it is assumed to depend on it transitively via one of the tasks returned by requires().
  - OUTPUT_FILES: Set[str] = {'.edges.count.txt', '.edges.stats.txt'}
    List of files which this task produces, without the graph name as prefix.
  - priority = 100
    High priority, to help the scheduler allocate resources.
 
- class swh.graph.luigi.compressed_graph.LabelStats(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 6
  - OUTPUT_FILES: Set[str] = {'.labels.count.txt'}
    List of files which this task produces, without the graph name as prefix.
  - USES_ALL_CPU_THREADS: bool = True
    True on tasks that use all available CPU threads for their entire runtime. These tasks should be scheduled so that they do not run at the same time, because running them concurrently does not improve run time.
  - priority = 100
    High priority, to help the scheduler allocate resources.
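The priority values listed on this page can be compared with a simple picker that orders runnable tasks by descending priority. This is a simplified model, not Luigi's scheduler. It assumes that tasks without a listed priority keep Luigi's default of 0. It also breaks ties by name, which Luigi itself does not do.

```python
import heapq

# priority values from this page; tasks without one keep Luigi's default of 0
PRIORITIES = {
    "NodeStats": 100, "EdgeStats": 100, "LabelStats": 100,
    "NodeProperties": 10, "EdgeLabels": 10, "EdgeLabelsTranspose": 10,
    "ExtractLabels": -100,
}


def pick_order(runnable: list) -> list:
    """Order runnable tasks by descending priority (ties broken by name in this sketch)."""
    heap = [(-PRIORITIES.get(name, 0), name) for name in runnable]
    heapq.heapify(heap)
    return [heapq.heappop(heap)[1] for _ in range(len(heap))]


print(pick_order(["ExtractLabels", "Mph", "NodeProperties", "EdgeStats", "NodeStats"]))
```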
 
- class swh.graph.luigi.compressed_graph.Mph(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 10

- class swh.graph.luigi.compressed_graph.Bv(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 30

- class swh.graph.luigi.compressed_graph.BvEf(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 40

- class swh.graph.luigi.compressed_graph.BfsRoots(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 50

- class swh.graph.luigi.compressed_graph.Bfs(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 60

- class swh.graph.luigi.compressed_graph.PermuteAndSimplifyBfs(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 70

- class swh.graph.luigi.compressed_graph.BfsEf(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 80

- class swh.graph.luigi.compressed_graph.BfsDcf(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 90
 
- class swh.graph.luigi.compressed_graph.Llp(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 100
  - INPUT_FILES: Set[str] = {'-bfs-simplified.dcf', '-bfs-simplified.ef', '-bfs-simplified.graph'}
    Dependencies of this step.
  - OUTPUT_FILES: Set[str] = {'-llp.order'}
    List of files which this task produces, without the graph name as prefix.
  - gammas = <luigi.parameter.Parameter object>
 
- class swh.graph.luigi.compressed_graph.PermuteLlp(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 120

- class swh.graph.luigi.compressed_graph.Offsets(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 130

- class swh.graph.luigi.compressed_graph.Ef(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 140

- class swh.graph.luigi.compressed_graph.ComposeOrders(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 110

- class swh.graph.luigi.compressed_graph.Transpose(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 160

- class swh.graph.luigi.compressed_graph.TransposeOffsets(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 170

- class swh.graph.luigi.compressed_graph.TransposeEf(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 175

- class swh.graph.luigi.compressed_graph.Maps(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 180
 
- class swh.graph.luigi.compressed_graph.ExtractPersons(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 190
  - EXPORT_AS_INPUT: bool = True
    Whether this task should depend directly on LocalExport. If not, it is assumed to depend on it transitively via one of the tasks returned by requires().
 
- class swh.graph.luigi.compressed_graph.PersonsStats(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 195

- class swh.graph.luigi.compressed_graph.MphPersons(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 200
 
- class swh.graph.luigi.compressed_graph.NodeProperties(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 210
  - INPUT_FILES: Set[str] = {'.node2swhid.bin', '.persons.pthash', '.pthash', '.pthash.order'}
    Dependencies of this step.
  - EXPORT_AS_INPUT: bool = True
    Whether this task should depend directly on LocalExport. If not, it is assumed to depend on it transitively via one of the tasks returned by requires().
  - OUTPUT_FILES: Set[str] = {'.property.author_id.bin', '.property.author_timestamp.bin', '.property.author_timestamp_offset.bin', '.property.committer_id.bin', '.property.committer_timestamp.bin', '.property.committer_timestamp_offset.bin', '.property.content.is_skipped.bits', '.property.content.length.bin', '.property.message.bin', '.property.message.offset.bin', '.property.tag_name.bin', '.property.tag_name.offset.bin'}
    List of files which this task produces, without the graph name as prefix.
  - priority = 10
    Semi-high priority, because it takes a very long time to run.
  - output() -> List[LocalTarget]
    Returns a list of Luigi targets matching OUTPUT_FILES.
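The relationship between OUTPUT_FILES and output() can be sketched with pathlib. This is a hypothetical model: it returns plain Path objects instead of Luigi LocalTarget instances and uses only three of the suffixes listed above. is_complete mirrors Luigi's default completeness rule, under which a task is complete when all of its outputs exist.

```python
from __future__ import annotations

import tempfile
from pathlib import Path

# Subset of NodeProperties.OUTPUT_FILES, copied from this page.
OUTPUT_FILES = {".property.author_id.bin", ".property.content.length.bin", ".property.message.bin"}


def output_paths(graph_dir: Path, graph_name: str, output_files: set[str]) -> list[Path]:
    """Prefix each suffix with the graph name to get the actual file paths."""
    return [graph_dir / f"{graph_name}{suffix}" for suffix in sorted(output_files)]


def is_complete(paths: list[Path]) -> bool:
    """Complete when every expected output file exists."""
    return all(p.exists() for p in paths)


with tempfile.TemporaryDirectory() as tmp:
    paths = output_paths(Path(tmp), "graph", OUTPUT_FILES)
    names = [p.name for p in paths]
    before = is_complete(paths)
    for p in paths:
        p.touch()  # pretend the task wrote its outputs
    after = is_complete(paths)

print(names, before, after)
```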
 
- class swh.graph.luigi.compressed_graph.PthashLabels(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 220

- class swh.graph.luigi.compressed_graph.LabelsOrder(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 225

- class swh.graph.luigi.compressed_graph.FclLabels(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 230
 
- class swh.graph.luigi.compressed_graph.EdgeLabels(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 240
  - INPUT_FILES: Set[str] = {'.labels.pthash', '.labels.pthash.order', '.pthash', '.pthash.order'}
    Dependencies of this step.
  - EXPORT_AS_INPUT: bool = True
    Whether this task should depend directly on LocalExport. If not, it is assumed to depend on it transitively via one of the tasks returned by requires().
  - OUTPUT_FILES: Set[str] = {'-labelled.labeloffsets', '-labelled.labels', '-labelled.properties'}
    List of files which this task produces, without the graph name as prefix.
  - priority = 10
    Semi-high priority, because it takes a long time to run.
 
- class swh.graph.luigi.compressed_graph.EdgeLabelsTranspose(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 250
  - INPUT_FILES: Set[str] = {'.labels.pthash', '.labels.pthash.order', '.pthash', '.pthash.order'}
    Dependencies of this step.
  - EXPORT_AS_INPUT: bool = True
    Whether this task should depend directly on LocalExport. If not, it is assumed to depend on it transitively via one of the tasks returned by requires().
  - OUTPUT_FILES: Set[str] = {'-transposed-labelled.labeloffsets', '-transposed-labelled.labels', '-transposed-labelled.properties'}
    List of files which this task produces, without the graph name as prefix.
  - priority = 10
    Semi-high priority, because it takes a long time to run.
 
- class swh.graph.luigi.compressed_graph.EdgeLabelsEf(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 270

- class swh.graph.luigi.compressed_graph.EdgeLabelsTransposeEf(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 280
 
- class swh.graph.luigi.compressed_graph.Stats(*args, **kwargs)
  - Bases: _CompressionStepTask
  - STEP: CompressionStep = 290
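The classes above do not appear in STEP order; for example, ComposeOrders (110) is listed after Ef (140). Sorting the STEP values copied from this page recovers the nominal step order. Treating that order as the actual execution order is an assumption, because Luigi schedules tasks from their requires() dependencies, not from STEP.

```python
# STEP values copied from this page (CompressionStep values shown as integers).
STEPS = {
    "ExtractNodes": -20, "ExtractLabels": -10, "NodeStats": 0, "EdgeStats": 3,
    "LabelStats": 6, "Mph": 10, "Bv": 30, "BvEf": 40, "BfsRoots": 50, "Bfs": 60,
    "PermuteAndSimplifyBfs": 70, "BfsEf": 80, "BfsDcf": 90, "Llp": 100,
    "ComposeOrders": 110, "PermuteLlp": 120, "Offsets": 130, "Ef": 140,
    "Transpose": 160, "TransposeOffsets": 170, "TransposeEf": 175, "Maps": 180,
    "ExtractPersons": 190, "PersonsStats": 195, "MphPersons": 200,
    "NodeProperties": 210, "PthashLabels": 220, "LabelsOrder": 225,
    "FclLabels": 230, "EdgeLabels": 240, "EdgeLabelsTranspose": 250,
    "EdgeLabelsEf": 270, "EdgeLabelsTransposeEf": 280, "Stats": 290,
}

# Nominal pipeline order: ascending STEP value.
ordered_steps = sorted(STEPS, key=STEPS.__getitem__)
print(" -> ".join(ordered_steps))
```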
 
- class swh.graph.luigi.compressed_graph.CompressGraph(*args, **kwargs)
  - Bases: Task
  - local_export_path = <luigi.parameter.PathParameter object>
  - graph_name = <luigi.parameter.Parameter object>
  - batch_size = <luigi.parameter.IntParameter object>
  - rust_executable_dir = <luigi.parameter.Parameter object>
  - test_flavor = <luigi.parameter.Parameter object>
 
- class swh.graph.luigi.compressed_graph.UploadGraphToS3(*args, **kwargs)
  - Bases: Task
  - Uploads a local compressed graph to S3, creating it automatically if it does not exist.
    Example invocation:
        luigi --local-scheduler --module swh.graph.luigi UploadGraphToS3 --local-graph-path=graph/ --s3-graph-path=s3://softwareheritage/graph/swh_2022-11-08/compressed/
  - local_graph_path = <luigi.parameter.PathParameter object>
  - s3_graph_path = <swh.export.luigi.S3PathParameter object>
  - parallelism = <luigi.parameter.IntParameter object>
  - requires() -> List[Task]
    Returns a CompressGraph task that writes local files at the expected location.
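The role of the parallelism parameter can be illustrated with a thread pool that caps concurrent transfers. This is a hypothetical sketch, not the task's implementation. upload_graph and fake_upload are invented names, and the uploader is a stub that only records destinations instead of calling S3. The sketch applies the .bin to .bin.zst renaming from the S3 layout section but omits the zstd compression itself.

```python
from __future__ import annotations

import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def upload_graph(local_names: list[str], s3_graph_path: str,
                 upload: Callable[[str, str], None], parallelism: int) -> list[str]:
    """Upload files with at most `parallelism` concurrent transfers."""

    def upload_one(name: str) -> str:
        # .bin files go to S3 as .bin.zst; the zstd compression itself is omitted here.
        key_name = name + ".zst" if name.endswith(".bin") else name
        destination = s3_graph_path.rstrip("/") + "/" + key_name
        upload(name, destination)
        return destination

    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        return list(pool.map(upload_one, local_names))


# Usage with a stub uploader that records what would be sent.
uploaded: dict[str, str] = {}
_lock = threading.Lock()


def fake_upload(source: str, destination: str) -> None:
    with _lock:
        uploaded[destination] = source


destinations = upload_graph(
    ["graph.graph", "graph.property.message.bin"],
    "s3://softwareheritage/graph/swh_2022-11-08/compressed/",
    fake_upload,
    parallelism=4,
)
print(destinations)
```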
 
- class swh.graph.luigi.compressed_graph.DownloadGraphFromS3(*args, **kwargs)
  - Bases: Task
  - Downloads a compressed graph from S3 into a local directory. This performs the inverse operation of UploadGraphToS3.
    Example invocation:
        luigi --local-scheduler --module swh.graph.luigi DownloadGraphFromS3 --local-graph-path=graph/ --s3-graph-path=s3://softwareheritage/graph/swh_2022-11-08/compressed/
  - requires() -> List[Task]
    Returns an UploadGraphToS3 task that writes local files to S3.
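Going in the other direction means splitting the s3:// URL into a bucket and a key prefix, then mapping each object key back to a local path. This is a minimal stdlib sketch with hypothetical helper names (split_s3_url, local_path_for_key). Like the S3 layout section, it assumes that only .bin.zst objects need their .zst suffix dropped once decompressed.

```python
from __future__ import annotations

from pathlib import PurePosixPath
from urllib.parse import urlparse


def split_s3_url(url: str) -> tuple[str, str]:
    """Split s3://bucket/prefix/ into ('bucket', 'prefix/')."""
    parsed = urlparse(url)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an s3:// URL: {url}")
    return parsed.netloc, parsed.path.lstrip("/")


def local_path_for_key(key: str, prefix: str, local_graph_path: str) -> PurePosixPath:
    """Map an object key under `prefix` to its local path, dropping .zst from .bin.zst."""
    if not key.startswith(prefix):
        raise ValueError(f"{key!r} is not under {prefix!r}")
    relative = key[len(prefix):]
    if relative.endswith(".bin.zst"):
        relative = relative[: -len(".zst")]  # the file is decompressed after download
    return PurePosixPath(local_graph_path) / relative


bucket, prefix = split_s3_url("s3://softwareheritage/graph/swh_2022-11-08/compressed/")
print(bucket, prefix)
print(local_path_for_key(prefix + "graph.property.message.bin.zst", prefix, "graph/"))
```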
 
- class swh.graph.luigi.compressed_graph.LocalGraph(*args, **kwargs)
  - Bases: Task
  - Task that depends on a local dataset being present, either directly from CompressGraph or via DownloadGraphFromS3.
  - compression_task_type = <luigi.parameter.TaskParameter object>
  - requires() -> List[Task]
    Returns an instance of either CompressGraph or DownloadGraphFromS3, depending on the value of compression_task_type.
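The way compression_task_type selects a dependency can be mimicked with plain classes. This is a hypothetical sketch: CompressGraphStub, DownloadGraphFromS3Stub and LocalGraphSketch are empty stand-ins, not the real tasks. As with a Luigi TaskParameter, the value is a task class rather than an instance. On the command line, Luigi's TaskParameter takes a task family name instead.

```python
class CompressGraphStub:
    """Stand-in for CompressGraph."""


class DownloadGraphFromS3Stub:
    """Stand-in for DownloadGraphFromS3."""


class LocalGraphSketch:
    """Hypothetical mimic of LocalGraph: the parameter is a task class, not an instance."""

    ALLOWED = (CompressGraphStub, DownloadGraphFromS3Stub)

    def __init__(self, compression_task_type: type) -> None:
        if compression_task_type not in self.ALLOWED:
            raise ValueError(f"unsupported task type: {compression_task_type.__name__}")
        self.compression_task_type = compression_task_type

    def requires(self) -> list:
        # Instantiate whichever task was chosen to provide the local graph.
        return [self.compression_task_type()]


print(type(LocalGraphSketch(DownloadGraphFromS3Stub).requires()[0]).__name__)
```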