From f01e95f1dc372d29e0774613ffb2c4c101651637 Mon Sep 17 00:00:00 2001 From: Constantin Jucovschi <jucovschi@gmail.com> Date: Thu, 30 Jul 2015 08:32:22 +0200 Subject: [PATCH] using util event system --- pom.xml | 7 + .../kwarc/sally4/client/EventBusContext.java | 91 +++++++++++ .../kwarc/sally4/client/EventHandler.java | 5 - .../kwarc/sally4/client/EventListener.java | 28 ---- .../sally4/client/EventListenerFactory.java | 41 ----- .../sally4/client/EventListenerList.java | 26 --- .../kwarc/sally4/client/IActionAcceptor.java | 10 -- .../info/kwarc/sally4/client/SallyClient.java | 2 - .../info/kwarc/sally4/client/SallyDoc.java | 153 +----------------- .../info/kwarc/sally4/client/Subscribe.java | 12 ++ .../kwarc/sally4/client/TestSallyClient.java | 19 +++ .../sally4/client/impl/MessageParser.java | 4 + .../sally4/client/impl/SallyClientImpl.java | 12 +- .../sally4/client/impl/SallyDocImpl.java | 135 ++++++++++++++++ 14 files changed, 280 insertions(+), 265 deletions(-) create mode 100644 src/main/java/info/kwarc/sally4/client/EventBusContext.java delete mode 100644 src/main/java/info/kwarc/sally4/client/EventHandler.java delete mode 100644 src/main/java/info/kwarc/sally4/client/EventListener.java delete mode 100644 src/main/java/info/kwarc/sally4/client/EventListenerFactory.java delete mode 100644 src/main/java/info/kwarc/sally4/client/EventListenerList.java delete mode 100644 src/main/java/info/kwarc/sally4/client/IActionAcceptor.java create mode 100644 src/main/java/info/kwarc/sally4/client/Subscribe.java create mode 100644 src/main/java/info/kwarc/sally4/client/TestSallyClient.java create mode 100644 src/main/java/info/kwarc/sally4/client/impl/SallyDocImpl.java diff --git a/pom.xml b/pom.xml index eb87794..3b4d9b4 100644 --- a/pom.xml +++ b/pom.xml @@ -50,5 +50,12 @@ <artifactId>camel-jaxb</artifactId> <version>${camel.version}</version> </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <version>1.7.12</version> + <scope>runtime</scope> + </dependency> </dependencies> </project> diff --git a/src/main/java/info/kwarc/sally4/client/EventBusContext.java b/src/main/java/info/kwarc/sally4/client/EventBusContext.java new file mode 100644 index 0000000..35ec02d --- /dev/null +++ b/src/main/java/info/kwarc/sally4/client/EventBusContext.java @@ -0,0 +1,91 @@ +package info.kwarc.sally4.client; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map.Entry; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EventBusContext { + + Logger log = LoggerFactory.getLogger(getClass()); + + public EventBusContext() { + handlers = new HashMap<Object, EventBusContext.EventHandlers>(); + } + + static class EventHandlers { + HashMap<String, Method> handlerMethods; + + public EventHandlers(Object o) { + handlerMethods = new HashMap<String, Method>(); + for (Method m : o.getClass().getMethods()) { + if (m.getAnnotation(Subscribe.class) == null) + continue; + Class<?> []params = m.getParameterTypes(); + if (params.length == 0 || params.length > 2) + continue; + String id = params[0].getCanonicalName(); + if (params.length == 2) + id += "|"+params[1].getCanonicalName(); + handlerMethods.put(id, m); + } + } + + } + + HashMap<Object, EventHandlers> handlers; + + public void register(Object o) { + handlers.put(o, new EventHandlers(o)); + } + + public void unregister(Object o) { + handlers.remove(o); + } + + public void post(Object evt) { + post(evt, null); + } + + void run(Object o, Method m, Object evt, Object context) { + try { + if (m.getParameterTypes().length==1) { + m.invoke(o, evt); + } else { + m.invoke(o, evt, context); + } + } catch (IllegalAccessException e) { + log.error("Error in invoking event bus method ", e); + e.printStackTrace(); + } catch (IllegalArgumentException e) { + log.error("Error in invoking event bus method ", e); + e.printStackTrace(); + } catch (InvocationTargetException e) { + log.error("Error in invoking event bus method ", e); + e.printStackTrace(); + } + } + + public void post(Object evt, Object context) { + String p1 = evt.getClass().getCanonicalName(); + String p2 = ""; + if (context!=null) + p2=p1+"|"+context.getClass().getCanonicalName(); + + for (Entry<Object, EventBusContext.EventHandlers> entries : handlers.entrySet()) { + // contains event without context + if (entries.getValue().handlerMethods.containsKey(p1)) { + run(entries.getKey(), entries.getValue().handlerMethods.get(p1), evt, context); + } + + // contains event with context + if (context != null && entries.getValue().handlerMethods.containsKey(p2)) { + run(entries.getKey(), entries.getValue().handlerMethods.get(p2), evt, context); + } + + } + } +} \ No newline at end of file diff --git a/src/main/java/info/kwarc/sally4/client/EventHandler.java b/src/main/java/info/kwarc/sally4/client/EventHandler.java deleted file mode 100644 index bdd4540..0000000 --- a/src/main/java/info/kwarc/sally4/client/EventHandler.java +++ /dev/null @@ -1,5 +0,0 @@ -package info.kwarc.sally4.client; - -public interface EventHandler<EvtType> { - void run(EvtType evtData); -} diff --git a/src/main/java/info/kwarc/sally4/client/EventListener.java b/src/main/java/info/kwarc/sally4/client/EventListener.java deleted file mode 100644 index 772ce11..0000000 --- a/src/main/java/info/kwarc/sally4/client/EventListener.java +++ /dev/null @@ -1,28 +0,0 @@ -package info.kwarc.sally4.client; - -public class EventListener <EvtType> { - public enum Status { - Active, - Suspended - }; - - Status status; - - EventHandler<EvtType> runner; - - public EventHandler<EvtType> getRunner() { - return runner; - } - - public void setRunner(EventHandler<EvtType> runner) { - this.runner = runner; - } - - public Status getStatus() { - return status; - } - - public void setStatus(Status status) { - this.status = status; - } -} diff --git a/src/main/java/info/kwarc/sally4/client/EventListenerFactory.java b/src/main/java/info/kwarc/sally4/client/EventListenerFactory.java deleted file mode 100644 index d8158cf..0000000 --- a/src/main/java/info/kwarc/sally4/client/EventListenerFactory.java +++ /dev/null @@ -1,41 +0,0 @@ -package info.kwarc.sally4.client; - -import info.kwarc.sally4.client.EventListener.Status; - -import java.util.HashSet; -import java.util.Set; - -public class EventListenerFactory<EventType> { - Set<EventListener<EventType>> listeners; - - public EventListenerFactory() { - listeners = new HashSet<EventListener<EventType>>(); - } - - public void emit(EventType t) { - for (EventListener<EventType> listener : listeners) { - if (listener.getStatus() == Status.Active) { - listener.getRunner().run(t); - } - } - } - - void suspendAll() { - for (EventListener<EventType> listener : listeners) { - listener.setStatus(Status.Suspended); - } - } - - public EventListener<EventType> create(EventHandler<EventType> runner) { - EventListener<EventType> result = create(); - result.setRunner(runner); - return result; - } - - public EventListener<EventType> create() { - EventListener<EventType> result = new EventListener<EventType>(); - result.setStatus(Status.Active); - listeners.add(result); - return result; - } -} diff --git a/src/main/java/info/kwarc/sally4/client/EventListenerList.java b/src/main/java/info/kwarc/sally4/client/EventListenerList.java deleted file mode 100644 index 1d878f6..0000000 --- a/src/main/java/info/kwarc/sally4/client/EventListenerList.java +++ /dev/null @@ -1,26 +0,0 @@ -package info.kwarc.sally4.client; - -import info.kwarc.sally4.client.EventListener.Status; - -import java.util.ArrayList; -import java.util.List; - - -public class EventListenerList { - List<EventListener<?>> list; - - public EventListenerList() { - list = new ArrayList<EventListener<?>>(); - } - - public <T> void addEventHandler(EventListener<T> evt) { - list.add(evt); - } - - public void suspendAll() { - for (EventListener<?> listener : list) { - listener.setStatus(Status.Suspended); - } - } - -} diff --git a/src/main/java/info/kwarc/sally4/client/IActionAcceptor.java b/src/main/java/info/kwarc/sally4/client/IActionAcceptor.java deleted file mode 100644 index 9c131b2..0000000 --- a/src/main/java/info/kwarc/sally4/client/IActionAcceptor.java +++ /dev/null @@ -1,10 +0,0 @@ -package info.kwarc.sally4.client; - -import org.apache.camel.Exchange; - -public interface IActionAcceptor <T> { - T getEventData(); - - Exchange cloneExchange(); - void sendBack(Object obj); -} diff --git a/src/main/java/info/kwarc/sally4/client/SallyClient.java b/src/main/java/info/kwarc/sally4/client/SallyClient.java index 5ac06ac..264b371 100644 --- a/src/main/java/info/kwarc/sally4/client/SallyClient.java +++ b/src/main/java/info/kwarc/sally4/client/SallyClient.java @@ -1,7 +1,5 @@ package info.kwarc.sally4.client; - - public interface SallyClient { void registerDocument(String docName, String[] interfaces, TypedCallback<SallyDoc> docCallback); } diff --git a/src/main/java/info/kwarc/sally4/client/SallyDoc.java b/src/main/java/info/kwarc/sally4/client/SallyDoc.java index 58d65a4..4e1be08 100644 --- a/src/main/java/info/kwarc/sally4/client/SallyDoc.java +++ b/src/main/java/info/kwarc/sally4/client/SallyDoc.java @@ -1,154 +1,13 @@ package info.kwarc.sally4.client; -import info.kwarc.sally.comm.core.HeartbeatRequest; -import info.kwarc.sally.comm.core.HeartbeatResponse; -import info.kwarc.sally4.client.impl.MessageParser; +public interface SallyDoc { -import java.io.StringReader; -import java.lang.reflect.Method; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; + public abstract EventBusContext getEventBusContext(); -import javax.xml.bind.Unmarshaller; + void sendEvent(Object obj); -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.camel.ProducerTemplate; -import org.apache.camel.builder.RouteBuilder; + <T> T sendRequest(Object obj, Class<T> type); -public class SallyDoc extends RouteBuilder { + void stop(); - MessageParser parser; - HashSet<String> schemaSet; - ProducerTemplate producerTemplate; - String inputQueue; - String outputQueue; - - EventHandler<IActionAcceptor<HeartbeatRequest>> heartbeatRequestHandler = new EventHandler<IActionAcceptor<HeartbeatRequest>>() { - - @Override - public void run(IActionAcceptor<HeartbeatRequest> evtData) { - evtData.sendBack(new HeartbeatResponse()); - } - }; - - public SallyDoc(MessageParser parser, List<String> schemas, String inputQueue, String outputQueue) { - this.parser = parser; - schemaSet = new HashSet<String>(schemas); - messageHandlers = new HashMap<Class<? extends Object>, SallyDoc.EventMessageHandler<?>>(); - this.inputQueue = inputQueue; - this.outputQueue = outputQueue; - schemaSet.add("core"); - addMessageHandler(HeartbeatRequest.class, heartbeatRequestHandler); - } - - HashMap<Class<? extends Object>, EventMessageHandler<?>> messageHandlers ; - - public class EventMessageHandler <T> extends EventListenerFactory<IActionAcceptor<T>>{ - } - - @SuppressWarnings("unchecked") - public <T> EventListener<IActionAcceptor<T>> addMessageHandler(Class<T> messageClass, EventHandler<IActionAcceptor<T>> eventRunner) { - EventMessageHandler<T> factory = null; - - if (!messageHandlers.containsKey(messageClass)) { - factory = new EventMessageHandler<T>(); - messageHandlers.put(messageClass, factory); - } else { - factory = (EventMessageHandler<T>) messageHandlers.get(messageClass); - } - return factory.create(eventRunner); - } - - Processor unMarshalMessage = new Processor() { - - @Override - public void process(Exchange exchange) throws Exception { - String xmlMessage = exchange.getIn().getBody(String.class); - String msgSchema = MessageParser.getMessageSchema(xmlMessage); - if (!schemaSet.contains(msgSchema)) { - log.info("Message schema does not match any schema at registration. Ignoring."+msgSchema); - return; - } - Unmarshaller unmarshaller = parser.getUnmarshaller(msgSchema); - if (unmarshaller == null) { - throw new Exception("No unmarshaller found for schema "+msgSchema); - } - exchange.getIn().setBody(unmarshaller.unmarshal(new StringReader(xmlMessage))); - } - }; - - Processor marshalMessage = new Processor() { - - @Override - public void process(Exchange exchange) throws Exception { - exchange.getIn().setBody(parser.marshall(exchange.getIn().getBody())); - } - }; - - public void sendEvent(Object obj) { - producerTemplate.sendBody("direct:sendAlexInOnly",obj); - } - - public <T> T sendRequest(Object obj, Class<T> type) { - return producerTemplate.requestBody("direct:sendAlexInOut", obj, type); - } - - - Processor triggerOnMessageHandlers = new Processor() { - - @Override - public void process(final Exchange exchange) throws Exception { - final Object obj = exchange.getIn().getBody(); - - EventMessageHandler<?> evtMsgHandler = messageHandlers.get(obj.getClass()); - if (evtMsgHandler == null) - return; - - Method m = evtMsgHandler.getClass().getMethod("emit", Object.class); - m.invoke(evtMsgHandler, new IActionAcceptor<Object>() { - - @Override - public Object getEventData() { - return obj; - } - - @Override - public Exchange cloneExchange() { - return exchange.copy(); - } - - @Override - public void sendBack(Object obj) { - exchange.getIn().setBody(obj); - } - - }); - } - }; - - @Override - public void configure() throws Exception { - producerTemplate = getContext().createProducerTemplate(); - - from(inputQueue) - .to("log:SallyDocImpl?showHeaders=true") - .convertBodyTo(String.class) - .process(unMarshalMessage) - .process(triggerOnMessageHandlers) - .process(marshalMessage) - .to("log:SallyDocImpl?showHeaders=true"); - - from("direct:sendAlexInOut") - .process(marshalMessage) - .inOut(outputQueue) - .process(unMarshalMessage); - - from("direct:sendAlexInOnly") - .process(marshalMessage) - .inOnly(outputQueue); - - } - -} +} \ No newline at end of file diff --git a/src/main/java/info/kwarc/sally4/client/Subscribe.java b/src/main/java/info/kwarc/sally4/client/Subscribe.java new file mode 100644 index 0000000..519c5fd --- /dev/null +++ b/src/main/java/info/kwarc/sally4/client/Subscribe.java @@ -0,0 +1,12 @@ +package info.kwarc.sally4.client; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface Subscribe { + +} diff --git a/src/main/java/info/kwarc/sally4/client/TestSallyClient.java b/src/main/java/info/kwarc/sally4/client/TestSallyClient.java new file mode 100644 index 0000000..38a10f7 --- /dev/null +++ b/src/main/java/info/kwarc/sally4/client/TestSallyClient.java @@ -0,0 +1,19 @@ +package info.kwarc.sally4.client; + +import info.kwarc.sally4.client.impl.SallyClientImpl; + +public class TestSallyClient { + + public static void main(String[] args) { + SallyClient client = new SallyClientImpl("tcp://localhost:61616", "karaf", "karaf"); + client.registerDocument("blah", new String[] {"frames"}, new TypedCallback<SallyDoc>() { + + @Override + public void run(SallyDoc object) { + } + + }); + + } + +} diff --git a/src/main/java/info/kwarc/sally4/client/impl/MessageParser.java b/src/main/java/info/kwarc/sally4/client/impl/MessageParser.java index c4b1c21..2fabd67 100644 --- a/src/main/java/info/kwarc/sally4/client/impl/MessageParser.java +++ b/src/main/java/info/kwarc/sally4/client/impl/MessageParser.java @@ -49,6 +49,10 @@ public class MessageParser { } Logger log; + + public boolean supportSchema(String schema) { + return bundleMappings.containsKey(schema); + } HashMap<String, MessageMarshallers > bundleMappings = new HashMap<String, MessageMarshallers>(); diff --git a/src/main/java/info/kwarc/sally4/client/impl/SallyClientImpl.java b/src/main/java/info/kwarc/sally4/client/impl/SallyClientImpl.java index 8e31bb5..3190671 100644 --- a/src/main/java/info/kwarc/sally4/client/impl/SallyClientImpl.java +++ b/src/main/java/info/kwarc/sally4/client/impl/SallyClientImpl.java @@ -42,11 +42,11 @@ public class SallyClientImpl extends RouteBuilder implements SallyClient { CamelContext context; - public SallyClientImpl(String host, String user, String password, MessageParser parser) { - this.parser = parser; + public SallyClientImpl(String host, String user, String password) { this.host = host; this.user = user; this.password = password; + parser = new MessageParser(); } @Property(name="SallyHost") @@ -88,6 +88,7 @@ public class SallyClientImpl extends RouteBuilder implements SallyClient { @Validate public void start() throws Exception { + started = true; context = new DefaultCamelContext(); ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(host); @@ -123,10 +124,10 @@ public class SallyClientImpl extends RouteBuilder implements SallyClient { public SallyDoc registerResponse(RegisterClientResponse response, @org.apache.camel.Property("schemas") List<String> schemas, @org.apache.camel.Property("inputQueue") String inputQueue, - @org.apache.camel.Property("callback") TypedCallback<SallyDoc> callback) throws Exception { + @org.apache.camel.Property("callback") TypedCallback<SallyDocImpl> callback) throws Exception { String outputQueue = "sallyclient:queue:"+response.getSendQueue(); - SallyDoc newDoc = new SallyDoc(parser, schemas, "sallyclient:queue:"+inputQueue, outputQueue); + SallyDocImpl newDoc = new SallyDocImpl(parser, schemas, "sallyclient:queue:"+inputQueue, outputQueue); callback.run(newDoc); CamelContext docContext = new DefaultCamelContext(); @@ -150,8 +151,7 @@ public class SallyClientImpl extends RouteBuilder implements SallyClient { public void registerDocument(String docName, String[] schemas, TypedCallback<SallyDoc> callback) { if (!started) { try { - context.start(); - started = true; + start(); } catch (Exception e) { e.printStackTrace(); } diff --git a/src/main/java/info/kwarc/sally4/client/impl/SallyDocImpl.java b/src/main/java/info/kwarc/sally4/client/impl/SallyDocImpl.java new file mode 100644 index 0000000..6a4d20a --- /dev/null +++ b/src/main/java/info/kwarc/sally4/client/impl/SallyDocImpl.java @@ -0,0 +1,135 @@ +package info.kwarc.sally4.client.impl; + +import info.kwarc.sally.comm.core.HeartbeatRequest; +import info.kwarc.sally.comm.core.HeartbeatResponse; +import info.kwarc.sally4.client.EventBusContext; +import info.kwarc.sally4.client.SallyDoc; +import info.kwarc.sally4.client.Subscribe; + +import java.io.StringReader; +import java.util.List; + +import javax.xml.bind.Unmarshaller; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultExchange; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SallyDocImpl extends RouteBuilder implements SallyDoc { + + MessageParser parser; + ProducerTemplate producerTemplate; + String inputQueue; + String outputQueue; + EventBusContext busContext; + + Logger log = LoggerFactory.getLogger(getClass()); + + @Subscribe + public void heartbeatRequest(HeartbeatRequest req, DefaultExchange e) { + e.getIn().setBody(new HeartbeatResponse()); + } + + public SallyDocImpl(MessageParser parser, List<String> schemas, String inputQueue, String outputQueue) { + this.parser = parser; + this.inputQueue = inputQueue; + this.outputQueue = outputQueue; + busContext = new EventBusContext(); + for (String schema : schemas) { + parser.addCommLibrary(schema); + } + busContext.register(this); + } + + /* (non-Javadoc) + * @see info.kwarc.sally4.client.SallyDoc#getEventBusContext() + */ + @Override + public EventBusContext getEventBusContext() { + return busContext; + } + + Processor unMarshalMessage = new Processor() { + + @Override + public void process(Exchange exchange) throws Exception { + String xmlMessage = exchange.getIn().getBody(String.class); + String msgSchema = MessageParser.getMessageSchema(xmlMessage); + if (!parser.supportSchema(msgSchema)) { + log.info("Message schema does not match any schema at registration. Ignoring."+msgSchema); + return; + } + Unmarshaller unmarshaller = parser.getUnmarshaller(msgSchema); + if (unmarshaller == null) { + throw new Exception("No unmarshaller found for schema "+msgSchema); + } + exchange.getIn().setBody(unmarshaller.unmarshal(new StringReader(xmlMessage))); + } + }; + + Processor marshalMessage = new Processor() { + + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody(parser.marshall(exchange.getIn().getBody())); + } + }; + + @Override + public void stop() { + try { + getEventBusContext().unregister(this); + getContext().stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void sendEvent(Object obj) { + producerTemplate.sendBody("direct:sendAlexInOnly",obj); + } + + @Override + public <T> T sendRequest(Object obj, Class<T> type) { + return producerTemplate.requestBody("direct:sendAlexInOut", obj, type); + } + + + Processor triggerOnMessageHandlers = new Processor() { + + @Override + public void process(final Exchange exchange) throws Exception { + final Object obj = exchange.getIn().getBody(); + busContext.post(obj, exchange); + } + }; + + @Override + public void configure() throws Exception { + producerTemplate = getContext().createProducerTemplate(); + + from(inputQueue) + .to("log:SallyDocImpl?showHeaders=true") + .convertBodyTo(String.class) + .process(unMarshalMessage) + .process(triggerOnMessageHandlers) + .process(marshalMessage) + .to("log:SallyDocImpl?showHeaders=true"); + + from("direct:sendAlexInOut") + .process(marshalMessage) + .inOut(outputQueue) + .process(unMarshalMessage); + + from("direct:sendAlexInOnly") + .process(marshalMessage) + .inOnly(outputQueue); + + } + +} -- GitLab