Microservices with Dapr #1
This is the first post in a series in which I will show a "near-real-life" example of building a distributed system with Dapr (https://dapr.io/).
We are going to start this journey with a service that manages instant access savings accounts (savingsondapr). The main service is named SavingsOnDapr.Api.
It exposes a set of REST API endpoints as the interface and has a DDD-based core built around the InstantAccessSavingsAccount aggregate. Part of the savings account implementation is moved into the AccountAggregateRootBase<T> base class, which wouldn’t make sense if we had just one “account aggregate” class, but we will add other derived classes to the hierarchy later.
The savings account implementation uses the Marten document database to persist state, along with some other patterns (like Outbox and Optimistic Concurrency) that are built independently from what’s offered by Dapr. It’s done this way because, after playing a bit with Dapr’s state management building block (see the Dapr docs), I came to the conclusion that it is overly limited right now.
InstantAccessSavingsAccount.cs
//...
namespace SavingsPlatform.Accounts.Aggregates.InstantAccess;
public class InstantAccessSavingsAccount :
AccountAggregateRootBase<InstantAccessSavingsAccountState>
{
private readonly SimulationConfig _simulationConfig;
public InstantAccessSavingsAccount(
IStateEntryRepository<InstantAccessSavingsAccountState> repository,
SimulationConfig simulationConfig,
InstantAccessSavingsAccountState? state = default)
: base(repository, state)
{
_simulationConfig = simulationConfig;
}
public async Task CreateAsync(CreateInstantSavingsAccountCommand request)
{
if (request is null || string.IsNullOrEmpty(request.ExternalRef))
{
throw new InvalidOperationException(
$"{nameof(request.ExternalRef)} cannot be null");
}
if (_state is not null)
{
throw new InvalidOperationException(
$"SavingsAccount with " +
$"{nameof(InstantAccessSavingsAccountState.ExternalRef)} = "+
$"{request.ExternalRef} already exists");
}
var accountId = Guid.NewGuid().ToString();
await ThrowIfAlreadyExists(accountId, request.ExternalRef);
var eventsToPub = new Collection<object>
{
new AccountCreated
{
Id = Guid.NewGuid().ToString(),
ExternalRef = request.ExternalRef,
AccountId = accountId,
AccountType = AccountType.SavingsAccount,
Timestamp = DateTime.UtcNow,
EventType = typeof(AccountCreated).Name,
PlatformId = request.PlatformId ?? string.Empty,
}
};
var state = new InstantAccessSavingsAccountState
{
Key = accountId,
ExternalRef = request.ExternalRef,
OpenedOn = DateTime.UtcNow,
InterestRate = request.InterestRate,
TotalBalance = 0m,
AccruedInterest = 0m,
PlatformId = request.PlatformId,
HasUnpublishedEvents = true,
UnpublishedEvents = eventsToPub.ToList()
};
await CreateAsync(state);
}
//...
}
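The aggregate hands the finished state off to its base class via CreateAsync(state). Here is a minimal sketch of what that base-class plumbing could look like, assuming the repository interface shown in the next section; the actual AccountAggregateRootBase<T> in the repo may differ in its details:

public abstract class AccountAggregateRootBase<TState>
    where TState : IAggregateStateEntry
{
    protected readonly IStateEntryRepository<TState> _repository;
    protected TState? _state;

    protected AccountAggregateRootBase(
        IStateEntryRepository<TState> repository,
        TState? state = default)
    {
        _repository = repository;
        _state = state;
    }

    // Persists the freshly built state; the unpublished events travel inside
    // the state entry, so the outbox write happens in the same repository call.
    protected async Task CreateAsync(TState state)
    {
        await _repository.AddAccountAsync(state);
        _state = state;
    }
}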
Transactional Outbox Pattern
The Transactional Outbox pattern is implemented in MartenStateEntryRepositoryBase<TEntry, TData>, which is an abstract base class for aggregate state repositories. The repository implements IStateEntryRepository<TEntry> with the following constraint: TEntry : IAggregateStateEntry.
Here are both of the mentioned interfaces:
public interface IStateEntryRepository<T> where T : IAggregateStateEntry
{
Task<T?> GetAccountAsync(string key);
Task<ICollection<T>> QueryAccountsByKeyAsync(
string[] keyName,
string[] keyValue,
bool isKeyValueAString = true);
Task AddAccountAsync(T account);
Task<bool> TryUpdateAccountAsync(
T account,
MessageProcessedEntry? msgEntry);
Task<bool> IsMessageProcessed(string msgId);
}
public interface IAggregateStateEntry : IEntry
{
string ExternalRef { get; init; }
bool HasUnpublishedEvents { get; set; }
ICollection<object>? UnpublishedEvents { get; set; }
}
As you can see, the repository requires that the aggregate state entry exposes information about its unpublished events. The main idea behind this implementation of the Transactional Outbox is that we store aggregate state updates together with events in the same document entry.
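To make that concrete, the persisted document could look roughly like the record below; this shape is an assumption based on the AggregateState<TData> type referenced in a moment, not code copied from the repo:

// One document entry carries both the aggregate data and its pending events,
// so a single Marten commit persists them atomically (hypothetical shape).
public record AggregateState<TData>
{
    public required string Id { get; init; }
    public required TData Data { get; init; }
    public bool HasUnpublishedEvents { get; init; }
    public ICollection<object>? UnpublishedEvents { get; init; }
}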
The repository uses the following dependencies to implement its logic:
private readonly IDocumentSession _documentSession;
private readonly IStateMapper<AggregateState<TData>, TEntry> _mapper;
private readonly IEventPublishingService _eventPublishingService;
That means our repository needs to know three things to do its job:
- an instance of IDocumentSession (part of the Marten library) to interact with the document store,
- the mapper responsible for conversions between the AggregateState type (a representation of the data suitable for the document store) and the StateEntry type that is used directly by the aggregate,
- an instance of IEventPublishingService that encapsulates the logic of publishing events; in our case the implementation of this interface is a simple wrapper over DaprClient (this will be covered in more detail later).
The aggregate update algorithm can be described as follows:
1. map the aggregate state from its domain representation,
2. store the aggregate state along with the unpublished events (if there are any) in the document store,
3. if there are unpublished events:
   a) try publishing the events,
   b) store the aggregate state again with the unpublished events collection set to empty.
If for some reason an attempt to publish events fails, it will be retried by a background job triggered on a defined schedule. This approach gives us an “at least once” delivery guarantee. Since we don’t want to persist the events indefinitely, the completion step writes an empty collection of events.
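Here is a minimal sketch of that flow; Store() and SaveChangesAsync() are Marten’s actual IDocumentSession API, while the mapper method name (Map) and the publishing call are assumptions for illustration, not the repo’s exact code:

public async Task<bool> TryUpdateAccountAsync(TEntry account, MessageProcessedEntry? msgEntry)
{
    // steps 1 + 2: store the mapped state, any unpublished events it carries,
    // and the processed-message marker in a single commit
    _documentSession.Store(_mapper.Map(account));
    if (msgEntry is not null)
    {
        _documentSession.Store(msgEntry);
    }
    await _documentSession.SaveChangesAsync();

    // step 3: publish, then clear the outbox; if publishing throws, the events
    // stay in the document and the scheduled job retries them later
    if (account.HasUnpublishedEvents && account.UnpublishedEvents is { Count: > 0 })
    {
        await _eventPublishingService.PublishEventsAsync(account.UnpublishedEvents);

        account.HasUnpublishedEvents = false;
        account.UnpublishedEvents = Array.Empty<object>();
        _documentSession.Store(_mapper.Map(account));
        await _documentSession.SaveChangesAsync();
    }

    return true;
}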
…back to Dapr
In this initial post the extent to which we use Dapr components is quite limited, but it’s a good start. Basically, we are using Pub/Sub and input bindings. First of all, we need to provide the component YAML files:
pubsub.yaml
------------
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
namespace: savings
spec:
type: pubsub.rabbitmq
version: v1
metadata:
- name: host
value: "amqp://rabbitmq:5672"
- name: durable
value: "false"
- name: deletedWhenUnused
value: "false"
- name: autoAck
value: "false"
- name: reconnectWait
value: "45s"
- name: concurrency
value: "parallel"
- name: deliveryMode
value: "2"
- name: enableDeadLetter
value: "true"
- name: requeueInFailure
value: "true"
binding-publishevents.yaml
---------------------------
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: publish-events
spec:
type: bindings.cron
version: v1
metadata:
- name: schedule
value: "30 * * * * *"
- name: direction
value: "input"
- name: route
value: /api/platform/publish-events
pubsub.yaml declares a RabbitMQ Publish & Subscribe broker with some configuration settings that make sense for local testing.
binding-publishevents.yaml declares a component based on the Cron binding that makes the Dapr sidecar call the specified endpoint (route) at the given interval (schedule). This simple mechanism allows us to retry publishing events for accounts that failed to do so during the initial update. It is worth noting that this is only a fallback measure: if it starts picking up too many accounts, it could easily overwhelm the messaging broker.
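The endpoint invoked by the cron binding could look roughly like this minimal API sketch; the route matches the component’s route metadata, while RepublishPendingEventsAsync() is a hypothetical name standing in for “rescan entries with pending events and retry”:

// The cron input binding makes the sidecar POST to this route on schedule.
app.MapPost("/api/platform/publish-events",
    async (IEventPublishingService publishingService) =>
    {
        await publishingService.RepublishPendingEventsAsync();
        return Results.Ok();
    });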
What makes the Dapr approach really interesting is that these component files are passed to the sidecar container, and all we need to know when implementing our service is the DaprClient interface (in the case of Pub/Sub, the PublishEventAsync() method). In the implementation of DaprEventPublishingService you can notice that there are separate methods for publishing commands (which are meant to be handled internally by the same service) and domain events, which are published to topics named after the event type.
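A minimal sketch of such a wrapper is shown below; PublishEventAsync() is the real method on DaprClient and "pubsub" matches the component name from the YAML above, but the interface shape is assumed for illustration:

public class DaprEventPublishingService : IEventPublishingService
{
    private const string PubSubName = "pubsub";
    private readonly DaprClient _daprClient;

    public DaprEventPublishingService(DaprClient daprClient)
        => _daprClient = daprClient;

    // domain events go to a topic named after the event type...
    public Task PublishEventAsync<TEvent>(TEvent @event) =>
        _daprClient.PublishEventAsync(PubSubName, typeof(TEvent).Name, @event);

    // ...while commands land on a topic this same service subscribes to
    public Task PublishCommandAsync<TCommand>(string topic, TCommand command) =>
        _daprClient.PublishEventAsync(PubSubName, topic, command);
}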
Keeping these infrastructural decisions separate from the service implementation means that we can use a RabbitMQ broker for the local dev environment and provision Apache Kafka or Azure Service Bus for staging/production without any changes in our code.
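For example, retargeting the same pubsub component at Azure Service Bus could be as simple as swapping the component file; here is a sketch with a placeholder connection string (pubsub.azure.servicebus.topics is the actual Dapr component type):

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub              # same component name, so the service code stays untouched
  namespace: savings
spec:
  type: pubsub.azure.servicebus.topics
  version: v1
  metadata:
  - name: connectionString
    value: "<service-bus-connection-string>"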
docker-compose.override.yml
----------------------------
# ...
savingsondapr.api-dapr:
command: ["./daprd",
"--app-id", "dapr-savings-api",
"--app-port", "80",
"--dapr-http-port", "3605",
"--resources-path", "/components",
"-config", "/configuration/config.yaml"
]
volumes:
- "./dapr/components/:/components"
- "./dapr/configuration:/configuration"
# ...
Here is the docker-compose file for SavingsOnDapr system:
version: '3.4'
services:
zipkin:
image: openzipkin/zipkin-slim
ports:
- "9411:9411" # allows us to access the web console
rabbitmq:
image: rabbitmq:3.13.7-management-alpine
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 30s
timeout: 10s
retries: 5
start_period: 10s
ports:
- "5672:5672" # AMQP protocol port
- "15672:15672" # management plugin port"
postgres:
image: postgres:14-alpine
ports:
- "5432:5432"
volumes:
- "~/apps/postgres:/var/lib/postgresql/data"
savingsondapr.api:
build:
context: .
dockerfile: ./SavingsOnDapr.Api/Dockerfile
depends_on:
- postgres
savingsondapr.api-dapr:
image: "daprio/daprd:1.14.2"
network_mode: "service:savingsondapr.api"
depends_on:
savingsondapr.api:
condition: service_started
rabbitmq:
condition: service_healthy
Zipkin is the service configured for distributed tracing in Dapr, and Postgres is used by the Marten DocumentStore. One important detail to note in the file above is the depends_on section of the savingsondapr.api-dapr sidecar: we provide these conditions to prevent the Dapr sidecar from failing while establishing connections to its dependencies.
The delay can easily be observed in Docker Desktop when the containers are starting.
If we don’t specify a condition for rabbitmq health, the sidecar will start too early and fail due to the pubsub component not being fully ready:
level=fatal msg="Fatal error from runtime: process component pubsub error: [INIT_COMPONENT_FAILURE]: initialization error occurred for pubsub (pubsub.rabbitmq/v1): [INIT_COMPONENT_FAILURE]: initialization error occurred for pubsub (pubsub.rabbitmq/v1): dial tcp 172.18.0.4:5672: connect: connection refused" app_id=dapr-savings-api instance=cedafda4f933 scope=dapr.runtime type=log ver=1.14.2
Anyway, that’s all for this initial take on Microservices with Dapr. In the next post I’ll try to cover optimistic concurrency and the Pub/Sub component in more detail. See you!