Skip to content
Snippets Groups Projects
Select Git revision
  • 771cf1695f2b968c290ca63b4428d72f66b55a34
  • master default protected
  • tut_ring_allen
  • docs_furo
  • docs_reorder_cable_cell
  • docs_graphviz
  • docs_rtd_dev
  • ebrains_mirror
  • doc_recat
  • docs_spike_source
  • docs_sim_sample_clar
  • docs_pip_warn
  • github_template_updates
  • docs_fix_link
  • cv_default_and_doc_clarification
  • docs_add_numpy_req
  • readme_zenodo_05
  • install_python_fix
  • install_require_numpy
  • typofix_propetries
  • docs_recipe_lookup
  • v0.10.0
  • v0.10.1
  • v0.10.0-rc5
  • v0.10.0-rc4
  • v0.10.0-rc3
  • v0.10.0-rc2
  • v0.10.0-rc
  • v0.9.0
  • v0.9.0-rc
  • v0.8.1
  • v0.8
  • v0.8-rc
  • v0.7
  • v0.6
  • v0.5.2
  • v0.5.1
  • v0.5
  • v0.4
  • v0.3
  • v0.2.2
41 results

point.hpp

Blame
  • monet_db_facade.py 8.31 KiB
    import re
    from contextlib import contextmanager
    from functools import wraps
    from math import log2
    from typing import Any
    from typing import List
    from typing import Optional
    
    import pymonetdb
    from eventlet.greenthread import sleep
    from eventlet.lock import Semaphore
    from pydantic import BaseModel
    from pymonetdb import DatabaseError
    from pymonetdb import ProgrammingError
    
    from mipengine.node import config as node_config
    from mipengine.node import node_logger as logging
    
    # Default-constructed eventlet semaphores that _execute acquires (via _lock) to
    # serialize statement execution: one shared by plain queries, one by UDF runs.
    query_execution_lock, udf_execution_lock = Semaphore(), Semaphore()
    
    
    class _DBExecutionDTO(BaseModel):
        """
        Bundle of everything needed to execute one SQL statement.

        Instances are immutable (see ``Config.allow_mutation``).
        """

        # SQL text handed to ``cursor.execute``.
        query: str
        # Bind parameters handed to ``cursor.execute`` alongside ``query``; may be None.
        parameters: Optional[List[Any]]
        # Budget used both for acquiring the execution lock and for sizing the retry
        # back-off (presumably seconds — TODO confirm against the celery config).
        timeout: Optional[int]

        class Config:
            # pydantic v1 option: attribute assignment on instances is rejected.
            allow_mutation = False
    
    
    def db_execute_and_fetchall(query: str, parameters=None) -> List:
        """
        Execute a result-returning statement and return every fetched row.

        The statement runs with the celery tasks timeout and is not serialized
        through any of the module's execution locks.
        """
        dto = _DBExecutionDTO(
            query=query,
            parameters=parameters,
            timeout=node_config.celery.tasks_timeout,
        )
        return _execute_and_fetchall(db_execution_dto=dto)
    
    
    def db_execute_query(query: str, parameters=None):
        """
        Execute a statement that returns no rows (CREATE, DROP, INSERT, ...).

        The statement is first rewritten by ``convert_to_idempotent`` and then
        executed under ``query_execution_lock`` with the celery tasks timeout.
        """
        _execute(
            db_execution_dto=_DBExecutionDTO(
                query=convert_to_idempotent(query),
                parameters=parameters,
                timeout=node_config.celery.tasks_timeout,
            ),
            lock=query_execution_lock,
        )
    
    
    def db_execute_udf(query: str, parameters=None):
        """
        Execute a single UDF-invoking ``INSERT INTO`` statement idempotently.

        The statement is rewritten by ``convert_udf_execution_query_to_idempotent``
        and executed under ``udf_execution_lock`` with the UDF task timeout.

        Raises:
            ValueError: if ``query`` contains more than one statement, or (from the
                converter) if it is not an ``INSERT INTO`` statement.
        """
        # Check if there is only one query. Whitespace-only fragments (e.g. the
        # " " in "...;\n ;") are not statements and must not be counted.
        # NOTE: splitting on ";" also splits inside string literals.
        statements = [part.strip() for part in query.strip().split(";") if part.strip()]
        if len(statements) > 1:
            raise ValueError(f"UDF execution query: {query} should contain only one query.")

        udf_execution_timeout = node_config.celery.run_udf_task_timeout
        # Hand the converter the bare statement (no trailing ";" or whitespace) so
        # the WHERE clause it appends stays part of the same statement. An empty
        # query is passed through unchanged and rejected by the converter.
        statement = statements[0] if statements else query
        idempotent_query = convert_udf_execution_query_to_idempotent(statement)
        db_execution_dto = _DBExecutionDTO(
            query=idempotent_query, parameters=parameters, timeout=udf_execution_timeout
        )
        _execute(db_execution_dto=db_execution_dto, lock=udf_execution_lock)
    
    
    # Connection Pool disabled due to bugs in maintaining connections
    @contextmanager
    def _connection():
        """
        Open a new pymonetdb connection from the node's MonetDB settings.

        The connection is closed when the ``with`` block exits, whether it exits
        normally or by raising, so failing (and retried) queries do not leak
        connections.
        """
        conn = pymonetdb.connect(
            hostname=node_config.monetdb.ip,
            port=node_config.monetdb.port,
            username=node_config.monetdb.username,
            password=node_config.monetdb.password,
            database=node_config.monetdb.database,
        )
        try:
            yield conn
        finally:
            # Without the finally, an exception thrown into the generator at the
            # yield skips this line and the connection is never closed.
            conn.close()
    
    
    @contextmanager
    def _cursor(commit=False):
        """
        Yield a cursor on a fresh connection obtained from ``_connection``.

        The cursor is always closed, even if the ``with`` body raises. When
        ``commit`` is True the transaction is committed only after the body has
        completed without raising; on error nothing is committed.
        """
        with _connection() as conn:
            cur = conn.cursor()
            try:
                yield cur
            finally:
                cur.close()
            # Only reached when the body did not raise.
            if commit:
                conn.commit()
    
    
    @contextmanager
    def _lock(query_lock, timeout):
        """
        Hold ``query_lock`` for the duration of a ``with`` block.

        Waits at most ``timeout`` for the lock and raises ``TimeoutError`` if it
        could not be acquired. Once acquired, the lock is released on exit even if
        the block raises.
        """
        acquired_in_time = query_lock.acquire(timeout=timeout)
        if acquired_in_time:
            try:
                yield
            finally:
                query_lock.release()
        else:
            raise TimeoutError("Could not acquire the lock in the designed timeout.")
    
    
    def _validate_exception_is_recoverable(exc):
        """
        Decide whether the failed query is worth re-executing.

        Broken pipes and connection resets are always retried. A pymonetdb
        ``DatabaseError`` is retried unless it is a ``ProgrammingError`` or its
        message mentions ``ValueError``. Any other exception is not retried.
        """
        if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
            return True
        if not isinstance(exc, DatabaseError):
            return False
        return not isinstance(exc, ProgrammingError) and "ValueError" not in str(exc)
    
    
    def _execute_queries_with_error_handling(func):
        """
        Wrap a DB execution function with retries on recoverable errors.

        The wrapped function must be called with keyword arguments only and must
        receive a ``db_execution_dto`` keyword argument. Recoverability is decided
        by ``_validate_exception_is_recoverable``. Between attempts the wrapper
        sleeps 1, 2, 4, ... seconds and allows at most
        ``max(1, floor(log2(timeout)))`` attempts, so the total back-off stays
        below ``db_execution_dto.timeout`` (assumed to be seconds — TODO confirm).
        """

        @wraps(func)
        def error_handling(**kwargs):
            """
            On the query execution, handle the 'BrokenPipeError' and 'pymonetdb.exceptions.DatabaseError' exceptions.
            In these cases, try to recover the connection with the database for x amount of time (x should not exceed the timeout).
            """
            db_execution_dto = kwargs["db_execution_dto"]

            logger = logging.get_logger()
            logger.debug(
                f"query: {db_execution_dto.query} \n, parameters: {db_execution_dto.parameters}"
            )

            timeout = db_execution_dto.timeout
            # log2 raises for None/0 and is negative below 1; in those cases allow
            # exactly one attempt (no retries) instead of crashing before executing.
            max_attempts = max(int(log2(timeout)), 1) if timeout and timeout >= 1 else 1

            attempts = 0
            while True:
                try:
                    return func(**kwargs)
                except Exception as exc:
                    if not _validate_exception_is_recoverable(exc):
                        logger.error(
                            f"Error occurred: Exception type: '{type(exc)}' and exception message: '{exc}'"
                        )
                        raise

                    attempts += 1
                    # Check the budget before sleeping: sleeping after the last
                    # allowed attempt only delays the inevitable re-raise.
                    if attempts >= max_attempts:
                        logger.error(
                            f"Giving up recovering the connection with the database after "
                            f"{attempts} attempt(s). Exception type: '{type(exc)}' and "
                            f"exception message: '{exc}'"
                        )
                        raise

                    logger.warning(
                        f"Trying to recover the connection with the database. "
                        f"Exception type: '{type(exc)}' and exception message: '{exc}'. "
                        f"Attempts={attempts}"
                    )
                    # Exponential back-off: 1, 2, 4, ... seconds.
                    sleep(pow(2, attempts - 1))

        return error_handling
    
    
    @_execute_queries_with_error_handling
    def _execute_and_fetchall(db_execution_dto) -> List:
        """
        Run a result-returning statement (e.g. SELECT) and return all of its rows.

        ``db_execution_dto.parameters`` are handed to ``cursor.execute`` as bind
        parameters. Retries on recoverable errors come from the decorator.
        """
        with _cursor() as cursor:
            cursor.execute(db_execution_dto.query, db_execution_dto.parameters)
            # Returning here still runs _cursor's cleanup before the value is
            # handed back to the caller.
            return cursor.fetchall()
    
    
    def convert_udf_execution_query_to_idempotent(query: str) -> str:
        """
        Make a UDF ``INSERT INTO <table> ...`` statement safe to re-execute.

        Appends ``WHERE NOT EXISTS (SELECT * FROM <table>)`` so that a re-executed
        statement selects no rows once the target table holds any row. Assumes the
        statement ends in a SELECT without a WHERE clause of its own — TODO confirm
        for all callers.

        Any trailing mix of semicolons and whitespace is removed before the clause
        is appended. Otherwise a query ending in ``";\\n"`` would turn the appended
        WHERE into a separate, invalid statement.

        Raises:
            ValueError: if no ``INSERT INTO <table>`` is found in ``query``.
        """

        def extract_table_name(query: str) -> str:
            """
            Extracts the name of the table from an INSERT INTO statement.

            Schema-qualified names (``schema.table``) are returned whole.

            Args:
                query (str): The SQL query to extract the table name from.

            Returns:
                str: The name of the table.
            """
            # Use a regular expression to extract the (optionally dotted) table name
            insert_regex = r"(?i)INSERT\s+INTO\s+(\w+(?:\.\w+)*)"
            match = re.search(insert_regex, query)
            if match:
                return match.group(1)
            raise ValueError("Query is not a valid INSERT INTO statement.")

        table_name = extract_table_name(query)
        statement = re.sub(r"[\s;]+\Z", "", query)
        return f"{statement}\nWHERE NOT EXISTS (SELECT * FROM {table_name});"
    
    
    def convert_to_idempotent(query: str) -> str:
        """
        This function creates an idempotent query to protect from a potential edge case
        where a table creation query is interrupted due to a UDF running and allocating memory.

        Rewrites (case-sensitive, applied in this order):
        - ``CREATE TABLE`` / ``CREATE MERGE TABLE`` / ``CREATE REMOTE TABLE`` gain
          ``IF NOT EXISTS``;
        - ``CREATE VIEW`` becomes ``CREATE OR REPLACE VIEW``;
        - ``DROP TABLE`` / ``DROP VIEW`` / ``DROP FUNCTION`` gain ``IF EXISTS``.

        Occurrences already followed by ``IF [NOT] EXISTS`` are left alone. This
        keeps the conversion itself idempotent: converting an already converted
        query returns it unchanged instead of producing ``IF NOT EXISTS IF NOT
        EXISTS``.
        """
        rewrites = (
            ("CREATE TABLE", "CREATE TABLE IF NOT EXISTS"),
            ("CREATE MERGE TABLE", "CREATE MERGE TABLE IF NOT EXISTS"),
            ("CREATE REMOTE TABLE", "CREATE REMOTE TABLE IF NOT EXISTS"),
            ("CREATE VIEW", "CREATE OR REPLACE VIEW"),
            ("DROP TABLE", "DROP TABLE IF EXISTS"),
            ("DROP VIEW", "DROP VIEW IF EXISTS"),
            ("DROP FUNCTION", "DROP FUNCTION IF EXISTS"),
        )
        # Negative lookahead: skip phrases that already carry an IF [NOT] EXISTS
        # guard (matched case-insensitively, any whitespace between words).
        not_already_guarded = r"(?!\s+(?i:IF\s+(?:NOT\s+)?EXISTS)\b)"

        idempotent_query = query
        for phrase, replacement in rewrites:
            idempotent_query = re.sub(
                re.escape(phrase) + not_already_guarded, replacement, idempotent_query
            )
        return idempotent_query
    
    
    @_execute_queries_with_error_handling
    def _execute(db_execution_dto: _DBExecutionDTO, lock):
        """
        Executes statements that don't have a result. For example "CREATE,DROP,UPDATE,INSERT".

        The statement runs on a fresh cursor and is committed on success. Its
        execution is serialized through ``lock``: ``db_execute_query`` passes
        ``query_execution_lock`` and ``db_execute_udf`` passes
        ``udf_execution_lock``. The lock is waited on for at most
        ``db_execution_dto.timeout``.

        Background for serializing execution (from earlier notes on this function):
        - concurrent 'CREATE REMOTE TABLE' statements hit a MonetDB bug:
          https://github.com/MonetDB/MonetDB/issues/7304
        - concurrent 'CREATE OR REPLACE FUNCTION' statements fail with
          'CREATE OR REPLACE FUNCTION: transaction conflict detected':
          https://www.mail-archive.com/checkin-list@monetdb.org/msg46062.html
        - serializing 'INSERT INTO' statements preserves the zero-cost that MonetDB
          provides on the UDFs.

        'parameters' option to provide the functionality of bind-parameters.

        Raises:
            TimeoutError: if ``lock`` could not be acquired within the timeout. A
                TimeoutError raised while the statement itself runs (e.g. a socket
                timeout) is re-raised unchanged instead of being reported as a lock
                failure.
        """
        lock_acquired = False
        try:
            with _lock(lock, db_execution_dto.timeout):
                lock_acquired = True
                with _cursor(commit=True) as cur:
                    cur.execute(db_execution_dto.query, db_execution_dto.parameters)
        except TimeoutError as exc:
            if lock_acquired:
                # The lock was held, so the timeout came from executing the query
                # (socket.timeout is an alias of TimeoutError since Python 3.10).
                raise
            error_msg = f"""
            The execution of {db_execution_dto} failed because the
            lock was not acquired during
            {db_execution_dto.timeout}
            """
            raise TimeoutError(error_msg) from exc