Skip to content

Commit

Permalink
perf: further BatchProcessor improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
alexyakunin committed Sep 14, 2023
1 parent 381513d commit 7bb1358
Show file tree
Hide file tree
Showing 11 changed files with 265 additions and 140 deletions.
3 changes: 3 additions & 0 deletions src/Stl.Fusion.EntityFramework/DbEntityResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ public record Options

private ConcurrentDictionary<Symbol, BatchProcessor<TKey, TDbEntity?>>? _batchProcessors;
private ITransientErrorDetector<TDbContext>? _transientErrorDetector;
private ILogger? _log;

protected Options Settings { get; }
protected (Func<TDbContext, TKey[], IAsyncEnumerable<TDbEntity>> Query, int BatchSize)[] Queries { get; init; }
protected ILogger Log => _log ??= Services.LogFor(GetType());

Check warning on line 67 in src/Stl.Fusion.EntityFramework/DbEntityResolver.cs

View workflow job for this annotation

GitHub Actions / build

'DbEntityResolver<TDbContext, TKey, TDbEntity>.Log' hides inherited member 'DbServiceBase<TDbContext>.Log'. Use the new keyword if hiding was intended.

public Func<TDbEntity, TKey> KeyExtractor { get; init; }
public Expression<Func<TDbEntity, TKey>> KeyExtractorExpression { get; init; }
Expand Down Expand Up @@ -192,6 +194,7 @@ protected Func<TDbContext, TKey[], IAsyncEnumerable<TDbEntity>> CreateCompiledQu
var batchProcessor = new BatchProcessor<TKey, TDbEntity?> {
BatchSize = Settings.BatchSize,
Implementation = (batch, cancellationToken) => ProcessBatch(tenant, batch, cancellationToken),
Log = Log,
};
Settings.ConfigureBatchProcessor?.Invoke(batchProcessor);
if (batchProcessor.BatchSize != Settings.BatchSize)
Expand Down
7 changes: 2 additions & 5 deletions src/Stl.Testing/Output/TestOutputHelperAccessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@

namespace Stl.Testing.Output;

public class TestOutputHelperAccessor : ITestOutputHelperAccessor
public class TestOutputHelperAccessor(ITestOutputHelper? output) : ITestOutputHelperAccessor
{
public ITestOutputHelper? Output { get; set; }

public TestOutputHelperAccessor(ITestOutputHelper? output)
=> Output = output;
public ITestOutputHelper? Output { get; set; } = output;
}
217 changes: 118 additions & 99 deletions src/Stl/Async/BatchProcessor.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Stl.Internal;
using Stl.OS;

namespace Stl.Async;

Expand All @@ -16,144 +15,130 @@ static file class BatchProcessor

