diff --git a/framework_tvb/tvb/tests/framework/adapters/testadapter3.py b/framework_tvb/tvb/tests/framework/adapters/testadapter3.py index 97fe09e89bdc7c45413bc926ea8f6dc00a00da9c..0b44c0b010f0a72df30851ce2d14846be4a51539 100644 --- a/framework_tvb/tvb/tests/framework/adapters/testadapter3.py +++ b/framework_tvb/tvb/tests/framework/adapters/testadapter3.py @@ -88,6 +88,9 @@ class TestAdapter3(abcadapter.ABCAsynchronous): def get_view_model(): return TestModel + def load_view_model(self, operation): + return TestModel + def get_form_class(self): return TestAdapter3Form diff --git a/framework_tvb/tvb/tests/framework/conftest.py b/framework_tvb/tvb/tests/framework/conftest.py index f0f39a92fd711116b9e45c9c7cd4e0f818ae5021..23c3cdc061d685538d6ddeb8009eaac3e1e17acb 100644 --- a/framework_tvb/tvb/tests/framework/conftest.py +++ b/framework_tvb/tvb/tests/framework/conftest.py @@ -502,36 +502,24 @@ def datatype_measure_factory(): def datatype_group_factory(time_series_index_factory, datatype_measure_factory, project_factory, user_factory, operation_factory): def build(subject="Datatype Factory User", state="RAW_DATA", project=None): - + # TODO This is not real, we miss a ViewModel stored # there store the name and the (hi, lo, step) value of the range parameters range_1 = ["row1", [1, 2, 10]] range_2 = ["row2", [0.1, 0.3, 0.5]] - # there are the actual numbers in the interval range_values_1 = [1, 3, 5, 7, 9] range_values_2 = [0.1, 0.4] user = user_factory() - if project is None: project = project_factory(user) - # Create an algorithm - alg_category = AlgorithmCategory('one', True) - dao.store_entity(alg_category) - ad = Algorithm(IntrospectionRegistry.SIMULATOR_MODULE, IntrospectionRegistry.SIMULATOR_CLASS, - alg_category.id) algorithm = dao.get_algorithm_by_module(IntrospectionRegistry.SIMULATOR_MODULE, IntrospectionRegistry.SIMULATOR_CLASS) - if algorithm is None: - algorithm = dao.store_entity(ad) - - # Create meta + # Create operation meta = {DataTypeMetaData.KEY_SUBJECT: "Datatype Factory User", DataTypeMetaData.KEY_STATE: "RAW_DATA"} - - # Create operation operation = operation_factory(algorithm=algorithm, test_user=user, test_project=project, meta=meta) group = OperationGroup(project.id, ranges=[json.dumps(range_1), json.dumps(range_2)]) @@ -542,10 +530,11 @@ def datatype_group_factory(time_series_index_factory, datatype_measure_factory, datatype_group = DataTypeGroup(group, subject=subject, state=state, operation_id=operation.id) datatype_group.no_of_ranges = 2 datatype_group.count_results = 10 - datatype_group = dao.store_entity(datatype_group) dt_group_ms = DataTypeGroup(group_ms, subject=subject, state=state, operation_id=operation.id) + datatype_group.no_of_ranges = 2 + datatype_group.count_results = 10 dao.store_entity(dt_group_ms) # Now create some data types and add them to group diff --git a/framework_tvb/tvb/tests/framework/core/services/burst_service_test.py b/framework_tvb/tvb/tests/framework/core/services/burst_service_test.py index c2a80cd117defb4d5792870d129d8f50bb30ca19..cc0ec79c0abc1bcd125178e12e13f493da4095a4 100644 --- a/framework_tvb/tvb/tests/framework/core/services/burst_service_test.py +++ b/framework_tvb/tvb/tests/framework/core/services/burst_service_test.py @@ -31,31 +31,16 @@ """ .. moduleauthor:: bogdan.neacsa <bogdan.neacsa@codemart.ro> """ -import uuid -from copy import deepcopy -from time import sleep -import numpy -from tvb.adapters.datatypes.db.mapped_value import DatatypeMeasureIndex -from tvb.adapters.datatypes.db.simulation_history import SimulationHistoryIndex +from uuid import UUID +from tvb.core.entities.file.simulator.view_model import SimulatorAdapterModel +from tvb.core.services.burst_service import BurstService from tvb.config.init.introspector_registry import IntrospectionRegistry -from tvb.core.adapters.abcadapter import ABCAdapter -from tvb.core.entities.file.files_helper import FilesHelper -from tvb.core.entities.file.simulator.datatype_measure_h5 import DatatypeMeasureH5 from tvb.core.entities.model.model_burst import * -from tvb.core.entities.model.model_datatype import * -from tvb.core.entities.model.model_operation import STATUS_STARTED, OperationPossibleStatus, STATUS_FINISHED from tvb.core.entities.storage import dao -from tvb.core.entities.transient.structure_entities import DataTypeMetaData -from tvb.core.neocom import h5 +from tvb.core.entities.file.files_helper import FilesHelper from tvb.core.services.algorithm_service import AlgorithmService -from tvb.core.services.burst_service import BurstService, STATUS_FOR_OPERATION -from tvb.core.services.operation_service import OperationService from tvb.core.services.project_service import ProjectService -from tvb.datatypes.connectivity import Connectivity -from tvb.datatypes.time_series import TimeSeriesRegion -# from tvb.tests.framework.adapters.simulator.simulator_adapter_test import SIMULATOR_PARAMETERS -from tvb.tests.framework.adapters.storeadapter import StoreAdapter from tvb.tests.framework.core.base_testcase import BaseTestCase from tvb.tests.framework.core.factory import TestFactory from tvb.tests.framework.datatypes.datatype1 import Datatype1 @@ -68,16 +53,9 @@ class TestBurstService(BaseTestCase): we launch operations in different threads and the transactional operator only rolls back sessions bounded to the current thread transaction. """ - PORTLET_ID = "TA1TA2" - # This should not be present in portlets.xml - INVALID_PORTLET_ID = "this_is_not_a_non_existent_test_portlet_ID" - burst_service = BurstService() - algorithm_service = AlgorithmService() - operation_service = OperationService() - sim_algorithm = algorithm_service.get_algorithm_by_module_and_class(IntrospectionRegistry.SIMULATOR_MODULE, - IntrospectionRegistry.SIMULATOR_CLASS) - # local_simulation_params = deepcopy(SIMULATOR_PARAMETERS) + sim_algorithm = AlgorithmService().get_algorithm_by_module_and_class(IntrospectionRegistry.SIMULATOR_MODULE, + IntrospectionRegistry.SIMULATOR_CLASS) def setup_method(self): """ @@ -148,14 +126,14 @@ class TestBurstService(BaseTestCase): self.burst_service.get_available_bursts(self.test_project.id)] returned_second_project_bursts = [burst.id for burst in self.burst_service.get_available_bursts(second_project.id)] - assert len(test_project_bursts) == len(returned_test_project_bursts),\ - "Incorrect bursts retrieved for project %s." % self.test_project - assert len(second_project_bursts) == len(returned_second_project_bursts),\ - "Incorrect bursts retrieved for project %s." % second_project - assert set(second_project_bursts) == set(returned_second_project_bursts),\ - "Incorrect bursts retrieved for project %s." % second_project - assert set(test_project_bursts) == set(returned_test_project_bursts),\ - "Incorrect bursts retrieved for project %s." % self.test_project + assert len(test_project_bursts) == len(returned_test_project_bursts), \ + "Incorrect bursts retrieved for project %s." % self.test_project + assert len(second_project_bursts) == len(returned_second_project_bursts), \ + "Incorrect bursts retrieved for project %s." % second_project + assert set(second_project_bursts) == set(returned_second_project_bursts), \ + "Incorrect bursts retrieved for project %s." % second_project + assert set(test_project_bursts) == set(returned_test_project_bursts), \ + "Incorrect bursts retrieved for project %s." % self.test_project def test_rename_burst(self, operation_factory): """ @@ -167,303 +145,65 @@ class TestBurstService(BaseTestCase): loaded_burst = dao.get_burst_by_id(burst_config.id) assert loaded_burst.name == "new_burst_name", "Burst was not renamed properly." - def test_branch_burst(self): - """ - Test the branching of an existing burst. - """ - burst_config = self._prepare_and_launch_async_burst(wait_to_finish=60) - burst_config.prepare_after_load() - - launch_params = self._prepare_simulation_params(4) - burst_config.update_simulator_configuration(launch_params) - - burst_id, _ = self.burst_service.launch_burst(burst_config, 0, self.sim_algorithm.id, - self.test_user.id, "branch") - burst_config = dao.get_burst_by_id(burst_id) - self._wait_for_burst(burst_config) - - ts_regions = self.count_all_entities(TimeSeriesRegion) - sim_states = self.count_all_entities(SimulationHistoryIndex) - assert 2 == ts_regions, "An operation group should have been created for each step." - assert 2 == sim_states, "An dataType group should have been created for each step." - def test_burst_delete_with_project(self): """ Test that on removal of a project all burst related data is cleared. """ - self._prepare_and_launch_sync_burst() + TestFactory.store_burst(self.test_project.id) ProjectService().remove_project(self.test_project.id) self._check_burst_removed() - def test_sync_burst_launch(self): - """ - A full test for launching a burst. - First create the workflow steps and launch the burst. - Then check that only operation created is for the first adapter from the portlet. The - second should be viewed as a visualizer. - After that load the burst and check that the visualizer and analyzer are loaded in the - corresponding tab and that all the parameters are still the same. Finally check that burst - status updates corresponding to final operation status. - """ - loaded_burst, workflow_step_list = self._prepare_and_launch_sync_burst() - finished, started, error, _, _ = dao.get_operation_numbers(self.test_project.id) - assert finished == 1, "One operations should have been generated for this burst." - assert started == 0, "No operations should remain started since workflow was launched synchronous." - assert error == 0, "No operations should return error status." - assert loaded_burst.tabs[0].portlets[0] is not None, "Portlet not loaded from config!" - portlet_config = loaded_burst.tabs[0].portlets[0] - analyzers = portlet_config.analyzers - assert len(analyzers) == 0, "Only have 'simulator' and a visualizer. No analyzers should be loaded." - visualizer = portlet_config.visualizer - assert visualizer is not None, "Visualizer should not be none." - assert visualizer.fk_algorithm == workflow_step_list[0].fk_algorithm,\ - "Different ids after burst load for visualizer." - assert visualizer.static_param == workflow_step_list[0].static_param,\ - "Different static params after burst load for visualizer." - assert visualizer.dynamic_param == workflow_step_list[0].dynamic_param,\ - "Different static params after burst load for visualizer." - - def test_launch_burst(self): - """ - Test the launch burst method from burst service. - """ - first_step_algo = self.algorithm_service.get_algorithm_by_module_and_class( - 'tvb.tests.framework.adapters.testadapter1', 'TestAdapter1') - adapter_interface = self.algorithm_service.prepare_adapter(self.test_project.id, first_step_algo) - ui_submited_simulator_iface_replica = {} - kwargs_replica = {} - for entry in adapter_interface: - ui_submited_simulator_iface_replica[entry[ABCAdapter.KEY_NAME]] = {KEY_PARAMETER_CHECKED: True, - KEY_SAVED_VALUE: entry[ - ABCAdapter.KEY_DEFAULT]} - kwargs_replica[entry[ABCAdapter.KEY_NAME]] = entry[ABCAdapter.KEY_DEFAULT] - burst_config = self.burst_service.new_burst_configuration(self.test_project.id) - burst_config.simulator_configuration = ui_submited_simulator_iface_replica - test_portlet = dao.get_portlet_by_identifier(self.PORTLET_ID) - tab_config = {test_portlet.id: [(0, 0), (0, 1), (1, 0)]} - self._add_portlets_to_burst(burst_config, tab_config) - burst_config.update_simulator_configuration(kwargs_replica) - burst_id, _ = self.burst_service.launch_burst(burst_config, 0, first_step_algo.id, self.test_user.id) - burst_config = dao.get_burst_by_id(burst_id) - assert burst_config.status in (BurstConfiguration.BURST_FINISHED, BurstConfiguration.BURST_RUNNING),\ - "Burst not launched successfully!" - # Wait maximum x seconds for burst to finish - self._wait_for_burst(burst_config) - - def test_load_group_burst(self): - """ - Launch a group adapter and load it afterwards and check that a group_id is properly loaded. - """ - launch_params = self._prepare_simulation_params(1, True, 3) - - burst_config = self.burst_service.new_burst_configuration(self.test_project.id) - burst_config.update_simulator_configuration(launch_params) - burst_id, _ = self.burst_service.launch_burst(burst_config, 0, self.sim_algorithm.id, self.test_user.id) - burst_config = dao.get_burst_by_id(burst_id) - # Wait maximum x seconds for burst to finish - self._wait_for_burst(burst_config) - - launched_workflows = dao.get_workflows_for_burst(burst_id, is_count=True) - assert 3 == launched_workflows, "3 workflows should have been launched due to group parameter." - - group_id = self.burst_service.load_burst(burst_id)[1] - assert group_id >= 0, "Should be part of group." - datatype_measures = self.count_all_entities(DatatypeMeasureIndex) - assert 3 == datatype_measures - - def test_launch_burst_invalid_simulator_parameters(self): + def test_load_burst_configuration(self): """ - Test that burst is marked as error if invalid data is passed to the first step. + Test that loads the burst configuration based non the stored config id """ - algo_id = self.algorithm_service.get_algorithm_by_module_and_class('tvb.tests.framework.adapters.testadapter1', - 'TestAdapter1').id - #Passing invalid kwargs to the 'simulator' component - burst_config = self.burst_service.new_burst_configuration(self.test_project.id) - kwargs_replica = {'test1_val1_invalid': '0', 'test1_val2': '0'} - burst_config.update_simulator_configuration(kwargs_replica) - burst_id, _ = self.burst_service.launch_burst(burst_config, 0, algo_id, self.test_user.id) - burst_config = dao.get_burst_by_id(burst_id) - #Wait maximum x seconds for burst to finish - self._wait_for_burst(burst_config, error_expected=True) + stored_burst = TestFactory.store_burst(self.test_project.id) + burst_config = self.burst_service.load_burst_configuration(stored_burst.id) + assert burst_config.id == stored_burst.id, "The loaded burst does not have the same ID" - def test_launch_burst_invalid_simulator_data(self): + def test_update_simulation_fields(self, tmph5factory): """ - Test that burst is marked as error if invalid data is passed to the first step. + Test that updates the simulation fields of the burst """ - algo_id = self.algorithm_service.get_algorithm_by_module_and_class('tvb.tests.framework.adapters.testadapter1', - 'TestAdapter1').id - #Adapter tries to do an int(test1_val1) so this should fail - burst_config = self.burst_service.new_burst_configuration(self.test_project.id) - kwargs_replica = {'test1_val1': 'asa', 'test1_val2': '0'} - burst_config.update_simulator_configuration(kwargs_replica) - burst_id, _ = self.burst_service.launch_burst(burst_config, 0, algo_id, self.test_user.id) - burst_config = dao.get_burst_by_id(burst_id) - #Wait maximum x seconds for burst to finish - self._wait_for_burst(burst_config, error_expected=True) + stored_burst = TestFactory.store_burst(self.test_project.id) - def test_launch_group_burst_happy_flow(self): - """ - Happy flow of launching a burst with a range parameter. Expect to get both and operation - group and a DataType group for the results of the simulations and for the metric steps. - """ - burst_config = self._prepare_and_launch_async_burst(length=1, is_range=True, nr_ops=4, wait_to_finish=120) - if burst_config.status != BurstConfiguration.BURST_FINISHED: - self.burst_service.stop_burst(burst_config) - raise AssertionError("Burst should have finished successfully.") + connectivity = TestFactory.import_zip_connectivity(self.test_user, self.test_project) + op = TestFactory.create_operation(test_user=self.test_user, test_project=self.test_project) + simulation = SimulatorAdapterModel() + simulation.connectivity = UUID(connectivity.gid) - op_groups = self.count_all_entities(OperationGroup) - dt_groups = self.get_all_entities(DataTypeGroup) - assert 2 == op_groups, "An operation group should have been created for each step." - assert len(dt_groups) == 2, "An dataType group should have been created for each step." - for datatype in dt_groups: - assert 4 == datatype.count_results, "Should have 4 datatypes in group" + burst_config = self.burst_service.update_simulation_fields(stored_burst.id, op.id, simulation.gid) + assert burst_config.id == stored_burst.id, "The loaded burst does not have the same ID" + assert burst_config.fk_simulation == op.id, "The loaded burst does not have the fk simulation that it was given" + assert burst_config.simulator_gid == simulation.gid.hex, "The loaded burst does not have the simulation gid that it was given" - def test_launch_group_burst_no_metric(self): + def test_prepare_name(self): """ - Test the launch burst method from burst service. Try to launch a burst with test adapter which has - no metrics associated. This should fail. + Test prepare burst name """ - burst_config = self.burst_service.new_burst_configuration(self.test_project.id) - - algo_id = self.algorithm_service.get_algorithm_by_module_and_class('tvb.tests.framework.adapters.testadapter1', - 'TestAdapter1').id - kwargs_replica = {'test1_val1': '[0, 1, 2]', 'test1_val2': '0', RANGE_PARAMETER_1: 'test1_val1'} - test_portlet = dao.get_portlet_by_identifier(self.PORTLET_ID) - tab_config = {test_portlet.id: [(0, 0), (0, 1), (1, 0)]} - self._add_portlets_to_burst(burst_config, tab_config) - burst_config.update_simulator_configuration(kwargs_replica) - burst_id, _ = self.burst_service.launch_burst(burst_config, 0, algo_id, self.test_user.id) - burst_config = dao.get_burst_by_id(burst_id) - # Wait maximum x seconds for burst to finish - self._wait_for_burst(burst_config, error_expected=True) + stored_burst = TestFactory.store_burst(self.test_project.id) + simulation_tuple = self.burst_service.prepare_name(stored_burst, self.test_project.id) + assert simulation_tuple[0] == 'simulation_' + str(dao.get_number_of_bursts(self.test_project.id) + 1), \ + "The default simulation name is not the defined one" - launched_workflows = dao.get_workflows_for_burst(burst_id, is_count=True) - assert 3 == launched_workflows, "3 workflows should have been launched due to group parameter." + busrt_test_name = "Burst Test Name" + stored_burst.name = busrt_test_name + stored_burst = dao.store_entity(stored_burst) + simulation_tuple = self.burst_service.prepare_name(stored_burst, self.test_project.id) + assert simulation_tuple[0] == busrt_test_name, "The burst name is not the given one" - op_groups = self.count_all_entities(OperationGroup) - dt_groups = self.count_all_entities(DataTypeGroup) - assert 5 == op_groups, "An operation group should have been created for each step." - assert 5 == dt_groups, "An dataType group should have been created for each step." - - def test_load_tab_configuration(self): + def test_prepare_burst_for_pse(self): """ - Create a burst with some predefined portlets in some known positions. Check that the - load_tab_configuration method does what it is expected, and we get the portlets in the - corresponding tab positions. + Test prepare burst for pse """ - burst_config = self.burst_service.new_burst_configuration(self.test_project.id) - SIMULATOR_MODULE = 'tvb.tests.framework.adapters.testadapter1' - SIMULATOR_CLASS = 'TestAdapter1' - algo_id = self.algorithm_service.get_algorithm_by_module_and_class(SIMULATOR_MODULE, SIMULATOR_CLASS).id - kwargs_replica = {'test1_val1': '0', 'test1_val2': '0'} - test_portlet = dao.get_portlet_by_identifier(self.PORTLET_ID) - # Add test_portlet to positions (0,0), (0,1) and (1,0) - tab_config = {test_portlet.id: [(0, 0), (0, 1), (1, 0)]} - burst_config.update_simulator_configuration(kwargs_replica) - burst_id, _ = self.burst_service.launch_burst(burst_config, 0, algo_id, self.test_user.id) - burst_config = dao.get_burst_by_id(burst_id) - burst_config = self._wait_for_burst(burst_config) - burst_wf = dao.get_workflows_for_burst(burst_config.id)[0] - wf_step = dao.get_workflow_steps(burst_wf.id)[0] - burst_config.prepare_after_load() - for tab in burst_config.tabs: - for portlet in tab.portlets: - assert portlet is None, "Before loading the tab configuration all portlets should be none." - burst_config = self.burst_service.load_tab_configuration(burst_config, wf_step.fk_operation) - for tab_idx, tab in enumerate(burst_config.tabs): - for portlet_idx, portlet in enumerate(tab.portlets): - if (tab_idx == 0 and portlet_idx in [0, 1]) or (tab_idx == 1 and portlet_idx == 0): - assert portlet is not None, "portlet gonfiguration not set" - assert test_portlet.id == portlet.portlet_id, "Unexpected portlet entity loaded." - else: - assert portlet is None, "Before loading the tab configuration all portlets should be none" + burst = BurstConfiguration(self.test_project.id) + assert burst.fk_metric_operation_group == None, "The fk for the metric operation group is not None" + assert burst.fk_operation_group == None, "The fk for the operation group is not None" + assert burst.operation_group == None, "The operation group is not None" - def _wait_for_burst(self, burst_config, error_expected=False, timeout=500): - """ - Method that just waits until a burst configuration is finished or a maximum timeout is reached. - - :param burst_config: the burst configuration that should be waited on - :param timeout: the maximum number of seconds to wait after the burst - """ - waited = 0 - while burst_config.status == BurstConfiguration.BURST_RUNNING and waited <= timeout: - sleep(0.5) - waited += 0.5 - burst_config = dao.get_burst_by_id(burst_config.id) - - if waited > timeout: - self.burst_service.stop_burst(burst_config) - raise AssertionError("Timed out waiting for simulations to finish. We will cancel it") - - if error_expected and burst_config.status != BurstConfiguration.BURST_ERROR: - self.burst_service.stop_burst(burst_config) - raise AssertionError("Burst should have failed due to invalid input data.") - - if (not error_expected) and burst_config.status != BurstConfiguration.BURST_FINISHED: - msg = "Burst status should have been FINISH. Instead got %s %s" % (burst_config.status, - burst_config.error_message) - self.burst_service.stop_burst(burst_config) - raise AssertionError(msg) - - return burst_config - - def _prepare_and_launch_async_burst(self, length=4, is_range=False, nr_ops=0, wait_to_finish=0): - """ - Launch an asynchronous burst with a simulation having all the default parameters, only the length received as - a parameters. This is launched with actual simulator and not with a dummy test adapter as replacement. - :param length: the length of the simulation in milliseconds. This is also used in case we need - a group burst, in which case we will have `nr_ops` simulations with lengths starting from - `length` to `length + nr_ops` milliseconds - :param is_range: a boolean which switches between a group burst and a non group burst. - !! even if `is_range` is `True` you still need a non-zero positive `nr_ops` to have an actual group burst - :param nr_ops: the number of operations in the group burst - """ - launch_params = self._prepare_simulation_params(length, is_range, nr_ops) - - burst_config = self.burst_service.new_burst_configuration(self.test_project.id) - burst_config.update_simulator_configuration(launch_params) - burst_id = self.burst_service.launch_burst(burst_config, 0, self.sim_algorithm.id, self.test_user.id)[0] - burst_config = dao.get_burst_by_id(burst_id) - - __timeout = 15 - __waited = 0 - # Wait a maximum of 15 seconds for the burst launch to be performed - while dao.get_workflows_for_burst(burst_config.id, is_count=True) == 0 and __waited < __timeout: - sleep(0.5) - __waited += 0.5 - - if wait_to_finish: - burst_config = self._wait_for_burst(burst_config, timeout=wait_to_finish) - return burst_config - - def _prepare_and_launch_sync_burst(self): - """ - Private method to launch a dummy burst. Return the burst loaded after the launch finished - as well as the workflow steps that initially formed the burst. - NOTE: the burst launched by this method is a `dummy` one, meaning we do not use an actual - simulation, but instead test adapters. - """ - burst_config = TestFactory.store_burst(self.test_project.id) - - workflow_step_list = [] - - stored_dt = datatypes_factory.DatatypesFactory()._store_datatype(Datatype1()) - first_step_algorithm = self.algorithm_service.get_algorithm_by_module_and_class( - "tvb.tests.framework.adapters.testadapter1", "TestAdapterDatatypeInput") - metadata = {DataTypeMetaData.KEY_BURST: burst_config.id} - kwargs = {"test_dt_input": stored_dt.gid, 'test_non_dt_input': '0'} - operations, group = self.operation_service.prepare_operations(self.test_user.id, self.test_project, - first_step_algorithm, - first_step_algorithm.algorithm_category, - metadata, **kwargs) - - ### Now fire the workflow and also update and store the burst configuration ## - self.operation_service.launch_operation(operations[0].id, False) - loaded_burst, _ = self.burst_service.load_burst(burst_config.id) - import_operation = dao.get_operation_by_id(stored_dt.fk_from_operation) - dao.remove_entity(import_operation.__class__, import_operation.id) - dao.remove_datatype(stored_dt.gid) - return loaded_burst, workflow_step_list + pse_burst = self.burst_service.prepare_burst_for_pse(burst) + assert pse_burst.metric_operation_group != None, "The fk for the operation group is None" + assert pse_burst.operation_group != None, "The operation group is None" def _check_burst_removed(self): """ @@ -481,113 +221,3 @@ class TestBurstService(BaseTestCase): datatype2_stored = self.count_all_entities(Datatype2) assert 0 == datatype1_stored, "Specific datatype entries for DataType1 were not deleted." assert 0 == datatype2_stored, "Specific datatype entries for DataType2 were not deleted." - - def _prepare_simulation_params(self, length, is_range=False, no_ops=0): - - connectivity = self._burst_create_connectivity() - - launch_params = self.local_simulation_params - launch_params['connectivity'] = connectivity.gid - if is_range: - launch_params['simulation_length'] = str(list(range(length, length + no_ops))) - launch_params[RANGE_PARAMETER_1] = 'simulation_length' - else: - launch_params['simulation_length'] = str(length) - launch_params[RANGE_PARAMETER_1] = None - - return launch_params - - def _burst_create_connectivity(self): - """ - Create a connectivity that will be used in "non-dummy" burst launches (with the actual simulator). - """ - meta = {DataTypeMetaData.KEY_SUBJECT: "John Doe", DataTypeMetaData.KEY_STATE: "RAW_DATA"} - - self.operation = Operation(self.test_user.id, self.test_project.id, self.sim_algorithm.id, - json.dumps(''), meta=json.dumps(meta), status=STATUS_STARTED) - self.operation = dao.store_entity(self.operation) - storage_path = FilesHelper().get_project_folder(self.test_project, str(self.operation.id)) - connectivity = Connectivity(storage_path=storage_path) - connectivity.weights = numpy.ones((74, 74)) - connectivity.centres = numpy.ones((74, 3)) - adapter_instance = StoreAdapter([connectivity]) - self.operation_service.initiate_prelaunch(self.operation, adapter_instance, {}) - return connectivity - - def test_prepare_metrics_operation(self, operation_factory, pse_burst_configuration_factory): - burst = pse_burst_configuration_factory(self.test_project) - - op = operation_factory(test_user=self.test_user, test_project=self.test_project) - op.fk_operation_group = burst.fk_operation_group - op = dao.store_entity(op) - - db_ops = dao.get_operations_in_group(burst.fk_metric_operation_group) - assert len(db_ops) == 0 - metric_op_dir, metric_op = self.burst_service.prepare_metrics_operation(op) - assert metric_op.status == STATUS_FINISHED - assert metric_op.fk_operation_group == burst.fk_metric_operation_group - db_ops = dao.get_operations_in_group(burst.fk_metric_operation_group) - assert len(db_ops) == 1 - assert metric_op.fk_launched_in == op.fk_launched_in - assert metric_op.visible is False - - def test_prepare_index_for_metric_result(self, operation_factory, pse_burst_configuration_factory): - burst = pse_burst_configuration_factory(self.test_project) - - op = operation_factory(test_user=self.test_user, test_project=self.test_project) - op.fk_operation_group = burst.fk_metric_operation_group - op = dao.store_entity(op) - - op_dir = FilesHelper().get_operation_folder(op.project.name, op.id) - dm_gid = uuid.uuid4() - dm_h5_file = h5.path_for(op_dir, DatatypeMeasureH5, dm_gid) - with DatatypeMeasureH5(dm_h5_file) as dm_h5: - dm_h5.gid.store(dm_gid) - dm_h5.metrics.store({'a': 0, 'b': 1}) - dm_h5.analyzed_datatype.store(dm_gid) - - db_measures = dao.get_generic_entity(DatatypeMeasureIndex, dm_gid.hex, 'gid') - assert len(db_measures) == 0 - metric_dt_group = dao.get_datatypegroup_by_op_group_id(burst.fk_metric_operation_group) - nr_dts_in_group = dao.count_datatypes_in_group(metric_dt_group.id) - assert nr_dts_in_group == 0 - index = self.burst_service.prepare_index_for_metric_result(op, dm_h5_file, burst) - dao.store_entity(index) - db_measures = dao.get_generic_entity(DatatypeMeasureIndex, dm_gid.hex, 'gid') - assert len(db_measures) > 0 - metric_dt_group = dao.get_datatypegroup_by_op_group_id(burst.fk_metric_operation_group) - nr_dts_in_group = dao.count_datatypes_in_group(metric_dt_group.id) - assert nr_dts_in_group == 1 - - def test_update_finished_burst_status(self, operation_factory, pse_burst_configuration_factory): - for status in OperationPossibleStatus: - burst = pse_burst_configuration_factory(self.test_project) - - op = operation_factory(test_user=self.test_user, test_project=self.test_project) - op.fk_operation_group = burst.fk_operation_group - dao.store_entity(op) - - op_metric = operation_factory(test_user=self.test_user, test_project=self.test_project, - operation_status=status) - op_metric.fk_operation_group = burst.fk_metric_operation_group - dao.store_entity(op_metric) - - assert burst.status == BurstConfiguration.BURST_RUNNING - self.burst_service.update_burst_status(burst) - burst = dao.get_burst_by_id(burst.id) - assert burst.status == STATUS_FOR_OPERATION[op_metric.status] - - def test_update_finished_burst_status_simple_simulation(self, operation_factory, pse_burst_configuration_factory): - for status in OperationPossibleStatus: - op = operation_factory(test_user=self.test_user, test_project=self.test_project, operation_status=status) - dao.store_entity(op) - - burst = pse_burst_configuration_factory(self.test_project) - burst.fk_operation_group = None - burst.fk_simulation = op.id - dao.store_entity(burst) - - assert burst.status == BurstConfiguration.BURST_RUNNING - self.burst_service.update_burst_status(burst) - burst = dao.get_burst_by_id(burst.id) - assert burst.status == STATUS_FOR_OPERATION[status] diff --git a/framework_tvb/tvb/tests/framework/core/services/project_structure_test.py b/framework_tvb/tvb/tests/framework/core/services/project_structure_test.py index 7055535eb28f0db42f5631593453735e6baabdb5..00c3c63fd31de823faa94356393c9b541d1ee48d 100644 --- a/framework_tvb/tvb/tests/framework/core/services/project_structure_test.py +++ b/framework_tvb/tvb/tests/framework/core/services/project_structure_test.py @@ -32,6 +32,7 @@ .. moduleauthor:: Ionel Ortelecan <ionel.ortelecan@codemart.ro> .. moduleauthor:: Bogdan Neacsa <bogdan.neacsa@codemart.ro> """ + import os import numpy import pytest @@ -48,9 +49,9 @@ from tvb.core.entities.storage import dao from tvb.core.entities.file.files_helper import FilesHelper from tvb.core.entities.transient.structure_entities import DataTypeMetaData from tvb.core.entities.filters.factory import StaticFiltersFactory +from tvb.core.services.operation_service import OperationService from tvb.core.services.project_service import ProjectService from tvb.datatypes.graph import ConnectivityMeasure -from tvb.tests.framework.adapters.testadapter3 import TestAdapter3 from tvb.tests.framework.core.base_testcase import TransactionalTestCase from tvb.tests.framework.core.factory import TestFactory from tvb.tests.framework.core.services.algorithm_service_test import TEST_ADAPTER_VALID_MODULE, TEST_ADAPTER_VALID_CLASS @@ -324,11 +325,12 @@ class TestProjectStructure(TransactionalTestCase): view_model = BaseBCTModel() view_model.connectivity = conn.gid adapter = ABCAdapter.build_adapter_from_class(TransitivityBinaryDirected) - value_wrapper = TestFactory.launch_synchronously(self.test_user, self.test_project, adapter, view_model)[0] + result = OperationService().fire_operation(adapter, self.test_user, self.test_project.id, + view_model=view_model) conn.visible = False dao.store_entity(conn) - operation = dao.get_operation_by_id(value_wrapper.fk_from_operation) + operation = dao.get_operation_by_id(result[0].id) inputs = self.project_service.get_datatype_and_datatypegroup_inputs_for_operation(operation.gid, self.relevant_filter) @@ -339,92 +341,49 @@ class TestProjectStructure(TransactionalTestCase): assert len(inputs) == 1, "Incorrect number of inputs." assert conn.id == inputs[0].id, "Retrieved wrong input dataType." - def test_get_inputs_for_op_group(self, datatype_group_factory, test_adapter_factory): + def test_get_inputs_for_group(self, datatype_group_factory, test_adapter_factory): """ Tests method get_datatypes_inputs_for_operation_group. - The DataType inputs will be from a DataType group. """ - group = datatype_group_factory(project=self.test_project) - datatypes = dao.get_datatypes_from_datatype_group(group.id) + zip_path = os.path.join(os.path.dirname(tvb_data.__file__), 'connectivity', 'connectivity_66.zip') + conn = TestFactory.import_zip_connectivity(self.test_user, self.test_project, zip_path) + conn.visible = False + dao.store_entity(conn) + + group = OperationGroup(self.test_project.id, "group", "range1[1..2]") + group = dao.store_entity(group) - datatypes[0].visible = False - dao.store_entity(datatypes[0]) - datatypes[1].visible = False - dao.store_entity(datatypes[1]) + view_model = BaseBCTModel() + view_model.connectivity = conn.gid + adapter = ABCAdapter.build_adapter_from_class(TransitivityBinaryDirected) + algorithm = adapter.stored_adapter - op_group = OperationGroup(self.test_project.id, "group", "range1[1..2]") - op_group = dao.store_entity(op_group) - params_1 = json.dumps({"param_5": "1", "param_1": datatypes[0].gid, "param_6": "2"}) - params_2 = json.dumps({"param_5": "1", "param_4": datatypes[1].gid, "param_6": "5"}) + operation1 = Operation(self.test_user.id, self.test_project.id, algorithm.id, + json.dumps({'gid': view_model.gid.hex}), op_group_id=group.id) + operation2 = Operation(self.test_user.id, self.test_project.id, algorithm.id, + json.dumps({'gid': view_model.gid.hex}), op_group_id=group.id) + dao.store_entities([operation1, operation2]) - algo = test_adapter_factory(adapter_class=TestAdapter3) - op1 = Operation(self.test_user.id, self.test_project.id, algo.id, params_1, op_group_id=op_group.id) - op2 = Operation(self.test_user.id, self.test_project.id, algo.id, params_2, op_group_id=op_group.id) - dao.store_entities([op1, op2]) + OperationService()._store_view_model(operation1, dao.get_project_by_id(self.test_project.id), view_model) + OperationService()._store_view_model(operation2, dao.get_project_by_id(self.test_project.id), view_model) - inputs = self.project_service.get_datatypes_inputs_for_operation_group(op_group.id, self.relevant_filter) + inputs = self.project_service.get_datatypes_inputs_for_operation_group(group.id, self.relevant_filter) assert len(inputs) == 0 - inputs = self.project_service.get_datatypes_inputs_for_operation_group(op_group.id, self.full_filter) + inputs = self.project_service.get_datatypes_inputs_for_operation_group(group.id, self.full_filter) assert len(inputs) == 1, "Incorrect number of dataTypes." - assert not datatypes[0].id == inputs[0].id, "Retrieved wrong dataType." - assert not datatypes[1].id == inputs[0].id, "Retrieved wrong dataType." assert group.id == inputs[0].id, "Retrieved wrong dataType." - datatypes[0].visible = True - dao.store_entity(datatypes[0]) + conn.visible = True + dao.store_entity(conn) - inputs = self.project_service.get_datatypes_inputs_for_operation_group(op_group.id, self.relevant_filter) + inputs = self.project_service.get_datatypes_inputs_for_operation_group(group.id, self.relevant_filter) assert len(inputs) == 1, "Incorrect number of dataTypes." - assert not datatypes[0].id == inputs[0].id, "Retrieved wrong dataType." - assert not datatypes[1].id == inputs[0].id, "Retrieved wrong dataType." - assert group.id == inputs[0].id, "Retrieved wrong dataType." - inputs = self.project_service.get_datatypes_inputs_for_operation_group(op_group.id, self.full_filter) + inputs = self.project_service.get_datatypes_inputs_for_operation_group(group.id, self.full_filter) assert len(inputs) == 1, "Incorrect number of dataTypes." - assert not datatypes[0].id == inputs[0].id, "Retrieved wrong dataType." - assert not datatypes[1].id == inputs[0].id, "Retrieved wrong dataType." assert group.id == inputs[0].id, "Retrieved wrong dataType." - def test_get_inputs_for_op_group_simple_inputs(self, array_factory, test_adapter_factory): - """ - Tests method get_datatypes_inputs_for_operation_group. - The dataType inputs will not be part of a dataType group. - """ - # it's a list of 3 elem. - array_wrappers = array_factory(self.test_project) - array_wrapper_ids = [] - for datatype in array_wrappers: - array_wrapper_ids.append(datatype[0]) - - datatype = dao.get_datatype_by_id(array_wrapper_ids[0]) - datatype.visible = False - dao.store_entity(datatype) - - op_group = OperationGroup(self.test_project.id, "group", "range1[1..2]") - op_group = dao.store_entity(op_group) - params_1 = json.dumps({"param_5": "2", "param_1": array_wrappers[0][2], - "param_2": array_wrappers[1][2], "param_6": "7"}) - params_2 = json.dumps({"param_5": "5", "param_3": array_wrappers[2][2], - "param_2": array_wrappers[1][2], "param_6": "6"}) - - algo = test_adapter_factory(adapter_class=TestAdapter3) - op1 = Operation(self.test_user.id, self.test_project.id, algo.id, params_1, op_group_id=op_group.id) - op2 = Operation(self.test_user.id, self.test_project.id, algo.id, params_2, op_group_id=op_group.id) - dao.store_entities([op1, op2]) - - inputs = self.project_service.get_datatypes_inputs_for_operation_group(op_group.id, self.relevant_filter) - assert len(inputs) == 2 - assert not array_wrapper_ids[0] in [inputs[0].id, inputs[1].id], "Retrieved wrong dataType." - assert array_wrapper_ids[1] in [inputs[0].id, inputs[1].id], "Retrieved wrong dataType." - assert array_wrapper_ids[2] in [inputs[0].id, inputs[1].id], "Retrieved wrong dataType." - - inputs = self.project_service.get_datatypes_inputs_for_operation_group(op_group.id, self.full_filter) - assert len(inputs) == 3, "Incorrect number of dataTypes." - assert array_wrapper_ids[0] in [inputs[0].id, inputs[1].id, inputs[2].id] - assert array_wrapper_ids[1] in [inputs[0].id, inputs[1].id, inputs[2].id] - assert array_wrapper_ids[2] in [inputs[0].id, inputs[1].id, inputs[2].id] - def test_remove_datatype(self, array_factory): """ Tests the deletion of a datatype.