Newer
Older
using System;
using System.Collections.Generic;
using Apache.NMS;
using Apache.NMS.Util;
using SallySchemas;
/// <summary>
/// Delegate to handle SallyMessages
/// </summary>
/// <param name="msg"></param>
/// <param name="decoded"></param>
public delegate void SallyResponseHandler(IMessage msg, Object decoded);
public string[] interfaces { get; private set; }
public SallyDocument(DotNetSallyClient client, string environmentID, string[] interfaces)
{
// connect to the client
this.client = client;
// set up environment Id and interfaces
this.environmentID = environmentID;
this.interfaces = interfaces;
}
#endregion
private Dictionary<int, Tuple<Type, SallyResponseHandler>> handlers = new Dictionary<int, Tuple<Type, SallyResponseHandler>>();
/// <param name="T"></param>
/// <param name="handler"></param>
/// <returns></returns>
public int handleType(Type T, SallyResponseHandler handler)
handlers.Add(new_handler_id, new Tuple<Type, SallyResponseHandler>(T, handler));
}
/// <summary>
/// <param name="id"></param>
/// <returns></returns>
public bool removeHandler(int id)
if(handlers.ContainsKey(id))
{
return handlers.Remove(id);
} else
{
return false;
}
}
/// <summary>
/// <param name="msg"></param>
private void handleMessage(IMessage msg)
// turn the message into a string.
System.Byte[] content = (msg as IBytesMessage).Content;
String xmlMsg = System.Text.Encoding.Default.GetString(content);
// iterate over all the handlers
foreach (KeyValuePair<int, Tuple<Type, SallyResponseHandler>> entry in handlers)
{
// get the handler
Tuple<Type, SallyResponseHandler> handler = entry.Value;
// try building a reader
XmlSerializer serializer = new XmlSerializer(handler.Item1);
XmlReader reader = XmlReader.Create(new StringReader(xmlMsg));
// try to decode the object
try
{
decoded = serializer.Deserialize(reader);
} catch (InvalidOperationException) { continue;
} catch (XmlException) { continue; }
// if it is null, continue
if (decoded == null){ continue; }
// and try to handle it.
handler.Item2(msg, decoded);
}
/// <param name="msg"></param>
/// <param name="response"></param>
/// <param name="onResponse"></param>
public void respondToMessage(IMessage msg, IMessage response, MessageListener onResponse = null)
// create a temporary queue and consumer
ITemporaryQueue tempQueue = this.client.getSallySession().CreateTemporaryQueue();
IMessageConsumer tempQueueConsumer = this.client.getSallySession().CreateConsumer(tempQueue);
// handle responses if needed
if(onResponse != null)
{
tempQueueConsumer.Listener += onResponse;
}
// set the reply to and ids for the message
response.NMSReplyTo = tempQueue;
response.NMSCorrelationID = msg.NMSCorrelationID;
// finally make a consumer and send the message.
IMessageProducer producer = this.client.getSallySession().CreateProducer(msg.NMSReplyTo);
producer.Send(response);
/// <param name="msg"></param>
/// <param name="to"></param>
/// <param name="onResponse"></param>
public void sendMessage(IMessage msg, IDestination to, MessageListener onResponse = null)
// create a temporary queue and consumer
ITemporaryQueue tempQueue = this.client.getSallySession().CreateTemporaryQueue();
IMessageConsumer tempQueueConsumer = this.client.getSallySession().CreateConsumer(tempQueue);
// handle responses if needed
if (onResponse != null)
tempQueueConsumer.Listener += onResponse;
msg.NMSReplyTo = tempQueue;
// set the reply to and ids for the message
msg.NMSDestination = to;
// finally make a consumer and send the message.
IMessageProducer producer = this.client.getSallySession().CreateProducer(to);
producer.Send(msg);
}
public void sendMessage(IMessage msg, MessageListener onResponse = null)
{
// send the message to the send Queue
sendMessage(msg, createDestination(sendQueueId), onResponse);
/// <param name="obj"></param>
/// <returns></returns>
public IMessage createMessage(object obj)
return this.client.getSallySession().CreateTextMessage(
Apache.NMS.Util.XmlUtil.Serialize(obj)
);
}
/// <summary>
/// Returns a destination
/// </summary>
/// <param name="queue"></param>
/// <returns></returns>
public IDestination createDestination(string queue)
{
return SessionUtil.GetDestination(this.client.getSallySession(), "queue://" + queue);
/// <param name="T"></param>
/// <param name="handler"></param>
/// <returns></returns>
public MessageListener createListener(Type T, SallyResponseHandler handler)
return new MessageListener(msg =>
{
logInfo("hello!!!");
//read the message text
System.Byte[] content = (msg as IBytesMessage).Content;
String xmlMsg = System.Text.Encoding.Default.GetString(content);
// deserialse the message
XmlSerializer serializer = new XmlSerializer(T);
XmlReader reader = XmlReader.Create(new StringReader(xmlMsg));
// and call the handler
handler(msg, serializer.Deserialize(reader));
});
}
/// <summary>
/// Contains the ID of the send Queue
/// </summary>
private string sendQueueId;
/// <summary>
/// Registers this document with Sally and starts listening to Heartbeats and messages.
/// </summary>
/// <param name="queueID"></param>
public void register(String queueID = null)
{
// if we do not have a queueId, then we generate one.
if(queueID == null)
{
queueID = "dotnet_client_" + Guid.NewGuid().ToString();
}
//register to the Queue#
IDestination docQueue = SessionUtil.GetDestination(this.client.getSallySession(), "queue://" + queueID);
// build and register a listener
IMessageConsumer regConsumer = this.client.getSallySession().CreateConsumer(docQueue);
regConsumer.Listener += new MessageListener(handleMessage);
// register as a listener
logInfo("Registered document queue listener. ");
//create a request to send
RegisterClientRequest regRequest = new RegisterClientRequest();
regRequest.ListenQueue = queueID;
regRequest.Schemas = interfaces;
regRequest.EnvironmentID = environmentID;
// prepare to send and receive nmessage
IMessage message = createMessage(regRequest);
IDestination dest = createDestination("sally_register");
MessageListener handler = createListener(
typeof(RegisterClientResponse),
new SallyResponseHandler(_handleSallyRegister)
);
// send it.
sendMessage(message, dest, handler);
logInfo("Sent registration message");
}
/// <param name="msg"></param>
/// <param name="decoded"></param>
private void _handleSallyRegister(IMessage msg, object decoded)
// get the response
RegisterClientResponse response = (RegisterClientResponse)decoded;
logInfo("Got registration response");
// listen to heart beats
logInfo("Registering heartbeat responder...");
handleType(typeof(HeartbeatRequest), new SallyResponseHandler(_handleHeartBeat));
// store the send Queue
sendQueueId = response.SendQueue;
/// <param name="msg"></param>
/// <param name="decoded"></param>
private void _handleHeartBeat(IMessage msg, object decoded)
// i take your request
HeartbeatRequest request = (HeartbeatRequest)decoded;
// and raise you a response
HeartbeatResponse response = new HeartbeatResponse();
// we send it
respondToMessage(msg, createMessage(response));
#region Logging
/// <summary>
/// A logger for this document.
/// </summary>
private static readonly ILog logger = LogManager.GetLogger(typeof(SallyDocument));
/// <summary>
/// Generates a log message
/// </summary>
/// <param name="message">Message content</param>
/// <returns></returns>
private string logMessage(string message)
{
return "SallyDocument <" + environmentID + ">: " + message;
/// <param name="message"></param>
private string logInfo(string message)
/// <summary>
/// Logs a warning message to console and to the log.
/// </summary>
/// <param name="message"></param>
private string logWarn(string message)
{
string msg = logMessage(message);
/// <summary>
/// Logs a fatal message to console and to the log.
/// </summary>
/// <param name="message"></param>
private string logFatal(string message)
{
string msg = logMessage(message);