public class BatchProcessor<T, TResult>(Channel<BatchProcessor<T, TResult>.Item> queue) : ProcessorBase
{
protected Channel<Item> Queue = queue;
private volatile IBatchProcessorWorkerPolicy _workerPolicy = BatchProcessorWorkerPolicy.Default;

protected readonly Channel<Item> Queue = queue;
protected int PlannedWorkerCount;
protected HashSet<Task> Workers = new();
protected Task? WorkerCollectorTask;
protected object WorkerLock => Workers;

// Metrics
protected readonly object MetricsLock = new();
protected int ProcessedItemCount;
protected long MinQueueDurationInTicks = long.MaxValue;
protected CpuTimestamp LastAdjustmentAt;
// Statistics
protected RingBuffer<TimeSpan> RecentReports = new(7);
protected CpuTimestamp CooldownEndsAt;

// Settings
public int MinWorkerCount { get; set; } = 1;
public int MaxWorkerCount { get; set; } = HardwareInfo.GetProcessorCountFactor(2);
public int AdjustmentInterval { get; set; } = 16;
public TimeSpan AdjustmentPeriod { get; set; } = TimeSpan.FromMilliseconds(100);
public TimeSpan KillWorkerAt { get; set; } = TimeSpan.FromMilliseconds(1);
public TimeSpan Kill8WorkersAt { get; set; } = TimeSpan.FromMilliseconds(0.1);
public TimeSpan AddWorkerAt { get; set; } = TimeSpan.FromMilliseconds(20);
public TimeSpan Add4WorkersAt { get; set; } = TimeSpan.FromMilliseconds(100);
public TimeSpan WorkerCollectionPeriod { get; set; } = TimeSpan.FromSeconds(5);
public int BatchSize { get; set; } = 256;
public Func<List<Item>, CancellationToken, Task> Implementation { get; set; } = (_, _) => Task.CompletedTask;
public IBatchProcessorWorkerPolicy WorkerPolicy {
get => _workerPolicy;
set => Interlocked.Exchange(ref _workerPolicy, value);
}
public ILogger? Log { get; set; }

public BatchProcessor()
: this(Channel.CreateUnbounded<Item>(BatchProcessor.DefaultChannelOptions))
{ }

protected override Task DisposeAsyncCore()
{
// This method starts inside lock (Lock) block, so no need to lock here
Queue.Writer.TryComplete();
lock (WorkerLock)
return Workers.Count != 0
? Task.WhenAll(Workers.ToArray())
: Task.CompletedTask;
return Workers.Count != 0
? Task.WhenAll(Workers.ToArray())
: Task.CompletedTask;
}

public async Task<TResult> Process(T input, CancellationToken cancellationToken = default)
public Task<TResult> Process(T input, CancellationToken cancellationToken = default)
{
if (PlannedWorkerCount == 0) {
lock (WorkerLock)
if (PlannedWorkerCount == 0 && !StopToken.IsCancellationRequested)
_ = UpdateWorkerCount(MinWorkerCount);
lock (Lock)
if (PlannedWorkerCount == 0 && !StopToken.IsCancellationRequested) {
var minWorkerCount = WorkerPolicy.MinWorkerCount;
if (minWorkerCount < 1)
throw Errors.InternalError("WorkerPolicy.MinWorkerCount < 1");
_ = AddOrRemoveWorkers(WorkerPolicy.MinWorkerCount);
}
}

var item = new Item(input, cancellationToken);
await Queue.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
var result = await item.ResultTask.ConfigureAwait(false);
var workerDelta = UpdateMetrics(item);
await UpdateWorkerCount(workerDelta).ConfigureAwait(false);
return result;
return Queue.Writer.TryWrite(item)
? item.ResultTask
: ProcessAsync(item, cancellationToken);

async Task<TResult> ProcessAsync(Item item1, CancellationToken cancellationToken1) {
await Queue.Writer.WriteAsync(item1, cancellationToken1).ConfigureAwait(false);
return await item.ResultTask.ConfigureAwait(false);
}
}

public int GetWorkerCount()
{
lock (WorkerLock)
lock (Lock)
return Workers.Count;
}

public int GetPlannedWorkerCount()
{
lock (WorkerLock)
lock (Lock)
return PlannedWorkerCount;
}

public async Task Reset(CancellationToken cancellationToken = default)
{
while (true) {
ValueTask updateTask;
lock (WorkerLock)
updateTask = UpdateWorkerCount(MinWorkerCount - PlannedWorkerCount);

await updateTask.ConfigureAwait(false);
lock (WorkerLock) {
if (Workers.Count == MinWorkerCount) {
lock (MetricsLock) {
ProcessedItemCount = 0;
MinQueueDurationInTicks = long.MaxValue;
}
var wp = WorkerPolicy;
await AddOrRemoveWorkers(wp.MinWorkerCount - GetPlannedWorkerCount(), true).ConfigureAwait(false);
await Task.Delay(TimeSpan.FromSeconds(0.05), cancellationToken).ConfigureAwait(false);
lock (Lock) {
if (Workers.Count == wp.MinWorkerCount && PlannedWorkerCount == wp.MinWorkerCount) {
RecentReports.Clear();
CooldownEndsAt = default;
return;
}
}
await Task.Delay(TimeSpan.FromSeconds(0.05), cancellationToken).ConfigureAwait(false);
}
}

// Protected methods

protected int UpdateMetrics(Item? item = null)
protected void ProcessWorkerReport(TimeSpan workerMinQueueTime)
{
if (item != null && item.DequeuedAt == default)
return 0; // Something is off / the item was never processed

var queueDuration = item != null
? (item.DequeuedAt - item.QueuedAt).Ticks
: 0;
TimeSpan minQueueDuration;
lock (MetricsLock) {
ProcessedItemCount += 1;
MinQueueDurationInTicks = Math.Min(queueDuration, MinQueueDurationInTicks);
var minQueueTime = TimeSpan.MaxValue;
lock (Lock) {
RecentReports.PushHeadAndMoveTailIfFull(workerMinQueueTime);
var now = CpuTimestamp.Now;
if (item != null && ProcessedItemCount < AdjustmentInterval)
return 0;

minQueueDuration = TimeSpan.FromTicks(MinQueueDurationInTicks);
ProcessedItemCount = 0;
MinQueueDurationInTicks = long.MaxValue;
if (now - LastAdjustmentAt < AdjustmentPeriod)
return 0;
if (CooldownEndsAt > now)
return;

LastAdjustmentAt = now;
var recentReportCount = (PlannedWorkerCount >> 2).Clamp(1, RecentReports.Count);
for (var i = 0; i < recentReportCount; i++)
minQueueTime = TimeSpanExt.Min(minQueueTime, RecentReports[i]);
}

if (minQueueDuration > Add4WorkersAt)
return 4;
if (minQueueDuration > AddWorkerAt)
return 1;
if (minQueueDuration < Kill8WorkersAt)
return -8;
if (minQueueDuration < KillWorkerAt)
return -1;
return 0;
var delta = WorkerPolicy.GetWorkerCountDelta(minQueueTime);
_ = AddOrRemoveWorkers(delta);
}

protected async ValueTask UpdateWorkerCount(int delta)
protected async ValueTask AddOrRemoveWorkers(int delta, bool ignoreCooldown = false)
{
if (delta == 0)
return;

lock (WorkerLock) {
var wp = WorkerPolicy;
lock (Lock) {
var now = CpuTimestamp.Now;
if (CooldownEndsAt > now && !ignoreCooldown)
return;

CooldownEndsAt = now + WorkerPolicy.Cooldown;
var oldPlannedWorkerCount = PlannedWorkerCount;
PlannedWorkerCount = (oldPlannedWorkerCount + delta).Clamp(MinWorkerCount, MaxWorkerCount);
delta = PlannedWorkerCount - oldPlannedWorkerCount;
PlannedWorkerCount = (oldPlannedWorkerCount + delta).Clamp(wp.MinWorkerCount, wp.MaxWorkerCount);
if (StopToken.IsCancellationRequested) {
PlannedWorkerCount = 0;
Queue.Writer.TryComplete();
return;
}

delta = PlannedWorkerCount - oldPlannedWorkerCount;
}
if (delta == 0)
return;
Expand All @@ -168,43 +153,65 @@ protected async ValueTask UpdateWorkerCount(int delta)
}
}
else { // workerDelta > 0 -> add workers
int workerCount;
using var flowSuppressor = ExecutionContextExt.SuppressFlow();
lock (WorkerLock) {
lock (Lock) {
WorkerCollectorTask ??= Task.Run(RunWorkerCollector, CancellationToken.None);
for (; delta > 0; delta--) {
var workerTask = Task.Run(RunWorker, CancellationToken.None);
Workers.Add(workerTask);
_ = workerTask.ContinueWith(static (task, state) => {
var self = (BatchProcessor<T, TResult>)state!;
lock (self.WorkerLock)
lock (self.Lock)
self.Workers.Remove(task);
}, this, TaskScheduler.Default);
}
workerCount = Workers.Count;
}
if (workerCount >= wp.MaxWorkerCount)
Log?.LogWarning("{BatchProcessor}: High worker count: {WorkerCount}",
GetType().GetName(), workerCount);
}
}

protected virtual async Task RunWorkerCollector()
{
var longCycle = WorkerCollectionPeriod;
var shortCycle = TimeSpan.FromSeconds(WorkerCollectionPeriod.TotalSeconds / 4);
var nextCycle = longCycle;
while (!StopToken.IsCancellationRequested)
try {
await Task.Delay(nextCycle, StopToken).ConfigureAwait(false);
var workerDelta = UpdateMetrics();
await UpdateWorkerCount(workerDelta).ConfigureAwait(false);
nextCycle = workerDelta < 0 ? shortCycle : longCycle;
}
catch {
nextCycle = shortCycle;
}
while (!StopToken.IsCancellationRequested) {
var wp = WorkerPolicy;
var longCycle = wp.CollectorCycle;
var shortCycle = wp.Cooldown + TimeSpan.FromMilliseconds(50);
var nextCycle = longCycle;
while (!StopToken.IsCancellationRequested)
try {
await Task.Delay(nextCycle, StopToken).ConfigureAwait(false);

// Measure queue time
var item = new Item(default!, StopToken) { IsMeasureOnlyItem = true };
await Queue.Writer.WriteAsync(item).ConfigureAwait(false);
await item.ResultTask.ConfigureAwait(false);
var minQueueTime = item.DequeuedAt - item.QueuedAt;

// Adjust worker count
var delta = WorkerPolicy.GetWorkerCountDelta(minQueueTime);
await AddOrRemoveWorkers(delta, true).ConfigureAwait(false);

// Decide on how quickly to run the next cycle
nextCycle = delta < 0 ? shortCycle : longCycle;
if (!ReferenceEquals(WorkerPolicy, wp))
break;
}
catch {
nextCycle = shortCycle;
}
}
}

protected virtual async Task RunWorker()
{
var reader = Queue.Reader;
var batch = new List<Item>(BatchSize);
var minQueueTime = TimeSpan.MaxValue;
var reportCounter = 0;
try {
while (await reader.WaitToReadAsync().ConfigureAwait(false)) {
while (reader.TryRead(out var item)) {
Expand All @@ -213,13 +220,27 @@ protected virtual async Task RunWorker()
return;
}
item.DequeuedAt = CpuTimestamp.Now;
if (item.IsMeasureOnlyItem) {
item.SetResult(default(TResult)!);
continue;
}
var queueTime = item.DequeuedAt - item.QueuedAt;
minQueueTime = TimeSpanExt.Min(queueTime, minQueueTime);
if (++reportCounter >= BatchSize) {
ProcessWorkerReport(minQueueTime);
reportCounter = 0;
minQueueTime = TimeSpan.MaxValue;
}
batch.Add(item);
if (batch.Count >= BatchSize)
await ProcessBatch(batch).ConfigureAwait(false);
}
await ProcessBatch(batch).ConfigureAwait(false);
}
}
catch (Exception e) {
Log?.LogError(e, "{BatchProcessor}: Worker failed", GetType().GetName());
}
finally {
await ProcessBatch(batch).ConfigureAwait(false);
}
Expand Down Expand Up @@ -276,9 +297,7 @@ private async Task CompleteProcessBatchAsync(List<Item> batch, Task resultTask)

private Task CompleteProcessBatch(List<Item> batch, Exception? error = null)
{
var completedAt = CpuTimestamp.Now;
foreach (var item in batch) {
item.CompletedAt = completedAt;
if (error == null)
item.SetError(Errors.UnprocessedBatchItem());
else if (error is OperationCanceledException)
Expand All @@ -294,19 +313,19 @@ private Task CompleteProcessBatch(List<Item> batch, Exception? error = null)

public class Item(T input, TaskCompletionSource<TResult> resultSource, CancellationToken cancellationToken)
{
private static readonly TaskCompletionSource<TResult> KillSource
private static readonly TaskCompletionSource<TResult> WorkerKillSource
= TaskCompletionSourceExt.New<TResult>()
.WithException(Errors.InternalError("Something is off: you should never see the KillItem in batches."));
.WithException(Errors.InternalError("Something is off: you should never see the WorkerKiller item in batches."));

public static readonly Item WorkerKiller = new(default!, KillSource, default);
public static readonly Item WorkerKiller = new(default!, WorkerKillSource, default);

public readonly T Input = input;
public readonly TaskCompletionSource<TResult> ResultSource = resultSource;
public readonly CancellationToken CancellationToken = cancellationToken;
public readonly CpuTimestamp QueuedAt = CpuTimestamp.Now;
public CpuTimestamp DequeuedAt;
public CpuTimestamp CompletedAt;
public Task<TResult> ResultTask => ResultSource.Task;
public bool IsMeasureOnlyItem;

public Item(T input, CancellationToken cancellationToken)
: this(input, TaskCompletionSourceExt.New<TResult>(), cancellationToken)
Expand Down
Loading

0 comments on commit 7bb1358

Please sign in to comment.