
Fix/mip 575/celery open many connections

Created by: apmariglis

Fix for MIP-575, MIP-574, MIP-595

  1. At all times there is only one Celery object per node in the system. This logic is implemented by the CeleryAppFactory class in mipengine/controller/celery_app.py (a minimal sketch follows after this list).

  2. Due to bug(s) in the Celery framework (see my posts 1, 2 on Celery's GitHub), if the broker (RabbitMQ) goes down and comes back up, tasks continue to be queued and processed successfully by the consumer Celery app, but the results are not returned to the producer Celery app until a new Celery instance is created on the producer side. A workaround for this problem is implemented in the CeleryWrapper class in mipengine/controller/celery_app.py: a new Celery object is instantiated whenever specific exceptions indicating that RabbitMQ went down are raised and caught (see the second sketch after this list).

  3. The controller layer has to execute three different actions without any one of them waiting for the others: 1. repeatedly check which nodes are available, and what data they contain, at any time (Node Landscape Aggregator); 2. execute algorithms; and 3. repeatedly clean up database artifacts (tables, views etc.) that were created during algorithm execution and are no longer needed (Cleaner, mipengine/controller/cleaner.py). Until now these were handled by a mix of asyncio and threading, which was becoming a mess. Implementing them purely with the asyncio framework would be the most efficient approach, but it would require some refactoring. Leaving that for a future task, I removed all unnecessary asyncio usage and moved solely to threads, which makes the code much simpler. More specifically:

    • NodeLandscapeAggregator: the start method launches the _update method in a new thread, which keeps the information about the nodes participating in the federation up to date at any point in time
    • Cleaner: the start method launches the _cleanup_loop method in a new thread, which repeatedly checks whether there are artifacts to be dropped and, if so, queues the relevant tasks to the nodes
    • Algorithm execution (initiated in the Controller): each algorithm execution essentially consists of a series of tasks queued on the nodes, which execute them and return the results. Each algorithm execution queues its tasks on a separate thread of a ThreadPool; nevertheless, control is yielded to the event loop by calling run_in_executor together with await (see the third sketch after this list)
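
A minimal sketch of the one-Celery-object-per-node idea from point 1. The class name and file come from the description above; the method signature, broker URL format and credentials are assumptions:

```python
from celery import Celery


class CeleryAppFactory:
    """Holds a single Celery app per node address (illustrative sketch)."""

    def __init__(self):
        self._celery_apps: dict[str, Celery] = {}

    def get_celery_app(self, socket_addr: str) -> Celery:
        # Reuse the existing Celery object for this node, or create the first
        # one, so that at all times there is one Celery object per node.
        if socket_addr not in self._celery_apps:
            broker_url = f"amqp://user:password@{socket_addr}"  # credentials assumed
            self._celery_apps[socket_addr] = Celery(broker=broker_url, backend="rpc://")
        return self._celery_apps[socket_addr]
```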
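
A sketch of the workaround from point 2, assuming the wrapper catches broker-outage exceptions around result retrieval; the exact exception types and method names are assumptions, not the actual CeleryWrapper API:

```python
from celery import Celery
from kombu.exceptions import OperationalError


class CeleryWrapper:
    def __init__(self, broker_url: str):
        self._broker_url = broker_url
        self._celery_app = Celery(broker=broker_url, backend="rpc://")

    def get_result(self, async_result, timeout: float):
        try:
            return async_result.get(timeout=timeout)
        except (OperationalError, ConnectionError):
            # These exceptions suggest RabbitMQ went down; because of the bug
            # described above, results would never arrive on the old instance,
            # so a fresh Celery object is created before re-raising.
            self._celery_app = Celery(broker=self._broker_url, backend="rpc://")
            raise
```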
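
A sketch of the thread-based pattern from point 3: a start method spawning a daemon thread with a periodic loop (as in Cleaner and NodeLandscapeAggregator), and algorithm execution yielding to the event loop via run_in_executor. The loop body, the 10-second interval and any names beyond those mentioned above are assumptions:

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class Cleaner:
    def __init__(self):
        self._keep_cleaning = True

    def start(self):
        # _cleanup_loop runs in its own thread so it never blocks the caller.
        threading.Thread(target=self._cleanup_loop, daemon=True).start()

    def _cleanup_loop(self):
        while self._keep_cleaning:
            ...  # check for stale tables/views and queue DROP tasks to the nodes
            time.sleep(10)


executor = ThreadPoolExecutor()


async def exec_algorithm(run_algorithm, request):
    # The blocking task-queuing work runs on a pool thread; awaiting
    # run_in_executor yields control to the event loop in the meantime.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, run_algorithm, request)
```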
