Skip to content

Commit

Permalink
Merge pull request #302 from FoundatioFx/feature/queue-job-run-until-…
Browse files Browse the repository at this point in the history
…empty-overload

Added QueueJob RunUntilEmptyAsync TimeSpan Overload, refactored cancellation tokens
  • Loading branch information
niemyjski authored Jul 22, 2024
2 parents b4c6781 + fc5ea63 commit 0f4e865
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 36 deletions.
16 changes: 15 additions & 1 deletion src/Foundatio/Jobs/IQueueJob.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Threading;
using System;
using System.Threading;
using System.Threading.Tasks;
using Foundatio.Queues;
using Foundatio.Utility;
Expand All @@ -18,6 +19,19 @@ public interface IQueueJob<T> : IJob where T : class

public static class QueueJobExtensions
{
public static async Task RunUntilEmptyAsync<T>(this IQueueJob<T> job, TimeSpan acquireTimeout,
CancellationToken cancellationToken = default) where T : class
{
if (acquireTimeout <= TimeSpan.Zero)
throw new ArgumentException("Acquire timeout must be greater than zero", nameof(acquireTimeout));

using var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
linkedCancellationTokenSource.CancelAfter(acquireTimeout);

// NOTE: This has to be awaited otherwise the linkedCancellationTokenSource cancel timer will not fire.
await job.RunUntilEmptyAsync(linkedCancellationTokenSource.Token);
}

public static Task RunUntilEmptyAsync<T>(this IQueueJob<T> job, CancellationToken cancellationToken = default) where T : class
{
var logger = job.GetLogger();
Expand Down
19 changes: 9 additions & 10 deletions src/Foundatio/Jobs/QueueJobBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,16 @@ public virtual async Task<JobResult> RunAsync(CancellationToken cancellationToke
{
IQueueEntry<T> queueEntry;

using (var timeoutCancellationTokenSource = new CancellationTokenSource(30000))
using (var linkedCancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCancellationTokenSource.Token))
using var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
linkedCancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(30));

try
{
try
{
queueEntry = await _queue.Value.DequeueAsync(linkedCancellationToken.Token).AnyContext();
}
catch (Exception ex)
{
return JobResult.FromException(ex, $"Error trying to dequeue message: {ex.Message}");
}
queueEntry = await _queue.Value.DequeueAsync(linkedCancellationTokenSource.Token).AnyContext();
}
catch (Exception ex)
{
return JobResult.FromException(ex, $"Error trying to dequeue message: {ex.Message}");
}

return await ProcessAsync(queueEntry, cancellationToken).AnyContext();
Expand Down
19 changes: 9 additions & 10 deletions src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,16 @@ public virtual async Task<JobResult> RunAsync(CancellationToken cancellationToke
{
IQueueEntry<WorkItemData> queueEntry;

using (var timeoutCancellationTokenSource = new CancellationTokenSource(30000))
using (var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCancellationTokenSource.Token))
using var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
linkedCancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(30));

try
{
try
{
queueEntry = await _queue.DequeueAsync(linkedCancellationTokenSource.Token).AnyContext();
}
catch (Exception ex)
{
return JobResult.FromException(ex, $"Error trying to dequeue work item: {ex.Message}");
}
queueEntry = await _queue.DequeueAsync(linkedCancellationTokenSource.Token).AnyContext();
}
catch (Exception ex)
{
return JobResult.FromException(ex, $"Error trying to dequeue work item: {ex.Message}");
}

return await ProcessAsync(queueEntry, cancellationToken).AnyContext();
Expand Down
13 changes: 6 additions & 7 deletions src/Foundatio/Lock/CacheLockProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,14 @@ public async Task<ILock> AcquireAsync(string resource, TimeSpan? timeUntilExpire
_logger.LogTrace("Will wait {Delay:g} before retrying to acquire lock {Resource} ({LockId})", delayAmount, resource, lockId);

// wait until we get a message saying the lock was released or 3 seconds has elapsed or cancellation has been requested
using (var maxWaitCancellationTokenSource = new CancellationTokenSource(delayAmount))
using (var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, maxWaitCancellationTokenSource.Token))
using var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
linkedCancellationTokenSource.CancelAfter(delayAmount);

try
{
try
{
await autoResetEvent.Target.WaitAsync(linkedCancellationTokenSource.Token).AnyContext();
}
catch (OperationCanceledException) { }
await autoResetEvent.Target.WaitAsync(linkedCancellationTokenSource.Token).AnyContext();
}
catch (OperationCanceledException) { }

Thread.Yield();
} while (!cancellationToken.IsCancellationRequested);
Expand Down
7 changes: 4 additions & 3 deletions src/Foundatio/Queues/InMemoryQueue.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
Expand Down Expand Up @@ -202,10 +202,11 @@ protected override async Task<IQueueEntry<T>> DequeueImplAsync(CancellationToken
_logger.LogTrace("Waiting to dequeue item...");
var sw = Stopwatch.StartNew();

using var dequeueCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(linkedCancellationToken);
dequeueCancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(10));

try
{
using var timeoutCancellationTokenSource = new CancellationTokenSource(10000);
using var dequeueCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(linkedCancellationToken, timeoutCancellationTokenSource.Token);
await _autoResetEvent.WaitAsync(dequeueCancellationTokenSource.Token).AnyContext();
}
catch (OperationCanceledException) { }
Expand Down
8 changes: 4 additions & 4 deletions src/Foundatio/Queues/QueueBase.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
Expand Down Expand Up @@ -109,8 +109,8 @@ public async Task<IQueueEntry<T>> DequeueAsync(CancellationToken cancellationTok
await EnsureQueueCreatedAsync(_queueDisposedCancellationTokenSource.Token).AnyContext();

LastDequeueActivity = SystemClock.UtcNow;
using var linkedCancellationToken = GetLinkedDisposableCancellationTokenSource(cancellationToken);
return await DequeueImplAsync(linkedCancellationToken.Token).AnyContext();
using var linkedCancellationTokenSource = GetLinkedDisposableCancellationTokenSource(cancellationToken);
return await DequeueImplAsync(linkedCancellationTokenSource.Token).AnyContext();
}

public virtual async Task<IQueueEntry<T>> DequeueAsync(TimeSpan? timeout = null)
Expand Down Expand Up @@ -347,7 +347,7 @@ protected string GetFullMetricName(string customMetricName, string name)

protected CancellationTokenSource GetLinkedDisposableCancellationTokenSource(CancellationToken cancellationToken)
{
return CancellationTokenSource.CreateLinkedTokenSource(_queueDisposedCancellationTokenSource.Token, cancellationToken);
return CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _queueDisposedCancellationTokenSource.Token);
}

public override void Dispose()
Expand Down
25 changes: 24 additions & 1 deletion tests/Foundatio.Tests/Jobs/WorkItemJobTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -248,6 +249,28 @@ await queue.EnqueueAsync(new MyWorkItem
Assert.Equal(2, stats.Completed);
}

[Fact]
public async Task CanRunWorkItemJobUntilEmptyWithNoEnqueuedItems()
{
using var queue = new InMemoryQueue<WorkItemData>(o => o.LoggerFactory(Log));
using var messageBus = new InMemoryMessageBus(o => o.LoggerFactory(Log));
var handlerRegistry = new WorkItemHandlers();
var job = new WorkItemJob(queue, messageBus, handlerRegistry, Log);

handlerRegistry.Register<MyWorkItem>(new MyWorkItemHandler(Log));

var sw = Stopwatch.StartNew();
await job.RunUntilEmptyAsync(TimeSpan.FromMilliseconds(100));
sw.Stop();

Assert.True(sw.Elapsed < TimeSpan.FromMilliseconds(500));

var stats = await queue.GetQueueStatsAsync();
Assert.Equal(0, stats.Enqueued);
Assert.Equal(0, stats.Dequeued);
Assert.Equal(0, stats.Completed);
}

[Fact]
public async Task CanRunBadWorkItem()
{
Expand Down Expand Up @@ -299,7 +322,7 @@ public override async Task HandleItemAsync(WorkItemContext context)

for (int i = 1; i < 10; i++)
{
await SystemClock.SleepAsync(100);
await SystemClock.SleepAsync(10);
await context.ReportProgressAsync(10 * i);
}
}
Expand Down

0 comments on commit 0f4e865

Please sign in to comment.