Custom Static DAG#
You can create workflows using Python scripts as an alternative to YAML-based workflows.
Write your workflow as a Python file, define the parameters, and use the build_hpc_job_step wrapper function
to handle the orchestration. While YAML-based workflows (using the LWD specification) offer a user-friendly
approach for common use cases, Custom Static DAGs provide more flexibility and control for complex workflows.
Basic Structure#
A custom static DAG has this basic structure:
with DAG(
    dag_id="my_custom_dag",
    description="My Custom Workflow",
    catchup=False,
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    schedule_interval=None,
    tags=["myproject"],
    access_control={'myproject': {'can_read', 'can_edit'}},
    render_template_as_native_obj=True,
    params={
        "access_token": Param("", type="string"),
        "job1": Param({
            "requirements": {...},  # Job configuration
            "data_inputs": [...],   # Input datasets
            "data_outputs": [...]   # Output storage
        }, schema=SingleHPCJob)
    }
) as dag:
    # TASK: Authenticate and get initial access token
    ref_token = LexisAAIOperator(
        task_id="refresh_token_task",
        access_token="{{ params.access_token }}",
        offline_access=True
    )

    # TASK: Keep token alive during workflow execution
    ref_token_keeper = LexisAAITokenKeeperSensor(
        task_id="ref_token_keeper",
        offline_access=True
    )

    # TASK: Stop token keeper when workflow completes
    ref_token_stop = LexisAAIStopKeeperOperator(
        trigger_rule=TriggerRule.ALL_DONE
    )

    # TASK: Your HPC job
    job = build_hpc_job_step(
        dag=dag,
        group_id="job",
        job_requirements="{{ params.job1['requirements'] }}",
        data_inputs="{{ params.job1['data_inputs'] }}",
        data_outputs="{{ params.job1['data_outputs'] }}"
    )

    # TASK: Mark workflow as complete
    finish = FinishOperator(
        task_id="finish"
    )

    # Define task dependencies
    ref_token >> ref_token_keeper
    ref_token >> job >> ref_token_stop >> finish
Authentication Management#
HPC jobs can run for hours or days, but access tokens expire quickly. To handle this, every workflow includes
three authentication operators (LexisAAIOperator, LexisAAITokenKeeperSensor, LexisAAIStopKeeperOperator)
that work together to keep your session alive.
These operators appear in every example; they are standard components that handle token management automatically. See AAI Operators for technical details.
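To see why a keeper task is needed, the arithmetic can be sketched as follows. The 300-second token lifetime is a made-up figure for illustration, not a documented LEXIS value, and refreshes_needed is a hypothetical helper that is not part of the lexis package.

```python
import math


def refreshes_needed(walltime_s: int, token_lifetime_s: int) -> int:
    """Return how many renewals keep a token valid for the whole job.

    Both arguments are in seconds. The token lifetime is an assumption
    supplied by the caller, not a value read from the platform.
    """
    if walltime_s <= 0 or token_lifetime_s <= 0:
        raise ValueError("both durations must be positive")
    # The initial token covers the first lifetime; every further
    # lifetime window needs one renewal.
    return max(0, math.ceil(walltime_s / token_lifetime_s) - 1)


# A one-hour job with a (hypothetical) five-minute token lifetime.
print(refreshes_needed(3600, 300))
# A three-day job with the same assumed lifetime.
print(refreshes_needed(3 * 24 * 3600, 300))
```

Even a short job would outlive a single token under this assumption, which is the gap the keeper sensor is there to close.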
Defining Job Parameters#
The SingleHPCJob schema defines three main components:
Job Requirements#
Specify HPC resources and job configuration:
"requirements": {
    "job_name": "My Computation",
    "project_shortname": "myproject",
    "policy": "preferred",  # How to select resources
    "locations": [{
        "location_name": "Karolina",  # HPC cluster name
        "location_resource": "your-resource-name"
    }],
    "tasks": [{
        "command_template_name": "TestTemplate",  # Your job script
        "node_type_name": "qcpu_exp",
        "walltime_limit": 3600,
        "max_cores": 128,
        "template_parameter_values": {
            # Default input parameters for your job script
            # These are passed to your command template when it runs
            "input_param_name": "param_default_value",
            "input_param2": 42
        }
    }]
}
Scheduler policies:
preferred - Use the specified cluster, fail if it’s unavailable
failover - Try the preferred cluster first, use alternatives if unavailable
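The difference between the two policies can be illustrated with a small stand-alone sketch. This is not the LEXIS scheduler: pick_location and the available set are hypothetical, "OtherCluster" is an invented name, and the sketch assumes that the alternatives for failover are the further entries of the locations list.

```python
def pick_location(policy, requested, available):
    """Return the location a job would land on, or None if it cannot be placed.

    policy    -- "preferred" or "failover"
    requested -- location names in order of preference
    available -- set of location names currently usable (hypothetical input)
    """
    if not requested:
        return None
    if policy == "preferred":
        # Only the first (preferred) cluster is considered.
        return requested[0] if requested[0] in available else None
    if policy == "failover":
        # Try the preferred cluster first, then the alternatives in order.
        for name in requested:
            if name in available:
                return name
        return None
    raise ValueError(f"unknown policy: {policy!r}")


# Karolina is down in this made-up scenario; only OtherCluster is usable.
print(pick_location("preferred", ["Karolina", "OtherCluster"], {"OtherCluster"}))
print(pick_location("failover", ["Karolina", "OtherCluster"], {"OtherCluster"}))
```

With preferred the job is not placed at all in this scenario, while failover falls through to the second entry.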
Data Inputs#
Specify which datasets to stage to the HPC cluster:
"data_inputs": [{
    "source": {
        "location": "your-location",  # e.g., "iRODS IT4I"
        "resource": "your-resource",  # e.g., "iRODS LEXIS V2"
        "info": {
            "dataset_id": "your-dataset-uuid",
            "path": "/"
        }
    },
    "target_path": "/"  # Where to place data on HPC
}]
Data Outputs#
Specify where to find results and where to store them:
"data_outputs": [{
    "source_path": "/",  # Where results are on HPC
    "target": {
        "location": "your-location",  # e.g., "iRODS IT4I"
        "resource": "your-resource",  # Storage resource name
        "info": {
            "access": "project",  # Who can access results
            "path": "/",
            "title": "My Results",
            "dataset_type": "dataset"
        }
    }
}]
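Before handing a job dictionary to Param, it can help to check that the three components and their per-entry keys are present. The sketch below is a plain-Python shape check based only on the keys shown in this section; check_job_param is a hypothetical helper and does not reproduce the validation that the SingleHPCJob schema performs.

```python
def check_job_param(job: dict) -> list[str]:
    """Return a list of human-readable problems; an empty list means the shape looks right."""
    problems = []
    # The three top-level components described in this section.
    for key in ("requirements", "data_inputs", "data_outputs"):
        if key not in job:
            problems.append(f"missing top-level key: {key}")
    # Each input needs a source and a target path on the cluster.
    for i, item in enumerate(job.get("data_inputs", [])):
        for key in ("source", "target_path"):
            if key not in item:
                problems.append(f"data_inputs[{i}] is missing {key}")
    # Each output needs a path on the cluster and a storage target.
    for i, item in enumerate(job.get("data_outputs", [])):
        for key in ("source_path", "target"):
            if key not in item:
                problems.append(f"data_outputs[{i}] is missing {key}")
    return problems


job = {
    "requirements": {"job_name": "My Computation"},
    "data_inputs": [{"source": {"location": "your-location"}}],  # no target_path
    "data_outputs": [],
}
print(check_job_param(job))
```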
Complete Example#
Here’s a complete example of a custom static DAG:
import pendulum
from airflow import DAG
from airflow.models.param import Param
from airflow.utils.trigger_rule import TriggerRule
from lexis.operators.aai import (
    LexisAAIOperator,
    LexisAAITokenKeeperSensor,
    LexisAAIStopKeeperOperator
)
from lexis.operators.hpc import build_hpc_job_step
from lexis.operators.scheduler.hpc_job import FinishOperator
from lexis.datatypes.hpc import SingleHPCJob

with DAG(
    dag_id="lexis_hpc_simple",
    description="Simple HPC Workflow Example",
    catchup=False,
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    schedule_interval=None,
    tags=["myproject"],
    access_control={'myproject': {'can_read', 'can_edit'}},
    render_template_as_native_obj=True,
    params={
        "access_token": Param("", type="string"),
        "job1": Param({
            "requirements": {
                "job_name": "My HPC Job",
                "project_shortname": "myproject",
                "policy": "preferred",
                "locations": [{
                    "location_name": "Karolina",
                    "location_resource": "your-resource-name"
                }],
                "tasks": [{
                    "command_template_name": "TestTemplate",
                    "node_type_name": "qcpu_exp",
                    "walltime_limit": 3600,
                    "max_cores": 128,
                    "template_parameter_values": {
                        "input_param": "value"
                    }
                }]
            },
            "data_inputs": [{
                "source": {
                    "location": "your-location",
                    "resource": "your-resource",
                    "info": {
                        "dataset_id": "your-dataset-uuid",
                        "path": "/"
                    }
                },
                "target_path": "/"
            }],
            "data_outputs": [{
                "target": {
                    "location": "your-location",
                    "resource": "your-resource",
                    "info": {
                        "access": "project",
                        "path": "/",
                        "title": "My Workflow Output",
                        "dataset_type": "dataset"
                    }
                },
                "source_path": "/"
            }]
        }, schema=SingleHPCJob)
    }
) as dag:

    # --- Token Management ---
    ref_token = LexisAAIOperator(
        task_id="refresh_token_task",
        access_token="{{ params.access_token }}",
        offline_access=True
    )

    ref_token_keeper = LexisAAITokenKeeperSensor(
        task_id="ref_token_keeper",
        offline_access=True
    )

    ref_token_stop = LexisAAIStopKeeperOperator(
        trigger_rule=TriggerRule.ALL_DONE
    )

    # --- Data Preparation (with runtime templating) ---
    data_inputs: list[dict] = []
    for k, data_input in enumerate(dag.params.get_param("job1").value["data_inputs"]):
        data_inputs.append({
            "source": "{{ params.job1['data_inputs'][" + str(k) + "]['source'] }}",
            "target_path": "{{ params.job1['data_inputs'][" + str(k) + "]['target_path'] }}"
        })

    data_outputs: list[dict] = []
    for k, data_output in enumerate(dag.params.get_param("job1").value["data_outputs"]):
        data_outputs.append({
            "target": "{{ params.job1['data_outputs'][" + str(k) + "]['target'] }}",
            "source_path": "{{ params.job1['data_outputs'][" + str(k) + "]['source_path'] }}"
        })

    # --- Your HPC Job ---
    job = build_hpc_job_step(
        dag=dag,
        group_id="job",
        job_requirements="{{ params.job1['requirements'] }}",
        data_inputs=data_inputs,
        data_outputs=data_outputs
    )

    # --- Workflow Completion ---
    finish = FinishOperator(
        task_id="finish"
    )

    # --- Task Dependencies ---
    ref_token >> ref_token_keeper
    ref_token >> job >> ref_token_stop >> finish

if __name__ == "__main__":
    dag.test()
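The data preparation loops in the example build one templated entry per default input or output. To make the generated strings visible, here is a stand-alone sketch of the same string construction without Airflow; input_templates is a hypothetical helper written for this illustration, not part of the lexis package.

```python
def input_templates(param_name: str, count: int) -> list[dict]:
    """Build one dict of Jinja template strings per data input index."""
    templates = []
    for k in range(count):
        # Same expression the example assembles by string concatenation.
        prefix = f"params.{param_name}['data_inputs'][{k}]"
        templates.append({
            "source": "{{ " + prefix + "['source'] }}",
            "target_path": "{{ " + prefix + "['target_path'] }}",
        })
    return templates


for entry in input_templates("job1", 2):
    print(entry["source"])
    print(entry["target_path"])
```

As the example is written, the number of entries comes from the default value of job1 when the file is parsed, while the values behind each template string are only filled in when the workflow runs.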
Note
Replace placeholder values like your-resource-name, your-location, your-resource,
and your-dataset-uuid with actual values from your LEXIS project.
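A quick way to catch placeholders that were left in by accident is to walk the parameter dictionary before submitting the workflow. The following is a minimal sketch in plain Python; find_placeholders is a hypothetical helper, and the set of placeholder strings is simply the list from the note above.

```python
PLACEHOLDERS = {
    "your-resource-name",
    "your-location",
    "your-resource",
    "your-dataset-uuid",
}


def find_placeholders(value, path="params"):
    """Return the paths of all string values that are still placeholders."""
    found = []
    if isinstance(value, dict):
        for key, item in value.items():
            found.extend(find_placeholders(item, f"{path}.{key}"))
    elif isinstance(value, list):
        for i, item in enumerate(value):
            found.extend(find_placeholders(item, f"{path}[{i}]"))
    elif isinstance(value, str) and value in PLACEHOLDERS:
        found.append(path)
    return found


job1 = {
    "requirements": {
        "locations": [{
            "location_name": "Karolina",
            "location_resource": "your-resource-name",  # not replaced yet
        }]
    }
}
print(find_placeholders(job1, "job1"))
```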
Task Dependencies#
Define the execution order using the >> operator:
ref_token >> ref_token_keeper
ref_token >> job >> ref_token_stop >> finish
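This creates the workflow execution order: authentication runs first, then the token keeper runs in parallel with your job and refreshes the access token throughout execution. Because ref_token_stop uses TriggerRule.ALL_DONE, it runs once the job has finished, whether the job succeeded or failed.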
             ┌─────────────────────┐
             │      ref_token      │
             │   (authenticate)    │
             └──────────┬──────────┘
                        │
           ┌────────────┴────────────┐
           │                         │
           ▼                         ▼
┌──────────────────────┐  ┌──────────────────────┐
│   ref_token_keeper   │  │         job          │
│     (keep alive)     │  │   (HPC execution)    │
│  [runs in parallel]  │  └──────────┬───────────┘
└──────────────────────┘             │
                                     ▼
                          ┌──────────────────────┐
                          │    ref_token_stop    │
                          │   (cleanup keeper)   │
                          └──────────┬───────────┘
                                     │
                                     ▼
                          ┌──────────────────────┐
                          │        finish        │
                          └──────────────────────┘
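The same ordering can be reproduced outside Airflow to check a dependency layout on paper. This sketch uses the standard-library graphlib module and only the task names from the diagram; it models the edges, not Airflow's scheduling or trigger rules.

```python
from graphlib import TopologicalSorter

# Each key depends on the tasks in its set, mirroring the two >> lines:
#   ref_token >> ref_token_keeper
#   ref_token >> job >> ref_token_stop >> finish
upstream = {
    "ref_token_keeper": {"ref_token"},
    "job": {"ref_token"},
    "ref_token_stop": {"job"},
    "finish": {"ref_token_stop"},
}

# static_order() yields one valid execution order for the graph.
order = list(TopologicalSorter(upstream).static_order())
print(order)
```

ref_token always comes first, and job, ref_token_stop, and finish appear in that relative order. The position of ref_token_keeper relative to job is not fixed by the edges, which matches the two tasks running in parallel.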
Testing and Deployment#
Test Your DAG Locally#
Before deployment, test your workflow locally:
if __name__ == "__main__":
    dag.test()
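dag.test() runs the DAG's tasks in a single local process, without a running scheduler, which helps you catch errors in the workflow logic before you submit it for review.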
Deployment Process#
To deploy a custom static DAG:
1. Develop and test your Python script locally.
2. Submit it for review to the LEXIS platform administrators.
3. Security validation checks that the workflow meets platform requirements.
4. Once approved, the workflow is deployed to the Airflow instance.
5. Execute it via the LEXIS Portal, API, or Airflow UI.