Skip to content
Snippets Groups Projects
Commit 11512afa authored by Manuel Spuhler's avatar Manuel Spuhler
Browse files

init standalone mode - wip

parent 009b96b5
No related branches found
No related tags found
No related merge requests found
Showing
with 43 additions and 560 deletions
......@@ -17,7 +17,6 @@ import org.springframework.stereotype.Component;
@Component
@DependsOn("wokenCluster")
public class StartupTasks implements ApplicationListener<ApplicationReadyEvent> {
private static final Logger LOGGER = LoggerFactory.getLogger(StartupTasks.class);
......@@ -43,32 +42,32 @@ public class StartupTasks implements ApplicationListener<ApplicationReadyEvent>
// Pre-fill the local variable repository with the list of datasets, interpreted here as variables
// (a bit like a categorical variable can be split into a set of variables (a.k.a one hot encoding in Data science) )
// Try 5 times, to be more robust in the face of cluster failures / slow startup
LOGGER.info("Prefill variable repository with datasets...");
for (int i = 0; i < 5; i++) {
try {
StringBuilder fetchedDatasets = new StringBuilder();
for (Dataset dataset : datasetsApi.fetchDatasets()) {
final String code = dataset.id().code();
fetchedDatasets.append(code).append(' ');
Variable v = variableRepository.findOne(code);
if (v == null) {
LOGGER.info("Store additional variable {}", code);
v = new Variable(code);
variableRepository.save(v);
}
}
LOGGER.info("Datasets fetched from Woken: " + fetchedDatasets.toString());
variablesRepositoryOk = true;
break;
} catch (Exception e) {
variablesRepositoryOk = false;
LOGGER.error("Cannot initialise the variable repository. Is the connection to Woken working?", e);
}
}
// LOGGER.info("Prefill variable repository with datasets...");
// for (int i = 0; i < 5; i++) {
// try {
// StringBuilder fetchedDatasets = new StringBuilder();
// for (Dataset dataset : datasetsApi.fetchDatasets()) {
// final String code = dataset.id().code();
// fetchedDatasets.append(code).append(' ');
// Variable v = variableRepository.findOne(code);
// if (v == null) {
// LOGGER.info("Store additional variable {}", code);
// v = new Variable(code);
// variableRepository.save(v);
// }
// }
// LOGGER.info("Datasets fetched from Woken: " + fetchedDatasets.toString());
// variablesRepositoryOk = true;
// break;
// } catch (Exception e) {
// variablesRepositoryOk = false;
// LOGGER.error("Cannot initialise the variable repository. Is the connection to Woken working?", e);
// }
// }
if (!variablesRepositoryOk) {
System.exit(1);
}
// if (!variablesRepositoryOk) {
// System.exit(1);
// }
/*
for (String variableJson: variablesApi.loadVariables()) {
......
package eu.hbp.mip.akka;
import akka.cluster.Cluster;
import ch.chuv.lren.woken.messages.Ping;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;
import scala.Option;
@Component
public class AkkaClusterHealthCheck extends WokenClientController implements HealthIndicator {
@Autowired
private Cluster wokenCluster;
@Override
public Health health() {
if (wokenCluster.state().getLeader() == null) {
return Health.down().withDetail("Error", "No leader in the cluster").build();
} else if (!wokenCluster.state().allRoles().contains("woken")) {
return Health.down().withDetail("Error", "Woken server cannot be seen in the cluster").build();
}
try {
askWoken(new Ping(Option.apply("woken")), 5);
return Health.up().build();
} catch (Exception e) {
return Health.down().withDetail("Error", "Cannot reach Woken: " + e.toString()).build();
}
}
}
package eu.hbp.mip.akka;
import akka.actor.AbstractActor;
import ch.chuv.lren.woken.messages.query.QueryResult;
import eu.hbp.mip.model.Experiment;
import eu.hbp.mip.repositories.ExperimentRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.UUID;
/**
* Created by mirco on 30.11.16.
*/
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class ExperimentActor extends AbstractActor {
@Autowired
private ExperimentRepository experimentRepository;
private static final Logger LOGGER = LoggerFactory.getLogger(ExperimentActor.class);
public Receive createReceive() {
return receiveBuilder()
.match(QueryResult.class, this::handleQueryResult)
.matchAny(o -> LOGGER.info("received unknown message " + o))
.build();
}
private void handleQueryResult(QueryResult queryResult) {
LOGGER.info("ActorExperiment - onReceive method has been called");
UUID uuid = UUID.fromString(this.getSelf().path().name());
LOGGER.info("\n\nExperimentActor received response from woken for UUID: \n" + uuid.toString());
Experiment experiment = experimentRepository.findOne(uuid);
if (experiment == null) {
LOGGER.error("Experiment with UUID=" + uuid + " not found in DB");
getContext().stop(getSelf());
} else {
if (queryResult.error().nonEmpty()) {
experiment.setHasServerError(true);
experiment.setResult(queryResult.error().get());
} else {
// TODO: use new WokenConversions().toJson(queryResult)
experiment.setResult(queryResult.data().get().compactPrint());
}
experiment.setFinished(Date.from(queryResult.timestamp().toInstant()));
experimentRepository.save(experiment);
LOGGER.info("Experiment " + uuid + " updated (finished)");
getContext().stop(getSelf());
}
}
}
package eu.hbp.mip.akka;
import akka.actor.Actor;
import akka.actor.IndirectActorProducer;
import org.springframework.context.ApplicationContext;
public class SpringActorProducer implements IndirectActorProducer {
private final ApplicationContext applicationContext;
private final String actorBeanName;
public SpringActorProducer(ApplicationContext applicationContext,
String actorBeanName) {
this.applicationContext = applicationContext;
this.actorBeanName = actorBeanName;
}
@Override
public Actor produce() {
return (Actor) applicationContext.getBean(actorBeanName);
}
@Override
public Class<? extends Actor> actorClass() {
return (Class<? extends Actor>) applicationContext.getType(actorBeanName);
}
}
package eu.hbp.mip.akka;
import akka.actor.AbstractExtensionId;
import akka.actor.ExtendedActorSystem;
import akka.actor.Extension;
import akka.actor.Props;
import org.springframework.context.ApplicationContext;
/*
* Created by mirco on 24.10.16.
*/
public class SpringExtension
extends AbstractExtensionId<SpringExtension.SpringExt> {
public static final SpringExtension SPRING_EXTENSION_PROVIDER
= new SpringExtension();
private SpringExtension()
{
/* Private constructor for singleton */
}
@Override
public SpringExt createExtension(ExtendedActorSystem system) {
return new SpringExt();
}
public static class SpringExt implements Extension {
private volatile ApplicationContext applicationContext;
public void initialize(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
public Props props(String actorBeanName) {
return Props.create(
SpringActorProducer.class, applicationContext, actorBeanName);
}
}
}
package eu.hbp.mip.akka;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.cluster.pubsub.DistributedPubSub;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.pattern.Patterns;
import akka.util.Timeout;
import ch.chuv.lren.woken.messages.RemoteMessage;
import ch.chuv.lren.woken.messages.query.Query;
import ch.chuv.lren.woken.messages.query.QueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import java.util.function.Function;
/**
* Base class for controllers using Woken services
*/
public abstract class WokenClientController {
protected final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
@Autowired
private ActorSystem actorSystem;
@Value("#{'${akka.woken.path:/user/entrypoint}'}")
private String wokenPath;
@SuppressWarnings("unchecked")
protected <A extends RemoteMessage, B> B askWoken(A message, int waitInSeconds) throws Exception {
LOGGER.info("Trying to reach remote Akka actor " + wokenPath + "...");
DistributedPubSubMediator.Send queryMessage = new DistributedPubSubMediator.Send(wokenPath, message, false);
Timeout timeout = new Timeout(Duration.create(waitInSeconds, "seconds"));
Future<Object> future = Patterns.ask(wokenMediator(), queryMessage, timeout);
B result = (B) Await.result(future, timeout.duration());
LOGGER.info("Akka actor returned a result for message of class " + message.getClass());
return result;
}
protected <A extends RemoteMessage, B> ResponseEntity requestWoken(A message, int waitInSeconds, Function<B, ResponseEntity> handleResponse) {
try {
B result = askWoken(message, waitInSeconds);
return handleResponse.apply(result);
} catch (Exception e) {
final String msg = "Cannot receive result from woken: " + e.getMessage();
LOGGER.error(msg, e);
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(msg);
}
}
protected <A extends Query> ResponseEntity askWokenQuery(A query, int waitInSeconds, Function<QueryResult, ResponseEntity> handleResponse) {
try {
QueryResult result = askWoken(query, waitInSeconds);
return handleResponse.apply(result);
} catch (Exception e) {
final String msg = "Cannot receive algorithm result from woken: " + e.getMessage();
LOGGER.error(msg, e);
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(msg);
}
}
protected <A extends Query> Future<Object> sendWokenQuery(A query, int waitInSeconds) {
LOGGER.info("Akka is trying to reach remote " + wokenPath);
DistributedPubSubMediator.Send queryMessage = new DistributedPubSubMediator.Send(wokenPath, query, false);
return Patterns.ask(wokenMediator(), queryMessage, waitInSeconds);
}
protected ExecutionContext getExecutor() {
return actorSystem.dispatcher();
}
private ActorRef wokenMediator() {
return DistributedPubSub.get(actorSystem).mediator();
}
}
package eu.hbp.mip.configuration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;
import akka.cluster.Cluster;
import akka.cluster.Member;
import akka.cluster.pubsub.DistributedPubSub;
import ch.chuv.lren.woken.utils.ConfigurationLoader;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.*;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import static eu.hbp.mip.akka.SpringExtension.SPRING_EXTENSION_PROVIDER;
/**
* Created by mirco on 24.10.16.
*/
@Configuration
@ComponentScan
public class AkkaConfiguration {
protected final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
@Autowired
private ApplicationContext applicationContext;
private final Config config;
{
Config appConfig = ConfigFactory.parseResourcesAnySyntax("application.conf")
.withFallback(ConfigFactory.parseResourcesAnySyntax("kamon.conf"));
config = ConfigurationLoader.appendClusterConfiguration(appConfig).resolve();
}
@Bean
public ExtendedActorSystem actorSystem() {
LOGGER.info("Step 1/3: Starting actor system...");
LOGGER.info("Create actor system at " + wokenClusterHost() + ":" + wokenClusterPort());
ExtendedActorSystem system = (ExtendedActorSystem) ActorSystem.create(wokenClusterName(), config);
SPRING_EXTENSION_PROVIDER.get(system).initialize(applicationContext);
return system;
}
@Bean
@DependsOn("actorSystem")
public Cluster wokenCluster() {
Cluster cluster = Cluster.get(actorSystem());
LOGGER.info("Connect to Woken cluster nodes at " + String.join(",", wokenPath()));
Semaphore semaphore = new Semaphore(1);
cluster.registerOnMemberUp( () -> {
LOGGER.info("Step 2/3: Cluster up, registering the actors...");
// Do not call wokenMediator() here to avoid recursive loops
ActorRef mediator = DistributedPubSub.get(actorSystem()).mediator();
LOGGER.info("Woken Mediator available at " + mediator.path().toStringWithoutAddress());
semaphore.release();
});
for (Member member: cluster.state().getMembers()) {
LOGGER.info("Member " + StringUtils.join(member.getRoles(), ",") + " at " + member.address().toString() + " is in the cluster");
}
if (cluster.state().members().size() < 2) {
LOGGER.info("Waiting for Woken cluster connection...");
try {
semaphore.tryAcquire(5, TimeUnit.MINUTES);
Thread.sleep(5000);
} catch (InterruptedException e) {
LOGGER.warn("Cannot wait for Akka cluster start", e);
}
}
final Runnable leaveCluster = () -> cluster.leave(cluster.selfAddress());
actorSystem().registerOnTermination(leaveCluster);
cluster.registerOnMemberRemoved( () -> {
LOGGER.info("Exiting...");
cluster.leave(cluster.selfAddress());
actorSystem().registerOnTermination(() -> System.exit(0));
actorSystem().terminate();
});
Runtime.getRuntime().addShutdownHook(new Thread(leaveCluster)
);
LOGGER.info("Step 3/3: Cluster connected to Woken.");
return cluster;
}
@Bean
public String wokenClusterHost() {
return config.getString("clustering.ip");
}
@Bean
public String wokenClusterName() {
return config.getString("clustering.cluster.name");
}
@Bean
public Integer wokenClusterPort() {
return config.getInt("clustering.port");
}
@Bean
public List<String> wokenPath() {
return config.getStringList("akka.cluster.seed-nodes");
}
}
......@@ -20,7 +20,6 @@ import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableSwagger2
@EnableSpringDataWebSupport
@DependsOn("wokenCluster")
public class WebConfiguration {
@Bean
......
......@@ -4,11 +4,6 @@
package eu.hbp.mip.controllers;
import ch.chuv.lren.woken.messages.datasets.Dataset;
import ch.chuv.lren.woken.messages.datasets.DatasetsQuery;
import ch.chuv.lren.woken.messages.datasets.DatasetsResponse;
import eu.hbp.mip.akka.WokenClientController;
import eu.hbp.mip.model.DatasetDescription;
import io.swagger.annotations.*;
import org.slf4j.Logger;
......@@ -30,40 +25,9 @@ import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
@RestController
@RequestMapping(value = "/datasets", produces = {APPLICATION_JSON_VALUE})
@Api(value = "/datasets", description = "the datasets API")
public class DatasetsApi extends WokenClientController {
public class DatasetsApi {
private static final Logger LOGGER = LoggerFactory.getLogger(DatasetsApi.class);
@ApiOperation(value = "Get dataset list", response = Dataset.class, responseContainer = "List")
@RequestMapping(method = RequestMethod.GET)
@Cacheable(value = "datasets")
public ResponseEntity getDatasets(
) {
LOGGER.info("Get list of datasets");
try {
List<DatasetDescription> datasets = new ArrayList<>();
for (Dataset d: fetchDatasets()) {
DatasetDescription dataset = new DatasetDescription();
LOGGER.info("Dataset {}", d);
dataset.setCode(d.id().code());
dataset.setLabel(d.label());
dataset.setDescription(d.description());
dataset.setAnonymisationLevel(d.anonymisationLevel().toString());
datasets.add(dataset);
}
return ResponseEntity.ok(datasets);
} catch (Exception e) {
final String msg = "Cannot receive datasets from woken: " + e.getMessage();
LOGGER.error(msg, e);
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(msg);
}
}
public List<ch.chuv.lren.woken.messages.datasets.Dataset> fetchDatasets() throws Exception {
DatasetsResponse result = askWoken(new DatasetsQuery(Option.empty()), 30);
return new ArrayList<>(scala.collection.JavaConversions.asJavaCollection(result.datasets()));
}
}
......@@ -6,7 +6,7 @@ import com.google.gson.*;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import eu.hbp.mip.akka.WokenClientController;
import eu.hbp.mip.model.*;
import eu.hbp.mip.repositories.ExperimentRepository;
import eu.hbp.mip.repositories.ModelRepository;
......@@ -35,7 +35,7 @@ import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
@RestController
@RequestMapping(value = "/experiments", produces = { APPLICATION_JSON_VALUE })
@Api(value = "/experiments", description = "the experiments API")
public class ExperimentApi extends WokenClientController {
public class ExperimentApi {
private static final Logger LOGGER = LoggerFactory.getLogger(ExperimentApi.class);
......@@ -62,16 +62,7 @@ public class ExperimentApi extends WokenClientController {
@Autowired
private ExperimentRepository experimentRepository;
@ApiOperation(value = "Send a request to the workflow to run an experiment", response = Experiment.class)
@RequestMapping(method = RequestMethod.POST)
public ResponseEntity<String> runExperiment(@RequestBody ExperimentQuery expQuery) {
LOGGER.info("Run an experiment");
Experiment experiment = saveExperiment(expQuery);
sendExperiment(experiment);
return new ResponseEntity<>(gsonOnlyExposed.toJson(experiment.jsonify()), HttpStatus.OK);
}
@ApiOperation(value = "Create an experiment on Exareme", response = Experiment.class)
@RequestMapping(value = "/exareme", method = RequestMethod.POST)
......@@ -367,38 +358,6 @@ public class ExperimentApi extends WokenClientController {
return new ResponseEntity<>(gsonOnlyExposed.toJson(experiment.jsonify()), HttpStatus.OK);
}
private void sendExperiment(final Experiment experiment) {
User user = userInfo.getUser();
// this runs in the background. For future optimization: use a thread pool
final ch.chuv.lren.woken.messages.query.ExperimentQuery experimentQuery = experiment
.prepareQuery(user.getUsername());
final ExecutionContext ec = getExecutor();
Future<Object> response = sendWokenQuery(experimentQuery, 24 * 3600);
response.onSuccess(new OnSuccess<Object>() {
public void onSuccess(Object result) {
QueryResult queryResult = (QueryResult) result;
UUID uuid = experiment.getUuid();
LOGGER.info("\n\nExperimentActor received response from woken for UUID: \n" + uuid.toString());
Experiment experiment = experimentRepository.findOne(uuid);
if (experiment == null) {
LOGGER.error("Experiment with UUID=" + uuid + " not found in DB");
} else {
if (queryResult.error().nonEmpty()) {
experiment.setHasServerError(true);
LOGGER.error("Experiment failed with message: " + queryResult.error().get());
}
// TODO: use new WokenConversions().toJson(queryResult)
experiment.setResult(queryResult.data().get().compactPrint());
experiment.setFinished(Date.from(queryResult.timestamp().toInstant()));
experimentRepository.save(experiment);
LOGGER.info("Experiment " + uuid + " updated (finished)");
}
}
}, ec);
}
private void finishExperiment(Experiment experiment) {
experiment.setFinished(new Date());
experimentRepository.save(experiment);
......
package eu.hbp.mip.controllers;
import com.google.gson.*;
import eu.hbp.mip.akka.WokenClientController;
import ch.chuv.lren.woken.messages.query.MethodsQuery$;
import ch.chuv.lren.woken.messages.query.MethodsResponse;
import io.swagger.annotations.Api;
......@@ -21,7 +21,7 @@ import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
@RestController
@RequestMapping(value = "/methods", produces = { APPLICATION_JSON_VALUE })
@Api(value = "/methods", description = "the methods API")
public class MethodsApi extends WokenClientController {
public class MethodsApi {
private static final Logger LOGGER = LoggerFactory.getLogger(MethodsApi.class);
......@@ -36,20 +36,6 @@ public class MethodsApi extends WokenClientController {
@Value("#{'${services.workflows.workflowAuthorization}'}")
private String workflowAuthorization;
@ApiOperation(value = "List available methods and validations", response = String.class)
@Cacheable(value = "methods", unless = "#result.getStatusCode().value()!=200")
@RequestMapping(method = RequestMethod.GET)
public ResponseEntity listAvailableMethodsAndValidations() {
LOGGER.info("List available methods and validations");
return requestWoken(MethodsQuery$.MODULE$, 10, r -> {
MethodsResponse result = (MethodsResponse) r;
JsonObject catalog = new JsonParser().parse(result.methods().compactPrint()).getAsJsonObject();
return ResponseEntity.ok(gson.toJson(catalog));
});
}
@ApiOperation(value = "List Exareme algorithms and validations", response = String.class)
@Cacheable(value = "exareme", unless = "#result.getStatusCode().value()!=200")
@RequestMapping(value = "/exareme", method = RequestMethod.GET)
......
......@@ -3,7 +3,7 @@ package eu.hbp.mip.controllers;
import eu.hbp.mip.utils.HTTPUtil;
import com.google.gson.Gson;
import eu.hbp.mip.akka.WokenClientController;
import eu.hbp.mip.model.Mining;
import eu.hbp.mip.model.User;
import eu.hbp.mip.model.UserInfo;
......@@ -34,7 +34,7 @@ import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
@RestController
@RequestMapping(value = "/mining", produces = { APPLICATION_JSON_VALUE })
@Api(value = "/mining", description = "the mining API")
public class MiningApi extends WokenClientController {
public class MiningApi {
private static final Logger LOGGER = LoggerFactory.getLogger(MiningApi.class);
private static final Gson gson = new Gson();
......@@ -45,27 +45,6 @@ public class MiningApi extends WokenClientController {
@Value("#{'${services.exareme.miningExaremeUrl:http://localhost:9090/mining/query}'}")
public String miningExaremeQueryUrl;
@ApiOperation(value = "Run an algorithm", response = String.class)
@Cacheable(value = "mining", condition = "#query != null and (#query.getAlgorithm().getCode() == 'histograms' or #query.getAlgorithm().getCode() == 'histograms')", key = "#query.toString()", unless = "#result.getStatusCode().value()!=200")
@RequestMapping(method = RequestMethod.POST)
public ResponseEntity runAlgorithm(@RequestBody eu.hbp.mip.model.MiningQuery query) {
LOGGER.info("Run an algorithm");
User user = userInfo.getUser();
return askWokenQuery(query.prepareQuery(user.getUsername()), 120, result -> {
if (result.error().nonEmpty()) {
LOGGER.error(result.error().get());
return ResponseEntity.status(HttpStatus.BAD_REQUEST)
.body("{\"error\":\"" + result.error().get() + "\"}");
} else {
Mining mining = new Mining(result.jobId(), result.node(), unwrap(result.algorithm()),
result.type().mime(), Date.from(result.timestamp().toInstant()),
result.data().get().compactPrint());
return ResponseEntity.ok(gson.toJson(mining.jsonify()));
}
});
}
@ApiOperation(value = "Create an histogram on Exareme", response = String.class)
@RequestMapping(value = "/exareme", method = RequestMethod.POST)
public ResponseEntity runExaremeMining(@RequestBody List<HashMap<String, String>> queryList) {
......
......@@ -13,7 +13,7 @@ import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import eu.hbp.mip.akka.WokenClientController;
import eu.hbp.mip.model.Algorithm;
import eu.hbp.mip.model.MiningQuery;
import eu.hbp.mip.model.Variable;
......@@ -43,7 +43,7 @@ import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
@RestController
@RequestMapping(value = "/variables", produces = {APPLICATION_JSON_VALUE})
@Api(value = "/variables", description = "the variables API")
public class VariablesApi extends WokenClientController {
public class VariablesApi {
private static final Logger LOGGER = LoggerFactory.getLogger(VariablesApi.class);
......@@ -79,36 +79,7 @@ public class VariablesApi extends WokenClientController {
return ResponseEntity.ok(variablesObjects);
}
@ApiOperation(value = "Get variables available for a dataset", response = List.class, responseContainer = "List")
@Cacheable(value = "availableVariables",
unless = "#result.getStatusCode().value()!=200")
@RequestMapping(value = "/availableVariables", method = RequestMethod.GET)
public ResponseEntity getAvailableVariables(
@ApiParam(value = "List of datasets : ds1,ds2,...") @RequestParam(value = "datasets") String datasets) {
LOGGER.info("Get available variables for datasets " + datasets);
List<Variable> dsAsVariables = Arrays.stream(datasets.split(",")).map(Variable::new).collect(Collectors.toList());
WokenConversions conv = new WokenConversions();
VariablesForDatasetsQuery query = new VariablesForDatasetsQuery(conv.toDatasets(dsAsVariables), true);
return requestWoken(query, 30, r -> {
VariablesForDatasetsResponse response = (VariablesForDatasetsResponse) r;
if (response.error().isDefined()) {
return ResponseEntity.status(HttpStatus.NOT_FOUND).body(response.error().get());
} else {
List<Variable> variables = new ArrayList<>();
for (VariableMetaData var: JavaConversions.setAsJavaSet(response.variables())) {
variables.add(new Variable(var.code()));
}
return ResponseEntity.ok(variables);
}
});
}
@ApiOperation(value = "Get a variable", response = Object.class)
@Cacheable("variable")
......
......@@ -10,7 +10,13 @@
<Root level="${env:LOG_LEVEL:-INFO}">
<AppenderRef ref="Console"/>
</Root>
<Logger name="akka.actor.LocalActorRefProvider(akka://woken)" level="INFO">
<Logger name="akka.actor.LocalActorRefProvider(akka://
<Logger name="io.netty" level="${env:NETTY_LOG_LEVEL:-ERROR}">
<AppenderRef ref="Console"/>
</Logger>
</Loggers>
</Configuration>
woken)" level="INFO">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="akka.cluster" level="${env:AKKA_CLUSTER_LOG_LEVEL:-INFO}">
......@@ -30,9 +36,4 @@
</Logger>
<Logger name="akka.stream.impl" level="${env:AKKA_STREAM_LOG_LEVEL:-INFO}">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="io.netty" level="${env:NETTY_LOG_LEVEL:-ERROR}">
<AppenderRef ref="Console"/>
</Logger>
</Loggers>
</Configuration>
</Logger>
\ No newline at end of file
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment