Skip to content
Snippets Groups Projects
Commit ab0ca468 authored by kfilippopolitis's avatar kfilippopolitis Committed by ThanKarab
Browse files

Renamed make-> convert to idempotent

parent d92302a4
No related branches found
No related tags found
No related merge requests found
......@@ -39,7 +39,7 @@ def db_execute_and_fetchall(query: str, parameters=None) -> List:
def db_execute_query(query: str, parameters=None):
query_execution_timeout = node_config.celery.tasks_timeout
query = make_idempotent(query)
query = convert_to_idempotent(query)
db_execution_dto = _DBExecutionDTO(
query=query, parameters=parameters, timeout=query_execution_timeout
)
......@@ -53,7 +53,7 @@ def db_execute_udf(query: str, parameters=None):
raise ValueError(f"UDF execution query: {query} should contain only one query.")
udf_execution_timeout = node_config.celery.run_udf_task_timeout
query = make_udf_execution_idempotent(query)
query = convert_udf_execution_query_to_idempotent(query)
db_execution_dto = _DBExecutionDTO(
query=query, parameters=parameters, timeout=udf_execution_timeout
)
......@@ -99,7 +99,7 @@ def _validate_exception_is_recoverable(exc):
"""
Check whether the query needs to be re-executed and return True or False accordingly.
"""
if isinstance(exc, BrokenPipeError) or isinstance(exc, ConnectionResetError):
if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
return True
elif isinstance(exc, DatabaseError):
return "ValueError" not in str(exc) and not isinstance(exc, ProgrammingError)
......@@ -128,11 +128,6 @@ def _execute_queries_with_error_handling(func):
try:
return func(**kwargs)
except Exception as exc:
if isinstance(
exc, ProgrammingError
) and "is already part of MERGE TABLE" in str(exc):
return
if not _validate_exception_is_recoverable(exc):
logger.error(
f"Error occurred: Exception type: '{type(exc)}' and exception message: '{exc}'"
......@@ -165,7 +160,7 @@ def _execute_and_fetchall(db_execution_dto) -> List:
return result
def make_udf_execution_idempotent(query: str) -> str:
def convert_udf_execution_query_to_idempotent(query: str) -> str:
def extract_table_name(query: str) -> str:
"""
Extracts the name of the table from an INSERT INTO statement.
......@@ -191,7 +186,7 @@ def make_udf_execution_idempotent(query: str) -> str:
)
def make_idempotent(query: str) -> str:
def convert_to_idempotent(query: str) -> str:
"""
This function creates an idempotent query to protect from a potential edge case
where a table creation query is interrupted due to a UDF running and allocating memory.
......
import pytest
from mipengine.node.monetdb_interface.monet_db_facade import make_idempotent
from mipengine.node.monetdb_interface.monet_db_facade import convert_to_idempotent
from mipengine.node.monetdb_interface.monet_db_facade import (
make_udf_execution_idempotent,
convert_udf_execution_query_to_idempotent,
)
......@@ -25,12 +25,12 @@ from mipengine.node.monetdb_interface.monet_db_facade import (
],
)
def test_make_idempotent(query, expected_idempotent_query):
assert make_idempotent(query) == expected_idempotent_query
assert convert_to_idempotent(query) == expected_idempotent_query
def test_make_udf_execution_idempotent():
assert (
make_udf_execution_idempotent("INSERT INTO my_tbl1 VALUES (1);")
convert_udf_execution_query_to_idempotent("INSERT INTO my_tbl1 VALUES (1);")
== "INSERT INTO my_tbl1 VALUES (1)\n"
"WHERE NOT EXISTS (SELECT * FROM my_tbl1);"
)
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment