Skip to content

Commit

Permalink
Add noCancelWait() call which prohibits cancellation.
Browse files Browse the repository at this point in the history
Fix closeWait() calls to use noCancelWait() predicate.
Adding sleep to flaky MacOS test.
  • Loading branch information
cheatfate committed Sep 9, 2023
1 parent adda289 commit 2f5e823
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 7 deletions.
4 changes: 2 additions & 2 deletions chronos/apps/http/httpbodyrw.nim
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ proc closeWait*(bstream: HttpBodyReader) {.async.} =
# data from stream at position [1].
for index in countdown((len(bstream.streams) - 1), 0):
res.add(bstream.streams[index].closeWait())
await allFutures(res)
await procCall(closeWait(AsyncStreamReader(bstream)))
res.add(procCall(closeWait(AsyncStreamReader(bstream))))
await noCancelWait(allFutures(res))
bstream.bstate = HttpState.Closed
untrackCounter(HttpBodyReaderTrackerName)

Expand Down
12 changes: 8 additions & 4 deletions chronos/apps/http/httpclient.nim
Original file line number Diff line number Diff line change
Expand Up @@ -827,26 +827,30 @@ proc sessionWatcher(session: HttpSessionRef) {.async.} =
break

proc closeWait*(request: HttpClientRequestRef) {.async.} =
var pending: seq[FutureBase]
if request.state notin {HttpReqRespState.Closing, HttpReqRespState.Closed}:
request.state = HttpReqRespState.Closing
if not(isNil(request.writer)):
if not(request.writer.closed()):
await request.writer.closeWait()
pending.add(FutureBase(request.writer.closeWait()))
request.writer = nil
await request.releaseConnection()
pending.add(FutureBase(request.releaseConnection()))
await noCancelWait(allFutures(pending))
request.session = nil
request.error = nil
request.state = HttpReqRespState.Closed
untrackCounter(HttpClientRequestTrackerName)

proc closeWait*(response: HttpClientResponseRef) {.async.} =
var pending: seq[FutureBase]
if response.state notin {HttpReqRespState.Closing, HttpReqRespState.Closed}:
response.state = HttpReqRespState.Closing
if not(isNil(response.reader)):
if not(response.reader.closed()):
await response.reader.closeWait()
pending.add(FutureBase(response.reader.closeWait()))
response.reader = nil
await response.releaseConnection()
pending.add(FutureBase(response.releaseConnection()))
await noCancelWait(allFutures(pending))
response.session = nil
response.error = nil
response.state = HttpReqRespState.Closed
Expand Down
29 changes: 28 additions & 1 deletion chronos/asyncfutures2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,33 @@ proc checkedCancelAndWait*(fut: FutureBase): Future[bool] =
res = fut.checkedCancel()
retFuture

proc noCancelWait*[T](future: Future[T]): Future[T] =
let retFuture = newFuture[T]("chronos.noCancelWait(T)",
{FutureFlag.OwnCancelSchedule})
template completeFuture() =
if future.completed():
when T is void:
retFuture.complete()
else:
retFuture.complete(future.value)
elif future.failed():
retFuture.fail(future.error)
else:
raiseAssert("Unexpected future state [" & $future.state & "]")

proc continuation(udata: pointer) {.gcsafe.} =
completeFuture()

proc cancellation(udata: pointer) {.gcsafe.} =
discard

if future.finished():
completeFuture()
else:
future.addCallback(continuation)
retFuture.cancelCallback = cancellation
retFuture

proc allFutures*(futs: varargs[FutureBase]): Future[void] =
## Returns a future which will complete only when all futures in ``futs``
## will be completed, failed or canceled.
Expand Down Expand Up @@ -886,7 +913,7 @@ proc allFutures*(futs: varargs[FutureBase]): Future[void] =
if len(nfuts) == 0 or len(nfuts) == finishedFutures:
retFuture.complete()

return retFuture
retFuture

proc allFutures*[T](futs: varargs[Future[T]]): Future[void] =
## Returns a future which will complete only when all futures in ``futs``
Expand Down
10 changes: 10 additions & 0 deletions tests/teststream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1331,6 +1331,11 @@ suite "Stream Transport test suite":
counter = 0
exitLoop = false

# This timer will help to awake events poll in case its going to stuck
# usually happens on MacOS.

var sleepFut = sleepAsync(1.seconds)

while not(exitLoop):
let
server = createStreamServer(initTAddress("127.0.0.1:0"))
Expand All @@ -1340,6 +1345,8 @@ suite "Stream Transport test suite":
transpFut = connect(address)
acceptFut = server.accept()

echo "AWAITING FOR [", counter, "] STEPS"

if counter > 0:
await stepsAsync(counter)

Expand Down Expand Up @@ -1379,6 +1386,9 @@ suite "Stream Transport test suite":
await server.closeWait()
echo "SERVER CLOSED"

if not(sleepFut.finished()):
await cancelAndWait(sleepFut)

echo "TEST EXITED"

markFD = getCurrentFD()
Expand Down

0 comments on commit 2f5e823

Please sign in to comment.