Skip to content
Snippets Groups Projects
Commit dff630b2 authored by Ludovic Claude's avatar Ludovic Claude
Browse files

Ensure Akka cluster has started

parent 1edc0cfe
No related branches found
No related tags found
No related merge requests found
package eu.hbp.mip;
import akka.actor.ActorRef;
import akka.cluster.Cluster;
import ch.chuv.lren.woken.messages.datasets.Dataset;
import com.google.gson.Gson;
import eu.hbp.mip.controllers.DatasetsApi;
import eu.hbp.mip.controllers.MiningApi;
import eu.hbp.mip.controllers.VariablesApi;
import eu.hbp.mip.model.Algorithm;
import eu.hbp.mip.model.MiningQuery;
import eu.hbp.mip.model.Variable;
import eu.hbp.mip.repositories.VariableRepository;
import org.slf4j.Logger;
......@@ -16,7 +16,7 @@ import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.concurrent.Semaphore;
@Component
public class StartupTasks implements ApplicationListener<ApplicationReadyEvent> {
......@@ -36,8 +36,16 @@ public class StartupTasks implements ApplicationListener<ApplicationReadyEvent>
@Autowired
private MiningApi miningApi;
@Autowired
Cluster wokenCluster;
@Autowired
private ActorRef wokenMediator;
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
startAkka();
// Pre-fill the local variable repository with the list of datasets, interpreted here as variables
// (a bit like a categorical variable can be split into a set of variables (a.k.a one hot encoding in Data science) )
// Try 5 times, to be more robust in the face of cluster failures / slow startup
......@@ -72,4 +80,27 @@ public class StartupTasks implements ApplicationListener<ApplicationReadyEvent>
LOGGER.info("MIP Portal backend is ready!");
}
private void startAkka() {
Semaphore semaphore = new Semaphore(1);
LOGGER.info("Step 1/3: Starting actor system...");
wokenCluster.registerOnMemberUp( () -> {
LOGGER.info("Step 2/3: Cluster up, registering the actors...");
LOGGER.info("Woken Mediator available at " + wokenMediator.path().toStringWithoutAddress());
LOGGER.info("Step 3/3: Startup complete.");
semaphore.release();
});
try {
semaphore.acquire();
Thread.sleep(5000);
} catch (InterruptedException e) {
LOGGER.warn("Cannot wait for Akka cluster start", e);
}
}
}
......@@ -14,7 +14,9 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import javax.annotation.PostConstruct;
import java.util.List;
import static eu.hbp.mip.akka.SpringExtension.SPRING_EXTENSION_PROVIDER;
......@@ -36,24 +38,37 @@ class AkkaConfiguration {
@Bean
public ExtendedActorSystem actorSystem() {
LOGGER.info("Create actor system at " + wokenClusterHost() + ":" + wokenClusterPort());
ExtendedActorSystem system = (ExtendedActorSystem) ActorSystem.create("woken", config);
SPRING_EXTENSION_PROVIDER.get(system).initialize(applicationContext);
return system;
}
@Bean
@Lazy
public Cluster wokenCluster() {
return Cluster.get(actorSystem());
}
@Bean
public String wokenClusterHost() {
return config.getString("clustering.ip");
}
@Bean
public Integer wokenClusterPort() {
return config.getInt("clustering.port");
}
@Bean
public List<String> wokenPath() {
return config.getStringList("akka.cluster.seed-nodes");
}
@Bean
@Lazy
public ActorRef wokenMediator() {
LOGGER.info("Connect to Woken cluster at " + String.join(",", wokenPath()));
LOGGER.info("Connect to Woken cluster nodes at " + String.join(",", wokenPath()));
return DistributedPubSub.get(actorSystem()).mediator();
}
......
......@@ -21,11 +21,9 @@ akka {
}
remote {
log-sent-messages = on
log-received-messages = on
log-remote-lifecycle-events = on
maximum-payload-bytes = 10000000 bytes
log-sent-messages = off
log-received-messages = off
log-remote-lifecycle-events = off
watch-failure-detector {
acceptable-heartbeat-pause = 20 s
......@@ -42,7 +40,6 @@ akka {
bind.port = ${clustering.port} # internal (bind) port
}
}
cluster {
......@@ -61,7 +58,7 @@ akka {
clustering {
ip = "127.0.0.1"
ip = ${?CLUSTER_IP}
port = 4089
port = 4489
port = ${?CLUSTER_PORT}
seed-ip = "woken"
seed-ip = ${?CLUSTER_IP}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment