Skip to content

Commit

Permalink
requirements: bump min aiorpcx to 0.22.0
Browse files Browse the repository at this point in the history
aiorpcx 0.20 changed the behaviour/API of TaskGroups.
When used as a context manager, TaskGroups no longer propagate
exceptions raised by their tasks. Instead, the calling code has
to explicitly check the results of tasks and decide whether to re-raise
any exceptions.
This is a significant change, and so this commit introduces "OldTaskGroup",
which should behave as the TaskGroup class of old aiorpcx. All existing
usages of TaskGroup are replaced with OldTaskGroup.

closes spesmilo#7446
  • Loading branch information
SomberNight committed Feb 8, 2022
1 parent 5eebc00 commit 86a1e58
Show file tree
Hide file tree
Showing 16 changed files with 96 additions and 80 deletions.
6 changes: 3 additions & 3 deletions contrib/deterministic-build/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ aiohttp==3.8.1 \
aiohttp-socks==0.7.1 \
--hash=sha256:2215cac4891ef3fa14b7d600ed343ed0f0a670c23b10e4142aa862b3db20341a \
--hash=sha256:94bcff5ef73611c6c6231c2ffc1be4af1599abec90dbd2fdbbd63233ec2fb0ff
aiorpcX==0.18.7 \
--hash=sha256:7fa48423e1c06cd0ffb7b60f2cca7e819b6cbbf57d4bc8a82944994ef5038f05 \
--hash=sha256:808a9ec9172df11677a0f7b459b69d1a6cf8b19c19da55541fa31fb1afce5ce7
aiorpcX==0.22.1 \
--hash=sha256:6026f7bed3432e206589c94dcf599be8cd85b5736b118c7275845c1bd922a553 \
--hash=sha256:e74f9fbed3fd21598e71fe05066618fc2c06feec504fe29490ddda05fdbdde62
aiosignal==1.2.0 \
--hash=sha256:26e62109036cd181df6e6ad646f91f0dcfd05fe16d0cb924138ff2ab75d64e3a \
--hash=sha256:78ed67db6c7b7ced4f98e495e572106d5c432a93e1ddd1bf475e1dc05f5b7df2
Expand Down
2 changes: 1 addition & 1 deletion contrib/requirements/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
qrcode
protobuf>=3.12
qdarkstyle>=2.7
aiorpcx>=0.18.7,<0.19
aiorpcx>=0.22.0,<0.23
aiohttp>=3.3.0,<4.0.0
aiohttp_socks>=0.3
certifi
Expand Down
6 changes: 2 additions & 4 deletions electrum/address_synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@
from collections import defaultdict
from typing import TYPE_CHECKING, Dict, Optional, Set, Tuple, NamedTuple, Sequence, List

from aiorpcx import TaskGroup

from . import bitcoin, util
from .bitcoin import COINBASE_MATURITY
from .util import profiler, bfh, TxMinedInfo, UnrelatedTransactionException, with_lock
from .util import profiler, bfh, TxMinedInfo, UnrelatedTransactionException, with_lock, OldTaskGroup
from .transaction import Transaction, TxOutput, TxInput, PartialTxInput, TxOutpoint, PartialTransaction
from .synchronizer import Synchronizer
from .verifier import SPV
Expand Down Expand Up @@ -193,7 +191,7 @@ def on_blockchain_updated(self, event, *args):
async def stop(self):
if self.network:
try:
async with TaskGroup() as group:
async with OldTaskGroup() as group:
if self.synchronizer:
await group.spawn(self.synchronizer.stop())
if self.verifier:
Expand Down
7 changes: 3 additions & 4 deletions electrum/bip39_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,19 @@

from typing import TYPE_CHECKING

from aiorpcx import TaskGroup

from . import bitcoin
from .constants import BIP39_WALLET_FORMATS
from .bip32 import BIP32_PRIME, BIP32Node
from .bip32 import convert_bip32_path_to_list_of_uint32 as bip32_str_to_ints
from .bip32 import convert_bip32_intpath_to_strpath as bip32_ints_to_str
from .util import OldTaskGroup

if TYPE_CHECKING:
from .network import Network


