Skip to content
Snippets Groups Projects
Commit 7bfe19f8 authored by Risav Karna's avatar Risav Karna
Browse files

multiple document support, usage example in testclass

parent 86306133
No related branches found
No related tags found
No related merge requests found
Showing
with 14065 additions and 197 deletions
......@@ -20,23 +20,17 @@ using SallyConnect;
namespace ConnectToSally
{
//A refers to default queue,producer, consumer. B refers to the new queue(s) in response from sally.
class DotNetSallyClient : SallyClient
class DotNetSallyClient
{
//protected static AutoResetEvent semaphore = new AutoResetEvent(false);
//protected static string xmlQResponse = null;
protected static TimeSpan receiveTimeout = TimeSpan.FromSeconds(10);
private static readonly ILog logger = LogManager.GetLogger(typeof(DotNetSallyClient));
private bool responseQSubscribed;
public string user { get; private set; }
public string password { get; private set; }
public ISession session { get; private set; }
public IConnection connection { get; private set; }
private IMessageProducer sendToSallyProducer { get; set; }
protected void startConnection()
......@@ -61,6 +55,11 @@ namespace ConnectToSally
}
/// <summary>
/// Starts a DotNetSallyClient object with a session and connection with Sally.
/// </summary>
/// <param name="user"></param>
/// <param name="password"></param>
public DotNetSallyClient(string user="webclient", string password="webclient")
{
this.user = user;
......@@ -76,175 +75,15 @@ namespace ConnectToSally
}
/// <summary>
/// Register a new document with Sally so as to start communicating details about that document.
/// </summary>
/// <param name="docName"></param>
/// <param name="interfaces"></param>
public void registerDocument(string docName, string[] interfaces)
{
SallySchemas.registerdocument reg = new registerdocument();
reg.environmentid = docName;
reg.documentqueue = "dotnet_alex_" + Guid.NewGuid().ToString();
reg.interfaces = interfaces;
string xmlMessage = Apache.NMS.Util.XmlUtil.Serialize(reg);
IDestination docQueue = SessionUtil.GetDestination(this.session, "queue://" + reg.documentqueue);
//ActiveMQQueue myQueue = new ActiveMQQueue("tes"+reg.documentqueue);
IMessageConsumer regConsumer = this.session.CreateConsumer(docQueue);
regConsumer.Listener += new MessageListener(OnDocMessage); //TODO OnDocMessage
ITextMessage regRequest = this.session.CreateTextMessage(xmlMessage);
Console.WriteLine(xmlMessage); //Now sending it to sally_register with a temp queue for the replyTo
sendReceive(SessionUtil.GetDestination(this.session, "queue://sally_register"), regRequest);
}
protected void OnDocMessage(IMessage receivedMsg) {
Console.WriteLine("Received a message on docQueue");
OnTMessage(receivedMsg);
}
/// <summary>
/// Send a message to the given destination. A temporary queue is set as a replyTo for this transaction.
/// </summary>
/// <param name="destination"></param>
/// <param name="message"></param>
public void sendReceive(IDestination destination, IMessage message) {
ITemporaryQueue tempQueue = this.session.CreateTemporaryQueue();
IMessageConsumer tempQueueConsumer = this.session.CreateConsumer(tempQueue);
tempQueueConsumer.Listener += new MessageListener(OnTMessage);
message.NMSReplyTo = tempQueue;
message.NMSCorrelationID = Guid.NewGuid().ToString();
IMessageProducer producer = this.session.CreateProducer(destination);
producer.Send(message);
//semaphore.WaitOne((int)TimeSpan.FromSeconds(10).TotalMilliseconds, true);
//Console.WriteLine("Register document request sent");
}
protected Dictionary<String, String> xmlResponseReader(XmlReader xmlReader, String xmlNode = "heartbeatrequest")
{
Dictionary<String, String> sallyResponse = new Dictionary<string, string>();
while (xmlReader.Read())
{
if ((xmlReader.NodeType == XmlNodeType.Element) && (xmlReader.Name == xmlNode || xmlReader.Name == "sallyqueue"))
{
sallyResponse.Add(xmlReader.Name, xmlReader.ReadElementContentAsString());
return sallyResponse;
public ISession getSallySession(){
if (!this.connection.IsStarted) {
throw new Exception("No connection started yet. Start a connection and try again.");
}
return this.session;
}
sallyResponse.Add("noresponse", "null");
return sallyResponse;
}
protected void registerToResponseQueue(String responseQueue)
{
logger.Info("Reached to the registration of response queue..");
string prefixedQueue = responseQueue;
IDestination responseQDestination = SessionUtil.GetDestination(this.session, prefixedQueue);
this.sendToSallyProducer = session.CreateProducer(responseQDestination);
this.responseQSubscribed = true;
//semaphore.Set();
Console.WriteLine("Ready to send messages to Sally on the response queue");
}
protected void heartBeartResponder(IMessage receivedHeartBeat)
{
Console.WriteLine("Going to respond to heartbeat request");
SallySchemas.heartbeatresponse hbResponseContent = new heartbeatresponse();
ITextMessage hbResponse = this.session.CreateTextMessage(Apache.NMS.Util.XmlUtil.Serialize(hbResponseContent));
sendReceive(receivedHeartBeat.NMSReplyTo, hbResponse);
Console.WriteLine("Responded to heartbeat request");
}
protected void OnTMessage(IMessage receivedMsg)
{
IBytesMessage msg = (IBytesMessage)receivedMsg;
//semaphore.Set();
logger.Info("Received a response on ..");
Console.WriteLine("Received a response on non-doc queue..");
if (receivedMsg == null)
{
logger.Warn("No ITextMessage...");
}
else
{
System.Byte[] content = (receivedMsg as IBytesMessage).Content;
String xmlResponse = System.Text.Encoding.Default.GetString(content);
logger.Info("Received => " + receivedMsg);
logger.Info("ByteArray => " + content);
Console.WriteLine("ByteArray => " + content);
logger.Info("Init parsing the response: " + xmlResponse);
using (XmlReader reader = XmlReader.Create(new StringReader(xmlResponse)))
{
string res;
Dictionary<String, String> sallyResponse = xmlResponseReader(reader);
if (sallyResponse.ContainsKey("sallyqueue"))
{
//register to the queue in response - even if already registered before
res = sallyResponse["sallyqueue"];
registerToResponseQueue(res);
}
else if (sallyResponse.ContainsKey("heartbeatrequest"))
{
//respond to the hb request, deserialize first(?)
Console.WriteLine("Heartbeatrequest is getting heartbeat response..");
res = sallyResponse["heartbeatrequest"];
heartBeartResponder(receivedMsg);
}
else
{
//default handler
logger.Warn("Unknown message type..");
Console.WriteLine("Unknown message type..");
}
}
}
}
/// <summary>
/// Check whether or not the client is registered to the queue returned by Sally after the client's document was registered.
/// </summary>
/// <returns>boolean</returns>
public bool isRegistered()
{
return this.responseQSubscribed;
}
/// <summary>
/// Send a message to Sally on the specific queue provided by Sally after the registration of the document.
/// </summary>
/// <param name="xmlMessage"></param>
public void sendToSally(String xmlMessage)
{
//semaphore.WaitOne((int)TimeSpan.FromSeconds(1).TotalMilliseconds, true);
if (!responseQSubscribed)
{
logger.Fatal("Register to the queue from sally's response first...");
Console.WriteLine("Register to the queue from sally's response first...");
}
else
{
this.sendToSallyProducer.Send(xmlMessage);
Console.WriteLine("Sent a message on response queue..");
}
}
//public object deserializer(string xmlString)
//{
//var serializer = new System.Xml.Serialization.XmlSerializer(typeof(SallySchemas.heartbeatrequest));
//Stream contStream = new MemoryStream(content);
//SallySchemas.heartbeatrequest hbRequest = (SallySchemas.heartbeatrequest)serializer.Deserialize(contStream);
//}
}
}
......@@ -73,7 +73,7 @@
<Compile Include="TestUnit.cs" />
<Compile Include="DotNetSallyClient.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="SallyClient.cs" />
<Compile Include="Utilities.cs" />
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />
......
......@@ -5,44 +5,226 @@ using System.Text;
using System.Threading.Tasks;
using SallySchemas;
using Apache.NMS;
using Apache.NMS.Util;
using System.Xml;
using System.IO;
using ConnectToSally;
using log4net;
namespace SallyConnect
{
class SallyDocument
{
public string sallyQueue { get; private set; }
public bool subscribedToQueue { get; set; }
public bool subscribedToQueue { get; private set; }
public string environmentID { get; private set; }
public string[] interfaces { get; private set; }
public SallyDocument(string queue)
private ISession session { get; set; }
private IMessageProducer sender { get; set; }
private static readonly ILog logger = LogManager.GetLogger(typeof(SallyDocument));
public SallyDocument(string environmentID, string[] interfaces)
{
this.sallyQueue = queue;
this.environmentID = environmentID;
this.interfaces = interfaces;
this.subscribedToQueue = false;
this.sallyQueue = "dotnet_alex_" + Guid.NewGuid().ToString();
}
public void sendToSally()
protected SallySchemas.registerdocument getCoreRegisterDocument()
{
SallySchemas.registerdocument reg = new registerdocument();
reg.environmentid = this.environmentID;
reg.documentqueue = this.sallyQueue;
reg.interfaces = this.interfaces;
return reg;
}
/// <summary>
/// Register a new listener and handler to the given session so as to start communicating details about this document.
/// </summary>
/// <param name="docName"></param>
/// <param name="interfaces"></param>
protected void registerDocQueueListener()
{
IDestination docQueue = SessionUtil.GetDestination(this.session, "queue://" + this.sallyQueue);
IMessageConsumer regConsumer = this.session.CreateConsumer(docQueue);
regConsumer.Listener += new MessageListener(OnDocQueueMessage); //listen for heartbeats
}
/// <summary>
/// Register a new document with Sally so as to start communicating details about that document.
/// </summary>
/// <param name="docName"></param>
/// <param name="interfaces"></param>
public void registerDocument(DotNetSallyClient dns)
{
this.session = dns.getSallySession();
string xmlMessage = Apache.NMS.Util.XmlUtil.Serialize(getCoreRegisterDocument());
ITextMessage regRequest = session.CreateTextMessage(xmlMessage);
Console.WriteLine(xmlMessage); //Now sending it to sally_register with a temp queue for the replyTo
registerDocQueueListener();
sendReceive(SessionUtil.GetDestination(this.session, "queue://sally_register"), regRequest);
}
/// <summary>
/// Send a message to the given destination. A temporary queue is set as a replyTo for this transaction.
/// </summary>
/// <param name="destination"></param>
/// <param name="message"></param>
protected void sendReceive(IDestination destination, IMessage message)
{
ITemporaryQueue tempQueue = this.session.CreateTemporaryQueue();
IMessageConsumer tempQueueConsumer = this.session.CreateConsumer(tempQueue);
tempQueueConsumer.Listener += new MessageListener(OnResponseQueueMessage);
message.NMSReplyTo = tempQueue;
message.NMSCorrelationID = Guid.NewGuid().ToString();
IMessageProducer producer = this.session.CreateProducer(destination);
producer.Send(message);
}
//public string environmentID { get; private set; }
//public string[] interfaces { get; private set; }
protected void registerToResponseQueue(String responseQueue)
{
logger.Info("Reached to the registration of response queue..");
string prefixedQueue = responseQueue;
IDestination responseQDestination = SessionUtil.GetDestination(this.session, prefixedQueue);
this.sender = session.CreateProducer(responseQDestination);
this.subscribedToQueue = true;
//semaphore.Set();
Console.WriteLine("Ready to send messages to Sally on the response queue");
}
public bool isRegistered() {
return this.subscribedToQueue;
}
//public SallyDocument(string environmentID, string[] interfaces) {
// this.environmentID = environmentID;
// this.interfaces = interfaces;
// this.subscribedToQueue = false;
//}
/// <summary>
/// Send a message to Sally on the specific queue provided by Sally after the registration of the document. If wait is enabled it waits for 1 sec (being used during unit testing)
/// </summary>
/// <param name="xmlMessage"></param>
/// <param name="wait"></param>
public void sendToSally(String xmlMessage, bool wait = false)
{
//semaphore.WaitOne((int)TimeSpan.FromSeconds(1).TotalMilliseconds, true);
if (!this.subscribedToQueue)
{
if (wait) {
System.Threading.Thread.Sleep(1000);
sendToSally(xmlMessage, false);
}
logger.Fatal("Register to the queue from sally's response first...");
Console.WriteLine("Register to the queue from sally's response first...");
}
else
{
this.sender.Send(xmlMessage);
Console.WriteLine("Sent a message on response queue..");
}
}
/// <summary>
/// This one gets heartbeats
/// </summary>
/// <param name="receivedMsg"></param>
protected void OnDocQueueMessage(IMessage receivedMsg)
{
IBytesMessage msg = (IBytesMessage)receivedMsg;
//semaphore.Set();
logger.Info("Received a heartbeat on " + this.environmentID + "'s queue..");
Console.WriteLine("Received a heartbeat on " + this.environmentID + "'s queue..");
if (receivedMsg == null)
{
logger.Warn("No ITextMessage...");
}
else
{
System.Byte[] content = (receivedMsg as IBytesMessage).Content;
String xmlResponse = System.Text.Encoding.Default.GetString(content);
//public SallySchemas.registerdocument getCoreRegisterDocument(){
// SallySchemas.registerdocument reg = new registerdocument();
// reg.environmentid = this.environmentID;
// reg.documentqueue = "dotnet_alex_" + Guid.NewGuid().ToString();
// reg.interfaces = this.interfaces;
// return reg;
//}
logger.Info("Received => " + receivedMsg);
logger.Info("ByteArray => " + content);
Console.WriteLine("ByteArray => " + content);
logger.Info("Init parsing the response: " + xmlResponse);
using (XmlReader reader = XmlReader.Create(new StringReader(xmlResponse)))
{
string res;
Dictionary<String, String> sallyResponse = Utilities.xmlResponseReader(reader, "heartbeatrequest");
if (sallyResponse.ContainsKey("heartbeatrequest"))
{
//respond to the hb request
Console.WriteLine("Heartbeatrequest is getting heartbeat response..");
res = sallyResponse["heartbeatrequest"];
heartBeartResponder(receivedMsg);
}
else
{
//default handler
logger.Warn("Unknown message type..");
Console.WriteLine("Unknown message type..");
}
}
}
}
protected void heartBeartResponder(IMessage receivedHeartBeat)
{
Console.WriteLine("Going to respond to heartbeat request");
SallySchemas.heartbeatresponse hbResponseContent = new heartbeatresponse();
ITextMessage hbResponse = this.session.CreateTextMessage(Apache.NMS.Util.XmlUtil.Serialize(hbResponseContent));
sendReceive(receivedHeartBeat.NMSReplyTo, hbResponse);
Console.WriteLine("Responded to heartbeat request");
}
/// <summary>
/// The temporary one where sally gives back a queue in response for a document register request. Also used for other trivial sendReceives
/// </summary>
/// <param name="receivedMsg"></param>
protected void OnResponseQueueMessage(IMessage receivedMsg)
{
IBytesMessage msg = (IBytesMessage)receivedMsg;
//semaphore.Set();
logger.Info("Received a response on " + this.environmentID + "'s queue..");
Console.WriteLine("Received a response on " + this.environmentID + "'s queue..");
if (receivedMsg == null)
{
logger.Warn("No ITextMessage...");
}
else
{
System.Byte[] content = (receivedMsg as IBytesMessage).Content;
String xmlResponse = System.Text.Encoding.Default.GetString(content);
logger.Info("Received => " + receivedMsg);
logger.Info("ByteArray => " + content);
Console.WriteLine("ByteArray => " + content);
logger.Info("Init parsing the response: " + xmlResponse);
using (XmlReader reader = XmlReader.Create(new StringReader(xmlResponse)))
{
string res;
Dictionary<String, String> sallyResponse = Utilities.xmlResponseReader(reader, "sallyqueue");
if (sallyResponse.ContainsKey("sallyqueue"))
{
//register to the queue in response - even if already registered before(?)
res = sallyResponse["sallyqueue"];
registerToResponseQueue(res);
}
else
{
//default handler
logger.Warn("Unknown message type..");
Console.WriteLine("Unknown message type..");
}
}
}
}
}
......
......@@ -13,13 +13,13 @@ namespace SallyConnect
String[] interfaces = new string[] { "theo" };
String docName = "random_edit_1114717882592231.sdaf";
DotNetSallyClient sallyClient = new DotNetSallyClient();
sallyClient.registerDocument(docName, interfaces);
DotNetSallyClient dns = new DotNetSallyClient();
SallyDocument sD = new SallyDocument(docName, interfaces);
sD.registerDocument(dns);
System.Threading.Thread.Sleep(1000);
if (sallyClient.isRegistered()){
sallyClient.sendToSally("some message");
if (sD.isRegistered()){
sD.sendToSally("some message");
}
Console.Read();
}
}
......
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Xml;
namespace SallyConnect
{
class Utilities
{
public static Dictionary<String, String> xmlResponseReader(XmlReader xmlReader, String xmlNode = "heartbeatrequest")
{
Dictionary<String, String> sallyResponse = new Dictionary<string, string>();
while (xmlReader.Read())
{
if ((xmlReader.NodeType == XmlNodeType.Element) && (xmlReader.Name == xmlNode))
{
sallyResponse.Add(xmlReader.Name, xmlReader.ReadElementContentAsString());
return sallyResponse;
}
}
sallyResponse.Add("noresponse", "null");
return sallyResponse;
}
}
}
File added
File added
This diff is collapsed.
File added
File added
This diff is collapsed.
File added
File added
File added
File added
File added
File added
File added
File added
File added
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment