CQRS and Event Sourcing In C# .NET (Part 2)

CQRS and Event Sourcing In C# .NET (Part 2)

The Application of Event Sourcing in Real-world Scenarios.

Introduction to Event Sourcing

In part one of this series, we explored CQRS. This article continues from where we stopped in part one (CQRS and Event Sourcing in C# .NET (Part 1)) and builds upon the code developed there. Kindly go through part one, available here, before proceeding.

Event Sourcing is a pattern that captures all changes to an application's state as a sequence of immutable events. These events are stored in an event log or journal, and the current state of the application is determined by replaying these events. Here's how it works:

  • Event Creation: Events are generated from commands (e.g., user actions) that modify the application's state. Each event represents a discrete change, including all the necessary information to reconstruct that change.

  • Event Storage: Events are stored in an event store, typically an append-only log or database.

  • Event Replay: To reconstruct the current state of the application, you replay the events in the order they occurred. This can be done by reading events from the event store and applying them to an initial state or snapshot.

Event Sourcing offers several advantages:

  • Full Audit Trail: Since all changes to the system are recorded as events, you have a complete audit trail of how and why the system's state evolved over time.

  • Temporal Querying: You can query the application's state at any point in time, which is useful for historical analysis and debugging.

  • Scalability: Event Sourcing naturally supports distributed and event-driven architectures, making it suitable for serverless environments where AWS Lambda can process events and update event stores.

  • Flexibility: It allows for evolving and changing the system's behavior by replaying events with different business logic or data transformations.

In our implementation, we would use Redis Streams as our event store. An event store is optimized for writing but less efficient in reading data. This is one reason for having a separate read database in CQRS, which is synched either synchronously or asynchronously; the synchronous approach involves immediately calling our read database repository to save the data after persisting to the event store, and asynchronously by eventual consistency from the event store to the read database. For our read database, we are using Sqlite.


Implementation

Let's get started!

  1. Install docker and docker desktop on your machine.

    Instruction for installing docker is available on the official website: https://www.docker.com/

  2. Pull the Redis image from the docker hub.

    Run the below command on the terminal or cmd.

     docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest
    
  3. Install the StackExchange.Redis in your project from the NuGet package manager.

  4. Add the following files and file structure to your project. (The files are highlighted in green.)

  5. Add the following implementations to your code files.

    • IEvent.cs

      Marker interface to identify all event objects.

        namespace Orders.Api.Events;
      
        public interface IEvent
        {
      
        }
      
    • OrderCreatedEvent.cs

        using Orders.Api.Entities;
      
        namespace Orders.Api.Events;
      
        public class OrderCreatedEvent : IEvent
        {
            public Order Order { get; }
      
            public OrderCreatedEvent(Order order)
            {
                Order = order;
            }
        }
      
    • OrderUpdatedEvent.cs

        using Orders.Api.Entities;
      
        namespace Orders.Api.Events;
      
        public class OrderUpdatedEvent : IEvent
        {
            public Order Order { get; }
      
            public OrderUpdatedEvent(Order order)
            {
                Order = order;
            }
        }
      
    • IEventHandler.cs An interface to be implemented by all event handlers.

        namespace Orders.Api.Events.EventHandlers;
      
        public interface IEventHandler<in T> where T : IEvent
        {
            Task HandleAsync(T @event);
        }
      
    • OrderCreatedEventHandler.cs

        using Ardalis.GuardClauses;
        using Orders.Api.Repositories;
      
        namespace Orders.Api.Events.EventHandlers;
      
        public class OrderCreatedEventHandler : IEventHandler<OrderCreatedEvent>
        {
            private readonly IWriteOrdersRepository _repository;
            private readonly IReadOrdersRepository _readOrdersRepository;
      
            public OrderCreatedEventHandler(IWriteOrdersRepository repository, IReadOrdersRepository readOrdersRepository)
            {
                _repository = repository;
                _readOrdersRepository = readOrdersRepository;
            }
            public async Task HandleAsync(OrderCreatedEvent @event)
            {
                Guard.Against.Null(@event);
      
                var order = await _readOrdersRepository.GetOrderByIdAsync(@event.Order.Id);
                if (order is not {})
                {
                    await _repository.CreateOrderAsync(@event.Order);
                }
                else
                {
                    await _repository.UpdateOrderAsync(@event.Order);
                }
            }
        }
      
    • OrderUpdatedEventHandler.cs

        using Ardalis.GuardClauses;
        using Orders.Api.Repositories;
      
        namespace Orders.Api.Events.EventHandlers;
      
        public class OrderUpdatedEventHandler : IEventHandler<OrderUpdatedEvent>
        {
            private readonly IWriteOrdersRepository _repository;
      
            public OrderUpdatedEventHandler(IWriteOrdersRepository repository)
            {
                _repository = repository;
            }
      
            public async Task HandleAsync(OrderUpdatedEvent @event)
            {
                Guard.Against.Null(@event);
      
                await _repository.UpdateOrderAsync(@event.Order);
            }
        }
      
    • Now, let's create our events store repository class, which would be responsible for integrating with our Redis Event store.

      First, we define an interface IEventStoreRepository.cs and then implement the interface in RedisEventStoreRepository.cs

        using Orders.Api.Events;
        using System.Text.Json;
        using Ardalis.GuardClauses;
        using Microsoft.Extensions.Options;
        using Newtonsoft.Json;
        using Orders.Api.Events;
        using StackExchange.Redis;

        namespace Orders.Api.Repositories;

        public interface IEventStoreRepository
        {
            Task PublishAsync(IEvent message);
        }

        public class RedisEventStoreRepository : IEventStoreRepository
        {
            private readonly IOptions<RedisConfig> _redisConfigOptions;
            private readonly ILogger<RedisEventStoreRepository> _logger;
            private readonly IDatabase _redisDatabase;


            public RedisEventStoreRepository(IOptions<RedisConfig> redisConfigOptions,
                ILogger<RedisEventStoreRepository> logger,
                IDatabase redisDatabase)
            {
                _redisConfigOptions = redisConfigOptions;
                _logger = logger;
                _redisDatabase = redisDatabase;
            }

            public async Task PublishAsync(IEvent message)
            {
                Guard.Against.Null(message);
                var @event = new[] { new NameValueEntry(message.GetType().FullName,  JsonConvert.SerializeObject(message)) };

                await _redisDatabase.StreamAddAsync(_redisConfigOptions.Value.StreamName, @event);
            }
        }
  • Instead of directly persisting to our database in our command handlers, we would persist to our event store. Modify your command handlers as follows:

      using Ardalis.GuardClauses;
      using Orders.Api.Events;
      using Orders.Api.Repositories;
    
      namespace Orders.Api.Commands.CommandHandlers;
    
      public class UpdateOrderCommandHandler : ICommandHandler<UpdateOrderCommand>
      {
          private readonly IEventStoreRepository _eventStoreRepository;
    
          public UpdateOrderCommandHandler(IEventStoreRepository eventStoreRepository)
          {
              _eventStoreRepository = eventStoreRepository;
          }
          public async Task HandleAsync(UpdateOrderCommand command)
          {
              Guard.Against.Null(command?.Order);
    
              await _eventStoreRepository.PublishAsync(new OrderUpdatedEvent(command.Order));
          }
      }
    
      using Ardalis.GuardClauses;
      using Orders.Api.Events;
      using Orders.Api.Repositories;
    
      namespace Orders.Api.Commands.CommandHandlers;
    
      public class CreateOrderCommandHandler : ICommandHandler<CreateOrderCommand>
      {
          private readonly IEventStoreRepository _eventStoreRepository;
    
          public CreateOrderCommandHandler(IEventStoreRepository eventStoreRepository)
          {
              _eventStoreRepository = eventStoreRepository;
          }
          public async Task HandleAsync(CreateOrderCommand command)
          {
              Guard.Against.Null(command);
    
              // persisting data to our event store
              await _eventStoreRepository.PublishAsync(new OrderCreatedEvent(command.Order));
          }
      }
    
  • Now, let's create configuration parameters for our connection to the Redis Event stream.

    • add the following lines to your appsettings.json file

        "RedisConfig": {
          "Url": "127.0.0.1:6379,abortConnect=false,connectTimeout=30000,responseTimeout=30000",
          "StreamName": "orders.api"
        }
      
    • Create a class RedisConfig.cs to map to the section on app config.

        namespace Orders.Api;
      
        public class RedisConfig
        {
            public string Url { get; set; }
            public string StreamName { get; set; }
        }
      
  • Now, let's write code that will constantly listen to our Redis Event store for new entries: interface IEventListener and implementation EventListener

      using Microsoft.Extensions.Options;
      using Newtonsoft.Json;
      using Orders.Api.Events;
      using Orders.Api.Events.EventHandlers;
      using StackExchange.Redis;
    
      namespace Orders.Api;
    
      public class EventListener : IEventListener
      {
          private readonly IDatabase _redisDatabase;
          private readonly IOptions<RedisConfig> _redisConfig;
          private readonly ILogger<EventListener> _logger;
          private readonly IServiceProvider _serviceProvider;
    
          public EventListener(
              IDatabase redisDatabase,
              IOptions<RedisConfig> redisConfig,
              ILogger<EventListener> logger,
              IServiceProvider serviceProvider)
          {
              _redisDatabase = redisDatabase;
              _redisConfig = redisConfig;
              _logger = logger;
              _serviceProvider = serviceProvider;
          }
    
          public async Task Listen(CancellationToken token)
          {
              try
              {
                  // Read events from the stream
                  var lastId = "-";
                  while (!token.IsCancellationRequested)
                  {
                      var result = await _redisDatabase.StreamRangeAsync(_redisConfig.Value.StreamName, lastId, "+");
                      if (!result.Any() || lastId == result.Last().Id) continue;
    
                      lastId = result.Last().Id;
    
                      foreach (var entry in result)
                      foreach (var field in entry.Values)
                      {
                          var type = Type.GetType(field.Name!);
                          var body = (IEvent)JsonConvert.DeserializeObject(field.Value!, type!)!;
    
                          // initialize and call our event handler 
                          var messageHandlerType = typeof(IEventHandler<>).MakeGenericType(type!);
                          using var scope = _serviceProvider.GetRequiredService<IServiceScopeFactory>().CreateScope();
                          var handler = scope.ServiceProvider.GetRequiredService(messageHandlerType);
    
                          handler.GetType().GetMethod("HandleAsync", new[] { type! })?.Invoke(handler, new[] { body });
                      }
                  }
              }
              catch(Exception e)
              {
                  _logger.LogError(e, "an error occured processing events.");
              }
          }
      }
    
      public interface IEventListener
      {
          Task Listen(CancellationToken token);
      }
    
  • Let's create an extension method that will connect our event listener to our running application.

    Add the following code to the Extensions.cs file.

      public static IApplicationBuilder ListenForRedisEvents(this WebApplication builder)
      {
          if (builder == null) throw new ArgumentNullException(nameof(builder));
    
          using var serviceScope = builder.Services.CreateScope();
          var eventListener = serviceScope.ServiceProvider.GetRequiredService<IEventListener>();
          Task.Run(() => eventListener.Listen(new CancellationToken()));
    
          return builder;
      }
    
  • Add the following lines of code to our Program.cs to set up our dependencies.

      builder.Services.AddEventHandlers(typeof(Program));
      builder.Services.AddSingleton<IEventListener>((provider => 
          new EventListener(provider.GetRequiredService<IDatabase>(),
          provider.GetRequiredService<IOptions<RedisConfig>>(),
          provider.GetRequiredService<ILogger<EventListener>>(), 
          provider)));
    
      builder.Services.Configure<RedisConfig>(builder.Configuration.GetSection("RedisConfig"));
      var redis = ConnectionMultiplexer.Connect(builder.Configuration.GetSection("RedisConfig:Url").Value!);
      var redisDatabase = redis.GetDatabase();
      builder.Services.AddSingleton(redisDatabase);
    

    And after the builder.Build() is called:

      app.ListenForRedisEvents();
    

    And that's it! Now, let's test our implementation.


Testing Our Implementation

  1. Run our application by pressing f5

  2. Create or update an order via Swagger

  3. Confirm in the Redis Event store the presence of the event.

    Ensure your Redis container is running on docker and navigate to UI via http://localhost:8001/redis-stack/browser.

    We can see our events present.

  4. Confirm in our Read database the presence of the order.

And that's it!

Conclusion

In this article, we thoroughly examined the idea of event streaming and presented a comprehensive guide on applying it in practice using C# and Redis Event Store. We also showcased the benefits of implementing event streaming, its potential use cases, and how it can improve system architecture and performance.