Skip to content

Commit

Permalink
v0.23.16: merge: avoid deprecated 'fork', switch to queues & 'spawn'
Browse files Browse the repository at this point in the history
  • Loading branch information
rmlibre committed Oct 21, 2024
2 parents 85b0fa6 + 53776ca commit 2991c59
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 166 deletions.
1 change: 1 addition & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Major Changes
- build: remove no longer needed 'setup.py' file
- build: add missing wheel dependency to dev installs
- build: add missing setuptools dependency to dev installs
- fix: avoid deprecated 'fork', switch to queues & 'spawn'


Minor Changes
Expand Down
24 changes: 12 additions & 12 deletions SIGNATURE.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"checksum": "b526c11bf48b18a7ba7ce1af3175106966b1378e0444e7865488882be6bbc8353f17415853ab6b426c50814106a54868",
"checksum": "cdb1e646022adbadb8e20c657538efaade767c4321a4f28dd364b86924d4cc3e656cfb40f6cfceba3aca0ca8aad9d073",
"checksums": {
".github/ISSUE_TEMPLATE/bug-report.yml": "35b0ab3c57c6f08c6e08f5432217e2f3a5c2ffb7cbf102e3c1b719324b80bdc2797ba173cba97adb66f4b8e8dc3a5964",
".github/workflows/linux-pypi-release-tests.yml": "655abac0e5be130704567909562ad95001c6cf5f07a462b0fb1f7fea0d3d41b510b916a383ce56620da3073594e9a2e3",
Expand All @@ -12,7 +12,7 @@
".github/workflows/windows-pypi-release.yml": "bd51f4dd0209a1f61cb94d7676b3c8db37a5e2bcc3def58d299ef4da831af19b6de59b847199f092ff66631f983bfd72",
".github/workflows/windows-python-app.yml": "1d0f4e525bfefac4ece44081ecae7faaad0ac4044eb94b9823a9a9305127e5abe3dd6b96f602ebf3143b0baf474123e1",
".gitignore": "12deb1309e7f3d7c571d7831e99afc0e4f9fbf9c495c91574ffeb4b873d97bb83cb4d81941a7df8f19ec2293ca276ea0",
"CHANGES.rst": "7af16a401906120f46f53c9521e9a25d1f006c578ac4cc1278396b3c747e7c667c915ff777a47f9f5bcb8b25149f6708",
"CHANGES.rst": "a0af46b8426f1c2c7c5551c8489131d791f064c598923c45a2a2657637ebbbfd51fb6a188d17b55009d919ec638d9687",
"CODE_OF_CONDUCT.md": "1999f7d9791b3e79aac666c7286c71bcf1910a8b3adfb6699d29981c146b85479eece9e9ba6d877b888f6b7082707c34",
"CONTRIBUTING.md": "6bd90aab16a531bf13652544b22400532cb13f9d89386955065f398a4fcfb065f60bbee2e28d9ae59146b70878236079",
"FAQ.rst": "cad5af23d070117d3f1398ab979048ead7884b546ea9f2fe36f0f67b337ee93675a64ab84fe2a52f62dc66dd82d7c2c4",
Expand All @@ -37,7 +37,7 @@
"aiootp/_permutations/affine_xor_chain_config.py": "aacffc2b78329686dfd1d0fbf968ad059301e9de37ff5a67fa233096a6b43986124c8b67f93029986426929cdb253e26",
"aiootp/_permutations/fast_affine_xor_chain.py": "0417a2d74f9c8830ebd3aa436d712991b269add50aad1fcd5e61f4931076f8c130727cb90ca58e298246658be9f68cfd",
"aiootp/_typing/__init__.py": "9c9eaacd57f485bfb2cf5a7c14d48508ffa7cecad729fbba487e8098bdfd026f232caf012449904f9f003d0f5937c2d1",
"aiootp/_typing/_asynchs.py": "ebcf51315a722665e4dd9e98344746f323445fbbba6e2552e56c14c63f341cff1d011bd7ba8a22d872758ae4e7662c35",
"aiootp/_typing/_asynchs.py": "b0a577a6c86fbcffe3a1bd71ef95db5222eb9c1f1ecef988f92b5b26b93f0985f782e60e5c6e5d1db4a34fa5490eab54",
"aiootp/_typing/_ciphers.py": "62936c78ffcacb6aafb61f5ebdd37a27563a57ff0279e22e2ee06841d66d82d78889d56e190225abb4799d39aa5426b0",
"aiootp/_typing/_commons.py": "1510ce2e338a77d986c84eaaee74c2049859569ddc861809cb21e95f4c595a415c571a5057780632d611bf22cd730f2b",
"aiootp/_typing/_databases.py": "fde4ad6bfbc6f7946232ab4e4da695e04a3415ae3cf028a54e2dbcef07e96e2d70db39a612921f8b0d540fcb44b378fc",
Expand All @@ -51,10 +51,10 @@
"aiootp/asynchs/__init__.py": "ebb74e9e39055e449d1662674a8c324bb7da156eddf4363a60d20c7de2890fcc46ffee583d36632f2283b3760393b2f4",
"aiootp/asynchs/aos.py": "3a7c750b3196ffeaa2359c1b683e2441deca1d4c1c5e914e73dba0ab4e1f4cc802ccc83e94e6b6fb3780f8f80581ed53",
"aiootp/asynchs/clocks.py": "b9a8924ecd9468a992749917acbe49d382ffa897505f455883dedb530910d6e2eafd5e88e3bc7726ad8d227ed42ccafc",
"aiootp/asynchs/concurrency_interface.py": "302c3051b2c0f627f8bb0e063347d9bf9a0e5c94e168e2a98b914412df2cb2bec83b19e3122f9af63261b97a535f3258",
"aiootp/asynchs/concurrency_interface.py": "10db592cfe2a5b6abbf19e4683626262ab3dc52901d8d018d59aca9a0ecb51600219a5a4bb2e1e9b9976208b98f8210a",
"aiootp/asynchs/loops.py": "9089cd86b7ac1abc438ffda63061fc2120d4da87bbb76bf944384b4bdd178399f50962627a4a16b5af29548d2c6e7f00",
"aiootp/asynchs/processes.py": "3fd463fe81d0a994405878a90d5f8383898e015e30257340b650962aa129256b162bdb51822c3b9cb6cbdc58470cb236",
"aiootp/asynchs/threads.py": "9dd750dce94cc814499fd233581e8b3dc263b34a7b24cda1ecf8c79d0db653a270d7da2decc6fcac76ec9cfef45574b0",
"aiootp/asynchs/processes.py": "d705e06f540c75561739f0293c9fc12954303cb5c1045bab2f779d8e7fa316b72f824911aeb38396f9f16ab6a2929a05",
"aiootp/asynchs/threads.py": "fd116e050280c6f6672ad7c55cdd38988d0578bd7f58d73aa94c6bf9355ffd6e89903730c2cf1d20146e2e2de5e2125c",
"aiootp/ciphers/__init__.py": "25b229c3fd4c1a8b883f168980656d8c72915d0f727a418eba47685edc85873854957cf712965fdf53acb92387dab55e",
"aiootp/ciphers/chunky_2048.py": "0c670a60b1763cbcaa27d8da3fab028c7719f885d86df72428cd73967480fe94622b36d674389a32e3cd481da414b322",
"aiootp/ciphers/chunky_2048_config.py": "82eb76774817828ffbabd8b4d28945b5329c99a7f795913db7888df468905357bc8f99596a4e936a746200ebc4f3c01d",
Expand Down Expand Up @@ -117,7 +117,7 @@
"aiootp/keygens/passcrypt/sessions_manager.py": "9c303ea4fc307670f9a8a6e2df3a6b248f37acd1e1035f65127bff27e60b8076138d3ecf9e451fa01a1d926ec88243a7",
"aiootp/randoms/__init__.py": "4f09fbe3c1968098ac73c25fe1b414bef61e8cdb1edd6c639db45be10fb8e5c867f0cfe609412f0d5a5a429519cb335d",
"aiootp/randoms/_early_salts.py": "0c0cc54d96c1e38196c5e2cb802a7297dbf414cafabe7efb30929081c006bb9a5b05af2e2a2733b160dc766800cf43a1",
"aiootp/randoms/entropy_daemon.py": "01d9f08c9f719069e7ea6082c415a5c659c5ed1bf5fec7f4ff51cad459a26050b9a5ab48760c99d1b9f5c2ed9876957a",
"aiootp/randoms/entropy_daemon.py": "44fc8b9c77b0274630b1ed1443c9acbf54892c598c9c6b6ae1630813b949bffe8f3956838c2c89a5e8220f5da9feaa70",
"aiootp/randoms/ids/__init__.py": "2b9a67b2450cc9d28920ed09e3decf3a2a27435c37699e3a0d3f984e1b1ed16a024f677c9daf9925728b15be7d041831",
"aiootp/randoms/ids/guid.py": "b113604765461f4b24f0e21e821b43fca8c06c61a48e1cdfc02ce675f9f64c6745d13d5224e574f859cb5d86baca28e3",
"aiootp/randoms/ids/guid_config.py": "d31d57748d912a60e6feebf8fb28fa1a7c0298c3a42969c239f80d3470c32c170c19969a192abe0dddfb418496eae836",
Expand Down Expand Up @@ -150,7 +150,7 @@
"tests/test_PackageSigner.py": "a3a8e3e0db5cc1438ab00529a24b8f406deddf78d7c6c8457c1020f068e3486869b04aac76550004c14156c04f0d0911",
"tests/test_Padding.py": "8f2c273e441d901131adcb1c2de2473bad3a6fc1d5c1798aa5a6b5ad273b78ffb41bb1d1d49699850ef8c790f8d8c155",
"tests/test_Passcrypt.py": "4a4bfbab92e5c149326cf3aa68c05b36303098618f0530a576759bc3a58968884fdf5ec1852587a12397b05a9233bc24",
"tests/test_Processes_Threads.py": "5ff872d50a9c6ff8ac52b03a0cf1d68fc172a48039a874378268bbb12b5287f049ef75fa71a8944c4b6135f21da9bcd2",
"tests/test_Processes_Threads.py": "3872d7e03af57cf4c3879cdacc1bfddf5875d831f58003a7e7cad1f59a6d4ac81cc02d7ac87f3ab13db07dd69a300ecb",
"tests/test_RawGUID.py": "cffadd009f1a9d8ad3159640a7a3241fe5672729eb8eb4bfc8903dbe0d00ba2fbc37738f649946ed667f7793c2509604",
"tests/test_SequenceID.py": "6747f3baaf4d5d4ab33e0252ee304eac7e4f02dc404f29eadfb1cfda3f9df71e7a0324fe3e5656348d1ed57d6451666f",
"tests/test_StreamHMAC.py": "1d988fda201211cd00441f5290ffac36185cfda4a87f0bc07f9de2c0222c9be426f31f44cd645e29ace704f3ce0c63bf",
Expand Down Expand Up @@ -180,22 +180,22 @@
"tests/test_randoms.py": "056fc3e65dbad03d0bb9044080ea821dfa7f342712fe8ee15335bd3e0e57d21302385c255436d8bf4c442991321e128b",
"tests/test_time_to_live.py": "ba902ba10362643fba954b16cf4fafc22e9c6e4b42f8bee2af40f8beb41f770d99676a5c9e841ce4c3144781495c621f",
"tests/test_typing.py": "f4e75dabbac3d653a02bf9e22b842bd9d26b05b027767e0fecfc03c00ec3838cb263f608099bc8ddf78c06a05962e33d",
"tests/test_typing_protocols.py": "c32de109dd5d309282c2b73e339d691c7273421cfd7f9ae6031d4398603f0844242b62250ce5202baf86eaaaaf204216"
"tests/test_typing_protocols.py": "6be6888113eda63836a8c305d60f9a438b58ea85d960daa12b228f08a2f49aef97fc8a4aaa23fafffb30c041bcd72f9b"
},
"public_credentials": {
"rmlibre_git_commit_signing_key": "-----BEGIN PGP PUBLIC KEY BLOCK-----\n\nmDMEZoDhpxYJKwYBBAHaRw8BAQdA4ukwNFuUROjDZF9uDWZ4mBbZhNIWhddUmNbZ\nd/XQExu0PHJpY2NoaSBtYWNoYWRvIChnaXQgY29tbWl0IHNpZ25pbmcga2V5KSA8\ncm1saWJyZUByaXNldXAubmV0PoiWBBMWCAA+FiEESjfOAk3bwKm2N5Ni/ETXxWjh\ninAFAmaA4acCGwMFCQWjmoAFCwkIBwIGFQoJCAsCBBYCAwECHgECF4AACgkQ/ETX\nxWjhinC0XgEArr+a+kCA9o53Mdj+qU5uIQdKpwwnSaJXpX6sOOcKHrYA/3egnXeU\nIlDQV8IYusMJXiaTFzf7eC1UUv2jJ6zqNhMC\n=inI6\n-----END PGP PUBLIC KEY BLOCK-----",
"signed_aiootp_package_signing_key": "-----BEGIN PGP MESSAGE-----\n\nowGbwMvMwCX2x+X60YyHXQWMp92SGNIad+eZG6QYmpsYpBklmhhbpiRaWhiZGCeb\nGCeapJinphkmp1laGidZmCeam6YZJyUZWJgaJqaaW6akmpmbJqaZJhkndZSyMIhx\nMciKKbJ4mZ9j8r19YOU288lJMHtYmUCWMHBxCsBENG8yMsycYnbl9u1apfMxup5x\nLOarbe5/nikZZdk3iSG3+FFc9nZGhqdWHn93NF+y7nuw7POp57qK4kVyqx3+f5N2\n9H2gyXjIjBEA\n=ucIC\n-----END PGP MESSAGE-----"
},
"scope": {
"author": "[email protected]",
"build_number": "1",
"date": 20016,
"build_number": "2",
"date": 20017,
"description": "a high-level async cryptographic anonymity library to scale, simplify, & automate privacy best practices for secure data & identity processing, communication, & storage.",
"git_branch": "main",
"license": "AGPL-3.0-only",
"package": "aiootp",
"version": "0.23.16"
},
"signing_key": "70d1740f2a439da98243c43a4d7ef1cf993b87a75f3bb0851ae79de675af5b3b",
"signature": "2ac02f6c0da7476ee94d7eb99074cbfb50b9a2507c884bd1313d0f6f1d9ed7af3188b4bc45e136925ffd40c270db138d8eec1489bb5a6dc299898e95fe22f604"
"signature": "8b5432258e422b21af64b97759fa51d9a48a0acc3fece7dcc0abe327b3f75c45216d302a8c0cecac7b3df7e8cf907abdfc7da3fc63788e3cdd44864dcb914e05"
}
32 changes: 31 additions & 1 deletion aiootp/_typing/_asynchs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@
the `asynchs` subpackage.
"""

__all__ = ["AsyncOrSyncIterable", "ClockType", "Future", "PoolExecutorType"]
__all__ = [
"AsyncOrSyncIterable",
"ClockType",
"Future",
"PoolExecutorType",
"QueueType",
]


from concurrent.futures._base import Future
Expand Down Expand Up @@ -65,6 +71,29 @@ def submit(
pass # pragma: no cover


@t.runtime_checkable
class QueueType(t.Protocol):
def get(self, block: bool, timeout: t.Optional[int]) -> t.Any:
pass # pragma: no cover

def get_nowait(self) -> t.Any:
pass # pragma: no cover

def put(
self, item: t.Any, block: bool, timeout: t.Optional[int]
) -> None:
pass # pragma: no cover

def put_nowait(self, item: t.Any) -> None:
pass # pragma: no cover

def empty(self) -> bool:
pass # pragma: no cover

def full(self) -> bool:
pass # pragma: no cover


@t.runtime_checkable
class ClockType(t.Protocol):
async def atime(self) -> int:
Expand Down Expand Up @@ -95,6 +124,7 @@ def test_timestamp(
ClockType=t.add_type(ClockType),
Future=t.add_type(Future),
PoolExecutorType=t.add_type(PoolExecutorType),
QueueType=t.add_type(QueueType),
__all__=__all__,
__doc__=__doc__,
__file__=__file__,
Expand Down
52 changes: 26 additions & 26 deletions aiootp/asynchs/concurrency_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,47 +184,56 @@ class ConcurrencyInterface:

__slots__ = ()

_Manager: type

_default_probe_delay: t.PositiveRealNumber
_pool: t.PoolExecutorType
_type: type

BrokenPool: type

get_id: t.Callable[[], int]

@classmethod
async def aget_id(cls, /) -> int:
"""
Retrieves either the current calling environment's process or
thread ID depending on the subclass being `Processes` or `Threads`.
"""
await asleep()
return cls.get_id()

@staticmethod
def _arun_func(
func: t.Callable[..., t.Any],
state: t.Sequence[t.Any],
queue: t.QueueType,
/,
*args: t.Any,
**kwargs: t.Any,
) -> None:
"""
Used by the class to retrieve return values from an async or
sync `func` run in a new process / thread by storing the result
in a shared `state` container.
in a shared `queue` container.
"""
if is_async_function(func):
run = new_event_loop().run_until_complete
state.append(run(func(*args, **kwargs)))
queue.put_nowait(run(func(*args, **kwargs)))
else:
state.append(func(*args, **kwargs))
queue.put_nowait(func(*args, **kwargs))

@staticmethod
def _run_func(
func: t.Callable[..., t.Any],
state: t.Sequence[t.Any],
queue: t.QueueType,
/,
*args: t.Any,
**kwargs: t.Any,
) -> None:
"""
Used by the class to retrieve return values from a sync `func`
run in a new process / thread by storing the result in a shared
`state` container.
`queue` container.
"""
state.append(func(*args, **kwargs))
queue.put_nowait(func(*args, **kwargs))

@classmethod
async def anew(
Expand All @@ -242,17 +251,17 @@ async def anew(
coexist with asynchronous code.
"""
delay = process_probe_delay(cls, probe_delay)
state = cls._Manager().list()
queue = cls._get_queue()
task = cls._type(
target=cls._arun_func,
args=(func, state, *args),
args=(func, queue, *args),
kwargs=kwargs,
)
task.start()
while task.is_alive():
await asleep(delay)
task.join()
return state.pop()
return queue.get_nowait()

