diff --git a/chronos/asyncfutures2.nim b/chronos/asyncfutures2.nim index be4849d7c..44d6731d5 100644 --- a/chronos/asyncfutures2.nim +++ b/chronos/asyncfutures2.nim @@ -202,21 +202,15 @@ proc cancelAndSchedule(future: FutureBase, loc: ptr SrcLoc) = template cancelAndSchedule*(future: FutureBase) = cancelAndSchedule(future, getSrcLocation()) -proc cancel(future: FutureBase, loc: ptr SrcLoc): bool = - ## Request that Future ``future`` cancel itself. +proc tryCancel(future: FutureBase, loc: ptr SrcLoc): bool = + ## Perform attempt to cancel ``future``. ## - ## This arranges for a `CancelledError` to be thrown into procedure which - ## waits for ``future`` on the next cycle through the event loop. - ## The procedure then has a chance to clean up or even deny the request - ## using `try/except/finally`. - ## - ## This call do not guarantee that the ``future`` will be cancelled: the - ## exception might be caught and acted upon, delaying cancellation of the - ## ``future`` or preventing cancellation completely. The ``future`` may also - ## return value or raise different exception. - ## - ## Immediately after this procedure is called, ``future.cancelled()`` will - ## not return ``true`` (unless the Future was already cancelled). + ## This procedure iterates through all the ``future`` children and tries to + ## cancel the latest one. This will propagate CancelledError from latest + ## child to the parent ``future``. Procedure returns ``true`` if latest child + ## was successfully cancelled (e.g. it was pending before this call) and + ## returns ``false`` if latest child is already finished (completed, failed or + ## cancelled before this call). if future.cancelled(): return true if future.finished(): @@ -229,7 +223,7 @@ proc cancel(future: FutureBase, loc: ptr SrcLoc): bool = doAssert future.internalCancelcb.isNil, "futures returned from `{.async.}` functions must not use " & "`cancelCallback`" - cancel(future.internalChild, loc) + tryCancel(future.internalChild, loc) else: if not(isNil(future.internalCancelcb)): future.internalCancelcb(cast[pointer](future)) @@ -237,13 +231,8 @@ proc cancel(future: FutureBase, loc: ptr SrcLoc): bool = cancelAndSchedule(future, loc) future.cancelled() -template cancel*(future: FutureBase) = - ## Cancel ``future``. - discard cancel(future, getSrcLocation()) - -template checkedCancel*(future: FutureBase): bool = - ## Cancel ``future`` and return cancellation result. - cancel(future, getSrcLocation()) +template tryCancel*(future: FutureBase): bool = + tryCancel(future, getSrcLocation()) proc clearCallbacks(future: FutureBase) = future.internalCallbacks = default(seq[AsyncCallback]) @@ -788,19 +777,14 @@ proc oneValue*[T](futs: varargs[Future[T]]): Future[T] {. return retFuture -proc cancelAndWait*(fut: FutureBase, loc: ptr SrcLoc): Future[void] = - ## Initiate cancellation process for Future ``fut`` and wait until ``fut`` is - ## done e.g. changes its state (become completed, failed or cancelled). - ## - ## If ``fut`` is already finished (completed, failed or cancelled) result - ## Future[void] object will be returned complete. - let retFuture = newFuture[void]("chronos.cancelAndWait(FutureBase)", - {FutureFlag.OwnCancelSchedule}) - +proc cancelSoon(fut: FutureBase, aftercb: CallbackFunc, udata: pointer, + loc: ptr SrcLoc) = + ## Initiate cancellation process for Future ``fut`` and call ``acb`` when + ## the ``fut`` become finished (completed, failed or cancelled). proc checktick(udata: pointer) {.gcsafe.} = # We trying to cancel Future on more time, and if `cancel()` succeeds we # return early. - if cancel(fut, loc): + if tryCancel(fut, loc): return # Cancellation signal was not delivered, so we trying to deliver it one # more time after one tick. But we need to check situation when child @@ -809,45 +793,55 @@ proc cancelAndWait*(fut: FutureBase, loc: ptr SrcLoc): Future[void] = callTick(checktick, nil) proc continuation(udata: pointer) {.gcsafe.} = - retFuture.complete() - - proc cancellation(udata: pointer) {.gcsafe.} = - # We are not going to change the state of `retFuture` to cancelled, so we - # will prevent the entire sequence of Futures from being cancelled one more - # time. - discard + # We do not use `callSoon` here because we was just scheduled from `poll()`. + if not(isNil(aftercb)): + aftercb(udata) if fut.finished(): - retFuture.complete() - else: - fut.addCallback(continuation) - retFuture.cancelCallback = cancellation - # Initiate cancellation process. - if not(cancel(fut, loc)): - # Cancellation signal was not delivered, so we trying to deliver it one - # more time after async tick. But we need to check case, when future was - # finished but our completion callback is not yet invoked. - if not(fut.finished()): - callTick(checktick, nil) + # We could not schedule callback directly otherwise we could fall into + # recursion problem. + if not(isNil(aftercb)): + let loop = getThreadDispatcher() + loop.callbacks.addLast(AsyncCallback(function: aftercb, udata: udata)) + return + + fut.addCallback(continuation) + # Initiate cancellation process. + if not(tryCancel(fut, loc)): + # Cancellation signal was not delivered, so we trying to deliver it one + # more time after async tick. But we need to check case, when future was + # finished but our completion callback is not yet invoked. + if not(fut.finished()): + callTick(checktick, nil) - retFuture +template cancelSoon*(fut: FutureBase, cb: CallbackFunc, udata: pointer) = + cancelSoon(fut, cb, udata, getSrcLocation()) -template cancelAndWait*(future: FutureBase): Future[void] = +template cancelSoon*(fut: FutureBase, cb: CallbackFunc) = + cancelSoon(fut, cb, nil, getSrcLocation()) + +template cancelSoon*(fut: FutureBase, acb: AsyncCallback) = + cancelSoon(fut, acb.function, acb.udata, getSrcLocation()) + +template cancelSoon*(fut: FutureBase) = + cancelSoon(fut, nil, nil, getSrcLocation()) + +template cancel*(future: FutureBase) {. + deprecated: "Please use cancelSoon() or cancelAndWait() instead".} = ## Cancel ``future``. - cancelAndWait(future, getSrcLocation()) + cancelSoon(future, nil, nil, getSrcLocation()) -proc checkedCancelAndWait*(fut: FutureBase): Future[bool] = +proc cancelAndWait*(fut: FutureBase, loc: ptr SrcLoc): Future[void] = ## Initiate cancellation process for Future ``fut`` and wait until ``fut`` is ## done e.g. changes its state (become completed, failed or cancelled). ## ## If ``fut`` is already finished (completed, failed or cancelled) result ## Future[void] object will be returned complete. - let retFuture = newFuture[bool]("chronos.checkedCancelAndWait(FutureBase)", + let retFuture = newFuture[void]("chronos.cancelAndWait(FutureBase)", {FutureFlag.OwnCancelSchedule}) - var res: bool = false proc continuation(udata: pointer) {.gcsafe.} = - retFuture.complete(res) + retFuture.complete() proc cancellation(udata: pointer) {.gcsafe.} = # We are not going to change the state of `retFuture` to cancelled, so we @@ -855,17 +849,18 @@ proc checkedCancelAndWait*(fut: FutureBase): Future[bool] = # time. discard - if fut.cancelled(): - retFuture.complete(true) - elif fut.finished(): - retFuture.complete(false) + if fut.finished(): + retFuture.complete() else: - fut.addCallback(continuation) retFuture.cancelCallback = cancellation - # Initiate cancellation process. - res = fut.checkedCancel() + cancelSoon(fut, continuation, cast[pointer](retFuture), loc) + retFuture +template cancelAndWait*(future: FutureBase): Future[void] = + ## Cancel ``future``. + cancelAndWait(future, getSrcLocation()) + proc noCancelWait*[T](future: Future[T]): Future[T] = let retFuture = newFuture[T]("chronos.noCancelWait(T)", {FutureFlag.OwnCancelSchedule}) diff --git a/chronos/asyncloop.nim b/chronos/asyncloop.nim index ecb0ed97f..ba63ba10a 100644 --- a/chronos/asyncloop.nim +++ b/chronos/asyncloop.nim @@ -1417,7 +1417,7 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] = # Timer exceeded first, we going to cancel `fut` and wait until it # not completes. timeouted = true - fut.cancel() + fut.cancelSoon() else: # Future `fut` completed/failed/cancelled first. if not(isNil(timer)): @@ -1430,7 +1430,7 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] = if not(fut.finished()): if not isNil(timer): clearTimer(timer) - fut.cancel() + fut.cancelSoon() else: fut.completeFuture() @@ -1488,7 +1488,7 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = if not(fut.finished()): # Timer exceeded first. timeouted = true - fut.cancel() + fut.cancelSoon() else: # Future `fut` completed/failed/cancelled first. if not(isNil(timer)): @@ -1500,7 +1500,7 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] = if not(fut.finished()): if not(isNil(timer)): clearTimer(timer) - fut.cancel() + fut.cancelSoon() else: fut.completeFuture() diff --git a/chronos/asyncsync.nim b/chronos/asyncsync.nim index 791050ebf..0feb51e17 100644 --- a/chronos/asyncsync.nim +++ b/chronos/asyncsync.nim @@ -748,6 +748,7 @@ proc closeWait*(ab: AsyncEventQueue): Future[void] {.raises: [].} = ab.close() # Schedule `continuation` to be called only after all the `reader` # notifications will be scheduled and processed. + retFuture.cancelCallback = cancellation callSoon(continuation) retFuture diff --git a/chronos/ratelimit.nim b/chronos/ratelimit.nim index 4147db788..ad66c067e 100644 --- a/chronos/ratelimit.nim +++ b/chronos/ratelimit.nim @@ -88,8 +88,8 @@ proc worker(bucket: TokenBucket) {.async.} = #buckets sleeper = sleepAsync(milliseconds(timeToTarget)) await sleeper or eventWaiter - sleeper.cancel() - eventWaiter.cancel() + sleeper.cancelSoon() + eventWaiter.cancelSoon() else: await eventWaiter diff --git a/chronos/streams/asyncstream.nim b/chronos/streams/asyncstream.nim index a03f0124e..4698e8358 100644 --- a/chronos/streams/asyncstream.nim +++ b/chronos/streams/asyncstream.nim @@ -913,7 +913,7 @@ proc close*(rw: AsyncStreamRW) = callSoon(continuation) else: rw.future.addCallback(continuation) - rw.future.cancel() + rw.future.cancelSoon() elif rw is AsyncStreamWriter: if isNil(rw.wsource) or isNil(rw.writerLoop) or isNil(rw.future): callSoon(continuation) @@ -922,7 +922,7 @@ proc close*(rw: AsyncStreamRW) = callSoon(continuation) else: rw.future.addCallback(continuation) - rw.future.cancel() + rw.future.cancelSoon() proc closeWait*(rw: AsyncStreamRW): Future[void] = ## Close and frees resources of stream ``rw``. diff --git a/chronos/streams/tlsstream.nim b/chronos/streams/tlsstream.nim index e0c9022b0..6432a10d4 100644 --- a/chronos/streams/tlsstream.nim +++ b/chronos/streams/tlsstream.nim @@ -267,19 +267,15 @@ template readAndReset(fut: untyped) = break proc cancelAndWait*(a, b, c, d: Future[TLSResult]): Future[void] = - var waiting: seq[Future[TLSResult]] + var waiting: seq[FutureBase] if not(isNil(a)) and not(a.finished()): - a.cancel() - waiting.add(a) + waiting.add(a.cancelAndWait()) if not(isNil(b)) and not(b.finished()): - b.cancel() - waiting.add(b) + waiting.add(b.cancelAndWait()) if not(isNil(c)) and not(c.finished()): - c.cancel() - waiting.add(c) + waiting.add(c.cancelAndWait()) if not(isNil(d)) and not(d.finished()): - d.cancel() - waiting.add(d) + waiting.add(d.cancelAndWait()) allFutures(waiting) proc dumpState*(state: cuint): string = diff --git a/tests/testbugs.nim b/tests/testbugs.nim index cf18a13c9..1f2a932d0 100644 --- a/tests/testbugs.nim +++ b/tests/testbugs.nim @@ -14,7 +14,7 @@ suite "Asynchronous issues test suite": const HELLO_PORT = 45679 const TEST_MSG = "testmsg" const MSG_LEN = TEST_MSG.len() - const TestsCount = 500 + const TestsCount = 100 type CustomData = ref object diff --git a/tests/testfut.nim b/tests/testfut.nim index d8352b409..27ab572f5 100644 --- a/tests/testfut.nim +++ b/tests/testfut.nim @@ -965,7 +965,7 @@ suite "Future[T] behavior test suite": let discarded {.used.} = await fut1 check res - asyncTest "cancel() async procedure test": + asyncTest "tryCancel() async procedure test": var completed = 0 proc client1() {.async.} = @@ -985,7 +985,7 @@ suite "Future[T] behavior test suite": inc(completed) var fut = client4() - fut.cancel() + discard fut.tryCancel() # Future must not be cancelled immediately, because it has many nested # futures. @@ -1036,7 +1036,7 @@ suite "Future[T] behavior test suite": var fut1 = client2() var fut2 = client2() - fut1.cancel() + discard fut1.tryCancel() await fut1 await cancelAndWait(fut2) check: @@ -1059,17 +1059,17 @@ suite "Future[T] behavior test suite": if not(retFuture.finished()): retFuture.complete() - proc cancel(udata: pointer) {.gcsafe.} = + proc cancellation(udata: pointer) {.gcsafe.} = inc(cancelled) if not(retFuture.finished()): removeTimer(moment, completion, cast[pointer](retFuture)) - retFuture.cancelCallback = cancel + retFuture.cancelCallback = cancellation discard setTimer(moment, completion, cast[pointer](retFuture)) return retFuture var fut = client1(100.milliseconds) - fut.cancel() + discard fut.tryCancel() await sleepAsync(500.milliseconds) check: fut.cancelled() @@ -1154,7 +1154,7 @@ suite "Future[T] behavior test suite": someFut = newFuture[void]() var raceFut3 = raceProc() - someFut.cancel() + discard someFut.tryCancel() await cancelAndWait(raceFut3) check: @@ -1561,7 +1561,7 @@ suite "Future[T] behavior test suite": var future = newFuture[void]("last.child.future") var someFut = testFoo(future) future.complete() - someFut.cancel() + discard someFut.tryCancel() await someFut asyncTest "wait() cancellation undefined behavior test #2": @@ -1588,7 +1588,7 @@ suite "Future[T] behavior test suite": var future = newFuture[void]("last.child.future") var someFut = testFoo(future) future.complete() - someFut.cancel() + discard someFut.tryCancel() await someFut asyncTest "withTimeout() cancellation undefined behavior test #1": @@ -1616,7 +1616,7 @@ suite "Future[T] behavior test suite": var future = newFuture[void]("last.child.future") var someFut = testFoo(future) future.complete() - someFut.cancel() + discard someFut.tryCancel() await someFut asyncTest "withTimeout() cancellation undefined behavior test #2": @@ -1648,7 +1648,7 @@ suite "Future[T] behavior test suite": var future = newFuture[void]("last.child.future") var someFut = testFoo(future) future.complete() - someFut.cancel() + discard someFut.tryCancel() await someFut asyncTest "Cancellation behavior test": @@ -1664,18 +1664,16 @@ suite "Future[T] behavior test suite": block: # Cancellation of pending Future let future = newFuture[void]("last.child.pending.future") - let res = await checkedCancelAndWait(future) + await cancelAndWait(future) check: - res == true future.cancelled() == true block: # Cancellation of completed Future let future = newFuture[void]("last.child.completed.future") future.complete() - let res = await checkedCancelAndWait(future) + await cancelAndWait(future) check: - res == false future.cancelled() == false future.completed() == true @@ -1683,9 +1681,8 @@ suite "Future[T] behavior test suite": # Cancellation of failed Future let future = newFuture[void]("last.child.failed.future") future.fail(newException(ValueError, "ABCD")) - let res = await checkedCancelAndWait(future) + await cancelAndWait(future) check: - res == false future.cancelled() == false future.failed() == true @@ -1693,18 +1690,16 @@ suite "Future[T] behavior test suite": # Cancellation of already cancelled Future let future = newFuture[void]("last.child.cancelled.future") future.cancelAndSchedule() - let res = await checkedCancelAndWait(future) + await cancelAndWait(future) check: - res == true future.cancelled() == true block: # Cancellation of Pending->Pending->Pending->Pending sequence let future = newFuture[void]("last.child.pending.future") let testFut = testOuterFoo(future) - let res = await checkedCancelAndWait(testFut) + await cancelAndWait(testFut) check: - res == true testFut.cancelled() == true block: @@ -1712,9 +1707,8 @@ suite "Future[T] behavior test suite": let future = newFuture[void]("last.child.completed.future") let testFut = testOuterFoo(future) future.complete() - let res = await checkedCancelAndWait(testFut) + await cancelAndWait(testFut) check: - res == false testFut.cancelled() == false testFut.completed() == true @@ -1723,9 +1717,8 @@ suite "Future[T] behavior test suite": let future = newFuture[void]("last.child.failed.future") let testFut = testOuterFoo(future) future.fail(newException(ValueError, "ABCD")) - let res = await checkedCancelAndWait(testFut) + await cancelAndWait(testFut) check: - res == false testFut.cancelled() == false testFut.failed() == true @@ -1734,9 +1727,8 @@ suite "Future[T] behavior test suite": let future = newFuture[void]("last.child.cancelled.future") let testFut = testOuterFoo(future) future.cancelAndSchedule() - let res = await checkedCancelAndWait(testFut) + await cancelAndWait(testFut) check: - res == true testFut.cancelled() == true block: @@ -1748,7 +1740,7 @@ suite "Future[T] behavior test suite": future.cancelCallback = cancellation # Note, future will never be finished in such case, until we manually not # finish it - let cancelFut = checkedCancelAndWait(future) + let cancelFut = cancelAndWait(future) await sleepAsync(100.milliseconds) check: cancelFut.finished() == false @@ -1756,12 +1748,11 @@ suite "Future[T] behavior test suite": # Now we manually changing Future's state, so `cancelAndWait` could # finish future.complete() - let res = await cancelFut + await cancelFut check: cancelFut.finished() == true future.cancelled() == false future.finished() == true - res == false block: # Cancellation of pending Future, which will fail Future on cancellation, @@ -1773,9 +1764,8 @@ suite "Future[T] behavior test suite": future.cancelCallback = cancellation # Note, future will never be finished in such case, until we manually not # finish it - let res = await checkedCancelAndWait(future) + await cancelAndWait(future) check: - res == false future.cancelled() == false future.completed() == true @@ -1789,9 +1779,8 @@ suite "Future[T] behavior test suite": future.cancelCallback = cancellation # Note, future will never be finished in such case, until we manually not # finish it - let res = await checkedCancelAndWait(future) + await cancelAndWait(future) check: - res == false future.cancelled() == false future.failed() == true @@ -1805,9 +1794,8 @@ suite "Future[T] behavior test suite": future.cancelCallback = cancellation # Note, future will never be finished in such case, until we manually not # finish it - let res = await checkedCancelAndWait(future) + await cancelAndWait(future) check: - res == true future.cancelled() == true block: @@ -1822,7 +1810,7 @@ suite "Future[T] behavior test suite": # Note, future will never be finished in such case, until we manually not # finish it let testFut = testOuterFoo(future) - let cancelFut = checkedCancelAndWait(testFut) + let cancelFut = cancelAndWait(testFut) await sleepAsync(100.milliseconds) check: cancelFut.finished() == false @@ -1831,14 +1819,13 @@ suite "Future[T] behavior test suite": # Now we manually changing Future's state, so `cancelAndWait` could # finish future.complete() - let res = await cancelFut + await cancelFut check: cancelFut.finished() == true future.cancelled() == false future.finished() == true testFut.cancelled() == false testFut.finished() == true - res == false block: # Cancellation of pending Pending->Pending->Pending->Pending, when @@ -1852,14 +1839,13 @@ suite "Future[T] behavior test suite": # Note, future will never be finished in such case, until we manually not # finish it let testFut = testOuterFoo(future) - let res = await checkedCancelAndWait(testFut) + await cancelAndWait(testFut) await sleepAsync(100.milliseconds) check: testFut.cancelled() == false testFut.finished() == true future.cancelled() == false future.finished() == true - res == false block: # Cancellation of pending Pending->Pending->Pending->Pending, when @@ -1872,14 +1858,13 @@ suite "Future[T] behavior test suite": # Note, future will never be finished in such case, until we manually not # finish it let testFut = testOuterFoo(future) - let res = await checkedCancelAndWait(testFut) + await cancelAndWait(testFut) await sleepAsync(100.milliseconds) check: testFut.cancelled() == false testFut.failed() == true future.cancelled() == false future.failed() == true - res == false block: # Cancellation of pending Pending->Pending->Pending->Pending, when @@ -1892,12 +1877,11 @@ suite "Future[T] behavior test suite": # Note, future will never be finished in such case, until we manually not # finish it let testFut = testOuterFoo(future) - let res = await checkedCancelAndWait(testFut) + await cancelAndWait(testFut) await sleepAsync(100.milliseconds) check: testFut.cancelled() == true future.cancelled() == true - res == true test "Issue #334 test": proc test(): bool = @@ -1925,7 +1909,7 @@ suite "Future[T] behavior test suite": raise exc let x = c() - x.cancel() + x.cancelSoon() try: waitFor x diff --git a/tests/testratelimit.nim b/tests/testratelimit.nim index 2a7560bf6..e206884c4 100644 --- a/tests/testratelimit.nim +++ b/tests/testratelimit.nim @@ -96,7 +96,7 @@ suite "Token Bucket": futBlocker.finished == false fut2.finished == false - futBlocker.cancel() + futBlocker.cancelSoon() waitFor(fut2.wait(10.milliseconds)) test "Very long replenish": diff --git a/tests/testsync.nim b/tests/testsync.nim index 4e0968229..4ade20531 100644 --- a/tests/testsync.nim +++ b/tests/testsync.nim @@ -150,9 +150,9 @@ suite "Asynchronous sync primitives test suite": var fut2 = task(lock, 2, n2) var fut3 = task(lock, 3, n3) if cancelIndex == 2: - fut2.cancel() + fut2.cancelSoon() else: - fut3.cancel() + fut3.cancelSoon() await allFutures(fut1, fut2, fut3) result = stripe diff --git a/tests/testtime.nim b/tests/testtime.nim index 0c4a1f718..03c2318d3 100644 --- a/tests/testtime.nim +++ b/tests/testtime.nim @@ -102,7 +102,7 @@ suite "Asynchronous timers & steps test suite": # We need `fut` because `stepsAsync` do not power `poll()` anymore. block: - var fut = sleepAsync(50.milliseconds) + var fut {.used.} = sleepAsync(50.milliseconds) poll() check: @@ -111,7 +111,7 @@ suite "Asynchronous timers & steps test suite": fut3.completed() == false block: - var fut = sleepAsync(50.milliseconds) + var fut {.used.} = sleepAsync(50.milliseconds) poll() check: @@ -119,7 +119,7 @@ suite "Asynchronous timers & steps test suite": fut3.completed() == false block: - var fut = sleepAsync(50.milliseconds) + var fut {.used.} = sleepAsync(50.milliseconds) poll() check: