Skip to content
Snippets Groups Projects
Commit fa57cc3b authored by Tom Wiesing's avatar Tom Wiesing
Browse files

Clean rewrite

parent 8f0b7f60
Branches master
No related tags found
No related merge requests found
......@@ -15,42 +15,86 @@ namespace ConnectToSally
/// </summary>
class DotNetSallyClient
{
protected static TimeSpan receiveTimeout = TimeSpan.FromSeconds(10);
#region Class Variables
/*
* Connection Properties
*/
/// <summary>
/// A logger so that we can write things to the console.
/// Address this DotNetSallyClient connects to.
/// </summary>
private static readonly ILog logger = LogManager.GetLogger(typeof(DotNetSallyClient));
public string address { get; private set; }
/// <summary>
/// Username for the connection.
/// Username used to connect to Sally.
/// </summary>
public string user { get; private set; }
/// <summary>
/// Password for the connection.
/// Password used to connect to Sally.
/// </summary>
public string password { get; private set; }
/// <summary>
/// Address for the connection.
/// </summary>
public string address { get; private set; }
/*
* Connection State
*/
/// <summary>
/// Connection session variable
/// </summary>
public ISession session { get; private set; }
private ISession session { get; set; }
/// <summary>
/// Connection variable
/// </summary>
public IConnection connection { get; private set; }
private IConnection connection { get; set; }
/*
* Logger
*/
/// <summary>
/// A logger so that we can write things to the console.
/// </summary>
private static readonly ILog logger = LogManager.GetLogger(typeof(DotNetSallyClient));
#endregion
#region Connection Start
/// <summary>
/// Constructs a new DotNetSallyClient and establishes a connection.
/// </summary>
/// <param name="address">Adress to connect to. </param>
/// <param name="user">Username to connect with. </param>
/// <param name="password">Password to connect with. </param>
public DotNetSallyClient(string address= "activemq:tcp://localhost:61616", string user="karaf", string password="karaf")
{
// store connection params
this.address = address;
this.user = user;
this.password = password;
BasicConfigurator.Configure(); // ???
// try to connect
if (!connect())
{
logFatal("Could not start connection");
return;
}
logInfo("Started connection");
}
/// <summary>
/// Starts the connection to Sally.
/// </summary>
protected bool startConnection()
/// <returns>If the connection was a success</returns>
protected bool connect()
{
// set up a URI and get ready for connecting
Uri connecturi = new Uri(this.address);
......@@ -84,60 +128,64 @@ namespace ConnectToSally
//nope something went wrong.
return false;
}
#endregion
#region Connection Getters
/// <summary>
/// Starts a DotNetSallyClient object with a session and connection with Sally.
/// returns the session of this SallyClient.
/// </summary>
/// <param name="address">Adress to connect to. </param>
/// <param name="user">Username to connect with. </param>
/// <param name="password">Password to connect with. </param>
public DotNetSallyClient(string address= "activemq:tcp://localhost:61616", string user="karaf", string password="karaf")
{
// store connection params
this.address = address;
this.user = user;
this.password = password;
/// <returns></returns>
public ISession getSallySession(){
//set up the logger.
BasicConfigurator.Configure();
if (!startConnection())
{
// if we could not start the connection, we should complain.
Console.WriteLine("DotNetSallyClient <" + address + ">: Could not start connection");
logger.Fatal("DotNetSallyClient <" + address + ">: Could not start connection");
return;
// if there is no connection, we can not return a session
if (this.connection == null || !this.connection.IsStarted) {
throw new Exception(logFatal("No connection started yet. Start a connection and try again."));
}
// we're done for now.
Console.WriteLine("DotNetSallyClient <"+ address + ">: Started connection");
// we can now return it.
return this.session;
}
#endregion
#region Logging
/// <summary>
/// returns the session of this SallyClient.
/// Generates a log message
/// </summary>
/// <param name="message">Message content</param>
/// <returns></returns>
public ISession getSallySession(){
// if there is no connection, we can not return it.
if (!this.connection.IsStarted) {
throw new Exception("DotNetSallyClient <" + address + ">: No connection started yet. Start a connection and try again.");
private string logMessage(string message)
{
return "DotNetSallyClient <" + address + ">: " + message;
}
//and return it.
return this.session;
/// <summary>
/// Logs an info message to console and to the log.
/// </summary>
/// <param name="message"></param>
private string logInfo(string message)
{
string msg = logMessage(message);
logger.Info(msg);
return msg;
}
/// <summary>
/// Registers a document with this Sally Client.
/// Logs a fatal message to console and to the log.
/// </summary>
/// <param name="doc"></param>
public void registerDocument(SallyDocument doc)
/// <param name="message"></param>
private string logFatal(string message)
{
//just call the document method to do this.
doc.registerWith(this);
string msg = logMessage(message);
logger.Fatal(msg);
return msg;
}
#endregion
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using SallySchemas;
using Apache.NMS;
using Apache.NMS.Util;
using ConnectToSally;
using System.Xml;
using System.Xml.Serialization;
using System.IO;
using ConnectToSally;
using log4net;
using Apache.NMS;
using Apache.NMS.Util;
using SallySchemas;
namespace SallyConnect
{
class SallyDocument
{
/// <summary>
/// The name of the sally queue we want to subscribe to
/// Delegate to handle SallyMessages
/// </summary>
public string sallyQueue { get; private set; }
/// <param name="msg"></param>
/// <param name="decoded"></param>
public delegate void SallyResponseHandler(IMessage msg, Object decoded);
class SallyDocument
{
#region Constructor
/// <summary>
/// A boolean stating if we are subscribed to the given SallyQueue
/// the SallyClient associated to this document.
/// </summary>
public bool subscribedToQueue { get; private set; }
public DotNetSallyClient client { get; private set; }
/// <summary>
/// The system environment ID used by Sally.
/// The environmentID of this Document
/// </summary>
public string environmentID { get; private set; }
/// <summary>
/// Interfaces implemented by this client.
/// The interfaces supported by this Document
/// </summary>
public string[] interfaces { get; private set; }
/// <summary>
/// Messaging session
/// </summary>
private ISession session { get; set; }
public SallyDocument(DotNetSallyClient client, string environmentID, string[] interfaces)
{
// connect to the client
this.client = client;
// set up environment Id and interfaces
this.environmentID = environmentID;
this.interfaces = interfaces;
}
#endregion
#region Handlers
/// <summary>
/// Messaging producer
/// Contains all doc Queue handlers
/// </summary>
private IMessageProducer sender { get; set; }
private Dictionary<int, Tuple<Type, SallyResponseHandler>> handlers = new Dictionary<int, Tuple<Type, SallyResponseHandler>>();
/// <summary>
/// A logger for this document.
/// contains the number of handlers (for id purposes)
/// </summary>
private static readonly ILog logger = LogManager.GetLogger(typeof(SallyDocument));
private int handler_count = 0;
/// <summary>
/// Represents a Sally Client.
/// registers a handler for a specific type of message.
/// </summary>
/// <param name="environmentID">Environment ID to use when registering with Sally. </param>
/// <param name="interfaces">A list of interfaces this client should implement. </param>
public SallyDocument(string environmentID, string[] interfaces)
/// <param name="T"></param>
/// <param name="handler"></param>
/// <returns></returns>
public int handleType(Type T, SallyResponseHandler handler)
{
int new_handler_id = handler_count++;
// set up basic variables.
this.environmentID = environmentID;
this.interfaces = interfaces;
this.subscribedToQueue = false;
handlers.Add(new_handler_id, new Tuple<Type, SallyResponseHandler>(T, handler));
// generate a new unique client name.
this.sallyQueue = "dotnet_client_" + Guid.NewGuid().ToString();
return new_handler_id;
}
#region Initial register With Sally
/// <summary>
/// Registers this document with a SallyClient
/// removes a certain handler.
/// </summary>
/// <param name="dns">SallyClient to register with</param>
public void registerWith(DotNetSallyClient dns)
/// <param name="id"></param>
/// <returns></returns>
public bool removeHandler(int id)
{
//load the session.
this.session = dns.getSallySession();
// build the message to send
string xmlMessage = Apache.NMS.Util.XmlUtil.Serialize(getCoreRegisterDocument());
ITextMessage regRequest = session.CreateTextMessage(xmlMessage);
//be ready to respond on the document queue
registerDocQueueListener();
// send the sally registration
sendReceive(SessionUtil.GetDestination(this.session, "queue://sally_register"), regRequest);
if(handlers.ContainsKey(id))
{
return handlers.Remove(id);
} else
{
return false;
}
}
/// <summary>
/// Registers to the response Queue so that we can send messages.
/// calls all handlers.
/// </summary>
/// <param name="responseQueue">Name of the response queue</param>
protected void registerToResponseQueue(String responseQueue)
/// <param name="msg"></param>
private void handleMessage(IMessage msg)
{
// create the destination
IDestination responseQDestination = SessionUtil.GetDestination(this.session, responseQueue);
// and store the sender
this.sender = session.CreateProducer(responseQDestination);
// turn the message into a string.
System.Byte[] content = (msg as IBytesMessage).Content;
String xmlMsg = System.Text.Encoding.Default.GetString(content);
// we are now ready to send messages.
this.subscribedToQueue = true;
// the object we want to decode.
object decoded;
Console.WriteLine("SallyDocument <" + environmentID + ">: Registered to document queue");
}
/// <summary>
/// Builds a request to register this document with Sally
/// </summary>
/// <returns>the request</returns>
protected SallySchemas.RegisterClientRequest getCoreRegisterDocument()
// iterate over all the handlers
foreach (KeyValuePair<int, Tuple<Type, SallyResponseHandler>> entry in handlers)
{
// get the handler
Tuple<Type, SallyResponseHandler> handler = entry.Value;
//create the request
SallySchemas.RegisterClientRequest reg = new RegisterClientRequest();
// try building a reader
XmlSerializer serializer = new XmlSerializer(handler.Item1);
XmlReader reader = XmlReader.Create(new StringReader(xmlMsg));
// and set up the parameters.
reg.EnvironmentID = this.environmentID;
reg.ListenQueue = this.sallyQueue;
reg.Schemas = this.interfaces;
// try to decode the object
try
{
decoded = serializer.Deserialize(reader);
} catch (InvalidOperationException) { continue;
} catch (XmlException) { continue; }
// if it is null, continue
if (decoded == null){ continue; }
// and return it.
return reg;
// and try to handle it.
handler.Item2(msg, decoded);
}
}
#endregion
#region Heartbeat Responding
#region Send & Receive Helpers
/// <summary>
/// Registers a listener for the document queue
/// responds to a certain message.
/// </summary>
/// <param name="docName"></param>
/// <param name="interfaces"></param>
protected void registerDocQueueListener()
/// <param name="msg"></param>
/// <param name="response"></param>
/// <param name="onResponse"></param>
public void respondToMessage(IMessage msg, IMessage response, MessageListener onResponse = null)
{
//the queue
IDestination docQueue = SessionUtil.GetDestination(this.session, "queue://" + this.sallyQueue);
// create a temporary queue and consumer
ITemporaryQueue tempQueue = this.client.getSallySession().CreateTemporaryQueue();
IMessageConsumer tempQueueConsumer = this.client.getSallySession().CreateConsumer(tempQueue);
// build and register a listener
IMessageConsumer regConsumer = this.session.CreateConsumer(docQueue);
regConsumer.Listener += new MessageListener(OnDocQueueMessage);
// handle responses if needed
if(onResponse != null)
{
tempQueueConsumer.Listener += onResponse;
}
Console.WriteLine("SallyDocument <" + environmentID + ">: Registered document queue listener");
// set the reply to and ids for the message
response.NMSReplyTo = tempQueue;
response.NMSCorrelationID = msg.NMSCorrelationID;
// finally make a consumer and send the message.
IMessageProducer producer = this.client.getSallySession().CreateProducer(msg.NMSReplyTo);
producer.Send(response);
}
/// <summary>
/// Handler for messages received on the document queue
/// Sends a message.
/// </summary>
/// <param name="receivedMsg">message received</param>
protected void OnDocQueueMessage(IMessage receivedMsg)
/// <param name="msg"></param>
/// <param name="to"></param>
/// <param name="onResponse"></param>
public void sendMessage(IMessage msg, IDestination to, MessageListener onResponse = null)
{
if (receivedMsg == null)
// create a temporary queue and consumer
ITemporaryQueue tempQueue = this.client.getSallySession().CreateTemporaryQueue();
IMessageConsumer tempQueueConsumer = this.client.getSallySession().CreateConsumer(tempQueue);
// handle responses if needed
if (onResponse != null)
{
// nothing to do ...
logger.Warn("No ITextMessage...");
tempQueueConsumer.Listener += onResponse;
msg.NMSReplyTo = tempQueue;
}
else
{
string res_xml = Utilities.getMessageContent(receivedMsg, "HeartbeatRequest");
// check if we have indeed received a message.
if (res_xml == null)
{
logger.Warn("Unknown message type..");
Console.WriteLine("Unknown message type..");
return;
} else
// set the reply to and ids for the message
msg.NMSDestination = to;
// finally make a consumer and send the message.
IMessageProducer producer = this.client.getSallySession().CreateProducer(to);
producer.Send(msg);
}
public void sendMessage(IMessage msg, MessageListener onResponse = null)
{
Console.WriteLine("SallyDocument <" + environmentID + ">: Received HeartbeatRequest");
HeartbeatRequest res = Utilities.deserialize<HeartbeatRequest>(res_xml);
heartBeartResponder(receivedMsg);
// send the message to the send Queue
sendMessage(msg, createDestination(sendQueueId), onResponse);
}
/// <summary>
/// creates a message to be sent with Sally
/// </summary>
/// <param name="obj"></param>
/// <returns></returns>
public IMessage createMessage(object obj)
{
return this.client.getSallySession().CreateTextMessage(
Apache.NMS.Util.XmlUtil.Serialize(obj)
);
}
/// <summary>
/// Returns a destination
/// </summary>
/// <param name="queue"></param>
/// <returns></returns>
public IDestination createDestination(string queue)
{
return SessionUtil.GetDestination(this.client.getSallySession(), "queue://" + queue);
}
/// <summary>
/// Responds to a heart beat request
/// Creates a listener for a message.
/// </summary>
/// <param name="receivedHeartBeat"></param>
protected void heartBeartResponder(IMessage receivedHeartBeat)
/// <param name="T"></param>
/// <param name="handler"></param>
/// <returns></returns>
public MessageListener createListener(Type T, SallyResponseHandler handler)
{
return new MessageListener(msg =>
{
// create the heartbeatreponse
SallySchemas.HeartbeatResponse hbResponseContent = new HeartbeatResponse();
logInfo("hello!!!");
//read the message text
System.Byte[] content = (msg as IBytesMessage).Content;
String xmlMsg = System.Text.Encoding.Default.GetString(content);
// deserialse the message
XmlSerializer serializer = new XmlSerializer(T);
XmlReader reader = XmlReader.Create(new StringReader(xmlMsg));
// and call the handler
handler(msg, serializer.Deserialize(reader));
});
}
// and serialise it.
ITextMessage hbResponse = this.session.CreateTextMessage(Utilities.serialize<HeartbeatResponse>(hbResponseContent));
#endregion
// and respond to it.
replyAndReceive(receivedHeartBeat, hbResponse);
#region Registration
Console.WriteLine("SallyDocument <" + environmentID + ">: Send HeartbeatResponse");
}
/// <summary>
/// Contains the ID of the send Queue
/// </summary>
private string sendQueueId;
/// <summary>
/// Replies to a message and handles the response.
/// Registers this document with Sally and starts listening to Heartbeats and messages.
/// </summary>
/// <param name="destination">Destination to send message to. </param>
protected void replyAndReceive(IMessage inResponseTo, IMessage message)
/// <param name="queueID"></param>
public void register(String queueID = null)
{
// create a temporary queue and consumer
ITemporaryQueue tempQueue = this.session.CreateTemporaryQueue();
IMessageConsumer tempQueueConsumer = this.session.CreateConsumer(tempQueue);
// handle the response with the onResponseQueueMessage
tempQueueConsumer.Listener += new MessageListener(OnResponseQueueMessage);
// set the reply to and ids for the message
message.NMSReplyTo = tempQueue;
message.NMSCorrelationID = inResponseTo.NMSCorrelationID;
// if we do not have a queueId, then we generate one.
if(queueID == null)
{
queueID = "dotnet_client_" + Guid.NewGuid().ToString();
}
// finally make a consumer and send the message.
IMessageProducer producer = this.session.CreateProducer(inResponseTo.NMSReplyTo);
producer.Send(message);
//register to the Queue#
IDestination docQueue = SessionUtil.GetDestination(this.client.getSallySession(), "queue://" + queueID);
Console.WriteLine("SallyDocument <" + environmentID + ">: Replying with CorrelationID " + inResponseTo.NMSCorrelationID.ToString());
// build and register a listener
IMessageConsumer regConsumer = this.client.getSallySession().CreateConsumer(docQueue);
regConsumer.Listener += new MessageListener(handleMessage);
// register as a listener
logInfo("Registered document queue listener. ");
//create a request to send
RegisterClientRequest regRequest = new RegisterClientRequest();
regRequest.ListenQueue = queueID;
regRequest.Schemas = interfaces;
regRequest.EnvironmentID = environmentID;
// prepare to send and receive nmessage
IMessage message = createMessage(regRequest);
IDestination dest = createDestination("sally_register");
MessageListener handler = createListener(
typeof(RegisterClientResponse),
new SallyResponseHandler(_handleSallyRegister)
);
// send it.
sendMessage(message, dest, handler);
logInfo("Sent registration message");
}
/// <summary>
/// Handles the registration
/// </summary>
/// <param name="msg"></param>
/// <param name="decoded"></param>
private void _handleSallyRegister(IMessage msg, object decoded)
{
// get the response
RegisterClientResponse response = (RegisterClientResponse)decoded;
logInfo("Got registration response");
// listen to heart beats
logInfo("Registering heartbeat responder...");
handleType(typeof(HeartbeatRequest), new SallyResponseHandler(_handleHeartBeat));
// store the send Queue
sendQueueId = response.SendQueue;
}
#endregion
#region Message Send & Receive
#region Heartbeats
/// <summary>
/// Sends a message to a destionation and handles the response.
/// Handles Heart Beats
/// </summary>
/// <param name="destination">Destination to send message to. </param>
/// <param name="message">Message to send. </param>
protected void sendReceive(IDestination destination, IMessage message)
/// <param name="msg"></param>
/// <param name="decoded"></param>
private void _handleHeartBeat(IMessage msg, object decoded)
{
// create a temporary queue and consumer
ITemporaryQueue tempQueue = this.session.CreateTemporaryQueue();
IMessageConsumer tempQueueConsumer = this.session.CreateConsumer(tempQueue);
// handle the response with the onResponseQueueMessage
tempQueueConsumer.Listener += new MessageListener(OnResponseQueueMessage);
logInfo("Responding to heartbeat request");
// set the reply to and ids for the message
message.NMSReplyTo = tempQueue;
message.NMSCorrelationID = Guid.NewGuid().ToString();
// i take your request
HeartbeatRequest request = (HeartbeatRequest)decoded;
// finally make a consumer and send the message.
IMessageProducer producer = this.session.CreateProducer(destination);
producer.Send(message);
// and raise you a response
HeartbeatResponse response = new HeartbeatResponse();
Console.WriteLine("SallyDocument <" + environmentID + ">: Sending message to "+destination.ToString());
// we send it
respondToMessage(msg, createMessage(response));
}
#endregion
#region Logging
/// <summary>
/// The temporary one where sally gives back a queue in response for a document register request. Also used for other trivial sendReceives
/// A logger for this document.
/// </summary>
/// <param name="receivedMsg"></param>
protected void OnResponseQueueMessage(IMessage receivedMsg)
{
Console.WriteLine("SallyDocument <" + environmentID + ">: Received response Queue message");
private static readonly ILog logger = LogManager.GetLogger(typeof(SallyDocument));
if (receivedMsg == null)
/// <summary>
/// Generates a log message
/// </summary>
/// <param name="message">Message content</param>
/// <returns></returns>
private string logMessage(string message)
{
// if there is no message, we have nothing to do
logger.Warn("No ITextMessage...");
return "SallyDocument <" + environmentID + ">: " + message;
}
else
/// <summary>
/// Logs an info message to console and to the log.
/// </summary>
/// <param name="message"></param>
private string logInfo(string message)
{
// read the message content
string res_xml = Utilities.getMessageContent(receivedMsg, "RegisterClientResponse");
// and register to the response queue if possible.
if(res_xml == null) {
logger.Warn("SallyDocument <" + environmentID + ">: Unrecognised response Queue message");
Console.WriteLine("SallyDocument < " + environmentID + " >: Unrecognised response Queue message");
} else {
RegisterClientResponse res = Utilities.deserialize<RegisterClientResponse>(res_xml);
registerToResponseQueue(res.SendQueue);
}
}
}
string msg = logMessage(message);
#endregion
logger.Info(msg);
public bool isRegistered() {
return this.subscribedToQueue;
return msg;
}
/// <summary>
/// Sends a message to sally. The wait parameter is used for debugging.
/// Logs a warning message to console and to the log.
/// </summary>
/// <param name="xmlMessage"></param>
/// <param name="wait"></param>
public void sendToSally(String xmlMessage, bool wait = false)
/// <param name="message"></param>
private string logWarn(string message)
{
if (!this.subscribedToQueue)
{// we are not subscribed to a quenue yet, so we can not send anything.
// in the debug case, we should wait for a second
// and try again.
if (wait) {
System.Threading.Thread.Sleep(1000);
sendToSally(xmlMessage, false);
return;
}
// otherwise die with a fatal message
logger.Fatal("Register to the queue from sally's response first...");
Console.WriteLine("Register to the queue from sally's response first...");
string msg = logMessage(message);
}
else
{// we are subscribed => send the message.
logger.Warn(msg);
this.sender.Send(xmlMessage);
Console.WriteLine("Sent a message on response queue..");
return msg;
}
}
/// <summary>
/// Logs a fatal message to console and to the log.
/// </summary>
/// <param name="message"></param>
private string logFatal(string message)
{
string msg = logMessage(message);
logger.Fatal(msg);
return msg;
}
#endregion
}
}
......@@ -16,20 +16,10 @@ namespace SallyConnect
// we first create the sally client.
DotNetSallyClient dns = new DotNetSallyClient("activemq:tcp://localhost:61616");//neptune.eecs.jacobs-university.de:61616
SallyDocument doc = new SallyDocument(dns, docName, interfaces);
//and then register a document to it.
SallyDocument sD = new SallyDocument(docName, interfaces);
dns.registerDocument(sD); // register the document.
//now just wait
Console.Read();
/*
System.Threading.Thread.Sleep(1000);
if (sD.isRegistered()){
sD.sendToSally("some message");
}
doc.register();
Console.Read();
*/
}
}
}
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment