Skip to content
Snippets Groups Projects
Commit 0ba1faf4 authored by Ludovic Claude's avatar Ludovic Claude
Browse files

Replace cluster client by distributed pubsub

parent eafc61a8
No related branches found
No related tags found
No related merge requests found
package eu.hbp.mip.akka;
import akka.actor.ActorPath;
import akka.actor.ActorPaths;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.cluster.client.ClusterClient;
import akka.cluster.client.ClusterClientSettings;
import akka.cluster.pubsub.DistributedPubSub;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.pattern.Patterns;
import akka.util.Timeout;
import ch.chuv.lren.woken.messages.query.Query;
......@@ -22,8 +20,6 @@ import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import javax.annotation.PostConstruct;
import java.util.Collections;
import java.util.Set;
import java.util.function.Function;
/**
......@@ -42,29 +38,23 @@ public abstract class WokenClientController {
@Value("#{'${akka.woken.path:/user/entrypoint}'}")
private String wokenPath;
private ActorRef wokenClient;
private ActorRef wokenMediator;
@SuppressWarnings("unused")
@PostConstruct
public void initClusterClient() {
LOGGER.info("Start Woken client " + wokenReceptionistPath);
wokenClient = actorSystem.actorOf(ClusterClient.props(
ClusterClientSettings.create(actorSystem).withInitialContacts(initialContacts())),
"client-" + getClass().getSimpleName());
}
private Set<ActorPath> initialContacts () {
return Collections.singleton(ActorPaths.fromString(wokenReceptionistPath));
wokenMediator = DistributedPubSub.get(actorSystem).mediator();
}
@SuppressWarnings("unchecked")
protected <A, B> B askWoken(A message, int waitInSeconds) throws Exception {
LOGGER.info("Akka is trying to reach remote " + wokenPath);
ClusterClient.Send queryMessage = new ClusterClient.Send(wokenPath, message, true);
DistributedPubSubMediator.Send queryMessage = new DistributedPubSubMediator.Send(wokenPath, message, true);
Timeout timeout = new Timeout(Duration.create(waitInSeconds, "seconds"));
Future<Object> future = Patterns.ask(wokenClient, queryMessage, timeout);
Future<Object> future = Patterns.ask(wokenMediator, queryMessage, timeout);
return (B) Await.result(future, timeout.duration());
}
......@@ -94,9 +84,9 @@ public abstract class WokenClientController {
protected <A extends Query> Future<Object> sendWokenQuery(A query, int timeout) {
LOGGER.info("Akka is trying to reach remote " + wokenPath);
ClusterClient.Send queryMessage = new ClusterClient.Send(wokenPath, query, true);
DistributedPubSubMediator.Send queryMessage = new DistributedPubSubMediator.Send(wokenPath, query, true);
return Patterns.ask(wokenClient, queryMessage, timeout);
return Patterns.ask(wokenMediator, queryMessage, timeout);
}
protected ExecutionContext getExecutor() {
......
akka {
loglevel = INFO
log-config-on-start = on
loglevel = "WARNING"
loglevel = ${?LOG_LEVEL}
stdout-loglevel = "WARNING"
stdout-loglevel = ${?LOG_LEVEL}
loggers = ["akka.event.slf4j.Slf4jLogger"]
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
actor {
provider = "cluster"
log-config-on-start = off
log-config-on-start = ${?LOG_CONFIG}
debug {
receive = on
autoreceive = off
lifecycle = on
fsm = on
unhandled = on
event-stream = off
}
log-dead-letters = 10
log-dead-letters-during-shutdown = off
actor {
provider = "cluster"
}
remote {
log-sent-messages = on
log-received-messages = on
log-remote-lifecycle-events = on
maximum-payload-bytes = 10000000 bytes
watch-failure-detector {
acceptable-heartbeat-pause = 20 s
}
netty.tcp {
message-frame-size = 10000000b
send-buffer-size = 10000000b
......@@ -31,21 +36,21 @@ akka {
hostname = ${clustering.ip} # external (logical) hostname
port = ${clustering.port} # external (logical) port
bind.hostname = 0.0.0.0 # internal (bind) hostname
bind.port = ${clustering.port} # internal (bind) port
bind-hostname = 0.0.0.0 # internal (bind) hostname
bind-port = ${clustering.port} # internal (bind) port
}
}
cluster {
roles = ["test"]
seed-nodes = [
"akka.tcp://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port}
]
roles = ["portal"]
client {
initial-contacts = ["akka.tcp://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port}"/system/receptionist"]
}
}
extensions += "akka.cluster.pubsub.DistributedPubSub"
extensions += "akka.cluster.client.ClusterClientReceptionist"
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment