Skip to content
Snippets Groups Projects
Commit ce2ad277 authored by Mirco Nasuti's avatar Mirco Nasuti
Browse files

Trying to make it work

parent 219460f9
No related branches found
No related tags found
No related merge requests found
...@@ -53,6 +53,8 @@ ...@@ -53,6 +53,8 @@
<javax-inject.version>1</javax-inject.version> <javax-inject.version>1</javax-inject.version>
<akka-actor.version>2.4.11</akka-actor.version> <akka-actor.version>2.4.11</akka-actor.version>
<akka-remote.version>2.4.11</akka-remote.version> <akka-remote.version>2.4.11</akka-remote.version>
<spring-context.version>4.3.4.RELEASE</spring-context.version>
<protobuf-java.version>3.1.0</protobuf-java.version>
</properties> </properties>
<repositories> <repositories>
...@@ -198,6 +200,16 @@ ...@@ -198,6 +200,16 @@
<artifactId>akka-remote_2.11</artifactId> <artifactId>akka-remote_2.11</artifactId>
<version>${akka-remote.version}</version> <version>${akka-remote.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf-java.version}</version>
</dependency>
</dependencies> </dependencies>
<pluginRepositories> <pluginRepositories>
......
package eu.hbp.mip.akka; package eu.hbp.mip.akka;
import akka.actor.Props;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import akka.event.Logging; import akka.event.Logging;
import akka.event.LoggingAdapter; import akka.event.LoggingAdapter;
import akka.japi.Creator;
import eu.hbp.mip.messages.external.QueryError; import eu.hbp.mip.messages.external.QueryError;
import eu.hbp.mip.messages.external.QueryResult; import eu.hbp.mip.messages.external.QueryResult;
import eu.hbp.mip.model.Experiment; import eu.hbp.mip.model.Experiment;
import eu.hbp.mip.repositories.ExperimentRepository; import eu.hbp.mip.repositories.ExperimentRepository;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.Date; import java.util.Date;
import java.util.UUID; import java.util.UUID;
...@@ -17,54 +18,42 @@ import java.util.UUID; ...@@ -17,54 +18,42 @@ import java.util.UUID;
/** /**
* Created by mirco on 30.11.16. * Created by mirco on 30.11.16.
*/ */
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class ExperimentActor extends UntypedActor { public class ExperimentActor extends UntypedActor {
@Autowired @Autowired
private ExperimentRepository experimentRepository; private ExperimentRepository experimentRepository;
public static Props props(final UUID expUUID) {
return Props.create(new Creator<ExperimentActor>() {
private static final long serialVersionUID = 1L;
@Override
public ExperimentActor create() throws Exception {
return new ExperimentActor(expUUID);
}
});
}
LoggingAdapter log = Logging.getLogger(getContext().system(), this); LoggingAdapter log = Logging.getLogger(getContext().system(), this);
private final UUID expUUID;
private ExperimentActor(UUID expUUID) {
this.expUUID = expUUID;
}
@Override @Override
public void onReceive(Object message) throws Throwable { public void onReceive(Object message) throws Throwable {
UUID uuid = UUID.fromString(this.getSelf().path().name());
if (message instanceof QueryResult) { if (message instanceof QueryResult) {
QueryResult queryResult = (QueryResult) message; QueryResult queryResult = (QueryResult) message;
log.info("received query result for : " + expUUID.toString()); log.info("received query result for : " + uuid.toString());
Experiment experiment = experimentRepository.findOne(expUUID); Experiment experiment = experimentRepository.findOne(uuid);
if(experiment == null) if(experiment == null)
{ {
log.error("Experiment with UUID="+expUUID+" not found in DB"); log.error("Experiment with UUID="+uuid+" not found in DB");
return; return;
} }
experiment.setResult(queryResult.data().get()); experiment.setResult(queryResult.data().get());
experiment.setFinished(new Date()); experiment.setFinished(new Date());
experimentRepository.save(experiment); experimentRepository.save(experiment);
log.info("Experiment "+ expUUID +" updated (finished)"); log.info("Experiment "+ uuid +" updated (finished)");
} }
else if (message instanceof QueryError) { else if (message instanceof QueryError) {
QueryError queryError = (QueryError) message; QueryError queryError = (QueryError) message;
log.warning("received query error"); log.warning("received query error");
Experiment experiment = experimentRepository.findOne(expUUID); Experiment experiment = experimentRepository.findOne(uuid);
if(experiment == null) if(experiment == null)
{ {
log.error("Experiment with UUID="+expUUID+" not found in DB"); log.error("Experiment with UUID="+uuid+" not found in DB");
return; return;
} }
experiment.setHasServerError(true); experiment.setHasServerError(true);
...@@ -72,7 +61,7 @@ public class ExperimentActor extends UntypedActor { ...@@ -72,7 +61,7 @@ public class ExperimentActor extends UntypedActor {
experimentRepository.save(experiment); experimentRepository.save(experiment);
experiment.setFinished(new Date()); experiment.setFinished(new Date());
experimentRepository.save(experiment); experimentRepository.save(experiment);
log.info("Experiment "+ expUUID +" updated (finished)"); log.info("Experiment "+ uuid +" updated (finished)");
} }
else { else {
......
...@@ -4,10 +4,16 @@ import akka.actor.UntypedActor; ...@@ -4,10 +4,16 @@ import akka.actor.UntypedActor;
import akka.event.Logging; import akka.event.Logging;
import akka.event.LoggingAdapter; import akka.event.LoggingAdapter;
import eu.hbp.mip.messages.external.Methods; import eu.hbp.mip.messages.external.Methods;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
/** /**
* Created by mirco on 30.11.16. * Created by mirco on 30.11.16.
*/ */
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class SimpleActor extends UntypedActor { public class SimpleActor extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this); LoggingAdapter log = Logging.getLogger(getContext().system(), this);
......
...@@ -10,48 +10,32 @@ import org.springframework.context.ApplicationContext; ...@@ -10,48 +10,32 @@ import org.springframework.context.ApplicationContext;
* Created by mirco on 24.10.16. * Created by mirco on 24.10.16.
*/ */
/** public class SpringExtension
* An Akka Extension to provide access to Spring managed Actor Beans. extends AbstractExtensionId<SpringExtension.SpringExt> {
*/
public class SpringExtension extends public static final SpringExtension SPRING_EXTENSION_PROVIDER
AbstractExtensionId<SpringExtension.SpringExt> { = new SpringExtension();
/** private SpringExtension()
* The identifier used to access the SpringExtension. {
*/ /* Private constructor for singleton */
public static SpringExtension SpringExtProvider = new SpringExtension(); }
/**
* Is used by Akka to instantiate the Extension identified by this
* ExtensionId, internal use only.
*/
@Override @Override
public SpringExt createExtension(ExtendedActorSystem system) { public SpringExt createExtension(ExtendedActorSystem system) {
return new SpringExt(); return new SpringExt();
} }
/**
* The Extension implementation.
*/
public static class SpringExt implements Extension { public static class SpringExt implements Extension {
private volatile ApplicationContext applicationContext; private volatile ApplicationContext applicationContext;
/**
* Used to initialize the Spring application context for the extension.
* @param applicationContext
*/
public void initialize(ApplicationContext applicationContext) { public void initialize(ApplicationContext applicationContext) {
this.applicationContext = applicationContext; this.applicationContext = applicationContext;
} }
/**
* Create a Props for the specified actorBeanName using the
* SpringActorProducer class.
*
* @param actorBeanName The name of the actor bean to create Props for
* @return a Props that will create the named actor bean using Spring
*/
public Props props(String actorBeanName) { public Props props(String actorBeanName) {
return Props.create(SpringActorProducer.class, return Props.create(
applicationContext, actorBeanName); SpringActorProducer.class, applicationContext, actorBeanName);
} }
} }
} }
\ No newline at end of file
...@@ -4,15 +4,17 @@ import akka.actor.ActorSystem; ...@@ -4,15 +4,17 @@ import akka.actor.ActorSystem;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import static eu.hbp.mip.akka.SpringExtension.SpringExtProvider; import static eu.hbp.mip.akka.SpringExtension.SPRING_EXTENSION_PROVIDER;
/** /**
* Created by mirco on 24.10.16. * Created by mirco on 24.10.16.
*/ */
@Configuration @Configuration
@ComponentScan
class AkkaConfiguration { class AkkaConfiguration {
@Autowired @Autowired
...@@ -21,7 +23,7 @@ class AkkaConfiguration { ...@@ -21,7 +23,7 @@ class AkkaConfiguration {
@Bean @Bean
public ActorSystem actorSystem() { public ActorSystem actorSystem() {
ActorSystem system = ActorSystem.create("AkkaActorSystem"); ActorSystem system = ActorSystem.create("AkkaActorSystem");
SpringExtProvider.get(system).initialize(applicationContext); SPRING_EXTENSION_PROVIDER.get(system).initialize(applicationContext);
return system; return system;
} }
......
...@@ -3,12 +3,10 @@ package eu.hbp.mip.controllers; ...@@ -3,12 +3,10 @@ package eu.hbp.mip.controllers;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.ActorSelection; import akka.actor.ActorSelection;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.Props;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.GsonBuilder; import com.google.gson.GsonBuilder;
import eu.hbp.mip.akka.ExperimentActor; import eu.hbp.mip.akka.SpringExtension;
import eu.hbp.mip.akka.SimpleActor;
import eu.hbp.mip.configuration.SecurityConfiguration; import eu.hbp.mip.configuration.SecurityConfiguration;
import eu.hbp.mip.messages.external.MethodsQuery; import eu.hbp.mip.messages.external.MethodsQuery;
import eu.hbp.mip.model.Experiment; import eu.hbp.mip.model.Experiment;
...@@ -76,7 +74,7 @@ public class ExperimentApi { ...@@ -76,7 +74,7 @@ public class ExperimentApi {
private ExperimentRepository experimentRepository; private ExperimentRepository experimentRepository;
@Autowired @Autowired
private ActorSystem actorSystem; public ActorSystem actorSystem;
@Value("#{'${akka.woken-path:akka.tcp://woken@127.0.0.1:8088/user/entrypoint}'}") @Value("#{'${akka.woken-path:akka.tcp://woken@127.0.0.1:8088/user/entrypoint}'}")
private String wokenPath; private String wokenPath;
...@@ -212,9 +210,11 @@ public class ExperimentApi { ...@@ -212,9 +210,11 @@ public class ExperimentApi {
public ResponseEntity listAvailableMethodsAndValidations() throws IOException { public ResponseEntity listAvailableMethodsAndValidations() throws IOException {
LOGGER.info("List available methods and validations"); LOGGER.info("List available methods and validations");
LOGGER.info("Akka is trying to reach remote " + wokenPath);
ActorSelection wokenActor = actorSystem.actorSelection(wokenPath); ActorSelection wokenActor = actorSystem.actorSelection(wokenPath);
ActorRef simpleActor = actorSystem.actorOf(Props.create(SimpleActor.class)); ActorRef methodsManager = actorSystem.actorOf(SpringExtension.SPRING_EXTENSION_PROVIDER.get(actorSystem)
wokenActor.tell(new MethodsQuery(), simpleActor); .props("simpleActor"));
wokenActor.tell(new MethodsQuery(), methodsManager);
return ResponseEntity.ok().build(); return ResponseEntity.ok().build();
} }
...@@ -280,9 +280,11 @@ public class ExperimentApi { ...@@ -280,9 +280,11 @@ public class ExperimentApi {
// this runs in the background. For future optimization: use a thread pool // this runs in the background. For future optimization: use a thread pool
final eu.hbp.mip.messages.external.ExperimentQuery experimentQuery = experiment.computeQuery(); final eu.hbp.mip.messages.external.ExperimentQuery experimentQuery = experiment.computeQuery();
LOGGER.info("Akka is trying to reach remote " + wokenPath);
ActorSelection wokenActor = actorSystem.actorSelection(wokenPath); ActorSelection wokenActor = actorSystem.actorSelection(wokenPath);
ActorRef experimentActor = actorSystem.actorOf(Props.create(ExperimentActor.class, experiment.getUuid())); ActorRef experimentsManager = actorSystem.actorOf(SpringExtension.SPRING_EXTENSION_PROVIDER.get(actorSystem)
wokenActor.tell(experimentQuery, experimentActor); .props("experimentActor"), experiment.getUuid().toString());
wokenActor.tell(experimentQuery, experimentsManager);
} }
private void sendExaremeExperiment(Experiment experiment) { private void sendExaremeExperiment(Experiment experiment) {
......
akka { akka {
loglevel = INFO loglevel = DEBUG
log-config-on-start = on
debug {
autoreceive = on
lifecycle = on
unhandled = on
fsm = on
event-stream = on
}
actor { actor {
provider = "akka.remote.RemoteActorRefProvider" provider = "akka.remote.RemoteActorRefProvider"
} }
remote { remote {
log-sent-messages = on
log-received-messages = on
enabled-transports = ["akka.remote.netty.tcp"] enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp { netty.tcp {
hostname = 127.0.0.1 # external (logical) hostname hostname = 127.0.0.1 # external (logical) hostname
port = 8088 # external (logical) port port = 8089 # external (logical) port
bind-hostname = 127.0.0.1 # internal (bind) hostname bind-hostname = 127.0.0.1 # internal (bind) hostname
bind-port = 8088 # internal (bind) port bind-port = 8088 # internal (bind) port
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment