Source code for tool.VRE_Tool

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright 2020-2022 Barcelona Supercomputing Center (BSC), Spain
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import subprocess
import time
from glob import glob

from basic_modules.tool import Tool
from utils import logger


[docs]class myTool(Tool): """ This class define <myTool> Tool. """ DEFAULT_KEYS = ['execution', 'project', 'description'] """config.json default keys""" PYTHON_SCRIPT_PATH = "/example/hello.py" """<myApplication>"""
[docs] def __init__(self, configuration=None): """ Init function. :param configuration: A dictionary containing parameters that define how the operation should be carried out, which are specific to <myTool> tool. :type configuration: dict """ Tool.__init__(self) if configuration is None: configuration = {} self.configuration.update(configuration) for k, v in self.configuration.items(): if isinstance(v, list): self.configuration[k] = ' '.join(v) # Init variables self.current_dir = os.path.abspath(os.path.dirname(__file__)) self.parent_dir = os.path.abspath(self.current_dir + "/../") self.execution_path = self.configuration.get('execution', '.') if not os.path.isabs(self.execution_path): self.execution_path = os.path.normpath(os.path.join(self.parent_dir, self.execution_path)) self.arguments = dict( [(key, value) for key, value in self.configuration.items() if key not in self.DEFAULT_KEYS] )
[docs] def run(self, input_files, input_metadata, output_files, output_metadata): """ The main function to run the <myTool> tool. :param input_files: Dictionary of input files locations. :type input_files: dict :param input_metadata: Dictionary of input files metadata. :type input_metadata: dict :param output_files: Dictionary of output files locations expected to be generated. :type output_files: dict :param output_metadata: List of output files metadata expected to be generated. :type output_metadata: list :return: Generated output files and their metadata. :rtype: dict, dict """ try: # Set and validate execution directory. If not exists the directory will be created os.makedirs(self.execution_path, exist_ok=True) # Set and validate execution parent directory. If not exists the directory will be created execution_parent_dir = os.path.dirname(self.execution_path) os.makedirs(execution_parent_dir, exist_ok=True) # Update working directory to execution path os.chdir(self.execution_path) # Tool Execution self.toolExecution(input_files) # Create and validate the output file from tool execution output_id = output_metadata[0]['name'] output_type = output_metadata[0]['file']['file_type'].lower() output_file_path = glob(self.execution_path + "/*." + output_type)[0] if os.path.isfile(output_file_path): output_files[output_id] = [(output_file_path, "file")] return output_files, output_metadata # TODO: add more output files to save, if it is necessary for you # or create a method to manage more than one output file else: errstr = "Output file {} not created. See logs.".format(output_file_path) logger.fatal(errstr) raise Exception(errstr) except: errstr = "<myTool> tool execution failed. See logs." logger.fatal(errstr) raise Exception(errstr)
[docs] def toolExecution(self, input_files): """ The main function to run the <myTool> tool. :param input_files: Dictionary of input files locations. :type input_files: dict """ rc = None try: # Get input files input_file_1 = input_files.get('hello_file') if not os.path.isabs(input_file_1): input_file_1 = os.path.normpath(os.path.join(self.parent_dir, input_file_1)) # TODO: add more input files to use, if it is necessary for you # Get arguments argument_1 = self.arguments.get('username') if argument_1 is None: errstr = "argument_1 must be defined." logger.fatal(errstr) raise Exception(errstr) # TODO: add more arguments to use, if it is necessary for you # <myApplication> execution if os.path.isfile(input_file_1): # TODO: change cmd command line to run <myApplication> cmd = [ 'python', self.parent_dir + self.PYTHON_SCRIPT_PATH, # hello.py input_file_1, # hello.txt argument_1, # username ] process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Sending the stdout to the log file for line in iter(process.stderr.readline, b''): print(line.rstrip().decode("utf-8").replace("", " ")) rc = process.poll() while rc is None: rc = process.poll() time.sleep(0.1) if rc is not None and rc != 0: logger.progress("Something went wrong inside the <myApplication> execution. See logs", status="WARNING") else: logger.progress("<myApplication> execution finished successfully", status="FINISHED") else: errstr = "input_file_1 must be defined." logger.fatal(errstr) raise Exception(errstr) except: errstr = "<myApplication> execution failed. See logs." logger.error(errstr) if rc is not None: logger.error("RETVAL: {}".format(rc)) raise Exception(errstr)