diff --git a/src/main/java/eu/hbp/mip/akka/WokenClientController.java b/src/main/java/eu/hbp/mip/akka/WokenClientController.java index 2504d6468feaad808d19fffa52f29da4baeac77c..f66b558ab0132159995ab0d518f6ba210bb59ee3 100644 --- a/src/main/java/eu/hbp/mip/akka/WokenClientController.java +++ b/src/main/java/eu/hbp/mip/akka/WokenClientController.java @@ -1,11 +1,9 @@ package eu.hbp.mip.akka; -import akka.actor.ActorPath; -import akka.actor.ActorPaths; import akka.actor.ActorRef; import akka.actor.ActorSystem; -import akka.cluster.client.ClusterClient; -import akka.cluster.client.ClusterClientSettings; +import akka.cluster.pubsub.DistributedPubSub; +import akka.cluster.pubsub.DistributedPubSubMediator; import akka.pattern.Patterns; import akka.util.Timeout; import ch.chuv.lren.woken.messages.query.Query; @@ -22,8 +20,6 @@ import scala.concurrent.Future; import scala.concurrent.duration.Duration; import javax.annotation.PostConstruct; -import java.util.Collections; -import java.util.Set; import java.util.function.Function; /** @@ -42,29 +38,23 @@ public abstract class WokenClientController { @Value("#{'${akka.woken.path:/user/entrypoint}'}") private String wokenPath; - private ActorRef wokenClient; + private ActorRef wokenMediator; @SuppressWarnings("unused") @PostConstruct public void initClusterClient() { LOGGER.info("Start Woken client " + wokenReceptionistPath); - wokenClient = actorSystem.actorOf(ClusterClient.props( - ClusterClientSettings.create(actorSystem).withInitialContacts(initialContacts())), - "client-" + getClass().getSimpleName()); - } - - private Set<ActorPath> initialContacts () { - return Collections.singleton(ActorPaths.fromString(wokenReceptionistPath)); + wokenMediator = DistributedPubSub.get(actorSystem).mediator(); } @SuppressWarnings("unchecked") protected <A, B> B askWoken(A message, int waitInSeconds) throws Exception { LOGGER.info("Akka is trying to reach remote " + wokenPath); - ClusterClient.Send queryMessage = new ClusterClient.Send(wokenPath, message, true); + DistributedPubSubMediator.Send queryMessage = new DistributedPubSubMediator.Send(wokenPath, message, true); Timeout timeout = new Timeout(Duration.create(waitInSeconds, "seconds")); - Future<Object> future = Patterns.ask(wokenClient, queryMessage, timeout); + Future<Object> future = Patterns.ask(wokenMediator, queryMessage, timeout); return (B) Await.result(future, timeout.duration()); } @@ -94,9 +84,9 @@ public abstract class WokenClientController { protected <A extends Query> Future<Object> sendWokenQuery(A query, int timeout) { LOGGER.info("Akka is trying to reach remote " + wokenPath); - ClusterClient.Send queryMessage = new ClusterClient.Send(wokenPath, query, true); + DistributedPubSubMediator.Send queryMessage = new DistributedPubSubMediator.Send(wokenPath, query, true); - return Patterns.ask(wokenClient, queryMessage, timeout); + return Patterns.ask(wokenMediator, queryMessage, timeout); } protected ExecutionContext getExecutor() { diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 711776afa4ac564da86946c61b2af8c3b6c0f104..9ba1bc59e8f248a6ab3a7e8bee44747db10976d3 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -1,27 +1,32 @@ akka { - loglevel = INFO - log-config-on-start = on + loglevel = "WARNING" + loglevel = ${?LOG_LEVEL} + stdout-loglevel = "WARNING" + stdout-loglevel = ${?LOG_LEVEL} + loggers = ["akka.event.slf4j.Slf4jLogger"] + logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" - actor { - provider = "cluster" + log-config-on-start = off + log-config-on-start = ${?LOG_CONFIG} - debug { - receive = on - autoreceive = off - lifecycle = on - fsm = on - unhandled = on - event-stream = off - } + log-dead-letters = 10 + log-dead-letters-during-shutdown = off + actor { + provider = "cluster" } remote { log-sent-messages = on log-received-messages = on log-remote-lifecycle-events = on + maximum-payload-bytes = 10000000 bytes + watch-failure-detector { + acceptable-heartbeat-pause = 20 s + } + netty.tcp { message-frame-size = 10000000b send-buffer-size = 10000000b @@ -31,21 +36,21 @@ akka { hostname = ${clustering.ip} # external (logical) hostname port = ${clustering.port} # external (logical) port - bind.hostname = 0.0.0.0 # internal (bind) hostname - bind.port = ${clustering.port} # internal (bind) port + bind-hostname = 0.0.0.0 # internal (bind) hostname + bind-port = ${clustering.port} # internal (bind) port } } cluster { - roles = ["test"] + seed-nodes = [ + "akka.tcp://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port} + ] + + roles = ["portal"] - client { - initial-contacts = ["akka.tcp://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port}"/system/receptionist"] - } } extensions += "akka.cluster.pubsub.DistributedPubSub" - extensions += "akka.cluster.client.ClusterClientReceptionist" }