Living in the Tech Avalanche Generation

A practitioner’s introspective on technology

A Linq To SQL Saga Persister for NServiceBus

NServiceBus is becoming a very popular Open Source development framework and out of the box it supports some very elegant feature extensibility points (thanks Udi) that have been designed very deliberately in a technology agnostic way. This allows us as developers to write our own implementations of certain features in the technology of our choice, and the subject of this post focuses on extending the Saga Persister. However at this point let me just say that it may well be worth your while to read Udi’s post on Saga’s before continuing and then come back here to finish.

The Saga Persister

It may be premature to discuss the Saga Persister without first touching on that which begat it, namely the Saga. I like to think of Saga’s as a long running stateful services, with methods for entry points that handle messages. You can also think of a Saga as a class that contains a set of methods, each one handling a different message type, where the entire Saga, it’s methods and all the logic contained within, accomplish a set of work over time. These methods make up the contract of what is ultimately a stateful service. For a lengthy discussion of Saga’s I recommend reading more from Udi here. In the meantime here’s a somewhat elided example:

public class RequestLeaveSaga : Saga<RequestLeaveSagaData>,
        ISagaStartedBy<IAnnualLeaveRequest>,
        IMessageHandler<IAuthorizationForAnnualLeaveResponse>,
        IMessageHandler<ICancelAnnualLeaveRequest>
{
    public override void Timeout(object state)
    {
        //…timeout logic here
    }

    public void Handle(IAnnualLeaveRequest message)
    {
        Data.Id = Guid.NewGuid();
        Data.LeaveRequestId = message.LeaveRequestId;
        Data.EmployeeId = message.EmployeeId;

        var leaveRequestForAuthorization =
            Bus.CreateInstance<IReqeustingAnnualLeaveAuthorization>(
            m =>
                {
                    m.EmployeeId = message.EmployeeId;
                    m.FirstDayOfLeave = message.LeaveStartsAt;
                    m.NumberOfDaysRequested =
                        Data.CalculateDays(message.LeaveStartsAt,
                            message.LeaveEndsAt);
                    m.TimeOfRequestSubmission = message.TimeOfRequest;
                    m.SagaId = Data.Id;
                });
        Bus.Send(leaveRequestForAuthorization);
    }

    public void Handle(IAuthorizationForAnnualLeaveResponse message)
    {
        //…handles the response message from a HR authorization service
    }

    public void Handle(ICancelAnnualLeaveRequest message)
    {
        //…handles cancel message to invalidate the request for leave
    }
}

The key to understanding the Saga Persister is linked directly to the statefulness of the Saga itself, where the job of the persister is to initially hydrate the Saga’s  state to storage (persistent or otherwise), so that when one of it’s subsequent message handlers (methods in its contract) are called, it can be re-hydrated from the storage. Saga’s contain a property named Data, which we can see (above) has been referred to in the body of the Handle method for the IAnnualLeaveRequest message. The underlying type for the Data property is determined generically in the signature for Saga<T> where T is an ISagaEntity. When persisting Saga state (to storage), NServiceBus will use the values it finds in the instance of <T> (the ISagaEntity reference in the Data property).

To better understand how this works lets look at the details specified in configuration of the Bus using the NServiceBus fluent interface for configuration, which resides in the process hosting the Saga. The following example is derived from the configuration of the Bus as demonstrated in the NServiceBus “Manufacturing” sample project.  In this example (below) I have substituted the “out of the box” persister (backed by NHibernate) with a Linq To SQL implementation.

try
{
    DataContext sessionFactory = GetContext();

    var bus = NServiceBus.Configure.With()
        .SpringBuilder(
            (cfg =>
            {
                cfg.ConfigureComponent<OrderSagaFinder>
                    (ComponentCallModelEnum.None)
                    .SessionFactory = sessionFactory;
            }))
        .XmlSerializer()
        .MsmqTransport()
            .IsTransactional(true)
            .PurgeOnStartup(false)
        .DbSubscriptionStorage()
            .Table(“Subscriptions”)
            .SubscriberEndpointColumnName(“SubscriberEndpoint”)
            .MessageTypeColumnName(“MessageType”)
        .Sagas()
        .LinqToSqlSagaPersister<OrderSagaData>(sessionFactory)
        .UnicastBus()
            .ImpersonateSender(false)
            .LoadMessageHandlers(
                First<GridInterceptingMessageHandler>
                    .Then<SagaMessageHandler>()
             )
        .CreateBus()
        .Start();
}
catch (Exception e)
{
    LogManager.GetLogger(“hello”).Fatal(“Exiting”, e);
}

