Microservices with Dapr #5: Event aggregations
This time we will look at event aggregations and how they can be used within the SavingsOnDapr project domain.
In the previous post I introduced an Event Store service that was set up to gather events emitted by the main service responsible for hosting the bank account aggregates. One way such an extension could be useful is to provide analytics and statistics that are not collected immediately when transactions are processed.
The logic of SavingsOnDapr.Accounts is focused on money transfers. DepositTransferActor orchestrates commands that in turn publish events after successful transactions. Our account aggregates don’t know anything about potential consumers of these events, and the role of our main service ends with an at-least-once delivery guarantee implemented with the Transactional Outbox pattern.
Take a look at the current state of SavingsOnDapr Project visualised on a not-so-professional diagram:
Event Aggregations
To illustrate how events collected by the Event Store can be processed on demand to provide some useful statistics, I will use Live Aggregations offered by Marten Event Store (see details here).
Let’s start with the following class, AccountHierarchySummary, which exposes Apply() methods for AccountCredited and AccountDebited events and a set of properties:
AccountHierarchySummary is initialized with a date range and uses some knowledge of deposit transfers to collect stats like:
“total amount transferred to / from savings accounts”
“total amount of new deposits / withdrawals on the current account”
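A minimal sketch of what such a class might look like is given below. It is not the project's actual implementation: the event shapes (AccountType, TransferId, Timestamp) and the rules used to tell a deposit transfer from a plain deposit are assumptions made for illustration. Only the class name and the property names come from this post. Marten's live aggregation discovers the public Apply() methods by convention.

```csharp
using System;

// Hypothetical event shapes: the real SavingsOnDapr events may carry different fields.
public enum AccountType { Current, Savings }

public record AccountCredited(
    string AccountId, AccountType AccountType, decimal Amount,
    DateTime Timestamp, string? TransferId = null);

public record AccountDebited(
    string AccountId, AccountType AccountType, decimal Amount,
    DateTime Timestamp, string? TransferId = null);

public class AccountHierarchySummary
{
    private readonly DateTime? _fromDate;
    private readonly DateTime? _toDate;

    public AccountHierarchySummary(DateTime? fromDate, DateTime? toDate)
    {
        _fromDate = fromDate;
        _toDate = toDate;
    }

    public decimal TotalAmountTransferredToSavings { get; private set; }
    public decimal TotalAmountWithdrawnFromSavings { get; private set; }
    public decimal TotalAmountOfNewDeposits { get; private set; }
    public decimal TotalAmountOfWithdrawals { get; private set; }
    public int TotalCountOfDepositTransfers { get; private set; }

    // Assumption: a credit on a savings account that carries a TransferId is a
    // transfer to savings; a credit on the current account without one is a new deposit.
    public void Apply(AccountCredited e)
    {
        if (!IsInRange(e.Timestamp)) return;

        if (e.AccountType == AccountType.Savings && e.TransferId is not null)
        {
            TotalAmountTransferredToSavings += e.Amount;
            TotalCountOfDepositTransfers++;
        }
        else if (e.AccountType == AccountType.Current && e.TransferId is null)
        {
            TotalAmountOfNewDeposits += e.Amount;
        }
    }

    // Assumption: the mirror image of the rules above, applied to debits.
    public void Apply(AccountDebited e)
    {
        if (!IsInRange(e.Timestamp)) return;

        if (e.AccountType == AccountType.Savings && e.TransferId is not null)
        {
            TotalAmountWithdrawnFromSavings += e.Amount;
            TotalCountOfDepositTransfers++;
        }
        else if (e.AccountType == AccountType.Current && e.TransferId is null)
        {
            TotalAmountOfWithdrawals += e.Amount;
        }
    }

    // Events outside the requested date range are ignored.
    private bool IsInRange(DateTime timestamp) =>
        (!_fromDate.HasValue || timestamp >= _fromDate.Value) &&
        (!_toDate.HasValue || timestamp <= _toDate.Value);
}
```

Because the class depends only on the event types, it can be exercised by creating an instance, calling Apply() with a handful of events, and checking the resulting totals, with no event store involved.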
Aggregation is performed in the EventStore with only a couple of lines of code:

    public async Task<AccountHierarchySummaryDto?> GetAccountHierarchySummary(
        string streamId,
        DateTime? fromDate,
        DateTime? toDate)
    {
        using var session = _documentStore.LightweightSession();

        // Replay the stream's events recorded up to toDate into a fresh summary
        // (version 0 means no version limit).
        var summary = await session.Events.AggregateStreamAsync(
            streamId, 0, toDate,
            new AccountHierarchySummary(fromDate, toDate));

        if (summary is not null)
        {
            return new AccountHierarchySummaryDto(
                fromDate, toDate, streamId,
                summary.TotalAmountTransferredToSavings,
                summary.TotalAmountWithdrawnFromSavings,
                summary.TotalAmountOfNewDeposits,
                summary.TotalAmountOfWithdrawals,
                summary.TotalCountOfDepositTransfers);
        }

        return null;
    }

Notice how AccountHierarchySummary is separated from any project-specific dependencies. It only defines how to apply relevant event types to its internal aggregation model, without knowing anything about the event store. One of the benefits of such a separation of concerns is frictionless unit testing: we simply need to provide a set of events and assertions for the expected results.
Interest Accrual with event aggregations
Let’s move to a more practical example of how event aggregations might be helpful. For every savings account created and activated in the system we need to run a daily interest accrual process.
Each savings account has a property named InterestApplicationFrequency, which tells how frequently the amount of accrued interest should be applied (added) to the account's total balance. Apart from that, there is also InterestAccrualDueOn, which controls the accrual part itself, that is, updating the AccruedInterest amount regularly without changing the account balance. We want this process to run for each account roughly every 24 hours.
The interest accrual will be triggered by a Dapr binding, similarly to how it was done for publish-events. So, it starts with yet another API endpoint:
To support more advanced queries, the Marten DB repository now allows specifying the operator for each condition by simply appending it to the keyName entries:

    ["data.activatedOn LessThan", "data.interestAccruedOn LessThanOrEqual"]

which is then parsed into the following PostgreSQL JsonPath query:

    select d.id, d.data, d.mt_version
    from public.mt_doc_aggregatestateinstantaccesssavingsaccountdto as d
    where data @@ '$.data.activatedOn < "2024-10-11T09:47:03"'
      AND data @@ '$.data.interestAccruedOn <= "2024-10-11T13:47:03"'

The /accrue-interest endpoint fetches the savings accounts that meet the criteria, groups them by CurrentAccountId, and publishes a command for each account hierarchy it has found. The next important part of this process is done in the command handler:
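Going back to the operator suffixes for a moment: the translation from a keyName entry to a JsonPath condition could be sketched roughly as follows. This is an illustration, not the repository's actual code. The JsonPathConditionBuilder name, the Build method, and every operator other than LessThan and LessThanOrEqual are assumptions, as is the fallback to equality when no operator is given.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: turns "path Operator" entries into JsonPath conditions.
public static class JsonPathConditionBuilder
{
    // Only LessThan and LessThanOrEqual appear in the post; the rest are assumed.
    private static readonly Dictionary<string, string> Operators = new()
    {
        ["Equal"] = "==",
        ["LessThan"] = "<",
        ["LessThanOrEqual"] = "<=",
        ["GreaterThan"] = ">",
        ["GreaterThanOrEqual"] = ">=",
    };

    public static string Build(string keyName, DateTime value)
    {
        // "data.activatedOn LessThan" -> path "data.activatedOn", operator "LessThan".
        var parts = keyName.Split(' ', 2,
            StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
        var path = parts[0];
        var operatorName = parts.Length > 1 ? parts[1] : "Equal";

        if (!Operators.TryGetValue(operatorName, out var symbol))
        {
            throw new ArgumentException($"Unknown operator: {operatorName}", nameof(keyName));
        }

        // The "s" format yields a sortable timestamp such as 2024-10-11T09:47:03.
        return $"data @@ '$.{path} {symbol} \"{value:s}\"'";
    }
}
```

Multiple conditions would then be joined with AND, as in the query above. The sketch formats only DateTime values and takes the path from code rather than user input, which keeps string concatenation reasonably safe here.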
Using Dapr service invocation
The interesting part of this logic starts at line 18 of the code snippet above. It uses EventStoreApiClient, a simple wrapper over DaprClient (added mainly to avoid a direct dependency on Dapr where possible), to invoke a method of the EventStore service API:

    public Task<IDictionary<string, AccountBalanceRangeEntry>> GetBalancesForAccountHierarchyAsync(
        string currentAccountId,
        DateTime? fromDate,
        DateTime? toDate) =>
        _daprClient.InvokeMethodAsync<IDictionary<string, AccountBalanceRangeEntry>>(
            HttpMethod.Get,
            _config.EventStoreApiServiceName,
            PrepareDateRangeQuery(currentAccountId, fromDate, toDate));

    // Builds the endpoint path plus an optional from/to query string.
    private string PrepareDateRangeQuery(
        string currentAccountId, DateTime? fromDate, DateTime? toDate)
    {
        var queryString = new StringBuilder(
            string.Format(_config.TransactionsEndpoint, currentAccountId));

        if (fromDate.HasValue)
        {
            queryString.Append($"?{_config.FromQueryParameter}={fromDate.Value:s}");
        }

        if (toDate.HasValue)
        {
            // Use "&" only when the from parameter already opened the query string.
            queryString.Append(fromDate.HasValue ? "&" : "?");
            queryString.Append($"{_config.ToQueryParameter}={toDate.Value:s}");
        }

        return queryString.ToString();
    }

This service call is made as sidecar-to-sidecar communication, as described in the Dapr.io docs:
“To invoke an application using Dapr, you can use the invoke API on any Dapr instance. The sidecar programming model encourages each application to interact with its own instance of Dapr. The Dapr sidecars discover and communicate with one another.”
The /balances-summary endpoint uses an event aggregation to extract account balance ranges (i.e. min and max amounts) from all events recorded in the stream identified by CurrentAccountId. It responds with a dictionary that can be used to calculate the mean account balance (taken as the midpoint between the min and max amounts) that we use for interest accrual. If no transactions have been recorded for the given date range, the current TotalBalance will be used to accrue the interest.

    (balances?.ContainsKey(accId) ?? false) ?
        (balances[accId].MinTotalBalance + balances[accId].MaxTotalBalance) * 0.5m :
        null

…back to event aggregation
The event aggregation for generating the /balances-summary response looks like this:
As you can see, it provides Apply(event) methods for three types of events: AccountCredited, AccountDebited, and AccountInterestApplied. Since it processes only a fragment of the entire account balance history, event.Amount is used to calculate the balance before the event occurred. This ensures that the initial state before entering the date range is taken into account.
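The idea described above can be sketched as follows. This is a simplified illustration, not the project's code. The BalanceRangeAggregate name, the event fields, and in particular the assumption that each event carries the TotalBalance after it was applied are hypothetical. Only the three event names, AccountBalanceRangeEntry, MinTotalBalance, and MaxTotalBalance come from this post.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event shapes: TotalBalance is assumed to be the balance AFTER the event.
public record AccountCredited(string AccountId, decimal Amount, decimal TotalBalance, DateTime Timestamp);
public record AccountDebited(string AccountId, decimal Amount, decimal TotalBalance, DateTime Timestamp);
public record AccountInterestApplied(string AccountId, decimal Amount, decimal TotalBalance, DateTime Timestamp);

public record AccountBalanceRangeEntry(decimal MinTotalBalance, decimal MaxTotalBalance);

public class BalanceRangeAggregate
{
    private readonly DateTime? _fromDate;
    private readonly DateTime? _toDate;

    public BalanceRangeAggregate(DateTime? fromDate, DateTime? toDate)
    {
        _fromDate = fromDate;
        _toDate = toDate;
    }

    // Min/max balance per account id within the date range.
    public Dictionary<string, AccountBalanceRangeEntry> Balances { get; } = new();

    // Credits and applied interest increase the balance, so "before" = after - Amount.
    public void Apply(AccountCredited e) =>
        Track(e.AccountId, e.TotalBalance - e.Amount, e.TotalBalance, e.Timestamp);

    public void Apply(AccountInterestApplied e) =>
        Track(e.AccountId, e.TotalBalance - e.Amount, e.TotalBalance, e.Timestamp);

    // Debits decrease the balance, so "before" = after + Amount.
    public void Apply(AccountDebited e) =>
        Track(e.AccountId, e.TotalBalance + e.Amount, e.TotalBalance, e.Timestamp);

    private void Track(string accountId, decimal before, decimal after, DateTime timestamp)
    {
        if (_fromDate.HasValue && timestamp < _fromDate.Value) return;
        if (_toDate.HasValue && timestamp > _toDate.Value) return;

        // Including the balance before the event captures the state
        // the account had when it entered the date range.
        var low = Math.Min(before, after);
        var high = Math.Max(before, after);

        Balances[accountId] = Balances.TryGetValue(accountId, out var entry)
            ? new AccountBalanceRangeEntry(
                Math.Min(entry.MinTotalBalance, low),
                Math.Max(entry.MaxTotalBalance, high))
            : new AccountBalanceRangeEntry(low, high);
    }
}
```

For an account that enters the range with a balance of 100, is credited 50, and is then debited 120, the entry would hold a min of 30 and a max of 150, so the midpoint used for accrual would be 90.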
That’s all for now. Thanks for reading through!
The code for this post can be found here: savingsondapr github







