Fix/mip 575/celery open many connections
Created by: apmariglis
fix for MIP-575, MIP-574, MIP-595
-
At all times there is only one
Celery
object per node in the system. This logic is implemented by aCeleryAppFactory
class inmipengine/controller/celery_app.py
-
Due to bug(s) in the Celery framework (my posts 1,2 on celery's github) if the broker (rabbitmq) would go down and up again, although the tasks would continue to be successfully queued and processed by the consumer celery app, the result would not be returned to the producer celery app until a new
Celery
instance is created on the producer side. A workaround for this problem is implemented in theCeleryWrapper
class inmipengine/controller/celery_app.py
, where a newCelery
object is instantiated when specific exceptions are raised and caught, indicating rabbitmq went down. -
The controller layer has to execute 3 different actions without any one of them waiting for the other. These actions are 1.repeatedly check which nodes are available (and check what data they contain, Node Landscape Aggregator) at any time, 2.execute algorithms and 3.repeatedly clean database artifacts(tables,views etc.) created during algorithms' execution and are not needed anymore (Cleaner,
mipengine/controller/cleaner.py
). Up until now these were handled by a mix ofasyncio
and threading which was becoming a mess. Implementing these using theasyncio
framework would be the most efficient approach but it would need some refactoring. Leaving that for a future task I removed all unnecessaryasyncio
usage and moved solely to threads which makes the code much simpler. More specifically:-
NodeLandscapeAggregator
:start
method starts executing the_update
in a new thread, updating the information about the nodes participating in the federation at any point -
Cleaner
:start
method starts executing the_cleanup_loop
method in a new thread, repeatedly checking if there are artifacts to be dropped and if so queues the relevant tasks to the nodes - Algorithm execution (initiated in
Controller
): Each algorithm execution is essentially comprised by a series of task queuing for the nodes to execute and return the results. Each algorithm execution is queuing tasks on a separate thread. Nevertheless, in the case of the algorithm execution although each algorithm is executed on a separate thread of aThreadpool
, the control is yield to the event loop by using therun_in_executor
method along withawait
.
-