Skip to content

Commit

Permalink
Address review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
cheatfate committed Sep 13, 2023
1 parent e8995ea commit 6731bcd
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 134 deletions.
123 changes: 59 additions & 64 deletions chronos/asyncfutures2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -202,21 +202,15 @@ proc cancelAndSchedule(future: FutureBase, loc: ptr SrcLoc) =
template cancelAndSchedule*(future: FutureBase) =
cancelAndSchedule(future, getSrcLocation())

proc cancel(future: FutureBase, loc: ptr SrcLoc): bool =
## Request that Future ``future`` cancel itself.
proc tryCancel(future: FutureBase, loc: ptr SrcLoc): bool =
## Perform attempt to cancel ``future``.
##
## This arranges for a `CancelledError` to be thrown into procedure which
## waits for ``future`` on the next cycle through the event loop.
## The procedure then has a chance to clean up or even deny the request
## using `try/except/finally`.
##
## This call do not guarantee that the ``future`` will be cancelled: the
## exception might be caught and acted upon, delaying cancellation of the
## ``future`` or preventing cancellation completely. The ``future`` may also
## return value or raise different exception.
##
## Immediately after this procedure is called, ``future.cancelled()`` will
## not return ``true`` (unless the Future was already cancelled).
## This procedure iterates through all the ``future`` children and tries to
## cancel the latest one. This will propagate CancelledError from latest
## child to the parent ``future``. Procedure returns ``true`` if latest child
## was successfully cancelled (e.g. it was pending before this call) and
## returns ``false`` if latest child is already finished (completed, failed or
## cancelled before this call).
if future.cancelled():
return true
if future.finished():
Expand All @@ -229,21 +223,16 @@ proc cancel(future: FutureBase, loc: ptr SrcLoc): bool =
doAssert future.internalCancelcb.isNil,
"futures returned from `{.async.}` functions must not use " &
"`cancelCallback`"
cancel(future.internalChild, loc)
tryCancel(future.internalChild, loc)
else:
if not(isNil(future.internalCancelcb)):
future.internalCancelcb(cast[pointer](future))
if FutureFlag.OwnCancelSchedule notin future.internalFlags:
cancelAndSchedule(future, loc)
future.cancelled()

template cancel*(future: FutureBase) =
## Cancel ``future``.
discard cancel(future, getSrcLocation())

template checkedCancel*(future: FutureBase): bool =
## Cancel ``future`` and return cancellation result.
cancel(future, getSrcLocation())
template tryCancel*(future: FutureBase): bool =
tryCancel(future, getSrcLocation())

