import asyncio
import logging
import types
import weakref
from typing import (
Any as AnyType, AsyncIterable, Awaitable, Iterable, List, Optional,
Tuple, Union)
from .util import NO_VALUE, get_event_loop, main_event_loop
[docs]
class Event:
    """
    Enable event passing between loosely coupled components.

    The event emits values to connected listeners and has
    a selection of operators to create general data flow pipelines.

    Args:
        name: Name to use for this event.
    """

    # Fixed set of instance attributes. '__weakref__' is listed explicitly
    # because a slotted class does not support weak references otherwise.
    __slots__ = (
        'error_event', 'done_event', '_name', '_value',
        '_slots', '_done', '_source', '__weakref__')

    # The module-level NO_VALUE sentinel, also reachable as Event.NO_VALUE.
    NO_VALUE = NO_VALUE
    # Logger stored on the class, named after this module.
    logger = logging.getLogger(__name__)

    error_event: Optional["Event"]  # sub event for errors, may be None
    done_event: Optional["Event"]  # sub event for ending, may be None
    _name: str  # name of this event
    _value: AnyType  # args tuple of the last emit, or NO_VALUE
    _slots: List[List]  # connected listeners as [obj, weakref, func]
    _done: bool  # True once the event has ended
    _source: Optional["Event"]  # upstream event, as given to set_source
def __init__(self, name: str = '', _with_error_done_events: bool = True):
    """
    Set up an event without any listeners.

    Args:
        name: Name to use for this event; when empty, the qualified
            name of the class is used.
        _with_error_done_events: Whether to create the nested
            ``error_event`` and ``done_event``. The nested events
            themselves are created without sub events.
    """
    if _with_error_done_events:
        on_error = Event('error', False)
        on_done = Event('done', False)
    else:
        on_error = on_done = None

    #: Sub event that emits errors from this event as
    #: ``emit(source, exception)``.
    self.error_event = on_error
    #: Sub event that emits when this event is done as
    #: ``emit(source)``.
    self.done_event = on_done

    # Every entry is a [obj, weakref, func] sublist.
    self._slots = []
    self._name = name or self.__class__.__qualname__
    self._value = NO_VALUE
    self._done = False
    self._source = None
[docs]
def name(self) -> str:
    """
    Return the name of this event.
    """
    label = self._name
    return label
[docs]
def done(self) -> bool:
    """
    Tell whether this event has ended.

    Returns:
        ``True`` if the event has ended and no more emits are coming,
        ``False`` otherwise.
    """
    finished = self._done
    return finished
[docs]
def set_done(self):
    """
    Set this event to be ended. The event should not emit anything
    after that.

    On the first call the listeners of ``done_event`` are notified with
    ``emit(self)``, provided this event has a ``done_event``. Any later
    call is a no-op.
    """
    if self._done:
        return
    self._done = True
    # Events constructed without sub events (such as the nested error and
    # done events themselves) have ``done_event`` set to None; ending
    # those must not raise.
    if self.done_event is not None:
        self.done_event.emit(self)
[docs]
def value(self):
    """
    Return the last emitted value of this event.

    A single emitted argument is returned by itself, several arguments
    are returned as a tuple, and ``NO_VALUE`` is returned when nothing
    has been emitted yet or the last emit was empty.
    """
    last = self._value
    if last is NO_VALUE:
        return NO_VALUE
    if len(last) == 1:
        return last[0]
    return last if last else NO_VALUE
[docs]
def connect(self, listener, error=None, done=None,
            keep_ref: bool = False) -> "Event":
    """
    Attach a listener to this event and return this event.

    A listener that is connected several times is invoked just as many
    times on every emit. The ``+=`` operator is a synonym for this
    method::

        import eventkit as ev

        def f(a, b):
            print(a * b)

        def g(a, b):
            print(a / b)

        event = ev.Event()
        event += f
        event += g
        event.emit(10, 5)

    Args:
        listener: The callback to invoke on emit of this event.
            It receives the ``*args`` of the emit. If it is a
            coroutine function, or a function that returns an
            awaitable, the awaitable is run in the asyncio event loop.
            An ``Op`` instance is not stored as a listener: this event
            is set as its source instead, and ``error`` and ``done``
            are not connected in that case.
        error: The callback to invoke on error of this event.
            It receives (this event, exception) as two arguments.
        done: The callback to invoke on ending of this event.
            It receives this event as single argument.
        keep_ref:
            * ``True``: A strong reference to the callable is kept.
            * ``False``: If the callable allows weak refs and it is
              garbage collected, then it is automatically disconnected
              from this event.
    """
    if isinstance(listener, Op):
        # Let the operator connect itself to this event.
        listener.set_source(self)
        return self

    target, function = self._split(listener)
    hold_weakly = not keep_ref and hasattr(target, '__weakref__')
    if hold_weakly:
        # Only a weak reference is stored; _onFinalize removes the slot
        # once the target has been collected.
        entry = [None, weakref.ref(target, self._onFinalize), function]
    else:
        entry = [target, None, function]
    self._slots.append(entry)

    if self.done_event and done is not None:
        self.done_event.connect(done)
    if self.error_event and error is not None:
        self.error_event.connect(error)
    return self
[docs]
def disconnect(self, listener, error=None, done=None):
    """
    Disconnect a listener from this event and return this event.

    The ``-=`` operator can be used as a synonym for this method.

    Args:
        listener: The callback to disconnect. The callback is removed at
            most once. It is valid if the callback is already
            not connected.
        error: The error callback to disconnect.
        done: The done callback to disconnect.
    """
    target, function = self._split(listener)
    for slot in self._slots:
        if (slot[0] is target or slot[1] and slot[1]() is target) \
                and slot[2] is function:
            # Blank the slot in place so that an emit that is iterating
            # over a copy of the slot list skips it as well.
            slot[0] = slot[1] = slot[2] = None
            break
    self._slots = [s for s in self._slots if s != [None, None, None]]

    # Events created without sub events have error_event/done_event set
    # to None; connect() tolerates that and so must disconnect().
    if error is not None and self.error_event is not None:
        self.error_event.disconnect(error)
    if done is not None and self.done_event is not None:
        self.done_event.disconnect(done)
    return self
[docs]
def disconnect_obj(self, obj):
    """
    Remove every listener that belongs to the given object, including
    its listeners on the error and done sub events.

    Args:
        obj: The target object that is to be completely removed from
            this event.
    """
    for entry in self._slots:
        strong, weak = entry[0], entry[1]
        if strong is obj or weak and weak() is obj:
            entry[0] = entry[1] = entry[2] = None
    blank = [None, None, None]
    self._slots = [entry for entry in self._slots if entry != blank]

    for sub_event in (self.error_event, self.done_event):
        if sub_event is not None:
            sub_event.disconnect_obj(obj)
[docs]
def emit(self, *args):
    """
    Emit a new value to all connected listeners.

    The arguments are stored as this event's last value before the
    listeners are invoked. When a listener returns an awaitable, it is
    scheduled with ``asyncio.ensure_future`` on the event loop.

    An exception raised by a listener does not stop the remaining
    listeners: it is emitted on ``error_event`` as
    ``emit(self, exception)`` when that event has listeners, and is
    logged otherwise.

    Args:
        args: Argument values to emit to listeners.
    """
    self._value = args
    # Work on a copy: listeners may connect or disconnect during the emit.
    for obj, ref, func in self._slots.copy():
        try:
            if ref is not None:
                obj = ref()
                if obj is None:
                    # The weakly referenced target is gone but its slot
                    # has not been cleaned up yet; invoking ``func``
                    # without its target would be a wrong call.
                    continue
            if obj is None:
                result = func(*args) if func else None
            elif func:
                result = func(obj, *args)
            else:
                result = obj(*args)
            if result and hasattr(result, '__await__'):
                loop = get_event_loop()
                asyncio.ensure_future(result, loop=loop)
        except Exception as error:
            # Events created without sub events have no error_event.
            if self.error_event is not None and len(self.error_event):
                self.error_event.emit(self, error)
            else:
                self.logger.exception(
                    f'Value {args} caused exception for event {self}')
[docs]
def emit_threadsafe(self, *args):
    """
    Threadsafe variant of :meth:`emit`: the listeners are not invoked
    directly, the emit is handed to the event loop of the main thread
    with ``call_soon_threadsafe`` instead.

    Args:
        args: Argument values to emit to listeners.
    """
    loop = main_event_loop
    loop.call_soon_threadsafe(self.emit, *args)
[docs]
def clear(self):
    """
    Disconnect all listeners.

    Every slot is blanked in place before the slot list is replaced by
    an empty one.
    """
    stale, self._slots = self._slots, []
    for entry in stale:
        entry[0] = entry[1] = entry[2] = None
[docs]
def run(self) -> List:
    """
    Start the asyncio event loop, run this event to completion and
    return all values as a list::

        import eventkit as ev

        ev.Timer(0.25, count=10).run()
        ->
        [0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 1.75, 2.0, 2.25, 2.5]

    .. note::

        When running inside a Jupyter notebook this will give an error
        that the asyncio event loop is already running. This can be
        remedied by applying
        `nest_asyncio <https://github.com/erdewit/nest_asyncio>`_
        or by using the top-level ``await`` statement of Jupyter::

            await event.list()
    """
    loop = get_event_loop()
    collected = self.list()
    return loop.run_until_complete(collected)
[docs]
def pipe(self, *targets: "Event"):
    """
    Chain several events into a pipe and return its last event::

        import eventkit as ev

        e1 = ev.Sequence('abcde')
        e2 = ev.Enumerate().map(lambda i, c: (i, i + ord(c)))
        e3 = ev.Star().pluck(1).map(chr)

        e1.pipe(e2, e3)  # or: ev.Event.Pipe(e1, e2, e3)
        ->
        ['a', 'c', 'e', 'g', 'i']

    Every target gets the previous event in the pipe as its source;
    the first target gets this event. Without targets this event
    itself is returned.

    Args:
        targets: One or more Events that have no source yet,
            or ``Event`` constructors that need no arguments.
    """
    tail = self
    for target in targets:
        stage = Event.create(target)
        stage.set_source(tail)
        tail = stage
    return tail
[docs]
def fork(self, *targets: "Event") -> "Fork":
    """
    Fork this event into one or more target events, each of which gets
    this event as its source.

    Square brackets can be used as a synonym::

        import eventkit as ev

        ev.Range(2, 5)[ev.Min, ev.Max, ev.Sum].zip()
        ->
        [(2, 2, 2), (2, 3, 5), (2, 4, 9)]

    The events in the fork can be combined by one of the join
    methods of ``Fork``.

    Args:
        targets: One or more events that have no source yet,
            or ``Event`` constructors that need no arguments.
    """
    branches = Fork()
    for target in targets:
        branch = Event.create(target)
        branch.set_source(self)
        branches.append(branch)
    return branches
def set_source(self, source):
    """
    Store ``source`` as the source of this event.
    """
    setattr(self, '_source', source)
def _onFinalize(self, ref):
    """
    Drop all slots that hold the given weak reference.

    Passed to ``weakref.ref`` as callback when a listener is connected
    with a weak reference.
    """
    for entry in self._slots:
        if entry[1] is ref:
            entry[0] = entry[1] = entry[2] = None
    blank = [None, None, None]
    self._slots = [entry for entry in self._slots if entry != blank]
@staticmethod
def _split(c):
    """
    Split the given callable into an ``(object, function)`` tuple.

    Plain functions have no object part, bound methods are split into
    their instance and underlying function, and any other callable
    object is returned as the object part without a function.

    Raises:
        ValueError: If ``c`` is not callable.
    """
    if isinstance(c, types.FunctionType):
        return (None, c)
    if isinstance(c, types.MethodType):
        return (c.__self__, c.__func__)
    if isinstance(c, types.BuiltinMethodType):
        owner = c.__self__
        # Bound to a class: built-in method; otherwise built-in function.
        return (owner, c) if type(owner) is type else (None, c)
    if hasattr(c, '__call__'):
        return (c, None)
    raise ValueError(f'Invalid callable: {c}')
[docs]
async def aiter(self, skip_to_last: bool = False, tuples: bool = False):
    """
    Create an asynchronous iterator that yields the emitted values
    from this event::

        async def coro():
            async for args in event.aiter():
                ...

    :meth:`__aiter__` is a synonym for :meth:`aiter` with
    default arguments.

    When this event is already done the iterator ends right away
    without yielding. An error of this event is raised from the
    iterator. The iterator ends when this event is done.

    Args:
        skip_to_last:
            * ``True``: Backlogged source values are skipped over to
              yield only the latest value. Can be used as a
              slipper clutch between a source that produces too fast
              and the handling that can't keep up.
            * ``False``: All events are yielded.
        tuples:
            * ``True``: Always yield arguments as a tuple.
            * ``False``: Unpack single argument tuples.
    """
    def on_event(*args):
        if skip_to_last:
            # Discard everything that is still queued so that only the
            # newest value is left for the consumer.
            # NOTE(review): this also discards queued 'ERROR' and 'DONE'
            # entries, if any are pending.
            while q.qsize():
                q.get_nowait()
        # An empty tag marks a regular value.
        q.put_nowait(('', args))

    def on_error(source, error):
        q.put_nowait(('ERROR', error))

    def on_done(source):
        q.put_nowait(('DONE', None))

    if self.done():
        return
    q: asyncio.Queue[Tuple[str, AnyType]] = asyncio.Queue()
    self.connect(on_event, on_error, on_done)
    try:
        while True:
            what, args = await q.get()
            if not what:
                # Regular value: unpack a single argument unless tuples
                # are requested; an empty emit becomes NO_VALUE.
                yield args if tuples else args[0] if len(args) == 1 \
                    else args if args else NO_VALUE
            elif what == 'ERROR':
                # For 'ERROR' entries ``args`` holds the exception.
                raise args
            else:
                break
    finally:
        # Always detach the three callbacks, also when the consumer
        # stops iterating early or an error is raised.
        self.disconnect(on_event, on_error, on_done)
# Operator synonyms: ``event += listener`` connects, ``event -= listener``
# disconnects, ``event(*args)`` emits and ``event | target`` pipes.
__iadd__ = connect
__isub__ = disconnect
__call__ = emit
__or__ = pipe
def __repr__(self):
    """
    Show the name of this event together with its listener slots.
    """
    text = f'Event<{self.name()}, {self._slots}>'
    return text
def __len__(self):
    """
    Return the number of connected listener slots.
    """
    slots = self._slots
    return len(slots)
def __bool__(self):
    """
    An event is always truthy.

    Without this method Python would fall back on ``__len__``, which
    would make an event without listeners falsy.
    """
    truthy = True
    return truthy
def __getitem__(self, fork_targets) -> "Fork":
    """
    Square bracket synonym for :meth:`fork`.

    Args:
        fork_targets: A single fork target or an iterable of targets.
    """
    if hasattr(fork_targets, '__iter__'):
        targets = fork_targets
    else:
        targets = (fork_targets,)
    return self.fork(*targets)
[docs]
def __await__(self):
    """
    Asynchronously await the next emit of an event::

        async def coro():
            args = await event
            ...

    If the event does an empty ``emit()``, then the value
    of ``args`` is set to ``util.NO_VALUE``. A single emitted argument
    is returned by itself, several arguments as a tuple. An error of
    the event is set as exception on the awaited future.

    :meth:`wait` and :meth:`__await__` are each other's inverse.

    Raises:
        ValueError: If this event is already done.
    """
    def on_event(*args):
        # Only the first emit or error settles the future.
        if not fut.done():
            fut.set_result(
                args[0] if len(args) == 1 else args if args else NO_VALUE)

    def on_error(source, error):
        if not fut.done():
            fut.set_exception(error)

    def on_future_done(f):
        # Invoked as done-callback of the future: detach the callbacks
        # again so that they do not stay connected to this event.
        self.disconnect(on_event, on_error)

    if self.done():
        raise ValueError('Event already done')
    fut = asyncio.Future()
    self.connect(on_event, on_error)
    fut.add_done_callback(on_future_done)
    return fut.__await__()
# ``async for`` on an event uses ``aiter`` with its default arguments.
__aiter__ = aiter
"""
Synonym for :meth:`aiter` with default arguments::
async def coro():
async for args in event:
...
:meth:`aiterate` and :meth:`__aiter__` are each other's inverse.
"""
def __contains__(self, c):
    """
    Tell whether the given callable is already connected.
    """
    target, function = self._split(c)
    for entry in self._slots:
        same_target = entry[0] is target or entry[1] and entry[1]() is target
        if same_target and entry[2] is function:
            return True
    return False
def __reduce__(self):
    """
    Pickle support that leaves out the slots.

    Only the name is kept, plus whether the event has an error or
    done sub event.
    """
    lacks_sub_events = self.error_event is None and self.done_event is None
    return self.__class__, (self._name, not lacks_sub_events)
[docs]
@staticmethod
def init(obj, event_names: Iterable):
    """
    Convenience function that creates one event per given name and
    sets each of them as an attribute with that name on the object.

    Args:
        obj: The object that receives the events as members.
        event_names: Names to use for the created events.
    """
    for event_name in event_names:
        created = Event(event_name)
        setattr(obj, event_name, created)
# dot access to constructors
[docs]
@staticmethod
def create(obj):
    """
    Create an event from an async iterator, awaitable, or event
    constructor without arguments.

    Args:
        obj: The source object. If it's already an event then it
            is passed as-is. A callable is called without arguments
            first and its result is converted.

    Raises:
        ValueError: If no event can be created from ``obj``.
    """
    if isinstance(obj, Event):
        return obj
    if hasattr(obj, '__call__'):
        obj = obj()

    if isinstance(obj, Event):
        return obj
    if hasattr(obj, '__aiter__'):
        return Event.aiterate(obj)
    if hasattr(obj, '__await__'):
        return Event.wait(obj)
    raise ValueError(f'Invalid type: {obj}')
[docs]
@staticmethod
def wait(future: Awaitable) -> "Wait":
    """
    Create a new event that emits the value of the awaitable when it
    becomes available and then sets this event done.

    :meth:`wait` and :meth:`__await__` are each other's inverse.

    Args:
        future: Future to wait on.
    """
    event = Wait(future)
    return event
[docs]
@staticmethod
def aiterate(ait: AsyncIterable) -> "Aiterate":
    """
    Create a new event that emits the values yielded by the
    asynchronous iterator.

    The asynchronous iterator serves as a source for both the time
    and value of emits.

    :meth:`aiterate` and :meth:`__aiter__` are each other's inverse.

    Args:
        ait: The asynchronous source iterator. It must ``await``
            at least once; if necessary use::

                await asyncio.sleep(0)
    """
    event = Aiterate(ait)
    return event
[docs]
@staticmethod
def sequence(
        values: Iterable, interval: float = 0,
        times: Union[Iterable[float], None] = None) -> "Sequence":
    """
    Create a new event that emits the given values.

    Supply at most one of ``interval`` or ``times``.

    Args:
        values: The source values.
        interval: Time interval in seconds between values.
        times: Relative times for individual values, in seconds since
            start of event. The sequence should match ``values``.
    """
    event = Sequence(values, interval, times)
    return event
[docs]
@staticmethod
def repeat(
        value=NO_VALUE, count=1, interval: float = 0,
        times: Union[Iterable[float], None] = None) -> "Repeat":
    """
    Create a new event that repeats ``value`` a number of ``count`` times.

    Args:
        value: The value to emit.
        count: Number of times to emit.
        interval: Time interval in seconds between values.
        times: Relative times for the individual emits, in seconds since
            start of event.
    """
    # Forward the arguments in the order in which this method declares
    # them, like the other constructor shortcuts do; ``interval`` used
    # to be passed in the position of ``value``.
    return Repeat(value, count, interval, times)
[docs]
@staticmethod
def range(
        *args, interval: float = 0,
        times: Union[Iterable[float], None] = None) -> "Range":
    """
    Create a new event that emits the values from a range.

    Args:
        args: Same as for built-in ``range``.
        interval: Time interval in seconds between values.
        times: Relative times for individual values, in seconds since
            start of event. The sequence should match the range.
    """
    event = Range(*args, interval=interval, times=times)
    return event
[docs]
@staticmethod
def timerange(start=0, end=None, step=1) -> "Timerange":
    """
    Create a new event that emits the datetime value, at that datetime,
    from a range of datetimes.

    Args:
        start: Start time, can be specified as:

            * ``datetime.datetime``.
            * ``datetime.time``: Today is used as date.
            * ``int`` or ``float``: Number of seconds relative to now.
              Values will be quantized to the given step.
        end: End time, can be specified as:

            * ``datetime.datetime``.
            * ``datetime.time``: Today is used as date.
            * ``None``: No end limit.
        step: Number of seconds, or ``datetime.timedelta``,
            to space between values.
    """
    event = Timerange(start, end, step)
    return event
[docs]
@staticmethod
def timer(interval: float, count: Union[int, None] = None) -> "Timer":
    """
    Create a new timer event that emits, at regularly paced intervals,
    the number of seconds since it was started.

    Args:
        interval: Time interval in seconds between emits.
        count: Number of times to emit, or ``None`` for no limit.
    """
    event = Timer(interval, count)
    return event
[docs]
@staticmethod
def marble(
        s: str, interval: float = 0,
        times: Union[Iterable[float], None] = None) -> "Marble":
    """
    Create a new event that emits the values from an Rx-type marble
    string.

    Args:
        s: The string with characters that are emitted.
        interval: Time interval in seconds between values.
        times: Relative times for individual values, in seconds since
            start of event. The sequence should match the marble string.
    """
    event = Marble(s, interval, times)
    return event
# dot access to operators
[docs]
def filter(self, predicate=bool) -> "Filter":
    """
    Apply the predicate to every source value and re-emit the value
    when the predicate is True.

    Args:
        predicate: The function to test every source value with.
            The default is to test the general truthiness with ``bool()``.
    """
    op = Filter(predicate, self)
    return op
[docs]
def skip(self, count: int = 1) -> "Skip":
    """
    Drop the first ``count`` values from the source and follow the
    source after that.

    Args:
        count: Number of source values to drop.
    """
    op = Skip(count, self)
    return op
[docs]
def take(self, count: int = 1) -> "Take":
    """
    Re-emit the first ``count`` values from the source and then end.

    Args:
        count: Number of source values to re-emit.
    """
    op = Take(count, self)
    return op
[docs]
def takewhile(self, predicate=bool) -> "TakeWhile":
    """
    Re-emit values from the source until the predicate becomes False
    and then end.

    Args:
        predicate: The function to test every source value with.
            The default is to test the general truthiness with ``bool()``.
    """
    op = TakeWhile(predicate, self)
    return op
[docs]
def dropwhile(self, predicate=lambda x: not x) -> "DropWhile":
    """
    Drop source values until the predicate becomes False and re-emit
    everything from the source after that.

    Args:
        predicate: The function to test every source value with.
            The default is to test the inverted general truthiness.
    """
    op = DropWhile(predicate, self)
    return op
[docs]
def takeuntil(self, notifier: "Event") -> "TakeUntil":
    """
    Re-emit values from the source until the ``notifier`` emits
    and then end. If the notifier ends without any emit then
    source values keep being passed.

    Args:
        notifier: Event that signals to end this event.
    """
    op = TakeUntil(notifier, self)
    return op
[docs]
def constant(self, constant) -> "Constant":
    """
    Emit a constant value on every emit of the source::

        emit(value) -> emit(constant)

    Args:
        constant: The constant value to emit.
    """
    op = Constant(constant, self)
    return op
[docs]
def iterate(self, it) -> "Iterate":
    """
    Emit the next value from an iterator on every emit of the source::

        emit(a, b, ...) -> emit(next(it))

    The time of events follows the source and the values follow
    the iterator.

    Args:
        it: The source iterator to use for generating values. When the
            iterator is exhausted the event is set to be done.
    """
    op = Iterate(it, self)
    return op
[docs]
def count(self, start=0, step=1) -> "Count":
    """
    Count the source emits and emit the count::

        emit(a, b, ...) -> emit(count)

    Args:
        start: Start count.
        step: Add count by this amount for every new source value.
    """
    op = Count(start, step, self)
    return op
[docs]
def enumerate(self, start=0, step=1) -> "Enumerate":
    """
    Prepend a count to every source value::

        emit(a, b, ...) -> emit(count, a, b, ...)

    Args:
        start: Start count.
        step: Increase by this amount for every new source value.
    """
    op = Enumerate(start, step, self)
    return op
[docs]
def timestamp(self) -> "Timestamp":
    """
    Prepend a timestamp (from ``time.time()``) to every source value::

        emit(a, b, ...) -> emit(timestamp, a, b, ...)

    The timestamp is the float number in seconds since the
    midnight Jan 1, 1970 epoch.
    """
    op = Timestamp(self)
    return op
[docs]
def partial(self, *left_args) -> "Partial":
    """
    Pad source values with extra arguments on the left::

        emit(a, b, ...) -> emit(*left_args, a, b, ...)

    Args:
        left_args: Arguments to inject.
    """
    op = Partial(*left_args, source=self)
    return op
[docs]
def partial_right(self, *right_args) -> "PartialRight":
    """
    Pad source values with extra arguments on the right::

        emit(a, b, ...) -> emit(a, b, ..., *right_args)

    Args:
        right_args: Arguments to inject.
    """
    op = PartialRight(*right_args, source=self)
    return op
[docs]
def star(self) -> "Star":
    """
    Unpack a source tuple into positional arguments, similar to the
    star operator::

        emit((a, b, ...)) -> emit(a, b, ...)

    :meth:`star` and :meth:`pack` are each other's inverse.
    """
    op = Star(self)
    return op
[docs]
def pack(self) -> "Pack":
    """
    Pack positional arguments into a tuple::

        emit(a, b, ...) -> emit((a, b, ...))

    :meth:`star` and :meth:`pack` are each other's inverse.
    """
    op = Pack(self)
    return op
[docs]
def pluck(self, *selections: Union[int, str]) -> "Pluck":
    """
    Extract arguments or nested properties from the source values.

    Select which argument positions to keep::

        emit(a, b, c, d).pluck(1, 2) -> emit(b, c)

    Re-order arguments::

        emit(a, b, c).pluck(2, 1, 0) -> emit(c, b, a)

    To do an empty emit leave ``selections`` empty::

        emit(a, b).pluck() -> emit()

    Select nested properties from positional arguments::

        emit(person, account).pluck(
            '1.number', '0.address.street') ->
        emit(account.number, person.address.street)

    If no value can be extracted then ``NO_VALUE`` is emitted in
    its place.

    Args:
        selections: The values to extract.
    """
    op = Pluck(*selections, source=self)
    return op
[docs]
def map(
        self, func, timeout=None, ordered=True,
        task_limit=None) -> "Map":
    """
    Apply a sync or async function to source values using
    positional arguments::

        emit(a, b, ...) -> emit(func(a, b, ...))

    or, if ``func`` returns an awaitable, then it will be awaited::

        emit(a, b, ...) -> emit(await func(a, b, ...))

    In case of timeout or other failure, ``NO_VALUE`` is emitted.

    Args:
        func: The function or coroutine constructor to apply.
        timeout: Timeout in seconds since coroutine is started.
        ordered:
            * ``True``: The order of emitted results preserves the
              order of the source values.
            * ``False``: Results are in order of completion.
        task_limit: Max number of concurrent tasks, or None for no limit.

    ``timeout``, ``ordered`` and ``task_limit`` apply to
    async functions only.
    """
    op = Map(func, timeout, ordered, task_limit, self)
    return op
[docs]
def emap(self, constr, joiner: "AddableJoinOp") -> "Emap":
    """
    Higher-order event map that creates a new ``Event`` instance
    for every source value::

        emit(a, b, ...) -> new Event constr(a, b, ...)

    Args:
        constr: Constructor function for creating a new event.
            Apart from returning an ``Event``, the constructor may also
            return an awaitable or an asynchronous iterator, in which
            case an ``Event`` will be created.
        joiner: Join operator to combine the emits of nested events.
    """
    op = Emap(constr, joiner, self)
    return op
[docs]
def mergemap(self, constr) -> "Mergemap":
    """
    :meth:`emap` that uses :meth:`merge` to combine the nested events::

        marbles = [
            'A   B    C    D',
            '_1   2  3    4',
            '__K   L     M   N']

        ev.Range(3).mergemap(lambda v: ev.Marble(marbles[v]))
        ->
        ['A', '1', 'K', 'B', '2', 'L', '3', 'C', 'M', '4', 'D', 'N']

    Args:
        constr: Constructor function for creating a nested event.
    """
    op = Mergemap(constr, self)
    return op
[docs]
def concatmap(self, constr) -> "Concatmap":
    """
    :meth:`emap` that uses :meth:`concat` to combine the nested events::

        marbles = [
            'A    B    C    D',
            '_          1    2    3    4',
            '__                  K    L      M   N']

        ev.Range(3).concatmap(lambda v: ev.Marble(marbles[v]))
        ->
        ['A', 'B', '1', '2', '3', 'K', 'L', 'M', 'N']

    Args:
        constr: Constructor function for creating a nested event.
    """
    op = Concatmap(constr, self)
    return op
[docs]
def chainmap(self, constr) -> "Chainmap":
    """
    :meth:`emap` that uses :meth:`chain` to combine the nested events::

        marbles = [
            'A    B    C    D           ',
            '_        1    2    3    4',
            '__                K    L      M   N']

        ev.Range(3).chainmap(lambda v: ev.Marble(marbles[v]))
        ->
        ['A', 'B', 'C', 'D', '1', '2', '3', '4', 'K', 'L', 'M', 'N']

    Args:
        constr: Constructor function for creating a nested event.
    """
    op = Chainmap(constr, self)
    return op
[docs]
def switchmap(self, constr) -> "Switchmap":
    """
    :meth:`emap` that uses :meth:`switch` to combine the nested events::

        marbles = [
            'A    B    C    D           ',
            '_                 K    L      M   N',
            '__      1    2      3    4'
        ]

        ev.Range(3).switchmap(lambda v: Event.marble(marbles[v]))
        ->
        ['A', 'B', '1', '2', 'K', 'L', 'M', 'N']

    Args:
        constr: Constructor function for creating a nested event.
    """
    op = Switchmap(constr, self)
    return op
[docs]
def reduce(self, func, initializer=NO_VALUE) -> "Reduce":
    """
    Apply a two-argument reduction function to the previous reduction
    result and the current value and emit the new reduction result.

    Args:
        func: Reduction function::

            emit(args) -> emit(func(prev_args, args))

        initializer: First argument of first reduction::

            first_result = func(initializer, first_value)

            If no initializer is given, then the first result is
            emitted on the second source emit.
    """
    op = Reduce(func, initializer, self)
    return op
[docs]
def min(self) -> "Min":
    """
    Minimum value of the source values.
    """
    op = Min(self)
    return op
[docs]
def max(self) -> "Max":
    """
    Maximum value of the source values.
    """
    op = Max(self)
    return op
[docs]
def sum(self, start=0) -> "Sum":
    """
    Total sum of the source values.

    Args:
        start: Value added to total sum.
    """
    op = Sum(start, self)
    return op
[docs]
def product(self, start=1) -> "Product":
    """
    Total product of the source values.

    Args:
        start: Initial start value.
    """
    op = Product(start, self)
    return op
[docs]
def mean(self) -> "Mean":
    """
    Total average of the source values.
    """
    op = Mean(self)
    return op
[docs]
def any(self) -> "Any":
    """
    Test if the predicate holds for at least one source value.
    """
    op = Any(self)
    return op
[docs]
def all(self) -> "All":
    """
    Test if the predicate holds for all source values.
    """
    op = All(self)
    return op
[docs]
def ema(self, n: Union[int, None] = None,
        weight: Union[float, None] = None) -> "Ema":
    """
    Exponential moving average.

    Args:
        n: Number of periods.
        weight: Weight of new value.

    Give either ``n`` or ``weight``.
    The relation is ``weight = 2 / (n + 1)``.
    """
    op = Ema(n, weight, self)
    return op
[docs]
def previous(self, count: int = 1) -> "Previous":
    """
    Emit the ``count``-th previous value for every source value::

        source:  -ab---c--d-e-
        output:  --a---b--c-d-

    Starts emitting on the ``count + 1``-th source emit.

    Args:
        count: Number of periods to go back.
    """
    op = Previous(count, self)
    return op
[docs]
def pairwise(self) -> "Pairwise":
    """
    Emit ``(previous_source_value, current_source_value)`` tuples.
    Starts emitting on the second source emit::

        source:  -a----b------c--------d-----
        output:  ------(a,b)--(b,c)----(c,d)-
    """
    op = Pairwise(self)
    return op
[docs]
def changes(self) -> "Changes":
    """
    Emit only those source values that differ from the previous value.
    """
    op = Changes(self)
    return op
[docs]
def unique(self, key=None) -> "Unique":
    """
    Emit only unique values, dropping values that have already
    been emitted.

    Args:
        key: The callable ``key(value)`` is used to group values.
            The default of ``None`` groups values by equality.
            The resulting group must be hashable.
    """
    op = Unique(key, self)
    return op
[docs]
def last(self) -> "Last":
    """
    Wait until the source has ended and re-emit its last value.
    """
    op = Last(self)
    return op
[docs]
def list(self) -> "ListOp":
    """
    Collect all source values and emit them as a list when the
    source ends.
    """
    op = ListOp(self)
    return op
[docs]
def deque(self, count=0) -> "Deque":
    """
    Emit a ``deque`` with the last ``count`` values from the source
    (or less in the lead-in phase).

    Args:
        count: Number of last periods to use, or 0 to use all.
    """
    op = Deque(count, self)
    return op
[docs]
def array(self, count=0) -> "Array":
    """
    Emit a numpy array with the last ``count`` values from the source
    (or less in the lead-in phase).

    Args:
        count: Number of last periods to use, or 0 to use all.
    """
    op = Array(count, self)
    return op
[docs]
def chunk(self, size: int) -> "Chunk":
    """
    Chunk values up in lists of equal size. The last chunk can be
    shorter.

    Args:
        size: Chunk size.
    """
    op = Chunk(size, self)
    return op
[docs]
def chunkwith(
        self, timer: "Event", emit_empty: bool = True) -> "ChunkWith":
    """
    Emit a chunked list of values when the timer emits.

    Args:
        timer: Event to use for timing the chunks.
        emit_empty: Emit empty list if no values present since last emit.
    """
    op = ChunkWith(timer, emit_empty, self)
    return op
[docs]
def chain(self, *sources: "Event") -> "Chain":
    """
    Re-emit from a source until it ends, then move to the next source.
    Repeat until all sources have ended, ending the chain.
    Emits from pending sources are queued up::

        source 1:  -a----b---c|
        source 2:        --2-----3--4|
        source 3:  ------------x---------y--|
        output:    -a----b---c2--3--4x---y--|

    Args:
        sources: Source events.
    """
    op = Chain(self, *sources)
    return op
[docs]
def merge(self, *sources) -> "Merge":
    """
    Re-emit everything from the source events::

        source 1:  -a----b-------------c------d-|
        source 2:     ------1-----2------3--4-|
        source 3:      --------x----y--|
        output:    -a----b--1--x--2-y--c-3--4-d-|

    Args:
        sources: Source events.
    """
    op = Merge(self, *sources)
    return op
[docs]
def concat(self, *sources) -> "Concat":
    """
    Re-emit everything from one source until it ends and then move
    to the next source::

        source 1:  -a----b-----|
        source 2:    --1-----2-----3----4--|
        source 3:                 -----------x--y--|
        output:    -a----b---------3----4----x--y--|

    Args:
        sources: Source events.
    """
    op = Concat(self, *sources)
    return op
[docs]
def switch(self, *sources) -> "Switch":
    """
    Re-emit everything from one source and move to another source as
    soon as that other source starts to emit::

        source 1:  -a----b---c-----d---|
        source 2:                 -----------x---y-|
        source 3:  ---------1----2----3-----|
        output:    -a----b--1----2--x---y---|

    Args:
        sources: Source events.
    """
    op = Switch(self, *sources)
    return op
[docs]
def zip(self, *sources) -> "Zip":
    """
    Zip sources together: The i-th emit has the i-th value from
    each source as positional arguments. Only emits when each source
    has emitted its i-th value and ends when any source ends::

        source 1:    -a----b------------------c------d---e--f---|
        source 2:    --------1-------2-------3---------4-----|
        output emit: --------(a,1)---(b,2)----(c,3)----(d,4)-|

    Args:
        sources: Source events.
    """
    op = Zip(self, *sources)
    return op
[docs]
def ziplatest(self, *sources, partial: bool = True) -> "Ziplatest":
    """
    Emit zipped values with the latest value from each of the
    source events. Emits every time a source emits::

        source 1:    -a-------------------b-------c---|
        source 2:    ---------------1--------------------2------|
        output emit: (a,NoValue)---(a,1)-(b,1)---(c,1)--(c,2)--|

    Args:
        sources: Source events.
        partial:
            * True: Use ``NoValue`` for sources that have not
              emitted yet.
            * False: Wait until all sources have emitted.
    """
    op = Ziplatest(self, *sources, partial=partial)
    return op
[docs]
def delay(self, delay) -> "Delay":
    """
    Time-shift all source events by a delay::

        source:  -abc-d-e---f---|
        output:  ---abc-d-e---f---|

    This applies to the source errors and the source done event as well.

    Args:
        delay: Time delay of all events (in seconds).
    """
    op = Delay(delay, self)
    return op
[docs]
def timeout(self, timeout) -> "Timeout":
    """
    Do an empty emit and set this event as done when the source
    doesn't emit for longer than the timeout period.

    Args:
        timeout: Timeout value.
    """
    op = Timeout(timeout, self)
    return op
[docs]
def throttle(
        self, maximum, interval, cost_func=None) -> "Throttle":
    """
    Limit the number of emits per time without dropping values.
    Values that come in too fast are queued and re-emitted as soon
    as allowed by the limits.

    A nested ``status_event`` emits ``True`` when throttling starts
    and ``False`` when throttling ends.

    The limit can be dynamically changed with ``set_limit``.

    Args:
        maximum: Maximum payload per interval.
        interval: Time interval (in seconds).
        cost_func: The sum of ``cost_func(value)`` for every
            source value inside the ``interval`` that is to remain
            under the ``maximum``. The default is to count every
            source value as 1.
    """
    op = Throttle(maximum, interval, cost_func, self)
    return op
[docs]
def debounce(self, delay, on_first: bool = False) -> "Debounce":
    """
    Filter out values from the source that happen in rapid succession.

    Args:
        delay: Maximal time difference (in seconds) between
            successive values before debouncing kicks in.
        on_first:
            * True: First value is sent immediately and following
              values in the rapid succession are dropped::

                source:  -abcd----efg-
                output:  -a-------e---

            * False: Last value of a rapid succession is sent after
              the delay and the values before that are dropped::

                source:  -abcd----efg--
                output:   ----d------g-
    """
    op = Debounce(delay, on_first, self)
    return op
[docs]
def copy(self) -> "Copy":
    """
    Create a shallow copy of the source values.
    """
    op = Copy(self)
    return op
[docs]
def deepcopy(self) -> "Deepcopy":
    """
    Create a deep copy of the source values.
    """
    op = Deepcopy(self)
    return op
[docs]
def sample(self, timer: "Event") -> "Sample":
    """
    Sample the value of this event at the times that the timer emits
    and emit the sample.

    Args:
        timer: Event used to time the samples.
    """
    op = Sample(timer, self)
    return op
[docs]
def errors(self) -> "Errors":
    """
    Emit the errors from the source.
    """
    op = Errors(self)
    return op
[docs]
def end_on_error(self) -> "EndOnError":
    """
    End on any error from the source.
    """
    op = EndOnError(self)
    return op
from .ops.aggregate import (
All, Any, Count, Deque, Ema, List as ListOp, Max, Mean, Min, Pairwise,
Product, Reduce, Sum)
from .ops.array import (
Array, ArrayAll, ArrayAny, ArrayMax, ArrayMean, ArrayMin, ArrayProd,
ArrayStd, ArraySum)
from .ops.combine import (
AddableJoinOp, Chain, Concat, Fork, Merge, Switch, Zip, Ziplatest)
from .ops.create import (
Aiterate, Marble, Range, Repeat, Sequence, Timer, Timerange, Wait)
from .ops.misc import EndOnError, Errors
from .ops.op import Op
from .ops.select import (
Changes, DropWhile, Filter, Last, Skip, Take, TakeUntil, TakeWhile, Unique)
from .ops.timing import (
Debounce, Delay, Sample, Throttle, Timeout)
from .ops.transform import (
Chainmap, Chunk, ChunkWith, Concatmap, Constant, Copy, Deepcopy, Emap,
Enumerate, Iterate, Map, Mergemap, Pack, Partial, PartialRight, Pluck,
Previous, Star, Switchmap, Timestamp)