From dff630b204a8c0f40be82e91aa10bbdd3b3d690d Mon Sep 17 00:00:00 2001
From: Ludovic Claude <ludovic.claude@laposte.net>
Date: Thu, 26 Apr 2018 00:46:04 +0200
Subject: [PATCH] Ensure Akka cluster has started

---
 src/main/java/eu/hbp/mip/StartupTasks.java    | 37 +++++++++++++++++--
 .../mip/configuration/AkkaConfiguration.java  | 17 ++++++++-
 src/main/resources/application.conf           | 11 ++----
 3 files changed, 54 insertions(+), 11 deletions(-)

diff --git a/src/main/java/eu/hbp/mip/StartupTasks.java b/src/main/java/eu/hbp/mip/StartupTasks.java
index 4fc511a7a..05e778dc7 100644
--- a/src/main/java/eu/hbp/mip/StartupTasks.java
+++ b/src/main/java/eu/hbp/mip/StartupTasks.java
@@ -1,12 +1,12 @@
 package eu.hbp.mip;
 
+import akka.actor.ActorRef;
+import akka.cluster.Cluster;
 import ch.chuv.lren.woken.messages.datasets.Dataset;
 import com.google.gson.Gson;
 import eu.hbp.mip.controllers.DatasetsApi;
 import eu.hbp.mip.controllers.MiningApi;
 import eu.hbp.mip.controllers.VariablesApi;
-import eu.hbp.mip.model.Algorithm;
-import eu.hbp.mip.model.MiningQuery;
 import eu.hbp.mip.model.Variable;
 import eu.hbp.mip.repositories.VariableRepository;
 import org.slf4j.Logger;
@@ -16,7 +16,7 @@ import org.springframework.boot.context.event.ApplicationReadyEvent;
 import org.springframework.context.ApplicationListener;
 import org.springframework.stereotype.Component;
 
-import java.util.Collections;
+import java.util.concurrent.Semaphore;
 
 @Component
 public class StartupTasks implements ApplicationListener<ApplicationReadyEvent> {
@@ -36,8 +36,16 @@ public class StartupTasks implements ApplicationListener<ApplicationReadyEvent>
     @Autowired
     private MiningApi miningApi;
 
+    @Autowired
+    Cluster wokenCluster;
+
+    @Autowired
+    private ActorRef wokenMediator;
+
     @Override
     public void onApplicationEvent(ApplicationReadyEvent event) {
+        startAkka();
+
         // Pre-fill the local variable repository with the list of datasets, interpreted here as variables
         // (a bit like a categorical variable can be split into a set of variables (a.k.a one hot encoding in Data science) )
         // Try 5 times, to be more robust in the face of cluster failures / slow startup
@@ -72,4 +80,27 @@ public class StartupTasks implements ApplicationListener<ApplicationReadyEvent>
 
         LOGGER.info("MIP Portal backend is ready!");
     }
+
+    private void startAkka() {
+        Semaphore semaphore = new Semaphore(1);
+        LOGGER.info("Step 1/3: Starting actor system...");
+        wokenCluster.registerOnMemberUp( () -> {
+            LOGGER.info("Step 2/3: Cluster up, registering the actors...");
+
+            LOGGER.info("Woken Mediator available at " + wokenMediator.path().toStringWithoutAddress());
+
+            LOGGER.info("Step 3/3: Startup complete.");
+            semaphore.release();
+        });
+
+        try {
+            semaphore.acquire();
+            Thread.sleep(5000);
+        } catch (InterruptedException e) {
+            LOGGER.warn("Cannot wait for Akka cluster start", e);
+        }
+
+    }
+
+
 }
diff --git a/src/main/java/eu/hbp/mip/configuration/AkkaConfiguration.java b/src/main/java/eu/hbp/mip/configuration/AkkaConfiguration.java
index 32716277a..e7f11e10e 100644
--- a/src/main/java/eu/hbp/mip/configuration/AkkaConfiguration.java
+++ b/src/main/java/eu/hbp/mip/configuration/AkkaConfiguration.java
@@ -14,7 +14,9 @@ import org.springframework.context.ApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Lazy;
 
+import javax.annotation.PostConstruct;
 import java.util.List;
 
 import static eu.hbp.mip.akka.SpringExtension.SPRING_EXTENSION_PROVIDER;
@@ -36,24 +38,37 @@ class AkkaConfiguration {
 
     @Bean
     public ExtendedActorSystem actorSystem() {
+        LOGGER.info("Create actor system at " + wokenClusterHost() + ":" + wokenClusterPort());
         ExtendedActorSystem system = (ExtendedActorSystem) ActorSystem.create("woken", config);
         SPRING_EXTENSION_PROVIDER.get(system).initialize(applicationContext);
         return system;
     }
 
     @Bean
+    @Lazy
     public Cluster wokenCluster() {
         return Cluster.get(actorSystem());
     }
 
+    @Bean
+    public String wokenClusterHost() {
+        return config.getString("clustering.ip");
+    }
+
+    @Bean
+    public Integer wokenClusterPort() {
+        return config.getInt("clustering.port");
+    }
+
     @Bean
     public List<String> wokenPath() {
         return config.getStringList("akka.cluster.seed-nodes");
     }
 
     @Bean
+    @Lazy
     public ActorRef wokenMediator() {
-        LOGGER.info("Connect to Woken cluster at " + String.join(",", wokenPath()));
+        LOGGER.info("Connect to Woken cluster nodes at " + String.join(",", wokenPath()));
         return DistributedPubSub.get(actorSystem()).mediator();
     }
 
diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf
index 5f553c58d..6b1930376 100644
--- a/src/main/resources/application.conf
+++ b/src/main/resources/application.conf
@@ -21,11 +21,9 @@ akka {
   }
 
   remote {
-    log-sent-messages = on
-    log-received-messages = on
-    log-remote-lifecycle-events = on
-
-    maximum-payload-bytes = 10000000 bytes
+    log-sent-messages = off
+    log-received-messages = off
+    log-remote-lifecycle-events = off
 
     watch-failure-detector {
       acceptable-heartbeat-pause = 20 s
@@ -42,7 +40,6 @@ akka {
       bind.port = ${clustering.port}  # internal (bind) port
 
     }
-
   }
 
   cluster {
@@ -61,7 +58,7 @@ akka {
 clustering {
   ip = "127.0.0.1"
   ip = ${?CLUSTER_IP}
-  port = 4089
+  port = 4489
   port = ${?CLUSTER_PORT}
   seed-ip = "woken"
   seed-ip = ${?CLUSTER_IP}
-- 
GitLab