package eu.hbp.mip.akka; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.cluster.pubsub.DistributedPubSub; import akka.cluster.pubsub.DistributedPubSubMediator; import akka.pattern.Patterns; import akka.util.Timeout; import ch.chuv.lren.woken.messages.query.Query; import ch.chuv.lren.woken.messages.query.QueryResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import scala.concurrent.Await; import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import javax.annotation.PostConstruct; import java.util.function.Function; /** * Base class for controllers using Woken services */ public abstract class WokenClientController { protected final Logger LOGGER = LoggerFactory.getLogger(this.getClass()); @Autowired private ActorSystem actorSystem; @Autowired private String wokenReceptionistPath; @Value("#{'${akka.woken.path:/user/entrypoint}'}") private String wokenPath; private ActorRef wokenMediator; @SuppressWarnings("unused") @PostConstruct public void initClusterClient() { LOGGER.info("Start Woken client " + wokenReceptionistPath); wokenMediator = DistributedPubSub.get(actorSystem).mediator(); } @SuppressWarnings("unchecked") protected <A, B> B askWoken(A message, int waitInSeconds) throws Exception { LOGGER.info("Akka is trying to reach remote " + wokenPath); DistributedPubSubMediator.Send queryMessage = new DistributedPubSubMediator.Send(wokenPath, message, true); Timeout timeout = new Timeout(Duration.create(waitInSeconds, "seconds")); Future<Object> future = Patterns.ask(wokenMediator, queryMessage, timeout); return (B) Await.result(future, timeout.duration()); } protected <A, B> ResponseEntity requestWoken(A message, int waitInSeconds, Function<B, ResponseEntity> handleResponse) { try { B result = askWoken(message, waitInSeconds); return handleResponse.apply(result); } catch (Exception e) { final String msg = "Cannot receive result from woken: " + e.getMessage(); LOGGER.error(msg, e); return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(msg); } } protected <A extends Query> ResponseEntity askWokenQuery(A query, int waitInSeconds, Function<QueryResult, ResponseEntity> handleResponse) { try { QueryResult result = askWoken(query, waitInSeconds); return handleResponse.apply(result); } catch (Exception e) { final String msg = "Cannot receive algorithm result from woken: " + e.getMessage(); LOGGER.error(msg, e); return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(msg); } } protected <A extends Query> Future<Object> sendWokenQuery(A query, int timeout) { LOGGER.info("Akka is trying to reach remote " + wokenPath); DistributedPubSubMediator.Send queryMessage = new DistributedPubSubMediator.Send(wokenPath, query, true); return Patterns.ask(wokenMediator, queryMessage, timeout); } protected ExecutionContext getExecutor() { return actorSystem.dispatcher(); } }