There are two main areas of this configuration code that we should focus on. Firstly when we configure the nominated Inversion of Control container we can specify which Saga Finder to have configured and inject the required LINQ To SQL DataContext. Further down in this usage of the fluent interface I have specified that this endpoint supports Sagas and that they should use the LINQ To SQL Saga Persister and also that the same DataContext should be injected.

.Sagas()
.LinqToSqlSagaPersister<OrderSagaData>(sessionFactory)
Another extensibility point that needs to be addressed in order for the LinqToSqlSagaPersister to show up in the fluent interface API, demands that an implementation of an Extension Method is required for any new custom persister. Let’s look at the Extension method that makes the LinqToSqlSagaPersister available:
namespace NServiceBus
{
    public static class ConfigureLinqToSqlSagaPersister
    {
        public static Configure LinqToSqlSagaPersister<T>(this Configure config,
            DataContext sessionFactory) where T : class, ISagaEntity
        {
            config.Configurer
                .ConfigureComponent<LinqToSqlSagaPersister<T>>
                        (ComponentCallModelEnum.Singlecall)
                .SessionFactory = sessionFactory;

            return config;
        }
    }
}
Out of the box NServiceBus comes with an NHibernate backed Saga Persister that developers can freely use with very predictable and stable results. However, its possible that you may wish to implement your own persister based on your favourite ORM technology or alternate storage.

Implement an Interface

To create your custom Saga Persister you will need to implement an interface found in the NServiceBus library, unsurprisingly this Interface is named ISagaPersister and has four method signatures that make up it’s contract definition.

public class LinqToSqlSagaPersister<T> :
        ISagaPersister where T : class, ISagaEntity
{
    public void Complete(ISagaEntity saga)
    {
        SessionFactory.GetTable<T>().DeleteOnSubmit(saga as T);
        SessionFactory.SubmitChanges();
    }

    public ISagaEntity Get(Guid sagaId)
    {
        return SessionFactory.GetTable<T>()
            .Where(s => s.Id == sagaId).Single();
    }

    public void Save(ISagaEntity saga)
    {
        SessionFactory.GetTable<T>().InsertOnSubmit(saga as T);
        SessionFactory.SubmitChanges();
    }

    public void Update(ISagaEntity saga)
    {
        SessionFactory.SubmitChanges();
    }

    public virtual DataContext SessionFactory { get; set; }
}

The four methods of an ISagaPersister are responsible for interacting with the persistence store by getting a Saga entity by it’s ID, updating an existing saga entity, saving a saga entity and setting a saga as completed and removing it from the active saga list.

The Mapping File

In order to Map the ISagaEntity to the database tables we can take advantage of the fact that LINQ To SQL supports POCO’s and simply map our entities as required.

<?xml version=”1.0″ encoding=”utf-8″?>
<Database Name=”Sagas”
          xmlns=”http://schemas.microsoft.com/linqtosql/mapping/2007″>
  <Table Name=”dbo.OrderSagaData” Member=”OrderSagaData”>
    <Type Name=”OrderService.OrderSagaData”>
      <Column Name=”Id” Member=”Id” DbType=”UniqueIdentifier NOT NULL”
              IsPrimaryKey=”true” CanBeNull=”false” />
      <Column Name=”Originator” Member=”Originator”
              DbType=”VarChar(50)” CanBeNull=”true” />
      <Column Name=”PurchaseOrderNumber” Member=”PurchaseOrderNumber”
              DbType=”VarChar(50) NOT NULL” CanBeNull=”false” />
      <Column Name=”PartnerId” Member=”PartnerId”
              DbType=”UniqueIdentifier NOT NULL” CanBeNull=”false” />
      <Column Name=”ProvideBy” Member=”ProvideBy”
              DbType=”DateTime NOT NULL” CanBeNull=”false” />
      <Association Name=”OrderSagaData_OrderSagaDataLine” Member=”Lines”
              ThisKey=”Id” OtherKey=”OrderSagaDataId”
              Storage=”_orderLines”/>
    </Type>
  </Table>
  <Table Name=”dbo.OrderSagaDataLines” Member=”OrderSagaDataLines”>
    <Type Name=”OrderService.OrderLine”>
      <Column Name=”Id” Member=”Id” DbType=”UniqueIdentifier NOT NULL”
              IsPrimaryKey=”true” CanBeNull=”false” />
      <Column Name=”OrderSagaDataId” Member=”OrderSagaDataId”
              DbType=”UniqueIdentifier NOT NULL” CanBeNull=”false” />
      <Column Name=”ProductId” Member=”ProductId”
              DbType=”UniqueIdentifier NOT NULL” CanBeNull=”false” />
      <Column Name=”Quantity” Member=”Quantity”
              DbType=”Float NOT NULL” CanBeNull=”false” />
      <Column Name=”AuthorizedQuantity” Member=”AuthorizedQuantity”
              DbType=”Float NOT NULL” CanBeNull=”false” />
      <Association Name=”OrderSagaData_OrderSagaDataLine” Member=”Order”
              ThisKey=”OrderSagaDataId” OtherKey=”Id”
              IsForeignKey=”true” Storage=”_order” />
    </Type>
  </Table>
