diff --git a/Dockerfile b/Dockerfile index 4ae3fd07d092559e6ce11f5d87c3ed34c58fecd7..e59fbb8d26bc0786f2e1b02f0091435322cd72f9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,6 +2,10 @@ FROM hbpmip/java-base-build:3.5.0-jdk-8-9 as java-build-env COPY pom.xml /project/ + +RUN cp /usr/share/maven/ref/settings-docker.xml /root/.m2/settings.xml \ + && mvn clean package + COPY src/ /project/src/ # Repeating the file copy works better. I dunno why. diff --git a/pom.xml b/pom.xml index 10f3ce7b4dfcc7dbc7fefc4e37408bf2bad01b81..7e933d22302208229b7d611d5bb1273fa7879cf3 100644 --- a/pom.xml +++ b/pom.xml @@ -207,6 +207,16 @@ <artifactId>akka-remote_2.11</artifactId> <version>${akka.version}</version> </dependency> + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-cluster_2.11</artifactId> + <version>${akka.version}</version> + </dependency> + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-cluster-tools_2.11</artifactId> + <version>${akka.version}</version> + </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> diff --git a/src/main/java/eu/hbp/mip/akka/WokenClientController.java b/src/main/java/eu/hbp/mip/akka/WokenClientController.java new file mode 100644 index 0000000000000000000000000000000000000000..0f225c860b82849d10d3aae044b2069dfd255c6a --- /dev/null +++ b/src/main/java/eu/hbp/mip/akka/WokenClientController.java @@ -0,0 +1,107 @@ +package eu.hbp.mip.akka; + +import akka.actor.ActorPath; +import akka.actor.ActorPaths; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.cluster.client.ClusterClient; +import akka.cluster.client.ClusterClientSettings; +import akka.pattern.Patterns; +import akka.util.Timeout; +import eu.hbp.mip.woken.messages.query.Query; +import eu.hbp.mip.woken.messages.query.QueryResult; +import org.apache.log4j.Logger; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +import javax.annotation.PostConstruct; +import java.util.Collections; +import java.util.Set; +import java.util.function.Function; + +/** + * Base class for controllers using Woken services + */ +public abstract class WokenClientController { + + private final Logger LOGGER = Logger.getLogger(this.getClass()); + + @Autowired + private ActorSystem actorSystem; + + @Autowired + private String wokenReceptionistPath; + + @Value("#{'${akka.woken.path:/user/entrypoint}'}") + private String wokenPath; + + private ActorRef wokenClient; + + @PostConstruct + public void initClusterClient() { + wokenClient = actorSystem.actorOf(ClusterClient.props( + ClusterClientSettings.create(actorSystem).withInitialContacts(initialContacts())), + "client-" + getClass().getSimpleName()); + } + + private Set<ActorPath> initialContacts () { + return Collections.singleton(ActorPaths.fromString(wokenReceptionistPath)); + } + + protected <A, B> ResponseEntity askWoken(A message, int waitInSeconds, Function<B, ResponseEntity> handleResponse) { + LOGGER.info("Akka is trying to reach remote " + wokenPath); + + ClusterClient.Send queryMessage = new ClusterClient.Send(wokenPath, message, true); + Timeout timeout = new Timeout(Duration.create(waitInSeconds, "seconds")); + + Future<Object> future = Patterns.ask(wokenClient, queryMessage, timeout); + + B result; + try { + result = (B) Await.result(future, timeout.duration()); + } catch (Exception e) { + LOGGER.error("Cannot receive algorithm result from woken: " + e.getMessage(), e); + return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build(); + } + + return handleResponse.apply(result); + } + + + protected <A extends Query> ResponseEntity askWokenQuery(A query, int waitInSeconds, Function<QueryResult, ResponseEntity> handleResponse) { + LOGGER.info("Akka is trying to reach remote " + wokenPath); + + ClusterClient.Send queryMessage = new ClusterClient.Send(wokenPath, query, true); + Timeout timeout = new Timeout(Duration.create(waitInSeconds, "seconds")); + + Future<Object> future = Patterns.ask(wokenClient, queryMessage, timeout); + + QueryResult result; + try { + result = (QueryResult) Await.result(future, timeout.duration()); + } catch (Exception e) { + LOGGER.error("Cannot receive algorithm result from woken: " + e.getMessage(), e); + return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build(); + } + + return handleResponse.apply(result); + } + + protected <A extends Query> void sendWokenQuery(A query, ActorRef responseReceiver) { + LOGGER.info("Akka is trying to reach remote " + wokenPath); + + ClusterClient.Send queryMessage = new ClusterClient.Send(wokenPath, query, true); + + wokenClient.tell(queryMessage, responseReceiver); + } + + protected ActorRef createActor(String actorBeanName, String actorName) { + return actorSystem.actorOf(SpringExtension.SPRING_EXTENSION_PROVIDER.get(actorSystem) + .props(actorBeanName), actorName); + } +} diff --git a/src/main/java/eu/hbp/mip/configuration/AkkaConfiguration.java b/src/main/java/eu/hbp/mip/configuration/AkkaConfiguration.java index 907cda3bd0393b47d8d08898073a0eecc72be08b..a99b5bfc1c90ee531e98d36fdda59d97a9550453 100644 --- a/src/main/java/eu/hbp/mip/configuration/AkkaConfiguration.java +++ b/src/main/java/eu/hbp/mip/configuration/AkkaConfiguration.java @@ -32,14 +32,15 @@ class AkkaConfiguration { @Bean public ActorSystem actorSystem() { - ActorSystem system = ActorSystem.create("AkkaActorSystem"); + ActorSystem system = ActorSystem.create("PortalBackend"); SPRING_EXTENSION_PROVIDER.get(system).initialize(applicationContext); return system; } @Bean - public String wokenRefPath() { - return "akka.tcp://woken@"+wokenHost+":"+wokenPort+wokenPath; + public String wokenReceptionistPath() { + return "akka.tcp://woken@" + wokenHost + ":" + wokenPort + "/system/receptionist"; } + } diff --git a/src/main/java/eu/hbp/mip/controllers/ExperimentApi.java b/src/main/java/eu/hbp/mip/controllers/ExperimentApi.java index 3f4c865ab47b92c3936531058e03588adadfdf71..fada1e933372e4d02d3f93fadef94bdfed1dfacc 100644 --- a/src/main/java/eu/hbp/mip/controllers/ExperimentApi.java +++ b/src/main/java/eu/hbp/mip/controllers/ExperimentApi.java @@ -1,16 +1,12 @@ package eu.hbp.mip.controllers; import akka.actor.ActorRef; -import akka.actor.ActorSelection; -import akka.actor.ActorSystem; -import akka.pattern.Patterns; -import akka.util.Timeout; import com.google.common.collect.Lists; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonObject; import com.google.gson.JsonParser; -import eu.hbp.mip.akka.SpringExtension; +import eu.hbp.mip.akka.WokenClientController; import eu.hbp.mip.configuration.SecurityConfiguration; import eu.hbp.mip.model.Experiment; import eu.hbp.mip.model.ExperimentQuery; @@ -31,15 +27,11 @@ import org.springframework.cache.annotation.Cacheable; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.net.MalformedURLException; import java.util.Date; import java.util.Iterator; import java.util.List; @@ -53,7 +45,7 @@ import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE; @RestController @RequestMapping(value = "/experiments", produces = {APPLICATION_JSON_VALUE}) @Api(value = "/experiments", description = "the experiments API") -public class ExperimentApi { +public class ExperimentApi extends WokenClientController { private static final Logger LOGGER = Logger.getLogger(ExperimentApi.class); @@ -81,12 +73,6 @@ public class ExperimentApi { @Autowired private ExperimentRepository experimentRepository; - @Autowired - private ActorSystem actorSystem; - - @Autowired - private String wokenRefPath; - @ApiOperation(value = "Send a request to the workflow to run an experiment", response = Experiment.class) @RequestMapping(method = RequestMethod.POST) @@ -106,17 +92,13 @@ public class ExperimentApi { LOGGER.info("Experiment saved"); - try { - if(isExaremeAlgo(expQuery)) - { - String algoCode = expQuery.getAlgorithms().get(0).getCode(); - sendExaremeExperiment(experiment, algoCode); - } - else - { - sendExperiment(experiment); - } - } catch (MalformedURLException mue) { LOGGER.trace(mue.getMessage()); } // ignore + if (isExaremeAlgo(expQuery)) { + String algoCode = expQuery.getAlgorithms().get(0).getCode(); + sendExaremeExperiment(experiment, algoCode); + } + else { + sendExperiment(experiment); + } return new ResponseEntity<>(gsonOnlyExposed.toJson(experiment.jsonify()), HttpStatus.OK); } @@ -219,30 +201,20 @@ public class ExperimentApi { public ResponseEntity listAvailableMethodsAndValidations() throws IOException { LOGGER.info("List available methods and validations"); - LOGGER.info("Akka is trying to reach remote " + wokenRefPath); - ActorSelection wokenActor = actorSystem.actorSelection(wokenRefPath); + return askWoken(MethodsQuery$.MODULE$, 5, r -> { + MethodsResponse result = (MethodsResponse) r; - Timeout timeout = new Timeout(Duration.create(5, "seconds")); - Future<Object> future = Patterns.ask(wokenActor, MethodsQuery$.MODULE$, timeout); - MethodsResponse result; - try { - result = (MethodsResponse) Await.result(future, timeout.duration()); - } catch (Exception e) { - LOGGER.error("Cannot receive methods list from woken !"); - LOGGER.trace(e.getMessage()); - return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build(); - } - - // >> Temporary : should return result.methods() in the future - JsonObject catalog = new JsonParser().parse(result.methods()).getAsJsonObject(); - InputStream is = ExperimentApi.class.getClassLoader().getResourceAsStream(EXAREME_ALGO_JSON_FILE); - InputStreamReader isr = new InputStreamReader(is); - BufferedReader br = new BufferedReader(isr); - JsonObject exaremeAlgo = new JsonParser().parse(br).getAsJsonObject(); - catalog.get("algorithms").getAsJsonArray().add(exaremeAlgo); - // << Temporary + // >> Temporary : should return result.methods() in the future + JsonObject catalog = new JsonParser().parse(result.methods()).getAsJsonObject(); + InputStream is = ExperimentApi.class.getClassLoader().getResourceAsStream(EXAREME_ALGO_JSON_FILE); + InputStreamReader isr = new InputStreamReader(is); + BufferedReader br = new BufferedReader(isr); + JsonObject exaremeAlgo = new JsonParser().parse(br).getAsJsonObject(); + catalog.get("algorithms").getAsJsonArray().add(exaremeAlgo); + // << Temporary - return ResponseEntity.ok(gson.toJson(catalog)); + return ResponseEntity.ok(gson.toJson(catalog)); + }); } private ResponseEntity<String> doListExperiments( @@ -302,17 +274,14 @@ public class ExperimentApi { return new ResponseEntity<>(gsonOnlyExposed.toJson(experiment.jsonify()), HttpStatus.OK); } - private void sendExperiment(Experiment experiment) throws MalformedURLException { + private void sendExperiment(Experiment experiment) { User user = securityConfiguration.getUser(); // this runs in the background. For future optimization: use a thread pool final eu.hbp.mip.woken.messages.query.ExperimentQuery experimentQuery = experiment.prepareQuery(user.getUsername()); - LOGGER.info("Akka is trying to reach remote " + wokenRefPath); - ActorSelection wokenActor = actorSystem.actorSelection(wokenRefPath); - ActorRef experimentsManager = actorSystem.actorOf(SpringExtension.SPRING_EXTENSION_PROVIDER.get(actorSystem) - .props("experimentActor"), experiment.getUuid().toString()); - wokenActor.tell(experimentQuery, experimentsManager); + ActorRef experimentsManager = createActor("experimentActor", experiment.getUuid().toString()); + sendWokenQuery(experimentQuery, experimentsManager); } private void sendExaremeExperiment(Experiment experiment, String algoCode) { diff --git a/src/main/java/eu/hbp/mip/controllers/MiningApi.java b/src/main/java/eu/hbp/mip/controllers/MiningApi.java index da2bee0478d467e395198afab130aee52c78c20c..7922692ded822b2e39d481415d4e5fa0aa61c7e3 100644 --- a/src/main/java/eu/hbp/mip/controllers/MiningApi.java +++ b/src/main/java/eu/hbp/mip/controllers/MiningApi.java @@ -1,16 +1,11 @@ package eu.hbp.mip.controllers; -import akka.actor.ActorSelection; -import akka.actor.ActorSystem; -import akka.pattern.Patterns; -import akka.util.Timeout; import com.google.gson.Gson; +import eu.hbp.mip.akka.WokenClientController; import eu.hbp.mip.model.Mining; -import eu.hbp.mip.woken.messages.query.QueryResult; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.apache.log4j.Logger; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.annotation.Cacheable; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; @@ -18,9 +13,6 @@ import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; import java.io.IOException; import java.sql.Date; @@ -33,51 +25,34 @@ import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE; @RestController @RequestMapping(value = "/mining", produces = {APPLICATION_JSON_VALUE}) @Api(value = "/mining", description = "the mining API") -public class MiningApi { +public class MiningApi extends WokenClientController { private static final Logger LOGGER = Logger.getLogger(MiningApi.class); private static final Gson gson = new Gson(); - @Autowired - public ActorSystem actorSystem; - - @Autowired - public String wokenRefPath; - - @ApiOperation(value = "Run an algorithm", response = String.class) @Cacheable(value = "mining", condition = "#query.getAlgorithm().getCode() == 'histograms'", key = "#query.toString()", unless = "#result.getStatusCode().value()!=200") @RequestMapping(method = RequestMethod.POST) public ResponseEntity runAlgorithm(@RequestBody eu.hbp.mip.model.MiningQuery query) throws IOException { LOGGER.info("Run an algorithm"); - LOGGER.info("Akka is trying to reach remote " + wokenRefPath); - ActorSelection wokenActor = actorSystem.actorSelection(wokenRefPath); - - Timeout timeout = new Timeout(Duration.create(120, "seconds")); - Future<Object> future = Patterns.ask(wokenActor, query.prepareQuery(), timeout); - QueryResult result; - try { - result = (QueryResult) Await.result(future, timeout.duration()); - } catch (Exception e) { - LOGGER.error("Cannot receive algorithm result from woken: " + e.getMessage(), e); - return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build(); - } - - if (result.error().nonEmpty()) { - LOGGER.error(result.error().get()); - return ResponseEntity.status(HttpStatus.BAD_REQUEST).body("{\"error\":\"" + result.error().get() + "\"}"); - } else { - Mining mining = new Mining( - result.jobId(), - result.node(), - result.algorithm(), - result.shape(), - Date.from(result.timestamp().toInstant()), - result.data().get().compactPrint() - ); - return ResponseEntity.ok(gson.toJson(mining.jsonify())); - } + return askWokenQuery(query.prepareQuery(), 120, + result -> { + if (result.error().nonEmpty()) { + LOGGER.error(result.error().get()); + return ResponseEntity.status(HttpStatus.BAD_REQUEST).body("{\"error\":\"" + result.error().get() + "\"}"); + } else { + Mining mining = new Mining( + result.jobId(), + result.node(), + result.algorithm(), + result.shape(), + Date.from(result.timestamp().toInstant()), + result.data().get().compactPrint() + ); + return ResponseEntity.ok(gson.toJson(mining.jsonify())); + } + }); } } diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index eaee946a9b49667f3083c13915c5cd040ec73b0a..62cb042c504dbc633cc7ca4f01ab053d5d6ed308 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -1,31 +1,72 @@ akka { - actor { - provider = "akka.remote.RemoteActorRefProvider" - } loglevel = INFO - log-config-on-start = on - debug { - autoreceive = on - lifecycle = on - unhandled = on - fsm = on - event-stream = on + + actor { + provider = "cluster" + + debug { + receive = on + autoreceive = on + lifecycle = on + fsm = on + } + + serializers { + query-result-serializer = "eu.hbp.mip.woken.messages.query.QueryResultSerializer" + } + + serialization-bindings { + "eu.hbp.mip.woken.messages.query.QueryResult" = query-result-serializer + } + enable-additional-serialization-bindings = off + allow-java-serialization = on + warn-about-java-serializer-usage = on + } + remote { - maximum-payload-bytes = 10000000 bytes log-sent-messages = on log-received-messages = on - enabled-transports = ["akka.remote.netty.tcp"] + log-remote-lifecycle-events = on + maximum-payload-bytes = 10000000 bytes + netty.tcp { message-frame-size = 10000000b send-buffer-size = 10000000b receive-buffer-size = 10000000b maximum-frame-size = 10000000b - hostname = 127.0.0.1 # external (logical) hostname - port = 4089 # external (logical) port - bind-hostname = 0.0.0.0 # internal (bind) hostname - bind-port = 4089 # internal (bind) port + hostname = ${clustering.ip} # external (logical) hostname + port = ${clustering.port} # external (logical) port + + bind.hostname = 0.0.0.0 # internal (bind) hostname + bind.port = ${clustering.port} # internal (bind) port + } + } + + cluster { + roles = ["test"] + + client { + initial-contacts = ["akka.tcp://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port}"/system/receptionist"] } } + + extensions += "akka.cluster.pubsub.DistributedPubSub" + extensions += "akka.cluster.client.ClusterClientReceptionist" + +} + +clustering { + ip = "127.0.0.1" + ip = ${?CLUSTER_IP} + port = 4089 + port = ${?CLUSTER_PORT} + seed-ip = "127.0.0.1" + seed-ip = ${?CLUSTER_IP} + seed-ip = ${?WOKEN_PORT_8088_TCP_ADDR} + seed-port = 8088 + seed-port = ${?WOKEN_PORT_8088_TCP_PORT} + cluster.name = "woken" + cluster.name = ${?CLUSTER_NAME} }