@classmethod
def new(
Expand All @@ -270,17 +279,17 @@ def new(
asynchronous code.
"""
delay = process_probe_delay(cls, probe_delay)
state = cls._Manager().list()
queue = cls._get_queue()
task = cls._type(
target=cls._run_func,
args=(func, state, *args),
args=(func, queue, *args),
kwargs=kwargs,
)
task.start()
while task.is_alive():
sleep(delay)
task.join()
return state.pop()
return queue.get_nowait()

@staticmethod
def _get_result(
Expand Down Expand Up @@ -367,7 +376,7 @@ async def agather(
/,
*functions: t.Callable[..., t.Any],
args: t.Iterable[t.Any] = (),
kwargs: t.Mapping[t.Hashable, t.Any] = {},
kwargs: t.Mapping[str, t.Any] = {},
) -> t.List[t.Any]:
"""
Sumbits all of the async or synchronous `functions` to the
Expand All @@ -389,7 +398,7 @@ def gather(
/,
*functions: t.Callable[..., t.Any],
args: t.Iterable[t.Any] = (),
kwargs: t.Mapping[t.Hashable, t.Any] = {},
kwargs: t.Mapping[str, t.Any] = {},
) -> t.List[t.Any]:
"""
Sumbits all the `functions` to the `Processes._pool` or
Expand All @@ -402,15 +411,6 @@ def gather(
for task in tasks:
task.cancel()

@classmethod
def reset_pool(cls, /) -> None:
"""
When a process or thread pool is broken by an abruptly exited,
this method can be called to reset the class' pool object with
a new instance.
"""
cls._pool = cls._pool.__class__()


module_api = dict(
ConcurrencyGuard=t.add_type(ConcurrencyGuard),
Expand Down
27 changes: 23 additions & 4 deletions aiootp/asynchs/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@


import multiprocessing
from multiprocessing.context import SpawnContext
from os import getpid as get_process_id
from concurrent.futures import process
from concurrent.futures import ProcessPoolExecutor
Expand All @@ -37,14 +38,32 @@ class Processes(ConcurrencyInterface):

__slots__ = ()

_Manager: type = multiprocessing.Manager

_default_probe_delay: t.PositiveRealNumber = 0.005
_pool: t.PoolExecutorType = ProcessPoolExecutor()
_type: type = multiprocessing.Process
_context: SpawnContext = multiprocessing.get_context("spawn")
_pool: t.PoolExecutorType = ProcessPoolExecutor(mp_context=_context)
_type: type = _context.Process

BrokenPool: type = process.BrokenProcessPool

get_id: t.Callable[[], int] = get_process_id

@classmethod
def _get_queue(cls, /, maxsize: int = 1) -> t.QueueType:
"""
Returns a queue object to retrieve values from spawned workers
with the class' default multiprocessing context.
"""
return cls._context.Queue(maxsize=maxsize)

@classmethod
def reset_pool(cls, /) -> None:
"""
When a process pool is broken by being abruptly exited, this
method can be called to reset the class' pool object with a new
instance with its default multiprocessing context.
"""
cls._pool = cls._pool.__class__(mp_context=cls._context)


module_api = dict(
Processes=t.add_type(Processes),
Expand Down
32 changes: 19 additions & 13 deletions aiootp/asynchs/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
__all__ = ["Threads", "get_thread_id"]


import queue
from threading import Thread
from collections import deque
from concurrent.futures import thread
from _thread import get_ident as get_thread_id
from concurrent.futures import ThreadPoolExecutor
Expand All @@ -38,24 +38,30 @@ class Threads(ConcurrencyInterface):

__slots__ = ()

class _Manager:
"""
This type is for parity with the `Processes` class' use of the
`multiprocessing.Manager`. It returns an atomic list-like
container so state can be passed around from spawned threads to
calling code.
"""

@staticmethod
def list() -> t.SupportsAppendPop:
return deque(maxlen=1)

_default_probe_delay: t.PositiveRealNumber = 0.001
_pool: t.PoolExecutorType = ThreadPoolExecutor()
_type: type = Thread

BrokenPool: type = thread.BrokenThreadPool

get_id: t.Callable[[], int] = get_thread_id

@classmethod
def _get_queue(cls, /, maxsize: int = 1) -> t.QueueType:
"""
Returns a queue object to retrieve values from spawned workers.
"""
return queue.Queue(maxsize=maxsize)

@classmethod
def reset_pool(cls, /) -> None:
"""
When a thread pool is broken by being abruptly exited, this
method can be called to reset the class' pool object with a new
instance.
"""
cls._pool = cls._pool.__class__()


module_api = dict(
Threads=t.add_type(Threads),
Expand Down
4 changes: 2 additions & 2 deletions aiootp/randoms/entropy_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,10 @@ def start(self) -> t.Self:
This supports the package by asynchronously & continuously
seeding into & extracting new entropy from its entropy pools.
"""
state = Threads._Manager().list()
queue = Threads._get_queue()
self._daemon = Threads._type(
target=Threads._arun_func,
args=(self._araw_loop, state),
args=(self._araw_loop, queue),
)
self._daemon.daemon = True
self._daemon.start()
Expand Down
Loading

0 comments on commit 2991c59

Please sign in to comment.