Skip to content

Commit

Permalink
docs and final work on querying non stale projection data. Closes GH-…
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Dec 19, 2024
1 parent 282dacb commit e27b7aa
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 15 deletions.
45 changes: 45 additions & 0 deletions docs/events/projections/async-daemon.md
Original file line number Diff line number Diff line change
Expand Up @@ -483,3 +483,48 @@ using multi-tenancy through a database per tenant. On these spans will be these

There is also a counter metric called `marten.daemon.skipping` or `marten.[database name].daemon.skipping`
that just emits and update every time that Marten has to "skip" stale events.

## Querying for Non Stale Data

There are some potential benefits to running projections asynchronously, namely:

* Avoiding concurrent updates to aggregated documents so that the results are accurate, especially when the aggregation is "multi-stream"
* Putting the work of building aggregates into a background process so you don't take the performance "hit" of doing that work during requests from a client

All that being said, using asynchronous projections means you're going into the realm of [eventual consistency](https://en.wikipedia.org/wiki/Eventual_consistency), and sometimes
that's really inconvenient when your users or clients expect up to date information about the projected aggregate data.

Not to worry though, because Marten will allow you to "wait" for an asynchronous projection to catch up so that you
can query the latest information as all the events captured at the time of the query are processed through the asynchronous
projection like so:

<!-- snippet: sample_using_query_for_non_stale_data -->
<a id='snippet-sample_using_query_for_non_stale_data'></a>
```cs
var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
{
opts.Connection(builder.Configuration.GetConnectionString("marten"));
opts.Projections.Add<TripProjection>(ProjectionLifecycle.Async);
}).AddAsyncDaemon(DaemonMode.HotCold);

using var host = builder.Build();
await host.StartAsync();

// DocumentStore() is an extension method in Marten just
// as a convenience method for test automation
await using var session = host.DocumentStore().LightweightSession();

// This query operation will first "wait" for the asynchronous projection building the
// Trip aggregate document to catch up to at least the highest event sequence number assigned
// at the time this method is called
var latest = await session.QueryForNonStaleData<Trip>(5.Seconds())
.OrderByDescending(x => x.Started)
.Take(10)
.ToListAsync();
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs#L133-L157' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_query_for_non_stale_data' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Do note that this can time out if the projection just can't catch up to the latest event sequence in time. You may need to
be both cautious with using this in general, and also cautious especially with the timeout setting.
2 changes: 1 addition & 1 deletion docs/scenarios/command_handler_workflow.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ builder.Services.AddMarten(opts =>
// need identity map mechanics in your commands or query handlers
.UseLightweightSessions();
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/FetchForWriting/fetching_inline_aggregates_for_writing.cs#L532-L550' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_use_identity_map_for_inline_aggregates' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/FetchForWriting/fetching_inline_aggregates_for_writing.cs#L611-L629' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_use_identity_map_for_inline_aggregates' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

It's pretty involved, but the key takeaway is that _if_ you are using lightweight sessions for a performance optimization
Expand Down
34 changes: 34 additions & 0 deletions src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
using Marten;
using Marten.Events;
using Marten.Events.Daemon;
using Marten.Events.Daemon.Resiliency;
using Marten.Events.Projections;
using Marten.Testing.Harness;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Xunit;

Expand Down Expand Up @@ -124,4 +127,35 @@ public async Task try_to_query_for_non_stale_data_by_aggregate_type()

await task;
}

public static async Task ExampleUsage()
{
#region sample_using_query_for_non_stale_data

var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
{
opts.Connection(builder.Configuration.GetConnectionString("marten"));
opts.Projections.Add<TripProjection>(ProjectionLifecycle.Async);
}).AddAsyncDaemon(DaemonMode.HotCold);

using var host = builder.Build();
await host.StartAsync();

// DocumentStore() is an extension method in Marten just
// as a convenience method for test automation
await using var session = host.DocumentStore().LightweightSession();

// This query operation will first "wait" for the asynchronous projection building the
// Trip aggregate document to catch up to at least the highest event sequence number assigned
// at the time this method is called
var latest = await session.QueryForNonStaleData<Trip>(5.Seconds())
.OrderByDescending(x => x.Started)
.Take(10)
.ToListAsync();

#endregion
}
}


15 changes: 1 addition & 14 deletions src/Marten/Events/AsyncProjectionTestingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,26 +142,13 @@ public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase
var shards = database.As<MartenDatabase>().Options.Projections.AsyncShardsPublishingType(aggregationType);
if (!shards.Any()) throw new InvalidOperationException($"Cannot find any registered async projection shards for aggregate type {aggregationType.FullNameInCode()}");

var all = shards.Concat([ShardName.HighWaterMark]).ToArray();
var tracking = new Dictionary<string, long>();
foreach (var shard in shards)
{
tracking[shard.Identity] = 0;
}

long highWaterMark = long.MaxValue;
var initial = await database.FetchProjectionProgressFor(all, token).ConfigureAwait(false);
foreach (var state in initial)
{
if (state.ShardName == ShardState.HighWaterMark)
{
highWaterMark = state.Sequence;
}
else
{
tracking[state.ShardName] = state.Sequence;
}
}
var highWaterMark = await database.FetchHighestEventSequenceNumber(token).ConfigureAwait(false);

if (tracking.isComplete(highWaterMark)) return;

Expand Down
7 changes: 7 additions & 0 deletions src/Marten/Storage/IMartenDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ Task<long> ProjectionProgressFor(ShardName name,
/// <param name="token"></param>
/// <returns></returns>
Task<long?> FindEventStoreFloorAtTimeAsync(DateTimeOffset timestamp, CancellationToken token);

/// <summary>
/// Fetch the highest assigned event sequence number in this database
/// </summary>
/// <param name="token"></param>
/// <returns></returns>
Task<long> FetchHighestEventSequenceNumber(CancellationToken token = default);
}

public enum ConnectionUsage
Expand Down
18 changes: 18 additions & 0 deletions src/Marten/Storage/MartenDatabase.EventStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,24 @@ public partial class MartenDatabase
}
}

/// <summary>
/// Fetch the highest assigned event sequence number in this database
/// </summary>
/// <param name="token"></param>
/// <returns></returns>
public async Task<long> FetchHighestEventSequenceNumber(CancellationToken token = default)
{
await EnsureStorageExistsAsync(typeof(IEvent), token).ConfigureAwait(false);
await using var conn = CreateConnection();
await conn.OpenAsync(token).ConfigureAwait(false);
var highest = (long)await conn
.CreateCommand($"select last_value from {Options.Events.DatabaseSchemaName}.mt_events_sequence;")
.ExecuteScalarAsync(token).ConfigureAwait(false);

return highest;
}


/// <summary>
/// Fetch the current size of the event store tables, including the current value
/// of the event sequence number
Expand Down
5 changes: 5 additions & 0 deletions src/Marten/Storage/StandinDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ public Task<long> ProjectionProgressFor(ShardName name, CancellationToken token
throw new NotImplementedException();
}

public async Task<long> FetchHighestEventSequenceNumber(CancellationToken token = default)
{
throw new NotImplementedException();
}

public NpgsqlConnection CreateConnection(ConnectionUsage connectionUsage = ConnectionUsage.ReadWrite)
{
throw new NotImplementedException();
Expand Down

0 comments on commit e27b7aa

Please sign in to comment.