Commit f01e95f1 authored by Constantin Jucovschi's avatar Constantin Jucovschi

using util event system

parent 60c76b51
......@@ -50,5 +50,12 @@
<artifactId>camel-jaxb</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.12</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</project>
package info.kwarc.sally4.client;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map.Entry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EventBusContext {
Logger log = LoggerFactory.getLogger(getClass());
public EventBusContext() {
handlers = new HashMap<Object, EventBusContext.EventHandlers>();
}
static class EventHandlers {
HashMap<String, Method> handlerMethods;
public EventHandlers(Object o) {
handlerMethods = new HashMap<String, Method>();
for (Method m : o.getClass().getMethods()) {
if (m.getAnnotation(Subscribe.class) == null)
continue;
Class<?> []params = m.getParameterTypes();
if (params.length == 0 || params.length > 2)
continue;
String id = params[0].getCanonicalName();
if (params.length == 2)
id += "|"+params[1].getCanonicalName();
handlerMethods.put(id, m);
}
}
}
HashMap<Object, EventHandlers> handlers;
public void register(Object o) {
handlers.put(o, new EventHandlers(o));
}
public void unregister(Object o) {
handlers.remove(o);
}
public void post(Object evt) {
post(evt, null);
}
void run(Object o, Method m, Object evt, Object context) {
try {
if (m.getParameterTypes().length==1) {
m.invoke(o, evt);
} else {
m.invoke(o, evt, context);
}
} catch (IllegalAccessException e) {
log.error("Error in invoking event bus method ", e);
e.printStackTrace();
} catch (IllegalArgumentException e) {
log.error("Error in invoking event bus method ", e);
e.printStackTrace();
} catch (InvocationTargetException e) {
log.error("Error in invoking event bus method ", e);
e.printStackTrace();
}
}
public void post(Object evt, Object context) {
String p1 = evt.getClass().getCanonicalName();
String p2 = "";
if (context!=null)
p2=p1+"|"+context.getClass().getCanonicalName();
for (Entry<Object, EventBusContext.EventHandlers> entries : handlers.entrySet()) {
// contains event without context
if (entries.getValue().handlerMethods.containsKey(p1)) {
run(entries.getKey(), entries.getValue().handlerMethods.get(p1), evt, context);
}
// contains event with context
if (context != null && entries.getValue().handlerMethods.containsKey(p2)) {
run(entries.getKey(), entries.getValue().handlerMethods.get(p2), evt, context);
}
}
}
}
\ No newline at end of file
package info.kwarc.sally4.client;
public interface EventHandler<EvtType> {
void run(EvtType evtData);
}
package info.kwarc.sally4.client;
public class EventListener <EvtType> {
public enum Status {
Active,
Suspended
};
Status status;
EventHandler<EvtType> runner;
public EventHandler<EvtType> getRunner() {
return runner;
}
public void setRunner(EventHandler<EvtType> runner) {
this.runner = runner;
}
public Status getStatus() {
return status;
}
public void setStatus(Status status) {
this.status = status;
}
}
package info.kwarc.sally4.client;
import info.kwarc.sally4.client.EventListener.Status;
import java.util.HashSet;
import java.util.Set;
public class EventListenerFactory<EventType> {
Set<EventListener<EventType>> listeners;
public EventListenerFactory() {
listeners = new HashSet<EventListener<EventType>>();
}
public void emit(EventType t) {
for (EventListener<EventType> listener : listeners) {
if (listener.getStatus() == Status.Active) {
listener.getRunner().run(t);
}
}
}
void suspendAll() {
for (EventListener<EventType> listener : listeners) {
listener.setStatus(Status.Suspended);
}
}
public EventListener<EventType> create(EventHandler<EventType> runner) {
EventListener<EventType> result = create();
result.setRunner(runner);
return result;
}
public EventListener<EventType> create() {
EventListener<EventType> result = new EventListener<EventType>();
result.setStatus(Status.Active);
listeners.add(result);
return result;
}
}
package info.kwarc.sally4.client;
import info.kwarc.sally4.client.EventListener.Status;
import java.util.ArrayList;
import java.util.List;
public class EventListenerList {
List<EventListener<?>> list;
public EventListenerList() {
list = new ArrayList<EventListener<?>>();
}
public <T> void addEventHandler(EventListener<T> evt) {
list.add(evt);
}
public void suspendAll() {
for (EventListener<?> listener : list) {
listener.setStatus(Status.Suspended);
}
}
}
package info.kwarc.sally4.client;
import org.apache.camel.Exchange;
public interface IActionAcceptor <T> {
T getEventData();
Exchange cloneExchange();
void sendBack(Object obj);
}
package info.kwarc.sally4.client;
public interface SallyClient {
void registerDocument(String docName, String[] interfaces, TypedCallback<SallyDoc> docCallback);
}
package info.kwarc.sally4.client;
import info.kwarc.sally.comm.core.HeartbeatRequest;
import info.kwarc.sally.comm.core.HeartbeatResponse;
import info.kwarc.sally4.client.impl.MessageParser;
public interface SallyDoc {
import java.io.StringReader;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
public abstract EventBusContext getEventBusContext();
import javax.xml.bind.Unmarshaller;
void sendEvent(Object obj);
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
<T> T sendRequest(Object obj, Class<T> type);
public class SallyDoc extends RouteBuilder {
void stop();
MessageParser parser;
HashSet<String> schemaSet;
ProducerTemplate producerTemplate;
String inputQueue;
String outputQueue;
EventHandler<IActionAcceptor<HeartbeatRequest>> heartbeatRequestHandler = new EventHandler<IActionAcceptor<HeartbeatRequest>>() {
@Override
public void run(IActionAcceptor<HeartbeatRequest> evtData) {
evtData.sendBack(new HeartbeatResponse());
}
};
public SallyDoc(MessageParser parser, List<String> schemas, String inputQueue, String outputQueue) {
this.parser = parser;
schemaSet = new HashSet<String>(schemas);
messageHandlers = new HashMap<Class<? extends Object>, SallyDoc.EventMessageHandler<?>>();
this.inputQueue = inputQueue;
this.outputQueue = outputQueue;
schemaSet.add("core");
addMessageHandler(HeartbeatRequest.class, heartbeatRequestHandler);
}
HashMap<Class<? extends Object>, EventMessageHandler<?>> messageHandlers ;
public class EventMessageHandler <T> extends EventListenerFactory<IActionAcceptor<T>>{
}
@SuppressWarnings("unchecked")
public <T> EventListener<IActionAcceptor<T>> addMessageHandler(Class<T> messageClass, EventHandler<IActionAcceptor<T>> eventRunner) {
EventMessageHandler<T> factory = null;
if (!messageHandlers.containsKey(messageClass)) {
factory = new EventMessageHandler<T>();
messageHandlers.put(messageClass, factory);
} else {
factory = (EventMessageHandler<T>) messageHandlers.get(messageClass);
}
return factory.create(eventRunner);
}
Processor unMarshalMessage = new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
String xmlMessage = exchange.getIn().getBody(String.class);
String msgSchema = MessageParser.getMessageSchema(xmlMessage);
if (!schemaSet.contains(msgSchema)) {
log.info("Message schema does not match any schema at registration. Ignoring."+msgSchema);
return;
}
Unmarshaller unmarshaller = parser.getUnmarshaller(msgSchema);
if (unmarshaller == null) {
throw new Exception("No unmarshaller found for schema "+msgSchema);
}
exchange.getIn().setBody(unmarshaller.unmarshal(new StringReader(xmlMessage)));
}
};
Processor marshalMessage = new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getIn().setBody(parser.marshall(exchange.getIn().getBody()));
}
};
public void sendEvent(Object obj) {
producerTemplate.sendBody("direct:sendAlexInOnly",obj);
}
public <T> T sendRequest(Object obj, Class<T> type) {
return producerTemplate.requestBody("direct:sendAlexInOut", obj, type);
}
Processor triggerOnMessageHandlers = new Processor() {
@Override
public void process(final Exchange exchange) throws Exception {
final Object obj = exchange.getIn().getBody();
EventMessageHandler<?> evtMsgHandler = messageHandlers.get(obj.getClass());
if (evtMsgHandler == null)
return;
Method m = evtMsgHandler.getClass().getMethod("emit", Object.class);
m.invoke(evtMsgHandler, new IActionAcceptor<Object>() {
@Override
public Object getEventData() {
return obj;
}
@Override
public Exchange cloneExchange() {
return exchange.copy();
}
@Override
public void sendBack(Object obj) {
exchange.getIn().setBody(obj);
}
});
}
};
@Override
public void configure() throws Exception {
producerTemplate = getContext().createProducerTemplate();
from(inputQueue)
.to("log:SallyDocImpl?showHeaders=true")
.convertBodyTo(String.class)
.process(unMarshalMessage)
.process(triggerOnMessageHandlers)
.process(marshalMessage)
.to("log:SallyDocImpl?showHeaders=true");
from("direct:sendAlexInOut")
.process(marshalMessage)
.inOut(outputQueue)
.process(unMarshalMessage);
from("direct:sendAlexInOnly")
.process(marshalMessage)
.inOnly(outputQueue);
}
}
}
\ No newline at end of file
package info.kwarc.sally4.client;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Subscribe {
}
package info.kwarc.sally4.client;
import info.kwarc.sally4.client.impl.SallyClientImpl;
public class TestSallyClient {
public static void main(String[] args) {
SallyClient client = new SallyClientImpl("tcp://localhost:61616", "karaf", "karaf");
client.registerDocument("blah", new String[] {"frames"}, new TypedCallback<SallyDoc>() {
@Override
public void run(SallyDoc object) {
}
});
}
}
......@@ -49,6 +49,10 @@ public class MessageParser {
}
Logger log;
public boolean supportSchema(String schema) {
return bundleMappings.containsKey(schema);
}
HashMap<String, MessageMarshallers > bundleMappings = new HashMap<String, MessageMarshallers>();
......
......@@ -42,11 +42,11 @@ public class SallyClientImpl extends RouteBuilder implements SallyClient {
CamelContext context;
public SallyClientImpl(String host, String user, String password, MessageParser parser) {
this.parser = parser;
public SallyClientImpl(String host, String user, String password) {
this.host = host;
this.user = user;
this.password = password;
parser = new MessageParser();
}
@Property(name="SallyHost")
......@@ -88,6 +88,7 @@ public class SallyClientImpl extends RouteBuilder implements SallyClient {
@Validate
public void start() throws Exception {
started = true;
context = new DefaultCamelContext();
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(host);
......@@ -123,10 +124,10 @@ public class SallyClientImpl extends RouteBuilder implements SallyClient {
public SallyDoc registerResponse(RegisterClientResponse response,
@org.apache.camel.Property("schemas") List<String> schemas,
@org.apache.camel.Property("inputQueue") String inputQueue,
@org.apache.camel.Property("callback") TypedCallback<SallyDoc> callback) throws Exception {
@org.apache.camel.Property("callback") TypedCallback<SallyDocImpl> callback) throws Exception {
String outputQueue = "sallyclient:queue:"+response.getSendQueue();
SallyDoc newDoc = new SallyDoc(parser, schemas, "sallyclient:queue:"+inputQueue, outputQueue);
SallyDocImpl newDoc = new SallyDocImpl(parser, schemas, "sallyclient:queue:"+inputQueue, outputQueue);
callback.run(newDoc);
CamelContext docContext = new DefaultCamelContext();
......@@ -150,8 +151,7 @@ public class SallyClientImpl extends RouteBuilder implements SallyClient {
public void registerDocument(String docName, String[] schemas, TypedCallback<SallyDoc> callback) {
if (!started) {
try {
context.start();
started = true;
start();
} catch (Exception e) {
e.printStackTrace();
}
......
package info.kwarc.sally4.client.impl;
import info.kwarc.sally.comm.core.HeartbeatRequest;
import info.kwarc.sally.comm.core.HeartbeatResponse;
import info.kwarc.sally4.client.EventBusContext;
import info.kwarc.sally4.client.SallyDoc;
import info.kwarc.sally4.client.Subscribe;
import java.io.StringReader;
import java.util.List;
import javax.xml.bind.Unmarshaller;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultExchange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SallyDocImpl extends RouteBuilder implements SallyDoc {
MessageParser parser;
ProducerTemplate producerTemplate;
String inputQueue;
String outputQueue;
EventBusContext busContext;
Logger log = LoggerFactory.getLogger(getClass());
@Subscribe
public void heartbeatRequest(HeartbeatRequest req, DefaultExchange e) {
e.getIn().setBody(new HeartbeatResponse());
}
public SallyDocImpl(MessageParser parser, List<String> schemas, String inputQueue, String outputQueue) {
this.parser = parser;
this.inputQueue = inputQueue;
this.outputQueue = outputQueue;
busContext = new EventBusContext();
for (String schema : schemas) {
parser.addCommLibrary(schema);
}
busContext.register(this);
}
/* (non-Javadoc)
* @see info.kwarc.sally4.client.SallyDoc#getEventBusContext()
*/
@Override
public EventBusContext getEventBusContext() {
return busContext;
}
Processor unMarshalMessage = new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
String xmlMessage = exchange.getIn().getBody(String.class);
String msgSchema = MessageParser.getMessageSchema(xmlMessage);
if (!parser.supportSchema(msgSchema)) {
log.info("Message schema does not match any schema at registration. Ignoring."+msgSchema);
return;
}
Unmarshaller unmarshaller = parser.getUnmarshaller(msgSchema);
if (unmarshaller == null) {
throw new Exception("No unmarshaller found for schema "+msgSchema);
}
exchange.getIn().setBody(unmarshaller.unmarshal(new StringReader(xmlMessage)));
}
};
Processor marshalMessage = new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getIn().setBody(parser.marshall(exchange.getIn().getBody()));
}
};
@Override
public void stop() {
try {
getEventBusContext().unregister(this);
getContext().stop();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void sendEvent(Object obj) {
producerTemplate.sendBody("direct:sendAlexInOnly",obj);
}
@Override
public <T> T sendRequest(Object obj, Class<T> type) {
return producerTemplate.requestBody("direct:sendAlexInOut", obj, type);
}
Processor triggerOnMessageHandlers = new Processor() {
@Override
public void process(final Exchange exchange) throws Exception {
final Object obj = exchange.getIn().getBody();
busContext.post(obj, exchange);
}
};
@Override
public void configure() throws Exception {
producerTemplate = getContext().createProducerTemplate();
from(inputQueue)
.to("log:SallyDocImpl?showHeaders=true")
.convertBodyTo(String.class)
.process(unMarshalMessage)
.process(triggerOnMessageHandlers)
.process(marshalMessage)
.to("log:SallyDocImpl?showHeaders=true");
from("direct:sendAlexInOut")
.process(marshalMessage)
.inOut(outputQueue)
.process(unMarshalMessage);
from("direct:sendAlexInOnly")
.process(marshalMessage)
.inOnly(outputQueue);
}
}