Skip to content
Snippets Groups Projects
Commit 97784ec8 authored by Ludovic Claude's avatar Ludovic Claude
Browse files

Ensure Akka cluster has started (v.II)

parent 99eb788f
No related branches found
No related tags found
No related merge requests found
...@@ -14,11 +14,12 @@ import org.slf4j.LoggerFactory; ...@@ -14,11 +14,12 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.concurrent.Semaphore;
@Component @Component
@DependsOn("wokenCluster")
public class StartupTasks implements ApplicationListener<ApplicationReadyEvent> { public class StartupTasks implements ApplicationListener<ApplicationReadyEvent> {
private static final Logger LOGGER = LoggerFactory.getLogger(StartupTasks.class); private static final Logger LOGGER = LoggerFactory.getLogger(StartupTasks.class);
...@@ -36,15 +37,8 @@ public class StartupTasks implements ApplicationListener<ApplicationReadyEvent> ...@@ -36,15 +37,8 @@ public class StartupTasks implements ApplicationListener<ApplicationReadyEvent>
@Autowired @Autowired
private MiningApi miningApi; private MiningApi miningApi;
@Autowired
Cluster wokenCluster;
@Autowired
private ActorRef wokenMediator;
@Override @Override
public void onApplicationEvent(ApplicationReadyEvent event) { public void onApplicationEvent(ApplicationReadyEvent event) {
startAkka();
// Pre-fill the local variable repository with the list of datasets, interpreted here as variables // Pre-fill the local variable repository with the list of datasets, interpreted here as variables
// (a bit like a categorical variable can be split into a set of variables (a.k.a one hot encoding in Data science) ) // (a bit like a categorical variable can be split into a set of variables (a.k.a one hot encoding in Data science) )
...@@ -81,26 +75,4 @@ public class StartupTasks implements ApplicationListener<ApplicationReadyEvent> ...@@ -81,26 +75,4 @@ public class StartupTasks implements ApplicationListener<ApplicationReadyEvent>
LOGGER.info("MIP Portal backend is ready!"); LOGGER.info("MIP Portal backend is ready!");
} }
private void startAkka() {
Semaphore semaphore = new Semaphore(1);
LOGGER.info("Step 1/3: Starting actor system...");
wokenCluster.registerOnMemberUp( () -> {
LOGGER.info("Step 2/3: Cluster up, registering the actors...");
LOGGER.info("Woken Mediator available at " + wokenMediator.path().toStringWithoutAddress());
LOGGER.info("Step 3/3: Startup complete.");
semaphore.release();
});
try {
semaphore.acquire();
Thread.sleep(5000);
} catch (InterruptedException e) {
LOGGER.warn("Cannot wait for Akka cluster start", e);
}
}
} }
...@@ -11,13 +11,10 @@ import org.slf4j.Logger; ...@@ -11,13 +11,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.*;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import javax.annotation.PostConstruct;
import java.util.List; import java.util.List;
import java.util.concurrent.Semaphore;
import static eu.hbp.mip.akka.SpringExtension.SPRING_EXTENSION_PROVIDER; import static eu.hbp.mip.akka.SpringExtension.SPRING_EXTENSION_PROVIDER;
...@@ -36,8 +33,10 @@ class AkkaConfiguration { ...@@ -36,8 +33,10 @@ class AkkaConfiguration {
private final Config config = ConfigFactory.load("application.conf"); private final Config config = ConfigFactory.load("application.conf");
@Bean @Bean
public ExtendedActorSystem actorSystem() { public ExtendedActorSystem actorSystem() {
LOGGER.info("Step 1/3: Starting actor system...");
LOGGER.info("Create actor system at " + wokenClusterHost() + ":" + wokenClusterPort()); LOGGER.info("Create actor system at " + wokenClusterHost() + ":" + wokenClusterPort());
ExtendedActorSystem system = (ExtendedActorSystem) ActorSystem.create("woken", config); ExtendedActorSystem system = (ExtendedActorSystem) ActorSystem.create("woken", config);
SPRING_EXTENSION_PROVIDER.get(system).initialize(applicationContext); SPRING_EXTENSION_PROVIDER.get(system).initialize(applicationContext);
...@@ -45,9 +44,36 @@ class AkkaConfiguration { ...@@ -45,9 +44,36 @@ class AkkaConfiguration {
} }
@Bean @Bean
@Lazy
public Cluster wokenCluster() { public Cluster wokenCluster() {
return Cluster.get(actorSystem()); Cluster cluster = Cluster.get(actorSystem());
LOGGER.info("Connect to Woken cluster nodes at " + String.join(",", wokenPath()));
Semaphore semaphore = new Semaphore(1);
cluster.registerOnMemberUp( () -> {
LOGGER.info("Step 2/3: Cluster up, registering the actors...");
// Do not call wokenMediator() here to avoid recursive loops
ActorRef mediator = DistributedPubSub.get(actorSystem()).mediator();
LOGGER.info("Woken Mediator available at " + mediator.path().toStringWithoutAddress());
semaphore.release();
});
try {
semaphore.acquire();
Thread.sleep(5000);
} catch (InterruptedException e) {
LOGGER.warn("Cannot wait for Akka cluster start", e);
}
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
cluster.leave(cluster.selfAddress());
})
);
LOGGER.info("Step 3/3: Cluster connected to Woken.");
return cluster;
} }
@Bean @Bean
...@@ -67,8 +93,8 @@ class AkkaConfiguration { ...@@ -67,8 +93,8 @@ class AkkaConfiguration {
@Bean @Bean
@Lazy @Lazy
@DependsOn("wokenCluster")
public ActorRef wokenMediator() { public ActorRef wokenMediator() {
LOGGER.info("Connect to Woken cluster nodes at " + String.join(",", wokenPath()));
return DistributedPubSub.get(actorSystem()).mediator(); return DistributedPubSub.get(actorSystem()).mediator();
} }
......
...@@ -10,25 +10,25 @@ ...@@ -10,25 +10,25 @@
<Root level="DEBUG"> <Root level="DEBUG">
<AppenderRef ref="Console"/> <AppenderRef ref="Console"/>
</Root> </Root>
<Logger name="akka.actor.LocalActorRefProvider(akka://woken)" level="info"> <Logger name="akka.actor.LocalActorRefProvider(akka://woken)" level="INFO">
<AppenderRef ref="Console"/> <AppenderRef ref="Console"/>
</Logger> </Logger>
<Logger name="akka.cluster" level="${sys:AKKA_CLUSTER_LOG_LEVEL:-info}"> <Logger name="akka.cluster" level="${sys:AKKA_CLUSTER_LOG_LEVEL:-INFO}">
<AppenderRef ref="Console"/> <AppenderRef ref="Console"/>
</Logger> </Logger>
<Logger name="akka.io" level="info"> <Logger name="akka.io" level="INFO">
<AppenderRef ref="Console"/> <AppenderRef ref="Console"/>
</Logger> </Logger>
<Logger name="akka.http.impl" level="info"> <Logger name="akka.http.impl" level="INFO">
<AppenderRef ref="Console"/> <AppenderRef ref="Console"/>
</Logger> </Logger>
<Logger name="akka.remote.artery.compress" level="${sys:AKKA_CLUSTER_LOG_LEVEL:-info}"> <Logger name="akka.remote.artery.compress" level="${sys:AKKA_CLUSTER_LOG_LEVEL:-INFO}">
<AppenderRef ref="Console"/> <AppenderRef ref="Console"/>
</Logger> </Logger>
<Logger name="akka.remote.artery.tcp" level="${sys:AKKA_CLUSTER_LOG_LEVEL:-info}"> <Logger name="akka.remote.artery.tcp" level="${sys:AKKA_CLUSTER_LOG_LEVEL:-INFO}">
<AppenderRef ref="Console"/> <AppenderRef ref="Console"/>
</Logger> </Logger>
<Logger name="akka.stream.impl" level="info"> <Logger name="akka.stream.impl" level="INFO">
<AppenderRef ref="Console"/> <AppenderRef ref="Console"/>
</Logger> </Logger>
</Loggers> </Loggers>
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment