Skip to content

Commit

Permalink
Yield in each loop of observe_value (#648)
Browse files Browse the repository at this point in the history
This helps in the very specific case of an observe_value directly
or indirectly modifying the signal that is being updated. This
creates a busy loop which will not be interrupted by wrapping
in asyncio.wait_for.  To demonstrate, added
test_observe_value_times_out_with_no_external_task
  • Loading branch information
coretl authored Nov 14, 2024
1 parent f0d9565 commit 2d4b12b
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 7 deletions.
11 changes: 4 additions & 7 deletions src/ophyd_async/core/_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,20 +449,17 @@ async def observe_value(
"""

q: asyncio.Queue[SignalDatatypeT | Status] = asyncio.Queue()
if timeout is None:
get_value = q.get
else:

async def get_value():
return await asyncio.wait_for(q.get(), timeout)

if done_status is not None:
done_status.add_callback(q.put_nowait)

signal.subscribe_value(q.put_nowait)
try:
while True:
item = await get_value()
# yield here in case something else is filling the queue
# like in test_observe_value_times_out_with_no_external_task()
await asyncio.sleep(0)
item = await asyncio.wait_for(q.get(), timeout)
if done_status and item is done_status:
if exc := done_status.exception():
raise exc
Expand Down
90 changes: 90 additions & 0 deletions tests/core/test_observe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import asyncio
import time

import pytest

from ophyd_async.core import AsyncStatus, observe_value, soft_signal_r_and_setter


async def test_observe_value_working_correctly():
sig, setter = soft_signal_r_and_setter(float)

async def tick():
for i in range(2):
await asyncio.sleep(0.01)
setter(i + 1)

recv = []
status = AsyncStatus(tick())
async for val in observe_value(sig, done_status=status):
recv.append(val)
assert recv == [0, 1, 2]
await status


async def test_observe_value_times_out():
sig, setter = soft_signal_r_and_setter(float)

async def tick():
for i in range(5):
await asyncio.sleep(0.1)
setter(i + 1)

recv = []

async def watch():
async for val in observe_value(sig):
recv.append(val)

t = asyncio.create_task(tick())
start = time.time()
try:
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(watch(), timeout=0.2)
assert recv == [0, 1]
assert time.time() - start == pytest.approx(0.2, abs=0.05)
finally:
t.cancel()


async def test_observe_value_times_out_with_busy_sleep():
sig, setter = soft_signal_r_and_setter(float)

async def tick():
for i in range(5):
await asyncio.sleep(0.1)
setter(i + 1)

recv = []

async def watch():
async for val in observe_value(sig):
time.sleep(0.15)
recv.append(val)

t = asyncio.create_task(tick())
start = time.time()
try:
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(watch(), timeout=0.2)
assert recv == [0, 1]
assert time.time() - start == pytest.approx(0.3, abs=0.05)
finally:
t.cancel()


async def test_observe_value_times_out_with_no_external_task():
sig, setter = soft_signal_r_and_setter(float)

recv = []

async def watch():
async for val in observe_value(sig):
recv.append(val)
setter(val + 1)

start = time.time()
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(watch(), timeout=0.1)
assert recv
assert time.time() - start == pytest.approx(0.1, abs=0.05)

0 comments on commit 2d4b12b

Please sign in to comment.