Archive

Posts Tagged ‘NServiceBus’

SOA and Messages - What’s in a name

November 23rd, 2009 Simon Segal 1 comment

So the question is what’s in a name, when it comes to considering messages of the SOA variety, or to be more precise in a messaging system? Well I guess it depends on who you ask doesn’t it and let’s for argument sake extend the idea of the ‘who’ to include the implementing technologies of the day and any of their accompanying GBE (guidance by example). Consider this fragment of WCF code that declares the signature of a service. The example comes from the MSDN stock trader example application.

Listing 1.0

[ServiceContract(Name = "OrderProcessorService",
    Namespace = "http://Trade.TraderOrderHost")]
public interface IOrderProcessor
{
    [OperationContract(Action = "SubmitOrderTransactedQueue",
        IsOneWay = true)]
    void SubmitOrderTransactedQueue(OrderDataModel order);

    [OperationContract(Action = "SubmitOrder",
        IsOneWay = true)]
    void SubmitOrder(OrderDataModel order);
}

Self Evident?

The first thing I would like to highlight about this example is the name of the service itself, it has a very generic sounding name that does not say a lot about the domain in which we are processing orders. True enough there is some clue in the WCF ServiceContract attribute property assignments but is that sufficient? The next thing I want to point out is that when an order is submitted with the intent that it should be stored in a transactional queue, the service operation responsible has been named specifically to speak to that intent (of storing in a queue). The fact that the message is stored in a queue is an implementation detail and on the surface it appears  has absolutely no association to the business context of the operation, which is to “submit an order”. If on the other hand there is a business requirement to have *certain* messages stored in a durable transactional queue, we should probably be taking those business considerations into account with respect to naming the message handler (operation). Making some assumptions, perhaps this would a better naming scheme for that scenario:

Listing 2.0

[OperationContract(Action = "SubmitStrategicCustomerOrder", IsOneWay = true)]
void SubmitStrategicCustomerOrder(OrderDataModel order);

Next up lets focus on the name of the operations single argument OrderDataModel, which is significant for a couple of reasons. Firstly, is this operator really a model – its name says it is? Certainly not if we want to hold it up to the DDD definition, it carries no behaviour with it – it’s just data, information about the order that is being “submitted”. So is OrderDataModel a DTO? If feels like a DTO doesn’t it, thus we should ask what is the message? I am not going to get too heavily into a discussion of what constitutes the ‘message’ when it comes to SOAP services, only to reflect that many consider it to be the combination of the operations and their operators, whilst some consider it to be just the operators alone and others don’t speak of messages at all and consider an operations arguments as nothing more than DTO’s.

Let’s take a different approach to this problem, what if we were to name the contract for this service IProcessOrders and include handlers (methods) for cancelling the order amongst other things.

Listing 3.0

public class StockTradeOrderService : IProcessStockTradeOrders
{
    public void Submit(OrderMessage order)
    {
        //….etc
    }

    public void Cancel(OrderMessage order)
    {
        //….etc
    }
}

I cant say it feels right to me yet but gradually I do feel as though the names of this service and the intent expressed by it’s message handlers are becoming somewhat more reflective of the business problem it attempting to solve, making the code easier to reason about.

Lets have a look at how I might go about specifying the contract and implementation for such a service using NServiceBus.

Listing 4.0

public class SubmitOrderMessageHandler : IMessageHandler<SubmitOrderMessage>
{
    public IBus Bus { get; set; }

    public void Handle(SubmitOrderMessage message)
    {
        OrderSubmittedResponseMessage response =
            new OrderSubmittedResponseMessage ();

        //more processing to act on the order message

        this.Bus.Reply(response);
    }
}

What about cancelling the order then?

Listing 5.0

public class CancelOrderMessageHandler : IMessageHandler<CancelOrderMessage>
{
    public IBus Bus { get; set; }

    public void Handle(CancelOrderMessage)
    {
        //processing
    }
}

Structural differences aside, I still feel strange about the whole SubmitOrder aspect to the naming as it stands, it still feels a tad too generic for my liking. To be fair, an order service might not be something that would be modelled with singular discrete message handlers like this, perhaps it would better be modelled as a Saga (think persisted workflow), just as it is in the NServiceBus samples, but that’s not the focus here so we wont dwell on that either.

The SubmitOrderMessageHandler demonstrated above, handles messages of the type SubmitOrderMessage, where the IMessageHandler provides us (via the strategy pattern) a Handle<T> method, where T is the message type and the Handle methods body has the express purpose of implementing the logic for handling the message.

image By now you can probably see where this all going? The “data” contained in the SubmitOrderMessage is certainly different to the data contained in the CancelOrderMessage and if we shift our focus now back to the WCF stock trader example to consider the OrderDataModel (the argument to the SubmitOrder method) then its evident that the message carries some data used by the generic handler to examine more closely exactly what the intent is (beyond the ‘submission’ of the order). Again this is not plainly clear from the name of the message alone. The OrderDataModel data contract has a property (of string) called ‘orderType’ and looking at the component that has the responsibility of processing orders, we can see that it checks the value of the ‘orderType’ to discern whether or not to process the order to BUY or SELL.

Listing 6.0

if (order.orderType == StockTraderUtility.ORDER_TYPE_BUY)
{
    //…do some work for a buy order
}
else if (order.orderType == StockTraderUtility.ORDER_TYPE_SELL)
{
    //…do some work for a sell order
}
So up to this point we still don’t really have a good naming approach tied down, it’s clear that the intent to ‘BUY’ or ‘SELL’ is not represented in the name of the message at all. However what if the contents of a ‘BUY’ and ‘SELL’ message are semantically equivalent? Is that kind of generality enough to encourage us to become more lax and collapse these very different types of business events into one single message? Personally I don’t believe so. Let’s put on our versioning hat for just a minute and try and imagine what will start to happen when requirements for the system reveal cases that  require new fields (data). For example, a ‘BUY’ order message requires a new field and suddenly we have lost our semantic equivalence and our message starts to exhibit some of the frailty it posses by attempting to be so generic. It’s therefore important to put a solid abstraction on the message contract so that with variation over time the system doesn’t suffer under the weight of brittle message contract design.
image This small snippet of decisional code in listing 6.0 underlines how the naming of the Service Contract and Data Contracts in the example do not really make a good job in delivering code that is self describing from a business context and is therefore harder to read and reason about it. In the NServiceBus example however it’s plainly evident what to expect from each message and its handler. I should point out that it is possible to borrow the style encouraged in NServiceBus and implement it in WCF (to some degree); once such approach is using un-typed messages where the body of the message contains a serialised message <T> and Services receiving messages of type <T> can have registered message handlers that can be matched to deserialized messages of <T>, finally having their Handled method called. I covered this somewhat in another post.

Generic in the name of Reuse?

imageIn an article by Thomas Erl, he speaks of more ‘generic’ naming schemes when it comes to ‘utility services’ but I don’t have any genericbusiness’ operations because the domain in which I work is not typically generic. Therefore when I put on my AOP hat and start thinking about some of the cross cutting concerns that crop up in any number of business domains, such as the ubiquitous logging for example, shouldn’t I reasonably be expected to relax my naming sensibility? But that’s not really the problem under discussion here is it.

Lets take the code now from listing’s 4.0 and 5.0 and propose a new approach to the model suggested by the Stock Trader for the submission of buying and selling trades:

public class SubmitBuyOrderMessageHandler : IMessageHandler<BuyOrderMessage>
{
    public IBus Bus { get; set; }

    public void Handle(BuyOrderMessage)
    {
        BuyOrderResponseMessage response =
            new BuyOrderResponseMessage ();

        //more processing to act on the order message

        this.Bus.Reply(response);
    }
}

public class SubmitSellOrderMessageHandler : IMessageHandler<SellOrderMessage>
{
    public IBus Bus { get; set; }

    public void Handle(SellOrderMessage)
    {
        SellOrderResponseMessage response =
            new SellOrderResponseMessage();

        //more processing to act on the order message

        this.Bus.Reply(response);
    }
}

Messages and their names should map closely to business events modelled by the system; business events as such should have their names represented in non generic terms with more focus on their explicit context. I want to build self evident, self describing business systems where things like implementation details should certainly not figure in the naming of messages or their handlers, that’s something for ‘other’ documentation.

Share/Save/Bookmark

Categories: NServiceBus, SOA, WCF Tags: , ,

A Linq To SQL Saga Persister for NServiceBus

September 22nd, 2009 Simon Segal 3 comments

NServiceBus is becoming a very popular Open Source development framework and out of the box it supports some very elegant feature extensibility points (thanks Udi) that have been designed very deliberately in a technology agnostic way. This allows us as developers to write our own implementations of certain features in the technology of our choice, and the subject of this post focuses on extending the Saga Persister. However at this point let me just say that it may well be worth your while to read Udi’s post on Saga’s before continuing and then come back here to finish.

The Saga Persister

It may be premature to discuss the Saga Persister without first touching on that which begat it, namely the Saga. I like to think of Saga’s as a long running stateful services, with methods for entry points that handle messages. You can also think of a Saga as a class that contains a set of methods, each one handling a different message type, where the entire Saga, it’s methods and all the logic contained within, accomplish a set of work over time. These methods make up the contract of what is ultimately a stateful service. For a lengthy discussion of Saga’s I recommend reading more from Udi here. In the meantime here’s a somewhat elided example:

public class RequestLeaveSaga : Saga<RequestLeaveSagaData>,
        ISagaStartedBy<IAnnualLeaveRequest>,
        IMessageHandler<IAuthorizationForAnnualLeaveResponse>,
        IMessageHandler<ICancelAnnualLeaveRequest>
{
    public override void Timeout(object state)
    {
        //…timeout logic here
    }

    public void Handle(IAnnualLeaveRequest message)
    {
        Data.Id = Guid.NewGuid();
        Data.LeaveRequestId = message.LeaveRequestId;
        Data.EmployeeId = message.EmployeeId;

        var leaveRequestForAuthorization =
            Bus.CreateInstance<IReqeustingAnnualLeaveAuthorization>(
            m =>
                {
                    m.EmployeeId = message.EmployeeId;
                    m.FirstDayOfLeave = message.LeaveStartsAt;
                    m.NumberOfDaysRequested =
                        Data.CalculateDays(message.LeaveStartsAt,
                            message.LeaveEndsAt);
                    m.TimeOfRequestSubmission = message.TimeOfRequest;
                    m.SagaId = Data.Id;
                });
        Bus.Send(leaveRequestForAuthorization);
    }

    public void Handle(IAuthorizationForAnnualLeaveResponse message)
    {
        //…handles the response message from a HR authorization service
    }

    public void Handle(ICancelAnnualLeaveRequest message)
    {
        //…handles cancel message to invalidate the request for leave
    }
}

The key to understanding the Saga Persister is linked directly to the statefulness of the Saga itself, where the job of the persister is to initially hydrate the Saga’s  state to storage (persistent or otherwise), so that when one of it’s subsequent message handlers (methods in its contract) are called, it can be re-hydrated from the storage. Saga’s contain a property named Data, which we can see (above) has been referred to in the body of the Handle method for the IAnnualLeaveRequest message. The underlying type for the Data property is determined generically in the signature for Saga<T> where T is an ISagaEntity. When persisting Saga state (to storage), NServiceBus will use the values it finds in the instance of <T> (the ISagaEntity reference in the Data property).

To better understand how this works lets look at the details specified in configuration of the Bus using the NServiceBus fluent interface for configuration, which resides in the process hosting the Saga. The following example is derived from the configuration of the Bus as demonstrated in the NServiceBus “Manufacturing” sample project.  In this example (below) I have substituted the “out of the box” persister (backed by NHibernate) with a Linq To SQL implementation.

try
{
    DataContext sessionFactory = GetContext();

    var bus = NServiceBus.Configure.With()
        .SpringBuilder(
            (cfg =>
            {
                cfg.ConfigureComponent<OrderSagaFinder>
                    (ComponentCallModelEnum.None)
                    .SessionFactory = sessionFactory;
            }))
        .XmlSerializer()
        .MsmqTransport()
            .IsTransactional(true)
            .PurgeOnStartup(false)
        .DbSubscriptionStorage()
            .Table(“Subscriptions”)
            .SubscriberEndpointColumnName(“SubscriberEndpoint”)
            .MessageTypeColumnName(“MessageType”)
        .Sagas()
        .LinqToSqlSagaPersister<OrderSagaData>(sessionFactory)
        .UnicastBus()
            .ImpersonateSender(false)
            .LoadMessageHandlers(
                First<GridInterceptingMessageHandler>
                    .Then<SagaMessageHandler>()
             )
        .CreateBus()
        .Start();
}
catch (Exception e)
{
    LogManager.GetLogger(“hello”).Fatal(“Exiting”, e);
}

There are two main areas of this configuration code that we should focus on. Firstly when we configure the nominated Inversion of Control container we can specify which Saga Finder to have configured and inject the required LINQ To SQL DataContext. Further down in this usage of the fluent interface I have specified that this endpoint supports Sagas and that they should use the LINQ To SQL Saga Persister and also that the same DataContext should be injected.

.Sagas()
.LinqToSqlSagaPersister<OrderSagaData>(sessionFactory)
Another extensibility point that needs to be addressed in order for the LinqToSqlSagaPersister to show up in the fluent interface API, demands that an implementation of an Extension Method is required for any new custom persister. Let’s look at the Extension method that makes the LinqToSqlSagaPersister available:
namespace NServiceBus
{
    public static class ConfigureLinqToSqlSagaPersister
    {
        public static Configure LinqToSqlSagaPersister<T>(this Configure config,
            DataContext sessionFactory) where T : class, ISagaEntity
        {
            config.Configurer
                .ConfigureComponent<LinqToSqlSagaPersister<T>>
                        (ComponentCallModelEnum.Singlecall)
                .SessionFactory = sessionFactory;

            return config;
        }
    }
}
Out of the box NServiceBus comes with an NHibernate backed Saga Persister that developers can freely use with very predictable and stable results. However, its possible that you may wish to implement your own persister based on your favourite ORM technology or alternate storage.

Implement an Interface

To create your custom Saga Persister you will need to implement an interface found in the NServiceBus library, unsurprisingly this Interface is named ISagaPersister and has four method signatures that make up it’s contract definition.

public class LinqToSqlSagaPersister<T> :
        ISagaPersister where T : class, ISagaEntity
{
    public void Complete(ISagaEntity saga)
    {
        SessionFactory.GetTable<T>().DeleteOnSubmit(saga as T);
        SessionFactory.SubmitChanges();
    }

    public ISagaEntity Get(Guid sagaId)
    {
        return SessionFactory.GetTable<T>()
            .Where(s => s.Id == sagaId).Single();
    }

    public void Save(ISagaEntity saga)
    {
        SessionFactory.GetTable<T>().InsertOnSubmit(saga as T);
        SessionFactory.SubmitChanges();
    }

    public void Update(ISagaEntity saga)
    {
        SessionFactory.SubmitChanges();
    }

    public virtual DataContext SessionFactory { get; set; }
}

The four methods of an ISagaPersister are responsible for interacting with the persistence store by getting a Saga entity by it’s ID, updating an existing saga entity, saving a saga entity and setting a saga as completed and removing it from the active saga list.

The Mapping File

In order to Map the ISagaEntity to the database tables we can take advantage of the fact that LINQ To SQL supports POCO’s and simply map our entities as required.

