From 489434f908e6fe415e648bc6340f09281ad06296 Mon Sep 17 00:00:00 2001 From: Mirco Nasuti <mirco.nasuti@chuv.ch> Date: Tue, 29 Nov 2016 14:45:27 +0100 Subject: [PATCH] Work in progress --- pom.xml | 22 ++++++- .../java/eu/hbp/mip/akka/ListenerActor.java | 52 +++++++++++++++++ .../eu/hbp/mip/akka/ListeningService.java | 14 +++++ .../eu/hbp/mip/akka/SpringActorProducer.java | 28 +++++++++ .../java/eu/hbp/mip/akka/SpringExtension.java | 57 +++++++++++++++++++ .../mip/configuration/AkkaConfiguration.java | 32 +++++++++++ .../eu/hbp/mip/controllers/ExperimentApi.java | 21 ++++++- src/main/resources/akka/application.conf | 14 +++++ 8 files changed, 236 insertions(+), 4 deletions(-) create mode 100644 src/main/java/eu/hbp/mip/akka/ListenerActor.java create mode 100644 src/main/java/eu/hbp/mip/akka/ListeningService.java create mode 100644 src/main/java/eu/hbp/mip/akka/SpringActorProducer.java create mode 100644 src/main/java/eu/hbp/mip/akka/SpringExtension.java create mode 100644 src/main/java/eu/hbp/mip/configuration/AkkaConfiguration.java create mode 100644 src/main/resources/akka/application.conf diff --git a/pom.xml b/pom.xml index 1322a22b1..05d771937 100644 --- a/pom.xml +++ b/pom.xml @@ -49,6 +49,10 @@ <spring-data-jpa.version>1.10.2.RELEASE</spring-data-jpa.version> <spring-boot-starter-actuator.version>1.4.0.RELEASE</spring-boot-starter-actuator.version> <aspectjweaver.version>1.8.9</aspectjweaver.version> + <woken-messages.version>a0244b0</woken-messages.version> + <javax-inject.version>1</javax-inject.version> + <akka-actor.version>2.4.11</akka-actor.version> + <akka-remote.version>2.4.11</akka-remote.version> </properties> <repositories> @@ -177,7 +181,22 @@ <dependency> <groupId>woken-messages</groupId> <artifactId>woken-messages_2.11</artifactId> - <version>a0244b0</version> + <version>${woken-messages.version}</version> + </dependency> + <dependency> + <groupId>javax.inject</groupId> + <artifactId>javax.inject</artifactId> + <version>${javax-inject.version}</version> + </dependency> + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-actor_2.11</artifactId> + <version>${akka-actor.version}</version> + </dependency> + <dependency> + <groupId>com.typesafe.akka</groupId> + <artifactId>akka-remote_2.11</artifactId> + <version>${akka-remote.version}</version> </dependency> </dependencies> @@ -206,6 +225,7 @@ <include>**/*.json</include> <include>**/*.csv</include> <include>**/*.sql</include> + <include>**/*.conf</include> </includes> <filtering>true</filtering> </resource> diff --git a/src/main/java/eu/hbp/mip/akka/ListenerActor.java b/src/main/java/eu/hbp/mip/akka/ListenerActor.java new file mode 100644 index 000000000..69659cd26 --- /dev/null +++ b/src/main/java/eu/hbp/mip/akka/ListenerActor.java @@ -0,0 +1,52 @@ +package eu.hbp.mip.akka; + +import akka.actor.UntypedActor; +import org.springframework.context.annotation.Scope; + +import javax.inject.Inject; +import javax.inject.Named; + +/** + * Created by mirco on 24.10.16. + */ + +@Named("ListenerActor") +@Scope("prototype") +public class ListenerActor extends UntypedActor { + + public static class AlgoQuery { + private final String query; + public AlgoQuery(String query) { + this.query = query; + } + public String getQuery() { + return query; + } + } + public static class AlgoResult { + private final String result; + public AlgoResult(String result) { + this.result = result; + } + public String getResult() { + return result; + } + } + final ListeningService listeningService; + @Inject + public ListenerActor(@Named("ListeningService") ListeningService listeningService) { + this.listeningService = listeningService; + } + + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof AlgoResult) { + AlgoResult algoResult = (AlgoResult) message; + listeningService.listen(algoResult.getResult()); + } + else { + unhandled(message); + } + } + +} \ No newline at end of file diff --git a/src/main/java/eu/hbp/mip/akka/ListeningService.java b/src/main/java/eu/hbp/mip/akka/ListeningService.java new file mode 100644 index 000000000..49d673d9d --- /dev/null +++ b/src/main/java/eu/hbp/mip/akka/ListeningService.java @@ -0,0 +1,14 @@ +package eu.hbp.mip.akka; + +import javax.inject.Named; + +/** + * Created by mirco on 24.10.16. + */ + +@Named("ListeningService") +public class ListeningService { + public void listen(String result) { + System.out.println(result); + } +} \ No newline at end of file diff --git a/src/main/java/eu/hbp/mip/akka/SpringActorProducer.java b/src/main/java/eu/hbp/mip/akka/SpringActorProducer.java new file mode 100644 index 000000000..ffea92145 --- /dev/null +++ b/src/main/java/eu/hbp/mip/akka/SpringActorProducer.java @@ -0,0 +1,28 @@ +package eu.hbp.mip.akka; + +import akka.actor.Actor; +import akka.actor.IndirectActorProducer; +import org.springframework.context.ApplicationContext; + + +public class SpringActorProducer implements IndirectActorProducer { + + private final ApplicationContext applicationContext; + private final String actorBeanName; + + public SpringActorProducer(ApplicationContext applicationContext, + String actorBeanName) { + this.applicationContext = applicationContext; + this.actorBeanName = actorBeanName; + } + + @Override + public Actor produce() { + return (Actor) applicationContext.getBean(actorBeanName); + } + + @Override + public Class<? extends Actor> actorClass() { + return (Class<? extends Actor>) applicationContext.getType(actorBeanName); + } +} \ No newline at end of file diff --git a/src/main/java/eu/hbp/mip/akka/SpringExtension.java b/src/main/java/eu/hbp/mip/akka/SpringExtension.java new file mode 100644 index 000000000..fb6f585c7 --- /dev/null +++ b/src/main/java/eu/hbp/mip/akka/SpringExtension.java @@ -0,0 +1,57 @@ +package eu.hbp.mip.akka; + +import akka.actor.AbstractExtensionId; +import akka.actor.ExtendedActorSystem; +import akka.actor.Extension; +import akka.actor.Props; +import org.springframework.context.ApplicationContext; + +/** + * Created by mirco on 24.10.16. + */ + +/** + * An Akka Extension to provide access to Spring managed Actor Beans. + */ +public class SpringExtension extends + AbstractExtensionId<SpringExtension.SpringExt> { + + /** + * The identifier used to access the SpringExtension. + */ + public static SpringExtension SpringExtProvider = new SpringExtension(); + /** + * Is used by Akka to instantiate the Extension identified by this + * ExtensionId, internal use only. + */ + @Override + public SpringExt createExtension(ExtendedActorSystem system) { + return new SpringExt(); + } + + /** + * The Extension implementation. + */ + public static class SpringExt implements Extension { + private volatile ApplicationContext applicationContext; + /** + * Used to initialize the Spring application context for the extension. + * @param applicationContext + */ + public void initialize(ApplicationContext applicationContext) { + this.applicationContext = applicationContext; + } + + /** + * Create a Props for the specified actorBeanName using the + * SpringActorProducer class. + * + * @param actorBeanName The name of the actor bean to create Props for + * @return a Props that will create the named actor bean using Spring + */ + public Props props(String actorBeanName) { + return Props.create(SpringActorProducer.class, + applicationContext, actorBeanName); + } + } +} \ No newline at end of file diff --git a/src/main/java/eu/hbp/mip/configuration/AkkaConfiguration.java b/src/main/java/eu/hbp/mip/configuration/AkkaConfiguration.java new file mode 100644 index 000000000..1ee3b1962 --- /dev/null +++ b/src/main/java/eu/hbp/mip/configuration/AkkaConfiguration.java @@ -0,0 +1,32 @@ +package eu.hbp.mip.configuration; + +import akka.actor.ActorSystem; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import static eu.hbp.mip.akka.SpringExtension.SpringExtProvider; + +/** + * Created by mirco on 24.10.16. + */ + +@Configuration +class AkkaConfiguration { + + // the application context is needed to initialize the Akka Spring Extension + @Autowired + private ApplicationContext applicationContext; + + /** + * Actor system singleton for this application. + */ + @Bean + public ActorSystem actorSystem() { + ActorSystem system = ActorSystem.create("AkkaJavaSpring"); + // initialize the application context in the Akka Spring Extension + SpringExtProvider.get(system).initialize(applicationContext); + return system; + } +} \ No newline at end of file diff --git a/src/main/java/eu/hbp/mip/controllers/ExperimentApi.java b/src/main/java/eu/hbp/mip/controllers/ExperimentApi.java index 6f8ea38f1..9d16aa9da 100644 --- a/src/main/java/eu/hbp/mip/controllers/ExperimentApi.java +++ b/src/main/java/eu/hbp/mip/controllers/ExperimentApi.java @@ -1,5 +1,7 @@ package eu.hbp.mip.controllers; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; import com.google.common.collect.Lists; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -76,6 +78,9 @@ public class ExperimentApi { @Autowired private ExperimentRepository experimentRepository; + @Autowired + private ActorSystem actorSystem; + @ApiOperation(value = "Send a request to the workflow to run an experiment", response = Experiment.class) @RequestMapping(method = RequestMethod.POST) @@ -287,6 +292,16 @@ public class ExperimentApi { // this runs in the background. For future optimization: use a thread pool final String url = experimentUrl; final String query = experiment.computeQuery(); + + ActorRef wokenActor = actorSystem.actorFor("woken"); + + // Should maybe use this instead ??? + // ActorRef wokenActor = actorSystem.actorOf( + // SpringExtension.SpringExtProvider.get(actorSystem).props("Woken"), "woken"); + + wokenActor.tell(query, null); + + new Thread() { @Override public void run() { @@ -321,9 +336,9 @@ public class ExperimentApi { } if(!JSONUtil.isJSONValid(experiment.getResult())) - { - experiment.setResult("Unsupported variables !"); - } + { + experiment.setResult("Unsupported variables !"); + } finishExpermient(experiment); } }.start(); diff --git a/src/main/resources/akka/application.conf b/src/main/resources/akka/application.conf new file mode 100644 index 000000000..2105dd95c --- /dev/null +++ b/src/main/resources/akka/application.conf @@ -0,0 +1,14 @@ +akka { + actor { + provider = remote + } + remote { + # enabled-transports = ["akka.remote.netty.tcp"] + netty.tcp { + hostname = "127.0.0.1" # external (logical) hostname + port = 8000 # external (logical) port + bind-hostname = "127.0.0.1" # internal (bind) hostname + bind-port = 2552 # internal (bind) port + } + } +} \ No newline at end of file -- GitLab