Airflow orchestrator

Airflow in LEXIS

As stated on the web presentation, the Airflow is a platform created by the community to programmatically author, schedule and monitor workflows. The Airflow platform is scalable, dynamic and easily extensible. The workflows within the Airflow are defined in Python language with the use of built-in operators or custom ones.

The LEXIS extends the Airflow with the possibility to describe a computational workflow using the OASIS TOSCA standard. Within the LEXIS, the user can also use the Python description (SOM DAG and execution) to manage the pipeline, but the arbitrary code represents a security risk. Therefore, the LEXIS team has to verify the pipelines described in Python before registering them into the Airflow. On the contrary, TOSCA improves security and makes workflow creation more user-friendly. It restricts the usage of the operators (workflow components) to a couple allowed, which implements interaction with HEAppE and Kubernetes to author execution on an HPC cluster or cloud. The DDI staging API handles data transfer between the computational tasks (see Distributed Data Infrastructure).

Furthermore, the Airflow offers many practical functionalities like grouping operators or advanced failure handling. The policies can be assigned to groups to achieve chaining of groups or running multiple instances of the group in parallel. For instance, race can be performed on multiple computational locations.

../_images/airflow-in-LEXIS-execute-workflow_diagram.png

Templating

As said above, the user can specify the computational workflow in OASIS TOSCA standard. Both initially target software deployment. Using them to describe computational workflows gives us a new possible use case. Templating allows the computational workflows to be more portable, system independent, and readable.

The TOSCA representation provides both declaration and behavioural description syntax. The declaration part is more suitable for LEXIS usage. It makes the workflow specification easier to describe. The example TOSCA workflow demonstrates computational task on HPC with data transfer.

The workflow templates are now available as workflows for container and custom job script execution. More templates will become available in the future. Support for the Common Workflow Language has been under development and testing as another standard that users can use to describe their computing workflows.

Workflow template editor

The idea of the workflow editor is to simplify the modification of the template. To increase the possibility of using HPC by a broader range of users, i.e., users of all categories of knowledge in computer science, the graphical interface for editing workflows is crucial.

With the use of the GUI, users can easily compose computational tasks into more or less advanced computational workflows. Each computational task may have input in the form of passed data or atomic types parameters. The output of the task is represented in the form of data only. Moreover, output data can be linked to the input of the subsequent task. Such abstraction can lead to better variability. For example, special postprocessing or preprocessing can be attached to the computation, or one can vary the core of the computation.

Finally, the editor generates the resulting computational workflow specification in the TOSCA standard. As the implementation of the emitter is done with the visitor design pattern, developing another editor on demand is not an issue.

The Workflow Editor uses Blazor Server framework based on .NET written in C# together with JavaScript Diagram JointJS.

The editor is currently being developed and tested on internal use-cases to validate its integration with the platform features.

../_images/WorkflowTemplateEditor_demo.gif ../_images/WorkflowTemplateEditor_demo2.gif

LEXIS Airflow Provider

As stated on the official website of the Apache Airflow:

Apache Airflow 2 is built in modular way. The “Core” of Apache Airflow provides core scheduler functionality which allow you to write some basic tasks, but the capabilities of Apache Airflow can be extended by installing additional packages, called providers.

Providers can contain operators, hooks, sensor, and transfer operators to communicate with a multitude of external systems, but they can also extend Airflow core with new capabilities.

LEXIS Airflow Provider extends the Airflow by new helpers, hooks, models, and operators needed for using the Airflow along with the LEXIS platform. Here, a brief summary of all is provided.

LEXIS Helpers

The helpers module provides the useful functions necessary for the hooks and operators described below. It consists of the following functions:

  • Function that returns a username from token:

    lexis.helpers.aai.get_username(token)
    
  • Function that acts as decorator and will add AAI ‘access_token’ to the context:

    lexis.helpers.aai.verify_conn(fn)
    

    where ‘fn’ means function that have to be wrapped. The method is usable only with Operators method ‘execute’.

LEXIS Hooks

The definition of a hook from the Apache Airflow official website is the following:

