Airflow orchestrator

Templating

LEXIS Platform supports three basic templates for workflow creation:

- Container, to execute a job running an Apptainer (Singularity) container,
- HPC Command Application, to execute a predefined HEAppE Command Template (job script),
- Custom Job Script, to execute a custom (user-defined) job script.

As mentioned above, the user can also specify the computational workflow using the LWD YAML specification. It allows the user to create a custom (arbitrary) workflow in a user-friendly way while maintaining the security measures described above, so practically any workflow can be defined. The Complete Example demonstrates a simple HPC workflow with data transfer.

Examples

TOSCA workflow

TOSCA description of an HPC computational task for Airflow
tosca_definitions_version: tosca_simple_yaml_1_2


metadata:
  template_name: lexis_job_with_data_dag
  template_version: v1_0
  template_uo: ADAS - IT4Innovations
  template_author: Jan Swiatkowski
  template_contact_email: jan.swiatkowski@vsb.cz
  start_date: "2023-02-21"

description: LEXIS example workflow with data for Apache Airflow

imports:
  - https://nextcloud.it4i.cz/s/PL2jsXSMRkMxj7Q/download/lexis-airflow-types.yaml

topology_template:
  # TEMPLATE INPUTS
  inputs:
    access_token:
      type: string
      required: True
      default: ""
    computation_project:
      type: string
      required: True
      default: ""
    job1_cluster_id:
      type: number
      required: True
      default: 2
    job1_command_template_id:
      type: number
      required: True
      default: 3
    job1_parameters:
      type: map
      required: True
      default: {"inputParam":"test"}
    job1_cluster_node_type_id:
      type: number
      required: True
      default: 8
    job1_max_cores:
      type: number
      required: True
      default: 24
    job1_filetransfer_method_id:
      type: number
      required: True
      default: 2
    job1_heappe_uri:
      type: string
      required: True
      default: "https://heappe.it4i.cz/lexis"
    job1_input_dataset_path:
      type: string
      required: True
      default: "project/proj812b07ed780274387d12c665fa3a4f7f/5e51f28c-88fa-11ec-a591-00155d4ab2e4/test10M.dat"
    job1_output_dataset_path:
      type: string
      required: True
      default: "project/proj812b07ed780274387d12c665fa3a4f7f"
    job1_output_ddi_metadata:
      type: map #lexis.datatypes.ddi.Metadata
      required: True
      default:
        contributor: ["Testing"]
        creator: ["Testing"]
        owner: ["Testing"]
        publicationYear: "2022"
        publisher: ["Airflow"]
        resourceType: Test data
        title: AIRFLOW test output TOSCA


  # NODES
  node_templates:
    # token management
    ref_token:
      type: lexis.nodes.operators.aai.LexisAAIOperator
      properties:
        access_token: { get_input: access_token }
    ref_token_keeper:
      type: lexis.nodes.operators.aai.LexisAAITokenKeeperSensor
      requirements:
        - aai_operator: ref_token
    ref_token_keeper_stop:
      type: lexis.nodes.operators.aai.LexisAAIStopKeeperOperator
      requirements:
        - previous_task: job1_heappe_job_delete

    # HEAppE Job context
    job1_heappe_session:
      type: lexis.nodes.operators.heappe.HEAppESessionOperator
      properties:
        heappe_uri: { get_input: job1_heappe_uri }
      requirements:
        - aai_operator: ref_token
        - previous_task: ref_token
    # keep session active
    job1_heappe_session_keeper_stop:
      type: lexis.nodes.operators.heappe.HEAppESessionStopKeeperOperator
      requirements:
        - previous_task: job1_heappe_job_delete

    job1_heappe_session_keeper:
      type: lexis.nodes.operators.heappe.HEAppESessionKeeperSensor
      properties:
        heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
        heappe_session_stop_task_id: { get_attribute: [ job1_heappe_session_keeper_stop, task_id ] }
      requirements:
        - heappe_session_task_id: job1_heappe_session
    # HEAppE Job
    job1_heappe_prepare_job:
      type: lexis.nodes.operators.heappe.HEAppEPrepareJobOperator
      properties:
        Name: Job Name n.1
        Project: { get_input: computation_project }
        ClusterId: { get_input: job1_cluster_id }
        CommandTemplateId: { get_input: job1_command_template_id }
        FileTransferMethodId: { get_input: job1_filetransfer_method_id }
        EnvironmentVariables: {}
        MaxCores: { get_input: job1_max_cores }
        Priority: 4
        ClusterNodeTypeId: { get_input: job1_cluster_node_type_id }
        TemplateParameterValues: { get_input: job1_parameters }
        heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
      requirements:
        - heappe_session_task_id: job1_heappe_session

    # enable file transfer from DDI
    job1_heappe_enable_filetransfer:
      type: lexis.nodes.operators.heappe.HEAppEEnableFileTransferOperator
      properties:
        heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
        heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
      requirements:
        - heappe_session_task_id: job1_heappe_session
        - heappe_prepare_job_task_id: job1_heappe_prepare_job
    # transfer data
    job1_ddi_data_input_transfer:
      type: lexis.nodes.operators.ddi.LexisStagingOperator
      properties:
        source_system: it4i_iRODS
        source_path: { get_input: job1_input_dataset_path }
        target_system: karolina_home
        target_path: "/"
        heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
        heappe_enable_file_transfer_task_id: { get_attribute: [ job1_heappe_enable_filetransfer, task_id ] }
      requirements:
        - heappe_prepare_job_task_id: job1_heappe_prepare_job
        - heappe_enable_file_transfer_task_id: job1_heappe_enable_filetransfer
        - previous_task: job1_heappe_enable_filetransfer

    # wait for the DDI operation
    job1_ddi_data_input_wait:
      type: lexis.nodes.operators.ddi.LexisWaitStagingRequestSensor
      properties:
        staging_operator_task_id: { get_attribute: [ job1_ddi_data_input_transfer, task_id ] }
      requirements:
        - staging_operator_task_id: job1_ddi_data_input_transfer

    # submit HEAppE job
    job1_heappe_submit_job:
      type: lexis.nodes.operators.heappe.HEAppESubmitJobOperator
      properties:
        heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
        heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
      requirements:
        - heappe_session_task_id: job1_heappe_session
        - heappe_prepare_job_task_id: job1_heappe_prepare_job
        - previous_task: job1_ddi_data_input_wait

    job1_heappe_job_wait:
      type: lexis.nodes.operators.heappe.HEAppEWaitSubmittedJobSensor
      properties:
        heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
        heappe_submitted_job_task_id: { get_attribute: [ job1_heappe_submit_job, task_id ] }
      requirements:
        - heappe_session_task_id: job1_heappe_session
        - heappe_submitted_job_task_id: job1_heappe_submit_job

    # transfer job output
    job1_job_output_to_ddi:
      type: lexis.nodes.operators.ddi.LexisStagingOperator
      properties:
        target_system: it4i_iRODS
        target_path: { get_input: job1_output_dataset_path }
        source_system: karolina_home
        source_path: "/"
        heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
        heappe_enable_file_transfer_task_id: { get_attribute: [ job1_heappe_enable_filetransfer, task_id ] }
      requirements:
        - heappe_prepare_job_task_id: job1_heappe_prepare_job
        - heappe_enable_file_transfer_task_id: job1_heappe_enable_filetransfer
        - previous_task: job1_heappe_job_wait

    # wait for the DDI operation
    job1_ddi_data_output_wait:
      type: lexis.nodes.operators.ddi.LexisWaitStagingRequestSensor
      properties:
        staging_operator_task_id: { get_attribute: [ job1_job_output_to_ddi, task_id ] }
      requirements:
        - staging_operator_task_id: job1_job_output_to_ddi

    job1_heappe_end_filetransfer:
      type: lexis.nodes.operators.heappe.HEAppEEndFileTransferOperator
      properties:
        heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
        heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
        heappe_enable_file_transfer_task_id: { get_attribute: [ job1_heappe_enable_filetransfer, task_id ] }
      requirements:
        - heappe_session_task_id: job1_heappe_session
        - heappe_prepare_job_task_id: job1_heappe_prepare_job
        - heappe_enable_file_transfer_task_id: job1_heappe_enable_filetransfer
        - previous_task: job1_ddi_data_output_wait

    # end HEAppE job
    job1_heappe_job_delete:
      type: lexis.nodes.operators.heappe.HEAppEDeleteJobOperator
      properties:
        heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
        heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
      requirements:
        - heappe_session_task_id: job1_heappe_session
        - heappe_prepare_job_task_id: job1_heappe_prepare_job
        - previous_task: job1_heappe_end_filetransfer
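The `requirements` entries in the template determine the order in which the tasks run. The following is a minimal sketch. It assumes that every requirement of a node (`previous_task`, `aai_operator`, or one of the `*_task_id` keys) becomes an upstream dependency of that node in the generated Airflow DAG. Under that assumption, the sketch computes one valid execution order with Python's standard `graphlib` module. The requirement map is copied by hand from the template above. This document does not show how the LEXIS translator actually builds the DAG.

```python
from graphlib import TopologicalSorter

# Requirements copied from the TOSCA node_templates above.
# Assumption: node -> nodes it requires, each treated as an upstream task.
REQUIREMENTS = {
    "ref_token": [],
    "ref_token_keeper": ["ref_token"],
    "ref_token_keeper_stop": ["job1_heappe_job_delete"],
    "job1_heappe_session": ["ref_token"],
    "job1_heappe_session_keeper_stop": ["job1_heappe_job_delete"],
    "job1_heappe_session_keeper": ["job1_heappe_session"],
    "job1_heappe_prepare_job": ["job1_heappe_session"],
    "job1_heappe_enable_filetransfer": ["job1_heappe_session", "job1_heappe_prepare_job"],
    "job1_ddi_data_input_transfer": ["job1_heappe_prepare_job", "job1_heappe_enable_filetransfer"],
    "job1_ddi_data_input_wait": ["job1_ddi_data_input_transfer"],
    "job1_heappe_submit_job": ["job1_heappe_session", "job1_heappe_prepare_job",
                               "job1_ddi_data_input_wait"],
    "job1_heappe_job_wait": ["job1_heappe_session", "job1_heappe_submit_job"],
    "job1_job_output_to_ddi": ["job1_heappe_prepare_job", "job1_heappe_enable_filetransfer",
                               "job1_heappe_job_wait"],
    "job1_ddi_data_output_wait": ["job1_job_output_to_ddi"],
    "job1_heappe_end_filetransfer": ["job1_heappe_session", "job1_heappe_prepare_job",
                                     "job1_heappe_enable_filetransfer",
                                     "job1_ddi_data_output_wait"],
    "job1_heappe_job_delete": ["job1_heappe_session", "job1_heappe_prepare_job",
                               "job1_heappe_end_filetransfer"],
}


def execution_order(requirements):
    """Return one valid task order; raises graphlib.CycleError on a cycle."""
    # TopologicalSorter expects a mapping: node -> iterable of predecessors.
    return list(TopologicalSorter(requirements).static_order())


order = execution_order(REQUIREMENTS)
for position, task in enumerate(order, start=1):
    print(position, task)
```

Suppose the map contained a cycle, for example if `job1_heappe_prepare_job` also required `job1_heappe_job_delete`. In that case `static_order()` would raise `graphlib.CycleError`. This gives a cheap way to catch a mis-wired template before deploying it.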

SOM DAG and execution

This section gives a short example of executing the SOM (Self-Organizing Map) algorithm on the LEXIS portal. First, the SOM DAG created in Python is shown below.

SOM DAG in Python
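# The LEXIS operator classes used below (LexisAAIOperator, the HEAppE operators,
# LexisStagingOperator, ...) must be imported from the LEXIS Airflow package,
# and all tasks must be created inside a `with DAG(...)` block; both are
# omitted in this excerpt.

# Refresh the LEXIS AAI access token passed in as a DAG parameter
ref_token = LexisAAIOperator(
    task_id="refresh_token_task",
    access_token="{{ params.access_token }}"
)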

ref_token_keeper = LexisAAITokenKeeperSensor(
    task_id="ref_token_keeper",
)

ref_token_stop = LexisAAIStopKeeperOperator()

# Open a HEAppE session
job1_heappe_session = HEAppESessionOperator(
    task_id="job1_heappe_session",
    heappe_uri="{{ params.job1_heappe_uri }}",
)
# Keep the session active until the session stop task runs
job1_heappe_session_keeper = HEAppESessionKeeperSensor(
    task_id="job1_heappe_session_keeper",
    heappe_session_task_id="job1_heappe_session",
    heappe_session_stop_task_id="job1_heappe_session_keeper_stop"
)
job1_heappe_session_keeper_stop = HEAppESessionStopKeeperOperator(
    task_id="job1_heappe_session_keeper_stop",
)

# Create the HEAppE job from the selected command template
job1_heappe_prepare_job = HEAppEPrepareJobOperator(
    task_id="job1_heappe_prepare_job",
    heappe_session_task_id="job1_heappe_session",
    Name="{{ params.name }}",
    Project="{{ params.computation_project }}",
    ClusterId="{{ params.job1_cluster_id }}",
    CommandTemplateId="{{ params.job1_command_template_id }}",
    ClusterNodeTypeId="{{ params.job1_cluster_node_type_id }}",
    FileTransferMethodId="{{ params.job1_filetransfer_method_id }}",
    TemplateParameterValues="{{ params.job1_parameters }}",
    MaxCores="{{ params.job1_max_cores }}"
)

job1_heappe_enable_filetransfer = HEAppEEnableFileTransferOperator(
    task_id="job1_heappe_enable_filetransfer",
    heappe_prepare_job_task_id="job1_heappe_prepare_job",
    heappe_session_task_id="job1_heappe_session",
)

job1_ddi_data_input_transfer = LexisStagingOperator(
    task_id="job1_ddi_data_input_transfer",
    source_system="it4i_iRODS",
    source_path="{{ params.job1_input_dataset_path }}",
    target_system="karolina_home",
    target_path="/",
    heappe_prepare_job_task_id="job1_heappe_prepare_job",
    heappe_enable_file_transfer_task_id="job1_heappe_enable_filetransfer"
)

job1_ddi_data_input_wait = LexisWaitStagingRequestSensor(
    task_id="job1_ddi_data_input_transfer_wait",
    staging_operator_task_id="job1_ddi_data_input_transfer"
)

job1_heappe_submit_job = HEAppESubmitJobOperator(
    task_id="job1_heappe_submit_job",
    heappe_prepare_job_task_id="job1_heappe_prepare_job",
    heappe_session_task_id="job1_heappe_session",
)

job1_heappe_job_wait = HEAppEWaitSubmittedJobSensor(
    task_id="job1_heappe_job_waiting",
    heappe_submitted_job_task_id="job1_heappe_submit_job",
    heappe_session_task_id="job1_heappe_session",
)

job1_job_output_to_ddi = LexisStagingOperator(
    task_id="job1_to_ddi",
    target_system="it4i_iRODS",
    target_path="{{ params.job1_output_dataset_path }}",
    source_system="karolina_home",
    source_path="/",
    metadata="{{ params.job1_output_ddi_metadata }}",
    heappe_prepare_job_task_id="job1_heappe_prepare_job",
    heappe_enable_file_transfer_task_id="job1_heappe_enable_filetransfer"
)

job1_ddi_data_output_wait = LexisWaitStagingRequestSensor(
    task_id="job1_ddi_data_output_transfer_wait",
    staging_operator_task_id="job1_to_ddi"
)

job1_heappe_end_filetransfer = HEAppEEndFileTransferOperator(
    task_id="job1_heappe_end_filetransfer",
    heappe_prepare_job_task_id="job1_heappe_prepare_job",
    heappe_enable_file_transfer_task_id="job1_heappe_enable_filetransfer",
    heappe_session_task_id="job1_heappe_session"
)

job1_heappe_job_delete = HEAppEDeleteJobOperator(
    task_id="job1_heappe_job_delete",
    heappe_prepare_job_task_id="job1_heappe_prepare_job",
    heappe_session_task_id="job1_heappe_session",
)

# Main chain: token -> session -> job preparation -> input staging ->
# job run -> output staging -> cleanup -> stop the keepers
(
    ref_token
    >> job1_heappe_session
    >> job1_heappe_prepare_job
    >> job1_heappe_enable_filetransfer
    >> job1_ddi_data_input_transfer
    >> job1_ddi_data_input_wait
    >> job1_heappe_submit_job
    >> job1_heappe_job_wait
    >> job1_job_output_to_ddi
    >> job1_ddi_data_output_wait
    >> job1_heappe_end_filetransfer
    >> job1_heappe_job_delete
    >> job1_heappe_session_keeper_stop
    >> ref_token_stop
)

# Keepers run in parallel with the main chain
ref_token >> job1_heappe_session >> job1_heappe_session_keeper >> ref_token_stop
ref_token >> ref_token_keeper
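The keeper/stop pairs in the DAG (`ref_token_keeper` with `ref_token_stop`, and `job1_heappe_session_keeper` with `job1_heappe_session_keeper_stop`) appear to run alongside the main chain. They keep the token or session alive until the stop task runs. The internals of the LEXIS operators are not shown here. The code below is therefore only a hypothetical, stdlib-only sketch of that pattern: a background worker calls a refresh function at a fixed interval until a stop event is set. The names `Keeper`, `refresh`, and `interval_s` are illustrative and are not part of the LEXIS API.

```python
import threading
import time
from typing import Callable


class Keeper:
    """Hypothetical keeper: calls refresh() periodically until stopped."""

    def __init__(self, refresh: Callable[[], None], interval_s: float) -> None:
        self._refresh = refresh
        self._interval_s = interval_s
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        # Event.wait() returns True as soon as stop() sets the event; otherwise
        # it returns False after interval_s, which triggers another refresh.
        while not self._stop.wait(self._interval_s):
            self._refresh()

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        # Plays the role of the "...StopKeeperOperator" task in the DAG.
        self._stop.set()
        self._thread.join()

    @property
    def running(self) -> bool:
        return self._thread.is_alive()


# Demo: refresh every 50 ms while a stand-in "main chain" runs for 300 ms.
refreshes = []
keeper = Keeper(refresh=lambda: refreshes.append(time.monotonic()), interval_s=0.05)
keeper.start()
time.sleep(0.3)  # stands in for the main job chain
keeper.stop()
print(f"refreshed {len(refreshes)} times, running={keeper.running}")
```

Using `threading.Event.wait` as the sleep means the keeper reacts to the stop signal immediately instead of finishing its current interval.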

The figure below shows the SOM DAG details in the LEXIS portal. From the portal, the user can create a workflow execution by clicking the CREATE WORKFLOW EXECUTION button below the DAG details.

Figure: SOM workflow in LEXIS portal: DAG details.

Next, as shown in the figure below, the job parameters have to be filled in. Once everything is set, the user clicks CREATE in the upper left corner.

Figure: SOM workflow in LEXIS portal: set parameters.
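The values entered in this form are presumably the ones the DAG reads through its `{{ params.* }}` templates. The sketch below is a hedged illustration, not a LEXIS tool. It uses a simple regular expression (not a Jinja parser) to collect the parameter names referenced by a few of the DAG's template strings. It then reports which of those names are missing from a parameter dictionary. The default values come from the TOSCA inputs above. The `name` value, the `form_values` dictionary, and the helper functions are hypothetical.

```python
import re

# A few template strings copied from the SOM DAG above.
TEMPLATES = [
    "{{ params.access_token }}",
    "{{ params.job1_heappe_uri }}",
    "{{ params.name }}",
    "{{ params.computation_project }}",
    "{{ params.job1_cluster_id }}",
    "{{ params.job1_max_cores }}",
    "{{ params.job1_input_dataset_path }}",
]

# Matches "params.<identifier>" inside a "{{ ... }}" expression.
PARAM_REF = re.compile(r"\{\{\s*params\.([A-Za-z_][A-Za-z0-9_]*)\s*\}\}")


def referenced_params(templates):
    """Return the sorted names of all parameters used by the templates."""
    names = set()
    for template in templates:
        names.update(PARAM_REF.findall(template))
    return sorted(names)


def missing_params(templates, params):
    """Return the referenced parameter names that are absent from params."""
    return [name for name in referenced_params(templates) if name not in params]


# Hypothetical form input; defaults copied from the TOSCA inputs.
form_values = {
    "access_token": "",
    "job1_heappe_uri": "https://heappe.it4i.cz/lexis",
    "name": "Job Name n.1",
    "computation_project": "",
    "job1_cluster_id": 2,
    "job1_max_cores": 24,
}

print(missing_params(TEMPLATES, form_values))  # prints ['job1_input_dataset_path']
```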

After the workflow execution is created, its progress can be followed in the workflow map shown in the figure below. During the run, each rectangle is colored according to the state of the corresponding task. All possible colors are listed in the legend at the bottom of the workflow map.

Figure: SOM workflow in LEXIS portal: workflow map.

If the workflow execution succeeds, the user can check the resulting data on the LEXIS portal. In this case, the data can be visualized with the aweSOM Shiny App developed at IT4Innovations, as shown below.

Figure: SOM Shiny App: load data.

The user can adjust the data visualizations using the interactive fields, buttons, and sliders.

Figure: SOM Shiny App: data visualization.