<?xml version=”1.0″ encoding=”utf-8″?>
<Database Name=”Sagas”
          xmlns=”http://schemas.microsoft.com/linqtosql/mapping/2007″>
  <Table Name=”dbo.OrderSagaData” Member=”OrderSagaData”>
    <Type Name=”OrderService.OrderSagaData”>
      <Column Name=”Id” Member=”Id” DbType=”UniqueIdentifier NOT NULL”
              IsPrimaryKey=”true” CanBeNull=”false” />
      <Column Name=”Originator” Member=”Originator”
              DbType=”VarChar(50)” CanBeNull=”true” />
      <Column Name=”PurchaseOrderNumber” Member=”PurchaseOrderNumber”
              DbType=”VarChar(50) NOT NULL” CanBeNull=”false” />
      <Column Name=”PartnerId” Member=”PartnerId”
              DbType=”UniqueIdentifier NOT NULL” CanBeNull=”false” />
      <Column Name=”ProvideBy” Member=”ProvideBy”
              DbType=”DateTime NOT NULL” CanBeNull=”false” />
      <Association Name=”OrderSagaData_OrderSagaDataLine” Member=”Lines”
              ThisKey=”Id” OtherKey=”OrderSagaDataId”
              Storage=”_orderLines”/>
    </Type>
  </Table>
  <Table Name=”dbo.OrderSagaDataLines” Member=”OrderSagaDataLines”>
    <Type Name=”OrderService.OrderLine”>
      <Column Name=”Id” Member=”Id” DbType=”UniqueIdentifier NOT NULL”
              IsPrimaryKey=”true” CanBeNull=”false” />
      <Column Name=”OrderSagaDataId” Member=”OrderSagaDataId”
              DbType=”UniqueIdentifier NOT NULL” CanBeNull=”false” />
      <Column Name=”ProductId” Member=”ProductId”
              DbType=”UniqueIdentifier NOT NULL” CanBeNull=”false” />
      <Column Name=”Quantity” Member=”Quantity”
              DbType=”Float NOT NULL” CanBeNull=”false” />
      <Column Name=”AuthorizedQuantity” Member=”AuthorizedQuantity”
              DbType=”Float NOT NULL” CanBeNull=”false” />
      <Association Name=”OrderSagaData_OrderSagaDataLine” Member=”Order”
              ThisKey=”OrderSagaDataId” OtherKey=”Id”
              IsForeignKey=”true” Storage=”_order” />
    </Type>
  </Table>
</Database>

To make the mapping file work with the DataContext instantiation we need to do something like the following:

private static DataContext GetContext()
{
    StreamReader sr =
        new StreamReader(“D:\\SagaL2SMapping.xml”);

    //set the mapping source up
    XmlMappingSource mapping = XmlMappingSource.FromStream(sr.BaseStream);

    DataContext db = null;

    //new up a persistance repository
    db = new DataContext(“Data Source=BOOMER\\BOOM09;initial “ +
                        “catalog=Sagas;user id=sa;password=supremo”,
                         mapping);

    var loadopts = new DataLoadOptions();
    loadopts.LoadWith<OrderSagaData>(o => o.Lines);
    db.LoadOptions = loadopts;
    db.DeferredLoadingEnabled = false;

    db.Log = new DebuggerTextWriter();

    return db;
}

The Saga Finder

The final part of the puzzle is the Saga Finder which is dealt with by implementing the class IFindSagas<T>. The IFindSagas class contains a nested interface Using<M> which indicates to NServiceBus that implementers of IFindSagas<T> have the ability to find saga’s of the type <T> where the message being handled is of type <M>. Once again using the “manufacturing” sample from the NServiceBus download, I have implemented a Saga Finder using LINQ To SQL:

public class OrderSagaFinder :
    IFindSagas<OrderSagaData>.Using<OrderMessage>,
    IFindSagas<OrderSagaData>.Using<CancelOrderMessage>
{
    public OrderSagaData FindBy(OrderMessage message)
    {
        return FindBy(message.PurchaseOrderNumber, message.PartnerId);
    }

    public OrderSagaData FindBy(CancelOrderMessage message)
    {
        return FindBy(message.PurchaseOrderNumber, message.PartnerId);
    }

    public OrderSagaData FindBy(string purchaseOrderNumber, Guid partnerId)
    {
        return sessionFactory.GetTable<OrderSagaData>().Where(
            o => o.PurchaseOrderNumber == purchaseOrderNumber &&
                o.PartnerId == partnerId).FirstOrDefault();
    }

    private DataContext sessionFactory;

    public virtual DataContext SessionFactory
    {
        get { return sessionFactory; }
        set { sessionFactory = value; }
    }
}

