Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Extend widget creation policy #1611

Merged
merged 5 commits into from
Sep 30, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 98 additions & 61 deletions Orange/canvas/scheme/signalmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@
"""

import logging
import itertools
import warnings

from collections import namedtuple, defaultdict, deque
from operator import attrgetter, add
from operator import attrgetter
from functools import partial


from PyQt4.QtCore import QObject, QCoreApplication, QEvent
from PyQt4.QtCore import pyqtSignal as Signal
from PyQt4.QtCore import QObject, QCoreApplication, QEvent, QTimer
from PyQt4.QtCore import pyqtSignal as Signal, pyqtSlot as Slot


from .scheme import SchemeNode, SchemeLink
Expand Down Expand Up @@ -83,6 +83,8 @@ def __init__(self, scheme):

# A flag indicating if UpdateRequest event should be rescheduled
self.__reschedule = False
self.__update_timer = QTimer(self, interval=100, singleShot=True)
self.__update_timer.timeout.connect(self.__process_next)

def _can_process(self):
"""
Expand Down Expand Up @@ -123,6 +125,7 @@ def stop(self):
if self.__state != SignalManager.Stoped:
self.__state = SignalManager.Stoped
self.stateChanged.emit(SignalManager.Stoped)
self.__update_timer.stop()

def pause(self):
"""
Expand All @@ -132,6 +135,7 @@ def pause(self):
if self.__state != SignalManager.Paused:
self.__state = SignalManager.Paused
self.stateChanged.emit(SignalManager.Paused)
self.__update_timer.stop()

def resume(self):
if self.__state == SignalManager.Paused:
Expand All @@ -141,7 +145,7 @@ def resume(self):

def step(self):
if self.__state == SignalManager.Paused:
self.process_queued(1)
self.process_queued()

def state(self):
"""
Expand Down Expand Up @@ -294,30 +298,34 @@ def _update_link(self, link):
def process_queued(self, max_nodes=None):
"""
Process queued signals.

Take one node node from the pending input queue and deliver
all scheduled signals.
"""
if max_nodes is not None or max_nodes != 1:
warnings.warn(
"`max_nodes` is deprecated and unused (will always equal 1)",
DeprecationWarning, stacklevel=2)

if self.__runtime_state == SignalManager.Processing:
raise RuntimeError("Cannot re-enter 'process_queued'")

if not self._can_process():
raise RuntimeError("Can't process in state %i" % self.__state)

log.info("Processing queued signals")
log.info("SignalManager: Processing queued signals")

node_update_front = self.node_update_front()

if max_nodes is not None:
node_update_front = node_update_front[:max_nodes]

log.debug("Nodes for update %s",
log.debug("SignalManager: Nodes eligible for update %s",
[node.title for node in node_update_front])

self._set_runtime_state(SignalManager.Processing)
try:
# TODO: What if the update front changes in the loop?
for node in node_update_front:
if node_update_front:
node = node_update_front[0]
self._set_runtime_state(SignalManager.Processing)
try:
self.process_node(node)
finally:
self._set_runtime_state(SignalManager.Waiting)
finally:
self._set_runtime_state(SignalManager.Waiting)

def process_node(self, node):
"""
Expand Down Expand Up @@ -374,9 +382,16 @@ def is_pending(self, node):

def pending_nodes(self):
"""
Return a list of pending nodes (in no particular order).
Return a list of pending nodes.

The nodes are returned in the order they were enqueued for
signal delivery.

Returns
-------
nodes : List[SchemeNode]
"""
return list(set(sig.link.sink_node for sig in self._input_queue))
return list(unique(sig.link.sink_node for sig in self._input_queue))

def pending_input_signals(self, node):
"""
Expand Down Expand Up @@ -422,61 +437,59 @@ def node_update_front(self):
dependents = partial(dependent_nodes, scheme)

blocked_nodes = reduce(set.union,
list(map(dependents, blocking_nodes)),
map(dependents, blocking_nodes),
set(blocking_nodes))

pending = set(self.pending_nodes())

pending = self.pending_nodes()
pending_downstream = reduce(set.union,
list(map(dependents, pending)),
map(dependents, pending),
set())

log.debug("Pending nodes: %s", pending)
log.debug("Blocking nodes: %s", blocking_nodes)

return list(pending - pending_downstream - blocked_nodes)

def event(self, event):
if event.type() == QEvent.UpdateRequest:
if not self.__state == SignalManager.Running:
log.debug("Received 'UpdateRequest' event while not "
"in 'Running' state")
event.setAccepted(False)
return False

if self.__runtime_state == SignalManager.Processing:
log.debug("Received 'UpdateRequest' event while in "
"'process_queued'")
# This happens if someone calls QCoreApplication.processEvents
# from the signal handlers.
self.__reschedule = True
event.accept()
return True

log.info("'UpdateRequest' event, queued signals: %i",
len(self._input_queue))
if self._input_queue:
self.process_queued(max_nodes=1)
event.accept()

if self.__reschedule:
log.debug("Rescheduling 'UpdateRequest' event")
self._update()
self.__reschedule = False
elif self.node_update_front():
log.debug("More nodes are eligible for an update. "
"Scheduling another update.")
self._update()

return True

return QObject.event(self, event)
noneligible = pending_downstream | blocked_nodes
return [node for node in pending if node not in noneligible]

@Slot()
def __process_next(self):
if not self.__state == SignalManager.Running:
log.debug("Received 'UpdateRequest' while not in 'Running' state")
return

if self.__runtime_state == SignalManager.Processing:
# This happens if someone calls QCoreApplication.processEvents
# from the signal handlers.
# A `__process_next` must be rescheduled when exiting
# process_queued.
log.warning("Received 'UpdateRequest' while in 'process_queued'. "
"An update will be re-scheduled when exiting the "
"current update.")
self.__reschedule = True
return

log.info("'UpdateRequest' event, queued signals: %i",
len(self._input_queue))
if self._input_queue:
self.process_queued()

if self.__reschedule and self.__state == SignalManager.Running:
self.__reschedule = False
log.debug("Rescheduling signal update")
self.__update_timer.start()

if self.node_update_front():
log.debug("More nodes are eligible for an update. "
"Scheduling another update.")
self._update()

def _update(self):
"""
Schedule processing at a later time.
"""
QCoreApplication.postEvent(self, QEvent(QEvent.UpdateRequest))
if self.__state == SignalManager.Running and \
not self.__update_timer.isActive():
self.__update_timer.start()


def can_enable_dynamic(link, value):
Expand Down Expand Up @@ -553,3 +566,27 @@ def group_by_all(sequence, key=None):
order_seen.append(item_key)

return [(key, groups[key]) for key in order_seen]


def unique(iterable):
"""
Return unique elements of `iterable` while preserving their order.

Parameters
----------
iterable : Iterable[Hashable]


Returns
-------
unique : Iterable
Unique elements from `iterable`.
"""
seen = set()

def observed(el):
observed = el in seen
seen.add(el)
return observed

return (el for el in iterable if not observed(el))
Loading