diff --git a/src/main/java/eu/hbp/mip/akka/WokenClientController.java b/src/main/java/eu/hbp/mip/akka/WokenClientController.java index 0f225c860b82849d10d3aae044b2069dfd255c6a..210d35dd4bf8ba7f82524f67bc0ae72a51eb2030 100644 --- a/src/main/java/eu/hbp/mip/akka/WokenClientController.java +++ b/src/main/java/eu/hbp/mip/akka/WokenClientController.java @@ -16,6 +16,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import scala.concurrent.Await; +import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -92,16 +93,20 @@ public abstract class WokenClientController { return handleResponse.apply(result); } - protected <A extends Query> void sendWokenQuery(A query, ActorRef responseReceiver) { + protected <A extends Query> Future<Object> sendWokenQuery(A query, int timeout) { LOGGER.info("Akka is trying to reach remote " + wokenPath); ClusterClient.Send queryMessage = new ClusterClient.Send(wokenPath, query, true); - wokenClient.tell(queryMessage, responseReceiver); + return Patterns.ask(wokenClient, queryMessage, timeout); } protected ActorRef createActor(String actorBeanName, String actorName) { return actorSystem.actorOf(SpringExtension.SPRING_EXTENSION_PROVIDER.get(actorSystem) .props(actorBeanName), actorName); } + + protected ExecutionContext getExecutor() { + return actorSystem.dispatcher(); + } } diff --git a/src/main/java/eu/hbp/mip/controllers/ExperimentApi.java b/src/main/java/eu/hbp/mip/controllers/ExperimentApi.java index fada1e933372e4d02d3f93fadef94bdfed1dfacc..1ad91556831100547d6237b474e877dd6b7eca02 100644 --- a/src/main/java/eu/hbp/mip/controllers/ExperimentApi.java +++ b/src/main/java/eu/hbp/mip/controllers/ExperimentApi.java @@ -1,6 +1,6 @@ package eu.hbp.mip.controllers; -import akka.actor.ActorRef; +import akka.dispatch.OnSuccess; import com.google.common.collect.Lists; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -17,6 +17,7 @@ import eu.hbp.mip.utils.HTTPUtil; import eu.hbp.mip.utils.JSONUtil; import eu.hbp.mip.woken.messages.query.MethodsQuery$; import eu.hbp.mip.woken.messages.query.MethodsResponse; +import eu.hbp.mip.woken.messages.query.QueryResult; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; @@ -27,6 +28,8 @@ import org.springframework.cache.annotation.Cacheable; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; import java.io.BufferedReader; import java.io.IOException; @@ -73,7 +76,6 @@ public class ExperimentApi extends WokenClientController { @Autowired private ExperimentRepository experimentRepository; - @ApiOperation(value = "Send a request to the workflow to run an experiment", response = Experiment.class) @RequestMapping(method = RequestMethod.POST) public ResponseEntity<String> runExperiment(@RequestBody ExperimentQuery expQuery) { @@ -274,14 +276,35 @@ public class ExperimentApi extends WokenClientController { return new ResponseEntity<>(gsonOnlyExposed.toJson(experiment.jsonify()), HttpStatus.OK); } - private void sendExperiment(Experiment experiment) { + private void sendExperiment(final Experiment experiment) { User user = securityConfiguration.getUser(); // this runs in the background. For future optimization: use a thread pool final eu.hbp.mip.woken.messages.query.ExperimentQuery experimentQuery = experiment.prepareQuery(user.getUsername()); - - ActorRef experimentsManager = createActor("experimentActor", experiment.getUuid().toString()); - sendWokenQuery(experimentQuery, experimentsManager); + final ExecutionContext ec = getExecutor(); + + Future<Object> response = sendWokenQuery(experimentQuery, 24*3600); + response.onSuccess(new OnSuccess<Object>() { + public void onSuccess(Object result) { + QueryResult queryResult = (QueryResult) result; + UUID uuid = experiment.getUuid(); + LOGGER.info("\n\nExperimentActor received response from woken for UUID: \n" + uuid.toString()); + Experiment experiment = experimentRepository.findOne(uuid); + if (experiment == null) { + LOGGER.error("Experiment with UUID=" + uuid + " not found in DB"); + } else { + if (queryResult.error().nonEmpty()) { + experiment.setHasServerError(true); + experiment.setResult(queryResult.error().get()); + } else { + experiment.setResult(queryResult.data().get().compactPrint()); + } + experiment.setFinished(Date.from(queryResult.timestamp().toInstant())); + experimentRepository.save(experiment); + LOGGER.info("Experiment " + uuid + " updated (finished)"); + } + } + }, ec); } private void sendExaremeExperiment(Experiment experiment, String algoCode) { @@ -308,12 +331,12 @@ public class ExperimentApi extends WokenClientController { { experiment.setResult("Unsupported variables !"); } - finishExpermient(experiment); + finishExperimient(experiment); }).start(); // << Temporary } - private void finishExpermient(Experiment experiment) + private void finishExperimient(Experiment experiment) { experiment.setFinished(new Date()); experimentRepository.save(experiment);