Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into retry-creating-page-on-crashed-browser
Browse files Browse the repository at this point in the history
  • Loading branch information
elacuesta committed Jul 16, 2024
2 parents b85ccf0 + 5b8cfd7 commit 2546b2b
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 66 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.0.39
current_version = 0.0.40
commit = True
tag = True

Expand Down
7 changes: 7 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# scrapy-playwright changelog


### [v0.0.40](https://github.com/scrapy-plugins/scrapy-playwright/releases/tag/v0.0.40) (2024-07-16)

* Enforce asyncio reactor in all platforms (#298)
* Allow multiple handlers in separate thread (#299)


### [v0.0.39](https://github.com/scrapy-plugins/scrapy-playwright/releases/tag/v0.0.39) (2024-07-11)

* Return proper status and headers for downloads (#293)
Expand Down
2 changes: 1 addition & 1 deletion scrapy_playwright/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.0.39"
__version__ = "0.0.40"
100 changes: 45 additions & 55 deletions scrapy_playwright/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import platform
import threading
from typing import Awaitable, Iterator, Optional, Tuple, Union
from typing import Awaitable, Dict, Iterator, Optional, Tuple, Union

import scrapy
from playwright.async_api import Error, Page, Request, Response
Expand Down Expand Up @@ -103,68 +103,58 @@ async def _get_header_value(
return None


if platform.system() == "Windows":

class _ThreadedLoopAdapter:
"""Utility class to start an asyncio event loop in a new thread and redirect coroutines.
This allows to run Playwright in a different loop than the Scrapy crawler, allowing to
use ProactorEventLoop which is supported by Playwright on Windows.
"""

_loop: asyncio.AbstractEventLoop
_thread: threading.Thread
_coro_queue: asyncio.Queue = asyncio.Queue()
_stop_event: asyncio.Event = asyncio.Event()

@classmethod
async def _handle_coro(cls, coro, future) -> None:
try:
future.set_result(await coro)
except Exception as exc:
future.set_exception(exc)

@classmethod
async def _process_queue(cls) -> None:
while not cls._stop_event.is_set():
coro, future = await cls._coro_queue.get()
asyncio.create_task(cls._handle_coro(coro, future))
cls._coro_queue.task_done()

@classmethod
def _deferred_from_coro(cls, coro) -> Deferred:
future: asyncio.Future = asyncio.Future()
asyncio.run_coroutine_threadsafe(cls._coro_queue.put((coro, future)), cls._loop)
return scrapy.utils.defer.deferred_from_coro(future)

@classmethod
def start(cls) -> None:
policy = asyncio.WindowsProactorEventLoopPolicy() # type: ignore[attr-defined]
class _ThreadedLoopAdapter:
    """Utility class to start an asyncio event loop in a new thread and redirect coroutines.
    This allows to run Playwright in a different loop than the Scrapy crawler, allowing to
    use ProactorEventLoop which is supported by Playwright on Windows.

    The loop and thread are shared class-level resources: each handler registers itself
    via ``start(caller_id)`` and unregisters via ``stop(caller_id)``; the loop is only
    torn down once every registered caller has stopped.
    """

    _loop: asyncio.AbstractEventLoop
    _thread: threading.Thread
    # Queue of (coroutine, future) pairs to be executed on the threaded loop.
    _coro_queue: asyncio.Queue = asyncio.Queue()
    # One stop event per registered caller (keyed by caller_id); the loop stops
    # only when all events are set.
    _stop_events: Dict[int, asyncio.Event] = {}

    @classmethod
    async def _handle_coro(cls, coro, future) -> None:
        """Await *coro* and propagate its result or exception into *future*."""
        try:
            future.set_result(await coro)
        except Exception as exc:
            future.set_exception(exc)

    @classmethod
    async def _process_queue(cls) -> None:
        """Drain the coroutine queue, scheduling each item as a task on this loop.

        Runs until every registered caller has set its stop event.
        NOTE(review): the ``await cls._coro_queue.get()`` can keep this coroutine
        pending after all stop events are set, until one more item arrives — confirm
        this is acceptable for shutdown.
        """
        while any(not ev.is_set() for ev in cls._stop_events.values()):
            coro, future = await cls._coro_queue.get()
            asyncio.create_task(cls._handle_coro(coro, future))
            cls._coro_queue.task_done()

    @classmethod
    def _deferred_from_coro(cls, coro) -> Deferred:
        """Enqueue *coro* for execution on the threaded loop and wrap its future in a Deferred."""
        future: asyncio.Future = asyncio.Future()
        asyncio.run_coroutine_threadsafe(cls._coro_queue.put((coro, future)), cls._loop)
        return scrapy.utils.defer.deferred_from_coro(future)

    @classmethod
    def start(cls, caller_id: int) -> None:
        """Register *caller_id* and lazily create/start the shared loop and thread."""
        cls._stop_events[caller_id] = asyncio.Event()
        if not getattr(cls, "_loop", None):
            policy = asyncio.DefaultEventLoopPolicy()
            if platform.system() == "Windows":
                # Playwright on Windows requires a ProactorEventLoop.
                policy = asyncio.WindowsProactorEventLoopPolicy()  # type: ignore[attr-defined]
            cls._loop = policy.new_event_loop()
            asyncio.set_event_loop(cls._loop)

        if not getattr(cls, "_thread", None):
            cls._thread = threading.Thread(target=cls._loop.run_forever, daemon=True)
            cls._thread.start()
            logger.info("Started loop on separate thread: %s", cls._loop)

        asyncio.run_coroutine_threadsafe(cls._process_queue(), cls._loop)

    @classmethod
    def stop(cls, caller_id: int) -> None:
        """Wait until all handlers are closed to stop the event loop and join the thread."""
        cls._stop_events[caller_id].set()
        if all(ev.is_set() for ev in cls._stop_events.values()):
            # Last caller out: flush pending work, stop the loop, and join the thread.
            asyncio.run_coroutine_threadsafe(cls._coro_queue.join(), cls._loop)
            cls._loop.call_soon_threadsafe(cls._loop.stop)
            cls._thread.join()

_deferred_from_coro = _ThreadedLoopAdapter._deferred_from_coro
else:

class _ThreadedLoopAdapter: # type: ignore[no-redef]
@classmethod
def start(cls) -> None:
pass

@classmethod
def stop(cls) -> None:
pass

_deferred_from_coro = scrapy.utils.defer.deferred_from_coro
25 changes: 18 additions & 7 deletions scrapy_playwright/handler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import logging
import platform
from contextlib import suppress
from dataclasses import dataclass, field as dataclass_field
from ipaddress import ip_address
Expand Down Expand Up @@ -27,6 +28,7 @@
from scrapy.http.headers import Headers
from scrapy.responsetypes import responsetypes
from scrapy.settings import Settings
from scrapy.utils.defer import deferred_from_coro
from scrapy.utils.misc import load_object
from scrapy.utils.reactor import verify_installed_reactor
from twisted.internet.defer import Deferred, inlineCallbacks
Expand All @@ -35,7 +37,6 @@
from scrapy_playwright.page import PageMethod
from scrapy_playwright._utils import (
_ThreadedLoopAdapter,
_deferred_from_coro,
_encode_body,
_get_float_setting,
_get_header_value,
Expand Down Expand Up @@ -93,6 +94,7 @@ class Config:
navigation_timeout: Optional[float]
restart_disconnected_browser: bool
target_closed_max_retries: int = 3
use_threaded_loop: bool = False

@classmethod
def from_settings(cls, settings: Settings) -> "Config":
Expand All @@ -116,6 +118,8 @@ def from_settings(cls, settings: Settings) -> "Config":
restart_disconnected_browser=settings.getbool(
"PLAYWRIGHT_RESTART_DISCONNECTED_BROWSER", default=True
),
use_threaded_loop=platform.system() == "Windows"
or settings.getbool("_PLAYWRIGHT_THREADED_LOOP", False),
)
cfg.cdp_kwargs.pop("endpoint_url", None)
cfg.connect_kwargs.pop("ws_endpoint", None)
Expand All @@ -132,13 +136,14 @@ class ScrapyPlaywrightDownloadHandler(HTTPDownloadHandler):

def __init__(self, crawler: Crawler) -> None:
super().__init__(settings=crawler.settings, crawler=crawler)
_ThreadedLoopAdapter.start()
verify_installed_reactor("twisted.internet.asyncioreactor.AsyncioSelectorReactor")
crawler.signals.connect(self._engine_started, signals.engine_started)
self.stats = crawler.stats

self.config = Config.from_settings(crawler.settings)

if self.config.use_threaded_loop:
_ThreadedLoopAdapter.start(id(self))

self.browser_launch_lock = asyncio.Lock()
self.context_launch_lock = asyncio.Lock()
self.context_wrappers: Dict[str, BrowserContextWrapper] = {}
Expand All @@ -164,9 +169,14 @@ def __init__(self, crawler: Crawler) -> None:
    def from_crawler(cls: Type[PlaywrightHandler], crawler: Crawler) -> PlaywrightHandler:
        """Instantiate the handler from a crawler (Scrapy's standard factory hook)."""
        return cls(crawler)

def _deferred_from_coro(self, coro: Awaitable) -> Deferred:
if self.config.use_threaded_loop:
return _ThreadedLoopAdapter._deferred_from_coro(coro)
return deferred_from_coro(coro)

def _engine_started(self) -> Deferred:
"""Launch the browser. Use the engine_started signal as it supports returning deferreds."""
return _deferred_from_coro(self._launch())
return self._deferred_from_coro(self._launch())

async def _launch(self) -> None:
"""Launch Playwright manager and configured startup context(s)."""
Expand Down Expand Up @@ -338,8 +348,9 @@ def _set_max_concurrent_context_count(self):
def close(self) -> Deferred:
logger.info("Closing download handler")
yield super().close()
yield _deferred_from_coro(self._close())
_ThreadedLoopAdapter.stop()
yield self._deferred_from_coro(self._close())
if self.config.use_threaded_loop:
_ThreadedLoopAdapter.stop(id(self))

async def _close(self) -> None:
with suppress(TargetClosedError):
Expand All @@ -355,7 +366,7 @@ async def _close(self) -> None:

def download_request(self, request: Request, spider: Spider) -> Deferred:
if request.meta.get("playwright"):
return _deferred_from_coro(self._download_request(request, spider))
return self._deferred_from_coro(self._download_request(request, spider))
return super().download_request(request, spider)

async def _download_request(self, request: Request, spider: Spider) -> Response:
Expand Down
5 changes: 3 additions & 2 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ def allow_windows(test_method):

@wraps(test_method)
async def wrapped(self, *args, **kwargs):
_ThreadedLoopAdapter.start()
caller_id = 1234
_ThreadedLoopAdapter.start(caller_id)
coro = test_method(self, *args, **kwargs)
asyncio.run_coroutine_threadsafe(coro=coro, loop=_ThreadedLoopAdapter._loop).result()
_ThreadedLoopAdapter.stop()
_ThreadedLoopAdapter.stop(caller_id)

return wrapped

Expand Down

0 comments on commit 2546b2b

Please sign in to comment.