A Linq To SQL Saga Persister for NServiceBus
NServiceBus is becoming a very popular Open Source development framework and out of the box it supports some very elegant feature extensibility points (thanks Udi) that have been designed very deliberately in a technology agnostic way. This allows us as developers to write our own implementations of certain features in the technology of our choice, and the subject of this post focuses on extending the Saga Persister. However at this point let me just say that it may well be worth your while to read Udi’s post on Saga’s before continuing and then come back here to finish.
The Saga Persister
It may be premature to discuss the Saga Persister without first touching on that which begat it, namely the Saga. I like to think of Saga’s as a long running stateful services, with methods for entry points that handle messages. You can also think of a Saga as a class that contains a set of methods, each one handling a different message type, where the entire Saga, it’s methods and all the logic contained within, accomplish a set of work over time. These methods make up the contract of what is ultimately a stateful service. For a lengthy discussion of Saga’s I recommend reading more from Udi here. In the meantime here’s a somewhat elided example:
public class RequestLeaveSaga : Saga<RequestLeaveSagaData>, ISagaStartedBy<IAnnualLeaveRequest>, IMessageHandler<IAuthorizationForAnnualLeaveResponse>, IMessageHandler<ICancelAnnualLeaveRequest> { public override void Timeout(object state) { //…timeout logic here } public void Handle(IAnnualLeaveRequest message) { Data.Id = Guid.NewGuid(); Data.LeaveRequestId = message.LeaveRequestId; Data.EmployeeId = message.EmployeeId; var leaveRequestForAuthorization = Bus.CreateInstance<IReqeustingAnnualLeaveAuthorization>( m => { m.EmployeeId = message.EmployeeId; m.FirstDayOfLeave = message.LeaveStartsAt; m.NumberOfDaysRequested = Data.CalculateDays(message.LeaveStartsAt, message.LeaveEndsAt); m.TimeOfRequestSubmission = message.TimeOfRequest; m.SagaId = Data.Id; }); Bus.Send(leaveRequestForAuthorization); } public void Handle(IAuthorizationForAnnualLeaveResponse message) { //…handles the response message from a HR authorization service } public void Handle(ICancelAnnualLeaveRequest message) { //…handles cancel message to invalidate the request for leave } }
The key to understanding the Saga Persister is linked directly to the statefulness of the Saga itself, where the job of the persister is to initially hydrate the Saga’s state to storage (persistent or otherwise), so that when one of it’s subsequent message handlers (methods in its contract) are called, it can be re-hydrated from the storage. Saga’s contain a property named Data, which we can see (above) has been referred to in the body of the Handle method for the IAnnualLeaveRequest message. The underlying type for the Data property is determined generically in the signature for Saga<T> where T is an ISagaEntity. When persisting Saga state (to storage), NServiceBus will use the values it finds in the instance of <T> (the ISagaEntity reference in the Data property).
To better understand how this works lets look at the details specified in configuration of the Bus using the NServiceBus fluent interface for configuration, which resides in the process hosting the Saga. The following example is derived from the configuration of the Bus as demonstrated in the NServiceBus “Manufacturing” sample project. In this example (below) I have substituted the “out of the box” persister (backed by NHibernate) with a Linq To SQL implementation.
try { DataContext sessionFactory = GetContext(); var bus = NServiceBus.Configure.With() .SpringBuilder( (cfg => { cfg.ConfigureComponent<OrderSagaFinder> (ComponentCallModelEnum.None) .SessionFactory = sessionFactory; })) .XmlSerializer() .MsmqTransport() .IsTransactional(true) .PurgeOnStartup(false) .DbSubscriptionStorage() .Table(“Subscriptions”) .SubscriberEndpointColumnName(“SubscriberEndpoint”) .MessageTypeColumnName(“MessageType”) .Sagas() .LinqToSqlSagaPersister<OrderSagaData>(sessionFactory) .UnicastBus() .ImpersonateSender(false) .LoadMessageHandlers( First<GridInterceptingMessageHandler> .Then<SagaMessageHandler>() ) .CreateBus() .Start(); } catch (Exception e) { LogManager.GetLogger(“hello”).Fatal(“Exiting”, e); }
There are two main areas of this configuration code that we should focus on. Firstly when we configure the nominated Inversion of Control container we can specify which Saga Finder to have configured and inject the required LINQ To SQL DataContext. Further down in this usage of the fluent interface I have specified that this endpoint supports Sagas and that they should use the LINQ To SQL Saga Persister and also that the same DataContext should be injected.
.Sagas() .LinqToSqlSagaPersister<OrderSagaData>(sessionFactory)
namespace NServiceBus { public static class ConfigureLinqToSqlSagaPersister { public static Configure LinqToSqlSagaPersister<T>(this Configure config, DataContext sessionFactory) where T : class, ISagaEntity { config.Configurer .ConfigureComponent<LinqToSqlSagaPersister<T>> (ComponentCallModelEnum.Singlecall) .SessionFactory = sessionFactory; return config; } } }
Implement an Interface
To create your custom Saga Persister you will need to implement an interface found in the NServiceBus library, unsurprisingly this Interface is named ISagaPersister and has four method signatures that make up it’s contract definition.
public class LinqToSqlSagaPersister<T> : ISagaPersister where T : class, ISagaEntity { public void Complete(ISagaEntity saga) { SessionFactory.GetTable<T>().DeleteOnSubmit(saga as T); SessionFactory.SubmitChanges(); } public ISagaEntity Get(Guid sagaId) { return SessionFactory.GetTable<T>() .Where(s => s.Id == sagaId).Single(); } public void Save(ISagaEntity saga) { SessionFactory.GetTable<T>().InsertOnSubmit(saga as T); SessionFactory.SubmitChanges(); } public void Update(ISagaEntity saga) { SessionFactory.SubmitChanges(); } public virtual DataContext SessionFactory { get; set; } }
The four methods of an ISagaPersister are responsible for interacting with the persistence store by getting a Saga entity by it’s ID, updating an existing saga entity, saving a saga entity and setting a saga as completed and removing it from the active saga list.
The Mapping File
In order to Map the ISagaEntity to the database tables we can take advantage of the fact that LINQ To SQL supports POCO’s and simply map our entities as required.
<?xml version=”1.0″ encoding=”utf-8″?> <Database Name=”Sagas” xmlns=”http://schemas.microsoft.com/linqtosql/mapping/2007″> <Table Name=”dbo.OrderSagaData” Member=”OrderSagaData”> <Type Name=”OrderService.OrderSagaData”> <Column Name=”Id” Member=”Id” DbType=”UniqueIdentifier NOT NULL” IsPrimaryKey=”true” CanBeNull=”false” /> <Column Name=”Originator” Member=”Originator” DbType=”VarChar(50)” CanBeNull=”true” /> <Column Name=”PurchaseOrderNumber” Member=”PurchaseOrderNumber” DbType=”VarChar(50) NOT NULL” CanBeNull=”false” /> <Column Name=”PartnerId” Member=”PartnerId” DbType=”UniqueIdentifier NOT NULL” CanBeNull=”false” /> <Column Name=”ProvideBy” Member=”ProvideBy” DbType=”DateTime NOT NULL” CanBeNull=”false” /> <Association Name=”OrderSagaData_OrderSagaDataLine” Member=”Lines” ThisKey=”Id” OtherKey=”OrderSagaDataId” Storage=”_orderLines”/> </Type> </Table> <Table Name=”dbo.OrderSagaDataLines” Member=”OrderSagaDataLines”> <Type Name=”OrderService.OrderLine”> <Column Name=”Id” Member=”Id” DbType=”UniqueIdentifier NOT NULL” IsPrimaryKey=”true” CanBeNull=”false” /> <Column Name=”OrderSagaDataId” Member=”OrderSagaDataId” DbType=”UniqueIdentifier NOT NULL” CanBeNull=”false” /> <Column Name=”ProductId” Member=”ProductId” DbType=”UniqueIdentifier NOT NULL” CanBeNull=”false” /> <Column Name=”Quantity” Member=”Quantity” DbType=”Float NOT NULL” CanBeNull=”false” /> <Column Name=”AuthorizedQuantity” Member=”AuthorizedQuantity” DbType=”Float NOT NULL” CanBeNull=”false” /> <Association Name=”OrderSagaData_OrderSagaDataLine” Member=”Order” ThisKey=”OrderSagaDataId” OtherKey=”Id” IsForeignKey=”true” Storage=”_order” /> </Type> </Table> </Database>
To make the mapping file work with the DataContext instantiation we need to do something like the following:
private static DataContext GetContext() { StreamReader sr = new StreamReader(“D:\\SagaL2SMapping.xml”); //set the mapping source up XmlMappingSource mapping = XmlMappingSource.FromStream(sr.BaseStream); DataContext db = null; //new up a persistance repository db = new DataContext(“Data Source=BOOMER\\BOOM09;initial “ + “catalog=Sagas;user id=sa;password=supremo”, mapping); var loadopts = new DataLoadOptions(); loadopts.LoadWith<OrderSagaData>(o => o.Lines); db.LoadOptions = loadopts; db.DeferredLoadingEnabled = false; db.Log = new DebuggerTextWriter(); return db; }
The Saga Finder
The final part of the puzzle is the Saga Finder which is dealt with by implementing the class IFindSagas<T>. The IFindSagas class contains a nested interface Using<M> which indicates to NServiceBus that implementers of IFindSagas<T> have the ability to find saga’s of the type <T> where the message being handled is of type <M>. Once again using the “manufacturing” sample from the NServiceBus download, I have implemented a Saga Finder using LINQ To SQL:
public class OrderSagaFinder : IFindSagas<OrderSagaData>.Using<OrderMessage>, IFindSagas<OrderSagaData>.Using<CancelOrderMessage> { public OrderSagaData FindBy(OrderMessage message) { return FindBy(message.PurchaseOrderNumber, message.PartnerId); } public OrderSagaData FindBy(CancelOrderMessage message) { return FindBy(message.PurchaseOrderNumber, message.PartnerId); } public OrderSagaData FindBy(string purchaseOrderNumber, Guid partnerId) { return sessionFactory.GetTable<OrderSagaData>().Where( o => o.PurchaseOrderNumber == purchaseOrderNumber && o.PartnerId == partnerId).FirstOrDefault(); } private DataContext sessionFactory; public virtual DataContext SessionFactory { get { return sessionFactory; } set { sessionFactory = value; } } }
The Using<M> interface in NServiceBus specifies a single method in it’s signature, which you probably guessed is:
T FindBy(M message)
3 Comments so far
Leave a reply









LINQ and Entity Framework Posts for 9/21/2009+…
Note: This post is updated weekly or more frequently, depending on the availability of new articles….
[...] a previous post I showed in part what it would take to create a Saga Persister using LINQ To SQL, however it [...]
[...] after two previous posts on the LINQ To SQL Persister I am not going to cover the same ground in terms of [...]