Skip to content
Snippets Groups Projects
Commit 3a91d3a9 authored by Ludovic Claude's avatar Ludovic Claude
Browse files

Use a future to collect results of experiments

parent 1814f265
No related branches found
No related tags found
No related merge requests found
......@@ -16,6 +16,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
......@@ -92,16 +93,20 @@ public abstract class WokenClientController {
return handleResponse.apply(result);
}
protected <A extends Query> void sendWokenQuery(A query, ActorRef responseReceiver) {
protected <A extends Query> Future<Object> sendWokenQuery(A query, int timeout) {
LOGGER.info("Akka is trying to reach remote " + wokenPath);
ClusterClient.Send queryMessage = new ClusterClient.Send(wokenPath, query, true);
wokenClient.tell(queryMessage, responseReceiver);
return Patterns.ask(wokenClient, queryMessage, timeout);
}
protected ActorRef createActor(String actorBeanName, String actorName) {
return actorSystem.actorOf(SpringExtension.SPRING_EXTENSION_PROVIDER.get(actorSystem)
.props(actorBeanName), actorName);
}
protected ExecutionContext getExecutor() {
return actorSystem.dispatcher();
}
}
package eu.hbp.mip.controllers;
import akka.actor.ActorRef;
import akka.dispatch.OnSuccess;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
......@@ -17,6 +17,7 @@ import eu.hbp.mip.utils.HTTPUtil;
import eu.hbp.mip.utils.JSONUtil;
import eu.hbp.mip.woken.messages.query.MethodsQuery$;
import eu.hbp.mip.woken.messages.query.MethodsResponse;
import eu.hbp.mip.woken.messages.query.QueryResult;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
......@@ -27,6 +28,8 @@ import org.springframework.cache.annotation.Cacheable;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import java.io.BufferedReader;
import java.io.IOException;
......@@ -73,7 +76,6 @@ public class ExperimentApi extends WokenClientController {
@Autowired
private ExperimentRepository experimentRepository;
@ApiOperation(value = "Send a request to the workflow to run an experiment", response = Experiment.class)
@RequestMapping(method = RequestMethod.POST)
public ResponseEntity<String> runExperiment(@RequestBody ExperimentQuery expQuery) {
......@@ -274,14 +276,35 @@ public class ExperimentApi extends WokenClientController {
return new ResponseEntity<>(gsonOnlyExposed.toJson(experiment.jsonify()), HttpStatus.OK);
}
private void sendExperiment(Experiment experiment) {
private void sendExperiment(final Experiment experiment) {
User user = securityConfiguration.getUser();
// this runs in the background. For future optimization: use a thread pool
final eu.hbp.mip.woken.messages.query.ExperimentQuery experimentQuery = experiment.prepareQuery(user.getUsername());
ActorRef experimentsManager = createActor("experimentActor", experiment.getUuid().toString());
sendWokenQuery(experimentQuery, experimentsManager);
final ExecutionContext ec = getExecutor();
Future<Object> response = sendWokenQuery(experimentQuery, 24*3600);
response.onSuccess(new OnSuccess<Object>() {
public void onSuccess(Object result) {
QueryResult queryResult = (QueryResult) result;
UUID uuid = experiment.getUuid();
LOGGER.info("\n\nExperimentActor received response from woken for UUID: \n" + uuid.toString());
Experiment experiment = experimentRepository.findOne(uuid);
if (experiment == null) {
LOGGER.error("Experiment with UUID=" + uuid + " not found in DB");
} else {
if (queryResult.error().nonEmpty()) {
experiment.setHasServerError(true);
experiment.setResult(queryResult.error().get());
} else {
experiment.setResult(queryResult.data().get().compactPrint());
}
experiment.setFinished(Date.from(queryResult.timestamp().toInstant()));
experimentRepository.save(experiment);
LOGGER.info("Experiment " + uuid + " updated (finished)");
}
}
}, ec);
}
private void sendExaremeExperiment(Experiment experiment, String algoCode) {
......@@ -308,12 +331,12 @@ public class ExperimentApi extends WokenClientController {
{
experiment.setResult("Unsupported variables !");
}
finishExpermient(experiment);
finishExperimient(experiment);
}).start();
// << Temporary
}
private void finishExpermient(Experiment experiment)
private void finishExperimient(Experiment experiment)
{
experiment.setFinished(new Date());
experimentRepository.save(experiment);
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment