diff --git a/mipengine/node/monetdb_interface/monet_db_facade.py b/mipengine/node/monetdb_interface/monet_db_facade.py index eaba4dc1863e6b44fc2adc75b491e04cb86a5533..ef9fda06fb8b4d236862b0ad1230870d1ac4f88a 100644 --- a/mipengine/node/monetdb_interface/monet_db_facade.py +++ b/mipengine/node/monetdb_interface/monet_db_facade.py @@ -39,7 +39,7 @@ def db_execute_and_fetchall(query: str, parameters=None) -> List: def db_execute_query(query: str, parameters=None): query_execution_timeout = node_config.celery.tasks_timeout - query = make_idempotent(query) + query = convert_to_idempotent(query) db_execution_dto = _DBExecutionDTO( query=query, parameters=parameters, timeout=query_execution_timeout ) @@ -53,7 +53,7 @@ def db_execute_udf(query: str, parameters=None): raise ValueError(f"UDF execution query: {query} should contain only one query.") udf_execution_timeout = node_config.celery.run_udf_task_timeout - query = make_udf_execution_idempotent(query) + query = convert_udf_execution_query_to_idempotent(query) db_execution_dto = _DBExecutionDTO( query=query, parameters=parameters, timeout=udf_execution_timeout ) @@ -99,7 +99,7 @@ def _validate_exception_is_recoverable(exc): """ Check whether the query needs to be re-executed and return True or False accordingly. """ - if isinstance(exc, BrokenPipeError) or isinstance(exc, ConnectionResetError): + if isinstance(exc, (BrokenPipeError, ConnectionResetError)): return True elif isinstance(exc, DatabaseError): return "ValueError" not in str(exc) and not isinstance(exc, ProgrammingError) @@ -128,11 +128,6 @@ def _execute_queries_with_error_handling(func): try: return func(**kwargs) except Exception as exc: - if isinstance( - exc, ProgrammingError - ) and "is already part of MERGE TABLE" in str(exc): - return - if not _validate_exception_is_recoverable(exc): logger.error( f"Error occurred: Exception type: '{type(exc)}' and exception message: '{exc}'" @@ -165,7 +160,7 @@ def _execute_and_fetchall(db_execution_dto) -> List: return result -def make_udf_execution_idempotent(query: str) -> str: +def convert_udf_execution_query_to_idempotent(query: str) -> str: def extract_table_name(query: str) -> str: """ Extracts the name of the table from an INSERT INTO statement. @@ -191,7 +186,7 @@ def make_udf_execution_idempotent(query: str) -> str: ) -def make_idempotent(query: str) -> str: +def convert_to_idempotent(query: str) -> str: """ This function creates an idempotent query to protect from a potential edge case where a table creation query is interrupted due to a UDF running and allocating memory. diff --git a/tests/standalone_tests/test_monetdb_interface_if_not_exists.py b/tests/standalone_tests/test_monetdb_interface_if_not_exists.py index dc5c3d42aafe3a949bd18ba4d5ebfed6ad3e3198..da302a76f8f241ce75d506a33b9b2e38561231c0 100644 --- a/tests/standalone_tests/test_monetdb_interface_if_not_exists.py +++ b/tests/standalone_tests/test_monetdb_interface_if_not_exists.py @@ -1,8 +1,8 @@ import pytest -from mipengine.node.monetdb_interface.monet_db_facade import make_idempotent +from mipengine.node.monetdb_interface.monet_db_facade import convert_to_idempotent from mipengine.node.monetdb_interface.monet_db_facade import ( - make_udf_execution_idempotent, + convert_udf_execution_query_to_idempotent, ) @@ -25,12 +25,12 @@ from mipengine.node.monetdb_interface.monet_db_facade import ( ], ) def test_make_idempotent(query, expected_idempotent_query): - assert make_idempotent(query) == expected_idempotent_query + assert convert_to_idempotent(query) == expected_idempotent_query def test_make_udf_execution_idempotent(): assert ( - make_udf_execution_idempotent("INSERT INTO my_tbl1 VALUES (1);") + convert_udf_execution_query_to_idempotent("INSERT INTO my_tbl1 VALUES (1);") == "INSERT INTO my_tbl1 VALUES (1)\n" "WHERE NOT EXISTS (SELECT * FROM my_tbl1);" )