A Hook is a high-level interface to an external platform that lets you quickly and easily talk to them without having to write low-level code that hits their API or uses special libraries. They’re also often the building blocks that Operators are built out of.

They integrate with Connections to gather credentials, and many have a default conn_id; for example, the PostgresHook automatically looks for the Connection with a conn_id of postgres_default if you don’t pass one in.

Here, the hooks necessary for the LEXIS platform are described.

Keycloak Hook

Keycloak Hook is needed to manage Keycloak session during the LEXIS workflow. Keycloak hook can be imported by:

from lexis.hooks.aai import KeycloakHook

The class has one optional parameter is ‘conn_id’. By default:

conn_id: str="lexis_keycloak_default"

Keycloak hook consists of the following methods:

  • Function to get basic URI:

    get_uri(conn: Connection, extras)
    
  • Function to get client’s KeycloakOpenID:

    get_client(uri: str, realm: str, client_id: str, client_secret_key: str) -> KeycloakOpenID
    
  • Function that initiates a new KeycloakOpenID client (Should be called before using the client):

    get_conn() -> KeycloakOpenID
    
  • Function that exchanges token for refresh offline token with current client:

    exchange_for_refresh_offline_token(access_token: str) -> str
    

    The method returns refresh token of client set in Connection.

  • Function to get access token for refresh token:

    get_access_token(refresh_token: str)
    

LEXIS DDI Hook

The hook is used for the purpose of staging. It can be imported by:

from lexis.hooks.ddi import LexisDDIHook

The class has one optional parameter is ‘conn_id’. By default:

conn_id: str="lexis_keycloak_default"

LEXIS DDI hook consists of the following methods:

  • Function to get basic URI:

    get_uri(conn: Connection, extras)
    
  • Function that initiates hook with url from Connection:

    get_conn()
    
  • Function that stages dataset between DDI systems and HPC systems:

    submit_stage(access_token, source_system, source_path, target_system, target_path, heappe_url,
                 heappe_job_id, heappe_job_task_id, encryption=False, compression=False, metadata: DDIMetadata=None)->str
    
  • Function that returns staging status:

    staging_status(access_token: str, request_id: str) -> Literal["In progress", "Transfer completed"]|str
    
  • Function that returns available staging areas:

    get_available_staging_areas(access_token:str) -> list[str]
    

HEAppE Hook

HEAppE hook manages jobs and file transfer in connection with HEAppE middleware. It can be imported by:

from lexis.hooks.heappe import HEAppEHook

The class has one mandatory parameter ‘heappe_uri’, which holds the URI of the HEAppE. The HEAppE hook consists of the following functions:

  • Function that initiates a new ApiClient client (Should be called before using the client):

    get_conn()
    
  • Function that returns the HEAppE session:

    get_session(access_token: str)
    
  • Function that returns exception if the session is timed out:

    is_session_timeout_exception(ex: ApiException) -> bool
    
  • Function that prepare HPC job specification needed for submitting:

    prepare_job(heappe_session, heappe_job_spec: HEAppEJobSpec) -> dict
    

    It returns the job context.

  • Function that submits a HPC job:

    submit_job(heappe_session, job_context) -> dict
    

    It returns the context of the submitted job.

  • Function that returns a job status:

    get_job_state(heappe_session, submitted_job_context)
    
  • Function to delete a job:

    delete_job(heappe_session, job_context) -> dict
    

    It returns deleted job context.

  • Function that enables a file transfer:

    enable_filetransfer(heappe_session, job_context) -> dict
    

    It returns a file transfer context.

  • Function that ends a filetransfer:

    end_filetransfer(self, heappe_session, job_context, filetransfer_context) -> dict
    

    It returns a file transfer context.

  • Function that provide a file download from a cluster:

    download_file_from_cluster(heappe_session, submitted_job_context, file_path) -> dict
    

    It returns a decoded log.

LEXIS Models

The models module keeps the definitions of a structure for metadata needed within the DDI and HEAppE operators.

DDI Metadata

It can be imported by:

from lexis.models.ddi import DDIMetadata

