Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow the lifetime of the Connection thread to be tied to an event loop #112

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions aiosqlite/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ def __init__(
connector: Callable[[], sqlite3.Connection],
iter_chunk_size: int,
loop: Optional[asyncio.AbstractEventLoop] = None,
parent_loop: Optional[asyncio.AbstractEventLoop] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason or benefit to explicitly passing the parent loop (something that was explicitly deprecated in previous versions), rather than just using get_loop to get the active loop when the connection object is made? Using get_loop instead will prevent errors if the non-active loop is passed in, and also provide more consistent behavior in the run() method.

I think I would also prefer tracking this as self._loop for brevity.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added it as an explicit parameter to preserve existing behavior and because get_loop may not immediately run, like if self._tx is never filled with any futures, and it would pick the first-seen event loop in the case of multiple loops, which may not be desired.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If get_loop is called within the _connect method, then there's never a chance that it would get the wrong event loop. I'd overall prefer to have consistent behavior with minimal parameters than have more parameters that result in a higher number of distinct codepaths.

) -> None:
super().__init__()
self._running = True
self._connection: Optional[sqlite3.Connection] = None
self._connector = connector
self._tx: Queue = Queue()
self._iter_chunk_size = iter_chunk_size
self._parent_loop = parent_loop

if loop is not None:
warn(
Expand Down Expand Up @@ -87,7 +89,7 @@ def run(self) -> None:

:meta private:
"""
while True:
while self._parent_loop is None or not self._parent_loop.is_closed():
# Continues running until all queue items are processed,
# even after connection is closed (so we can finalize all
# futures)
Expand Down Expand Up @@ -116,6 +118,19 @@ def set_exception(fut, e):

get_loop(future).call_soon_threadsafe(set_exception, future, e)

# Clean up within this thread only if the parent event loop exits ungracefully
if not self._running or self._connection is None or self._parent_loop is None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't obvious as to when it will return or run the cleanup below. Is there a way to always try cleaning up?

return

try:
self._conn.close()
except Exception:
LOG.info("exception occurred while closing connection")
raise
finally:
self._running = False
self._connection = None

async def _execute(self, fn, *args, **kwargs):
"""Queue a function with the given arguments for execution."""
if not self._running or not self._connection:
Expand Down Expand Up @@ -376,6 +391,7 @@ def connect(
*,
iter_chunk_size=64,
loop: Optional[asyncio.AbstractEventLoop] = None,
parent_loop: Optional[asyncio.AbstractEventLoop] = None,
**kwargs: Any
) -> Connection:
"""Create and return a connection proxy to the sqlite database."""
Expand All @@ -396,4 +412,4 @@ def connector() -> sqlite3.Connection:

return sqlite3.connect(loc, **kwargs)

return Connection(connector, iter_chunk_size)
return Connection(connector, iter_chunk_size, parent_loop=parent_loop)
18 changes: 18 additions & 0 deletions aiosqlite/tests/smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio
import sqlite3
import sys
import time
from pathlib import Path
from sqlite3 import OperationalError
from threading import Thread
Expand Down Expand Up @@ -465,3 +466,20 @@ async def test_backup_py36(self):
) as db2:
with self.assertRaisesRegex(RuntimeError, "backup().+3.7"):
await db1.backup(db2)

async def test_no_close_with_parent_event_loop(self):
def runner():
loop = asyncio.new_event_loop()
db = loop.run_until_complete(aiosqlite.connect(TEST_DB, parent_loop=loop))
loop.close()

# Wait long enough for the queue `get` timeout to elapse
time.sleep(0.2)

# Database has been closed
with self.assertRaises(ValueError):
db.in_transaction
Comment on lines +480 to +481
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe validate db._running also/instead?


thread = Thread(target=runner)
thread.start()
thread.join()