proc clearCallbacks(future: FutureBase) =
future.internalCallbacks = default(seq[AsyncCallback])
Expand Down Expand Up @@ -788,19 +777,14 @@ proc oneValue*[T](futs: varargs[Future[T]]): Future[T] {.

return retFuture

proc cancelAndWait*(fut: FutureBase, loc: ptr SrcLoc): Future[void] =
## Initiate cancellation process for Future ``fut`` and wait until ``fut`` is
## done e.g. changes its state (become completed, failed or cancelled).
##
## If ``fut`` is already finished (completed, failed or cancelled) result
## Future[void] object will be returned complete.
let retFuture = newFuture[void]("chronos.cancelAndWait(FutureBase)",
{FutureFlag.OwnCancelSchedule})

proc cancelSoon(fut: FutureBase, aftercb: CallbackFunc, udata: pointer,
loc: ptr SrcLoc) =
## Initiate cancellation process for Future ``fut`` and call ``acb`` when
## the ``fut`` become finished (completed, failed or cancelled).
proc checktick(udata: pointer) {.gcsafe.} =
# We trying to cancel Future on more time, and if `cancel()` succeeds we
# return early.
if cancel(fut, loc):
if tryCancel(fut, loc):
return
# Cancellation signal was not delivered, so we trying to deliver it one
# more time after one tick. But we need to check situation when child
Expand All @@ -809,63 +793,74 @@ proc cancelAndWait*(fut: FutureBase, loc: ptr SrcLoc): Future[void] =
callTick(checktick, nil)

proc continuation(udata: pointer) {.gcsafe.} =
retFuture.complete()

proc cancellation(udata: pointer) {.gcsafe.} =
# We are not going to change the state of `retFuture` to cancelled, so we
# will prevent the entire sequence of Futures from being cancelled one more
# time.
discard
# We do not use `callSoon` here because we was just scheduled from `poll()`.
if not(isNil(aftercb)):
aftercb(udata)

if fut.finished():
retFuture.complete()
else:
fut.addCallback(continuation)
retFuture.cancelCallback = cancellation
# Initiate cancellation process.
if not(cancel(fut, loc)):
# Cancellation signal was not delivered, so we trying to deliver it one
# more time after async tick. But we need to check case, when future was
# finished but our completion callback is not yet invoked.
if not(fut.finished()):
callTick(checktick, nil)
# We could not schedule callback directly otherwise we could fall into
# recursion problem.
if not(isNil(aftercb)):
let loop = getThreadDispatcher()
loop.callbacks.addLast(AsyncCallback(function: aftercb, udata: udata))
return

fut.addCallback(continuation)
# Initiate cancellation process.
if not(tryCancel(fut, loc)):
# Cancellation signal was not delivered, so we trying to deliver it one
# more time after async tick. But we need to check case, when future was
# finished but our completion callback is not yet invoked.
if not(fut.finished()):
callTick(checktick, nil)

retFuture
template cancelSoon*(fut: FutureBase, cb: CallbackFunc, udata: pointer) =
cancelSoon(fut, cb, udata, getSrcLocation())

template cancelAndWait*(future: FutureBase): Future[void] =
template cancelSoon*(fut: FutureBase, cb: CallbackFunc) =
cancelSoon(fut, cb, nil, getSrcLocation())

template cancelSoon*(fut: FutureBase, acb: AsyncCallback) =
cancelSoon(fut, acb.function, acb.udata, getSrcLocation())

template cancelSoon*(fut: FutureBase) =
cancelSoon(fut, nil, nil, getSrcLocation())

template cancel*(future: FutureBase) {.
deprecated: "Please use cancelSoon() or cancelAndWait() instead".} =
## Cancel ``future``.
cancelAndWait(future, getSrcLocation())
cancelSoon(future, nil, nil, getSrcLocation())

proc checkedCancelAndWait*(fut: FutureBase): Future[bool] =
proc cancelAndWait*(fut: FutureBase, loc: ptr SrcLoc): Future[void] =
## Initiate cancellation process for Future ``fut`` and wait until ``fut`` is
## done e.g. changes its state (become completed, failed or cancelled).
##
## If ``fut`` is already finished (completed, failed or cancelled) result
## Future[void] object will be returned complete.
let retFuture = newFuture[bool]("chronos.checkedCancelAndWait(FutureBase)",
let retFuture = newFuture[void]("chronos.cancelAndWait(FutureBase)",
{FutureFlag.OwnCancelSchedule})
var res: bool = false

proc continuation(udata: pointer) {.gcsafe.} =
retFuture.complete(res)
retFuture.complete()

proc cancellation(udata: pointer) {.gcsafe.} =
# We are not going to change the state of `retFuture` to cancelled, so we
# will prevent the entire sequence of Futures from being cancelled one more
# time.
discard

if fut.cancelled():
retFuture.complete(true)
elif fut.finished():
retFuture.complete(false)
if fut.finished():
retFuture.complete()
else:
fut.addCallback(continuation)
retFuture.cancelCallback = cancellation
# Initiate cancellation process.
res = fut.checkedCancel()
cancelSoon(fut, continuation, cast[pointer](retFuture), loc)

retFuture

template cancelAndWait*(future: FutureBase): Future[void] =
## Cancel ``future``.
cancelAndWait(future, getSrcLocation())

proc noCancelWait*[T](future: Future[T]): Future[T] =
let retFuture = newFuture[T]("chronos.noCancelWait(T)",
{FutureFlag.OwnCancelSchedule})
Expand Down
8 changes: 4 additions & 4 deletions chronos/asyncloop.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1417,7 +1417,7 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] =
# Timer exceeded first, we going to cancel `fut` and wait until it
# not completes.
timeouted = true
fut.cancel()
fut.cancelSoon()
else:
# Future `fut` completed/failed/cancelled first.
if not(isNil(timer)):
Expand All @@ -1430,7 +1430,7 @@ proc withTimeout*[T](fut: Future[T], timeout: Duration): Future[bool] =
if not(fut.finished()):
if not isNil(timer):
clearTimer(timer)
fut.cancel()
fut.cancelSoon()
else:
fut.completeFuture()

Expand Down Expand Up @@ -1488,7 +1488,7 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
if not(fut.finished()):
# Timer exceeded first.
timeouted = true
fut.cancel()
fut.cancelSoon()
else:
# Future `fut` completed/failed/cancelled first.
if not(isNil(timer)):
Expand All @@ -1500,7 +1500,7 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
if not(fut.finished()):
if not(isNil(timer)):
clearTimer(timer)
fut.cancel()
fut.cancelSoon()
else:
fut.completeFuture()

Expand Down
1 change: 1 addition & 0 deletions chronos/asyncsync.nim
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,7 @@ proc closeWait*(ab: AsyncEventQueue): Future[void] {.raises: [].} =
ab.close()
# Schedule `continuation` to be called only after all the `reader`
# notifications will be scheduled and processed.
retFuture.cancelCallback = cancellation
callSoon(continuation)
retFuture

Expand Down
4 changes: 2 additions & 2 deletions chronos/ratelimit.nim
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ proc worker(bucket: TokenBucket) {.async.} =
#buckets
sleeper = sleepAsync(milliseconds(timeToTarget))
await sleeper or eventWaiter
sleeper.cancel()
eventWaiter.cancel()
sleeper.cancelSoon()
eventWaiter.cancelSoon()
else:
await eventWaiter

Expand Down
4 changes: 2 additions & 2 deletions chronos/streams/asyncstream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ proc close*(rw: AsyncStreamRW) =
callSoon(continuation)
else:
rw.future.addCallback(continuation)
rw.future.cancel()
rw.future.cancelSoon()
elif rw is AsyncStreamWriter:
if isNil(rw.wsource) or isNil(rw.writerLoop) or isNil(rw.future):
callSoon(continuation)
Expand All @@ -922,7 +922,7 @@ proc close*(rw: AsyncStreamRW) =
callSoon(continuation)
else:
rw.future.addCallback(continuation)
rw.future.cancel()
rw.future.cancelSoon()

proc closeWait*(rw: AsyncStreamRW): Future[void] =
## Close and frees resources of stream ``rw``.
Expand Down
14 changes: 5 additions & 9 deletions chronos/streams/tlsstream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -267,19 +267,15 @@ template readAndReset(fut: untyped) =
break

proc cancelAndWait*(a, b, c, d: Future[TLSResult]): Future[void] =
var waiting: seq[Future[TLSResult]]
var waiting: seq[FutureBase]
if not(isNil(a)) and not(a.finished()):
a.cancel()
waiting.add(a)
waiting.add(a.cancelAndWait())
if not(isNil(b)) and not(b.finished()):
b.cancel()
waiting.add(b)
waiting.add(b.cancelAndWait())
if not(isNil(c)) and not(c.finished()):
c.cancel()
waiting.add(c)
waiting.add(c.cancelAndWait())
if not(isNil(d)) and not(d.finished()):
d.cancel()
waiting.add(d)
waiting.add(d.cancelAndWait())
allFutures(waiting)

proc dumpState*(state: cuint): string =
Expand Down
2 changes: 1 addition & 1 deletion tests/testbugs.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ suite "Asynchronous issues test suite":
const HELLO_PORT = 45679
const TEST_MSG = "testmsg"
const MSG_LEN = TEST_MSG.len()
const TestsCount = 500
const TestsCount = 100

type
CustomData = ref object
Expand Down
Loading

0 comments on commit 6731bcd

Please sign in to comment.