The parameters that can be provided to the class are the following:

  • contributor: list[str] | None. By default: None.

  • creator: list[str] | None. By default: None.

  • owner: list[str] | None. By default: None.

  • publicationYear: str | None. By default: None.

  • publisher: list[str] | None. By default: None.

  • resourceType: str | None. By default: None.

  • title: str. By default: ‘’.

  • workflow_id: str. By default: ‘’.

  • workflowEx_id: str. By default: ‘’.

HEAppE Session Time Out

The class can be imported by:

from lexis.models.heappe import HEAppESessionTimeOut

It needs one mandatory parameter ‘old_session_id’. The class provides an exception if the session is timed out.

HEAppE Job specification

The class can be imported by:

from lexis.models.heappe import HEAppEJobSpec

The parameters that can be provided to the class are the following:

  • Name: str, a name of a job (mandatory).

  • Project: str, an accounting project (mandatory).

  • ClusterId: int, ID of a cluster (mandatory).

  • CommandTemplateId: int, ID of a command template (mandatory).

  • FileTransferMethodId: int, ID of a filetransfer method (optional). By default: None.

  • EnvironmentVariables: dict, a dictionary of environmental variables (optional). By default: {}.

  • MinCores: int, minimal number of cores. By default: 1.

  • MaxCores: int, maximal number of cores. By default: 64.

  • WalltimeLimit: int, maximal walltime limit in seconds. By default: 600,

  • Priority: int, priority of a job. By default: 4.

  • ClusterNodeTypeId: int, ID of a cluster node type. By default: 8.

  • TemplateParameterValues: dict, a dictionary of a template parameter values. By default: {}.

  • TaskParalizationParameters: Sequence[dict], a sequence of dictionaries for task parallelization parameters. By default: [].

LEXIS Operators

The definition of an operator from the Apache Airflow official website is the following:

An Operator is conceptually a template for a predefined Task, that you can just define declaratively inside your DAG.

Airflow has a very extensive set of operators available, with some built-in to the core or pre-installed providers. Some popular operators from core include:

  • BashOperator - executes a bash command

  • PythonOperator - calls an arbitrary Python function

  • EmailOperator - sends an email

  • Use the @task decorator to execute an arbitrary Python function. It doesn’t support rendering jinja templates passed as arguments.

It is also important to recall the following note:

Inside Airflow’s code, we often mix the concepts of Tasks and Operators, and they are mostly interchangeable. However, when we talk about a Task, we mean the generic “unit of execution” of a DAG; when we talk about an Operator, we mean a reusable, pre-made Task template whose logic is all done for you and that just needs some arguments.

Here, a list of existing LEXIS operators is provided:

  • lexis.operators.aai.LexisAAIOperator – Lexis AAI operator used to exchange access token for offline token and share it on XCOM.

  • lexis.operators.aai.LexisAAIStopKeeperOperator – LEXIS operator to stop LexisAAITokenKeeperSensor.

  • lexis.operators.aai.LexisAAITokenKeeperSensor – LEXIS operator to periodically refresh AAI token session, until the all of the task related to the session are done or failed.

  • lexis.operators.base.LexisOperatorBase – a simple base operator base.

  • lexis.operators.base.LexisAAIOperatorBase – LEXIS base operator to handle LEXIS AAI authentization.

  • lexis.operators.base.LexisStagingOperatorBase – LEXIS base operator to handle DDI staging.

  • lexis.operators.base.HEAppEOperatorBase – LEXIS base operator to handle HEAppE session.

  • lexis.operators.cloud.LexisCloudOperator – LEXIS operator to manage job for a cloud.

  • lexis.operators.ddi.LexisStagingOperator – LEXIS operator used to trigger staging operations on the DDI.

  • lexis.operators.ddi.LexisWaitStagingRequestSensor – LEXIS operator used to wait for staging operation.

  • lexis.operators.heappe.HEAppESessionOperator – LEXIS operator used to manage HEAppE session.

  • lexis.operators.heappe.HEAppEPrepareJobOperator – LEXIS operator used to prepare HEAppE job context by job specifications.

  • lexis.operators.heappe.HEAppESubmitJobOperator – LEXIS operator used to submit HEAppE job.

  • lexis.operators.heappe.HEAppEEnableFileTransferOperator – LEXIS operator used to enable file transfer for HEAppE job.

  • lexis.operators.heappe.HEAppEEndFileTransferOperator – LEXIS operator used to disable file transfer for HEAppE job.

  • lexis.operators.heappe.HEAppEDeleteJobOperator – LEXIS operator used to delete HEAppE job.

  • lexis.operators.heappe.HEAppEWaitSubmittedJobSensor – LEXIS operator used to wait for submitted job.

  • lexis.operators.heappe.HEAppESessionStopKeeperOperator – LEXIS operator used to stop HEAppESessionKeeperSensor.

  • lexis.operators.heappe.HEAppESessionKeeperSensor – LEXIS operator that periodically refresh HEAppE session, until the all of the task related to the session are done or failed.