The Using<M> interface in NServiceBus specifies a single method in it’s signature, which you probably guessed is:

T FindBy(M message)
I hope this helps if your interested in implementing your own Saga Persister and I recommend that you read Udi’s post on Saga’s and play with the sample from the NServiceBus download to get a better idea about their uses.
For those interested I will be following up soon with an Entity Framework persister in a subsequent post however please note it will only be compatible with V 4.0 of Entity Framework (due to the lack of POCO support in the current version).

Share/Save/Bookmark

Flexibility and Scalability with IronPython Plug-ins and NServiceBus

August 28th, 2009 Simon Segal No comments

Well it hasn’t taken long to get some excellent bang for buck from IronPython. The entire team with whom I currently work are all furiously learning Python courtesy of an almost Googlesque free learning time and I have already documented my learning path with respect to Python here before.

As I said at the top of this post, we are already reaping some benefit and there is certainly not an Python guru amongst us (yet). Most recently we were asked for a fairly trivial application (as far as complexity goes) but one that was very valuable to the business nonetheless. Without going into the gory details, what was requested was a desktop application that digitally processed documents (imaging) and produced the output fast, and I mean really fast. The last part of the request was for a design that would allow the document list to be compiled from almost any source, database, xml, Microsoft Office files, txt, etc etc. Let’s deal with each of these concerns in order of their being mentioned.

Steroidal Processing Speed

This part was really not that hard to design and the choice was a simple one, in one word, NServiceBus. What was needed essentially was many workers all doing the same work on a given set of files and this of course mapped brilliantly to one way messaging using NServiceBus and it’s distributor to manage balancing the load amongst the workers. By spreading out the work across any number of worker nodes, our project managers could scale up and down according to their needs to adhere to a given SLA. If you want to read more about the Distributor in NServiceBus please check out Ayende’s review.

ip_nsb_arch2

Convention to the Rescue

As I said earlier, we were asked to provide a method of very quickly adding or “plugging in” new ways to access data that would produce the list of files for processing  and furthermore this needed to work without needing to recompile any part of the application. Given these conditions and the convention that our messages were always derived from data that included a list of files to be processed, it was pretty clear that IronPython was a perfect fit.

IronPython Plugin UI

The UI itself was built using C# with the exception that the list box contents would be populated by an IronPython script and the same script would call-back into the C# code with the list of results (file paths). Having IronPython scripts call-back into managed code is trivial, you just need to pass a function from the executing managed assembly and have that function stored as a variable on the executing Script Scope. Then you just call the function. For example:

public void callback()
{
    Debug.WriteLine(“Test Callback method got called”);
}

public string funcback(string calledwith)
{
    Console.WriteLine(“Test Callback function got called with “ +
        calledwith);
    return calledwith;
}
private void SetDefaultScopeVariables(ScriptScope scope)
{
    scope.SetVariable(“test_callback_method”,
        new Action(callback));
    scope.SetVariable(“test_callback_function”,
        new Func<string, string>(funcback));
}
Call the function call-back from the code from IronPython.

ip_nsb_callback_nsb

What the sketch of the UI (above) points out (see the red arrow), is that by dropping scripts into the application we were able to load up any number of ways in reading data to produce the list of files that would be used in creating our messages (sent via the distributor). So by simply dropping in a new IronPython script, another item would be loaded into the list box, offering a new method of accessing data requiring processing. Every scripted plug-in would provide an input, a description and it’s own particular method of accessing data (which could be anything).

On a side note, the UI sketch was produced using SketchFlow in Expression Blend 3 which looks quite promising.

Share/Save/Bookmark

Creative Commons Attribution-ShareAlike 2.5 Australia
Creative Commons Attribution-ShareAlike 2.5 Australia