eventkit

Release 0.8.5.

Event

class eventkit.event.Event(name='', _with_error_done_events=True)[source]

Enable event passing between loosely coupled components. The event emits values to connected listeners and has a selection of operators to create general data flow pipelines.

Parameters

name (str) – Name to use for this event.

__await__()[source]

Asynchronously await the next emit of an event:

async def coro():
    args = await event
    ...

If the event does an empty emit(), then the value of args is set to util.NO_VALUE.

wait() and __await__() are each other’s inverse.

__aiter__(skip_to_last=False, tuples=False)

Synonym for aiter() with default arguments:

async def coro():
    async for args in event:
        ...

aiterate() and __aiter__() are each other’s inverse.

NO_VALUE = <NoValue>
logger = <Logger eventkit.event (WARNING)>
error_event

Sub event that emits errors from this event as emit(source, exception).

done_event

Sub event that emits when this event is done as emit(source).

name()[source]

This event’s name.

Return type

str

done()[source]

True if event has ended with no more emits coming, False otherwise.

Return type

bool

set_done()[source]

Set this event to be ended. The event should not emit anything after that.

value()[source]

This event’s last emitted value.

connect(listener, error=None, done=None, keep_ref=False)[source]

Connect a listener to this event. If the listener is added multiple times then it is invoked just as many times on emit.

The += operator can be used as a synonym for this method:

import eventkit as ev

def f(a, b):
    print(a * b)

def g(a, b):
    print(a / b)

event = ev.Event()
event += f
event += g
event.emit(10, 5)
Parameters
  • listener – The callback to invoke on emit of this event. It gets the *args from an emit as arguments.

  • error – The callback to invoke on error of this event. It gets (this event, exception) as two arguments.

  • done – The callback to invoke on ending of this event. It gets this event as single argument.

  • keep_ref (bool) –

    • True: A strong reference to the callable is kept

    • False: If the callable allows weak refs and it is garbage collected, then it is automatically disconnected from this event.
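The keep_ref=False behaviour can be pictured with the standard weakref module. This is a minimal sketch of the general mechanism, not eventkit's implementation; the Worker class and the listeners list are hypothetical names introduced only for illustration.

```python
import gc
import weakref


class Worker:
    """Hypothetical object whose bound method acts as a listener."""

    def handle(self, value):
        return value


# Stand-in for an event's internal listener registry.
listeners = []

worker = Worker()
# WeakMethod does not keep `worker` alive; the callback receives the dead
# reference once the object is collected, and removes it from the registry.
ref = weakref.WeakMethod(worker.handle, lambda dead: listeners.remove(dead))
listeners.append(ref)

count_before = len(listeners)  # the listener is connected
del worker                     # drop the only strong reference
gc.collect()                   # make collection explicit
count_after = len(listeners)   # the listener disconnected itself
```

With keep_ref=True the registry would hold the bound method itself, so the object would stay alive for as long as it remains connected.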

disconnect(listener, error=None, done=None)[source]

Disconnect a listener from this event.

The -= operator can be used as a synonym for this method.

Parameters
  • listener – The callback to disconnect. The callback is removed at most once. It is valid to disconnect a callback that is not connected.

  • error – The error callback to disconnect.

  • done – The done callback to disconnect.

disconnect_obj(obj)[source]

Disconnect all listeners on the given object, including the error and done listeners.

Parameters

obj – The target object that is to be completely removed from this event.

emit(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

emit_threadsafe(*args)[source]

Threadsafe version of emit() that doesn’t invoke the listeners directly but via the event loop.
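Handing a call from a worker thread to the event loop is what asyncio's loop.call_soon_threadsafe provides. The sketch below uses only asyncio and threading to show the idea of invoking a listener via the loop rather than directly; whether emit_threadsafe() is built on exactly this call is an assumption, and the listener and worker functions are hypothetical.

```python
import asyncio
import threading


async def main():
    loop = asyncio.get_running_loop()
    loop_thread = threading.get_ident()
    received = []
    done = asyncio.Event()

    def listener(value):
        # Record the value and whether we are running on the loop's thread.
        received.append((value, threading.get_ident() == loop_thread))
        done.set()

    def worker():
        # Do not call the listener here; schedule it on the event loop.
        loop.call_soon_threadsafe(listener, 42)

    thread = threading.Thread(target=worker)
    thread.start()
    await done.wait()
    thread.join()
    return received


result = asyncio.run(main())
```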

clear()[source]

Disconnect all listeners.

run()[source]

Start the asyncio event loop, run this event to completion and return all values as a list:

import eventkit as ev

ev.Timer(0.25, count=10).run()
->
[0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 1.75, 2.0, 2.25, 2.5]

Note

When running inside a Jupyter notebook this will give an error that the asyncio event loop is already running. This can be remedied by applying nest_asyncio or by using the top-level await statement of Jupyter:

await event.list()
Return type

List[~T]

pipe(*targets)[source]

Form several events into a pipe:

import eventkit as ev

e1 = ev.Sequence('abcde')
e2 = ev.Enumerate().map(lambda i, c: (i, i + ord(c)))
e3 = ev.Star().pluck(1).map(chr)

e1.pipe(e2, e3)     # or: ev.Event.Pipe(e1, e2, e3)
->
['a', 'c', 'e', 'g', 'i']
Parameters

targets (Event) – One or more Events that have no source yet, or Event constructors that need no arguments.

fork(*targets)[source]

Fork this event into one or more target events. Square brackets can be used as a synonym:

import eventkit as ev

ev.Range(2, 5)[ev.Min, ev.Max, ev.Sum].zip()
->
[(2, 2, 2), (2, 3, 5), (2, 4, 9)]

The events in the fork can be combined by one of the join methods of Fork.

Parameters

targets (Event) – One or more events that have no source yet, or Event constructors that need no arguments.

Return type

Fork

set_source(source)[source]
aiter(skip_to_last=False, tuples=False)[source]

Create an asynchronous iterator that yields the emitted values from this event:

async def coro():
    async for args in event.aiter():
        ...

__aiter__() is a synonym for aiter() with default arguments.

Parameters
  • skip_to_last (bool) –

    • True: Backlogged source values are skipped over to yield only the latest value. Can be used as a slipper clutch between a source that produces too fast and the handling that can’t keep up.

    • False: All events are yielded.

  • tuples (bool) –

    • True: Always yield arguments as a tuple.

    • False: Unpack single argument tuples.

static init(obj, event_names)[source]

Convenience function for initializing multiple events as members of the given object.

Parameters

event_names (Iterable[+T_co]) – Names to use for the created events.

static create(obj)[source]

Create an event from an async iterator, awaitable, or event constructor without arguments.

Parameters

obj – The source object. If it’s already an event then it is passed as-is.

static wait(future)[source]

Create a new event that emits the value of the awaitable when it becomes available and then sets this event done.

wait() and __await__() are each other’s inverse.

Parameters

future (Awaitable[+T_co]) – Future to wait on.

Return type

Wait

static aiterate(ait)[source]

Create a new event that emits the yielded values from the asynchronous iterator.

The asynchronous iterator serves as a source for both the time and value of emits.

aiterate() and __aiter__() are each other’s inverse.

Parameters

ait (AsyncIterable[+T_co]) –

The asynchronous source iterator. It must await at least once; if necessary use:

await asyncio.sleep(0)

Return type

Aiterate
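A minimal sketch of an asynchronous source iterator that satisfies the "must await at least once" requirement by awaiting asyncio.sleep(0) between values. The countdown generator is a hypothetical example; it is consumed here with a plain async comprehension rather than through eventkit.

```python
import asyncio


async def countdown(n):
    """Hypothetical source: yields n, n-1, ..., 1."""
    while n > 0:
        yield n
        n -= 1
        # Give control back to the event loop between values.
        await asyncio.sleep(0)


async def collect():
    return [value async for value in countdown(3)]


values = asyncio.run(collect())
```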

static sequence(values, interval=0, times=None)[source]

Create a new event that emits the given values. Supply at most one interval or times.

Parameters
  • values – The source values.

  • interval (float) – Time interval in seconds between values.

  • times (Optional[Iterable[float]]) – Relative times for individual values, in seconds since start of event. The sequence should match values.

Return type

Sequence

static repeat(value=<NoValue>, count=1, interval=0, times=None)[source]

Create a new event that repeats value count times.

Parameters
  • value – The value to emit.

  • count – Number of times to emit.

  • interval (float) – Time interval in seconds between values.

  • times (Optional[Iterable[float]]) – Relative times for individual values, in seconds since start of event. The sequence should match values.

Return type

Repeat

static range(*args, interval=0, times=None)[source]

Create a new event that emits the values from a range.

Parameters
  • args – Same as for built-in range.

  • interval (float) – Time interval in seconds between values.

  • times (Optional[Iterable[float]]) – Relative times for individual values, in seconds since start of event. The sequence should match the range.

Return type

Range

static timerange(start=0, end=None, step=1)[source]

Create a new event that emits the datetime value, at that datetime, from a range of datetimes.

Parameters
  • start

    Start time, can be specified as:

    • datetime.datetime.

    • datetime.time: Today is used as date.

    • int or float: Number of seconds relative to now. Values will be quantized to the given step.

  • end

    End time, can be specified as:

    • datetime.datetime.

    • datetime.time: Today is used as date.

    • None: No end limit.

  • step – Number of seconds, or datetime.timedelta, to space between values.

Return type

Timerange

static timer(interval, count=None)[source]

Create a new timer event that emits, at regular intervals, the number of seconds since it was started.

Parameters
  • interval (float) – Time interval in seconds between emits.

  • count (Optional[int]) – Number of times to emit, or None for no limit.

Return type

Timer

static marble(s, interval=0, times=None)[source]

Create a new event that emits the values from a Rx-type marble string.

Parameters
  • s (str) – The string with characters that are emitted.

  • interval (float) – Time interval in seconds between values.

  • times (Optional[Iterable[float]]) – Relative times for individual values, in seconds since start of event. The sequence should match the marble string.

Return type

Marble

filter(predicate=<class 'bool'>)[source]

For every source value, apply predicate and re-emit when True.

Parameters

predicate – The function to test every source value with. The default is to test the general truthiness with bool().

Return type

Filter

skip(count=1)[source]

Drop the first count values from source and follow the source after that.

Parameters

count (int) – Number of source values to drop.

Return type

Skip

take(count=1)[source]

Re-emit first count values from the source and then end.

Parameters

count (int) – Number of source values to re-emit.

Return type

Take

takewhile(predicate=<class 'bool'>)[source]

Re-emit values from the source until the predicate becomes False and then end.

Parameters

predicate – The function to test every source value with. The default is to test the general truthiness with bool().

Return type

TakeWhile

dropwhile(predicate=<function Event.<lambda>>)[source]

Drop source values until the predicate becomes False and after that re-emit everything from the source.

Parameters

predicate – The function to test every source value with. The default is to test the inverted general truthiness.

Return type

DropWhile

takeuntil(notifier)[source]

Re-emit values from the source until the notifier emits and then end. If the notifier ends without any emit then keep passing source values.

Parameters

notifier (Event) – Event that signals to end this event.

Return type

TakeUntil

constant(constant)[source]

On emit of the source emit a constant value:

emit(value) -> emit(constant)
Parameters

constant – The constant value to emit.

Return type

Constant

iterate(it)[source]

On emit of the source, emit the next value from an iterator:

emit(a, b, ...) -> emit(next(it))

The time of events follows the source and the values follow the iterator.

Parameters

it – The source iterator to use for generating values. When the iterator is exhausted the event is set to be done.

Return type

Iterate

count(start=0, step=1)[source]

Count and emit the number of source emits:

emit(a, b, ...) -> emit(count)
Parameters
  • start – Start count.

  • step – Increase the count by this amount for every new source value.

Return type

Count

enumerate(start=0, step=1)[source]

Add a count to every source value:

emit(a, b, ...) -> emit(count, a, b, ...)
Parameters
  • start – Start count.

  • step – Increase by this amount for every new source value.

Return type

Enumerate

timestamp()[source]

Add a timestamp (from time.time()) to every source value:

emit(a, b, ...) -> emit(timestamp, a, b, ...)

The timestamp is a float giving the number of seconds since the epoch (midnight UTC, Jan 1, 1970).

Return type

Timestamp

partial(*left_args)[source]

Pad source values with extra arguments on the left:

emit(a, b, ...) -> emit(*left_args, a, b, ...)
Parameters

left_args – Arguments to inject.

Return type

Partial

partial_right(*right_args)[source]

Pad source values with extra arguments on the right:

emit(a, b, ...) -> emit(a, b, ..., *right_args)
Parameters

right_args – Arguments to inject.

Return type

PartialRight

star()[source]

Unpack a source tuple into positional arguments, similar to the star operator:

emit((a, b, ...)) -> emit(a, b, ...)

star() and pack() are each other’s inverse.

Return type

Star

pack()[source]

Pack positional arguments into a tuple:

emit(a, b, ...) -> emit((a, b, ...))

star() and pack() are each other’s inverse.

Return type

Pack

pluck(*selections)[source]

Extract arguments or nested properties from the source values.

Select which argument positions to keep:

emit(a, b, c, d).pluck(1, 2) -> emit(b, c)

Re-order arguments:

emit(a, b, c).pluck(2, 1, 0) -> emit(c, b, a)

To do an empty emit leave selections empty:

emit(a, b).pluck() -> emit()

Select nested properties from positional arguments:

emit(person, account).pluck(
    '1.number', '0.address.street') ->

emit(account.number, person.address.street)

If no value can be extracted then NO_VALUE is emitted in its place.

Parameters

selections (Union[int, str]) – The values to extract.

Return type

Pluck
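The selection rules above can be modelled in plain Python. This sketch covers only the two documented forms (an integer position, or a string of the form 'position.attr.attr'); the pluck function and the NO_VALUE object here are stand-ins written for illustration, not eventkit's own code.

```python
from types import SimpleNamespace

NO_VALUE = object()  # stand-in for eventkit's NO_VALUE sentinel


def pluck(args, *selections):
    """Return the tuple that would be emitted for the given source args."""
    result = []
    for selection in selections:
        if isinstance(selection, int):
            position, attrs = selection, []
        else:
            first, *attrs = selection.split(".")
            position = int(first)
        try:
            value = args[position]
            for attr in attrs:
                value = getattr(value, attr)
        except (IndexError, AttributeError):
            # Nothing to extract: emit the sentinel in its place.
            value = NO_VALUE
        result.append(value)
    return tuple(result)


person = SimpleNamespace(address=SimpleNamespace(street="Main St"))
account = SimpleNamespace(number=7)

reordered = pluck(("a", "b", "c"), 2, 1, 0)
nested = pluck((person, account), "1.number", "0.address.street")
```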

map(func, timeout=None, ordered=True, task_limit=None)[source]

Apply a sync or async function to source values using positional arguments:

emit(a, b, ...) -> emit(func(a, b, ...))

or if func returns an awaitable then it will be awaited:

emit(a, b, ...) -> emit(await func(a, b, ...))

In case of timeout or other failure, NO_VALUE is emitted.

Parameters
  • func – The function or coroutine constructor to apply.

  • timeout – Timeout in seconds since the coroutine is started.

  • ordered

    • True: The order of emitted results preserves the order of the source values.

    • False: Results are in order of completion.

  • task_limit – Max number of concurrent tasks, or None for no limit.

timeout, ordered and task_limit apply to async functions only.

Return type

Map
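The difference between ordered=True and ordered=False, and the effect of task_limit, can be illustrated with plain asyncio. This is a batch-style sketch under stated assumptions (a fixed list of jobs instead of a live source, no timeout handling); slow_square and run_all are hypothetical helpers, not part of eventkit.

```python
import asyncio


async def slow_square(x, delay):
    await asyncio.sleep(delay)
    return x * x


async def run_all(jobs, ordered=True, task_limit=None):
    # A semaphore caps the number of coroutines running at the same time.
    limit = asyncio.Semaphore(task_limit or len(jobs))

    async def guarded(x, delay):
        async with limit:
            return await slow_square(x, delay)

    tasks = [asyncio.ensure_future(guarded(x, d)) for x, d in jobs]
    if ordered:
        # Results come back in the order of the inputs.
        return list(await asyncio.gather(*tasks))
    # Results come back in the order in which the tasks finish.
    return [await task for task in asyncio.as_completed(tasks)]


jobs = [(1, 0.15), (2, 0.05), (3, 0.10)]
ordered_result = asyncio.run(run_all(jobs, ordered=True))
unordered_result = asyncio.run(run_all(jobs, ordered=False))
```

With ordered=True a slow early job holds back later results, which is the price of preserving the source order.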

emap(constr, joiner)[source]

Higher-order event map that creates a new Event instance for every source value:

emit(a, b, ...) -> new Event constr(a, b, ...)
Parameters
  • constr – Constructor function for creating a new event. Apart from returning an Event, the constructor may also return an awaitable or an asynchronous iterator, in which case an Event will be created.

  • joiner (AddableJoinOp) – Join operator to combine the emits of nested events.

Return type

Emap

mergemap(constr)[source]

emap() that uses merge() to combine the nested events:

marbles = [
    'A   B    C    D',
    '_1   2  3    4',
    '__K   L     M   N']

ev.Range(3).mergemap(lambda v: ev.Marble(marbles[v]))
->
['A', '1', 'K', 'B', '2', 'L', '3', 'C', 'M', '4', 'D', 'N']
Return type

Mergemap

concatmap(constr)[source]

emap() that uses concat() to combine the nested events:

marbles = [
    'A    B    C    D',
    '_       1    2    3    4',
    '__                  K    L      M   N']

ev.Range(3).concatmap(lambda v: ev.Marble(marbles[v]))
->
['A', 'B', '1', '2', '3', 'K', 'L', 'M', 'N']
Return type

Concatmap

chainmap(constr)[source]

emap() that uses chain() to combine the nested events:

marbles = [
    'A    B    C    D           ',
    '_       1    2    3    4',
    '__                  K    L      M   N']

ev.Range(3).chainmap(lambda v: ev.Marble(marbles[v]))
->
['A', 'B', 'C', 'D', '1', '2', '3', '4', 'K', 'L', 'M', 'N']
Return type

Chainmap

switchmap(constr)[source]

emap() that uses switch() to combine the nested events:

marbles = [
    'A    B    C    D           ',
    '_                 K    L      M   N',
    '__      1    2      3    4'
]
ev.Range(3).switchmap(lambda v: ev.Marble(marbles[v]))
->
['A', 'B', '1', '2', 'K', 'L', 'M', 'N']
Return type

Switchmap

reduce(func, initializer=<NoValue>)[source]

Apply a two-argument reduction function to the previous reduction result and the current value and emit the new reduction result.

Parameters
  • func

    Reduction function:

    emit(args) -> emit(func(prev_args, args))
    

  • initializer

    First argument of first reduction:

    first_result = func(initializer, first_value)
    

    If no initializer is given, then the first result is emitted on the second source emit.

Return type

Reduce

min()[source]

Minimum value.

Return type

Min

max()[source]

Maximum value.

Return type

Max

sum(start=0)[source]

Total sum.

Parameters

start – Value added to total sum.

Return type

Sum

product(start=1)[source]

Total product.

Parameters

start – Initial start value.

Return type

Product

mean()[source]

Total average.

Return type

Mean

any()[source]

Test if predicate holds for at least one source value.

Return type

Any

all()[source]

Test if predicate holds for all source values.

Return type

All

ema(n=None, weight=None)[source]

Exponential moving average.

Parameters
  • n (Optional[int]) – Number of periods.

  • weight (Optional[float]) – Weight of new value.

Give either n or weight. The relation is weight = 2 / (n + 1).

Return type

Ema
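A short worked example of the relation weight = 2 / (n + 1), using the usual exponential moving average recurrence. Seeding the average with the first value is an assumption of this sketch; the ema function below is a plain-Python model, not eventkit's operator.

```python
def ema(values, n=None, weight=None):
    """Plain-Python exponential moving average over a list of numbers."""
    if (n is None) == (weight is None):
        raise ValueError("give either n or weight")
    if weight is None:
        weight = 2 / (n + 1)
    out = []
    prev = None
    for value in values:
        # Assumption: the first value seeds the average unchanged.
        if prev is None:
            prev = value
        else:
            prev = weight * value + (1 - weight) * prev
        out.append(prev)
    return out


by_periods = ema([10, 20, 30], n=3)        # n=3 gives weight 0.5
by_weight = ema([10, 20, 30], weight=0.5)  # same average, weight given directly
```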

previous(count=1)[source]

For every source value, emit the count-th previous value:

source:  -ab---c--d-e-
output:  --a---b--c-d-

Starts emitting on the count + 1-th source emit.

Parameters

count (int) – Number of periods to go back.

Return type

Previous
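The marble diagram above can be reproduced with a bounded deque. This is a plain-Python model of the described behaviour over a finished list of values; the previous function is written for illustration only.

```python
from collections import deque


def previous(values, count=1):
    """For every value, output the value seen `count` steps earlier."""
    window = deque(maxlen=count + 1)
    out = []
    for value in values:
        window.append(value)
        # Nothing is output until count + 1 values have been seen.
        if len(window) == count + 1:
            out.append(window[0])
    return out


one_back = previous("abcde")
two_back = previous("abcde", count=2)
```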

pairwise()[source]

Emit (previous_source_value, current_source_value) tuples. Starts emitting on the second source emit:

source:  -a----b------c--------d-----
output:  ------(a,b)--(b,c)----(c,d)-
Return type

Pairwise

changes()[source]

Emit only source values that have changed from the previous value.

Return type

Changes

unique(key=None)[source]

Emit only unique values, dropping values that have already been emitted.

Parameters

key – The callable key(value) is used to group values. The default of None groups values by equality. The resulting group must be hashable.

Return type

Unique
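The difference between changes() and unique() is easiest to see side by side. The two functions below are plain-Python models of the descriptions above, operating on a list instead of a live event.

```python
def changes(values):
    """Keep a value only if it differs from the one directly before it."""
    out = []
    prev = object()  # sentinel that compares unequal to every value
    for value in values:
        if value != prev:
            out.append(value)
        prev = value
    return out


def unique(values, key=None):
    """Keep a value only if its group has not been seen before."""
    seen = set()
    out = []
    for value in values:
        group = value if key is None else key(value)
        if group not in seen:
            seen.add(group)  # this is why the group must be hashable
            out.append(value)
    return out


data = [1, 1, 2, 2, 1]
changed = changes(data)
distinct = unique(data)
case_insensitive = unique(["a", "A", "b"], key=str.lower)
```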

last()[source]

Wait until source has ended and re-emit its last value.

Return type

Last

list()[source]

Collect all source values and emit as list when the source ends.

Return type

List

deque(count=0)[source]

Emit a deque with the last count values from the source (or fewer in the lead-in phase).

Parameters

count – Number of last periods to use, or 0 to use all.

Return type

Deque

array(count=0)[source]

Emit a numpy array with the last count values from the source (or fewer in the lead-in phase).

Parameters

count – Number of last periods to use, or 0 to use all.

Return type

Array

chunk(size)[source]

Chunk values up in lists of equal size. The last chunk can be shorter.

Parameters

size (int) – Chunk size.

Return type

Chunk
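A plain-Python model of the chunking rule, including the shorter final chunk that is flushed when the source ends. The chunk function is an illustration over a finished sequence, not eventkit's operator.

```python
def chunk(values, size):
    """Group values into lists of `size`; the last list may be shorter."""
    out, current = [], []
    for value in values:
        current.append(value)
        if len(current) == size:
            out.append(current)
            current = []
    if current:
        # Source has ended: flush the incomplete last chunk.
        out.append(current)
    return out


chunks = chunk(range(7), 3)
```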

chunkwith(timer, emit_empty=True)[source]

Emit a chunked list of values when the timer emits.

Parameters
  • timer (Event) – Event to use for timing the chunks.

  • emit_empty (bool) – Emit empty list if no values present since last emit.

Return type

ChunkWith

chain(*sources)[source]

Re-emit from a source until it ends, then move to the next source. Repeat until all sources have ended, ending the chain. Emits from pending sources are queued up:

source 1:  -a----b---c|
source 2:        --2-----3--4|
source 3:  ------------x---------y--|
output:    -a----b---c2--3--4x---y--|
Parameters

sources (Event) – Source events.

Return type

Chain

merge(*sources)[source]

Re-emit everything from the source events:

source 1:  -a----b-------------c------d-|
source 2:     ------1-----2------3--4-|
source 3:      --------x----y--|
output:    -a----b--1--x--2-y--c-3--4-d-|
Parameters

sources – Source events.

Return type

Merge

concat(*sources)[source]

Re-emit everything from one source until it ends and then move to the next source:

source 1:  -a----b-----|
source 2:    --1-----2-----3----4--|
source 3:                 -----------x--y--|
output:    -a----b---------3----4----x--y--|
Parameters

sources – Source events.

Return type

Concat
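chain() and concat() differ only in what happens to values that a pending source emits before its turn: chain() queues them, concat() loses them. The sketch below models both on timed data taken loosely from the concat diagram above; the (emits, end_time) representation and both functions are assumptions made for illustration.

```python
def chain(sources):
    """sources: list of (emits, end_time); emits is a list of (time, value)."""
    out = []
    active_from = 0
    for emits, end_time in sources:
        for t, value in emits:
            # Early emits are queued and released when the source takes over.
            out.append((max(t, active_from), value))
        active_from = max(active_from, end_time)
    return out


def concat(sources):
    out = []
    active_from = 0
    for emits, end_time in sources:
        # Emits from before the source became active are dropped.
        out.extend((t, v) for t, v in emits if t >= active_from)
        active_from = max(active_from, end_time)
    return out


sources = [
    ([(1, "a"), (6, "b")], 12),
    ([(4, 1), (10, 2), (16, 3), (21, 4)], 24),
    ([(28, "x"), (31, "y")], 34),
]
chained = [value for _, value in chain(sources)]
concatenated = [value for _, value in concat(sources)]
```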

switch(*sources)[source]

Re-emit everything from one source and move to another source as soon as that other source starts to emit:

source 1:  -a----b---c-----d---|
source 2:        -----------x---y-|
source 3:  ---------1----2----3-----|
output:    -a----b--1----2--x---y---|
Parameters

sources – Source events.

Return type

Switch

zip(*sources)[source]

Zip sources together: The i-th emit has the i-th value from each source as positional arguments. Only emits when each source has emitted its i-th value and ends when any source ends:

source 1:    -a----b------------------c------d---e--f---|
source 2:    --------1-------2-------3---------4-----|
output emit: --------(a,1)---(b,2)----(c,3)----(d,4)-|
Parameters

sources – Source events.

Return type

Zip

ziplatest(*sources, partial=True)[source]

Emit zipped values with the latest value from each of the source events. Emits every time a source emits:

source 1:   -a-------------------b-------c---|
source 2:   ---------------1--------------------2------|
output emit: (a,NoValue)---(a,1)-(b,1)---(c,1)--(c,2)--|
Parameters
  • sources – Source events.

  • partial (bool) –

    • True: Use NoValue for sources that have not emitted yet.

    • False: Wait until all sources have emitted.

Return type

Ziplatest
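The effect of the partial flag can be modelled on a merged timeline of emits. This is a plain-Python sketch of the behaviour in the diagram above; the timeline representation, the ziplatest function and the NO_VALUE object are stand-ins introduced for illustration.

```python
NO_VALUE = object()  # stand-in for eventkit's NoValue


def ziplatest(timeline, n_sources, partial=True):
    """timeline: list of (source_index, value) in order of emission."""
    latest = [NO_VALUE] * n_sources
    out = []
    for index, value in timeline:
        latest[index] = value
        # With partial=False, hold back until every source has a value.
        if partial or all(item is not NO_VALUE for item in latest):
            out.append(tuple(latest))
    return out


timeline = [(0, "a"), (1, 1), (0, "b"), (0, "c"), (1, 2)]
with_partial = ziplatest(timeline, 2, partial=True)
without_partial = ziplatest(timeline, 2, partial=False)
```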

delay(delay)[source]

Time-shift all source events by a delay:

source:  -abc-d-e---f---|
output:  ---abc-d-e---f---|

This applies to the source errors and the source done event as well.

Parameters

delay – Time delay of all events (in seconds).

Return type

Delay

timeout(timeout)[source]

When the source doesn’t emit for longer than the timeout period, do an empty emit and set this event as done.

Parameters

timeout – Timeout value.

Return type

Timeout

throttle(maximum, interval, cost_func=None)[source]

Limit number of emits per time without dropping values. Values that come in too fast are queued and re-emitted as soon as allowed by the limits.

A nested status_event emits True when throttling starts and False when throttling ends.

The limit can be dynamically changed with set_limit.

Parameters
  • maximum – Maximum payload per interval.

  • interval – Time interval (in seconds).

  • cost_func – Cost function: the sum of cost_func(value) over all source values inside the interval is kept under the maximum. The default is to count every source value as 1.

Return type

Throttle
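One way to picture "queued and re-emitted as soon as allowed" is a sliding window over release times. The sketch below assumes every value has a cost of 1 and that at most maximum values may be released in any window of length interval; how eventkit measures the window internally is not stated above, so treat this as an illustrative model only.

```python
from collections import deque


def throttle(arrivals, maximum, interval):
    """Return the release time for each arrival time (values cost 1 each)."""
    recent = deque(maxlen=maximum)  # release times of the last `maximum` emits
    releases = []
    for arrival in arrivals:
        release = arrival
        if releases:
            # Queued values keep their order.
            release = max(release, releases[-1])
        if len(recent) == maximum:
            # Wait until the oldest emit in the window is `interval` old.
            release = max(release, recent[0] + interval)
        recent.append(release)
        releases.append(release)
    return releases


burst = throttle([0, 0, 0, 0, 0], maximum=2, interval=1.0)
slow = throttle([0, 2, 4], maximum=2, interval=1.0)
```

No value is dropped: a burst of five values is spread out over time, while values that already arrive slowly pass through unchanged.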

debounce(delay, on_first=False)[source]

Filter out values from the source that happen in rapid succession.

Parameters
  • delay – Maximal time difference (in seconds) between successive values before debouncing kicks in.

  • on_first (bool) –

    • True: First value is sent immediately and following values in the rapid succession are dropped:

      source: -abcd----efg-
      output: -a-------e---
      
    • False: Last value of a rapid succession is sent after the delay and the values before that are dropped:

      source:  -abcd----efg--
      output:   ----d------g-
      

Return type

Debounce
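Both debounce modes can be modelled on a list of timestamped values. This sketch follows the two diagrams above; the debounce function and the (time, value) representation are assumptions for illustration, and the handling of gaps exactly equal to delay is not specified here.

```python
def debounce(events, delay, on_first=False):
    """events: list of (time, value) sorted by time; returns emitted pairs."""
    out = []
    for i, (t, value) in enumerate(events):
        if on_first:
            # Emit at once if the source was quiet for longer than delay.
            if i == 0 or t - events[i - 1][0] > delay:
                out.append((t, value))
        else:
            # Emit after delay unless a newer value arrives in the meantime.
            if i == len(events) - 1 or events[i + 1][0] - t > delay:
                out.append((t + delay, value))
    return out


events = [(1, "a"), (2, "b"), (3, "c"), (4, "d"), (9, "e"), (10, "f"), (11, "g")]
leading = debounce(events, delay=2, on_first=True)
trailing = debounce(events, delay=2, on_first=False)
```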

copy()[source]

Create a shallow copy of the source values.

Return type

Copy

deepcopy()[source]

Create a deep copy of the source values.

Return type

Deepcopy

sample(timer)[source]

At the times that the timer emits, sample the value from this event and emit the sample.

Parameters

timer (Event) – Event used to time the samples.

Return type

Sample

errors()[source]

Emit errors from the source.

Return type

Errors

end_on_error()[source]

End on any error from the source.

Return type

EndOnError

Op

class eventkit.ops.op.Op(source=None)[source]

Base functionality for operators.

The Observer pattern is implemented by the following three methods:

on_source(self, *args)
on_source_error(self, source, error)
on_source_done(self, source)

The default handlers will pass along source emits, errors and done events. This makes Op also suitable as an identity operator.
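The three-method observer pattern can be sketched in a few lines of plain Python. MiniEvent, MiniOp and Double are hypothetical, stripped-down classes written for illustration; they mimic the pass-through defaults described above but are not eventkit's classes.

```python
class MiniEvent:
    """Hypothetical minimal event with emit, error and done listeners."""

    def __init__(self):
        self.listeners = []
        self.error_listeners = []
        self.done_listeners = []

    def emit(self, *args):
        for listener in self.listeners:
            listener(*args)

    def set_done(self):
        for listener in self.done_listeners:
            listener(self)


class MiniOp(MiniEvent):
    """Identity operator: passes along emits, errors and done events."""

    def __init__(self, source=None):
        super().__init__()
        if source is not None:
            self.set_source(source)

    def set_source(self, source):
        source.listeners.append(self.on_source)
        source.error_listeners.append(self.on_source_error)
        source.done_listeners.append(self.on_source_done)

    def on_source(self, *args):
        self.emit(*args)

    def on_source_error(self, source, error):
        for listener in self.error_listeners:
            listener(self, error)

    def on_source_done(self, source):
        self.set_done()


class Double(MiniOp):
    """Custom operator: only on_source needs to be overridden."""

    def on_source(self, *args):
        self.emit(*(2 * arg for arg in args))


source = MiniEvent()
doubled = Double(MiniOp(source))  # identity operator followed by Double
seen, ended = [], []
doubled.listeners.append(lambda *args: seen.append(args))
doubled.done_listeners.append(lambda event: ended.append(event))
source.emit(1, 2)
source.set_done()
```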

on_source(*args)

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

on_source_error(source, error)[source]
on_source_done(source)[source]
set_source(source)[source]

Create

class eventkit.ops.create.Wait(future, name='wait')[source]
class eventkit.ops.create.Aiterate(ait)[source]
class eventkit.ops.create.Sequence(values, interval=0, times=None)[source]
class eventkit.ops.create.Repeat(value, count, interval=0, times=None)[source]
class eventkit.ops.create.Range(*args, interval=0, times=None)[source]
class eventkit.ops.create.Timerange(start=0, end=None, step=1)[source]
class eventkit.ops.create.Timer(interval, count=None)[source]
class eventkit.ops.create.Marble(s, interval=0, times=None)[source]

Select

class eventkit.ops.select.Filter(predicate=<class 'bool'>, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.select.Skip(count=1, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.select.Take(count=1, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.select.TakeWhile(predicate=<class 'bool'>, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.select.DropWhile(predicate=<function DropWhile.<lambda>>, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.select.TakeUntil(notifier, source=None)[source]
on_source_done(source)[source]
class eventkit.ops.select.Changes(source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.select.Unique(key, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.select.Last(source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

on_source_done(source)[source]

Transform

class eventkit.ops.transform.Constant(constant, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.transform.Iterate(it, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.transform.Enumerate(start=0, step=1, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.transform.Timestamp(source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.transform.Partial(*left_args, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.transform.PartialRight(*right_args, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.transform.Star(source=None)[source]
on_source(arg)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.transform.Pack(source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.transform.Pluck(*selections, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.transform.Previous(count=1, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.transform.Copy(source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.transform.Deepcopy(source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.transform.Chunk(size, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

on_source_done(source)[source]
class eventkit.ops.transform.ChunkWith(timer, emit_empty, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

on_source_done(source)[source]
class eventkit.ops.transform.Map(func, timeout=0, ordered=True, task_limit=None, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

on_source_done(source)[source]
class eventkit.ops.transform.Emap(constr, joiner, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

on_source_done(source)[source]
class eventkit.ops.transform.Mergemap(constr, source=None)[source]
class eventkit.ops.transform.Chainmap(constr, source=None)[source]
class eventkit.ops.transform.Concatmap(constr, source=None)[source]
class eventkit.ops.transform.Switchmap(constr, source=None)[source]

Aggregate

class eventkit.ops.aggregate.Count(start=0, step=1, source=None)[source]
class eventkit.ops.aggregate.Reduce(func, initializer=<NoValue>, source=None)[source]
on_source(arg)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.aggregate.Min(source=None)[source]
class eventkit.ops.aggregate.Max(source=None)[source]
class eventkit.ops.aggregate.Sum(start=0, source=None)[source]
class eventkit.ops.aggregate.Product(start=1, source=None)[source]
class eventkit.ops.aggregate.Mean(source=None)[source]
on_source(arg)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.aggregate.Any(source=None)[source]
class eventkit.ops.aggregate.All(source=None)[source]
class eventkit.ops.aggregate.Ema(n=None, weight=None, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.aggregate.Pairwise(source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.aggregate.List(source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

on_source_done(source)[source]
class eventkit.ops.aggregate.Deque(count, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

Combine

class eventkit.ops.combine.Fork[source]
join(joiner)[source]
Return type

Event

concat()[source]
Return type

Concat

merge()[source]
Return type

Merge

switch()[source]
Return type

Switch

zip()[source]
Return type

Zip

ziplatest()[source]
Return type

Ziplatest

chain()[source]
Return type

Chain

class eventkit.ops.combine.JoinOp(source=None)[source]

Base class for join operators that combine the emits from multiple source events.

class eventkit.ops.combine.AddableJoinOp(*sources)[source]

Base class for join operators where new sources, produced by a parent higher-order event, can be added dynamically.

add_source(source)[source]
set_parent(parent)[source]
on_source_done(source)[source]
class eventkit.ops.combine.Merge(*sources)[source]
add_source(source)[source]
class eventkit.ops.combine.Switch(*sources)[source]
add_source(source)[source]
on_source_s(source, *args)[source]
on_source_done(source)[source]
class eventkit.ops.combine.Concat(*sources)[source]
add_source(source)[source]
on_source_done(source)[source]
class eventkit.ops.combine.Chain(*sources)[source]
add_source(source)[source]
on_source_done(source)[source]
class eventkit.ops.combine.Zip(*sources)[source]
on_source_done(source)[source]
class eventkit.ops.combine.Ziplatest(*sources, partial=True)[source]
on_source_done(source)[source]
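To make the difference between Zip and Ziplatest concrete, here is a plain-Python model that works on a recorded sequence of (source index, value) emits. It is a sketch of the assumed semantics, not eventkit code: Zip is taken to pair the i-th value of every source, and Ziplatest to emit the latest value of every source on each emit, with `partial=True` filling sources that have not emitted yet with a placeholder. The names `zip_model`, `ziplatest_model` and the local `NO_VALUE` object are hypothetical stand-ins.

```python
from collections import deque

NO_VALUE = object()  # hypothetical stand-in for eventkit's NO_VALUE sentinel


def zip_model(emits, n_sources):
    """Pair up the i-th value from every source."""
    queues = [deque() for _ in range(n_sources)]
    out = []
    for idx, value in emits:
        queues[idx].append(value)
        # Emit only once every source has an unconsumed value waiting.
        if all(queues):
            out.append(tuple(q.popleft() for q in queues))
    return out


def ziplatest_model(emits, n_sources, partial=True):
    """Emit the latest value of every source on each incoming emit."""
    latest = [NO_VALUE] * n_sources
    out = []
    for idx, value in emits:
        latest[idx] = value
        # With partial=False, wait until all sources have emitted once.
        if partial or all(v is not NO_VALUE for v in latest):
            out.append(tuple(latest))
    return out


# Interleaved emits from source 0 ("a") and source 1 ("b").
emits = [(0, "a1"), (0, "a2"), (1, "b1"), (0, "a3"), (1, "b2")]

zipped = zip_model(emits, 2)
latest_full = ziplatest_model(emits, 2, partial=False)
latest_partial = ziplatest_model(emits, 2, partial=True)
```

Here `zipped` holds two tuples, `latest_full` three, and `latest_partial` five, the first of which still contains the placeholder for source 1.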

Timing

class eventkit.ops.timing.Delay(delay, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

on_source_error(error)[source]
on_source_done(source)[source]
class eventkit.ops.timing.Timeout(timeout, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

on_source_done(source)[source]
class eventkit.ops.timing.Debounce(interval, on_first=False, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

on_source_done(source)[source]
class eventkit.ops.timing.Throttle(maximum, interval, cost_func=None, source=None)[source]
status_event

Sub event that emits True when throttling starts and False when throttling ends.

set_limit(maximum, interval)[source]

Dynamically update the maximum per interval limit.
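The "maximum per interval" limit can be illustrated with a small plain-Python model that computes when each arriving value would be released. This is a sketch under stated assumptions, not eventkit's implementation: it assumes that excess emits are delayed rather than dropped, that every emit costs 1 (no `cost_func`), and that the window is half-open. The function name `throttle_times` is hypothetical.

```python
from collections import deque


def throttle_times(arrivals, maximum, interval):
    """Return release times so that at most `maximum` emits fall in any
    window of `interval` seconds. Arrivals must be sorted ascending."""
    recent = deque()  # release times still inside the current window
    out = []
    for t in arrivals:
        # Preserve ordering: never release before the previous value.
        emit_at = max(t, out[-1]) if out else t
        # Forget releases that have left the window.
        while recent and recent[0] <= emit_at - interval:
            recent.popleft()
        if len(recent) >= maximum:
            # Window is full: wait until the oldest release expires.
            emit_at = recent[0] + interval
            while recent and recent[0] <= emit_at - interval:
                recent.popleft()
        recent.append(emit_at)
        out.append(emit_at)
    return out


# Five values arriving at once, limited to 2 per second.
burst = throttle_times([0, 0, 0, 0, 0], maximum=2, interval=1)
# Sparse arrivals never hit the limit and pass through undelayed.
sparse = throttle_times([0, 5, 10], maximum=2, interval=1)
```

In this model, throttling would be considered active whenever a release time is later than the corresponding arrival time.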

on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

on_source_done(source)[source]
class eventkit.ops.timing.Sample(timer, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

on_source_done(source)[source]

Array

class eventkit.ops.array.Array(count, source=None)[source]
on_source(*args)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

min()[source]

Minimum value.

Return type

ArrayMin

max()[source]

Maximum value.

Return type

ArrayMax

sum()[source]

Summation.

Return type

ArraySum

prod()[source]

Product.

Return type

ArrayProd

mean()[source]

Mean value.

Return type

ArrayMean

std()[source]

Sample standard deviation.

Return type

ArrayStd
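"Sample" standard deviation divides by n - 1 rather than n. The short standard-library sketch below shows the difference on a fixed window of values; it illustrates the formula only and makes no claim about how ArrayStd is implemented. The helper name `sample_std` is hypothetical.

```python
import math
import statistics


def sample_std(values):
    """Standard deviation with the n - 1 (Bessel-corrected) denominator."""
    n = len(values)
    mean = sum(values) / n
    return math.sqrt(sum((x - mean) ** 2 for x in values) / (n - 1))


window = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]

sample = sample_std(window)                # divides by n - 1
population = statistics.pstdev(window)     # divides by n
```

For this window the population value is exactly 2.0, while the sample value is slightly larger (about 2.138) and matches `statistics.stdev`.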

any()[source]

Test if any array value is true.

Return type

ArrayAny

all()[source]

Test if all array values are true.

Return type

ArrayAll

class eventkit.ops.array.ArrayMin(source=None)[source]
on_source(arg)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.array.ArrayMax(source=None)[source]
on_source(arg)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.array.ArraySum(source=None)[source]
on_source(arg)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.array.ArrayProd(source=None)[source]
on_source(arg)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.array.ArrayMean(source=None)[source]
on_source(arg)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.array.ArrayStd(source=None)[source]
on_source(arg)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.array.ArrayAny(source=None)[source]
on_source(arg)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

class eventkit.ops.array.ArrayAll(source=None)[source]
on_source(arg)[source]

Emit a new value to all connected listeners.

Parameters

args – Argument values to emit to listeners.

Misc

class eventkit.ops.misc.Errors(source=None)[source]
class eventkit.ops.misc.EndOnError(source=None)[source]
on_source_error(error)[source]

Util

eventkit.util.loop = <_UnixSelectorEventLoop running=False closed=False debug=False>

Main-thread event loop.

eventkit.util.timerange(start=0, end=None, step=1)[source]

Iterator that waits periodically until certain time points are reached while yielding those time points.

Parameters
  • start

    Start time, can be specified as:

    • datetime.datetime.

    • datetime.time: Today is used as date.

    • int or float: Number of seconds relative to now. Values will be quantized to the given step.

  • end

    End time, can be specified as:

    • datetime.datetime.

    • datetime.time: Today is used as date.

    • None: No end limit.

  • step (float) – Number of seconds, or datetime.timedelta, to space between values.

Return type

AsyncIterator[datetime]
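The behaviour described above (quantize the start to the step, wait until each time point, then yield it) can be modelled with a standard-library async generator. This is a hypothetical sketch, not the code of timerange: it handles only a numeric start and a datetime end, and rounding up to the next multiple of the step is an assumption, since the reference does not state the rounding direction. The names `quantize` and `timerange_sketch` are invented for illustration.

```python
import asyncio
import datetime as dt
import math
from typing import AsyncIterator, Optional


def quantize(t: dt.datetime, step: float) -> dt.datetime:
    """Round t up to a whole multiple of step seconds since the epoch."""
    return dt.datetime.fromtimestamp(math.ceil(t.timestamp() / step) * step)


async def timerange_sketch(start: float = 0,
                           end: Optional[dt.datetime] = None,
                           step: float = 1) -> AsyncIterator[dt.datetime]:
    """Hypothetical model of timerange for a numeric start only."""
    delta = dt.timedelta(seconds=step)
    # Start is relative to now and quantized to the step (assumed: upward).
    t = quantize(dt.datetime.now() + dt.timedelta(seconds=start), step)
    while end is None or t <= end:
        # Sleep until the time point is reached, then yield it.
        wait = (t - dt.datetime.now()).total_seconds()
        await asyncio.sleep(max(wait, 0.0))
        yield t
        t += delta


async def collect() -> list:
    end = dt.datetime.now() + dt.timedelta(seconds=0.2)
    return [t async for t in timerange_sketch(end=end, step=0.05)]


points = asyncio.run(collect())
```

The yielded datetimes are spaced exactly one step apart, regardless of how long the consumer takes between iterations, because each next point is derived from the previous point and not from the current clock.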