Distributed Transactions via the Saga Pattern in C#

Implementing Choreography-based Saga

Introduction

(The code for this article is available here for reference.)

Imagine we have a class called OrderService that is responsible for processing an order. The steps involved are: (i) creating the order, (ii) processing the payment, and (iii) triggering the shipping. If a failure occurs in step (ii), while processing the payment, we need to roll back the order created in step (i) and notify the user that the order could not be processed.

In a monolithic architecture, this problem is easy to solve: we simply wrap all the data-layer code for these operations in a single database transaction, so that they all succeed together or fail together.

But in a distributed architecture, where the system comprises multiple services that each potentially own their own database, creating such an ACID database transaction becomes highly complex. This is the problem the Saga pattern solves: it proposes a way to perform transactions in a system made up of distributed services.

The Saga pattern is a design pattern used in distributed systems and microservices architectures to manage complex business transactions without relying on distributed ACID transactions, which can be difficult to implement and scale in such environments.

A saga is a sequence of local transactions that are coordinated using a set of compensating transactions to ensure data consistency across services. Each service involved in the saga performs its part of the transaction and publishes events to notify other services about the state of the transaction. If a failure occurs at any point in the saga, compensating transactions are executed to undo the changes made by previous steps.
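To make the idea of compensating transactions concrete, here is a minimal in-process sketch. It is not the distributed implementation built later in this article: SagaStep and SagaRunner are hypothetical names introduced only for illustration, and each "local transaction" is just a delegate. The payment step fails, so the completed order step is undone in reverse order.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Demo: the payment step fails, so the order created in the first step is rolled back.
var log = new List<string>();
var steps = new List<SagaStep>
{
    new("CreateOrder",
        () => { log.Add("order created"); return Task.CompletedTask; },
        () => { log.Add("order deleted"); return Task.CompletedTask; }),
    new("ProcessPayment",
        () => throw new InvalidOperationException("card declined"),
        () => Task.CompletedTask),
    new("TriggerShipping",
        () => { log.Add("shipping triggered"); return Task.CompletedTask; },
        () => Task.CompletedTask),
};

var succeeded = await SagaRunner.RunAsync(steps);
Console.WriteLine($"succeeded: {succeeded}; log: {string.Join(", ", log)}");
// prints "succeeded: False; log: order created, order deleted"

// One local transaction plus the action that undoes it (hypothetical type).
public record SagaStep(string Name, Func<Task> Execute, Func<Task> Compensate);

public static class SagaRunner
{
    // Runs the steps in order; on the first failure, compensates the
    // already-completed steps in reverse order and reports failure.
    public static async Task<bool> RunAsync(IEnumerable<SagaStep> steps)
    {
        var completed = new Stack<SagaStep>();
        foreach (var step in steps)
        {
            try
            {
                await step.Execute();
                completed.Push(step);
            }
            catch (Exception)
            {
                while (completed.Count > 0)
                    await completed.Pop().Compensate();
                return false;
            }
        }
        return true;
    }
}
```

In a real saga each step lives in a different service and commits to its own database, so the "undo" is itself a new transaction (for example, deleting the order) rather than a database rollback.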

The Saga pattern relies on eventual consistency, meaning that although the system might be temporarily inconsistent during the saga execution, it will eventually reach a consistent state.

There are two main approaches to implementing sagas:

  1. Choreography-based Saga: In this approach, each service involved in the saga publishes events to communicate the state of the transaction. Other services subscribe to these events and perform their part of the transaction based on the received events.

  2. Orchestration-based Saga: In this approach, a central orchestrator coordinates the saga. The orchestrator is responsible for sequencing the steps of the saga, invoking the appropriate services, and handling compensating transactions if a failure occurs.
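The choreography approach can be sketched with a tiny in-memory publish/subscribe bus. This is only an illustration under simplifying assumptions: InMemoryBus is a hypothetical stand-in for a real message broker, events are plain strings, and the three "services" are just subscriptions in one process. Note that there is no central coordinator; each participant reacts to the events it cares about.

```csharp
using System;
using System.Collections.Generic;

var bus = new InMemoryBus();
var trace = new List<string>();

// Payment service: reacts to OrderCreated, then announces success or failure.
bus.Subscribe("OrderCreated", orderId =>
{
    var paid = false; // simulate a declined payment
    trace.Add($"payment attempted for {orderId}");
    bus.Publish(paid ? "PaymentProcessed" : "PaymentCancelled", orderId);
});

// Shipping service: reacts only to successful payments.
bus.Subscribe("PaymentProcessed", orderId => trace.Add($"shipping triggered for {orderId}"));

// Orders service: compensates by deleting the order when the payment is cancelled.
bus.Subscribe("PaymentCancelled", orderId => trace.Add($"order {orderId} deleted"));

// Orders service creates the order and publishes the first event of the saga.
trace.Add("order 42 created");
bus.Publish("OrderCreated", "42");

Console.WriteLine(string.Join(" -> ", trace));
// prints "order 42 created -> payment attempted for 42 -> order 42 deleted"

// Hypothetical stand-in for a message broker: synchronous, in-memory, no persistence.
public class InMemoryBus
{
    private readonly Dictionary<string, List<Action<string>>> _subscribers = new();

    public void Subscribe(string eventName, Action<string> handler)
    {
        if (!_subscribers.TryGetValue(eventName, out var handlers))
            _subscribers[eventName] = handlers = new List<Action<string>>();
        handlers.Add(handler);
    }

    public void Publish(string eventName, string payload)
    {
        if (!_subscribers.TryGetValue(eventName, out var handlers)) return;
        foreach (var handler in handlers) handler(payload);
    }
}
```

In an orchestration-based saga, by contrast, the decision "payment failed, so delete the order" would live in a single orchestrator rather than in the Orders service's own subscription.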

In this article, we will implement the choreography-based approach. Our implementation will be done in C#, with AWS SQS as our message broker.

Before proceeding, it's important to have a basic understanding of CQRS and AWS SQS. Please refer to my blog articles below for more information.

Let's get started!


Implementation

  1. Set up a new message queue on AWS SQS called saga-pattern (the queue name used in the configuration later in this article). See the tutorial Asynchronous Communication In Microservices Via C# and AWS SQS for more information on how to do so.

  2. Create a new solution and projects. I'll be using the Rider IDE, but you can use Visual Studio or any IDE you prefer. Create three new ASP.NET Web API projects: OrdersService, PaymentService, and ShippingService.

  3. Create a new class library, SagaPattern.Commons, which will hold the code shared across all our services. Add the following files to the library.

    Now, let's fill out the files in our shared library.

    • Extensions.cs contains extension methods that start the SQS listener and register our command and event handlers with the DI container.

        using System.Reflection;
        using Microsoft.AspNetCore.Builder;
        using Microsoft.Extensions.DependencyInjection;
      
        namespace SagaPattern.Commons;
      
        public static class Extensions
        {
            public static IApplicationBuilder ListenForSqsEvents(this IApplicationBuilder builder, string[] events)
            {
                var serviceProvider = builder.ApplicationServices;
                var eventListener = serviceProvider.GetRequiredService<IEventListener>();
                // Fire-and-forget: the listener runs in the background and is never cancelled from here.
                _ = Task.Run(() => eventListener.Listen(events, CancellationToken.None));
      
                return builder;
            }
            public static IServiceCollection AddCommandHandlers(this IServiceCollection collection, Type assemblyType)
            {
                if (assemblyType == null) throw new ArgumentNullException(nameof(assemblyType));
                var assembly = assemblyType.Assembly;
                var scanType = typeof(ICommandHandler<>);
      
                RegisterScanTypeWithImplementations(collection, assembly, scanType, ServiceLifetime.Scoped);
      
                return collection;
            }
      
            public static IServiceCollection AddEventHandlers(this IServiceCollection collection, Type assemblyType)
            {
                if (assemblyType == null) throw new ArgumentNullException(nameof(assemblyType));
                var assembly = assemblyType.Assembly;
                var scanType = typeof(IEventHandler<>);
      
                RegisterScanTypeWithImplementations(collection, assembly, scanType, ServiceLifetime.Scoped);
      
                return collection;
            }
      
            private static void RegisterScanTypeWithImplementations(IServiceCollection collection, Assembly assembly, Type scanType,  ServiceLifetime lifetime)
            {
                var commandHandlers = ScanTypes(assembly, scanType);
      
                foreach (var handler in commandHandlers)
                {
                    var abstraction = handler.GetTypeInfo().ImplementedInterfaces
                        .First(type => type.IsGenericType && type.GetGenericTypeDefinition() == scanType);
      
                    switch (lifetime)
                    {
                        case ServiceLifetime.Singleton:
                            collection.AddSingleton(abstraction, handler);
                            break;
                        case ServiceLifetime.Scoped:
                            collection.AddScoped(abstraction, handler);
                            break;
                        case ServiceLifetime.Transient:
                            collection.AddTransient(abstraction, handler);
                            break;
                        default:
                            throw new ArgumentException("Invalid service lifetime specified.");
                    }
                }
            }
      
            private static IEnumerable<Type> ScanTypes(Assembly assembly, Type typeToScanFor)
            {
                return assembly.GetTypes()
                    .Where(type => type is
                                   {
                                       IsClass: true,
                                       IsAbstract: false
                                   } &&
                                   type.GetInterfaces()
                                       .Any(i=>i.IsGenericType && i.GetGenericTypeDefinition() == typeToScanFor));
            }
        }
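The scanning logic above boils down to one reflection check: a concrete class qualifies if one of its interfaces is a closed version of the open generic being searched for. The following standalone sketch isolates that check without any DI container. IHandler<T>, Ping, PingHandler, BaseHandler, and HandlerScanner are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Find every concrete class implementing IHandler<T> in this assembly and print
// the (closed interface, implementation) pair that would be registered.
var pairs = HandlerScanner.Scan(typeof(PingHandler).Assembly, typeof(IHandler<>)).ToList();
foreach (var (abstraction, implementation) in pairs)
    Console.WriteLine($"{abstraction.GetGenericArguments()[0].Name} is handled by {implementation.Name}");

public interface IHandler<in T> { void Handle(T message); }
public class Ping { }

// Concrete handler: picked up by the scan.
public class PingHandler : IHandler<Ping> { public void Handle(Ping message) { } }

// Abstract handler: skipped, because it cannot be instantiated.
public abstract class BaseHandler : IHandler<Ping> { public abstract void Handle(Ping message); }

public static class HandlerScanner
{
    // Yields one pair per closed interface that matches the open generic type.
    public static IEnumerable<(Type Abstraction, Type Implementation)> Scan(Assembly assembly, Type openGeneric) =>
        from type in assembly.GetTypes()
        where type is { IsClass: true, IsAbstract: false }
        from iface in type.GetInterfaces()
        where iface.IsGenericType && iface.GetGenericTypeDefinition() == openGeneric
        select (iface, type);
}
```

Registering the closed interface (for example IHandler<Ping>) rather than the open one is what lets a consumer ask the container for the handler of one specific message type.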
      
    • ICommand.cs, IMessage.cs, and IEvent.cs define marker interfaces.

        namespace SagaPattern.Commons;
      
        public interface ICommand : IMessage
        {
        }
      
        namespace SagaPattern.Commons;
      
        public interface IEvent:IMessage
        {
        }
      
        namespace SagaPattern.Commons;
      
        public interface IMessage
        {
        }
      
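    • ICommandHandler.cs, IEventHandler.cs, ISqsMessenger.cs, and IEventListener.cs

      These interfaces are largely self-explanatory: handlers process a command or an event, the messenger publishes messages to SQS, and the listener polls SQS for incoming events.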

        namespace SagaPattern.Commons;
      
        public interface IEventListener
        {
            Task Listen(string[] events, CancellationToken token);
        }
      
        namespace SagaPattern.Commons;
      
        public interface ICommandHandler<in TCommand> where TCommand: ICommand
        {
            Task HandleAsync(TCommand command);
        }
      
        namespace SagaPattern.Commons;
      
        public interface IEventHandler<in T> where T : IEvent
        {
            Task HandleAsync(T @event);
        }
      
        using Amazon.SQS.Model;
      
        namespace SagaPattern.Commons;
      
        public interface ISqsMessenger
        {
            Task<SendMessageResponse> SendMessageAsync<T>(T message);
        }
      
    • We will now implement our ISqsMessenger interface in SqsMessenger. This class also implements IEventListener, and is responsible for sending messages to and receiving messages from AWS SQS.

        using System.Diagnostics.CodeAnalysis;
        using System.Reflection;
        using Amazon.SQS;
        using Amazon.SQS.Model;
        using Ardalis.GuardClauses;
        using Microsoft.Extensions.DependencyInjection;
        using Microsoft.Extensions.Logging;
        using Microsoft.Extensions.Options;
        using JsonSerializer = System.Text.Json.JsonSerializer;
      
        namespace SagaPattern.Commons;
      
        public class SqsMessenger : ISqsMessenger, IEventListener
        {
            private readonly IAmazonSQS _amazonSqs;
            private readonly IOptions<QueueSettings> _queueSettings;
            private readonly IServiceProvider _serviceProvider;
            private readonly ILogger<SqsMessenger> _logger;
      
            public SqsMessenger(
                IAmazonSQS amazonSqs,
                IOptions<QueueSettings> queueSettings,
                IServiceProvider serviceProvider,
                ILogger<SqsMessenger> logger
            )
            {
                _amazonSqs = amazonSqs;
                _queueSettings = queueSettings;
                _serviceProvider = serviceProvider;
                _logger = logger;
            }
      
            public async Task<SendMessageResponse> SendMessageAsync<T>([DisallowNull] T message)
            {
                if (message == null) throw new ArgumentNullException(nameof(message));
      
                var queueUrl = await _amazonSqs.GetQueueUrlAsync(_queueSettings.Value.QueueName);
                var request = new SendMessageRequest()
                {
                    QueueUrl = queueUrl.QueueUrl,
                    MessageBody = JsonSerializer.Serialize(message),
                    MessageAttributes = new Dictionary<string, MessageAttributeValue>
                    {
                        {
                            "MessageType", new MessageAttributeValue()
                            {
                                DataType = "String",
                                StringValue = typeof(T).Name
                            }
                        }
                    }
                };
                var result = await _amazonSqs.SendMessageAsync(request);
                _logger.LogInformation($"published {typeof(T).Name} event. Message: {request.MessageBody} to sqs");
      
                return result;
            }
      
            public async Task Listen(string[] eventsListenedFor, CancellationToken stoppingToken)
            {
                Guard.Against.Null(eventsListenedFor);
      
                var queueUrl = await _amazonSqs.GetQueueUrlAsync(_queueSettings.Value.QueueName, stoppingToken);
                if (queueUrl == null) throw new ApplicationException("cannot find the specified queue");
      
                var request = new ReceiveMessageRequest
                {
                    AttributeNames = new List<string>() { "All" },
                    MessageAttributeNames = new List<string>() { "All" },
                    QueueUrl = queueUrl.QueueUrl,
                };
      
                while (!stoppingToken.IsCancellationRequested)
                {
                    var response = await _amazonSqs.ReceiveMessageAsync(request, stoppingToken);
      
                    foreach (var message in response.Messages)
                    {
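                        // Skip messages that carry no MessageType attribute or that this service does not handle.
                        if (!message.MessageAttributes.TryGetValue("MessageType", out var typeAttribute))
                            continue;
                        var eventType = typeAttribute.StringValue;
                        if (!eventsListenedFor.Contains(eventType, StringComparer.InvariantCultureIgnoreCase))
                            continue;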
      
                        _logger.LogInformation($"received {eventType} event. Message: {message.Body} from sqs");
      
                        var type = Assembly.GetEntryAssembly()?.GetTypes().FirstOrDefault(t => t.Name == eventType);
      
                        try
                        {
                            var body = (IMessage)JsonSerializer.Deserialize(message.Body, type!)!;
      
                            // initialize and call our event handler 
                            var messageHandlerType = typeof(IEventHandler<>).MakeGenericType(type!);
                            using var scope = _serviceProvider.GetRequiredService<IServiceScopeFactory>().CreateScope();
      
                            var handler = scope.ServiceProvider.GetRequiredService(messageHandlerType);
                            var task = (Task)handler.GetType().GetMethod("HandleAsync", new[] { type! })
                                ?.Invoke(handler, new[] { body });
      
                            // HandleAsync returns a non-generic Task, so awaiting it is all that is needed.
                            await task!.ConfigureAwait(false);
      
                            await _amazonSqs.DeleteMessageAsync(new DeleteMessageRequest
                            {
                                QueueUrl = queueUrl.QueueUrl,
                                ReceiptHandle = message.ReceiptHandle
                            }, stoppingToken);
                        }
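                        catch (Exception e)
                        {
                            // Log and move on; the message is not deleted, so SQS makes it visible again after the visibility timeout.
                            _logger.LogError(e, "an exception occurred handling the message: {MessageBody}", message.Body);
                        }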
                    }
                }
            }
        }
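The core of the Listen loop is a reflection-based dispatch: the message's type name is turned into a CLR type, the JSON body is deserialized into it, and the matching IEventHandler<T> is invoked. The sketch below isolates those steps without AWS or a DI container. It is a simplified illustration: a dictionary stands in for the service provider, Dispatcher is a hypothetical helper, and the event here uses a string OrderId for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;

// Stand-in for the DI container: closed handler interface -> handler instance.
var handlers = new Dictionary<Type, object>
{
    [typeof(IEventHandler<OrderCreatedEvent>)] = new OrderCreatedEventHandler()
};

// What would arrive from the queue: the "MessageType" attribute and the JSON body.
var messageType = "OrderCreatedEvent";
var body = "{\"OrderId\":\"7\",\"Amount\":12.5}";

await Dispatcher.DispatchAsync(messageType, body, handlers);

public interface IEvent { }
public interface IEventHandler<in T> where T : IEvent { Task HandleAsync(T @event); }

public class OrderCreatedEvent : IEvent
{
    public string OrderId { get; set; } = "";
    public decimal Amount { get; set; }
}

public class OrderCreatedEventHandler : IEventHandler<OrderCreatedEvent>
{
    public Task HandleAsync(OrderCreatedEvent @event)
    {
        Console.WriteLine($"handling order {@event.OrderId}");
        return Task.CompletedTask;
    }
}

public static class Dispatcher
{
    public static async Task DispatchAsync(string typeName, string json, IReadOnlyDictionary<Type, object> handlers)
    {
        // 1. Resolve the CLR type from the name carried in the message attribute.
        var eventType = typeof(Dispatcher).Assembly.GetTypes().FirstOrDefault(t => t.Name == typeName)
            ?? throw new InvalidOperationException($"unknown message type '{typeName}'");

        // 2. Deserialize the JSON body into that type.
        var @event = JsonSerializer.Deserialize(json, eventType)
            ?? throw new InvalidOperationException("message body deserialized to null");

        // 3. Close IEventHandler<> over the event type and look up the handler.
        var handlerType = typeof(IEventHandler<>).MakeGenericType(eventType);
        var handler = handlers[handlerType];

        // 4. Invoke HandleAsync via reflection and await the returned Task.
        var method = handlerType.GetMethod("HandleAsync")!;
        await (Task)method.Invoke(handler, new[] { @event })!;
    }
}
```

Failing fast on an unknown type name, as in step 1 of this sketch, gives a clearer error than letting a null type flow into the deserializer.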
      
    • Our QueueSettings class represents our configuration for our AWS SQS queue.

        namespace SagaPattern.Commons;
      
        public class QueueSettings
        {
            public required string QueueName { get; set; }
        }
      
  4. Now, let's build our services.

    • We will start with our Orders service: Orders.Api

      • Add the following folders and files (where they don't exist) to our Orders.Api project.

      • The Commands folder contains our command classes and their respective handlers.

          using SagaPattern.Commons;
          using OrdersService.Entities;
        
          namespace OrdersService.Commands;
        
          public class CreateOrderCommand : ICommand
          {
              public Order Order { get; }
        
              public CreateOrderCommand(Order order)
              {
                  Order = order;
              }
          }
        
          using Ardalis.GuardClauses;
          using SagaPattern.Commons;
          using OrdersService.Data;
          using OrdersService.Events;
        
          namespace OrdersService.Commands;
        
          public class CreateOrderCommandHandler : ICommandHandler<CreateOrderCommand>
          {
              private readonly IOrdersRepository _ordersRepository;
              private readonly ISqsMessenger _sqsMessenger;
              private readonly ILogger<CreateOrderCommandHandler> _logger;
        
              public CreateOrderCommandHandler(IOrdersRepository ordersRepository, ISqsMessenger sqsMessenger, ILogger<CreateOrderCommandHandler> logger)
              {
                  _ordersRepository = ordersRepository;
                  _sqsMessenger = sqsMessenger;
                  _logger = logger;
              }
              public async Task HandleAsync(CreateOrderCommand command)
              {
                  Guard.Against.Null(command);
                  command.Order.Id = Guid.NewGuid();
        
                  await _ordersRepository.CreateOrderAsync(command.Order);
                  _logger.LogInformation($"inserted order with id: {command.Order.Id} into db");
                  // publish the event to SQS so that downstream services (e.g. payments) can react to it
                  await _sqsMessenger.SendMessageAsync(new OrderCreatedEvent
                  {
                      Currency = command.Order.Currency,
                      Amount = command.Order.Amount,
                      OrderId = command.Order.Id
                  });
              }
          }
        
          using SagaPattern.Commons;
        
          namespace OrdersService.Commands;
        
          public class DeleteOrderCommand : ICommand
          {
              public DeleteOrderCommand(Guid orderId)
              {
                  OrderId = orderId;
              }
        
              public Guid OrderId { get; }
          }
        
          using Ardalis.GuardClauses;
          using SagaPattern.Commons;
          using OrdersService.Data;
        
          namespace OrdersService.Commands;
        
          public class DeleteOrderCommandHandler : ICommandHandler<DeleteOrderCommand>
          {
              private readonly IOrdersRepository _ordersRepository;
              private readonly ILogger<DeleteOrderCommandHandler> _logger;
        
              public DeleteOrderCommandHandler(IOrdersRepository ordersRepository, ILogger<DeleteOrderCommandHandler> logger)
              {
                  _ordersRepository = ordersRepository;
                  _logger = logger;
              }
        
              public async Task HandleAsync(DeleteOrderCommand command)
              {
                  Guard.Against.Null(command);
        
                  await _ordersRepository.DeleteOrderAsync(command.OrderId);
                  _logger.LogInformation($"deleted order with id: {command.OrderId} from db");
              }
          }
        
      • The Data folder contains the code that integrates with and accesses our database layer.

          using Microsoft.EntityFrameworkCore;
          using OrdersService.Entities;
        
          namespace OrdersService.Data;
        
          public class AppDbContext : DbContext
          {
              public AppDbContext(DbContextOptions<AppDbContext> options) : base(options)
              {
              }
        
              public DbSet<Order> Orders { get; set; }
          }
        
          using OrdersService.Entities;
        
          namespace OrdersService.Data;
        
          public interface IOrdersRepository
          {
              Task CreateOrderAsync(Order order);
              Task DeleteOrderAsync(Guid orderId);
          }
        
          using Ardalis.GuardClauses;
          using OrdersService.Entities;
        
          namespace OrdersService.Data;
        
          public class OrdersRepository : IOrdersRepository
          {
              private readonly AppDbContext _dbContext;
        
              public OrdersRepository(AppDbContext dbContext)
              {
                  _dbContext = dbContext;
              }
              public async Task CreateOrderAsync(Order order)
              {
                  Guard.Against.Null(order);
        
                  await _dbContext.Orders.AddAsync(order);
                  await _dbContext.SaveChangesAsync();
              }
        
              public async Task DeleteOrderAsync(Guid orderId)
              {
                  Guard.Against.Default(orderId);
        
                  // Look the order up by primary key without blocking the calling thread.
                  var order = await _dbContext.Orders.FindAsync(orderId);
                  if (order is {})
                  {
                      _dbContext.Orders.Remove(order);
                      await _dbContext.SaveChangesAsync();
                  }
              }
          }
        
      • The Dtos folder contains POCO classes, which act as data transfer objects.

          namespace OrdersService.Dtos;
        
          public class OrderForCreateDto
          {
              public decimal Amount { get; set; }
              public string Currency { get; set; }
              public string CustomerEmail { get; set; }
          }
        
      • The Entities folder contains the entities for our service.

          namespace OrdersService.Entities;
        
          public class Order
          {
              public Guid Id { get; set; }
              public decimal Amount { get; set; }
              public string Currency { get; set; }
              public string CustomerEmail { get; set; }
          }
        
      • The Events folder contains classes that represent the events this service publishes.

          using SagaPattern.Commons;
        
          namespace OrdersService.Events;
        
          public class OrderCreatedEvent : IEvent
          {
              public string Currency { get; set; }
              public decimal Amount { get; set; }
              public Guid OrderId { get; set; }
          }
        
      • The Events/ExternalEvents folder contains classes that correspond to events that occurred outside of this service.

          using SagaPattern.Commons;
        
          namespace OrdersService.Events.ExternalEvents;
        
          public class PaymentCancelledEvent : IEvent
          {
              public Guid OrderId { get; set; }
              public Guid PaymentDetailsId { get; set; }
          }
        
          using Ardalis.GuardClauses;
          using OrdersService.Commands;
          using SagaPattern.Commons;
        
          namespace OrdersService.Events.ExternalEvents;
        
          public class PaymentDeletedEventHandler : IEventHandler<PaymentCancelledEvent>
          {
              private readonly ICommandHandler<DeleteOrderCommand> _commandHandler;
        
              public PaymentDeletedEventHandler(ICommandHandler<DeleteOrderCommand> commandHandler)
              {
                  _commandHandler = commandHandler;
              }
        
              public async Task HandleAsync(PaymentCancelledEvent @event)
              {
                  Guard.Against.Null(@event);
        
                  await _commandHandler.HandleAsync(new DeleteOrderCommand(@event.OrderId));
              }
          }
        
      • appsettings.json contains the relevant configuration settings.

          {
            "Logging": {
              "LogLevel": {
                "Default": "Information",
                "Microsoft.AspNetCore": "Warning"
              }
            },
            "AllowedHosts": "*",
            "QueueSettings":{
              "QueueName": "saga-pattern"
            },
            "ConnectionStrings": {
              "DefaultConnection": "Data Source = ./orders.db"
            }
          }
        
      • MappingProfile.cs defines rules for AutoMapper to map our entities to DTOs and vice-versa.

          using AutoMapper;
          using OrdersService.Dtos;
          using OrdersService.Entities;
        
          namespace OrdersService;
        
          public class MappingProfile: Profile
          {
              public MappingProfile()
              {
                  CreateMap<Order, OrderForCreateDto>().ReverseMap();
              }
          }
        
      • Finally, Program.cs is our application's entry point. It contains the configuration for our DI container, middleware, and endpoints, which are created as .NET minimal APIs.

          using Amazon.SQS;
          using AutoMapper;
          using SagaPattern.Commons;
          using Microsoft.AspNetCore.Mvc;
          using Microsoft.EntityFrameworkCore;
          using OrdersService.Commands;
          using OrdersService.Data;
          using OrdersService.Dtos;
          using OrdersService.Entities;
          using OrdersService.Events.ExternalEvents;
        
          var builder = WebApplication.CreateBuilder(args);
        
          // Add services to the container.
        
          builder.Services.AddControllers();
          builder.Services.AddEventHandlers(typeof(Program));
          builder.Services.AddCommandHandlers(typeof(Program));
        
          // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
          builder.Services.AddEndpointsApiExplorer();
          builder.Services.AddSwaggerGen();
        
          builder.Services.AddAutoMapper(typeof(Program));
          builder.Services.AddDbContext<AppDbContext>(options =>
          {
              options.UseSqlite(builder.Configuration.GetConnectionString("DefaultConnection"));
          });
          builder.Services.AddScoped<IOrdersRepository, OrdersRepository>();
          builder.Services.AddScoped<ISqsMessenger, SqsMessenger>();
          builder.Services.AddSingleton<IAmazonSQS, AmazonSQSClient>();
          builder.Services.AddSingleton<IEventListener, SqsMessenger>();
          builder.Services.Configure<QueueSettings>(builder.Configuration.GetSection("QueueSettings"));
        
          var app = builder.Build();
        
          using var serviceScope = app.Services.CreateScope();
          var dbContext = serviceScope.ServiceProvider.GetRequiredService<AppDbContext>();
          await dbContext.Database.MigrateAsync();
        
          app.ListenForSqsEvents(new[] { nameof(PaymentCancelledEvent) });
        
          // Configure the HTTP request pipeline.
          if (app.Environment.IsDevelopment())
          {
              app.UseSwagger();
              app.UseSwaggerUI();
          }
        
          app.UseHttpsRedirection();
          app.UseAuthorization();
        
          app.MapPost("api/orders",
              async (
                  [FromBody] OrderForCreateDto orderForCreateDto, 
                  [FromServices] IMapper mapper,
                  [FromServices] ICommandHandler<CreateOrderCommand> commandHandler) =>
              {
                  var order = mapper.Map<Order>(orderForCreateDto);
                  await commandHandler.HandleAsync(new CreateOrderCommand(order));
        
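                  // Return an IResult so the response status is 201, rather than a 200 with "201" as its body.
                  return Results.StatusCode(StatusCodes.Status201Created);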
              });
        
          app.MapDelete("api/orders/{orderId:guid}",
              async (
                  [FromServices] ICommandHandler<DeleteOrderCommand> commandHandler,
                  [FromRoute] Guid orderId) =>
              {
                  await commandHandler.HandleAsync(new DeleteOrderCommand(orderId));
        
                  return Results.NoContent();
              });
        
          app.Run();
        
    • Next, we will implement our Payments service: Payments.Api

      • Add the following folders and files (where they don't exist) to our Payments.Api project.

        The folders (and files, where applicable), such as Commands, Events, and Data, serve the same purposes as described for Orders.Api.

      • Let's add the following code to their respective files.

          using SagaPattern.Commons;
        
          namespace Payments.Api.Commands;
        
          public class DeletePaymentCommand : ICommand
          {
              public Guid PaymentId { get; }
        
              public DeletePaymentCommand(Guid paymentId)
              {
                  PaymentId = paymentId;
              }
          }
        
          using Ardalis.GuardClauses;
          using Payments.Api.Events;
          using SagaPattern.Commons;
          using Payments.Api.Interfaces;
        
          namespace Payments.Api.Commands;
        
          public class DeletePaymentCommandHandler : ICommandHandler<DeletePaymentCommand>
          {
              private readonly IPaymentRepository _paymentRepository;
              private readonly ISqsMessenger _sqsMessenger;
              private readonly ILogger<DeletePaymentCommandHandler> _logger;
        
              public DeletePaymentCommandHandler(
                  IPaymentRepository paymentRepository, 
                  ISqsMessenger sqsMessenger,
                  ILogger<DeletePaymentCommandHandler> logger)
              {
                  _paymentRepository = paymentRepository;
                  _sqsMessenger = sqsMessenger;
                  _logger = logger;
              }
              public async Task HandleAsync(DeletePaymentCommand command)
              {
                  Guard.Against.Null(command);
        
                  await _paymentRepository.DeletePaymentDetailsAsync(command.PaymentId);
        
                  _logger.LogInformation($"deleted payment with id: {command.PaymentId} from db");
              }
          }
        
          using Payments.Api.Entities;
          using SagaPattern.Commons;
        
          namespace Payments.Api.Commands;
        
          public class ProcessPaymentCommand : ICommand
          {
              public PaymentDetail PaymentDetails { get; }
              public ProcessPaymentCommand(PaymentDetail paymentDetails)
              {
                  PaymentDetails = paymentDetails;
              }
          }
        
          using Ardalis.GuardClauses;
          using SagaPattern.Commons;
          using Payments.Api.Enums;
          using Payments.Api.Events;
          using Payments.Api.Interfaces;
        
          namespace Payments.Api.Commands;
        
          public class ProcessPaymentCommandHandler : ICommandHandler<ProcessPaymentCommand>
          {
              private readonly IPaymentRepository _paymentRepository;
              private readonly IPaymentProcessor _paymentProcessor;
              private readonly ISqsMessenger _sqsMessenger;
              private readonly ILogger<ProcessPaymentCommandHandler> _logger;
        
              public ProcessPaymentCommandHandler(IPaymentRepository paymentRepository,
                  IPaymentProcessor paymentProcessor,
                  ISqsMessenger sqsMessenger,
                  ILogger<ProcessPaymentCommandHandler> logger)
              {
                  _paymentRepository = paymentRepository;
                  _paymentProcessor = paymentProcessor;
                  _sqsMessenger = sqsMessenger;
                  _logger = logger;
              }
        
              public async Task HandleAsync(ProcessPaymentCommand command)
              {
                  Guard.Against.Null(command);
        
                  try
                  {
                      command.PaymentDetails.Status = PaymentStatus.UnPaid;
        
                      var paymentProcessingResult = await _paymentProcessor.ProcessPaymentAsync(command.PaymentDetails);
                      if (!paymentProcessingResult) throw new ApplicationException();
        
                      command.PaymentDetails.Status = PaymentStatus.Paid;
        
                      await _paymentRepository.CreatePaymentDetailsAsync(command.PaymentDetails);
                      _logger.LogInformation($"inserted payment with id: {command.PaymentDetails.Id} into db");
        
                      await _sqsMessenger.SendMessageAsync(new PaymentProcessedEvent(command.PaymentDetails.Id,
                          command.PaymentDetails.OrderId));
                  }
                  catch (Exception e)
                  {
                      _logger.LogError(e, "an error has occurred while processing payment");
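                      // Only remove the payment record if one was actually persisted before the failure.
                      if (command.PaymentDetails.Id != Guid.Empty)
                          await _paymentRepository.DeletePaymentDetailsAsync(command.PaymentDetails.Id);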
                      await _sqsMessenger.SendMessageAsync(new PaymentCancelledEvent()
                      {
                          OrderId = command.PaymentDetails.OrderId,
                      });
                  }
              }
          }
        
          using Microsoft.EntityFrameworkCore;
          using Payments.Api.Entities;
        
          namespace Payments.Api.Data;
        
          public class AppDbContext : DbContext
          {
              public AppDbContext(DbContextOptions<AppDbContext> options) : base(options)
              {
              }
        
              public DbSet<PaymentDetail> PaymentDetails { get; set; }
          }
        
          namespace Payments.Api.Dtos;
        
          public class PaymentDetailsForCreateDto
          {
              public Guid OrderId { get; set; }
              public double Amount { get; set; }
              public string Currency { get; set; }
          }
        
          using Payments.Api.Enums;
        
          namespace Payments.Api.Entities;
        
          public class PaymentDetail
          {
              public Guid Id { get; set; }
              public Guid OrderId { get; set; }
              public double Amount { get; set; }
              public string Currency { get; set; }
              public PaymentStatus Status { get; set; }
          }
        
          namespace Payments.Api.Enums;
        
          public enum PaymentStatus
          {
              UnPaid,
              Paid,
          }
        
          using SagaPattern.Commons;
        
          namespace Payments.Api.Events.ExternalEvents;
        
          public class OrderCreatedEvent : IEvent
          {
              public string Currency { get; set; }
              public double Amount { get; set; }
              public Guid OrderId { get; set; }
          }
        
          using Payments.Api.Commands;
          using Payments.Api.Entities;
          using Payments.Api.Enums;
          using SagaPattern.Commons;
        
          namespace Payments.Api.Events.ExternalEvents;
        
          public class OrderCreatedEventHandler : IEventHandler<OrderCreatedEvent>
          {
              private readonly ICommandHandler<ProcessPaymentCommand> _handler;
        
              public OrderCreatedEventHandler(ICommandHandler<ProcessPaymentCommand> handler)
              {
                  _handler = handler;
              }
        
              public async Task HandleAsync(OrderCreatedEvent @event)
              {
                  await _handler.HandleAsync(new ProcessPaymentCommand(new PaymentDetail
                  {
                      OrderId = @event.OrderId,
                      Amount = @event.Amount,
                      Currency = @event.Currency,
                      Status = PaymentStatus.UnPaid
                  }));
              }
          }
        
          using SagaPattern.Commons;
        
          namespace Payments.Api.Events.ExternalEvents;
        
          public class ShippingCancelledEvent : IEvent
          {
              public Guid OrderId { get; set; }
              public Guid PaymentDetailsId { get; set; }
          }
        
          using Payments.Api.Commands;
          using SagaPattern.Commons;
        
          namespace Payments.Api.Events.ExternalEvents;
        
          public class ShippingCancelledEventHandler : IEventHandler<ShippingCancelledEvent>
          {
              private readonly ICommandHandler<DeletePaymentCommand> _deletePaymentCommandHandler;
              private readonly ISqsMessenger _sqsMessenger;
        
              public ShippingCancelledEventHandler(ICommandHandler<DeletePaymentCommand> deletePaymentCommandHandler, ISqsMessenger sqsMessenger)
              {
                  _deletePaymentCommandHandler = deletePaymentCommandHandler;
                  _sqsMessenger = sqsMessenger;
              }
              public async Task HandleAsync(ShippingCancelledEvent @event)
              {
                  await _deletePaymentCommandHandler.HandleAsync(new DeletePaymentCommand(@event.PaymentDetailsId));
        
                  await _sqsMessenger.SendMessageAsync(new PaymentCancelledEvent
                  {
                      OrderId = @event.OrderId,
                      PaymentDetailsId = @event.PaymentDetailsId
                  });
              }
          }
        
          namespace Payments.Api.Events;
        
          public class PaymentCancelledEvent
          {
              public Guid OrderId { get; set; }
              public Guid PaymentDetailsId { get; set; }
          }
        
          namespace Payments.Api.Events;
        
          public class PaymentProcessedEvent
          {
              public Guid OrderId { get; set; }
              public Guid PaymentId { get; set; }
        
              public PaymentProcessedEvent(Guid paymentId, Guid orderId)
              {
                  PaymentId = paymentId;
                  OrderId = orderId;
              }
          }
        
          using Payments.Api.Entities;
        
          namespace Payments.Api.Interfaces;
        
          public interface IPaymentProcessor
          {
              Task<bool> ProcessPaymentAsync(PaymentDetail paymentDetail);
          }
        
          using Payments.Api.Entities;
        
          namespace Payments.Api.Interfaces;
        
          public interface IPaymentRepository
          {
              Task CreatePaymentDetailsAsync(PaymentDetail payment);
              Task DeletePaymentDetailsAsync(Guid paymentId);
          }
        
          using Payments.Api.Entities;
          using Payments.Api.Enums;
          using Payments.Api.Interfaces;
        
          namespace Payments.Api.Service;
        
          public class PaymentProcessor : IPaymentProcessor
          {
              public async Task<bool> ProcessPaymentAsync(PaymentDetail paymentDetail)
              {
                  // simulating delay in a payment charge without blocking a thread
                  await Task.Delay(1000);
                  paymentDetail.Status = PaymentStatus.Paid;

                  return true;
              }
          }
        
          using Ardalis.GuardClauses;
          using Payments.Api.Data;
          using Payments.Api.Entities;
          using Payments.Api.Interfaces;
        
          namespace Payments.Api.Service;
        
          public class PaymentRepository : IPaymentRepository
          {
              private readonly AppDbContext _dbContext;
        
              public PaymentRepository(AppDbContext dbContext)
              {
                  _dbContext = dbContext;
              }
              public async Task CreatePaymentDetailsAsync(PaymentDetail payment)
              {
                  Guard.Against.Null(payment);
        
                  await _dbContext.PaymentDetails.AddAsync(payment);
                  await _dbContext.SaveChangesAsync();
              }
        
              public async Task DeletePaymentDetailsAsync(Guid paymentId)
              {
                  Guard.Against.Default(paymentId);
        
                  var payment = await _dbContext.PaymentDetails.FindAsync(paymentId);
                  if (payment is {})
                  {
                      _dbContext.PaymentDetails.Remove(payment);
                  }
        
                  await _dbContext.SaveChangesAsync();
              }
          }
        
          {
            "Logging": {
              "LogLevel": {
                "Default": "Information",
                "Microsoft.AspNetCore": "Warning"
              }
            },
            "AllowedHosts": "*",
            "QueueSettings":{
              "QueueName": "saga-pattern"
            },
            "ConnectionStrings": {
              "DefaultConnection": "Data Source = ./payment.db"
            }
          }
        
          using AutoMapper;
          using Payments.Api.Dtos;
          using Payments.Api.Entities;
        
          namespace Payments.Api;
        
          public class MappingProfile: Profile
          {
              public MappingProfile()
              {
                  CreateMap<PaymentDetail, PaymentDetailsForCreateDto>().ReverseMap();
              }
          }
        

        ```csharp
        using Amazon.SQS;
        using AutoMapper;
        using SagaPattern.Commons;
        using Microsoft.AspNetCore.Mvc;
        using Microsoft.EntityFrameworkCore;
        using Payments.Api.Commands;
        using Payments.Api.Data;
        using Payments.Api.Dtos;
        using Payments.Api.Entities;
        using Payments.Api.Events.ExternalEvents;
        using Payments.Api.Interfaces;
        using Payments.Api.Service;

        var builder = WebApplication.CreateBuilder(args);

        // Add services to the container.
        builder.Services.AddControllers();
        // Learn more about configuring Swagger/OpenAPI at aka.ms/aspnetcore/swashbuckle
        builder.Services.AddEndpointsApiExplorer();
        builder.Services.AddSwaggerGen();

        builder.Services.AddAutoMapper(typeof(Program));
        builder.Services.AddDbContext<AppDbContext>(options =>
        {
            options.UseSqlite(builder.Configuration.GetConnectionString("DefaultConnection"));
        });
        builder.Services.AddScoped<IPaymentRepository, PaymentRepository>();
        builder.Services.AddScoped<IPaymentProcessor, PaymentProcessor>();
        builder.Services.AddEventHandlers(typeof(Program));
        builder.Services.AddCommandHandlers(typeof(Program));
        builder.Services.AddScoped<ISqsMessenger, SqsMessenger>();
        builder.Services.AddSingleton<IAmazonSQS, AmazonSQSClient>();
        builder.Services.AddSingleton<IEventListener, SqsMessenger>();
        builder.Services.Configure<QueueSettings>(builder.Configuration.GetSection("QueueSettings"));

        var app = builder.Build();

        using var serviceScope = app.Services.CreateScope();
        var dbContext = serviceScope.ServiceProvider.GetRequiredService<AppDbContext>();
        await dbContext.Database.MigrateAsync();

        // Configure the HTTP request pipeline.
        if (app.Environment.IsDevelopment())
        {
            app.UseSwagger();
            app.UseSwaggerUI();
        }

        app.UseHttpsRedirection();
        app.UseAuthorization();
        app.MapControllers();

        app.ListenForSqsEvents(new[] { nameof(OrderCreatedEvent), nameof(ShippingCancelledEvent) });

        app.MapPost("api/payments",
            async (
                [FromServices] IMapper mapper,
                [FromServices] ICommandHandler<ProcessPaymentCommand> commandHandler,
                [FromBody] PaymentDetailsForCreateDto paymentDetails) =>
            {
                var payment = mapper.Map<PaymentDetail>(paymentDetails);
                await commandHandler.HandleAsync(new ProcessPaymentCommand(payment));

                // Results.StatusCode sets the HTTP status; a bare int would be sent as a 200 OK body.
                return Results.StatusCode(StatusCodes.Status201Created);
            });

        app.MapDelete("api/payments/{paymentId:guid}",
            async (
                [FromRoute] Guid paymentId,
                [FromServices] ICommandHandler<DeletePaymentCommand> commandHandler) =>
            {
                await commandHandler.HandleAsync(new DeletePaymentCommand(paymentId));

                return Results.NoContent();
            });

        app.Run();
        ```


    * Next, we implement our Shipping service: `Shippings.Api`

        * Add the following folders and files (where they don't exist) to our `Shippings.Api` project.

            ![](https://cdn.hashnode.com/res/hashnode/image/upload/v1705608518892/22c1962b-cac2-4c05-a89f-2fef39d76f85.png align="center")

        * Let's add the following code to their respective files.

            ```csharp
            using SagaPattern.Commons;
            using Shipping.Api.Entities;

            namespace Shipping.Api.Commands;

            public class CreateShippingRequestCommand : ICommand
            {
                public ShippingRequest ShippingRequest { get; }

                public CreateShippingRequestCommand(ShippingRequest shippingRequest)
                {
                    ShippingRequest = shippingRequest;
                }
            }

            using Ardalis.GuardClauses;
            using SagaPattern.Commons;
            using Shipping.Api.Events;
            using Shipping.Api.Interfaces;

            namespace Shipping.Api.Commands;

            public class CreateShippingRequestCommandHandler : ICommandHandler<CreateShippingRequestCommand>
            {
                private readonly IShippingRequestRepository _repository;
                private readonly ISqsMessenger _sqsMessenger;
                private readonly ILogger<CreateShippingRequestCommandHandler> _logger;

                public CreateShippingRequestCommandHandler(
                    IShippingRequestRepository repository, 
                    ISqsMessenger sqsMessenger,
                    ILogger<CreateShippingRequestCommandHandler> logger)
                {
                    _repository = repository;
                    _sqsMessenger = sqsMessenger;
                    _logger = logger;
                }
                public async Task HandleAsync(CreateShippingRequestCommand command)
                {
                    Guard.Against.Null(command);

                    try
                    {
                        await _repository.CreateShippingRequestAsync(command.ShippingRequest);
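                        _logger.LogInformation("inserted shipping request with id: {ShippingRequestId} into db", command.ShippingRequest.Id);
                    }
                    catch (Exception e)
                    {
                        // Record the failure before starting the compensating flow.
                        _logger.LogError(e, "an error has occurred while creating the shipping request");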
                        await _sqsMessenger.SendMessageAsync(new ShippingCancelledEvent()
                        {
                            OrderId = command.ShippingRequest.OrderId,
                            PaymentDetailsId = command.ShippingRequest.PaymentId
                        });
                    }
                }
            }

            using SagaPattern.Commons;

            namespace Shipping.Api.Commands;

            public class DeleteShippingRequestCommand : ICommand
            {
                public Guid ShippingRequestId { get; }

                public DeleteShippingRequestCommand(Guid shippingRequestId)
                {
                    ShippingRequestId = shippingRequestId;
                }
            }

            using Ardalis.GuardClauses;
            using SagaPattern.Commons;
            using Shipping.Api.Interfaces;

            namespace Shipping.Api.Commands;

            public class DeleteShippingRequestCommandHandler : ICommandHandler<DeleteShippingRequestCommand>
            {
                private readonly IShippingRequestRepository _shippingRequestRepository;
                private readonly ILogger<DeleteShippingRequestCommandHandler> _logger;

                public DeleteShippingRequestCommandHandler(
                        IShippingRequestRepository shippingRequestRepository, 
                        ILogger<DeleteShippingRequestCommandHandler> logger
                    )
                {
                    _shippingRequestRepository = shippingRequestRepository;
                    _logger = logger;
                }
                public async Task HandleAsync(DeleteShippingRequestCommand command)
                {
                    Guard.Against.Null(command);

                    await _shippingRequestRepository.DeleteShippingRequestAsync(command.ShippingRequestId);
                    _logger.LogInformation("deleted shipping request with id: {ShippingRequestId} from db", command.ShippingRequestId);
                }
            }

            using Microsoft.EntityFrameworkCore;
            using Shipping.Api.Entities;

            namespace Shipping.Api.Data;

            public class AppDbContext : DbContext
            {
                public AppDbContext(DbContextOptions<AppDbContext> options) : base(options)
                {
                }

                public DbSet<ShippingRequest> ShippingRequests { get; set; }
            }

            using Shipping.Api.Enums;

            namespace Shipping.Api.Dtos
            {
                public class ShippingRequestForCreateDto
                {
                    public Guid PaymentId { get; set; }
                    public Guid OrderId { get; set; }
                    public ShippingRequestStatus ShippingRequestStatus { get; set; }
                }
            }

            using Shipping.Api.Enums;

            namespace Shipping.Api.Entities;

            public class ShippingRequest
            {
                public Guid Id { get; set; }
                public Guid PaymentId { get; set; }
                public Guid OrderId { get; set; }
                public ShippingRequestStatus ShippingRequestStatus { get; set; }
            }

            namespace Shipping.Api.Enums;

            public enum ShippingRequestStatus
            {
                Open,
                Shipped
            }

            using SagaPattern.Commons;

            namespace Shipping.Api.Events.ExternalEvents;

            public class PaymentProcessedEvent : IEvent
            {
                public Guid OrderId { get; set; }
                public Guid PaymentId { get; set; }
            }

            using Ardalis.GuardClauses;
            using SagaPattern.Commons;
            using Shipping.Api.Commands;
            using Shipping.Api.Entities;
            using Shipping.Api.Enums;

            namespace Shipping.Api.Events.ExternalEvents;

            public class PaymentProcessedEventHandler : IEventHandler<PaymentProcessedEvent>
            {
                private readonly ICommandHandler<CreateShippingRequestCommand> _commandHandler;

                public PaymentProcessedEventHandler(ICommandHandler<CreateShippingRequestCommand> commandHandler)
                {
                    _commandHandler = commandHandler;
                }

                public async Task HandleAsync(PaymentProcessedEvent @event)
                {
                    Guard.Against.Null(@event);
                    await _commandHandler.HandleAsync(new(new ShippingRequest
                        {
                            PaymentId = @event.PaymentId,
                            OrderId = @event.OrderId,
                            ShippingRequestStatus = ShippingRequestStatus.Open
                        })
                    );
                }
            }

            using SagaPattern.Commons;

            namespace Shipping.Api.Events;

            public class ShippingCancelledEvent : IEvent
            {
                public Guid OrderId { get; set; }
                public Guid PaymentDetailsId { get; set; }
            }

            using Shipping.Api.Entities;

            namespace Shipping.Api.Interfaces;

            public interface IShippingRequestRepository
            {
                Task CreateShippingRequestAsync(ShippingRequest shippingRequest);
                Task DeleteShippingRequestAsync(Guid shippingRequestId);
            }

            using Ardalis.GuardClauses;
            using Shipping.Api.Data;
            using Shipping.Api.Entities;
            using Shipping.Api.Interfaces;

            namespace Shipping.Api.Services;

            public class ShippingRequestRepository : IShippingRequestRepository
            {
                private readonly AppDbContext _dbContext;

                public ShippingRequestRepository(AppDbContext dbContext)
                {
                    _dbContext = dbContext;
                }
                public async Task CreateShippingRequestAsync(ShippingRequest shippingRequest)
                {
                    Guard.Against.Null(shippingRequest);

                    await _dbContext.ShippingRequests.AddAsync(shippingRequest);
                    await _dbContext.SaveChangesAsync();
                }

                public async Task DeleteShippingRequestAsync(Guid shippingRequestId)
                {
                    Guard.Against.Default(shippingRequestId);

                    var shippingRequest = await _dbContext.FindAsync<ShippingRequest>(shippingRequestId);
                    if (shippingRequest is {})
                    {
                        _dbContext.ShippingRequests.Remove(shippingRequest);
                    }
                    await _dbContext.SaveChangesAsync();
                }
            }

            {
              "Logging": {
                "LogLevel": {
                  "Default": "Information",
                  "Microsoft.AspNetCore": "Warning"
                }
              },
              "AllowedHosts": "*",
              "QueueSettings":{
                "QueueName": "saga-pattern"
              },
              "ConnectionStrings": {
                "DefaultConnection": "Data Source = ./shipping.db"
              }
            }

            using AutoMapper;
            using Shipping.Api.Dtos;
            using Shipping.Api.Entities;

            namespace Shipping.Api;

            public class MappingProfile: Profile
            {
                public MappingProfile()
                {
                    CreateMap<ShippingRequest, ShippingRequestForCreateDto>().ReverseMap();
                }
            }

            using Amazon.SQS;
            using AutoMapper;
            using Microsoft.AspNetCore.Mvc;
            using Microsoft.EntityFrameworkCore;
            using SagaPattern.Commons;
            using Shipping.Api.Commands;
            using Shipping.Api.Data;
            using Shipping.Api.Dtos;
            using Shipping.Api.Entities;
            using Shipping.Api.Events.ExternalEvents;
            using Shipping.Api.Interfaces;
            using Shipping.Api.Services;

            var builder = WebApplication.CreateBuilder(args);

            // Add services to the container.

            builder.Services.AddControllers();
            // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
            builder.Services.AddEndpointsApiExplorer();
            builder.Services.AddSwaggerGen();

            builder.Services.AddAutoMapper(typeof(Program));
            builder.Services.AddDbContext<AppDbContext>(options =>
            {
                options.UseSqlite(builder.Configuration.GetConnectionString("DefaultConnection"));
            });
            builder.Services.AddScoped<IShippingRequestRepository, ShippingRequestRepository>();
            builder.Services.AddScoped<ISqsMessenger, SqsMessenger>();
            builder.Services.AddSingleton<IAmazonSQS, AmazonSQSClient>();
            builder.Services.AddSingleton<IEventListener, SqsMessenger>();
            builder.Services.AddEventHandlers(typeof(Program));
            builder.Services.AddCommandHandlers(typeof(Program));
            builder.Services.Configure<QueueSettings>(builder.Configuration.GetSection("QueueSettings"));

            var app = builder.Build();
            using var serviceScope = app.Services.CreateScope();
            var dbContext = serviceScope.ServiceProvider.GetRequiredService<AppDbContext>();
            await dbContext.Database.MigrateAsync();

            // Configure the HTTP request pipeline.
            if (app.Environment.IsDevelopment())
            {
                app.UseSwagger();
                app.UseSwaggerUI();
            }

            app.UseHttpsRedirection();
            app.UseAuthorization();
            app.MapControllers();
            app.ListenForSqsEvents(new[] { nameof(PaymentProcessedEvent) });

            app.MapPost("api/shipping",
                async (
                    [FromServices] IMapper mapper,
                    [FromServices] ICommandHandler<CreateShippingRequestCommand> commandHandler,
                    [FromBody] ShippingRequestForCreateDto shippingRequestDto) =>
                {
                    var shippingRequest = mapper.Map<ShippingRequest>(shippingRequestDto);
                    await commandHandler.HandleAsync(new CreateShippingRequestCommand(shippingRequest));

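                    // Results.StatusCode sets the HTTP status; a bare int would be sent as a 200 OK body.
                    return Results.StatusCode(StatusCodes.Status201Created);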
                });

            app.MapDelete("api/shipping/{shippingRequestId:guid}",
                async ([FromServices] ICommandHandler<DeleteShippingRequestCommand> commandHandler,
                    [FromRoute] Guid shippingRequestId) =>
                {
                    await commandHandler.HandleAsync(new DeleteShippingRequestCommand(shippingRequestId));

                    // Results.NoContent produces a 204 response; a bare int would be sent as a 200 OK body.
                    return Results.NoContent();
                });

            app.Run();
            ```

  1. Add Entity Framework migrations to create and update the databases. Navigate to each project directory and run the following commands. The first creates the Migrations folder and the migration code in it; the second applies the migration to the database.

     dotnet ef migrations add InitialCommit --verbose
    
     dotnet ef database update
    

Testing Our Implementation

  1. Run all three projects.

  2. Simulate the creation of a new order by calling the POST endpoint on the orders API.

    We can see our OrderCreatedEvent published to AWS SQS.

    We also confirm the presence of the order in our database.

  3. Confirm the presence of messages in AWS SQS.

  4. Test the happy path to confirm that the services react to the events published by the services they integrate with.

    We should see our Payments.Api responding to the OrderCreatedEvent published by the Orders.Api and publishing a PaymentProcessedEvent in turn.

    Finally, we see our Shippings.Api responding to the PaymentProcessedEvent published by our Payments.Api.

  5. Trigger an error somewhere in the processing.

    Now, let's test the failure path. We will intentionally trigger an error in our Shippings.Api to see whether the rollback is triggered and propagated back to our Orders.Api.

    Modify the CreateShippingRequestCommandHandler to throw an exception.

     using Ardalis.GuardClauses;
     using SagaPattern.Commons;
     using Shipping.Api.Events;
     using Shipping.Api.Interfaces;
    
     namespace Shipping.Api.Commands;
    
     public class CreateShippingRequestCommandHandler : ICommandHandler<CreateShippingRequestCommand>
     {
         private readonly IShippingRequestRepository _repository;
         private readonly ISqsMessenger _sqsMessenger;
         private readonly ILogger<CreateShippingRequestCommandHandler> _logger;
    
         public CreateShippingRequestCommandHandler(
             IShippingRequestRepository repository, 
             ISqsMessenger sqsMessenger,
             ILogger<CreateShippingRequestCommandHandler> logger)
         {
             _repository = repository;
             _sqsMessenger = sqsMessenger;
             _logger = logger;
         }
         public async Task HandleAsync(CreateShippingRequestCommand command)
         {
             Guard.Against.Null(command);
    
             try
             {
                 await _repository.CreateShippingRequestAsync(command.ShippingRequest);
                 _logger.LogInformation("inserted shipping request with id: {ShippingRequestId} into db", command.ShippingRequest.Id);
                 // Thrown deliberately (after the insert) to trigger the compensating transactions; remove after testing.
                 throw new ApplicationException("an error has occurred.");
             }
             catch
             {
                 await _sqsMessenger.SendMessageAsync(new ShippingCancelledEvent()
                 {
                     OrderId = command.ShippingRequest.OrderId,
                     PaymentDetailsId = command.ShippingRequest.PaymentId
                 });
             }
         }
     }
    

    We see a ShippingCancelledEvent raised by our Shippings.Api.

    Our Payments.Api then receives the ShippingCancelledEvent, processes it, and responds with a PaymentCancelledEvent.

    Subsequently, we see our Orders.Api receiving the PaymentCancelledEvent and responding to it by deleting the order.
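
The two paths tested above can also be traced without AWS. The snippet below is a minimal sketch under stated assumptions, not this article's implementation: a `Queue<string>` stands in for the SQS queue, `HashSet<Guid>` collections stand in for the three SQLite databases, and the hypothetical `RunSaga` function plays all three services in one loop. Only the event names come from the code in this article.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: runs one saga over an in-memory queue and returns the
// event names in publish order. failShipping mimics the exception thrown in
// CreateShippingRequestCommandHandler during the failure test.
List<string> RunSaga(bool failShipping)
{
    var published = new List<string>();   // audit trail of published events
    var queue = new Queue<string>();      // stand-in for the SQS queue
    var orders = new HashSet<Guid>();     // stand-in for the orders database
    var payments = new HashSet<Guid>();   // stand-in for payment.db
    var shipments = new HashSet<Guid>();  // stand-in for shipping.db
    var orderId = Guid.NewGuid();

    void Publish(string eventName)
    {
        published.Add(eventName);
        queue.Enqueue(eventName);
    }

    // Orders service: commit the local transaction, then publish.
    orders.Add(orderId);
    Publish("OrderCreatedEvent");

    while (queue.Count > 0)
    {
        switch (queue.Dequeue())
        {
            case "OrderCreatedEvent":       // Payments service reacts
                payments.Add(orderId);
                Publish("PaymentProcessedEvent");
                break;
            case "PaymentProcessedEvent":   // Shipping service reacts
                if (failShipping) Publish("ShippingCancelledEvent");
                else shipments.Add(orderId);
                break;
            case "ShippingCancelledEvent":  // Payments service compensates
                payments.Remove(orderId);
                Publish("PaymentCancelledEvent");
                break;
            case "PaymentCancelledEvent":   // Orders service compensates
                orders.Remove(orderId);
                break;
        }
    }

    // Once the saga settles, every store must hold the same number of records.
    if (orders.Count != payments.Count || payments.Count != shipments.Count)
        throw new InvalidOperationException("stores are inconsistent");

    return published;
}

Console.WriteLine(string.Join(" -> ", RunSaga(failShipping: false)));
Console.WriteLine(string.Join(" -> ", RunSaga(failShipping: true)));
```

Run as a top-level C# program, the first line printed is `OrderCreatedEvent -> PaymentProcessedEvent`, and the second continues with `ShippingCancelledEvent -> PaymentCancelledEvent`, which is the order in which the compensations reach the Payments.Api and the Orders.Api in the failure test.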


Conclusion

This article explored the Saga pattern, including its implementation approaches and benefits. We then walked through a choreography-based implementation in .NET, using AWS SQS as our message broker.

The code is available here for reference. Working through it alongside this article should give a clearer picture of how the pieces of the pattern fit together.