=================
Custom Static DAG
=================

You can create workflows using Python scripts as an alternative to YAML-based workflows.
Write your workflow as a Python file, define the parameters, and use the ``build_hpc_job_step`` wrapper function
to handle the orchestration. While YAML-based workflows (using the LWD specification) offer a user-friendly
approach for common use cases, Custom Static DAGs provide more flexibility and control for complex workflows.

Basic Structure
===============

A custom static DAG has this basic structure:

.. code-block:: python

    with DAG(
        dag_id="my_custom_dag",
        description="My Custom Workflow",
        catchup=False,
        start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
        schedule_interval=None,
        tags=["myproject"],
        access_control={'myproject': {'can_read', 'can_edit'}},
        render_template_as_native_obj=True,
        params={
            "access_token": Param("", type="string"),
            "job1": Param({
                "requirements": {...},      # Job configuration
                "data_inputs": [...],       # Input datasets
                "data_outputs": [...]       # Output storage
            }, schema=SingleHPCJob)
        }
    ) as dag:

        # TASK: Authenticate and get initial access token
        ref_token = LexisAAIOperator(
            task_id="refresh_token_task",
            access_token="{{ params.access_token }}",
            offline_access=True
        )

        # TASK: Keep token alive during workflow execution
        ref_token_keeper = LexisAAITokenKeeperSensor(
            task_id="ref_token_keeper",
            offline_access=True
        )

        # TASK: Stop token keeper when workflow completes
        ref_token_stop = LexisAAIStopKeeperOperator(
            trigger_rule=TriggerRule.ALL_DONE
        )

        # TASK: Your HPC job
        job = build_hpc_job_step(
            dag=dag,
            group_id="job",
            job_requirements="{{ params.job1['requirements'] }}",
            data_inputs="{{ params.job1['data_inputs'] }}",
            data_outputs="{{ params.job1['data_outputs'] }}"
        )

        # TASK: Mark workflow as complete
        finish = FinishOperator(
            task_id="finish"
        )

        # Define task dependencies
        ref_token >> ref_token_keeper
        ref_token >> job >> ref_token_stop >> finish



Authentication Management
----------------------------
HPC jobs can run for hours or days, but access tokens expire quickly. To handle this, every workflow includes
three authentication operators (``LexisAAIOperator``, ``LexisAAITokenKeeperSensor``, ``LexisAAIStopKeeperOperator``)
that work together to keep your session alive.

These three operators appear in every example as standard components, and they
handle token management automatically. See :ref:`AAI Operators <aai_operators>` for technical details.


Defining Job Parameters
========================

The ``SingleHPCJob`` schema defines three main components:


Job Requirements
----------------

Specify HPC resources and job configuration:

.. code-block:: python

    "requirements": {
        "job_name": "My Computation",
        "project_shortname": "myproject",
        "policy": "preferred",                        # How to select resources
        "locations": [{
            "location_name": "Karolina",              # HPC cluster name
            "location_resource": "your-resource-name"
        }],
        "tasks": [{
            "command_template_name": "TestTemplate",  # Your job script
            "node_type_name": "qcpu_exp",
            "walltime_limit": 3600,
            "max_cores": 128,
            "template_parameter_values": {
                # Default input parameters for your job script
                # These are passed to your command template when it runs
                "input_param_name": "param_default_value",
                "input_param2": 42
            }
        }]
    }

**Scheduler policies:**

* **preferred** - Use the specified cluster, fail if it's unavailable
* **failover** - Try the preferred cluster first, use alternatives if unavailable
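
To make the difference between the two policies concrete, the sketch below models the selection logic in plain Python.
It is an illustration only, not the LEXIS scheduler's implementation: the function ``select_location``, the
``alternatives`` list, the ``available`` set, and the cluster name ``OtherCluster`` are all hypothetical.

.. code-block:: python

    def select_location(policy, preferred, alternatives, available):
        """Return the cluster a job would run on under the given policy.

        Hypothetical model: ``available`` is the set of clusters that can
        currently accept jobs; ``alternatives`` is tried in list order.
        """
        if policy not in ("preferred", "failover"):
            raise ValueError(f"Unknown policy: {policy!r}")

        # Both policies use the preferred cluster when it is available.
        if preferred in available:
            return preferred

        # Only "failover" is allowed to fall back to another cluster.
        if policy == "failover":
            for name in alternatives:
                if name in available:
                    return name

        raise RuntimeError(f"No usable cluster for policy {policy!r}")


    # "failover" falls back to an alternative when Karolina is down.
    chosen = select_location(
        "failover", "Karolina", ["OtherCluster"], available={"OtherCluster"}
    )

With ``policy="preferred"`` and the same arguments, the sketch raises ``RuntimeError`` instead of falling back,
which mirrors the "fail if it's unavailable" behaviour described above.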

Data Inputs
-----------

Specify which datasets to stage to the HPC cluster:

.. code-block:: python

    "data_inputs": [{
        "source": {
            "location": "your-location",          # e.g., "iRODS IT4I"
            "resource": "your-resource",          # e.g., "iRODS LEXIS V2"
            "info": {
                "dataset_id": "your-dataset-uuid",
                "path": "/"
            }
        },
        "target_path": "/"                       # Where to place data on HPC
    }]


Data Outputs
------------

Specify where to find results and where to store them:

.. code-block:: python

    "data_outputs": [{
        "source_path": "/",                      # Where results are on HPC
        "target": {
            "location": "your-location",          # e.g., "iRODS IT4I"
            "resource": "your-resource",          # Storage resource name
            "info": {
                "access": "project",              # Who can access results
                "path": "/",
                "title": "My Results",
                "dataset_type": "dataset"
            }
        }
    }]
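
Before triggering a workflow, it can help to check that a job parameter dictionary has the overall shape shown in
the three sections above. The sketch below is a hypothetical pre-check, not part of the LEXIS library: the helper
``missing_job_keys`` is invented for illustration, and it assumes that every key shown in the examples is required.
The authoritative validation remains the ``SingleHPCJob`` schema.

.. code-block:: python

    def missing_job_keys(job: dict) -> list[str]:
        """List the expected keys that are absent from a job parameter dict."""
        missing = []

        # Top-level components described above.
        for key in ("requirements", "data_inputs", "data_outputs"):
            if key not in job:
                missing.append(key)

        # Each input needs a source and a target path on the cluster.
        for i, item in enumerate(job.get("data_inputs", [])):
            for key in ("source", "target_path"):
                if key not in item:
                    missing.append(f"data_inputs[{i}].{key}")

        # Each output needs a source path on the cluster and a storage target.
        for i, item in enumerate(job.get("data_outputs", [])):
            for key in ("source_path", "target"):
                if key not in item:
                    missing.append(f"data_outputs[{i}].{key}")

        return missing


    job1 = {
        "requirements": {"job_name": "My Computation"},
        "data_inputs": [{"source": {}, "target_path": "/"}],
        "data_outputs": [{"source_path": "/"}],  # "target" left out on purpose
    }
    problems = missing_job_keys(job1)

Here ``problems`` contains a single entry that points at the output with no ``target``.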

.. _simple_HPC_workflow:

Complete Example
================

Here's a complete example of a custom static DAG:

.. code-block:: python
   :linenos:

    import pendulum
    from airflow import DAG
    from airflow.models.param import Param
    from airflow.utils.trigger_rule import TriggerRule
    from lexis.operators.aai import (
        LexisAAIOperator,
        LexisAAITokenKeeperSensor,
        LexisAAIStopKeeperOperator
    )
    from lexis.operators.hpc import build_hpc_job_step
    from lexis.operators.scheduler.hpc_job import FinishOperator
    from lexis.datatypes.hpc import SingleHPCJob

    with DAG(
        dag_id="lexis_hpc_simple",
        description="Simple HPC Workflow Example",
        catchup=False,
        start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
        schedule_interval=None,
        tags=["myproject"],
        access_control={'myproject': {'can_read', 'can_edit'}},
        render_template_as_native_obj=True,
        params={
            "access_token": Param("", type="string"),
            "job1": Param({
                "requirements": {
                    "job_name": "My HPC Job",
                    "project_shortname": "myproject",
                    "policy": "preferred",
                    "locations": [{
                        "location_name": "Karolina",
                        "location_resource": "your-resource-name"
                    }],
                    "tasks": [{
                        "command_template_name": "TestTemplate",
                        "node_type_name": "qcpu_exp",
                        "walltime_limit": 3600,
                        "max_cores": 128,
                        "template_parameter_values": {
                            "input_param": "value"
                        }
                    }]
                },
                "data_inputs": [{
                    "source": {
                        "location": "your-location",
                        "resource": "your-resource",
                        "info": {
                            "dataset_id": "your-dataset-uuid",
                            "path": "/"
                        }
                    },
                    "target_path": "/"
                }],
                "data_outputs": [{
                    "target": {
                        "location": "your-location",
                        "resource": "your-resource",
                        "info": {
                            "access": "project",
                            "path": "/",
                            "title": "My Workflow Output",
                            "dataset_type": "dataset"
                        }
                    },
                    "source_path": "/"
                }]
            }, schema=SingleHPCJob)
        }
    ) as dag:

        # --- Token Management ---
        ref_token = LexisAAIOperator(
            task_id="refresh_token_task",
            access_token="{{ params.access_token }}",
            offline_access=True
        )

        ref_token_keeper = LexisAAITokenKeeperSensor(
            task_id="ref_token_keeper",
            offline_access=True
        )

        ref_token_stop = LexisAAIStopKeeperOperator(
            trigger_rule=TriggerRule.ALL_DONE
        )

        # --- Data Preparation (with runtime templating) ---
        data_inputs: list[dict] = []
        for k, _ in enumerate(dag.params.get_param("job1").value["data_inputs"]):
            data_inputs.append({
                "source": "{{ params.job1['data_inputs'][" + str(k) + "]['source'] }}",
                "target_path": "{{ params.job1['data_inputs'][" + str(k) + "]['target_path'] }}"
            })

        data_outputs: list[dict] = []
        for k, _ in enumerate(dag.params.get_param("job1").value["data_outputs"]):
            data_outputs.append({
                "target": "{{ params.job1['data_outputs'][" + str(k) + "]['target'] }}",
                "source_path": "{{ params.job1['data_outputs'][" + str(k) + "]['source_path'] }}"
            })

        # --- Your HPC Job  ---
        job = build_hpc_job_step(
            dag=dag,
            group_id="job",
            job_requirements="{{ params.job1['requirements'] }}",
            data_inputs=data_inputs,
            data_outputs=data_outputs
        )

        # --- Workflow Completion ---
        finish = FinishOperator(
            task_id="finish"
        )

        # --- Task Dependencies ---
        ref_token >> ref_token_keeper
        ref_token >> job >> ref_token_stop >> finish

    if __name__ == "__main__":
        dag.test()

.. note::
   Replace placeholder values like ``your-resource-name``, ``your-location``, ``your-resource``,
   and ``your-dataset-uuid`` with actual values from your LEXIS project.
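
The two data preparation loops in the example build one Jinja template string per list entry, so each value is
resolved at run time rather than when the DAG file is parsed. Note that the loops iterate over the *default*
parameter values, so the number of entries is fixed by the defaults. The sketch below factors the pattern into a
helper to show exactly which strings are produced; ``indexed_templates`` is a hypothetical name, not a LEXIS function.

.. code-block:: python

    def indexed_templates(param, section, fields, count):
        """Build one dict of Jinja template strings per list entry.

        For entry ``k`` and field ``f`` the result contains
        ``{{ params.<param>['<section>'][k]['<f>'] }}``.
        """
        return [
            {
                field: "{{ params." + param + "['" + section + "']["
                + str(k) + "]['" + field + "'] }}"
                for field in fields
            }
            for k in range(count)
        ]


    # Same strings as the data_inputs loop produces for a single input.
    data_inputs = indexed_templates(
        "job1", "data_inputs", ("source", "target_path"), count=1
    )

Plain string concatenation is used instead of an f-string because the literal ``{{`` and ``}}`` would otherwise
have to be escaped.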


Task Dependencies
=================

Define the execution order using the ``>>`` operator:

.. code-block:: python

    ref_token >> ref_token_keeper
    ref_token >> job >> ref_token_stop >> finish

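This sets the execution order. ``ref_token`` authenticates first. ``ref_token_keeper`` and ``job`` then start in parallel, so the keeper can refresh the access token while the job runs. When the job ends, ``ref_token_stop`` stops the keeper and ``finish`` marks the workflow as complete. Because ``ref_token_stop`` uses ``TriggerRule.ALL_DONE``, it runs whether the job succeeded or failed.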

.. code-block:: text

                    ┌─────────────────────┐
                    │    ref_token        │
                    │  (authenticate)     │
                    └──────────┬──────────┘
                               │
                  ┌────────────┴────────────┐
                  │                         │
                  ▼                         ▼
    ┌──────────────────────┐   ┌──────────────────────┐
    │  ref_token_keeper    │   │        job           │
    │  (keep alive)        │   │  (HPC execution)     │
    │  [runs in parallel]  │   └──────────┬───────────┘
    └──────────────────────┘              │
                                          ▼
                               ┌──────────────────────┐
                               │  ref_token_stop      │
                               │  (cleanup keeper)    │
                               └──────────┬───────────┘
                                          │
                                          ▼
                               ┌──────────────────────┐
                               │      finish          │
                               └──────────────────────┘
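
The same ordering can be reproduced without Airflow by feeding the dependency edges to the standard library's
``graphlib.TopologicalSorter``. This is a simplified model for reasoning about the graph, not how Airflow schedules
tasks: the ``job`` task group is treated as a single node, and the long-running keeper sensor is treated as an
ordinary task. The helper ``execution_waves`` is a hypothetical name.

.. code-block:: python

    from graphlib import TopologicalSorter

    # Map each task to the set of tasks it waits for (its upstream tasks).
    # These edges mirror the two ``>>`` lines above.
    upstream = {
        "ref_token_keeper": {"ref_token"},
        "job": {"ref_token"},
        "ref_token_stop": {"job"},
        "finish": {"ref_token_stop"},
    }


    def execution_waves(graph):
        """Group tasks into waves; tasks in one wave may start together."""
        sorter = TopologicalSorter(graph)
        sorter.prepare()
        waves = []
        while sorter.is_active():
            ready = sorted(sorter.get_ready())  # sorted for stable output
            waves.append(ready)
            sorter.done(*ready)
        return waves


    waves = execution_waves(upstream)

The second wave contains both ``job`` and ``ref_token_keeper``, which corresponds to the parallel branch in the diagram.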

Testing and Deployment
=======================

Test Your DAG Locally
---------------------

Before deployment, test your workflow locally:

.. code-block:: python

    if __name__ == "__main__":
        dag.test()

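``dag.test()`` runs the DAG's tasks in a single local process without a scheduler, so you can catch import errors and broken task definitions before submitting the workflow for review.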


Deployment Process
------------------

To deploy a custom static DAG:

1. **Develop and test** your Python script locally
2. **Submit for review** to LEXIS platform administrators
3. **Security validation** ensures the workflow meets platform requirements
4. **Deployment** to the Airflow instance once approved
5. **Execute** via the LEXIS Portal, API, or Airflow UI