async def account_discovery(network: 'Network', get_account_xpub):
async with TaskGroup() as group:
async with OldTaskGroup() as group:
account_scan_tasks = []
for wallet_format in BIP39_WALLET_FORMATS:
account_scan = scan_for_active_accounts(network, get_account_xpub, wallet_format)
Expand Down Expand Up @@ -46,7 +45,7 @@ async def scan_for_active_accounts(network: 'Network', get_account_xpub, wallet_

async def account_has_history(network: 'Network', account_node: BIP32Node, script_type: str) -> bool:
gap_limit = 20
async with TaskGroup() as group:
async with OldTaskGroup() as group:
get_history_tasks = []
for address_index in range(gap_limit):
address_node = account_node.subkey_at_public_derivation("0/" + str(address_index))
Expand Down
10 changes: 5 additions & 5 deletions electrum/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@

import aiohttp
from aiohttp import web, client_exceptions
from aiorpcx import TaskGroup, timeout_after, TaskTimeout, ignore_after
from aiorpcx import timeout_after, TaskTimeout, ignore_after

from . import util
from .network import Network
from .util import (json_decode, to_bytes, to_string, profiler, standardize_path, constant_time_compare)
from .invoices import PR_PAID, PR_EXPIRED
from .util import log_exceptions, ignore_exceptions, randrange
from .util import log_exceptions, ignore_exceptions, randrange, OldTaskGroup
from .wallet import Wallet, Abstract_Wallet
from .storage import WalletStorage
from .wallet_db import WalletDB
Expand Down Expand Up @@ -493,7 +493,7 @@ def __init__(self, config: SimpleConfig, fd=None, *, listen_jsonrpc=True):
self._stop_entered = False
self._stopping_soon_or_errored = threading.Event()
self._stopped_event = threading.Event()
self.taskgroup = TaskGroup()
self.taskgroup = OldTaskGroup()
asyncio.run_coroutine_threadsafe(self._run(jobs=daemon_jobs), self.asyncio_loop)

@log_exceptions
Expand Down Expand Up @@ -591,12 +591,12 @@ async def stop(self):
if self.gui_object:
self.gui_object.stop()
self.logger.info("stopping all wallets")
async with TaskGroup() as group:
async with OldTaskGroup() as group:
for k, wallet in self._wallets.items():
await group.spawn(wallet.stop())
self.logger.info("stopping network and taskgroup")
async with ignore_after(2):
async with TaskGroup() as group:
async with OldTaskGroup() as group:
if self.network:
await group.spawn(self.network.stop(full_shutdown=True))
await group.spawn(self.taskgroup.cancel_remaining())
Expand Down
6 changes: 3 additions & 3 deletions electrum/exchange_rate.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
from decimal import Decimal
from typing import Sequence, Optional

from aiorpcx.curio import timeout_after, TaskTimeout, TaskGroup
from aiorpcx.curio import timeout_after, TaskTimeout
import aiohttp

from . import util
from .bitcoin import COIN
from .i18n import _
from .util import (ThreadJob, make_dir, log_exceptions,
from .util import (ThreadJob, make_dir, log_exceptions, OldTaskGroup,
make_aiohttp_session, resource_path)
from .network import Network
from .simple_config import SimpleConfig
Expand Down Expand Up @@ -449,7 +449,7 @@ async def get_currencies_safe(name, exchange):

async def query_all_exchanges_for_their_ccys_over_network():
async with timeout_after(10):
async with TaskGroup() as group:
async with OldTaskGroup() as group:
for name, klass in exchanges.items():
exchange = klass(None, None)
await group.spawn(get_currencies_safe(name, exchange))
Expand Down
7 changes: 3 additions & 4 deletions electrum/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import functools

import aiorpcx
from aiorpcx import TaskGroup
from aiorpcx import RPCSession, Notification, NetAddress, NewlineFramer
from aiorpcx.curio import timeout_after, TaskTimeout
from aiorpcx.jsonrpc import JSONRPC, CodeMessageError
Expand All @@ -47,7 +46,7 @@

from .util import (ignore_exceptions, log_exceptions, bfh, MySocksProxy,
is_integer, is_non_negative_integer, is_hash256_str, is_hex_str,
is_int_or_float, is_non_negative_int_or_float)
is_int_or_float, is_non_negative_int_or_float, OldTaskGroup)
from . import util
from . import x509
from . import pem
Expand Down Expand Up @@ -376,7 +375,7 @@ def __init__(self, *, network: 'Network', server: ServerAddr, proxy: Optional[di
# Dump network messages (only for this interface). Set at runtime from the console.
self.debug = False

self.taskgroup = TaskGroup()
self.taskgroup = OldTaskGroup()

async def spawn_task():
task = await self.network.taskgroup.spawn(self.run())
Expand Down Expand Up @@ -675,7 +674,7 @@ async def ping(self):
async def request_fee_estimates(self):
from .simple_config import FEE_ETA_TARGETS
while True:
async with TaskGroup() as group:
async with OldTaskGroup() as group:
fee_tasks = []
for i in FEE_ETA_TARGETS:
fee_tasks.append((i, await group.spawn(self.get_estimatefee(i))))
Expand Down
10 changes: 5 additions & 5 deletions electrum/lnpeer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
import functools

import aiorpcx
from aiorpcx import TaskGroup, ignore_after
from aiorpcx import ignore_after

from .crypto import sha256, sha256d
from . import bitcoin, util
from . import ecc
from .ecc import sig_string_from_r_and_s, der_sig_from_sig_string
from . import constants
from .util import (bh2u, bfh, log_exceptions, ignore_exceptions, chunks, TaskGroup,
from .util import (bh2u, bfh, log_exceptions, ignore_exceptions, chunks, OldTaskGroup,
UnrelatedTransactionException)
from . import transaction
from .bitcoin import make_op_return
Expand Down Expand Up @@ -105,7 +105,7 @@ def __init__(
self.announcement_signatures = defaultdict(asyncio.Queue)
self.orphan_channel_updates = OrderedDict() # type: OrderedDict[ShortChannelID, dict]
Logger.__init__(self)
self.taskgroup = TaskGroup()
self.taskgroup = OldTaskGroup()
# HTLCs offered by REMOTE, that we started removing but are still active:
self.received_htlcs_pending_removal = set() # type: Set[Tuple[Channel, int]]
self.received_htlc_removed_event = asyncio.Event()
Expand Down Expand Up @@ -1859,7 +1859,7 @@ async def htlc_switch(self):
# we can get triggered for events that happen on the downstream peer.
# TODO: trampoline forwarding relies on the polling
async with ignore_after(0.1):
async with TaskGroup(wait=any) as group:
async with OldTaskGroup(wait=any) as group:
await group.spawn(self._received_revack_event.wait())
await group.spawn(self.downstream_htlc_resolved_event.wait())
self._htlc_switch_iterstart_event.set()
Expand Down Expand Up @@ -1943,7 +1943,7 @@ async def htlc_switch_iteration():
await self._htlc_switch_iterstart_event.wait()
await self._htlc_switch_iterdone_event.wait()

async with TaskGroup(wait=any) as group:
async with OldTaskGroup(wait=any) as group:
await group.spawn(htlc_switch_iteration())
await group.spawn(self.got_disconnected.wait())

Expand Down
12 changes: 6 additions & 6 deletions electrum/lnworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@

import dns.resolver
import dns.exception
from aiorpcx import run_in_thread, TaskGroup, NetAddress, ignore_after
from aiorpcx import run_in_thread, NetAddress, ignore_after

from . import constants, util
from . import keystore
from .util import profiler, chunks
from .util import profiler, chunks, OldTaskGroup
from .invoices import PR_TYPE_LN, PR_UNPAID, PR_EXPIRED, PR_PAID, PR_INFLIGHT, PR_FAILED, PR_ROUTING, LNInvoice, LN_EXPIRY_NEVER
from .util import NetworkRetryManager, JsonRPCClient
from .lnutil import LN_MAX_FUNDING_SAT
Expand Down Expand Up @@ -200,7 +200,7 @@ def __init__(self, xprv, features: LnFeatures):
self.node_keypair = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.NODE_KEY)
self.backup_key = generate_keypair(BIP32Node.from_xkey(xprv), LnKeyFamily.BACKUP_CIPHER).privkey
self._peers = {} # type: Dict[bytes, Peer] # pubkey -> Peer # needs self.lock
self.taskgroup = TaskGroup()
self.taskgroup = OldTaskGroup()
self.listen_server = None # type: Optional[asyncio.AbstractServer]
self.features = features
self.network = None # type: Optional[Network]
Expand Down Expand Up @@ -767,13 +767,13 @@ async def wait_for_received_pending_htlcs_to_get_removed(self):
# to wait a bit for it to become irrevocably removed.
# Note: we don't wait for *all htlcs* to get removed, only for those
# that we can already fail/fulfill. e.g. forwarded htlcs cannot be removed
async with TaskGroup() as group:
async with OldTaskGroup() as group:
for peer in self.peers.values():
await group.spawn(peer.wait_one_htlc_switch_iteration())
while True:
if all(not peer.received_htlcs_pending_removal for peer in self.peers.values()):
break
async with TaskGroup(wait=any) as group:
async with OldTaskGroup(wait=any) as group:
for peer in self.peers.values():
await group.spawn(peer.received_htlc_removed_event.wait())

Expand Down Expand Up @@ -2269,7 +2269,7 @@ async def _request_force_close_from_backup(self, channel_id: bytes):
transport = LNTransport(privkey, peer_addr, proxy=self.network.proxy)
peer = Peer(self, node_id, transport, is_channel_backup=True)
try:
async with TaskGroup(wait=any) as group:
async with OldTaskGroup(wait=any) as group:
await group.spawn(peer._message_loop())
await group.spawn(peer.trigger_force_close(channel_id))
return
Expand Down
18 changes: 9 additions & 9 deletions electrum/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@
import functools

import aiorpcx
from aiorpcx import TaskGroup, ignore_after
from aiorpcx import ignore_after
from aiohttp import ClientResponse

from . import util
from .util import (log_exceptions, ignore_exceptions,
from .util import (log_exceptions, ignore_exceptions, OldTaskGroup,
bfh, make_aiohttp_session, send_exception_to_crash_reporter,
is_hash256_str, is_non_negative_integer, MyEncoder, NetworkRetryManager,
nullcontext)
Expand Down Expand Up @@ -246,7 +246,7 @@ class Network(Logger, NetworkRetryManager[ServerAddr]):

LOGGING_SHORTCUT = 'n'

taskgroup: Optional[TaskGroup]
taskgroup: Optional[OldTaskGroup]
interface: Optional[Interface]
interfaces: Dict[ServerAddr, Interface]
_connecting_ifaces: Set[ServerAddr]
Expand Down Expand Up @@ -462,7 +462,7 @@ async def get_server_peers():
async def get_relay_fee():
self.relay_fee = await interface.get_relay_fee()

async with TaskGroup() as group:
async with OldTaskGroup() as group:
await group.spawn(get_banner)
await group.spawn(get_donation_address)
await group.spawn(get_server_peers)
Expand Down Expand Up @@ -839,7 +839,7 @@ async def make_reliable_wrapper(self: 'Network', *args, **kwargs):
assert iface.ready.done(), "interface not ready yet"
# try actual request
try:
async with TaskGroup(wait=any) as group:
async with OldTaskGroup(wait=any) as group:
task = await group.spawn(func(self, *args, **kwargs))
await group.spawn(iface.got_disconnected.wait())
except RequestTimedOut:
Expand Down Expand Up @@ -1184,7 +1184,7 @@ def export_checkpoints(self, path):

async def _start(self):
assert not self.taskgroup
self.taskgroup = taskgroup = TaskGroup()
self.taskgroup = taskgroup = OldTaskGroup()
assert not self.interface and not self.interfaces
assert not self._connecting_ifaces
assert not self._closing_ifaces
Expand Down Expand Up @@ -1225,7 +1225,7 @@ async def stop(self, *, full_shutdown: bool = True):
# timeout: if full_shutdown, it is up to the caller to time us out,
# otherwise if e.g. restarting due to proxy changes, we time out fast
async with (nullcontext() if full_shutdown else ignore_after(1)):
async with TaskGroup() as group:
async with OldTaskGroup() as group:
await group.spawn(self.taskgroup.cancel_remaining())
if full_shutdown:
await group.spawn(self.stop_gossip(full_shutdown=full_shutdown))
Expand Down Expand Up @@ -1278,7 +1278,7 @@ async def maintain_main_interface():
except asyncio.CancelledError:
# suppress spurious cancellations
group = self.taskgroup
if not group or group.closed():
if not group or group.joined:
raise
await asyncio.sleep(0.1)

Expand Down Expand Up @@ -1352,7 +1352,7 @@ async def get_response(server: ServerAddr):
except Exception as e:
res = e
responses[interface.server] = res
async with TaskGroup() as group:
async with OldTaskGroup() as group:
for server in servers:
await group.spawn(get_response(server))
return responses
Expand Down
6 changes: 3 additions & 3 deletions electrum/synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
from collections import defaultdict
import logging

from aiorpcx import TaskGroup, run_in_thread, RPCError
from aiorpcx import run_in_thread, RPCError

from . import util
from .transaction import Transaction, PartialTransaction
from .util import bh2u, make_aiohttp_session, NetworkJobOnDefaultServer, random_shuffled_copy
from .util import bh2u, make_aiohttp_session, NetworkJobOnDefaultServer, random_shuffled_copy, OldTaskGroup
from .bitcoin import address_to_scripthash, is_address
from .logging import Logger
from .interface import GracefulDisconnect, NetworkTimeout
Expand Down Expand Up @@ -218,7 +218,7 @@ async def _request_missing_txs(self, hist, *, allow_server_not_finding_tx=False)
self.requested_tx[tx_hash] = tx_height

if not transaction_hashes: return
async with TaskGroup() as group:
async with OldTaskGroup() as group:
for tx_hash in transaction_hashes:
await group.spawn(self._get_transaction(tx_hash, allow_server_not_finding_tx=allow_server_not_finding_tx))

Expand Down
Loading

0 comments on commit 86a1e58

Please sign in to comment.