LEXIS Airflow Plugin

Plugins are used to customize the Airflow installation. The official statement from the Apache Airflow website is the following:

Airflow offers a generic toolbox for working with data. Different organizations have different stacks and different needs. Using Airflow plugins can be a way for companies to customize their Airflow installation to reflect their ecosystem.

Plugins can be used as an easy way to write, share and activate new sets of features.

There’s also a need for a set of more complex applications to interact with different flavors of data and metadata.

The LEXIS Airflow plugin provides two base features: user authentization, and registration of DAGs to Airflow using the REST API.

LEXIS API View

The class PluginLexisAPIView extends the Airflow REST API by the following endpoints:

  • “/” – endpoint to check if the Airflow is alive.

  • “/create-dag” – endpoint to register new DAGs.

User authentization

User authentization backend is stored within the Python module lexis_api.auth.lexis_token. It consists of the following functions:

  • Function that returns all allowed projects for the current user:

    get_allowed_projects(userinfo)
    
  • Function that check if the current user has administration permissions:

    get_admin_status(userinfo)
    
  • Function that authenticates and sets current user if authorization header exists:

    auth_current_user() -> User | None
    
  • Decorator for the functions that require authentication:

    requires_authentication(function: T)
    

Examples

TOSCA workflow

Tosca description of HPC computational task for Airflow
tosca_definitions_version: tosca_simple_yaml_1_2


metadata:
template_name: lexis_job_with_data_dag
template_version: v1_0
template_uo: ADAS - IT4Innovations
template_author: Jan Swiatkowski
template_contact_email: jan.swiatkowski@vsb.cz
start_date: "2023-02-21"

description: LEXIS example workflow with data for Apache Airflow

imports:
- https://nextcloud.it4i.cz/s/PL2jsXSMRkMxj7Q/download/lexis-airflow-types.yaml

topology_template:
# TEMPLATE INPUTS
inputs:
    access_token:
    type: string
    required: True
    default: ""
    computation_project:
    type: string
    required: True
    default: ""
    job1_cluster_id:
    type: number
    required: True
    default: 2
    job1_command_template_id:
    type: number
    required: True
    default: 3
    job1_parameters:
    type: map
    required: True
    default: {"inputParam":"test"}
    job1_cluster_node_type_id:
    type: number
    required: True
    default: 8
    job1_max_cores:
    type: number
    required: True
    default: 24
    job1_filetransfer_method_id:
    type: number
    required: True
    default: 2
    job1_heappe_uri:
    type: string
    required: True
    default: "https://heappe.it4i.cz/lexis"
    job1_input_dataset_path:
    type: string
    required: True
    default: "project/proj812b07ed780274387d12c665fa3a4f7f/5e51f28c-88fa-11ec-a591-00155d4ab2e4/test10M.dat"
    job1_output_dataset_path:
    type: string
    required: True
    default: "project/proj812b07ed780274387d12c665fa3a4f7f"
    job1_output_ddi_metadata:
    type: map #lexis.datatypes.ddi.Metadata
    required: True
    default:
        contributor: ["Testing"]
        creator: ["Testing"]
        owner: ["Testing"]
        publicationYear: "2022"
        publisher: ["Airflow"]
        resourceType: Test data
        title: AIRFLOW test output TOSCA


