====================
Airflow orchestrator
====================

Templating
===========
The LEXIS Platform supports three basic templates for workflow creation:

- `Container <https://api.lexis.tech/airflow/api/lexis/openapi/swagger#/Containers>`_ to execute a job that runs an Apptainer (Singularity) container,
- `HPC Command Application <https://api.lexis.tech/airflow/api/lexis/openapi/swagger#/HPC%20Command%20App>`_ to execute a predefined HEAppE Command Template (job script),
- `Custom Job Script <https://api.lexis.tech/airflow/api/lexis/openapi/swagger#/Custom%20Job%20Scripts>`_ to execute a custom (user-defined) job script.

Beyond these templates, as mentioned above, the user can specify the computational workflow using the `LWD <https://api.lexis.tech/airflow/api/lexis/lwd-workflows/v1/user_guide>`_ YAML specification.
It allows the user to create a custom (arbitrary) workflow in a user-friendly way while preserving the security described above.
The example :ref:`simple_HPC_workflow` demonstrates a simple HPC workflow with data transfer.




Examples
========

.. _TOSCA_example:

TOSCA workflow
--------------

.. code-block:: yaml
    :caption: TOSCA description of HPC computational task for Airflow

    tosca_definitions_version: tosca_simple_yaml_1_2

    metadata:
        template_name: lexis_job_with_data_dag
        template_version: v1_0
        template_uo: ADAS - IT4Innovations
        template_author: Jan Swiatkowski
        template_contact_email: jan.swiatkowski@vsb.cz
        start_date: "2023-02-21"

    description: LEXIS example workflow with data for Apache Airflow

    imports:
        - https://nextcloud.it4i.cz/s/PL2jsXSMRkMxj7Q/download/lexis-airflow-types.yaml

    topology_template:
        # TEMPLATE INPUTS
        inputs:
            access_token:
                type: string
                required: True
                default: ""
            computation_project:
                type: string
                required: True
                default: ""
            job1_cluster_id:
                type: number
                required: True
                default: 2
            job1_command_template_id:
                type: number
                required: True
                default: 3
            job1_parameters:
                type: map
                required: True
                default: {"inputParam":"test"}
            job1_cluster_node_type_id:
                type: number
                required: True
                default: 8
            job1_max_cores:
                type: number
                required: True
                default: 24
            job1_filetransfer_method_id:
                type: number
                required: True
                default: 2
            job1_heappe_uri:
                type: string
                required: True
                default: "https://heappe.it4i.cz/lexis"
            job1_input_dataset_path:
                type: string
                required: True
                default: "project/proj812b07ed780274387d12c665fa3a4f7f/5e51f28c-88fa-11ec-a591-00155d4ab2e4/test10M.dat"
            job1_output_dataset_path:
                type: string
                required: True
                default: "project/proj812b07ed780274387d12c665fa3a4f7f"
            job1_output_ddi_metadata:
                type: map  # lexis.datatypes.ddi.Metadata
                required: True
                default:
                    contributor: ["Testing"]
                    creator: ["Testing"]
                    owner: ["Testing"]
                    publicationYear: "2022"
                    publisher: ["Airflow"]
                    resourceType: Test data
                    title: AIRFLOW test output TOSCA

        # NODES
        node_templates:
            # token management
            ref_token:
                type: lexis.nodes.operators.aai.LexisAAIOperator
                properties:
                    access_token: { get_input: access_token }
            ref_token_keeper:
                type: lexis.nodes.operators.aai.LexisAAITokenKeeperSensor
                requirements:
                    - aai_operator: ref_token
            ref_token_keeper_stop:
                type: lexis.nodes.operators.aai.LexisAAIStopKeeperOperator
                requirements:
                    - previous_task: job1_heappe_job_delete

            # HEAppE Job context
            job1_heappe_session:
                type: lexis.nodes.operators.heappe.HEAppESessionOperator
                properties:
                    heappe_uri: { get_input: job1_heappe_uri }
                requirements:
                    - aai_operator: ref_token
                    - previous_task: ref_token
            # keep session active
            job1_heappe_session_keeper_stop:
                type: lexis.nodes.operators.heappe.HEAppESessionStopKeeperOperator
                requirements:
                    - previous_task: job1_heappe_job_delete

            job1_heappe_session_keeper:
                type: lexis.nodes.operators.heappe.HEAppESessionKeeperSensor
                properties:
                    heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
                    heappe_session_stop_task_id: { get_attribute: [ job1_heappe_session_keeper_stop, task_id ] }
                requirements:
                    - heappe_session_task_id: job1_heappe_session
            # HEAppE Job
            job1_heappe_prepare_job:
                type: lexis.nodes.operators.heappe.HEAppEPrepareJobOperator
                properties:
                    Name: Job Name n.1
                    Project: { get_input: computation_project }
                    ClusterId: { get_input: job1_cluster_id }
                    CommandTemplateId: { get_input: job1_command_template_id }
                    FileTransferMethodId: { get_input: job1_filetransfer_method_id }
                    EnvironmentVariables: {}
                    MaxCores: { get_input: job1_max_cores }
                    Priority: 4
                    ClusterNodeTypeId: { get_input: job1_cluster_node_type_id }
                    TemplateParameterValues: { get_input: job1_parameters }
                    heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
                requirements:
                    - heappe_session_task_id: job1_heappe_session

            # enable file transfer from DDI
            job1_heappe_enable_filetransfer:
                type: lexis.nodes.operators.heappe.HEAppEEnableFileTransferOperator
                properties:
                    heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
                    heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
                requirements:
                    - heappe_session_task_id: job1_heappe_session
                    - heappe_prepare_job_task_id: job1_heappe_prepare_job
            # transfer data
            job1_ddi_data_input_transfer:
                type: lexis.nodes.operators.ddi.LexisStagingOperator
                properties:
                    source_system: it4i_iRODS
                    source_path: { get_input: job1_input_dataset_path }
                    target_system: karolina_home
                    target_path: "/"
                    heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
                    heappe_enable_file_transfer_task_id: { get_attribute: [ job1_heappe_enable_filetransfer, task_id ] }
                requirements:
                    - heappe_prepare_job_task_id: job1_heappe_prepare_job
                    - heappe_enable_file_transfer_task_id: job1_heappe_enable_filetransfer
                    - previous_task: job1_heappe_enable_filetransfer

            # wait for the DDI operation
            job1_ddi_data_input_wait:
                type: lexis.nodes.operators.ddi.LexisWaitStagingRequestSensor
                properties:
                    staging_operator_task_id: { get_attribute: [ job1_ddi_data_input_transfer, task_id ] }
                requirements:
                    - staging_operator_task_id: job1_ddi_data_input_transfer

            # submit HEAppE job
            job1_heappe_submit_job:
                type: lexis.nodes.operators.heappe.HEAppESubmitJobOperator
                properties:
                    heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
                    heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
                requirements:
                    - heappe_session_task_id: job1_heappe_session
                    - heappe_prepare_job_task_id: job1_heappe_prepare_job
                    - previous_task: job1_ddi_data_input_wait

            job1_heappe_job_wait:
                type: lexis.nodes.operators.heappe.HEAppEWaitSubmittedJobSensor
                properties:
                    heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
                    heappe_submitted_job_task_id: { get_attribute: [ job1_heappe_submit_job, task_id ] }
                requirements:
                    - heappe_session_task_id: job1_heappe_session
                    - heappe_submitted_job_task_id: job1_heappe_submit_job

            # transfer job output
            job1_job_output_to_ddi:
                type: lexis.nodes.operators.ddi.LexisStagingOperator
                properties:
                    target_system: it4i_iRODS
                    target_path: { get_input: job1_output_dataset_path }
                    source_system: karolina_home
                    source_path: "/"
                    heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
                    heappe_enable_file_transfer_task_id: { get_attribute: [ job1_heappe_enable_filetransfer, task_id ] }
                requirements:
                    - heappe_prepare_job_task_id: job1_heappe_prepare_job
                    - heappe_enable_file_transfer_task_id: job1_heappe_enable_filetransfer
                    - previous_task: job1_heappe_job_wait

            # wait for the DDI operation
            job1_ddi_data_output_wait:
                type: lexis.nodes.operators.ddi.LexisWaitStagingRequestSensor
                properties:
                    staging_operator_task_id: { get_attribute: [ job1_job_output_to_ddi, task_id ] }
                requirements:
                    - staging_operator_task_id: job1_job_output_to_ddi

            job1_heappe_end_filetransfer:
                type: lexis.nodes.operators.heappe.HEAppEEndFileTransferOperator
                properties:
                    heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
                    heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
                    heappe_enable_file_transfer_task_id: { get_attribute: [ job1_heappe_enable_filetransfer, task_id ] }
                requirements:
                    - heappe_session_task_id: job1_heappe_session
                    - heappe_prepare_job_task_id: job1_heappe_prepare_job
                    - heappe_enable_file_transfer_task_id: job1_heappe_enable_filetransfer
                    - previous_task: job1_ddi_data_output_wait

            # end HEAppE job
            job1_heappe_job_delete:
                type: lexis.nodes.operators.heappe.HEAppEDeleteJobOperator
                properties:
                    heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
                    heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
                requirements:
                    - heappe_session_task_id: job1_heappe_session
                    - heappe_prepare_job_task_id: job1_heappe_prepare_job
                    - previous_task: job1_heappe_end_filetransfer
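
The ``requirements`` entries of the node templates above determine which task has to finish before another one may start. The following minimal sketch illustrates this by deriving one valid execution order from them. It is not how the orchestrator itself processes the template, which this page does not describe. The ``REQUIREMENTS`` dictionary and the ``execution_order`` helper are hypothetical names introduced for illustration; the dictionary was transcribed by hand from the template, and the standard-library ``graphlib`` module requires Python 3.9 or newer.

.. code-block:: python
    :caption: Sketch: execution order implied by the TOSCA requirements (illustrative only)

    from graphlib import TopologicalSorter

    # Requirement targets of each node, transcribed by hand from the template.
    REQUIREMENTS = {
        "ref_token": [],
        "ref_token_keeper": ["ref_token"],
        "ref_token_keeper_stop": ["job1_heappe_job_delete"],
        "job1_heappe_session": ["ref_token"],
        "job1_heappe_session_keeper_stop": ["job1_heappe_job_delete"],
        "job1_heappe_session_keeper": ["job1_heappe_session"],
        "job1_heappe_prepare_job": ["job1_heappe_session"],
        "job1_heappe_enable_filetransfer": [
            "job1_heappe_session", "job1_heappe_prepare_job"],
        "job1_ddi_data_input_transfer": [
            "job1_heappe_prepare_job", "job1_heappe_enable_filetransfer"],
        "job1_ddi_data_input_wait": ["job1_ddi_data_input_transfer"],
        "job1_heappe_submit_job": [
            "job1_heappe_session", "job1_heappe_prepare_job",
            "job1_ddi_data_input_wait"],
        "job1_heappe_job_wait": [
            "job1_heappe_session", "job1_heappe_submit_job"],
        "job1_job_output_to_ddi": [
            "job1_heappe_prepare_job", "job1_heappe_enable_filetransfer",
            "job1_heappe_job_wait"],
        "job1_ddi_data_output_wait": ["job1_job_output_to_ddi"],
        "job1_heappe_end_filetransfer": [
            "job1_heappe_session", "job1_heappe_prepare_job",
            "job1_heappe_enable_filetransfer", "job1_ddi_data_output_wait"],
        "job1_heappe_job_delete": [
            "job1_heappe_session", "job1_heappe_prepare_job",
            "job1_heappe_end_filetransfer"],
    }


    def execution_order(requirements):
        """Return one valid order in which every node follows its requirements."""
        return list(TopologicalSorter(requirements).static_order())


    order = execution_order(REQUIREMENTS)
    for position, name in enumerate(order, start=1):
        print(f"{position:2d}. {name}")

Nodes that do not depend on each other, such as the keeper sensors and the main job chain, may appear in a different relative order. The sketch only guarantees that every node comes after all of its requirements.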


.. _SOM_DAG_Example:

SOM DAG and execution
------------------------
This section provides a short example of executing the SOM (Self-Organizing Map) algorithm on the LEXIS portal. First, the SOM DAG written
in Python is shown below.

.. code-block:: python
    :caption: SOM DAG Python.

    ref_token = LexisAAIOperator(
        task_id="refresh_token_task",
        access_token="{{ params.access_token }}"
    )

    ref_token_keeper = LexisAAITokenKeeperSensor(
        task_id="ref_token_keeper",
    )

    ref_token_stop = LexisAAIStopKeeperOperator()

    # Get Heappe session
    job1_heappe_session = HEAppESessionOperator(task_id="job1_heappe_session", heappe_uri="{{ params.job1_heappe_uri }}")
    job1_heappe_session_keeper = HEAppESessionKeeperSensor(
        task_id="job1_heappe_session_keeper",
        heappe_session_task_id="job1_heappe_session",
        heappe_session_stop_task_id="job1_heappe_session_keeper_stop"
    )
    job1_heappe_session_keeper_stop = HEAppESessionStopKeeperOperator(
        task_id="job1_heappe_session_keeper_stop",
    )
    job1_heappe_prepare_job = HEAppEPrepareJobOperator(
        task_id="job1_heappe_prepare_job",
        heappe_session_task_id="job1_heappe_session",
        Name="{{ params.name }}",
        Project="{{ params.computation_project }}",
        ClusterId="{{ params.job1_cluster_id }}",
        CommandTemplateId="{{ params.job1_command_template_id }}",
        ClusterNodeTypeId="{{ params.job1_cluster_node_type_id }}",
        FileTransferMethodId="{{ params.job1_filetransfer_method_id }}",
        TemplateParameterValues="{{ params.job1_parameters }}",
        MaxCores="{{ params.job1_max_cores }}"
    )

    job1_heappe_enable_filetransfer = HEAppEEnableFileTransferOperator(
        task_id="job1_heappe_enable_filetransfer",
        heappe_prepare_job_task_id="job1_heappe_prepare_job",
        heappe_session_task_id="job1_heappe_session",
    )

    job1_ddi_data_input_transfer = LexisStagingOperator(
        task_id="job1_ddi_data_input_transfer",
        source_system="it4i_iRODS",
        source_path="{{ params.job1_input_dataset_path }}",
        target_system="karolina_home",
        target_path="/",
        heappe_prepare_job_task_id="job1_heappe_prepare_job",
        heappe_enable_file_transfer_task_id="job1_heappe_enable_filetransfer"
    )

    job1_ddi_data_input_wait = LexisWaitStagingRequestSensor(
        task_id="job1_ddi_data_input_transfer_wait",
        staging_operator_task_id="job1_ddi_data_input_transfer"
    )

    job1_heappe_submit_job = HEAppESubmitJobOperator(
        task_id="job1_heappe_submit_job",
        heappe_prepare_job_task_id="job1_heappe_prepare_job",
        heappe_session_task_id="job1_heappe_session",
    )

    job1_heappe_job_wait = HEAppEWaitSubmittedJobSensor(
        task_id="job1_heappe_job_waiting",
        heappe_submitted_job_task_id="job1_heappe_submit_job",
        heappe_session_task_id="job1_heappe_session",
    )

    job1_job_output_to_ddi = LexisStagingOperator(
        task_id="job1_to_ddi",
        target_system="it4i_iRODS",
        target_path="{{ params.job1_output_dataset_path }}",
        source_system="karolina_home",
        source_path="/",
        metadata="{{ params.job1_output_ddi_metadata }}",
        heappe_prepare_job_task_id="job1_heappe_prepare_job",
        heappe_enable_file_transfer_task_id="job1_heappe_enable_filetransfer"
    )

    job1_ddi_data_output_wait = LexisWaitStagingRequestSensor(
        task_id="job1_ddi_data_output_transfer_wait",
        staging_operator_task_id="job1_to_ddi"
    )

    job1_heappe_end_filetransfer = HEAppEEndFileTransferOperator(
        task_id="job1_heappe_end_filetransfer",
        heappe_prepare_job_task_id="job1_heappe_prepare_job",
        heappe_enable_file_transfer_task_id="job1_heappe_enable_filetransfer",
        heappe_session_task_id="job1_heappe_session"
    )

    job1_heappe_job_delete = HEAppEDeleteJobOperator(
        task_id="job1_heappe_job_delete",
        heappe_prepare_job_task_id="job1_heappe_prepare_job",
        heappe_session_task_id="job1_heappe_session",
    )

    # Main path: prepare the job, stage data in, run, stage data out, clean up.
    ref_token >> job1_heappe_session >> job1_heappe_prepare_job \
        >> job1_heappe_enable_filetransfer \
        >> job1_ddi_data_input_transfer >> job1_ddi_data_input_wait \
        >> job1_heappe_submit_job >> job1_heappe_job_wait \
        >> job1_job_output_to_ddi >> job1_ddi_data_output_wait \
        >> job1_heappe_end_filetransfer >> job1_heappe_job_delete \
        >> job1_heappe_session_keeper_stop >> ref_token_stop

    # Keeper sensors branch off the main path.
    ref_token >> job1_heappe_session >> job1_heappe_session_keeper >> ref_token_stop
    ref_token >> ref_token_keeper
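
The three ``>>`` statements at the end of the DAG define its edges. In Airflow, ``a >> b`` registers ``b`` as downstream of ``a`` and evaluates to ``b``, so a chain links each neighbouring pair. The sketch below reproduces this behaviour without Airflow in order to list the resulting edges. ``Task``, ``link`` and ``tasks`` are hypothetical stand-ins introduced for illustration, and the tasks are identified by the Python variable names used above rather than by their ``task_id`` values.

.. code-block:: python
    :caption: Sketch: edges created by the ``>>`` chains (stand-in classes, no Airflow required)

    from functools import reduce
    from operator import rshift


    class Task:
        """Stand-in for an operator that only records downstream task names."""

        def __init__(self, name):
            self.name = name
            self.downstream = set()

        def __rshift__(self, other):
            # Register `other` as downstream and return it, so that
            # `a >> b >> c` creates the edges a -> b and b -> c.
            self.downstream.add(other.name)
            return other


    MAIN_PATH = [
        "ref_token", "job1_heappe_session", "job1_heappe_prepare_job",
        "job1_heappe_enable_filetransfer", "job1_ddi_data_input_transfer",
        "job1_ddi_data_input_wait", "job1_heappe_submit_job",
        "job1_heappe_job_wait", "job1_job_output_to_ddi",
        "job1_ddi_data_output_wait", "job1_heappe_end_filetransfer",
        "job1_heappe_job_delete", "job1_heappe_session_keeper_stop",
        "ref_token_stop",
    ]
    EXTRA = ["job1_heappe_session_keeper", "ref_token_keeper"]
    tasks = {name: Task(name) for name in MAIN_PATH + EXTRA}


    def link(*names):
        """Apply `>>` from left to right over the named tasks."""
        reduce(rshift, (tasks[name] for name in names))


    # The same three statements as in the DAG.
    link(*MAIN_PATH)
    link("ref_token", "job1_heappe_session",
         "job1_heappe_session_keeper", "ref_token_stop")
    link("ref_token", "ref_token_keeper")

    for task in tasks.values():
        for target in sorted(task.downstream):
            print(f"{task.name} -> {target}")

Repeating ``ref_token >> job1_heappe_session`` in the second statement does not add a second edge in this sketch. It only serves as the starting point for the branch to ``job1_heappe_session_keeper``.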

The figure below shows the SOM DAG details in the LEXIS portal. The user can use the LEXIS portal to create a workflow execution.
To do so, the user has to click the **CREATE WORKFLOW EXECUTION** button below the DAG details.

.. figure:: Images/som01.png
    :align: center
    
    SOM workflow in LEXIS portal: DAG details.
    
Then, as seen in the figure below, the job parameters have to be filled in. Once everything is set, the user has to click **CREATE** in the upper left corner.

.. figure:: Images/som02.png
    :align: center
    
    SOM workflow in LEXIS portal: set parameters.
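
The DAG shown earlier reads its inputs through templated strings such as ``{{ params.access_token }}``. Assuming that the values entered in the form are passed to the DAG as ``params`` (an assumption, not confirmed on this page), a quick way to spot a forgotten parameter is to compare the names used in the templates with the supplied values. The helpers ``referenced_params`` and ``missing_params`` and the sample values are hypothetical and serve only as an illustration.

.. code-block:: python
    :caption: Sketch: checking that every templated parameter has a value (illustrative only)

    import re

    # Matches "{{ params.<name> }}" with optional spaces inside the braces.
    PARAM_PATTERN = re.compile(r"\{\{\s*params\.(\w+)\s*\}\}")


    def referenced_params(templates):
        """Collect the parameter names used in the templated strings."""
        names = set()
        for text in templates:
            names.update(PARAM_PATTERN.findall(text))
        return names


    def missing_params(templates, supplied):
        """Return the sorted names the templates use but `supplied` lacks."""
        return sorted(referenced_params(templates) - set(supplied))


    # A few templated arguments taken from the DAG.
    TEMPLATES = [
        "{{ params.access_token }}",
        "{{ params.job1_heappe_uri }}",
        "{{ params.computation_project }}",
        "{{ params.job1_max_cores }}",
    ]

    # Hypothetical form values; the token is a placeholder, not a credential.
    supplied = {
        "access_token": "<token>",
        "job1_heappe_uri": "https://heappe.it4i.cz/lexis",
        "job1_max_cores": 24,
    }

    print(missing_params(TEMPLATES, supplied))  # ['computation_project']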

After the workflow execution is created, its runtime can be observed as shown in the figure below. During the run, each rectangle is shown in the
appropriate color. All possible colors that may occur during the run are listed at the bottom of the workflow map.

.. figure:: Images/som03.png
    :align: center
    
    SOM workflow in LEXIS portal: workflow map.

If the workflow execution was successful, the user can check the resulting data on the LEXIS portal. In this case, the data can be visualized using the
aweSOM Shiny App developed within IT4Innovations, as can be seen below.

.. figure:: Images/som04.png
    :align: center
    
    SOM Shiny App: Load data.

The user can use the interactive fields, buttons, and sliders to adjust the visualizations of the data.

.. figure:: Images/som05.png
    :align: center
    
    SOM Shiny App: data visualization.
