Custom Static DAG

You can create workflows using Python scripts as an alternative to YAML-based workflows. Write your workflow as a Python file, define the parameters, and use the build_hpc_job_step wrapper function to handle the orchestration. While YAML-based workflows (using the LWD specification) offer a user-friendly approach for common use cases, Custom Static DAGs provide more flexibility and control for complex workflows.

Basic Structure

A custom static DAG has this basic structure:

with DAG(
    dag_id="my_custom_dag",
    description="My Custom Workflow",
    catchup=False,
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    schedule_interval=None,
    tags=["myproject"],
    access_control={'myproject': {'can_read', 'can_edit'}},
    render_template_as_native_obj=True,
    params={
        "access_token": Param("", type="string"),
        "job1": Param({
            "requirements": {...},      # Job configuration
            "data_inputs": [...],       # Input datasets
            "data_outputs": [...]       # Output storage
        }, schema=SingleHPCJob)
    }
) as dag:

    # TASK: Authenticate and get initial access token
    ref_token = LexisAAIOperator(
        task_id="refresh_token_task",
        access_token="{{ params.access_token }}",
        offline_access=True
    )

    # TASK: Keep token alive during workflow execution
    ref_token_keeper = LexisAAITokenKeeperSensor(
        task_id="ref_token_keeper",
        offline_access=True
    )

    # TASK: Stop token keeper when workflow completes
    ref_token_stop = LexisAAIStopKeeperOperator(
        trigger_rule=TriggerRule.ALL_DONE
    )

    # TASK: Your HPC job
    job = build_hpc_job_step(
        dag=dag,
        group_id="job",
        job_requirements="{{ params.job1['requirements'] }}",
        data_inputs="{{ params.job1['data_inputs'] }}",
        data_outputs="{{ params.job1['data_outputs'] }}"
    )

    # TASK: Mark workflow as complete
    finish = FinishOperator(
        task_id="finish"
    )

    # Define task dependencies
    ref_token >> ref_token_keeper
    ref_token >> job >> ref_token_stop >> finish

Authentication Management

HPC jobs can run for hours or days, but access tokens expire quickly. To handle this, every workflow includes three authentication operators (LexisAAIOperator, LexisAAITokenKeeperSensor, LexisAAIStopKeeperOperator) that work together to keep your session alive.

You’ll see these in every example; they are standard components that handle token management automatically. See AAI Operators for technical details.
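To get a feel for why a keeper task is needed, the arithmetic can be sketched as follows. This is only a back-of-the-envelope illustration: the function name, the token lifetime, and the refresh margin are assumptions chosen for the example, not LEXIS values, and the sketch does not describe how LexisAAITokenKeeperSensor is implemented.

```python
import math


def refreshes_needed(job_seconds: int, token_lifetime_seconds: int,
                     margin_seconds: int = 0) -> int:
    """Estimate how many token refreshes a job of the given length requires.

    Hypothetical helper: assumes the first token is valid when the job starts
    and that a refresh happens `margin_seconds` before each expiry.
    """
    interval = token_lifetime_seconds - margin_seconds
    if interval <= 0:
        raise ValueError("margin must be smaller than the token lifetime")
    # The initial token covers the first interval; every further interval
    # needs one refresh.
    return max(0, math.ceil(job_seconds / interval) - 1)


# Assumed numbers: a 24-hour job, a 5-minute token, refreshed 1 minute early.
print(refreshes_needed(24 * 3600, 300, margin_seconds=60))
```

With these assumed numbers the job outlives its first token by hundreds of refresh intervals, which is why the refresh is delegated to a task that runs alongside the job rather than done once at the start.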

Defining Job Parameters

The SingleHPCJob schema defines three main components:

Job Requirements

Specify HPC resources and job configuration:

"requirements": {
    "job_name": "My Computation",
    "project_shortname": "myproject",
    "policy": "preferred",                        # How to select resources
    "locations": [{
        "location_name": "Karolina",              # HPC cluster name
        "location_resource": "your-resource-name"
    }],
    "tasks": [{
        "command_template_name": "TestTemplate",  # Your job script
        "node_type_name": "qcpu_exp",
        "walltime_limit": 3600,
        "max_cores": 128,
        "template_parameter_values": {
            # Default input parameters for your job script
            # These are passed to your command template when it runs
            "input_param_name": "param_default_value",
            "input_param2": 42
        }
    }]
}

Scheduler policies:

  • preferred - Use the specified cluster, fail if it’s unavailable

  • failover - Try the preferred cluster first, use alternatives if unavailable
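If you build the requirements block programmatically, a small helper can catch a mistyped policy before the DAG is submitted. The sketch below is a hypothetical convenience function, not part of the LEXIS library; it only knows the two policy names listed above, and the real SingleHPCJob schema may accept other values or fields.

```python
# Policy names taken from the list above; the real schema may allow more.
KNOWN_POLICIES = {"preferred", "failover"}


def make_requirements(job_name, project_shortname, location_name,
                      location_resource, task, policy="preferred"):
    """Build a requirements dict in the shape shown above (hypothetical helper)."""
    if policy not in KNOWN_POLICIES:
        raise ValueError(
            f"unknown policy {policy!r}; expected one of {sorted(KNOWN_POLICIES)}"
        )
    return {
        "job_name": job_name,
        "project_shortname": project_shortname,
        "policy": policy,
        "locations": [{
            "location_name": location_name,
            "location_resource": location_resource,
        }],
        "tasks": [task],
    }


requirements = make_requirements(
    job_name="My Computation",
    project_shortname="myproject",
    location_name="Karolina",
    location_resource="your-resource-name",
    task={
        "command_template_name": "TestTemplate",
        "node_type_name": "qcpu_exp",
        "walltime_limit": 3600,
        "max_cores": 128,
        "template_parameter_values": {"input_param_name": "param_default_value"},
    },
    policy="failover",
)
```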

Data Inputs

Specify which datasets to stage to the HPC cluster:

"data_inputs": [{
    "source": {
        "location": "your-location",          # e.g., "iRODS IT4I"
        "resource": "your-resource",          # e.g., "iRODS LEXIS V2"
        "info": {
            "dataset_id": "your-dataset-uuid",
            "path": "/"
        }
    },
    "target_path": "/"                       # Where to place data on HPC
}]
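Because data_inputs is a list, several datasets can be staged in one job. The following sketch uses a hypothetical helper, make_data_input, to build entries in the shape shown above; the second dataset ID and the target paths are made-up placeholders, and whether a given target path is accepted depends on your project setup.

```python
def make_data_input(location, resource, dataset_id, path="/", target_path="/"):
    """Build one data_inputs entry in the shape shown above (hypothetical helper)."""
    return {
        "source": {
            "location": location,
            "resource": resource,
            "info": {"dataset_id": dataset_id, "path": path},
        },
        "target_path": target_path,
    }


# Two datasets staged to different directories (placeholder values throughout).
data_inputs = [
    make_data_input("your-location", "your-resource", "your-dataset-uuid",
                    target_path="/mesh"),
    make_data_input("your-location", "your-resource", "your-second-dataset-uuid",
                    target_path="/config"),
]
```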

Data Outputs

Specify where to find results and where to store them:

"data_outputs": [{
    "source_path": "/",                      # Where results are on HPC
    "target": {
        "location": "your-location",          # e.g., "iRODS IT4I"
        "resource": "your-resource",          # Storage resource name
        "info": {
            "access": "project",              # Who can access results
            "path": "/",
            "title": "My Results",
            "dataset_type": "dataset"
        }
    }
}]
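Once the three components exist, they are combined into a single job value. The sketch below assembles one and checks that the three top-level keys are present and that the result is JSON-serializable. Both helpers are hypothetical, the requirements block is abbreviated, and the check is far weaker than validation against the actual SingleHPCJob schema.

```python
import json

# The three components named in this section.
EXPECTED_KEYS = ("requirements", "data_inputs", "data_outputs")


def make_data_output(location, resource, title, source_path="/",
                     access="project", path="/", dataset_type="dataset"):
    """Build one data_outputs entry in the shape shown above (hypothetical helper)."""
    return {
        "source_path": source_path,
        "target": {
            "location": location,
            "resource": resource,
            "info": {
                "access": access,
                "path": path,
                "title": title,
                "dataset_type": dataset_type,
            },
        },
    }


def missing_components(job):
    """Return the expected top-level keys that are absent from a job dict."""
    return [key for key in EXPECTED_KEYS if key not in job]


job1 = {
    "requirements": {"job_name": "My Computation"},  # abbreviated for the sketch
    "data_inputs": [],
    "data_outputs": [make_data_output("your-location", "your-resource", "My Results")],
}

print(missing_components(job1))    # keys that still need to be filled in
print(json.dumps(job1, indent=2))  # the value is plain JSON-compatible data
```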

Complete Example

Here’s a complete example of a custom static DAG:

import pendulum
from airflow import DAG
from airflow.models.param import Param
from airflow.utils.trigger_rule import TriggerRule
from lexis.operators.aai import (
    LexisAAIOperator,
    LexisAAITokenKeeperSensor,
    LexisAAIStopKeeperOperator
)
from lexis.operators.hpc import build_hpc_job_step
from lexis.operators.scheduler.hpc_job import FinishOperator
from lexis.datatypes.hpc import SingleHPCJob

with DAG(
    dag_id="lexis_hpc_simple",
    description="Simple HPC Workflow Example",
    catchup=False,
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    schedule_interval=None,
    tags=["myproject"],
    access_control={'myproject': {'can_read', 'can_edit'}},
    render_template_as_native_obj=True,
    params={
        "access_token": Param("", type="string"),
        "job1": Param({
            "requirements": {
                "job_name": "My HPC Job",
                "project_shortname": "myproject",
                "policy": "preferred",
                "locations": [{
                    "location_name": "Karolina",
                    "location_resource": "your-resource-name"
                }],
                "tasks": [{
                    "command_template_name": "TestTemplate",
                    "node_type_name": "qcpu_exp",
                    "walltime_limit": 3600,
                    "max_cores": 128,
                    "template_parameter_values": {
                        "input_param": "value"
                    }
                }]
            },
            "data_inputs": [{
                "source": {
                    "location": "your-location",
                    "resource": "your-resource",
                    "info": {
                        "dataset_id": "your-dataset-uuid",
                        "path": "/"
                    }
                },
                "target_path": "/"
            }],
            "data_outputs": [{
                "target": {
                    "location": "your-location",
                    "resource": "your-resource",
                    "info": {
                        "access": "project",
                        "path": "/",
                        "title": "My Workflow Output",
                        "dataset_type": "dataset"
                    }
                },
                "source_path": "/"
            }]
        }, schema=SingleHPCJob)
    }
) as dag:

    # --- Token Management ---
    ref_token = LexisAAIOperator(
        task_id="refresh_token_task",
        access_token="{{ params.access_token }}",
        offline_access=True
    )

    ref_token_keeper = LexisAAITokenKeeperSensor(
        task_id="ref_token_keeper",
        offline_access=True
    )

    ref_token_stop = LexisAAIStopKeeperOperator(
        trigger_rule=TriggerRule.ALL_DONE
    )

    # --- Data Preparation (with runtime templating) ---
    data_inputs: list[dict] = []
    for k, data_input in enumerate(dag.params.get_param("job1").value["data_inputs"]):
        data_inputs.append({
            "source": "{{ params.job1['data_inputs'][" + str(k) + "]['source'] }}",
            "target_path": "{{ params.job1['data_inputs'][" + str(k) + "]['target_path'] }}"
        })

    data_outputs: list[dict] = []
    for k, data_output in enumerate(dag.params.get_param("job1").value["data_outputs"]):
        data_outputs.append({
            "target": "{{ params.job1['data_outputs'][" + str(k) + "]['target'] }}",
            "source_path": "{{ params.job1['data_outputs'][" + str(k) + "]['source_path'] }}"
        })

    # --- Your HPC Job ---
    job = build_hpc_job_step(
        dag=dag,
        group_id="job",
        job_requirements="{{ params.job1['requirements'] }}",
        data_inputs=data_inputs,
        data_outputs=data_outputs
    )

    # --- Workflow Completion ---
    finish = FinishOperator(
        task_id="finish"
    )

    # --- Task Dependencies ---
    ref_token >> ref_token_keeper
    ref_token >> job >> ref_token_stop >> finish

if __name__ == "__main__":
    dag.test()

Note

Replace placeholder values like your-resource-name, your-location, your-resource, and your-dataset-uuid with actual values from your LEXIS project.
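The data preparation loops in the complete example are worth a closer look. As far as can be read from the example, they count the entries in the default job1 value when the file is parsed and emit one Jinja template string per entry, so the values themselves are filled in only when the run is rendered. The standalone sketch below reproduces just that string generation without Airflow; the function name templated_entries is hypothetical.

```python
def templated_entries(param_name, section, count, fields):
    """Return `count` dicts whose values are Jinja template strings.

    Each string points at one field of one entry under
    params.<param_name>[<section>], mirroring the loops in the example.
    """
    return [
        {
            field: "{{ params.%s['%s'][%d]['%s'] }}" % (param_name, section, k, field)
            for field in fields
        }
        for k in range(count)
    ]


# One template dict per default entry; here we pretend there are two inputs.
for entry in templated_entries("job1", "data_inputs", 2, ("source", "target_path")):
    print(entry)
```

One consequence of this pattern, if the reading above is right, is that the number of input and output entries is fixed by the defaults in the DAG file, while their contents can still be overridden per run.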

Task Dependencies

Define the execution order using the >> operator:

ref_token >> ref_token_keeper
ref_token >> job >> ref_token_stop >> finish

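These two lines fix the execution order. Authentication (ref_token) runs first. The token keeper then runs in parallel with your job and refreshes the access token throughout execution. When the job ends, ref_token_stop shuts the keeper down; its ALL_DONE trigger rule means it runs whether the job succeeded or failed. Finally, finish marks the workflow as complete.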

                ┌─────────────────────┐
                │    ref_token        │
                │  (authenticate)     │
                └──────────┬──────────┘
                           │
              ┌────────────┴────────────┐
              │                         │
              ▼                         ▼
┌──────────────────────┐   ┌──────────────────────┐
│  ref_token_keeper    │   │        job           │
│  (keep alive)        │   │  (HPC execution)     │
│  [runs in parallel]  │   └──────────┬───────────┘
└──────────────────────┘              │
                                      ▼
                           ┌──────────────────────┐
                           │  ref_token_stop      │
                           │  (cleanup keeper)    │
                           └──────────┬───────────┘
                                      │
                                      ▼
                           ┌──────────────────────┐
                           │      finish          │
                           └──────────────────────┘
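The same ordering can be checked without Airflow by feeding the dependencies to Python's standard-library graphlib module. This is only a structural sketch: the names are plain strings standing in for the tasks above, "job" is really a group of tasks created by build_hpc_job_step, and the keeper is a long-running sensor, so the "waves" below show what may start together rather than actual timing.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on, mirroring:
#   ref_token >> ref_token_keeper
#   ref_token >> job >> ref_token_stop >> finish
dependencies = {
    "ref_token_keeper": {"ref_token"},
    "job": {"ref_token"},
    "ref_token_stop": {"job"},
    "finish": {"ref_token_stop"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()

# Collect tasks in waves: everything in one wave can start at the same time.
waves = []
while sorter.is_active():
    ready = sorter.get_ready()
    waves.append(set(ready))
    sorter.done(*ready)

for number, wave in enumerate(waves, start=1):
    print(number, sorted(wave))
```

The second wave contains both ref_token_keeper and job, which matches the parallel branch in the diagram.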

Testing and Deployment

Test Your DAG Locally

Before deployment, test your workflow locally:

if __name__ == "__main__":
    dag.test()

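dag.test() executes the DAG's tasks in a single local Python process, without a running scheduler. This helps surface import errors, parameter mistakes, and wiring problems before you submit the workflow for review.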
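A cheap check to run before the local test is to look for placeholder values such as your-resource-name that were never replaced. The sketch below walks a nested job dictionary and reports where such strings remain. The function name and the "your-" prefix convention are assumptions based on the placeholders used in this page, not a LEXIS feature.

```python
def find_placeholders(value, prefix="your-", path=""):
    """Return the paths of all string values that still start with `prefix`."""
    found = []
    if isinstance(value, dict):
        for key, item in value.items():
            child = f"{path}.{key}" if path else str(key)
            found += find_placeholders(item, prefix, child)
    elif isinstance(value, list):
        for index, item in enumerate(value):
            found += find_placeholders(item, prefix, f"{path}[{index}]")
    elif isinstance(value, str) and value.startswith(prefix):
        found.append(path)
    return found


job1 = {
    "requirements": {
        "locations": [{
            "location_name": "Karolina",
            "location_resource": "your-resource-name",
        }],
    },
    "data_inputs": [{
        "source": {"info": {"dataset_id": "your-dataset-uuid", "path": "/"}},
        "target_path": "/",
    }],
}

for leftover in find_placeholders(job1):
    print("placeholder still present at:", leftover)
```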

Deployment Process

To deploy a custom static DAG:

  1. Develop and test your Python script locally

  2. Submit the script to the LEXIS platform administrators for review

  3. Security validation checks that the workflow meets platform requirements

  4. Once approved, the workflow is deployed to the Airflow instance

  5. Execute the workflow via the LEXIS Portal, API, or Airflow UI