# NODES
node_templates:
    # token management
    ref_token:
    type: lexis.nodes.operators.aai.LexisAAIOperator
    properties:
        access_token: { get_input: access_token }
    ref_token_keeper:
    type: lexis.nodes.operators.aai.LexisAAITokenKeeperSensor
    requirements:
        - aai_operator: ref_token
    ref_token_keeper_stop:
    type: lexis.nodes.operators.aai.LexisAAIStopKeeperOperator
    requirements:
        - previous_task: job1_heappe_job_delete

    # HEAppE Job context
    job1_heappe_session:
    type: lexis.nodes.operators.heappe.HEAppESessionOperator
    properties:
        heappe_uri: { get_input: job1_heappe_uri }
    requirements:
        - aai_operator: ref_token
        - previous_task: ref_token
    # keep session active
    job1_heappe_session_keeper_stop:
    type: lexis.nodes.operators.heappe.HEAppESessionStopKeeperOperator
    requirements:
        - previous_task: job1_heappe_job_delete

    job1_heappe_session_keeper:
    type: lexis.nodes.operators.heappe.HEAppESessionKeeperSensor
    properties:
        heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
        heappe_session_stop_task_id: { get_attribute: [ job1_heappe_session_keeper_stop, task_id ] }
    requirements:
        - heappe_session_task_id: job1_heappe_session
    # HEAppE Job
    job1_heappe_prepare_job:
    type: lexis.nodes.operators.heappe.HEAppEPrepareJobOperator
    properties:
        Name: Job Name n.1
        Project: { get_input: computation_project }
        ClusterId: { get_input: job1_cluster_id }
        CommandTemplateId: { get_input: job1_command_template_id }
        FileTransferMethodId: { get_input: job1_filetransfer_method_id }
        EnvironmentVariables: {}
        MaxCores: { get_input: job1_max_cores }
        Priority: 4
        ClusterNodeTypeId: { get_input: job1_cluster_node_type_id }
        TemplateParameterValues: { get_input: job1_parameters }
        heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
    requirements:
        - heappe_session_task_id: job1_heappe_session

    # enable file transfer from DDI
    job1_heappe_enable_filetransfer:
    type: lexis.nodes.operators.heappe.HEAppEEnableFileTransferOperator
    properties:
        heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
        heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
    requirements:
        - heappe_session_task_id: job1_heappe_session
        - heappe_prepare_job_task_id: job1_heappe_prepare_job
    # transfer data
    job1_ddi_data_input_transfer:
    type: lexis.nodes.operators.ddi.LexisStagingOperator
    properties:
        source_system: it4i_iRODS
        source_path: { get_input: job1_input_dataset_path }
        target_system: karolina_home
        target_path: "/"
        heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
        heappe_enable_file_transfer_task_id: {get_attribute: [ job1_heappe_enable_filetransfer, task_id ] }
    requirements:
        - heappe_prepare_job_task_id: job1_heappe_prepare_job
        - heappe_enable_file_transfer_task_id: job1_heappe_enable_filetransfer
        - previous_task: job1_heappe_enable_filetransfer

    # wait for the DDI operation
    job1_ddi_data_input_wait:
    type: lexis.nodes.operators.ddi.LexisWaitStagingRequestSensor
    properties:
        staging_operator_task_id: { get_attribute: [ job1_ddi_data_input_transfer, task_id ] }
    requirements:
        - staging_operator_task_id: job1_ddi_data_input_transfer

    # submit HEAppe job
    job1_heappe_submit_job:
    type: lexis.nodes.operators.heappe.HEAppESubmitJobOperator
    properties:
        heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
        heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
    requirements:
        - heappe_session_task_id: job1_heappe_session
        - heappe_prepare_job_task_id: job1_heappe_prepare_job
        - previous_task: job1_ddi_data_input_wait

    job1_heappe_job_wait:
    type: lexis.nodes.operators.heappe.HEAppEWaitSubmittedJobSensor
    properties:
        heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
        heappe_submitted_job_task_id: { get_attribute: [ job1_heappe_submit_job, task_id ] }
    requirements:
        - heappe_session_task_id: job1_heappe_session
        - heappe_submitted_job_task_id: job1_heappe_submit_job

    # transfer job output
    job1_job_output_to_ddi:
    type: lexis.nodes.operators.ddi.LexisStagingOperator
    properties:
        target_system: it4i_iRODS
        target_path: { get_input: job1_output_dataset_path }
        source_system: karolina_home
        source_path: "/"
        heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
        heappe_enable_file_transfer_task_id: {get_attribute: [ job1_heappe_enable_filetransfer, task_id ] }
    requirements:
        - heappe_prepare_job_task_id: job1_heappe_prepare_job
        - heappe_enable_file_transfer_task_id: job1_heappe_enable_filetransfer
        - previous_task: job1_heappe_job_wait

    # wait for the DDI operation
    job1_ddi_data_output_wait:
    type: lexis.nodes.operators.ddi.LexisWaitStagingRequestSensor
    properties:
        staging_operator_task_id: { get_attribute: [ job1_job_output_to_ddi, task_id ] }
    requirements:
        - staging_operator_task_id: job1_job_output_to_ddi

    job1_heappe_end_filetransfer:
    type: lexis.nodes.operators.heappe.HEAppEEndFileTransferOperator
    properties:
        heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
        heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
        heappe_enable_file_transfer_task_id: {get_attribute: [ job1_heappe_enable_filetransfer, task_id ] }
    requirements:
        - heappe_session_task_id: job1_heappe_session
        - heappe_prepare_job_task_id: job1_heappe_prepare_job
        - heappe_enable_file_transfer_task_id: job1_heappe_enable_filetransfer
        - previous_task: job1_ddi_data_output_wait

    # end HEAppE job
    job1_heappe_job_delete:
    type: lexis.nodes.operators.heappe.HEAppEDeleteJobOperator
    properties:
        heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
        heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
    requirements:
        - heappe_session_task_id: job1_heappe_session
        - heappe_prepare_job_task_id: job1_heappe_prepare_job
        - previous_task: job1_heappe_end_filetransfer

