Commit 60c76b51 authored by Constantin Jucovschi's avatar Constantin Jucovschi

pom.xml

parent 784b4720
......@@ -17,21 +17,22 @@
<packaging>bundle</packaging>
<artifactId>client-java-camel</artifactId>
<properties>
<bundle.private-package>info.kwarc.sally4.client.impl</bundle.private-package>
<bundle.export-package>info.kwarc.sally4.client*</bundle.export-package>
</properties>
<parent>
<groupId>info.kwarc.sally4</groupId>
<artifactId>sally4</artifactId>
<version>0.0.6</version>
<version>1.0.1-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<properties>
<bundle.private-package>info.kwarc.sally4.client.impl</bundle.private-package>
<bundle.export-package>info.kwarc.sally4.client*</bundle.export-package>
</properties>
<artifactId>client-java-camel</artifactId>
<dependencies>
<dependency>
<groupId>info.kwarc.sally4.comm</groupId>
<artifactId>comm-core</artifactId>
......@@ -41,7 +42,7 @@
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-camel</artifactId>
<version>5.9.0</version>
<version>${activemq.version}</version>
</dependency>
<dependency>
......@@ -49,13 +50,5 @@
<artifactId>camel-jaxb</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.7</version>
</dependency>
</dependencies>
</project>
package info.kwarc.sally4.client;
public interface EventHandler<EvtType> {
void run(EvtType evtData);
}
package info.kwarc.sally4.client;
public class EventListener <EvtType> {
public enum Status {
Active,
Suspended
};
Status status;
EventHandler<EvtType> runner;
public EventHandler<EvtType> getRunner() {
return runner;
}
public void setRunner(EventHandler<EvtType> runner) {
this.runner = runner;
}
public Status getStatus() {
return status;
}
public void setStatus(Status status) {
this.status = status;
}
}
package info.kwarc.sally4.client;
import info.kwarc.sally4.client.EventListener.Status;
import java.util.HashSet;
import java.util.Set;
public class EventListenerFactory<EventType> {
Set<EventListener<EventType>> listeners;
public EventListenerFactory() {
listeners = new HashSet<EventListener<EventType>>();
}
public void emit(EventType t) {
for (EventListener<EventType> listener : listeners) {
if (listener.getStatus() == Status.Active) {
listener.getRunner().run(t);
}
}
}
void suspendAll() {
for (EventListener<EventType> listener : listeners) {
listener.setStatus(Status.Suspended);
}
}
public EventListener<EventType> create(EventHandler<EventType> runner) {
EventListener<EventType> result = create();
result.setRunner(runner);
return result;
}
public EventListener<EventType> create() {
EventListener<EventType> result = new EventListener<EventType>();
result.setStatus(Status.Active);
listeners.add(result);
return result;
}
}
package info.kwarc.sally4.client;
import info.kwarc.sally4.client.EventListener.Status;
import java.util.ArrayList;
import java.util.List;
public class EventListenerList {
List<EventListener<?>> list;
public EventListenerList() {
list = new ArrayList<EventListener<?>>();
}
public <T> void addEventHandler(EventListener<T> evt) {
list.add(evt);
}
public void suspendAll() {
for (EventListener<?> listener : list) {
listener.setStatus(Status.Suspended);
}
}
}
package info.kwarc.sally4.client;
import org.apache.camel.Exchange;
public interface IActionAcceptor <T> {
T getEventData();
Exchange cloneExchange();
void sendBack(Object obj);
}
package info.kwarc.sally4.client;
import org.apache.camel.RoutesBuilder;
public interface SallyClient {
void registerDocument(String docName, String[] interfaces, RoutesBuilder builder);
void registerDocument(String docName, String[] interfaces, TypedCallback<SallyDoc> docCallback);
}
package info.kwarc.sally4.client;
import info.kwarc.sally.comm.core.HeartbeatRequest;
import info.kwarc.sally.comm.core.HeartbeatResponse;
import info.kwarc.sally4.client.impl.MessageParser;
import java.io.StringReader;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import javax.xml.bind.Unmarshaller;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
public class SallyDoc extends RouteBuilder {
MessageParser parser;
HashSet<String> schemaSet;
ProducerTemplate producerTemplate;
String inputQueue;
String outputQueue;
EventHandler<IActionAcceptor<HeartbeatRequest>> heartbeatRequestHandler = new EventHandler<IActionAcceptor<HeartbeatRequest>>() {
@Override
public void run(IActionAcceptor<HeartbeatRequest> evtData) {
evtData.sendBack(new HeartbeatResponse());
}
};
public SallyDoc(MessageParser parser, List<String> schemas, String inputQueue, String outputQueue) {
this.parser = parser;
schemaSet = new HashSet<String>(schemas);
messageHandlers = new HashMap<Class<? extends Object>, SallyDoc.EventMessageHandler<?>>();
this.inputQueue = inputQueue;
this.outputQueue = outputQueue;
schemaSet.add("core");
addMessageHandler(HeartbeatRequest.class, heartbeatRequestHandler);
}
HashMap<Class<? extends Object>, EventMessageHandler<?>> messageHandlers ;
public class EventMessageHandler <T> extends EventListenerFactory<IActionAcceptor<T>>{
}
@SuppressWarnings("unchecked")
public <T> EventListener<IActionAcceptor<T>> addMessageHandler(Class<T> messageClass, EventHandler<IActionAcceptor<T>> eventRunner) {
EventMessageHandler<T> factory = null;
if (!messageHandlers.containsKey(messageClass)) {
factory = new EventMessageHandler<T>();
messageHandlers.put(messageClass, factory);
} else {
factory = (EventMessageHandler<T>) messageHandlers.get(messageClass);
}
return factory.create(eventRunner);
}
Processor unMarshalMessage = new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
String xmlMessage = exchange.getIn().getBody(String.class);
String msgSchema = MessageParser.getMessageSchema(xmlMessage);
if (!schemaSet.contains(msgSchema)) {
log.info("Message schema does not match any schema at registration. Ignoring."+msgSchema);
return;
}
Unmarshaller unmarshaller = parser.getUnmarshaller(msgSchema);
if (unmarshaller == null) {
throw new Exception("No unmarshaller found for schema "+msgSchema);
}
exchange.getIn().setBody(unmarshaller.unmarshal(new StringReader(xmlMessage)));
}
};
Processor marshalMessage = new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getIn().setBody(parser.marshall(exchange.getIn().getBody()));
}
};
public void sendEvent(Object obj) {
producerTemplate.sendBody("direct:sendAlexInOnly",obj);
}
public <T> T sendRequest(Object obj, Class<T> type) {
return producerTemplate.requestBody("direct:sendAlexInOut", obj, type);
}
Processor triggerOnMessageHandlers = new Processor() {
@Override
public void process(final Exchange exchange) throws Exception {
final Object obj = exchange.getIn().getBody();
EventMessageHandler<?> evtMsgHandler = messageHandlers.get(obj.getClass());
if (evtMsgHandler == null)
return;
Method m = evtMsgHandler.getClass().getMethod("emit", Object.class);
m.invoke(evtMsgHandler, new IActionAcceptor<Object>() {
@Override
public Object getEventData() {
return obj;
}
@Override
public Exchange cloneExchange() {
return exchange.copy();
}
@Override
public void sendBack(Object obj) {
exchange.getIn().setBody(obj);
}
});
}
};
@Override
public void configure() throws Exception {
producerTemplate = getContext().createProducerTemplate();
from(inputQueue)
.to("log:SallyDocImpl?showHeaders=true")
.convertBodyTo(String.class)
.process(unMarshalMessage)
.process(triggerOnMessageHandlers)
.process(marshalMessage)
.to("log:SallyDocImpl?showHeaders=true");
from("direct:sendAlexInOut")
.process(marshalMessage)
.inOut(outputQueue)
.process(unMarshalMessage);
from("direct:sendAlexInOnly")
.process(marshalMessage)
.inOnly(outputQueue);
}
}
package info.kwarc.sally4.client;
public interface TypedCallback <T> {
void run(T object);
}
package info.kwarc.sally4.client.impl;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.bind.Unmarshaller;
import org.apache.camel.converter.jaxb.JaxbDataFormat;
import org.apache.felix.ipojo.annotations.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MessageParser {
public final static Pattern xmlNamespace = Pattern.compile("xmlns(:(\\w+))?=\"http://kwarc.info/sally/comm/([\\w/:.]+)\"");
final String packagePrefix = "info.kwarc.sally.comm";
class MessageMarshallers {
JAXBContext context;
Marshaller marshaller;
Unmarshaller unmarshaller;
public MessageMarshallers(JAXBContext context) {
this.context = context;
try {
marshaller = context.createMarshaller();
unmarshaller = context.createUnmarshaller();
} catch (JAXBException e) {
e.printStackTrace();
}
}
public JAXBContext getContext() {
return context;
}
public Marshaller getMarshaller() {
return marshaller;
}
public Unmarshaller getUnmarshaller() {
return unmarshaller;
}
}
Logger log;
HashMap<String, MessageMarshallers > bundleMappings = new HashMap<String, MessageMarshallers>();
public JaxbDataFormat getFormat(String commType) {
return new JaxbDataFormat(bundleMappings.get(commType).getContext());
}
public Unmarshaller getUnmarshaller(String schemaType) {
if (!bundleMappings.containsKey(schemaType))
return null;
return bundleMappings.get(schemaType).getUnmarshaller();
}
public Marshaller getMarshaller(String schemaType) {
if (!bundleMappings.containsKey(schemaType))
return null;
return bundleMappings.get(schemaType).getMarshaller();
}
public static String getMessageSchema(String xmlMessage) {
Matcher m = xmlNamespace.matcher(xmlMessage);
if (m.find()) {
return m.group(3);
} else
return null;
}
/* (non-Javadoc)
* @see info.kwarc.sally4.docmanager.impl.MessageParser1#validMessageSchemaDefinition(java.lang.String)
*/
public boolean validMessageSchemaDefinition(String iface) {
return bundleMappings.containsKey(iface);
}
public <T> T parseAs(String xmlMessage, Class<? extends T> resultClass) {
String msgType = getMessageSchema(xmlMessage);
if (!bundleMappings.containsKey(msgType))
return null;
StringReader r = new StringReader(xmlMessage);
try {
Object o = bundleMappings.get(msgType).getUnmarshaller().unmarshal(r);
if (resultClass.isAssignableFrom(o.getClass())) {
return (T) o;
}
} catch (JAXBException e) {
e.printStackTrace();
}
return null;
}
public MessageParser() {
log = LoggerFactory.getLogger(getClass());
addCommLibrary("core");
}
@Validate
void start() {
try {
// Make sure core is always there
bundleMappings.put("core", new MessageMarshallers(JAXBContext.newInstance("info.kwarc.sally.comm.core", getClass().getClassLoader())));
} catch (JAXBException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void addCommLibrary(String iFace) {
JAXBContext context;
try {
context = JAXBContext.newInstance(packagePrefix+"."+iFace, getClass().getClassLoader());
bundleMappings.put(iFace, new MessageMarshallers(context));
} catch (JAXBException e) {
e.printStackTrace();
}
}
public String marshall(Object obj) {
String pkg = obj.getClass().getPackage().getName();
if (!pkg.startsWith(packagePrefix))
return null;
String schema = pkg.substring(packagePrefix.length()+1);
Marshaller marshaller = getMarshaller(schema);
if (marshaller == null)
return null;
StringWriter output = new StringWriter();
try {
marshaller.marshal(obj, output);
} catch (JAXBException e) {
e.printStackTrace();
return null;
}
return output.toString();
}
}
......@@ -3,9 +3,11 @@ package info.kwarc.sally4.client.impl;
import info.kwarc.sally.comm.core.RegisterClientRequest;
import info.kwarc.sally.comm.core.RegisterClientResponse;
import info.kwarc.sally4.client.SallyClient;
import info.kwarc.sally4.client.utils.ProducerConsumerSplitterComponent;
import info.kwarc.sally4.client.SallyDoc;
import info.kwarc.sally4.client.TypedCallback;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import javax.xml.bind.JAXBContext;
......@@ -13,7 +15,7 @@ import javax.xml.bind.JAXBContext;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.ExchangeBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.converter.jaxb.JaxbDataFormat;
......@@ -36,9 +38,12 @@ public class SallyClientImpl extends RouteBuilder implements SallyClient {
org.apache.camel.Component sallyConn;
MessageParser parser;
CamelContext context;
public SallyClientImpl(String host, String user, String password) {
public SallyClientImpl(String host, String user, String password, MessageParser parser) {
this.parser = parser;
this.host = host;
this.user = user;
this.password = password;
......@@ -91,6 +96,8 @@ public class SallyClientImpl extends RouteBuilder implements SallyClient {
sallyConn = JmsComponent.jmsComponent(connectionFactory);
context.addComponent("activemq", sallyConn);
context.addRoutes(this);
context.start();
}
@Invalidate
......@@ -105,28 +112,29 @@ public class SallyClientImpl extends RouteBuilder implements SallyClient {
DataFormat core = new JaxbDataFormat(context);
from("direct:sally_register")
.setProperty("origRequest", body())
.marshal(core)
.to("log:foo?showHeaders=true&showProperties=true")
.to("log:foo?showHeaders=true")
.inOut("activemq:queue:sally_register")
.to("log:foo?showHeaders=true&showProperties=true")
.to("log:foo?showHeaders=true")
.unmarshal(core)
.bean(method(this, "registerResponse"));
}
public void registerResponse(Exchange e) throws Exception {
RegisterClientResponse response = e.getIn().getBody(RegisterClientResponse.class);
RegisterClientRequest req = e.getProperty("origRequest", RegisterClientRequest.class);
RoutesBuilder builder = e.getProperty("builder", RoutesBuilder.class);
public SallyDoc registerResponse(RegisterClientResponse response,
@org.apache.camel.Property("schemas") List<String> schemas,
@org.apache.camel.Property("inputQueue") String inputQueue,
@org.apache.camel.Property("callback") TypedCallback<SallyDoc> callback) throws Exception {
String outputQueue = "sallyclient:queue:"+response.getSendQueue();
SallyDoc newDoc = new SallyDoc(parser, schemas, "sallyclient:queue:"+inputQueue, outputQueue);
callback.run(newDoc);
CamelContext docContext = new DefaultCamelContext();
docContext.addComponent("sallyclient", sallyConn);
ProducerConsumerSplitterComponent comp = new ProducerConsumerSplitterComponent("sallyclient:queue:"+req.getListenQueue(), "sallyclient:queue:"+response.getSendQueue());
comp.setCamelContext(docContext);
docContext.addComponent("sally", comp);
docContext.addRoutes(builder);
docContext.addRoutes(newDoc);
docContext.start();
return newDoc;
}
String genUUID(String name) {
......@@ -139,21 +147,24 @@ public class SallyClientImpl extends RouteBuilder implements SallyClient {
}
@Override
public void registerDocument(String docName, String[] schemas, RoutesBuilder builder) {
public void registerDocument(String docName, String[] schemas, TypedCallback<SallyDoc> callback) {
if (!started) {
try {
context.start();
started = true;
} catch (Exception e) {
e.printStackTrace();
return;
}
}
RegisterClientRequest regDoc = new RegisterClientRequest();
regDoc.setListenQueue(genUUID(docName));
regDoc.setEnvironmentID(System.getenv("SALLYENVID"));
regDoc.getSchemas().addAll(Arrays.asList(schemas));
getContext().createProducerTemplate().sendBodyAndProperty("direct:sally_register", regDoc, "builder", builder);
Exchange e = ExchangeBuilder.anExchange(context)
.withBody(regDoc).withProperty("schemas", regDoc.getSchemas()).withProperty("callback", callback)
.withProperty("inputQueue", regDoc.getListenQueue())
.build();
getContext().createProducerTemplate().send("direct:sally_register", e);
}
}
package info.kwarc.sally4.client.utils;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import org.apache.camel.converter.jaxb.JaxbDataFormat;
import org.apache.camel.spi.DataFormat;
public class CommUtils {
public final static String iFacePrefix = "info.kwarc.sally.comm.";
public final static String xmlFacePrefix = "http://kwarc.info/sally/comm/";
public static DataFormat getDataFormat(String iFace) {
try {
JAXBContext context = JAXBContext.newInstance(iFacePrefix+iFace, CommUtils.class.getClassLoader());
return new JaxbDataFormat(context);