diff --git a/src/Foundatio/Jobs/IQueueJob.cs b/src/Foundatio/Jobs/IQueueJob.cs index 37f7e891..944eaf8d 100644 --- a/src/Foundatio/Jobs/IQueueJob.cs +++ b/src/Foundatio/Jobs/IQueueJob.cs @@ -1,4 +1,5 @@ -using System.Threading; +using System; +using System.Threading; using System.Threading.Tasks; using Foundatio.Queues; using Foundatio.Utility; @@ -18,6 +19,19 @@ public interface IQueueJob : IJob where T : class public static class QueueJobExtensions { + public static async Task RunUntilEmptyAsync(this IQueueJob job, TimeSpan acquireTimeout, + CancellationToken cancellationToken = default) where T : class + { + if (acquireTimeout <= TimeSpan.Zero) + throw new ArgumentException("Acquire timeout must be greater than zero", nameof(acquireTimeout)); + + using var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + linkedCancellationTokenSource.CancelAfter(acquireTimeout); + + // NOTE: This has to be awaited otherwise the linkedCancellationTokenSource cancel timer will not fire. + await job.RunUntilEmptyAsync(linkedCancellationTokenSource.Token); + } + public static Task RunUntilEmptyAsync(this IQueueJob job, CancellationToken cancellationToken = default) where T : class { var logger = job.GetLogger(); diff --git a/src/Foundatio/Jobs/QueueJobBase.cs b/src/Foundatio/Jobs/QueueJobBase.cs index 46f583b4..bf08be5c 100644 --- a/src/Foundatio/Jobs/QueueJobBase.cs +++ b/src/Foundatio/Jobs/QueueJobBase.cs @@ -34,17 +34,16 @@ public virtual async Task RunAsync(CancellationToken cancellationToke { IQueueEntry queueEntry; - using (var timeoutCancellationTokenSource = new CancellationTokenSource(30000)) - using (var linkedCancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCancellationTokenSource.Token)) + using var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + linkedCancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(30)); + + try { - try - { - queueEntry = await _queue.Value.DequeueAsync(linkedCancellationToken.Token).AnyContext(); - } - catch (Exception ex) - { - return JobResult.FromException(ex, $"Error trying to dequeue message: {ex.Message}"); - } + queueEntry = await _queue.Value.DequeueAsync(linkedCancellationTokenSource.Token).AnyContext(); + } + catch (Exception ex) + { + return JobResult.FromException(ex, $"Error trying to dequeue message: {ex.Message}"); } return await ProcessAsync(queueEntry, cancellationToken).AnyContext(); diff --git a/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs b/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs index 7c826b1a..b5bf439d 100644 --- a/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs +++ b/src/Foundatio/Jobs/WorkItemJob/WorkItemJob.cs @@ -36,17 +36,16 @@ public virtual async Task RunAsync(CancellationToken cancellationToke { IQueueEntry queueEntry; - using (var timeoutCancellationTokenSource = new CancellationTokenSource(30000)) - using (var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCancellationTokenSource.Token)) + using var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + linkedCancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(30)); + + try { - try - { - queueEntry = await _queue.DequeueAsync(linkedCancellationTokenSource.Token).AnyContext(); - } - catch (Exception ex) - { - return JobResult.FromException(ex, $"Error trying to dequeue work item: {ex.Message}"); - } + queueEntry = await _queue.DequeueAsync(linkedCancellationTokenSource.Token).AnyContext(); + } + catch (Exception ex) + { + return JobResult.FromException(ex, $"Error trying to dequeue work item: {ex.Message}"); } return await ProcessAsync(queueEntry, cancellationToken).AnyContext(); diff --git a/src/Foundatio/Lock/CacheLockProvider.cs b/src/Foundatio/Lock/CacheLockProvider.cs index 25b9f753..499c1c69 100644 --- a/src/Foundatio/Lock/CacheLockProvider.cs +++ b/src/Foundatio/Lock/CacheLockProvider.cs @@ -141,15 +141,14 @@ public async Task AcquireAsync(string resource, TimeSpan? timeUntilExpire _logger.LogTrace("Will wait {Delay:g} before retrying to acquire lock {Resource} ({LockId})", delayAmount, resource, lockId); // wait until we get a message saying the lock was released or 3 seconds has elapsed or cancellation has been requested - using (var maxWaitCancellationTokenSource = new CancellationTokenSource(delayAmount)) - using (var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, maxWaitCancellationTokenSource.Token)) + using var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + linkedCancellationTokenSource.CancelAfter(delayAmount); + + try { - try - { - await autoResetEvent.Target.WaitAsync(linkedCancellationTokenSource.Token).AnyContext(); - } - catch (OperationCanceledException) { } + await autoResetEvent.Target.WaitAsync(linkedCancellationTokenSource.Token).AnyContext(); } + catch (OperationCanceledException) { } Thread.Yield(); } while (!cancellationToken.IsCancellationRequested); diff --git a/src/Foundatio/Queues/InMemoryQueue.cs b/src/Foundatio/Queues/InMemoryQueue.cs index a73dc490..3b1ec9d3 100644 --- a/src/Foundatio/Queues/InMemoryQueue.cs +++ b/src/Foundatio/Queues/InMemoryQueue.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.ObjectModel; @@ -202,10 +202,11 @@ protected override async Task> DequeueImplAsync(CancellationToken _logger.LogTrace("Waiting to dequeue item..."); var sw = Stopwatch.StartNew(); + using var dequeueCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(linkedCancellationToken); + dequeueCancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(10)); + try { - using var timeoutCancellationTokenSource = new CancellationTokenSource(10000); - using var dequeueCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(linkedCancellationToken, timeoutCancellationTokenSource.Token); await _autoResetEvent.WaitAsync(dequeueCancellationTokenSource.Token).AnyContext(); } catch (OperationCanceledException) { } diff --git a/src/Foundatio/Queues/QueueBase.cs b/src/Foundatio/Queues/QueueBase.cs index 6c22ba7f..8886473e 100644 --- a/src/Foundatio/Queues/QueueBase.cs +++ b/src/Foundatio/Queues/QueueBase.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; @@ -109,8 +109,8 @@ public async Task> DequeueAsync(CancellationToken cancellationTok await EnsureQueueCreatedAsync(_queueDisposedCancellationTokenSource.Token).AnyContext(); LastDequeueActivity = SystemClock.UtcNow; - using var linkedCancellationToken = GetLinkedDisposableCancellationTokenSource(cancellationToken); - return await DequeueImplAsync(linkedCancellationToken.Token).AnyContext(); + using var linkedCancellationTokenSource = GetLinkedDisposableCancellationTokenSource(cancellationToken); + return await DequeueImplAsync(linkedCancellationTokenSource.Token).AnyContext(); } public virtual async Task> DequeueAsync(TimeSpan? timeout = null) @@ -347,7 +347,7 @@ protected string GetFullMetricName(string customMetricName, string name) protected CancellationTokenSource GetLinkedDisposableCancellationTokenSource(CancellationToken cancellationToken) { - return CancellationTokenSource.CreateLinkedTokenSource(_queueDisposedCancellationTokenSource.Token, cancellationToken); + return CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _queueDisposedCancellationTokenSource.Token); } public override void Dispose() diff --git a/tests/Foundatio.Tests/Jobs/WorkItemJobTests.cs b/tests/Foundatio.Tests/Jobs/WorkItemJobTests.cs index 3ee0a37c..2f448dff 100644 --- a/tests/Foundatio.Tests/Jobs/WorkItemJobTests.cs +++ b/tests/Foundatio.Tests/Jobs/WorkItemJobTests.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -248,6 +249,28 @@ await queue.EnqueueAsync(new MyWorkItem Assert.Equal(2, stats.Completed); } + [Fact] + public async Task CanRunWorkItemJobUntilEmptyWithNoEnqueuedItems() + { + using var queue = new InMemoryQueue(o => o.LoggerFactory(Log)); + using var messageBus = new InMemoryMessageBus(o => o.LoggerFactory(Log)); + var handlerRegistry = new WorkItemHandlers(); + var job = new WorkItemJob(queue, messageBus, handlerRegistry, Log); + + handlerRegistry.Register(new MyWorkItemHandler(Log)); + + var sw = Stopwatch.StartNew(); + await job.RunUntilEmptyAsync(TimeSpan.FromMilliseconds(100)); + sw.Stop(); + + Assert.True(sw.Elapsed < TimeSpan.FromMilliseconds(500)); + + var stats = await queue.GetQueueStatsAsync(); + Assert.Equal(0, stats.Enqueued); + Assert.Equal(0, stats.Dequeued); + Assert.Equal(0, stats.Completed); + } + [Fact] public async Task CanRunBadWorkItem() { @@ -299,7 +322,7 @@ public override async Task HandleItemAsync(WorkItemContext context) for (int i = 1; i < 10; i++) { - await SystemClock.SleepAsync(100); + await SystemClock.SleepAsync(10); await context.ReportProgressAsync(10 * i); } }