SOM DAG and execution

A short example of executing SOM algorithm (Self-Organizing Map) on the LEXIS portal is provided in this section. At first, the SOM DAG created in python is shown below.

SOM DAG Python.
ref_token = LexisAAIOperator(
    task_id="refresh_token_task",
    access_token="{{ params.access_token }}"
)

ref_token_keeper = LexisAAITokenKeeperSensor(
    task_id="ref_token_keeper",
)

ref_token_stop = LexisAAIStopKeeperOperator()

# Get Heappe session
job1_heappe_session = HEAppESessionOperator(task_id="job1_heappe_session", heappe_uri="{{ params.job1_heappe_uri }}")
job1_heappe_session_keeper = HEAppESessionKeeperSensor(
    task_id="job1_heappe_session_keeper",
    heappe_session_task_id="job1_heappe_session",
    heappe_session_stop_task_id="job1_heappe_session_keeper_stop"
)
job1_heappe_session_keeper_stop = HEAppESessionStopKeeperOperator(
    task_id="job1_heappe_session_keeper_stop",
)
job1_heappe_prepare_job = HEAppEPrepareJobOperator(
    task_id="job1_heappe_prepare_job",
    heappe_session_task_id="job1_heappe_session",
    Name="{{ params.name }}",
    Project="{{ params.computation_project }}",
    ClusterId="{{ params.job1_cluster_id }}",
    CommandTemplateId="{{ params.job1_command_template_id }}",
    ClusterNodeTypeId="{{ params.job1_cluster_node_type_id }}",
    FileTransferMethodId="{{ params.job1_filetransfer_method_id }}",
    TemplateParameterValues="{{ params.job1_parameters }}",
    MaxCores="{{ params.job1_max_cores }}"
)

job1_heappe_enable_filetransfer = HEAppEEnableFileTransferOperator(
    task_id="job1_heappe_enable_filetransfer",
    heappe_prepare_job_task_id="job1_heappe_prepare_job",
    heappe_session_task_id="job1_heappe_session",
)

