From 97784ec8258d0729e4ba8f2dbd648f10537e6316 Mon Sep 17 00:00:00 2001 From: Ludovic Claude <ludovic.claude@laposte.net> Date: Thu, 26 Apr 2018 02:04:57 +0200 Subject: [PATCH] Ensure Akka cluster has started (v.II) --- src/main/java/eu/hbp/mip/StartupTasks.java | 32 +------------- .../mip/configuration/AkkaConfiguration.java | 42 +++++++++++++++---- src/main/resources/log4j2.xml | 14 +++---- 3 files changed, 43 insertions(+), 45 deletions(-) diff --git a/src/main/java/eu/hbp/mip/StartupTasks.java b/src/main/java/eu/hbp/mip/StartupTasks.java index 05e778dc7..b09bc06a5 100644 --- a/src/main/java/eu/hbp/mip/StartupTasks.java +++ b/src/main/java/eu/hbp/mip/StartupTasks.java @@ -14,11 +14,12 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationListener; +import org.springframework.context.annotation.DependsOn; import org.springframework.stereotype.Component; -import java.util.concurrent.Semaphore; @Component +@DependsOn("wokenCluster") public class StartupTasks implements ApplicationListener<ApplicationReadyEvent> { private static final Logger LOGGER = LoggerFactory.getLogger(StartupTasks.class); @@ -36,15 +37,8 @@ public class StartupTasks implements ApplicationListener<ApplicationReadyEvent> @Autowired private MiningApi miningApi; - @Autowired - Cluster wokenCluster; - - @Autowired - private ActorRef wokenMediator; - @Override public void onApplicationEvent(ApplicationReadyEvent event) { - startAkka(); // Pre-fill the local variable repository with the list of datasets, interpreted here as variables // (a bit like a categorical variable can be split into a set of variables (a.k.a one hot encoding in Data science) ) @@ -81,26 +75,4 @@ public class StartupTasks implements ApplicationListener<ApplicationReadyEvent> LOGGER.info("MIP Portal backend is ready!"); } - private void startAkka() { - Semaphore semaphore = new Semaphore(1); - LOGGER.info("Step 1/3: Starting actor system..."); - wokenCluster.registerOnMemberUp( () -> { - LOGGER.info("Step 2/3: Cluster up, registering the actors..."); - - LOGGER.info("Woken Mediator available at " + wokenMediator.path().toStringWithoutAddress()); - - LOGGER.info("Step 3/3: Startup complete."); - semaphore.release(); - }); - - try { - semaphore.acquire(); - Thread.sleep(5000); - } catch (InterruptedException e) { - LOGGER.warn("Cannot wait for Akka cluster start", e); - } - - } - - } diff --git a/src/main/java/eu/hbp/mip/configuration/AkkaConfiguration.java b/src/main/java/eu/hbp/mip/configuration/AkkaConfiguration.java index e7f11e10e..b6322e4b5 100644 --- a/src/main/java/eu/hbp/mip/configuration/AkkaConfiguration.java +++ b/src/main/java/eu/hbp/mip/configuration/AkkaConfiguration.java @@ -11,13 +11,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Lazy; +import org.springframework.context.annotation.*; -import javax.annotation.PostConstruct; import java.util.List; +import java.util.concurrent.Semaphore; import static eu.hbp.mip.akka.SpringExtension.SPRING_EXTENSION_PROVIDER; @@ -36,8 +33,10 @@ class AkkaConfiguration { private final Config config = ConfigFactory.load("application.conf"); + @Bean public ExtendedActorSystem actorSystem() { + LOGGER.info("Step 1/3: Starting actor system..."); LOGGER.info("Create actor system at " + wokenClusterHost() + ":" + wokenClusterPort()); ExtendedActorSystem system = (ExtendedActorSystem) ActorSystem.create("woken", config); SPRING_EXTENSION_PROVIDER.get(system).initialize(applicationContext); @@ -45,9 +44,36 @@ class AkkaConfiguration { } @Bean - @Lazy public Cluster wokenCluster() { - return Cluster.get(actorSystem()); + Cluster cluster = Cluster.get(actorSystem()); + LOGGER.info("Connect to Woken cluster nodes at " + String.join(",", wokenPath())); + Semaphore semaphore = new Semaphore(1); + cluster.registerOnMemberUp( () -> { + LOGGER.info("Step 2/3: Cluster up, registering the actors..."); + + // Do not call wokenMediator() here to avoid recursive loops + ActorRef mediator = DistributedPubSub.get(actorSystem()).mediator(); + + LOGGER.info("Woken Mediator available at " + mediator.path().toStringWithoutAddress()); + + semaphore.release(); + }); + + try { + semaphore.acquire(); + Thread.sleep(5000); + } catch (InterruptedException e) { + LOGGER.warn("Cannot wait for Akka cluster start", e); + } + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + cluster.leave(cluster.selfAddress()); + }) + ); + + LOGGER.info("Step 3/3: Cluster connected to Woken."); + + return cluster; } @Bean @@ -67,8 +93,8 @@ class AkkaConfiguration { @Bean @Lazy + @DependsOn("wokenCluster") public ActorRef wokenMediator() { - LOGGER.info("Connect to Woken cluster nodes at " + String.join(",", wokenPath())); return DistributedPubSub.get(actorSystem()).mediator(); } diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml index 025dcecbc..659940446 100644 --- a/src/main/resources/log4j2.xml +++ b/src/main/resources/log4j2.xml @@ -10,25 +10,25 @@ <Root level="DEBUG"> <AppenderRef ref="Console"/> </Root> - <Logger name="akka.actor.LocalActorRefProvider(akka://woken)" level="info"> + <Logger name="akka.actor.LocalActorRefProvider(akka://woken)" level="INFO"> <AppenderRef ref="Console"/> </Logger> - <Logger name="akka.cluster" level="${sys:AKKA_CLUSTER_LOG_LEVEL:-info}"> + <Logger name="akka.cluster" level="${sys:AKKA_CLUSTER_LOG_LEVEL:-INFO}"> <AppenderRef ref="Console"/> </Logger> - <Logger name="akka.io" level="info"> + <Logger name="akka.io" level="INFO"> <AppenderRef ref="Console"/> </Logger> - <Logger name="akka.http.impl" level="info"> + <Logger name="akka.http.impl" level="INFO"> <AppenderRef ref="Console"/> </Logger> - <Logger name="akka.remote.artery.compress" level="${sys:AKKA_CLUSTER_LOG_LEVEL:-info}"> + <Logger name="akka.remote.artery.compress" level="${sys:AKKA_CLUSTER_LOG_LEVEL:-INFO}"> <AppenderRef ref="Console"/> </Logger> - <Logger name="akka.remote.artery.tcp" level="${sys:AKKA_CLUSTER_LOG_LEVEL:-info}"> + <Logger name="akka.remote.artery.tcp" level="${sys:AKKA_CLUSTER_LOG_LEVEL:-INFO}"> <AppenderRef ref="Console"/> </Logger> - <Logger name="akka.stream.impl" level="info"> + <Logger name="akka.stream.impl" level="INFO"> <AppenderRef ref="Console"/> </Logger> </Loggers> -- GitLab