====================
Airflow orchestrator
====================

Airflow in LEXIS
================

As stated on the `web presentation `_, Airflow is a platform created by the community to programmatically author, schedule, and monitor workflows. The platform is scalable, dynamic, and easily extensible. Workflows in Airflow are defined in Python using built-in or custom operators.

LEXIS extends Airflow with the possibility to describe a computational workflow using the `OASIS TOSCA `_ or `CWL `_ standards. Within LEXIS, the user can also use a Python description (:ref:`SOM_DAG_Example`) to manage the pipeline, but arbitrary code represents a security risk. Therefore, the LEXIS team has to verify pipelines described in Python before registering them in Airflow. In contrast, TOSCA or CWL improves security and makes workflow creation more user-friendly: it restricts the usage of operators (workflow components) to an allowed set, which implements the interaction with HEAppE and Kubernetes to author execution on an HPC cluster or in the cloud. The DDI staging API handles data transfer between the computational tasks (see :ref:`ddi-service`). Note that Airflow itself does not directly support TOSCA or CWL templates.

Furthermore, Airflow offers many practical functionalities, such as grouping of operators or advanced failure handling. Policies can be assigned to groups to achieve chaining of groups or running multiple instances of a group in parallel. For instance, a race can be performed across multiple computational locations.

.. image:: Images/airflow-in-LEXIS-execute-workflow_diagram.png

Templating
==========

As mentioned above, the user can specify the computational workflow using the `OASIS TOSCA `_ or `CWL `_ standards. Both originally target software deployment; using them to describe computational workflows gives them a new possible use case. Templating makes computational workflows more portable, system-independent, and readable.

The TOSCA representation provides both a declarative and a behavioural description syntax. The declarative part is more suitable for LEXIS usage, as it makes the workflow specification easier to describe. The example :ref:`TOSCA_example` demonstrates a computational task on HPC with data transfer. The official support for the template standards will be released in Q2 2023.

Workflow template editor
========================

The idea of the workflow editor is to simplify the modification of templates. To open HPC to a broader range of users, i.e., users with all levels of computer science knowledge, a graphical interface for editing workflows is crucial. Using the GUI, users can easily compose computational tasks into more or less advanced computational workflows. Each computational task may take input in the form of passed data or parameters of atomic types; the output of a task is represented as data only. Output data can be linked to the input of a subsequent task. Such abstraction leads to better variability: for example, special preprocessing or postprocessing can be attached to the computation, or the core of the computation itself can be varied. Finally, the editor generates the resulting computational workflow specification in the TOSCA standard. As the emitter is implemented with the visitor design pattern, developing another emitter on demand is not an issue.

The Workflow Editor uses the `Blazor Server `_ framework based on `.NET `_, written in C#, together with the `JointJS `_ JavaScript diagramming library. The official support for the workflow graphical editor will be released in Q3 2023.

.. image:: Images/WorkflowEditor/WorkflowTemplateEditor_demo.gif

.. image:: Images/WorkflowEditor/WorkflowTemplateEditor_demo2.gif

LEXIS Airflow Provider
======================

As stated on the official website of Apache Airflow:

    Apache Airflow 2 is built in modular way. The “Core” of Apache Airflow provides core scheduler functionality which allow you to write some basic tasks, but the capabilities of Apache Airflow can be extended by installing additional packages, called providers.

    Providers can contain operators, hooks, sensor, and transfer operators to communicate with a multitude of external systems, but they can also extend Airflow core with new capabilities.

The LEXIS Airflow Provider extends Airflow with new helpers, hooks, models, and operators needed to use Airflow together with the LEXIS platform. A brief summary of all of them is provided here.

LEXIS Helpers
-------------

The helpers module provides utility functions needed by the hooks and operators described below. It consists of the following functions, whose use is sketched after this list:

* Function that returns a username from a token::

    lexis.helpers.aai.get_username(token)

* Function that acts as a decorator and adds the AAI 'access_token' to the context::

    lexis.helpers.aai.verify_conn(fn)

  where 'fn' is the function to be wrapped. The decorator is usable only with an Operator's 'execute' method.
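A minimal sketch of how these helpers might be used is shown below. The custom operator class is purely illustrative (it is not part of the provider), and it assumes that 'verify_conn' injects the 'access_token' key into the task context as described above.

.. code-block:: python

    # Illustrative only: a custom operator whose 'execute' method is wrapped by
    # verify_conn so that an AAI 'access_token' is available in the context.
    from airflow.models.baseoperator import BaseOperator

    from lexis.helpers.aai import get_username, verify_conn


    class WhoAmIOperator(BaseOperator):
        """Toy operator that logs the LEXIS username of the token owner."""

        @verify_conn
        def execute(self, context):
            token = context["access_token"]   # assumed to be injected by verify_conn
            username = get_username(token)
            self.log.info("Running as LEXIS user %s", username)
            return username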
LEXIS Hooks
-----------

The definition of a hook from the official Apache Airflow website is the following:

    A Hook is a high-level interface to an external platform that lets you quickly and easily talk to them without having to write low-level code that hits their API or uses special libraries. They're also often the building blocks that Operators are built out of. They integrate with Connections to gather credentials, and many have a default conn_id; for example, the PostgresHook automatically looks for the Connection with a conn_id of postgres_default if you don't pass one in.

Here, the hooks necessary for the LEXIS platform are described.

Keycloak Hook
^^^^^^^^^^^^^

The Keycloak hook is needed to manage the Keycloak session during a LEXIS workflow. It can be imported by::

    from lexis.hooks.aai import KeycloakHook

The class has one optional parameter, 'conn_id'. By default::

    conn_id: str = "lexis_keycloak_default"

The Keycloak hook consists of the following methods; a usage sketch follows the list:

* Function to get the basic URI::

    get_uri(conn: Connection, extras)

* Function to get the client's KeycloakOpenID::

    get_client(uri: str, realm: str, client_id: str, client_secret_key: str) -> KeycloakOpenID

* Function that initiates a new KeycloakOpenID client (should be called before using the client)::

    get_conn() -> KeycloakOpenID

* Function that exchanges a token for a refresh offline token with the current client::

    exchange_for_refresh_offline_token(access_token: str) -> str

  The method returns the refresh token of the client set in the Connection.

* Function to get an access token for a refresh token::

    get_access_token(refresh_token: str)
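The following is a minimal sketch (not a complete, verified workflow) of the typical token exchange, assuming the default connection is configured and that 'user_access_token' holds a short-lived AAI access token obtained elsewhere.

.. code-block:: python

    from lexis.hooks.aai import KeycloakHook

    hook = KeycloakHook()      # uses the default "lexis_keycloak_default" connection
    hook.get_conn()            # initiate the KeycloakOpenID client before other calls

    # Exchange a short-lived access token for a long-lived offline refresh token ...
    refresh_token = hook.exchange_for_refresh_offline_token(user_access_token)

    # ... and obtain a fresh access token from it whenever one is needed later on.
    access_token = hook.get_access_token(refresh_token)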
LEXIS DDI Hook
^^^^^^^^^^^^^^

The hook is used for staging purposes. It can be imported by::

    from lexis.hooks.ddi import LexisDDIHook

The class has one optional parameter, 'conn_id'. By default::

    conn_id: str = "lexis_keycloak_default"

The LEXIS DDI hook consists of the following methods:

* Function to get the basic URI::

    get_uri(conn: Connection, extras)

* Function that initiates the hook with the URL from the Connection::

    get_conn()

* Function that stages a dataset between DDI systems and HPC systems::

    submit_stage(access_token, source_system, source_path, target_system, target_path, heappe_url, heappe_job_id, heappe_job_task_id, encryption=False, compression=False, metadata: DDIMetadata = None) -> str

* Function that returns the staging status::

    staging_status(access_token: str, request_id: str) -> Literal["In progress", "Transfer completed"] | str

* Function that returns the available staging areas::

    get_available_staging_areas(access_token: str) -> list[str]

HEAppE Hook
^^^^^^^^^^^

The HEAppE hook manages jobs and file transfers in connection with the HEAppE middleware. It can be imported by::

    from lexis.hooks.heappe import HEAppEHook

The class has one mandatory parameter, 'heappe_uri', which holds the URI of the HEAppE instance. The HEAppE hook consists of the following functions; a usage sketch of both hooks follows the list:

* Function that initiates a new ApiClient (should be called before using the client)::

    get_conn()

* Function that returns the HEAppE session::

    get_session(access_token: str)

* Function that checks whether an exception was caused by a timed-out session::

    is_session_timeout_exception(ex: ApiException) -> bool

* Function that prepares the HPC job specification needed for submitting::

    prepare_job(heappe_session, heappe_job_spec: HEAppEJobSpec) -> dict

  It returns the job context.

* Function that submits an HPC job::

    submit_job(heappe_session, job_context) -> dict

  It returns the context of the submitted job.

* Function that returns a job status::

    get_job_state(heappe_session, submitted_job_context)

* Function to delete a job::

    delete_job(heappe_session, job_context) -> dict

  It returns the deleted job context.

* Function that enables a file transfer::

    enable_filetransfer(heappe_session, job_context) -> dict

  It returns a file transfer context.

* Function that ends a file transfer::

    end_filetransfer(heappe_session, job_context, filetransfer_context) -> dict

  It returns a file transfer context.

* Function that downloads a file from a cluster::

    download_file_from_cluster(heappe_session, submitted_job_context, file_path) -> dict

  It returns a decoded log.
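A minimal sketch of driving HEAppE and the DDI staging API directly through the hooks is given below. All concrete values (the HEAppE URI, project name, IDs) are placeholders, 'access_token' is assumed to be a valid AAI token obtained via the Keycloak hook, and the models are assumed to accept their documented parameters as keyword arguments. In a real workflow, the LEXIS operators described later wrap these calls.

.. code-block:: python

    from lexis.hooks.ddi import LexisDDIHook
    from lexis.hooks.heappe import HEAppEHook
    from lexis.models.heappe import HEAppEJobSpec

    # HEAppE: open a session and prepare a job (placeholder URI and IDs).
    heappe = HEAppEHook(heappe_uri="https://heappe.example.org/lexis")
    heappe.get_conn()                               # initiate the ApiClient first
    session = heappe.get_session(access_token)      # AAI token from the Keycloak hook

    spec = HEAppEJobSpec(
        Name="example_job",
        Project="my_accounting_project",
        ClusterId=2,
        CommandTemplateId=3,
    )
    job_context = heappe.prepare_job(session, spec)         # job context
    submitted = heappe.submit_job(session, job_context)     # submitted job context
    print(heappe.get_job_state(session, submitted))

    # DDI: list the staging areas reachable with the same access token.
    ddi = LexisDDIHook()
    ddi.get_conn()
    print(ddi.get_available_staging_areas(access_token))

    # Clean up the HEAppE job when it is no longer needed.
    heappe.delete_job(session, job_context)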
LEXIS Models
------------

The models module keeps the definitions of the data structures needed by the DDI and HEAppE operators. Instantiation of the models is sketched after this section.

DDI Metadata
^^^^^^^^^^^^

It can be imported by::

    from lexis.models.ddi import DDIMetadata

The parameters that can be provided to the class are the following:

* **contributor**: list[str] | None. By default: None.
* **creator**: list[str] | None. By default: None.
* **owner**: list[str] | None. By default: None.
* **publicationYear**: str | None. By default: None.
* **publisher**: list[str] | None. By default: None.
* **resourceType**: str | None. By default: None.
* **title**: str. By default: ''.
* **workflow_id**: str. By default: ''.
* **workflowEx_id**: str. By default: ''.

HEAppE Session Time Out
^^^^^^^^^^^^^^^^^^^^^^^

The class can be imported by::

    from lexis.models.heappe import HEAppESessionTimeOut

It needs one mandatory parameter, 'old_session_id'. The class provides an exception raised when the session is timed out.

HEAppE Job specification
^^^^^^^^^^^^^^^^^^^^^^^^

The class can be imported by::

    from lexis.models.heappe import HEAppEJobSpec

The parameters that can be provided to the class are the following:

* **Name**: str, a name of a job (mandatory).
* **Project**: str, an accounting project (mandatory).
* **ClusterId**: int, ID of a cluster (mandatory).
* **CommandTemplateId**: int, ID of a command template (mandatory).
* **FileTransferMethodId**: int, ID of a file transfer method (optional). By default: None.
* **EnvironmentVariables**: dict, a dictionary of environment variables (optional). By default: {}.
* **MinCores**: int, minimal number of cores. By default: 1.
* **MaxCores**: int, maximal number of cores. By default: 64.
* **WalltimeLimit**: int, maximal walltime limit in seconds. By default: 600.
* **Priority**: int, priority of a job. By default: 4.
* **ClusterNodeTypeId**: int, ID of a cluster node type. By default: 8.
* **TemplateParameterValues**: dict, a dictionary of template parameter values. By default: {}.
* **TaskParalizationParameters**: Sequence[dict], a sequence of dictionaries with task parallelization parameters. By default: [].
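Below is a minimal sketch of instantiating both models, assuming the documented parameters are accepted as keyword arguments; every concrete value (names, IDs, titles) is a placeholder.

.. code-block:: python

    from lexis.models.ddi import DDIMetadata
    from lexis.models.heappe import HEAppEJobSpec

    # Metadata attached to the dataset produced by the workflow (placeholder values).
    output_metadata = DDIMetadata(
        creator=["Jane Doe"],
        owner=["Jane Doe"],
        publicationYear="2023",
        publisher=["Airflow"],
        resourceType="Dataset",
        title="Example workflow output",
    )

    # Specification of an HPC job; only the first four parameters are mandatory.
    job_spec = HEAppEJobSpec(
        Name="example_job",
        Project="my_accounting_project",
        ClusterId=2,
        CommandTemplateId=3,
        FileTransferMethodId=2,
        MaxCores=24,
        WalltimeLimit=3600,
        ClusterNodeTypeId=8,
        TemplateParameterValues={"inputParam": "test"},
    )

The job specification is consumed by the HEAppE hook's 'prepare_job' method; in a DAG, the same fields are passed as properties of the HEAppEPrepareJobOperator, as shown in the examples below.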
LEXIS Operators
---------------

The definition of an operator from the official Apache Airflow website is the following:

    An Operator is conceptually a template for a predefined Task, that you can just define declaratively inside your DAG. Airflow has a very extensive set of operators available, with some built-in to the core or pre-installed providers. Some popular operators from core include:

    * BashOperator - executes a bash command
    * PythonOperator - calls an arbitrary Python function
    * EmailOperator - sends an email
    * Use the @task decorator to execute an arbitrary Python function. It doesn't support rendering jinja templates passed as arguments.

It is also important to recall the following note:

    Inside Airflow's code, we often mix the concepts of Tasks and Operators, and they are mostly interchangeable. However, when we talk about a Task, we mean the generic "unit of execution" of a DAG; when we talk about an Operator, we mean a reusable, pre-made Task template whose logic is all done for you and that just needs some arguments.

Here, a list of the existing LEXIS operators is provided:

* **lexis.operators.aai.LexisAAIOperator** -- LEXIS AAI operator used to exchange an access token for an offline token and share it via XCom.
* **lexis.operators.aai.LexisAAIStopKeeperOperator** -- LEXIS operator to stop the LexisAAITokenKeeperSensor.
* **lexis.operators.aai.LexisAAITokenKeeperSensor** -- LEXIS operator to periodically refresh the AAI token session until all of the tasks related to the session are done or failed.
* **lexis.operators.base.LexisOperatorBase** -- a simple base operator.
* **lexis.operators.base.LexisAAIOperatorBase** -- LEXIS base operator to handle LEXIS AAI authentication.
* **lexis.operators.base.LexisStagingOperatorBase** -- LEXIS base operator to handle DDI staging.
* **lexis.operators.base.HEAppEOperatorBase** -- LEXIS base operator to handle the HEAppE session.
* **lexis.operators.cloud.LexisCloudOperator** -- LEXIS operator to manage a job for a cloud.
* **lexis.operators.ddi.LexisStagingOperator** -- LEXIS operator used to trigger staging operations on the DDI.
* **lexis.operators.ddi.LexisWaitStagingRequestSensor** -- LEXIS operator used to wait for a staging operation.
* **lexis.operators.heappe.HEAppESessionOperator** -- LEXIS operator used to manage the HEAppE session.
* **lexis.operators.heappe.HEAppEPrepareJobOperator** -- LEXIS operator used to prepare the HEAppE job context from the job specification.
* **lexis.operators.heappe.HEAppESubmitJobOperator** -- LEXIS operator used to submit a HEAppE job.
* **lexis.operators.heappe.HEAppEEnableFileTransferOperator** -- LEXIS operator used to enable file transfer for a HEAppE job.
* **lexis.operators.heappe.HEAppEEndFileTransferOperator** -- LEXIS operator used to disable file transfer for a HEAppE job.
* **lexis.operators.heappe.HEAppEDeleteJobOperator** -- LEXIS operator used to delete a HEAppE job.
* **lexis.operators.heappe.HEAppEWaitSubmittedJobSensor** -- LEXIS operator used to wait for a submitted job.
* **lexis.operators.heappe.HEAppESessionStopKeeperOperator** -- LEXIS operator used to stop the HEAppESessionKeeperSensor.
* **lexis.operators.heappe.HEAppESessionKeeperSensor** -- LEXIS operator that periodically refreshes the HEAppE session until all of the tasks related to the session are done or failed.

LEXIS Airflow Plugin
====================

Plugins are used to customize the Airflow installation. The official statement from the Apache Airflow website is the following:

    Airflow offers a generic toolbox for working with data. Different organizations have different stacks and different needs. Using Airflow plugins can be a way for companies to customize their Airflow installation to reflect their ecosystem. Plugins can be used as an easy way to write, share and activate new sets of features. There's also a need for a set of more complex applications to interact with different flavors of data and metadata.

The LEXIS Airflow plugin provides two basic features: user authentication and registration of DAGs in Airflow through the REST API.

LEXIS API View
--------------

The class **PluginLexisAPIView** extends the Airflow REST API with the following endpoints:

* "/" -- endpoint to check whether Airflow is alive.
* "/create-dag" -- endpoint to register new DAGs.

User authentication
-------------------

The user authentication backend is stored in the Python module **lexis_api.auth.lexis_token**. It consists of the following functions; a usage sketch follows the list:

* Function that returns all allowed projects for the current user::

    get_allowed_projects(userinfo)

* Function that checks if the current user has administration permissions::

    get_admin_status(userinfo)

* Function that authenticates and sets the current user if the authorization header exists::

    auth_current_user() -> User | None

* Decorator for the functions that require authentication::

    requires_authentication(function: T)
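A minimal, hypothetical sketch of how a custom endpoint could be protected with this backend is shown below; the view function, its name, and its return payload are illustrative only and are not part of the LEXIS plugin.

.. code-block:: python

    from lexis_api.auth.lexis_token import auth_current_user, requires_authentication


    @requires_authentication
    def lexis_health():
        """Hypothetical view that responds only when a valid LEXIS token is presented."""
        user = auth_current_user()  # resolved from the Authorization header, None otherwise
        return {"status": "alive", "user": getattr(user, "username", None)}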
Examples
========

.. _TOSCA_example:

TOSCA workflow
--------------

.. code-block:: yaml
   :caption: TOSCA description of an HPC computational task for Airflow

   tosca_definitions_version: tosca_simple_yaml_1_2

   metadata:
     template_name: lexis_job_with_data_dag
     template_version: v1_0
     template_uo: ADAS - IT4Innovations
     template_author: Jan Swiatkowski
     template_contact_email: jan.swiatkowski@vsb.cz
     start_date: "2023-02-21"

   description: LEXIS example workflow with data for Apache Airflow

   imports:
     - https://nextcloud.it4i.cz/s/PL2jsXSMRkMxj7Q/download/lexis-airflow-types.yaml

   topology_template:
     # TEMPLATE INPUTS
     inputs:
       access_token:
         type: string
         required: True
         default: ""
       computation_project:
         type: string
         required: True
         default: ""
       job1_cluster_id:
         type: number
         required: True
         default: 2
       job1_command_template_id:
         type: number
         required: True
         default: 3
       job1_parameters:
         type: map
         required: True
         default: {"inputParam": "test"}
       job1_cluster_node_type_id:
         type: number
         required: True
         default: 8
       job1_max_cores:
         type: number
         required: True
         default: 24
       job1_filetransfer_method_id:
         type: number
         required: True
         default: 2
       job1_heappe_uri:
         type: string
         required: True
         default: "https://heappe.it4i.cz/lexis"
       job1_input_dataset_path:
         type: string
         required: True
         default: "project/proj812b07ed780274387d12c665fa3a4f7f/5e51f28c-88fa-11ec-a591-00155d4ab2e4/test10M.dat"
       job1_output_dataset_path:
         type: string
         required: True
         default: "project/proj812b07ed780274387d12c665fa3a4f7f"
       job1_output_ddi_metadata:
         type: map  # lexis.datatypes.ddi.Metadata
         required: True
         default:
           contributor: ["Testing"]
           creator: ["Testing"]
           owner: ["Testing"]
           publicationYear: "2022"
           publisher: ["Airflow"]
           resourceType: Test data
           title: AIRFLOW test output TOSCA

     # NODES
     node_templates:
       # token management
       ref_token:
         type: lexis.nodes.operators.aai.LexisAAIOperator
         properties:
           access_token: { get_input: access_token }
       ref_token_keeper:
         type: lexis.nodes.operators.aai.LexisAAITokenKeeperSensor
         requirements:
           - aai_operator: ref_token
       ref_token_keeper_stop:
         type: lexis.nodes.operators.aai.LexisAAIStopKeeperOperator
         requirements:
           - previous_task: job1_heappe_job_delete

       # HEAppE Job context
       job1_heappe_session:
         type: lexis.nodes.operators.heappe.HEAppESessionOperator
         properties:
           heappe_uri: { get_input: job1_heappe_uri }
         requirements:
           - aai_operator: ref_token
           - previous_task: ref_token

       # keep session active
       job1_heappe_session_keeper_stop:
         type: lexis.nodes.operators.heappe.HEAppESessionStopKeeperOperator
         requirements:
           - previous_task: job1_heappe_job_delete
       job1_heappe_session_keeper:
         type: lexis.nodes.operators.heappe.HEAppESessionKeeperSensor
         properties:
           heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
           heappe_session_stop_task_id: { get_attribute: [ job1_heappe_session_keeper_stop, task_id ] }
         requirements:
           - heappe_session_task_id: job1_heappe_session

       # HEAppE Job
       job1_heappe_prepare_job:
         type: lexis.nodes.operators.heappe.HEAppEPrepareJobOperator
         properties:
           Name: Job Name n.1
           Project: { get_input: computation_project }
           ClusterId: { get_input: job1_cluster_id }
           CommandTemplateId: { get_input: job1_command_template_id }
           FileTransferMethodId: { get_input: job1_filetransfer_method_id }
           EnvironmentVariables: {}
           MaxCores: { get_input: job1_max_cores }
           Priority: 4
           ClusterNodeTypeId: { get_input: job1_cluster_node_type_id }
           TemplateParameterValues: { get_input: job1_parameters }
           heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
         requirements:
           - heappe_session_task_id: job1_heappe_session

       # enable file transfer from DDI
       job1_heappe_enable_filetransfer:
         type: lexis.nodes.operators.heappe.HEAppEEnableFileTransferOperator
         properties:
           heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
           heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
         requirements:
           - heappe_session_task_id: job1_heappe_session
           - heappe_prepare_job_task_id: job1_heappe_prepare_job

       # transfer data
       job1_ddi_data_input_transfer:
         type: lexis.nodes.operators.ddi.LexisStagingOperator
         properties:
           source_system: it4i_iRODS
           source_path: { get_input: job1_input_dataset_path }
           target_system: karolina_home
           target_path: "/"
           heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
           heappe_enable_file_transfer_task_id: { get_attribute: [ job1_heappe_enable_filetransfer, task_id ] }
         requirements:
           - heappe_prepare_job_task_id: job1_heappe_prepare_job
           - heappe_enable_file_transfer_task_id: job1_heappe_enable_filetransfer
           - previous_task: job1_heappe_enable_filetransfer

       # wait for the DDI operation
       job1_ddi_data_input_wait:
         type: lexis.nodes.operators.ddi.LexisWaitStagingRequestSensor
         properties:
           staging_operator_task_id: { get_attribute: [ job1_ddi_data_input_transfer, task_id ] }
         requirements:
           - staging_operator_task_id: job1_ddi_data_input_transfer

       # submit HEAppe job
       job1_heappe_submit_job:
         type: lexis.nodes.operators.heappe.HEAppESubmitJobOperator
         properties:
           heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
           heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
         requirements:
           - heappe_session_task_id: job1_heappe_session
           - heappe_prepare_job_task_id: job1_heappe_prepare_job
           - previous_task: job1_ddi_data_input_wait
       job1_heappe_job_wait:
         type: lexis.nodes.operators.heappe.HEAppEWaitSubmittedJobSensor
         properties:
           heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
           heappe_submitted_job_task_id: { get_attribute: [ job1_heappe_submit_job, task_id ] }
         requirements:
           - heappe_session_task_id: job1_heappe_session
           - heappe_submitted_job_task_id: job1_heappe_submit_job

       # transfer job output
       job1_job_output_to_ddi:
         type: lexis.nodes.operators.ddi.LexisStagingOperator
         properties:
           target_system: it4i_iRODS
           target_path: { get_input: job1_output_dataset_path }
           source_system: karolina_home
           source_path: "/"
           heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
           heappe_enable_file_transfer_task_id: { get_attribute: [ job1_heappe_enable_filetransfer, task_id ] }
         requirements:
           - heappe_prepare_job_task_id: job1_heappe_prepare_job
           - heappe_enable_file_transfer_task_id: job1_heappe_enable_filetransfer
           - previous_task: job1_heappe_job_wait

       # wait for the DDI operation
       job1_ddi_data_output_wait:
         type: lexis.nodes.operators.ddi.LexisWaitStagingRequestSensor
         properties:
           staging_operator_task_id: { get_attribute: [ job1_job_output_to_ddi, task_id ] }
         requirements:
           - staging_operator_task_id: job1_job_output_to_ddi
       job1_heappe_end_filetransfer:
         type: lexis.nodes.operators.heappe.HEAppEEndFileTransferOperator
         properties:
           heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
           heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
           heappe_enable_file_transfer_task_id: { get_attribute: [ job1_heappe_enable_filetransfer, task_id ] }
         requirements:
           - heappe_session_task_id: job1_heappe_session
           - heappe_prepare_job_task_id: job1_heappe_prepare_job
           - heappe_enable_file_transfer_task_id: job1_heappe_enable_filetransfer
           - previous_task: job1_ddi_data_output_wait

       # end HEAppE job
       job1_heappe_job_delete:
         type: lexis.nodes.operators.heappe.HEAppEDeleteJobOperator
         properties:
           heappe_session_task_id: { get_attribute: [ job1_heappe_session, task_id ] }
           heappe_prepare_job_task_id: { get_attribute: [ job1_heappe_prepare_job, task_id ] }
         requirements:
           - heappe_session_task_id: job1_heappe_session
           - heappe_prepare_job_task_id: job1_heappe_prepare_job
           - previous_task: job1_heappe_end_filetransfer

.. _SOM_DAG_Example:

SOM DAG and execution
---------------------

A short example of executing the SOM (Self-Organizing Map) algorithm on the LEXIS portal is provided in this section. First, the SOM DAG written in Python is shown below; only the LEXIS tasks and their dependencies are listed, the surrounding DAG definition and its parameters are omitted.

.. code-block:: python
   :caption: SOM DAG in Python.

   from lexis.operators.aai import (
       LexisAAIOperator,
       LexisAAIStopKeeperOperator,
       LexisAAITokenKeeperSensor,
   )
   from lexis.operators.ddi import LexisStagingOperator, LexisWaitStagingRequestSensor
   from lexis.operators.heappe import (
       HEAppEDeleteJobOperator,
       HEAppEEnableFileTransferOperator,
       HEAppEEndFileTransferOperator,
       HEAppEPrepareJobOperator,
       HEAppESessionKeeperSensor,
       HEAppESessionOperator,
       HEAppESessionStopKeeperOperator,
       HEAppESubmitJobOperator,
       HEAppEWaitSubmittedJobSensor,
   )

   ref_token = LexisAAIOperator(
       task_id="refresh_token_task",
       access_token="{{ params.access_token }}"
   )
   ref_token_keeper = LexisAAITokenKeeperSensor(
       task_id="ref_token_keeper",
   )
   ref_token_stop = LexisAAIStopKeeperOperator()

   # Get HEAppE session
   job1_heappe_session = HEAppESessionOperator(
       task_id="job1_heappe_session",
       heappe_uri="{{ params.job1_heappe_uri }}"
   )
   job1_heappe_session_keeper = HEAppESessionKeeperSensor(
       task_id="job1_heappe_session_keeper",
       heappe_session_task_id="job1_heappe_session",
       heappe_session_stop_task_id="job1_heappe_session_keeper_stop"
   )
   job1_heappe_session_keeper_stop = HEAppESessionStopKeeperOperator(
       task_id="job1_heappe_session_keeper_stop",
   )
   job1_heappe_prepare_job = HEAppEPrepareJobOperator(
       task_id="job1_heappe_prepare_job",
       heappe_session_task_id="job1_heappe_session",
       Name="{{ params.name }}",
       Project="{{ params.computation_project }}",
       ClusterId="{{ params.job1_cluster_id }}",
       CommandTemplateId="{{ params.job1_command_template_id }}",
       ClusterNodeTypeId="{{ params.job1_cluster_node_type_id }}",
       FileTransferMethodId="{{ params.job1_filetransfer_method_id }}",
       TemplateParameterValues="{{ params.job1_parameters }}",
       MaxCores="{{ params.job1_max_cores }}"
   )
   job1_heappe_enable_filetransfer = HEAppEEnableFileTransferOperator(
       task_id="job1_heappe_enable_filetransfer",
       heappe_prepare_job_task_id="job1_heappe_prepare_job",
       heappe_session_task_id="job1_heappe_session",
   )
   job1_ddi_data_input_transfer = LexisStagingOperator(
       task_id="job1_ddi_data_input_transfer",
       source_system="it4i_iRODS",
       source_path="{{ params.job1_input_dataset_path }}",
       target_system="karolina_home",
       target_path="/",
       heappe_prepare_job_task_id="job1_heappe_prepare_job",
       heappe_enable_file_transfer_task_id="job1_heappe_enable_filetransfer"
   )
   job1_ddi_data_input_wait = LexisWaitStagingRequestSensor(
       task_id="job1_ddi_data_input_transfer_wait",
       staging_operator_task_id="job1_ddi_data_input_transfer"
   )
   job1_heappe_submit_job = HEAppESubmitJobOperator(
       task_id="job1_heappe_submit_job",
       heappe_prepare_job_task_id="job1_heappe_prepare_job",
       heappe_session_task_id="job1_heappe_session",
   )
   job1_heappe_job_wait = HEAppEWaitSubmittedJobSensor(
       task_id="job1_heappe_job_waiting",
       heappe_submitted_job_task_id="job1_heappe_submit_job",
       heappe_session_task_id="job1_heappe_session",
   )
   job1_job_output_to_ddi = LexisStagingOperator(
       task_id="job1_to_ddi",
       target_system="it4i_iRODS",
       target_path="{{ params.job1_output_dataset_path }}",
       source_system="karolina_home",
       source_path="/",
       metadata="{{ params.job1_output_ddi_metadata }}",
       heappe_prepare_job_task_id="job1_heappe_prepare_job",
       heappe_enable_file_transfer_task_id="job1_heappe_enable_filetransfer"
   )
   job1_ddi_data_output_wait = LexisWaitStagingRequestSensor(
       task_id="job1_ddi_data_output_transfer_wait",
       staging_operator_task_id="job1_to_ddi"
   )
   job1_heappe_end_filetransfer = HEAppEEndFileTransferOperator(
       task_id="job1_heappe_end_filetransfer",
       heappe_prepare_job_task_id="job1_heappe_prepare_job",
       heappe_enable_file_transfer_task_id="job1_heappe_enable_filetransfer",
       heappe_session_task_id="job1_heappe_session"
   )
   job1_heappe_job_delete = HEAppEDeleteJobOperator(
       task_id="job1_heappe_job_delete",
       heappe_prepare_job_task_id="job1_heappe_prepare_job",
       heappe_session_task_id="job1_heappe_session",
   )

   ref_token >> job1_heappe_session >> job1_heappe_prepare_job \
       >> job1_heappe_enable_filetransfer \
       >> job1_ddi_data_input_transfer >> job1_ddi_data_input_wait >> job1_heappe_submit_job >> job1_heappe_job_wait \
       >> job1_job_output_to_ddi >> job1_ddi_data_output_wait >> job1_heappe_end_filetransfer \
       >> job1_heappe_job_delete >> job1_heappe_session_keeper_stop >> ref_token_stop
   ref_token >> job1_heappe_session >> job1_heappe_session_keeper >> ref_token_stop
   ref_token >> ref_token_keeper

Further, the figure below shows the SOM DAG details in the LEXIS portal. The user can use the LEXIS portal to create a workflow execution. To do so, the user clicks the **CREATE WORKFLOW EXECUTION** button below the DAG details.

.. figure:: Images/som01.png
   :align: center

   SOM workflow in the LEXIS portal: DAG details.

Then, as seen in the figure below, the job parameters have to be filled in. After everything is set, the user clicks **CREATE** in the upper left corner.

.. figure:: Images/som02.png
   :align: center

   SOM workflow in the LEXIS portal: set parameters.

After the workflow execution is created, the workflow run can be observed as shown in the figure below. Each rectangle is colored with the appropriate color during the run. All possible colors that may occur during the run are shown at the bottom of the workflow map.

.. figure:: Images/som03.png
   :align: center

   SOM workflow in the LEXIS portal: workflow map.

If the workflow execution was successful, the user can check the resulting data on the LEXIS portal. In this case, the data can be visualized using the aweSOM Shiny App developed at IT4Innovations, as can be seen below.

.. figure:: Images/som04.png
   :align: center

   SOM Shiny App: load data.

The user can use the interactive fields, buttons, and sliders to adjust the visualization of the data.

.. figure:: Images/som05.png
   :align: center

   SOM Shiny App: data visualization.