</Database>

To make the mapping file work with the DataContext instantiation we need to do something like the following:

private static DataContext GetContext()
{
    StreamReader sr =
        new StreamReader(“D:\\SagaL2SMapping.xml”);

    //set the mapping source up
    XmlMappingSource mapping = XmlMappingSource.FromStream(sr.BaseStream);

    DataContext db = null;

    //new up a persistance repository
    db = new DataContext(“Data Source=BOOMER\\BOOM09;initial “ +
                        “catalog=Sagas;user id=sa;password=supremo”,
                         mapping);

    var loadopts = new DataLoadOptions();
    loadopts.LoadWith<OrderSagaData>(o => o.Lines);
    db.LoadOptions = loadopts;
    db.DeferredLoadingEnabled = false;

    db.Log = new DebuggerTextWriter();

    return db;
}

The Saga Finder

The final part of the puzzle is the Saga Finder which is dealt with by implementing the class IFindSagas<T>. The IFindSagas class contains a nested interface Using<M> which indicates to NServiceBus that implementers of IFindSagas<T> have the ability to find saga’s of the type <T> where the message being handled is of type <M>. Once again using the “manufacturing” sample from the NServiceBus download, I have implemented a Saga Finder using LINQ To SQL:

public class OrderSagaFinder :
    IFindSagas<OrderSagaData>.Using<OrderMessage>,
    IFindSagas<OrderSagaData>.Using<CancelOrderMessage>
{
    public OrderSagaData FindBy(OrderMessage message)
    {
        return FindBy(message.PurchaseOrderNumber, message.PartnerId);
    }

    public OrderSagaData FindBy(CancelOrderMessage message)
    {
        return FindBy(message.PurchaseOrderNumber, message.PartnerId);
    }

    public OrderSagaData FindBy(string purchaseOrderNumber, Guid partnerId)
    {
        return sessionFactory.GetTable<OrderSagaData>().Where(
            o => o.PurchaseOrderNumber == purchaseOrderNumber &&
                o.PartnerId == partnerId).FirstOrDefault();
    }

    private DataContext sessionFactory;

    public virtual DataContext SessionFactory
    {
        get { return sessionFactory; }
        set { sessionFactory = value; }
    }
}

The Using<M> interface in NServiceBus specifies a single method in it’s signature, which you probably guessed is:

T FindBy(M message)
I hope this helps if your interested in implementing your own Saga Persister and I recommend that you read Udi’s post on Saga’s and play with the sample from the NServiceBus download to get a better idea about their uses.
For those interested I will be following up soon with an Entity Framework persister in a subsequent post however please note it will only be compatible with V 4.0 of Entity Framework (due to the lack of POCO support in the current version).

Share/Save/Bookmark

3 Comments so far

  1. LINQ in Action roller September 28th, 2009 5:04 am

    LINQ and Entity Framework Posts for 9/21/2009+…

    Note: This post is updated weekly or more frequently, depending on the availability of new articles….

  2. [...] a previous post I showed in part what it would take to create a Saga Persister using LINQ To SQL, however it [...]

  3. [...] after two previous posts on the LINQ To SQL Persister I am not going to cover the same ground in terms of [...]

Leave a reply

Creative Commons Attribution-ShareAlike 2.5 Australia
Creative Commons Attribution-ShareAlike 2.5 Australia