Using SQL Server 2008 Service Broker for an Integration Project with C# .NET – Step-by-Step “Hello World” Guide

We had to create a Service Broker notification service for our integration project a few months back. It was quite hard to find an end-to-end solution that showcases what Service Broker can do. So, months after the project, I decided to write a mini Hello World level tutorial to help anyone learn the capabilities of Service Broker. There is very little in-depth explanation in this tutorial. There are already heaps of articles and blogs with all the in-depth explanation. My aim is to supplement that information, not replace it. This is pretty much aimed at all of us who have to learn by seeing how the code fits together 🙂

So, our aim for this project is to create a Service Broker notification service that informs a Windows app whenever a row is inserted into our Messages table. The app in turn sends a response back to the SQL Server database, which records some sort of ‘audit trail’.

Create a sample database called “PublishingTest”. This database should contain two tables, Messages and ProcessedMessages, as follows:

[Image: table diagram of the Messages and ProcessedMessages tables]

Nothing complicated 🙂
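
For anyone who prefers a script to a diagram, here is a minimal sketch of the database and tables. The column names come from the code later in this post (InterestingMessage, Id, SentTime); the data types, the IDENTITY on Messages.Id and the dbo schema are my assumptions, so adjust them to your own design.

```sql
-- Sketch only: data types and IDENTITY are assumptions, not taken from the original diagram.
CREATE DATABASE PublishingTest;
GO

-- Service Broker has to be enabled in the database, otherwise messages are never delivered.
SELECT name, is_broker_enabled
FROM sys.databases
WHERE name = 'PublishingTest';

-- If is_broker_enabled is 0 (typical for restored or attached databases), enable it:
-- ALTER DATABASE PublishingTest SET ENABLE_BROKER WITH ROLLBACK IMMEDIATE;
GO

USE PublishingTest;
GO

CREATE TABLE dbo.Messages
(
    Id                 INT IDENTITY(1,1) NOT NULL PRIMARY KEY,
    InterestingMessage NVARCHAR(200)     NOT NULL
);

CREATE TABLE dbo.ProcessedMessages
(
    Id       INT      NOT NULL,
    SentTime DATETIME NOT NULL
);
GO
```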

We should then create the message types and the contract that will be passed around by Service Broker. The request is sent by the initiator and the response is sent by the target.

CREATE MESSAGE TYPE [https://lemeaow.wordpress.com/MessagesImportRequestMessage]
VALIDATION = WELL_FORMED_XML;

CREATE MESSAGE TYPE [https://lemeaow.wordpress.com/MessagesImportResponseMessage]
VALIDATION = WELL_FORMED_XML;

CREATE CONTRACT [https://lemeaow.wordpress.com/ImportContract]
(
[https://lemeaow.wordpress.com/MessagesImportRequestMessage] SENT BY INITIATOR,
[https://lemeaow.wordpress.com/MessagesImportResponseMessage] SENT BY TARGET
);

We will then create the corresponding queues for the initiator and target:

CREATE QUEUE InitiatorQueue
WITH STATUS = ON;

CREATE QUEUE TargetQueue
WITH STATUS = ON;

And the services bound to those queues:

CREATE SERVICE InitiatorService
ON QUEUE InitiatorQueue
(
[https://lemeaow.wordpress.com/ImportContract]
)

CREATE SERVICE TargetService
ON QUEUE TargetQueue
(
[https://lemeaow.wordpress.com/ImportContract]
)

We should then create a third queue (called EAQueue, for External Activation queue. I’m not feeling very creative today). EAQueue acts as a middleman between TargetQueue and our external service. While we’re at it, we will also create a corresponding service called EAService.

CREATE QUEUE EAQueue;

CREATE SERVICE EAService
ON QUEUE EAQueue
(
[http://schemas.microsoft.com/SQL/Notifications/PostEventNotification]
)

We will then create an event notification. This event notification makes sure that EAService gets notified every time TargetQueue gets activated. ‘current database’ is a special keyword in SQL Server that refers to the current database (Captn’ Obvious!), so don’t bother renaming this string. Note that internal activation on EAQueue should NOT be turned on.

CREATE EVENT NOTIFICATION EventNotificationTargetQueue
ON QUEUE TargetQueue
FOR QUEUE_ACTIVATION
TO SERVICE 'EAService', 'current database';
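
A quick way to confirm both points above, sketched with the standard catalog views (the object names are the ones used in this post):

```sql
-- The notification should be listed and should point at EAService.
SELECT name, service_name, create_date
FROM sys.event_notifications
WHERE name = 'EventNotificationTargetQueue';

-- EAQueue should have no internal activation: is_activation_enabled = 0.
SELECT name, is_activation_enabled, activation_procedure
FROM sys.service_queues
WHERE name = 'EAQueue';
```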

-- Only run this if you need to remove or recreate the notification later:
-- DROP EVENT NOTIFICATION EventNotificationTargetQueue ON QUEUE TargetQueue;

We should then create a trigger on our Messages table that starts a conversation and sends a new message every time a row is inserted:

CREATE TRIGGER OnMessageInserted ON Messages FOR INSERT
AS
BEGIN
BEGIN TRANSACTION;
DECLARE @ch UNIQUEIDENTIFIER
DECLARE @messageBody NVARCHAR(MAX);

BEGIN DIALOG CONVERSATION @ch
FROM SERVICE [InitiatorService]
TO SERVICE 'TargetService'
ON CONTRACT [https://lemeaow.wordpress.com/ImportContract]
WITH ENCRYPTION = OFF;

-- Construct the request message
SET @messageBody = (SELECT * from inserted FOR XML AUTO, ELEMENTS);

-- Send the message to the TargetService
;SEND ON CONVERSATION @ch
MESSAGE TYPE [https://lemeaow.wordpress.com/MessagesImportRequestMessage] (@messageBody);
COMMIT;
END
GO
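
To see what the message body built by the trigger looks like, the same FOR XML clause can be run by hand. This is only an illustration: aliasing Messages as inserted makes FOR XML AUTO use the same element name that the trigger produces, which is the name the XPath in the response procedure later relies on.

```sql
-- Illustration only: mimics the XML the trigger builds from the inserted pseudo-table.
SELECT *
FROM Messages AS inserted
FOR XML AUTO, ELEMENTS;

-- For a row with Id = 1 and InterestingMessage = 'Hello', the result has this shape:
-- <inserted><Id>1</Id><InterestingMessage>Hello</InterestingMessage></inserted>
```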

Note: It’s not a ‘requirement’ to create an EAQueue/EAService, since it’s possible to specify TargetQueue instead of EAQueue in our config later on. Having an EAQueue is handy, though, because we might want to monitor two or more queues in the future that are handled in a similar way. In that case, we can just create a new event notification on the corresponding queue that points to EAService.

Let’s test the notification!

insert into Messages(InterestingMessage) values ('Hello');

After this insert statement, there should be one row in TargetQueue and another in EAQueue. If EAQueue is still empty for one reason or another, you can drop and re-add the event notification as follows:

drop EVENT NOTIFICATION EventNotificationTargetQueue ON QUEUE TargetQueue;

CREATE EVENT NOTIFICATION EventNotificationTargetQueue
ON QUEUE TargetQueue
FOR QUEUE_ACTIVATION
TO SERVICE 'EAService', 'current database';
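
To see where a message actually ended up, the queues can be queried like read-only tables. A small sketch using the queue names from this post; sys.transmission_queue is the system view that holds messages Service Broker could not deliver, together with the reason.

```sql
-- Request messages waiting for the external application.
SELECT conversation_handle, message_type_name, CAST(message_body AS XML) AS body
FROM TargetQueue;

-- Activation notifications waiting for the External Activator.
SELECT message_type_name, CAST(message_body AS XML) AS body
FROM EAQueue;

-- Messages that are stuck, with the reason in transmission_status.
SELECT to_service_name, message_type_name, transmission_status
FROM sys.transmission_queue;
```

SELECT does not remove anything from a queue, so these checks are safe to repeat.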

We should then create a ProcessResponseMessages procedure that will be run against InitiatorQueue as follows:

CREATE PROCEDURE ProcessResponseMessages
AS
BEGIN
DECLARE @ch UNIQUEIDENTIFIER;
DECLARE @messagetypename NVARCHAR(256);
DECLARE @messagebody XML;
DECLARE @responsemessage XML;

WHILE (1=1)
BEGIN
BEGIN TRANSACTION

WAITFOR (
RECEIVE TOP(1)
@ch = conversation_handle,
@messagetypename = message_type_name,
@messagebody = CAST(message_body AS XML)
FROM
InitiatorQueue
), TIMEOUT 1000

IF (@@ROWCOUNT = 0)
BEGIN
ROLLBACK TRANSACTION
BREAK
END

IF (@messagetypename = 'https://lemeaow.wordpress.com/MessagesImportResponseMessage')
BEGIN

INSERT INTO ProcessedMessages (Id, SentTime) VALUES
(
@messagebody.value('(/inserted/Id)[1]', 'INT'),
GETDATE()
)
END

IF (@messagetypename = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
BEGIN
-- End the conversation
END CONVERSATION @ch;
END

COMMIT TRANSACTION
END
END
GO

Attach the SP to the queue:

ALTER QUEUE InitiatorQueue
WITH ACTIVATION
(
PROCEDURE_NAME = ProcessResponseMessages,
STATUS = ON,
MAX_QUEUE_READERS = 1,
EXECUTE AS OWNER
)
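
A short sanity check, assuming the object names used above: the first query confirms that the procedure is attached to the queue, and the second shows the audit rows once the external app has sent a response back.

```sql
-- Activation should be enabled and point at ProcessResponseMessages.
SELECT name, is_activation_enabled, activation_procedure, max_readers
FROM sys.service_queues
WHERE name = 'InitiatorQueue';

-- Rows appear here after a response message has been processed.
SELECT Id, SentTime
FROM ProcessedMessages
ORDER BY SentTime DESC;
```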

And that’s all we need on the SQL Server side! On a side note, you can manage all of your message types, contracts, etc. via SQL Server Management Studio:

[Image: Service Broker objects in SQL Server Management Studio]

We should then create a .NET Windows app that handles all the notifications. Nothing in the code below needs a form; it is a plain Program class with a static Main method.

Note: This code has been updated with granadacoder’s suggestion. Thanks 🙂

using System;
using System.Collections.Generic;
using System.Data.Sql;
using System.Data.SqlClient;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.IO;
// Only needed by the commented-out Service and MessagesSoapClient lines below:
//using Microsoft.Samples.SqlServer;
//using NotificationTest.MessagesService;
// ServiceBrokerWrapper comes from the servicebroker.net project linked below;
// add a using for its namespace if the class is not in the NotificationTest namespace.

namespace NotificationTest
{
    class Program
    {
        private const string ApplicationName = "MyMessageApplication";
        private const int WaitforTimeout = 5000;
        private const string EndDialogMessageType = "http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog";
        private const string ErrorMessageType = "http://schemas.microsoft.com/SQL/ServiceBroker/Error";

        public static void LogMessageToFile(string message)
        {
            // The using block closes the file even if WriteLine throws.
            using (StreamWriter sw = File.AppendText(@"D:\test\NotificationTest.txt"))
            {
                string entry = String.Format("{0:G}: {1}.", System.DateTime.Now, message);
                sw.WriteLine(entry);
            }
        }

        static void Main()
        {
            SqlConnectionStringBuilder csb = new SqlConnectionStringBuilder();
            csb.ApplicationName = ApplicationName;
            csb.DataSource = @"SERVERNAME\DBNAME"; // server\instance
            csb.InitialCatalog = "PublishingTest";
            csb.IntegratedSecurity = true;
            csb.MultipleActiveResultSets = true;

            try
            {
                using (SqlConnection conn = new SqlConnection(csb.ToString()))
                {
                    conn.Open();
                    bool MsgRcv;
                    do
                    {
                        MsgRcv = false;
                        //Service service = new Service("TargetService", conn);
                        //service.Run(false, conn, tran);
                        using (SqlTransaction tran = conn.BeginTransaction())
                        {
                            SqlCommand comm = conn.CreateCommand();
                            comm.Transaction = tran;
                            comm.CommandText = string.Format(
                                "WAITFOR (RECEIVE conversation_handle, message_type_name, message_body, convert(xml, message_body) FROM {0}), TIMEOUT {1}",
                                "TargetQueue", WaitforTimeout);

                            // Close the reader before committing the transaction.
                            using (SqlDataReader reader = comm.ExecuteReader())
                            {
                                while (reader.Read())
                                {
                                    MsgRcv = true;
                                    System.Data.SqlTypes.SqlGuid handle = reader.GetSqlGuid(0);
                                    System.Data.SqlTypes.SqlString TypeName = reader.GetSqlString(1);

                                    // Copy the raw message_body (column 2) into a byte array.
                                    byte[] btyeArray = null;
                                    if (!reader.IsDBNull(2))
                                    {
                                        using (MemoryStream ms = new MemoryStream())
                                        {
                                            byte[] buff = new byte[8192];
                                            long offset = 0L;
                                            long n = 0L;
                                            do
                                            {
                                                n = reader.GetBytes(2, offset, buff, 0, buff.Length);
                                                ms.Write(buff, 0, (int)n);
                                                offset += n;
                                            } while (n >= buff.Length);
                                            btyeArray = ms.ToArray();
                                        }
                                    }

                                    System.Data.SqlTypes.SqlXml message = reader.GetSqlXml(3);

                                    if (TypeName == EndDialogMessageType || TypeName == ErrorMessageType)
                                    {
                                        // EndDialog messages have no body, so guard against a NULL XML value.
                                        LogMessageToFile(TypeName.Value + " " + (message.IsNull ? "(no body)" : message.Value));

                                        SqlCommand endconv = conn.CreateCommand();
                                        endconv.Transaction = tran;
                                        endconv.CommandText = "End Conversation @handle";
                                        endconv.Parameters.Add("@handle", System.Data.SqlDbType.UniqueIdentifier);
                                        endconv.Parameters["@handle"].Value = handle;
                                        endconv.ExecuteNonQuery();
                                    }
                                    else
                                    {
                                        try
                                        {
                                            //MessagesSoapClient client = new MessagesSoapClient();
                                            //client.ReceiveMessage(new MessagesService.Message() { Value = message.Value });
                                        }
                                        catch (Exception e)
                                        {
                                            Console.WriteLine(e);
                                        }

                                        // Note: the return value of this call is discarded; the reply below
                                        // is sent on the conversation the request was received on.
                                        ServiceBrokerWrapper.BeginConversation(tran, "EAService", "InitiatorService", "https://lemeaow.wordpress.com/ImportContract");

                                        //ServiceBrokerWrapper.Send(tran, Guid.Parse(handle.ToString()), "https://lemeaow.wordpress.com/MessagesImportResponseMessage", message.Value);

                                        ServiceBrokerWrapper.Send(tran, Guid.Parse(handle.ToString()), "https://lemeaow.wordpress.com/MessagesImportResponseMessage", btyeArray);

                                        LogMessageToFile(message.Value);
                                    }
                                }
                            }
                            tran.Commit();
                        }
                    } while (MsgRcv);
                }
            }
            catch (Exception e)
            {
                LogMessageToFile(e.Message.ToString());
            }
        }
    }
}

As a starting point, I’m actually using a Service Broker wrapper developed by someone from Stack Overflow. You can download it from GitHub: https://github.com/jdaigle/servicebroker.net/blob/master/ServiceBroker.Net/ServiceBrokerWrapper.cs

Alternatively, you can also use the Service Broker wrapper that Microsoft supplies with its code samples. It’s located in C:\Program Files\Microsoft SQL Server\100\Samples\Engine\ServiceBroker and the project name is “ServiceBrokerInterface”. I prefer the SO wrapper for the purpose of this post as it is simpler.

Interesting points of the app:

                        comm.CommandText = string.Format(
                            "WAITFOR (RECEIVE conversation_handle, message_type_name, message_body, convert(xml, message_body) FROM {0}), TIMEOUT {1}",
                            "TargetQueue", WaitforTimeout);

This statement pretty much waits for the next available message and returns it.


                                ServiceBrokerWrapper.BeginConversation(tran, "EAService", "InitiatorService", "https://lemeaow.wordpress.com/ImportContract");
                                ServiceBrokerWrapper.Send(tran, Guid.Parse(handle.ToString()), "https://lemeaow.wordpress.com/MessagesImportResponseMessage", btyeArray);

These statements send the import response message to SQL Server. It will then be handled by the stored procedure we created before.

You should then download the newest Service Broker External Activator MSI from Microsoft. As of today, the link is http://www.microsoft.com/en-au/download/details.aspx?id=6375 . We will have to modify its config after the installation to match our settings. The config is located in C:\Program Files\Service Broker\External Activator\Config\EAService.config.

My config is as follows:

<?xml version="1.0" encoding="utf-8"?>
<Activator xmlns="http://schemas.microsoft.com/sqlserver/2008/10/servicebroker/externalactivator"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://schemas.microsoft.com/sqlserver/2008/10/servicebroker/externalactivator EAServiceConfig.xsd"
           >
  <NotificationServiceList>
    <NotificationService name="EAService" id="100" enabled="true">
      <Description>My test notification service</Description>
      <ConnectionString>
        <!-- All connection string parameters except User Id and Password should be specified here -->
        <Unencrypted>server=SERVER\DBINSTANCE;database=PublishingTest;Application Name=NotificationTest;Integrated Security=true;</Unencrypted>
      </ConnectionString>
    </NotificationService>
  </NotificationServiceList>
  <ApplicationServiceList>
    <ApplicationService name="NotificationTest" enabled="true">
      <OnNotification>
        <ServerName>SERVER\DBINSTANCE</ServerName>
        <DatabaseName>PublishingTest</DatabaseName>
        <SchemaName>dbo</SchemaName>
        <QueueName>TargetQueue</QueueName>
      </OnNotification>
      <LaunchInfo>
        <ImagePath>D:\Random .NET Codes\NotificationTest\NotificationTest\bin\Debug\NotificationTest.exe</ImagePath>
        <CmdLineArgs></CmdLineArgs>
        <WorkDir>D:\test\working</WorkDir>
      </LaunchInfo>
      <Concurrency min="1" max="1" />
    </ApplicationService>
  </ApplicationServiceList>
  <LogSettings>
    <LogFilter>
      <TraceFlag>All Levels</TraceFlag>
      <TraceFlag>All Modules</TraceFlag>
      <TraceFlag>All Entities</TraceFlag>
      <TraceFlag>Verbose</TraceFlag>
    </LogFilter>
  </LogSettings>
</Activator>

There are still other problems to consider, such as exception handling, poison messages, etc., for the code to be ‘reliable enough’, but that’s another long topic that probably shouldn’t be tackled in this already long post 🙂
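
One symptom is worth knowing even at Hello World level: when a transaction that received from a queue rolls back five times in a row, Service Broker disables that queue to protect itself from the poison message. A minimal sketch for spotting and undoing that, assuming the queue names used in this post:

```sql
-- A disabled queue shows is_receive_enabled = 0 and is_enqueue_enabled = 0.
SELECT name, is_receive_enabled, is_enqueue_enabled
FROM sys.service_queues
WHERE name IN ('TargetQueue', 'InitiatorQueue');

-- Re-enable the queue once the offending message or the processing bug has been dealt with.
ALTER QUEUE TargetQueue WITH STATUS = ON;
```

Re-enabling without fixing the cause just repeats the cycle, so treat this as a diagnostic step rather than a fix.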


2 comments

  1. “After this select statement, there should be one row in InitiatorQueue and another in the EAqueue.”

    I’ve used the code sample, and after I insert into the “Messages” table, I have one row in the TargetQueue table and one row in the EAqueue.

    There is something a little “off” in the above sample. Could you reverify it?

    The updated ServiceBrokerWrapper code is at:
    https://github.com/jdaigle/servicebroker.net/tree/master

  2. Luis Alexander Aldazabal Gil

    Hi, great post. I have a question: is it necessary to use the External Activator service?
