diff --git a/CHANGES.rst b/CHANGES.rst index 7fe198f..e6ee3c3 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -15,6 +15,7 @@ Major Changes - build: remove no longer needed 'setup.py' file - build: add missing wheel dependency to dev installs - build: add missing setuptools dependency to dev installs +- fix: avoid deprecated 'fork', switch to queues & 'spawn' Minor Changes diff --git a/SIGNATURE.txt b/SIGNATURE.txt index abb0286..d9b8ea2 100644 --- a/SIGNATURE.txt +++ b/SIGNATURE.txt @@ -1,5 +1,5 @@ { - "checksum": "b526c11bf48b18a7ba7ce1af3175106966b1378e0444e7865488882be6bbc8353f17415853ab6b426c50814106a54868", + "checksum": "cdb1e646022adbadb8e20c657538efaade767c4321a4f28dd364b86924d4cc3e656cfb40f6cfceba3aca0ca8aad9d073", "checksums": { ".github/ISSUE_TEMPLATE/bug-report.yml": "35b0ab3c57c6f08c6e08f5432217e2f3a5c2ffb7cbf102e3c1b719324b80bdc2797ba173cba97adb66f4b8e8dc3a5964", ".github/workflows/linux-pypi-release-tests.yml": "655abac0e5be130704567909562ad95001c6cf5f07a462b0fb1f7fea0d3d41b510b916a383ce56620da3073594e9a2e3", @@ -12,7 +12,7 @@ ".github/workflows/windows-pypi-release.yml": "bd51f4dd0209a1f61cb94d7676b3c8db37a5e2bcc3def58d299ef4da831af19b6de59b847199f092ff66631f983bfd72", ".github/workflows/windows-python-app.yml": "1d0f4e525bfefac4ece44081ecae7faaad0ac4044eb94b9823a9a9305127e5abe3dd6b96f602ebf3143b0baf474123e1", ".gitignore": "12deb1309e7f3d7c571d7831e99afc0e4f9fbf9c495c91574ffeb4b873d97bb83cb4d81941a7df8f19ec2293ca276ea0", - "CHANGES.rst": "7af16a401906120f46f53c9521e9a25d1f006c578ac4cc1278396b3c747e7c667c915ff777a47f9f5bcb8b25149f6708", + "CHANGES.rst": "a0af46b8426f1c2c7c5551c8489131d791f064c598923c45a2a2657637ebbbfd51fb6a188d17b55009d919ec638d9687", "CODE_OF_CONDUCT.md": "1999f7d9791b3e79aac666c7286c71bcf1910a8b3adfb6699d29981c146b85479eece9e9ba6d877b888f6b7082707c34", "CONTRIBUTING.md": "6bd90aab16a531bf13652544b22400532cb13f9d89386955065f398a4fcfb065f60bbee2e28d9ae59146b70878236079", "FAQ.rst": "cad5af23d070117d3f1398ab979048ead7884b546ea9f2fe36f0f67b337ee93675a64ab84fe2a52f62dc66dd82d7c2c4", @@ -37,7 +37,7 @@ "aiootp/_permutations/affine_xor_chain_config.py": "aacffc2b78329686dfd1d0fbf968ad059301e9de37ff5a67fa233096a6b43986124c8b67f93029986426929cdb253e26", "aiootp/_permutations/fast_affine_xor_chain.py": "0417a2d74f9c8830ebd3aa436d712991b269add50aad1fcd5e61f4931076f8c130727cb90ca58e298246658be9f68cfd", "aiootp/_typing/__init__.py": "9c9eaacd57f485bfb2cf5a7c14d48508ffa7cecad729fbba487e8098bdfd026f232caf012449904f9f003d0f5937c2d1", - "aiootp/_typing/_asynchs.py": "ebcf51315a722665e4dd9e98344746f323445fbbba6e2552e56c14c63f341cff1d011bd7ba8a22d872758ae4e7662c35", + "aiootp/_typing/_asynchs.py": "b0a577a6c86fbcffe3a1bd71ef95db5222eb9c1f1ecef988f92b5b26b93f0985f782e60e5c6e5d1db4a34fa5490eab54", "aiootp/_typing/_ciphers.py": "62936c78ffcacb6aafb61f5ebdd37a27563a57ff0279e22e2ee06841d66d82d78889d56e190225abb4799d39aa5426b0", "aiootp/_typing/_commons.py": "1510ce2e338a77d986c84eaaee74c2049859569ddc861809cb21e95f4c595a415c571a5057780632d611bf22cd730f2b", "aiootp/_typing/_databases.py": "fde4ad6bfbc6f7946232ab4e4da695e04a3415ae3cf028a54e2dbcef07e96e2d70db39a612921f8b0d540fcb44b378fc", @@ -51,10 +51,10 @@ "aiootp/asynchs/__init__.py": "ebb74e9e39055e449d1662674a8c324bb7da156eddf4363a60d20c7de2890fcc46ffee583d36632f2283b3760393b2f4", "aiootp/asynchs/aos.py": "3a7c750b3196ffeaa2359c1b683e2441deca1d4c1c5e914e73dba0ab4e1f4cc802ccc83e94e6b6fb3780f8f80581ed53", "aiootp/asynchs/clocks.py": "b9a8924ecd9468a992749917acbe49d382ffa897505f455883dedb530910d6e2eafd5e88e3bc7726ad8d227ed42ccafc", - "aiootp/asynchs/concurrency_interface.py": "302c3051b2c0f627f8bb0e063347d9bf9a0e5c94e168e2a98b914412df2cb2bec83b19e3122f9af63261b97a535f3258", + "aiootp/asynchs/concurrency_interface.py": "10db592cfe2a5b6abbf19e4683626262ab3dc52901d8d018d59aca9a0ecb51600219a5a4bb2e1e9b9976208b98f8210a", "aiootp/asynchs/loops.py": "9089cd86b7ac1abc438ffda63061fc2120d4da87bbb76bf944384b4bdd178399f50962627a4a16b5af29548d2c6e7f00", - "aiootp/asynchs/processes.py": "3fd463fe81d0a994405878a90d5f8383898e015e30257340b650962aa129256b162bdb51822c3b9cb6cbdc58470cb236", - "aiootp/asynchs/threads.py": "9dd750dce94cc814499fd233581e8b3dc263b34a7b24cda1ecf8c79d0db653a270d7da2decc6fcac76ec9cfef45574b0", + "aiootp/asynchs/processes.py": "d705e06f540c75561739f0293c9fc12954303cb5c1045bab2f779d8e7fa316b72f824911aeb38396f9f16ab6a2929a05", + "aiootp/asynchs/threads.py": "fd116e050280c6f6672ad7c55cdd38988d0578bd7f58d73aa94c6bf9355ffd6e89903730c2cf1d20146e2e2de5e2125c", "aiootp/ciphers/__init__.py": "25b229c3fd4c1a8b883f168980656d8c72915d0f727a418eba47685edc85873854957cf712965fdf53acb92387dab55e", "aiootp/ciphers/chunky_2048.py": "0c670a60b1763cbcaa27d8da3fab028c7719f885d86df72428cd73967480fe94622b36d674389a32e3cd481da414b322", "aiootp/ciphers/chunky_2048_config.py": "82eb76774817828ffbabd8b4d28945b5329c99a7f795913db7888df468905357bc8f99596a4e936a746200ebc4f3c01d", @@ -117,7 +117,7 @@ "aiootp/keygens/passcrypt/sessions_manager.py": "9c303ea4fc307670f9a8a6e2df3a6b248f37acd1e1035f65127bff27e60b8076138d3ecf9e451fa01a1d926ec88243a7", "aiootp/randoms/__init__.py": "4f09fbe3c1968098ac73c25fe1b414bef61e8cdb1edd6c639db45be10fb8e5c867f0cfe609412f0d5a5a429519cb335d", "aiootp/randoms/_early_salts.py": "0c0cc54d96c1e38196c5e2cb802a7297dbf414cafabe7efb30929081c006bb9a5b05af2e2a2733b160dc766800cf43a1", - "aiootp/randoms/entropy_daemon.py": "01d9f08c9f719069e7ea6082c415a5c659c5ed1bf5fec7f4ff51cad459a26050b9a5ab48760c99d1b9f5c2ed9876957a", + "aiootp/randoms/entropy_daemon.py": "44fc8b9c77b0274630b1ed1443c9acbf54892c598c9c6b6ae1630813b949bffe8f3956838c2c89a5e8220f5da9feaa70", "aiootp/randoms/ids/__init__.py": "2b9a67b2450cc9d28920ed09e3decf3a2a27435c37699e3a0d3f984e1b1ed16a024f677c9daf9925728b15be7d041831", "aiootp/randoms/ids/guid.py": "b113604765461f4b24f0e21e821b43fca8c06c61a48e1cdfc02ce675f9f64c6745d13d5224e574f859cb5d86baca28e3", "aiootp/randoms/ids/guid_config.py": "d31d57748d912a60e6feebf8fb28fa1a7c0298c3a42969c239f80d3470c32c170c19969a192abe0dddfb418496eae836", @@ -150,7 +150,7 @@ "tests/test_PackageSigner.py": "a3a8e3e0db5cc1438ab00529a24b8f406deddf78d7c6c8457c1020f068e3486869b04aac76550004c14156c04f0d0911", "tests/test_Padding.py": "8f2c273e441d901131adcb1c2de2473bad3a6fc1d5c1798aa5a6b5ad273b78ffb41bb1d1d49699850ef8c790f8d8c155", "tests/test_Passcrypt.py": "4a4bfbab92e5c149326cf3aa68c05b36303098618f0530a576759bc3a58968884fdf5ec1852587a12397b05a9233bc24", - "tests/test_Processes_Threads.py": "5ff872d50a9c6ff8ac52b03a0cf1d68fc172a48039a874378268bbb12b5287f049ef75fa71a8944c4b6135f21da9bcd2", + "tests/test_Processes_Threads.py": "3872d7e03af57cf4c3879cdacc1bfddf5875d831f58003a7e7cad1f59a6d4ac81cc02d7ac87f3ab13db07dd69a300ecb", "tests/test_RawGUID.py": "cffadd009f1a9d8ad3159640a7a3241fe5672729eb8eb4bfc8903dbe0d00ba2fbc37738f649946ed667f7793c2509604", "tests/test_SequenceID.py": "6747f3baaf4d5d4ab33e0252ee304eac7e4f02dc404f29eadfb1cfda3f9df71e7a0324fe3e5656348d1ed57d6451666f", "tests/test_StreamHMAC.py": "1d988fda201211cd00441f5290ffac36185cfda4a87f0bc07f9de2c0222c9be426f31f44cd645e29ace704f3ce0c63bf", @@ -180,7 +180,7 @@ "tests/test_randoms.py": "056fc3e65dbad03d0bb9044080ea821dfa7f342712fe8ee15335bd3e0e57d21302385c255436d8bf4c442991321e128b", "tests/test_time_to_live.py": "ba902ba10362643fba954b16cf4fafc22e9c6e4b42f8bee2af40f8beb41f770d99676a5c9e841ce4c3144781495c621f", "tests/test_typing.py": "f4e75dabbac3d653a02bf9e22b842bd9d26b05b027767e0fecfc03c00ec3838cb263f608099bc8ddf78c06a05962e33d", - "tests/test_typing_protocols.py": "c32de109dd5d309282c2b73e339d691c7273421cfd7f9ae6031d4398603f0844242b62250ce5202baf86eaaaaf204216" + "tests/test_typing_protocols.py": "6be6888113eda63836a8c305d60f9a438b58ea85d960daa12b228f08a2f49aef97fc8a4aaa23fafffb30c041bcd72f9b" }, "public_credentials": { "rmlibre_git_commit_signing_key": "-----BEGIN PGP PUBLIC KEY BLOCK-----\n\nmDMEZoDhpxYJKwYBBAHaRw8BAQdA4ukwNFuUROjDZF9uDWZ4mBbZhNIWhddUmNbZ\nd/XQExu0PHJpY2NoaSBtYWNoYWRvIChnaXQgY29tbWl0IHNpZ25pbmcga2V5KSA8\ncm1saWJyZUByaXNldXAubmV0PoiWBBMWCAA+FiEESjfOAk3bwKm2N5Ni/ETXxWjh\ninAFAmaA4acCGwMFCQWjmoAFCwkIBwIGFQoJCAsCBBYCAwECHgECF4AACgkQ/ETX\nxWjhinC0XgEArr+a+kCA9o53Mdj+qU5uIQdKpwwnSaJXpX6sOOcKHrYA/3egnXeU\nIlDQV8IYusMJXiaTFzf7eC1UUv2jJ6zqNhMC\n=inI6\n-----END PGP PUBLIC KEY BLOCK-----", @@ -188,8 +188,8 @@ }, "scope": { "author": "rmlibre@riseup.net", - "build_number": "1", - "date": 20016, + "build_number": "2", + "date": 20017, "description": "a high-level async cryptographic anonymity library to scale, simplify, & automate privacy best practices for secure data & identity processing, communication, & storage.", "git_branch": "main", "license": "AGPL-3.0-only", @@ -197,5 +197,5 @@ "version": "0.23.16" }, "signing_key": "70d1740f2a439da98243c43a4d7ef1cf993b87a75f3bb0851ae79de675af5b3b", - "signature": "2ac02f6c0da7476ee94d7eb99074cbfb50b9a2507c884bd1313d0f6f1d9ed7af3188b4bc45e136925ffd40c270db138d8eec1489bb5a6dc299898e95fe22f604" + "signature": "8b5432258e422b21af64b97759fa51d9a48a0acc3fece7dcc0abe327b3f75c45216d302a8c0cecac7b3df7e8cf907abdfc7da3fc63788e3cdd44864dcb914e05" } \ No newline at end of file diff --git a/aiootp/_typing/_asynchs.py b/aiootp/_typing/_asynchs.py index f3765cf..0ca885b 100644 --- a/aiootp/_typing/_asynchs.py +++ b/aiootp/_typing/_asynchs.py @@ -16,7 +16,13 @@ the `asynchs` subpackage. """ -__all__ = ["AsyncOrSyncIterable", "ClockType", "Future", "PoolExecutorType"] +__all__ = [ + "AsyncOrSyncIterable", + "ClockType", + "Future", + "PoolExecutorType", + "QueueType", +] from concurrent.futures._base import Future @@ -65,6 +71,29 @@ def submit( pass # pragma: no cover +@t.runtime_checkable +class QueueType(t.Protocol): + def get(self, block: bool, timeout: t.Optional[int]) -> t.Any: + pass # pragma: no cover + + def get_nowait(self) -> t.Any: + pass # pragma: no cover + + def put( + self, item: t.Any, block: bool, timeout: t.Optional[int] + ) -> None: + pass # pragma: no cover + + def put_nowait(self, item: t.Any) -> None: + pass # pragma: no cover + + def empty(self) -> bool: + pass # pragma: no cover + + def full(self) -> bool: + pass # pragma: no cover + + @t.runtime_checkable class ClockType(t.Protocol): async def atime(self) -> int: @@ -95,6 +124,7 @@ def test_timestamp( ClockType=t.add_type(ClockType), Future=t.add_type(Future), PoolExecutorType=t.add_type(PoolExecutorType), + QueueType=t.add_type(QueueType), __all__=__all__, __doc__=__doc__, __file__=__file__, diff --git a/aiootp/asynchs/concurrency_interface.py b/aiootp/asynchs/concurrency_interface.py index ac210ad..bd8c6c3 100644 --- a/aiootp/asynchs/concurrency_interface.py +++ b/aiootp/asynchs/concurrency_interface.py @@ -184,18 +184,27 @@ class ConcurrencyInterface: __slots__ = () - _Manager: type - _default_probe_delay: t.PositiveRealNumber _pool: t.PoolExecutorType _type: type BrokenPool: type + get_id: t.Callable[[], int] + + @classmethod + async def aget_id(cls, /) -> int: + """ + Retrieves either the current calling environment's process or + thread ID depending on the subclass being `Processes` or `Threads`. + """ + await asleep() + return cls.get_id() + @staticmethod def _arun_func( func: t.Callable[..., t.Any], - state: t.Sequence[t.Any], + queue: t.QueueType, /, *args: t.Any, **kwargs: t.Any, @@ -203,18 +212,18 @@ def _arun_func( """ Used by the class to retrieve return values from an async or sync `func` run in a new process / thread by storing the result - in a shared `state` container. + in a shared `queue` container. """ if is_async_function(func): run = new_event_loop().run_until_complete - state.append(run(func(*args, **kwargs))) + queue.put_nowait(run(func(*args, **kwargs))) else: - state.append(func(*args, **kwargs)) + queue.put_nowait(func(*args, **kwargs)) @staticmethod def _run_func( func: t.Callable[..., t.Any], - state: t.Sequence[t.Any], + queue: t.QueueType, /, *args: t.Any, **kwargs: t.Any, @@ -222,9 +231,9 @@ def _run_func( """ Used by the class to retrieve return values from a sync `func` run in a new process / thread by storing the result in a shared - `state` container. + `queue` container. """ - state.append(func(*args, **kwargs)) + queue.put_nowait(func(*args, **kwargs)) @classmethod async def anew( @@ -242,17 +251,17 @@ async def anew( coexist with asynchronous code. """ delay = process_probe_delay(cls, probe_delay) - state = cls._Manager().list() + queue = cls._get_queue() task = cls._type( target=cls._arun_func, - args=(func, state, *args), + args=(func, queue, *args), kwargs=kwargs, ) task.start() while task.is_alive(): await asleep(delay) task.join() - return state.pop() + return queue.get_nowait() @classmethod def new( @@ -270,17 +279,17 @@ def new( asynchronous code. """ delay = process_probe_delay(cls, probe_delay) - state = cls._Manager().list() + queue = cls._get_queue() task = cls._type( target=cls._run_func, - args=(func, state, *args), + args=(func, queue, *args), kwargs=kwargs, ) task.start() while task.is_alive(): sleep(delay) task.join() - return state.pop() + return queue.get_nowait() @staticmethod def _get_result( @@ -367,7 +376,7 @@ async def agather( /, *functions: t.Callable[..., t.Any], args: t.Iterable[t.Any] = (), - kwargs: t.Mapping[t.Hashable, t.Any] = {}, + kwargs: t.Mapping[str, t.Any] = {}, ) -> t.List[t.Any]: """ Sumbits all of the async or synchronous `functions` to the @@ -389,7 +398,7 @@ def gather( /, *functions: t.Callable[..., t.Any], args: t.Iterable[t.Any] = (), - kwargs: t.Mapping[t.Hashable, t.Any] = {}, + kwargs: t.Mapping[str, t.Any] = {}, ) -> t.List[t.Any]: """ Sumbits all the `functions` to the `Processes._pool` or @@ -402,15 +411,6 @@ def gather( for task in tasks: task.cancel() - @classmethod - def reset_pool(cls, /) -> None: - """ - When a process or thread pool is broken by an abruptly exited, - this method can be called to reset the class' pool object with - a new instance. - """ - cls._pool = cls._pool.__class__() - module_api = dict( ConcurrencyGuard=t.add_type(ConcurrencyGuard), diff --git a/aiootp/asynchs/processes.py b/aiootp/asynchs/processes.py index bca4fa6..a2bc956 100644 --- a/aiootp/asynchs/processes.py +++ b/aiootp/asynchs/processes.py @@ -19,6 +19,7 @@ import multiprocessing +from multiprocessing.context import SpawnContext from os import getpid as get_process_id from concurrent.futures import process from concurrent.futures import ProcessPoolExecutor @@ -37,14 +38,32 @@ class Processes(ConcurrencyInterface): __slots__ = () - _Manager: type = multiprocessing.Manager - _default_probe_delay: t.PositiveRealNumber = 0.005 - _pool: t.PoolExecutorType = ProcessPoolExecutor() - _type: type = multiprocessing.Process + _context: SpawnContext = multiprocessing.get_context("spawn") + _pool: t.PoolExecutorType = ProcessPoolExecutor(mp_context=_context) + _type: type = _context.Process BrokenPool: type = process.BrokenProcessPool + get_id: t.Callable[[], int] = get_process_id + + @classmethod + def _get_queue(cls, /, maxsize: int = 1) -> t.QueueType: + """ + Returns a queue object to retrieve values from spawned workers + with the class' default multiprocessing context. + """ + return cls._context.Queue(maxsize=maxsize) + + @classmethod + def reset_pool(cls, /) -> None: + """ + When a process pool is broken by being abruptly exited, this + method can be called to reset the class' pool object with a new + instance with its default multiprocessing context. + """ + cls._pool = cls._pool.__class__(mp_context=cls._context) + module_api = dict( Processes=t.add_type(Processes), diff --git a/aiootp/asynchs/threads.py b/aiootp/asynchs/threads.py index 7655c22..050e8ad 100644 --- a/aiootp/asynchs/threads.py +++ b/aiootp/asynchs/threads.py @@ -18,8 +18,8 @@ __all__ = ["Threads", "get_thread_id"] +import queue from threading import Thread -from collections import deque from concurrent.futures import thread from _thread import get_ident as get_thread_id from concurrent.futures import ThreadPoolExecutor @@ -38,24 +38,30 @@ class Threads(ConcurrencyInterface): __slots__ = () - class _Manager: - """ - This type is for parity with the `Processes` class' use of the - `multiprocessing.Manager`. It returns an atomic list-like - container so state can be passed around from spawned threads to - calling code. - """ - - @staticmethod - def list() -> t.SupportsAppendPop: - return deque(maxlen=1) - _default_probe_delay: t.PositiveRealNumber = 0.001 _pool: t.PoolExecutorType = ThreadPoolExecutor() _type: type = Thread BrokenPool: type = thread.BrokenThreadPool + get_id: t.Callable[[], int] = get_thread_id + + @classmethod + def _get_queue(cls, /, maxsize: int = 1) -> t.QueueType: + """ + Returns a queue object to retrieve values from spawned workers. + """ + return queue.Queue(maxsize=maxsize) + + @classmethod + def reset_pool(cls, /) -> None: + """ + When a thread pool is broken by being abruptly exited, this + method can be called to reset the class' pool object with a new + instance. + """ + cls._pool = cls._pool.__class__() + module_api = dict( Threads=t.add_type(Threads), diff --git a/aiootp/randoms/entropy_daemon.py b/aiootp/randoms/entropy_daemon.py index 6a9a263..646fe36 100644 --- a/aiootp/randoms/entropy_daemon.py +++ b/aiootp/randoms/entropy_daemon.py @@ -149,10 +149,10 @@ def start(self) -> t.Self: This supports the package by asynchronously & continuously seeding into & extracting new entropy from its entropy pools. """ - state = Threads._Manager().list() + queue = Threads._get_queue() self._daemon = Threads._type( target=Threads._arun_func, - args=(self._araw_loop, state), + args=(self._araw_loop, queue), ) self._daemon.daemon = True self._daemon.start() diff --git a/tests/test_Processes_Threads.py b/tests/test_Processes_Threads.py index 97dade7..bf374cc 100644 --- a/tests/test_Processes_Threads.py +++ b/tests/test_Processes_Threads.py @@ -11,116 +11,126 @@ # -import platform - from conftest import * -class BasicTestSuite: - _type: type - _id_name: str - - system = platform.system() - - async def aget_ids(self) -> t.Dict[str, int]: - from aiootp.asynchs import asleep - from aiootp.asynchs.threads import get_thread_id - from aiootp.asynchs.processes import get_process_id - - await asleep() - return dict(process_id=get_process_id(), thread_id=get_thread_id()) - - def get_ids(self) -> t.Dict[str, int]: - from aiootp.asynchs.threads import get_thread_id - from aiootp.asynchs.processes import get_process_id - - return dict(process_id=get_process_id(), thread_id=get_thread_id()) - - async def test_anew(self) -> None: - name = self._id_name - - is_non_linux_multiprocessing_issue = lambda _: ( - (self._type is Processes) and (self.system != "Linux") - ) - with Ignore( - IndexError, if_except=is_non_linux_multiprocessing_issue - ): - result = (await self._type.anew(self.aget_ids))[name] - assert result > 0 - assert result.__class__ is int - assert result != (await self.aget_ids())[name] - assert result != self.get_ids()[name] - - result = (await self._type.anew(self.get_ids))[name] - assert result > 0 - assert result.__class__ is int - assert result != self.get_ids()[name] - - def test_new(self) -> None: - name = self._id_name - - is_non_linux_multiprocessing_issue = lambda _: ( - (self._type is Processes) and (self.system != "Linux") - ) - with Ignore( - IndexError, if_except=is_non_linux_multiprocessing_issue - ): - result = self._type.new(self.get_ids)[name] - assert result > 0 - assert result.__class__ is int - assert result != self.get_ids()[name] - - async def test_asubmit(self) -> None: - name = self._id_name - self._type.reset_pool() - - fut = await self._type.asubmit(self.aget_ids) - result = (await fut.aresult())[name] - assert result == fut.result()[name] - assert result > 0 - assert result.__class__ is int - assert result != (await self.aget_ids())[name] - assert result != self.get_ids()[name] - - fut = await self._type.asubmit(self.get_ids) - result = (await fut.aresult())[name] - assert result == fut.result()[name] - assert result > 0 - assert result.__class__ is int - assert result != (await self.aget_ids())[name] - assert result != self.get_ids()[name] - - def test_submit(self) -> None: - name = self._id_name - self._type.reset_pool() - - fut = self._type.submit(self.get_ids) - result = fut.result()[name] - assert result > 0 - assert result.__class__ is int - assert result != self.get_ids()[name] - - async def test_probe_delay_must_be_positive(self) -> None: - problem = ( # fmt: skip - "A non-positive probe_delay was allowed." - ) - with Ignore(ValueError, if_else=violation(problem)): - await self._type.anew(acsprng, probe_delay=-1) - - with Ignore(ValueError, if_else=violation(problem)): - self._type.new(csprng, probe_delay=-1) - - -if platform.system() != "Windows": - - class TestProcesses(BasicTestSuite): - _type: type = Processes - _id_name: str = "process_id" - - -class TestThreads(BasicTestSuite): - _type: type = Threads - _id_name: str = "thread_id" +INTERFACES = (Processes, Threads) + + +@pytest.mark.parametrize("interface", INTERFACES) +async def test_anew_workers_have_distinct_ids_from_their_caller( + interface: t.ConcurrencyInterface, +) -> None: + amethod = interface.aget_id + method = interface.get_id + + result = await interface.anew(amethod) + acaller_id = await amethod() + caller_id = method() + assert result > 0 + assert result.__class__ is int + assert result != acaller_id + assert result != caller_id + assert acaller_id == caller_id + + result = await interface.anew(method) + acaller_id = await amethod() + caller_id = method() + assert result > 0 + assert result.__class__ is int + assert result != acaller_id + assert result != caller_id + assert acaller_id == caller_id + + +@pytest.mark.parametrize("interface", INTERFACES) +async def test_new_workers_have_distinct_ids_from_their_caller( + interface: t.ConcurrencyInterface, +) -> None: + amethod = interface.aget_id + method = interface.get_id + + result = interface.new(method) + acaller_id = await amethod() + caller_id = method() + assert result > 0 + assert result.__class__ is int + assert result != acaller_id + assert result != caller_id + assert acaller_id == caller_id + + +@pytest.mark.parametrize("interface", INTERFACES) +async def test_asubmit_workers_have_distinct_ids_from_their_caller( + interface: t.ConcurrencyInterface, +) -> None: + interface.reset_pool() + amethod = interface.aget_id + method = interface.get_id + + fut = await interface.asubmit(amethod) + result = await fut.aresult() + acaller_id = await amethod() + caller_id = method() + assert result == fut.result() + assert result > 0 + assert result.__class__ is int + assert result != acaller_id + assert result != caller_id + assert acaller_id == caller_id + + fut = await interface.asubmit(method) + result = await fut.aresult() + acaller_id = await amethod() + caller_id = method() + assert result == fut.result() + assert result > 0 + assert result.__class__ is int + assert result != acaller_id + assert result != caller_id + assert acaller_id == caller_id + + +@pytest.mark.parametrize("interface", INTERFACES) +async def test_submit_workers_have_distinct_ids_from_their_caller( + interface: t.ConcurrencyInterface, +) -> None: + interface.reset_pool() + amethod = interface.aget_id + method = interface.get_id + + fut = interface.submit(method) + result = fut.result() + acaller_id = await amethod() + caller_id = method() + assert result > 0 + assert result.__class__ is int + assert result != acaller_id + assert result != caller_id + assert acaller_id == caller_id + + +@pytest.mark.parametrize("interface", INTERFACES) +async def test_probe_delay_must_be_positive( + interface: t.ConcurrencyInterface, +) -> None: + amethod = interface.aget_id + method = interface.get_id + + problem = ( # fmt: skip + "A non-positive probe_delay was allowed." + ) + with Ignore(ValueError, if_else=violation(problem)): + await interface.anew(amethod, probe_delay=-1) + + with Ignore(ValueError, if_else=violation(problem)): + interface.new(method, probe_delay=-1) + + with Ignore(ValueError, if_else=violation(problem)): + await interface.asubmit(amethod, probe_delay=-1) + + with Ignore(ValueError, if_else=violation(problem)): + interface.submit(method, probe_delay=-1) __all__ = sorted({n for n in globals() if n.lower().startswith("test")}) diff --git a/tests/test_typing_protocols.py b/tests/test_typing_protocols.py index b24fed4..4216caf 100644 --- a/tests/test_typing_protocols.py +++ b/tests/test_typing_protocols.py @@ -155,10 +155,16 @@ class TestPermutationTypes(ProtocolSubTypeTests): ] +class TestQueueTypes(ProtocolSubTypeTests): + protocol = t.QueueType + instances_tested = [t.Processes._get_queue(), t.Threads._get_queue()] + types_tested = [obj.__class__ for obj in instances_tested] + + class TestPoolExecutorTypes(ProtocolSubTypeTests): protocol = t.PoolExecutorType - types_tested = [t.Processes._pool.__class__, t.Threads._pool.__class__] instances_tested = [t.Processes._pool, t.Threads._pool] + types_tested = [obj.__class__ for obj in instances_tested] class TestAsyncDatabaseType(ProtocolSubTypeTests):