job1_ddi_data_input_transfer = LexisStagingOperator(
    task_id="job1_ddi_data_input_transfer",
    source_system="it4i_iRODS",
    source_path="{{ params.job1_input_dataset_path }}",
    target_system="karolina_home",
    target_path="/",
    heappe_prepare_job_task_id="job1_heappe_prepare_job",
    heappe_enable_file_transfer_task_id="job1_heappe_enable_filetransfer"
)

job1_ddi_data_input_wait = LexisWaitStagingRequestSensor(
    task_id="job1_ddi_data_input_transfer_wait",
    staging_operator_task_id="job1_ddi_data_input_transfer"
)

job1_heappe_submit_job = HEAppESubmitJobOperator(
    task_id="job1_heappe_submit_job",
    heappe_prepare_job_task_id="job1_heappe_prepare_job",
    heappe_session_task_id="job1_heappe_session",
)

job1_heappe_job_wait = HEAppEWaitSubmittedJobSensor(
    task_id="job1_heappe_job_waiting",
    heappe_submitted_job_task_id="job1_heappe_submit_job",
    heappe_session_task_id="job1_heappe_session",
)

job1_job_output_to_ddi = LexisStagingOperator(
    task_id="job1_to_ddi",
    target_system="it4i_iRODS",
    target_path="{{ params.job1_output_dataset_path }}",
    source_system="karolina_home",
    source_path="/",
    metadata="{{ params.job1_output_ddi_metadata }}",
    heappe_prepare_job_task_id="job1_heappe_prepare_job",
    heappe_enable_file_transfer_task_id="job1_heappe_enable_filetransfer"
)

job1_ddi_data_output_wait = LexisWaitStagingRequestSensor(
    task_id="job1_ddi_data_output_transfer_wait",
    staging_operator_task_id="job1_to_ddi"
)

job1_heappe_end_filetransfer = HEAppEEndFileTransferOperator(
    task_id="job1_heappe_end_filetransfer",
    heappe_prepare_job_task_id="job1_heappe_prepare_job",
    heappe_enable_file_transfer_task_id="job1_heappe_enable_filetransfer",
    heappe_session_task_id="job1_heappe_session"
)

job1_heappe_job_delete = HEAppEDeleteJobOperator(
    task_id="job1_heappe_job_delete",
    heappe_prepare_job_task_id="job1_heappe_prepare_job",
    heappe_session_task_id="job1_heappe_session",
)

ref_token >> job1_heappe_session >> job1_heappe_prepare_job\
>> job1_heappe_enable_filetransfer \
>> job1_ddi_data_input_transfer >> job1_ddi_data_input_wait >> job1_heappe_submit_job >> job1_heappe_job_wait \
>> job1_job_output_to_ddi >> job1_ddi_data_output_wait >> job1_heappe_end_filetransfer\
>> job1_heappe_job_delete >> job1_heappe_session_keeper_stop >> ref_token_stop

ref_token >> job1_heappe_session >> job1_heappe_session_keeper >> ref_token_stop
ref_token >> ref_token_keeper

Further, in the figure below, the SOM DAG details from the LEXIS portal are shown. User can use the LEXIS portal to create a workflow execution. To do so, user has to click on CREATE WORKFLOW EXECUTION button below the DAG details.

../_images/som01.png

SOM workflow in LEXIS portal: DAG details.

Then, as seen in the figure below, job parameters has to be filled. After everything is set, user has to click on CREATE in the upper left corner.

../_images/som02.png

SOM workflow in LEXIS portal: set parameters.

After the workflow execution is created, the workflow runtime can be observed as shown in the figure below. Each rectangle is colored by appropriate color during the runtime. All posible colors, which may occurs during the runtime, are shown in the bottom of workflow map.

../_images/som03.png

SOM workflow in LEXIS portal: workflow map.

If the workflow execution was successful, user can check the resulting data on the LEXIS portal. In this case, the data can be visualized using the aweSOM Shiny App developed within IT4Innovations, as can be seen below.

../_images/som04.png

SOM Shiny App: Load data.

The user can manipulate with interactive fields, buttons, and sliders, to adjust visualizations of the data.

../_images/som05.png

SOM Shiny App: data visualization.