Asynchronous Communication In Microservices Via C# and AWS SQS
Introduction
In a distributed system, services must communicate and exchange data to achieve the software objectives. For example, imagine an eCommerce store implemented with the microservices architecture; when a user places an order, the front-end app would call the gateway, which will then forward the request to the order service. The order service
will then subsequently call the notification service to send email notifications to the customer about the successful placement of an order. (This is an oversimplified example.)
There are two approaches to microservices communication: synchronous and asynchronous communication.
Synchronous Communication
In this approach to the data exchange, the caller calls a service and waits for the response from the service before proceeding with other aspects of the code. Referencing our earlier example, after the call is sent from the front-end app down to the notifications service, the order service would wait for a response from the notifications service and return a response to the client app.
Some key points of synchronous communication:
Request-Response Pattern: Synchronous communication involves a request-response pattern, where a client sends a request to a microservice and waits for a response before proceeding. This is akin to a traditional client-server interaction.
Blocking Nature: Synchronous communication is blocking by nature. The client waits for a response, which can lead to potential bottlenecks if the microservice being called experiences delays or becomes unresponsive.
Immediate Feedback: Synchronous communication provides immediate feedback to the client, making it suitable for scenarios where real-time responses are required.
Latency Impact: Synchronous communication can introduce latency as the client must wait for the response, especially if the microservice's response time is inconsistent.
Asynchronous Communication
In this approach to communication, the calling service publishes a message via a queue or topic and does not need to await a response from the receiving service. Referencing our example again, our order service would send a message to the notification service. The order service would then immediately return a response to the client app without waiting for a response from the notification service.
Some key points of asynchronous communication:
Event-Driven Pattern: Asynchronous communication involves an event-driven pattern, where microservices communicate by emitting events or messages to a message broker or queue. Other interested microservices can then consume these events at their own pace.
Non-Blocking: Asynchronous communication is non-blocking. Microservices can continue their operations without waiting for a response, improving overall system responsiveness.
Loose Coupling: Asynchronous communication promotes loose coupling between microservices. The producing microservice does not need to know which microservices are consuming its events, enabling flexibility and scalability.
Scalability: Asynchronous communication can help improve scalability, as consuming microservices can process events independently and at their rate.
Intro to AWS SQS
Amazon Simple Queue Service (SQS) is a fully managed message queuing service provided by Amazon Web Services (AWS). It helps reduce complexity because, as developers, we wouldn't be responsible for creating and managing our messaging queue.
Here are some useful terms related to AWS SQS:
Queue: A queue is a named container for holding messages waiting to be processed. It acts as a buffer between message producers and consumers.
Message: A message is the data you send to an SQS queue. It can be in various formats (JSON, XML, plain text, etc.) and contains the information you want to communicate between different application parts.
Dead Letter Queue (DLQ): A DLQ is a separate queue where messages cannot be processed successfully after a certain number of retries are sent. This helps in isolating and debugging failed messages.
FIFO Queue: A First-In-First-Out (FIFO) queue ensures that the order in which messages are sent and received is preserved. This is important for scenarios where message order matters, such as financial transactions.
Standard Queue: A standard queue provides a best-effort ordering of messages, but the order is not guaranteed. It offers higher throughput compared to FIFO queues.
Message Attributes: Message attributes are metadata key-value pairs that provide additional information about a message. Consumers can use these attributes to make processing decisions.
Enough of the theory! Let's get to some code!
Implementation
Creating our AWS Account
First and foremost, to use any AWS service, we need an AWS account. Go over to the website here and create an account.
Create SQS Queue
- Next, please search for the SQS on the search bar and click on it.
- Click on the Create Queue button in the top right corner.
- Give the queue a name (we'll name it "orders") and leave the other settings as they are.
- Now, our queue has been successfully created.
Publish to Queue
Now that we have successfully created a queue let's publish it from our orders service. We would create an end-to-end CRUD API implementation for our order service.
Create a .NET web API app with the following structure and classes
Add the following codes to the respective files
OrderController.cs
using AutoMapper; using Microsoft.AspNetCore.Mvc; using Orders.Api.Dtos; using Orders.Api.Entities; using Orders.Api.Services; namespace Orders.Api.Controllers; [Route("api/orders")] [ApiController] public class OrderController : ControllerBase { private readonly IOrdersService _ordersService; private readonly ILogger<OrderController> _logger; private readonly IMapper _mapper; public OrderController(IOrdersService ordersService, ILogger<OrderController> logger, IMapper mapper) { _ordersService = ordersService; _logger = logger; _mapper = mapper; } [HttpGet] public async Task<ActionResult<IEnumerable<OrderForReturnDto>>> GetOrders() { var orders = await _ordersService.GetOrdersAsync(); return Ok(_mapper.Map<IEnumerable<OrderForReturnDto>>(orders)); } [HttpGet("{orderId}", Name = nameof(GetOrder))] public async Task<IActionResult> GetOrder(Guid orderId) { var orders = await _ordersService.GetOrderByIdAsync(orderId); return Ok(_mapper.Map<OrderForReturnDto>(orders)); } [HttpPost] public async Task<IActionResult> CreateOrders([FromBody] OrderForCreateDto orderForCreateDto) { var order = await _ordersService.CreateOrderAsync(_mapper.Map<Order>(orderForCreateDto)); var orderDto = _mapper.Map<OrderForReturnDto>(order); return CreatedAtAction(nameof(GetOrder), new {orderId = order.Id}, new {orderDto}); } [HttpPut("{orderId}")] public async Task<IActionResult> UpdateOrders([FromBody] OrderForUpdateDto orderForUpdateDto, Guid orderId) { var order = _mapper.Map<Order>(orderForUpdateDto); order.Id = orderId; var updateOrder = await _ordersService.UpdateOrderAsync(order); var orderDto = _mapper.Map<OrderForReturnDto>(updateOrder); return Ok(orderDto); } [HttpDelete("{orderId}")] public async Task<IActionResult> DeleteOrder(Guid orderId) { var result = await _ordersService.DeleteAsync(orderId); return result ? NoContent() : BadRequest(); } }
AppDbContext.cs
using Microsoft.EntityFrameworkCore; using Orders.Api.Entities; namespace Orders.Api.Data; public class AppDbContext : DbContext { public AppDbContext(DbContextOptions<AppDbContext> options) : base(options) { } public DbSet<Order> Orders { get; set; } }
OrderForCreateDto.cs
namespace Orders.Api.Dtos; public class OrderForCreateDto { public decimal Amount { get; set; } public string Currency { get; set; } public string CustomerEmail { get; set; } }
OrderForReturnDto.cs
namespace Orders.Api.Dtos; public class OrderForReturnDto { public Guid Id { get; set; } public decimal Amount { get; set; } public string Currency { get; set; } public string CustomerEmail { get; set; } }
OrderForUpdateDto.cs
namespace Orders.Api.Dtos; public class OrderForUpdateDto { public decimal Amount { get; set; } public string Currency { get; set; } public string CustomerEmail { get; set; } }
Order.cs
namespace Orders.Api.Entities; public class Order { public Guid Id { get; set; } public decimal Amount { get; set; } public string Currency { get; set; } public string CustomerEmail { get; set; } }
IOrdersRepository.cs
using Orders.Api.Entities; namespace Orders.Api.Repository; public interface IOrdersRepository { Task<Order> CreateOrderAsync(Order entity); Task<Order> UpdateOrderAsync(Order entity); Task<Order?> GetOrderByIdAsync(Guid orderId); Task<bool> DeleteOrderByIdAsync(Order entity); Task<IEnumerable<Order>> GetOrdersAsync(); }
OrdersRepository.cs
using Ardalis.GuardClauses; using Orders.Api.Data; using Orders.Api.Entities; namespace Orders.Api.Repository; public class OrdersRepository : IOrdersRepository { private readonly AppDbContext _dbContext; public OrdersRepository(AppDbContext dbContext) { _dbContext = dbContext; } public async Task<Order> CreateOrderAsync(Order entity) { Guard.Against.Null(entity); await _dbContext.Orders.AddAsync(entity); await _dbContext.SaveChangesAsync(); return entity; } public async Task<Order> UpdateOrderAsync(Order entity) { Guard.Against.Null(entity); _dbContext.Orders.Update(entity); await _dbContext.SaveChangesAsync(); return entity; } public async Task<Order?> GetOrderByIdAsync(Guid orderId) { var result = await _dbContext.Orders.FindAsync(orderId); return result; } public async Task<bool> DeleteOrderByIdAsync(Order entity) { Guard.Against.Null(entity); _dbContext.Orders.Remove(entity); await _dbContext.SaveChangesAsync(); return true; } public Task<IEnumerable<Order>> GetOrdersAsync() { return Task.FromResult(_dbContext.Orders.AsEnumerable()); } }
IOrdersService.cs
using Orders.Api.Entities; namespace Orders.Api.Services; public interface IOrdersService { Task<Order> UpdateOrderAsync(Order order); Task<IEnumerable<Order>> GetOrdersAsync(); Task<Order> CreateOrderAsync(Order order); Task<Order> GetOrderByIdAsync(Guid orderId); }
AutoMapperProfile.cs
using AutoMapper; using Orders.Api.Dtos; using Orders.Api.Entities; namespace Orders.Api; public class AutoMapperProfile : Profile { public AutoMapperProfile() { CreateMap<Order, OrderForCreateDto>().ReverseMap(); CreateMap<Order, OrderForUpdateDto>().ReverseMap(); CreateMap<Order, OrderForReturnDto>().ReverseMap(); } }
Write the code responsible for calling AWS SQS
Install
AWSSDK.SQS
from NuGet package managerAdd the following files to the messages folder and the following lines of code in the respective files:
using Amazon.SQS.Model; namespace Orders.Api.Messaging; public interface ISqsMessenger { Task<SendMessageResponse> SendMessageAsync<T>(T message); }
namespace Orders.Api.Messaging; public class QueueSettings { public required string QueueName { get; set; } }
using System.Diagnostics.CodeAnalysis; using System.Text.Json; using Amazon.SQS; using Amazon.SQS.Model; using Microsoft.Extensions.Options; namespace Orders.Api.Messaging; public class SqsMessenger : ISqsMessenger { private readonly IAmazonSQS _amazonSqs; private readonly IOptions<QueueSettings> _queueSettings; private string? _queueUrl; public SqsMessenger(IAmazonSQS amazonSqs, IOptions<QueueSettings> queueSettings) { _amazonSqs = amazonSqs; _queueSettings = queueSettings; } public string? QueueUrl { get { if (_queueUrl != null) return _queueUrl; var queueUrl = _amazonSqs.GetQueueUrlAsync(_queueSettings.Value.QueueName).GetAwaiter().GetResult(); _queueUrl = queueUrl.QueueUrl; return _queueUrl; } } public async Task<SendMessageResponse> SendMessageAsync<T>([DisallowNull] T message) { if (message == null) throw new ArgumentNullException(nameof(message)); var queueUrl = await _amazonSqs.GetQueueUrlAsync(_queueSettings.Value.QueueName); var request = new SendMessageRequest() { QueueUrl = queueUrl.QueueUrl, MessageBody = JsonSerializer.Serialize(message), MessageAttributes = new Dictionary<string, MessageAttributeValue>() { { "MessageType", new MessageAttributeValue() { DataType = "String", StringValue = typeof(T).Name } } } }; return await _amazonSqs.SendMessageAsync(request); } }
Publish to SQS from our
OrdersService.cs
Let's now utilize the ISqsMessenger created to publish messages to our queue.
using Orders.Api.Entities; namespace Orders.Api.Services; public interface IOrdersService { Task<Order> UpdateOrderAsync(Order order); Task<IEnumerable<Order>> GetOrdersAsync(); Task<Order> CreateOrderAsync(Order order); Task<Order?> GetOrderByIdAsync(Guid orderId); Task<bool> DeleteAsync(Guid orderId); }
using Ardalis.GuardClauses; using Orders.Api.Entities; using Orders.Api.Messaging; using Orders.Api.Repository; namespace Orders.Api.Services; public class OrdersService : IOrdersService { private readonly IOrdersRepository _ordersRepository; private readonly ISqsMessenger _sqsMessenger; public OrdersService(IOrdersRepository ordersRepository, ISqsMessenger sqsMessenger) { _ordersRepository = ordersRepository; _sqsMessenger = sqsMessenger; } public async Task<Order> UpdateOrderAsync(Order order) { Guard.Against.Null(order); var result = await _ordersRepository.UpdateOrderAsync(order); await _sqsMessenger.SendMessageAsync(result.ToOrderUpdatedMessage()); return result; } public Task<IEnumerable<Order>> GetOrdersAsync() => _ordersRepository.GetOrdersAsync(); public async Task<Order> CreateOrderAsync(Order order) { Guard.Against.Null(order); var result = await _ordersRepository.CreateOrderAsync(order); await _sqsMessenger.SendMessageAsync(result.ToOrderCreatedMessage()); return result; } public Task<Order?> GetOrderByIdAsync(Guid orderId) => _ordersRepository.GetOrderByIdAsync(orderId); public async Task<bool> DeleteAsync(Guid orderId) { Guard.Against.Default(orderId); var order = await _ordersRepository.GetOrderByIdAsync(orderId); if (order is null) return false; var result = await _ordersRepository.DeleteOrderByIdAsync(order); if (result) { await _sqsMessenger.SendMessageAsync(new OrderDeletedMessage() { OrderId = orderId }); } return true; } }
Next, let's run our entity framework migrations to update our database.
Open the command prompt or terminal inside the
Orders.Api
folder path and type the following code.dotnet ef migrations add "Initial Commit"
And finally, our
Program.cs
class.using Amazon.SQS; using Microsoft.EntityFrameworkCore; using Orders.Api.Data; using Orders.Api.Messaging; using Orders.Api.Repository; using Orders.Api.Services; var builder = WebApplication.CreateBuilder(args); // Add services to the container. builder.Services.AddControllers(); builder.Services.AddEndpointsApiExplorer(); builder.Services.AddSwaggerGen(); builder.Services.AddAutoMapper(typeof(Program)); builder.Services.AddDbContext<AppDbContext>(options => { options.UseSqlite(builder.Configuration.GetConnectionString("DefaultConnection")); }); builder.Services.AddScoped<IOrdersService, OrdersService>(); builder.Services.AddScoped<IOrdersRepository, OrdersRepository>(); builder.Services.AddSingleton<ISqsMessenger, SqsMessenger>(); builder.Services.AddSingleton<IAmazonSQS, AmazonSQSClient>(); builder.Services.Configure<QueueSettings>(builder.Configuration.GetSection("QueueSettings")); var app = builder.Build(); var dbContext = app.Services.GetRequiredService<AppDbContext>(); dbContext.Database.EnsureCreated(); // Configure the HTTP request pipeline. if (app.Environment.IsDevelopment()) { app.UseSwagger(); app.UseSwaggerUI(); } app.UseHttpsRedirection(); app.UseAuthorization(); app.MapControllers(); app.Run();
Run the project and test the endpoints via Swagger UI to verify it all works as expected.
- Now, create, update, and delete some orders to send a message to our queue. Afterward, go to the AWS console to verify that you can see the messages.
Consume From Queue
Now, let's consume the messages published to the orders queue from our notifications service.
Create a new
Background Service
calledNotificationsService
Create a new class
NotificationsService
that inherits fromBackgroundService
.
This class would be responsible for processing the messages received from our messaging queue.
We would reuse the QueueSettings
class created in our API project, so copy them to this new project.
Add the following line of code to our
NotificationsService
class.We would use the mediator pattern implementation in
Mediatr
package to accurately dispatch the message to the correct handler.using System.Text.Json; using Amazon.SQS; using Amazon.SQS.Model; using MediatR; using Microsoft.Extensions.Options; using NotificationsService.Messaging; namespace NotificationsService; public class NotificationsService : BackgroundService { private readonly ILogger<NotificationsService> _logger; private readonly IAmazonSQS _amazonSqs; private readonly IOptions<QueueSettings> _queueSettingsOptions; private readonly IMediator _mediator; public NotificationsService(ILogger<NotificationsService> logger, IAmazonSQS amazonSqs, IOptions<QueueSettings> queueSettingsOptions, IMediator mediator) { _logger = logger; _amazonSqs = amazonSqs; _queueSettingsOptions = queueSettingsOptions; _mediator = mediator; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { var queueUrl = await _amazonSqs.GetQueueUrlAsync(_queueSettingsOptions.Value.QueueName, stoppingToken); if (queueUrl == null) { throw new ApplicationException("cannot find the specified queue"); } var request = new ReceiveMessageRequest { AttributeNames = new List<string>(){"All"}, MessageAttributeNames = new List<string>(){"All"}, QueueUrl = queueUrl.QueueUrl, }; while (!stoppingToken.IsCancellationRequested) { var response = await _amazonSqs.ReceiveMessageAsync(request, stoppingToken); foreach (var message in response.Messages) { var messageType = $"NotificationsService.Messaging.{message.MessageAttributes["MessageType"].StringValue}"; var type = Type.GetType(messageType); var deserializedMessage = (IMessage) JsonSerializer.Deserialize(message.Body, type!)!; try { await _mediator.Send(deserializedMessage, stoppingToken); await _amazonSqs.DeleteMessageAsync(new DeleteMessageRequest { QueueUrl = queueUrl.QueueUrl, ReceiptHandle = message.ReceiptHandle }, stoppingToken); } catch (Exception e) { _logger.LogError(e, "an exception occurred handling the message: {message}"); } } } } }
And finally, let's bootstrap our dependencies in
Program.cs
using System.Reflection; using Amazon.SQS; using NotificationsService; using NotificationsService.Messaging; var builder = WebApplication.CreateBuilder(args); builder.Services.AddHostedService<NotificationsService.NotificationsService>(); builder.Services.AddSingleton<IAmazonSQS, AmazonSQSClient>(); builder.Services.Configure<QueueSettings>(builder.Configuration.GetSection("QueueSettings")); builder.Services.AddMediatR(cfg => cfg.RegisterServicesFromAssembly(Assembly.GetExecutingAssembly())); builder.Services.AddSingleton<INotifierService, ConsoleNotifierService>(); var app = builder.Build(); app.MapGet("/", () => "Hello World!"); app.Run();
Let's run our notifications service project to process our messages. As we can see in the result, we are successfully consuming the published messages.
Conclusion
This article examined asynchronous communication in distributed systems with AWS SQS. We successfully published our messages to SQS and consumed them in our client services. The code for this project is available here.