tspy.data_structures.stream_multi_time_series.StreamMultiTimeSeries module

class tspy.data_structures.stream_multi_time_series.StreamMultiTimeSeries.StreamMultiTimeSeries(tsc, j_stream_mts, trs=None)

Bases: object

This data structure handles multiple time-series in motion, where each time-series is identified by a unique key. A Stream-Multi-Time-Series can be thought of as a FIFO queue with a peek and a poll method. A distinguishing quality of Stream-Multi-Time-Series is that it supports a rich set of segmentation functions which are blocking by nature (they will not resolve until a full window is uncovered). Because of this, Stream-Multi-Time-Series keeps an extremely low memory footprint, even when executing on large amounts of data.

Examples

create a simple queue

>>> import queue
>>> key_observation_queue = queue.Queue()

create a simple stream-multi-time-series from a queue

>>> import tspy
>>> smts = tspy.stream_multi_time_series.queue(key_observation_queue)

create a background thread adding to the queue

>>> from threading import Thread
>>> from time import sleep
>>> def thread_function():
...     c = 0
...     while True:
...         observation = tspy.observation(c, c)
...         key = "a" if c % 2 == 0 else "b"
...         c = c + 1
...         key_observation_queue.put_nowait((key, observation))
...         sleep(1)
>>> thread = Thread(target=thread_function)
>>> thread.start()

continuously get values from the queue

>>> for mts in smts:
...     print(mts)

Methods

add_sink(multi_data_sink)

add a multi-data-sink to this piece of the streaming pipeline.

fillna(interpolator[, null_value])

produce a new stream-multi-time-series which is the result of filling all null values.

filter(func)

produce a new stream-multi-time-series which is the result of filtering by each observation’s value given a filter function.

flatmap(func)

produce a new stream-multi-time-series where each observation’s value in this stream-multi-time-series is mapped to 0 to N new values.

full_join(right_stream_mts[, join_func, …])

join two stream-multi-time-series based on a temporal full join strategy and optionally interpolate missing values

inner_join(right_stream_mts[, join_func])

join two stream-multi-time-series based on a temporal inner join strategy

interval_join(right_stream_mts, filter_func, …)

join two stream-multi-time-series where observations in the right stream lie within an interval of this stream.

left_join(right_stream_mts[, join_func, …])

join two stream-multi-time-series based on a temporal left join strategy and optionally interpolate missing values

left_outer_join(right_stream_mts[, …])

join two stream-multi-time-series based on a temporal left outer join strategy and optionally interpolate missing values

map(func)

produce a new stream-multi-time-series where each observation’s value in this stream-multi-time-series is mapped to a new observation value

map_with_key(func)

produce a new stream-multi-time-series where each observation’s value in this stream-multi-time-series is mapped, given a key and value function, to a new observation value

partition([num_partitions, …])

partition this stream-multi-time-series such that all partitions can be run on separate threads in parallel

peek()

Optionally get the most recent values in the queue without flushing the queue.

poll([polling_interval])

Get the most recent values in the queue.

resample(periodicity, interpolator)

produce a new stream-multi-time-series by resampling the current stream-multi-time-series to a given periodicity

right_join(right_stream_mts[, join_func, …])

join two stream-multi-time-series based on a temporal right join strategy and optionally interpolate missing values

right_outer_join(right_stream_mts[, …])

join two stream-multi-time-series based on a temporal right outer join strategy and optionally interpolate missing values

run()

run the streaming pipeline

segment(window[, step])

produce a new segment-stream-multi-time-series by performing a sliding-based segmentation over each time-series.

segment_by_anchor(func, left_delta, right_delta)

produce a new segment-stream-multi-time-series by performing an anchor-based segmentation over each time-series.

segment_by_time(window[, step])

produce a new segment-stream-multi-time-series by performing a time-based segmentation over each time-series.

to_multi_observation_stream()

return a stream iterator of (key, observation)

with_trs([granularity, start_time])

create a new stream-multi-time-series with its timestamps mapped based on a granularity and start_time.

add_sink(multi_data_sink)

add a multi-data-sink to this piece of the streaming pipeline.

Parameters
multi_data_sink : MultiDataSink

the multi-data-sink to output this piece of the pipeline to

Returns
StreamMultiTimeSeries

a new stream-multi-time-series

Examples

create a stream-multi-time-series from a queue

>>> import queue
>>> import tspy
>>> q = queue.Queue()
>>> mts = tspy.stream_multi_time_series.queue(q)

create a datasink

>>> from tspy.io.MultiDataSink import MultiDataSink
>>> class MySink(MultiDataSink):
...     def dump(self, observations_dict):
...         print(observations_dict)

add the datasink to the time-series

>>> mts_with_sink = mts.add_sink(MySink())

fillna(interpolator, null_value=None)

produce a new stream-multi-time-series which is the result of filling all null values.

Parameters
interpolator : func or interpolator

the interpolator method to be used when a value is null

null_value : any, optional

denotes a null value; for instance, if null_value = NaN, NaN values would be filled

Returns
StreamMultiTimeSeries

a new stream-multi-time-series

Notes

see fillna() for usage

filter(func)

produce a new stream-multi-time-series which is the result of filtering by each observation’s value given a filter function.

Parameters
func : func

the filter function applied to each observation’s value

Returns
StreamMultiTimeSeries

a new stream-multi-time-series

Notes

see filter() for usage

flatmap(func)

produce a new stream-multi-time-series where each observation’s value in this stream-multi-time-series is mapped to 0 to N new values.

Parameters
func : func

value mapping function which returns a list of values

Returns
StreamMultiTimeSeries

a new stream-multi-time-series with its values flat-mapped

Notes

see flatmap() for usage

an observation’s time-tick will be duplicated if a single value maps to multiple values
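
The time-tick duplication described in the note can be illustrated without tspy. The following is a minimal plain-Python sketch, not the library's implementation: the helper name `flatmap_observations` and the `(time_tick, value)` tuple representation are assumptions made purely for illustration.

```python
def flatmap_observations(observations, func):
    """Apply func to each value and emit one observation per returned value.

    observations is an iterable of (time_tick, value) tuples, an assumed
    stand-in for the observations of a single time-series.
    """
    result = []
    for time_tick, value in observations:
        for new_value in func(value):
            # every value produced from one input keeps that input's time-tick
            result.append((time_tick, new_value))
    return result


observations = [(0, "a b"), (1, ""), (2, "c")]
flat = flatmap_observations(observations, lambda v: v.split())
print(flat)
```

The value at time-tick 0 maps to two values, so time-tick 0 appears twice in the result; the value at time-tick 1 maps to zero values, so it disappears.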

full_join(right_stream_mts, join_func=None, left_interp_func=<function StreamMultiTimeSeries.<lambda>>, right_interp_func=<function StreamMultiTimeSeries.<lambda>>)

join two stream-multi-time-series based on a temporal full join strategy and optionally interpolate missing values

Parameters
right_stream_mts : StreamMultiTimeSeries

the stream-multi-time-series to align with

join_func : func, optional

function to join two values (default is to join into a list where left is index 0 and right is index 1)

left_interp_func : func or interpolator, optional

the left stream-multi-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is fill with None)

right_interp_func : func or interpolator, optional

the right stream-multi-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is fill with None)

Returns
StreamMultiTimeSeries

a new stream-multi-time-series

Notes

see full_join() for usage
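
To make the documented defaults concrete, here is a hedged plain-Python sketch of a temporal full join over the series of a single key. It is not tspy's implementation: the `{time_tick: value}` dict representation, the function name `full_join_sketch`, and the interpolator signature (a callable that receives only the missing time-tick) are all assumptions for illustration.

```python
def full_join_sketch(left, right, join_func=None,
                     left_interp=None, right_interp=None):
    """Join two {time_tick: value} mappings on the union of their time-ticks."""
    # defaults follow the parameter descriptions: join into a
    # [left, right] list and fill missing values with None
    join_func = join_func or (lambda l, r: [l, r])
    left_interp = left_interp or (lambda tick: None)
    right_interp = right_interp or (lambda tick: None)

    joined = {}
    for tick in sorted(set(left) | set(right)):
        left_value = left[tick] if tick in left else left_interp(tick)
        right_value = right[tick] if tick in right else right_interp(tick)
        joined[tick] = join_func(left_value, right_value)
    return joined


left = {1: "l1", 2: "l2"}
right = {2: "r2", 3: "r3"}
joined = full_join_sketch(left, right)
print(joined)
```

Every time-tick from either side is kept; a side with no value at that time-tick contributes whatever its interpolator returns.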

inner_join(right_stream_mts, join_func=None)

join two stream-multi-time-series based on a temporal inner join strategy

Parameters
right_stream_mts : StreamMultiTimeSeries

the stream-multi-time-series to join with

join_func : func, optional

function to join two values at a given time-tick. If None is given, the joined value will be a list (default is None)

Returns
StreamMultiTimeSeries

a new stream-multi-time-series

Notes

see inner_join() for usage
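
The effect of `join_func` can be sketched in plain Python for the series of a single key. This is only an illustration under assumptions, not the library's code: `inner_join_sketch` and the `{time_tick: value}` dict representation are hypothetical.

```python
def inner_join_sketch(left, right, join_func=None):
    """Join two {time_tick: value} mappings on the time-ticks they share."""
    if join_func is None:
        # assumed default, following the parameter description:
        # pair the two values in a list
        join_func = lambda left_value, right_value: [left_value, right_value]
    shared_ticks = sorted(left.keys() & right.keys())
    return {tick: join_func(left[tick], right[tick]) for tick in shared_ticks}


left = {1: 10, 2: 20, 3: 30}
right = {2: 0.5, 3: 1.5, 4: 2.5}

as_lists = inner_join_sketch(left, right)
as_products = inner_join_sketch(left, right, lambda l, r: l * r)
print(as_lists)
print(as_products)
```

Only time-ticks 2 and 3 exist on both sides, so only those survive the join.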

interval_join(right_stream_mts, filter_func, left_delta, right_delta)

join two stream-multi-time-series where observations in the right stream lie within an interval of this stream. The interval is defined by a point (discovered from the filter_func) with left_delta time-ticks to the left and right_delta time-ticks to the right.

Parameters
right_stream_mts : StreamMultiTimeSeries

the stream-multi-time-series to align with

filter_func : func

function which, given a value, returns whether to form an interval

left_delta : int

how many time-ticks to the left of the interval-point

right_delta : int

how many time-ticks to the right of the interval-point

Returns
StreamMultiTimeSeries

a new stream-multi-time-series

left_join(right_stream_mts, join_func=None, interp_func=<function StreamMultiTimeSeries.<lambda>>)

join two stream-multi-time-series based on a temporal left join strategy and optionally interpolate missing values

Parameters
right_stream_mts : StreamMultiTimeSeries

the stream-multi-time-series to align with

join_func : func, optional

function to join two values (default is to join into a list where left is index 0 and right is index 1)

interp_func : func or interpolator, optional

the right stream-multi-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is fill with None)

Returns
StreamMultiTimeSeries

a new stream-multi-time-series

Notes

see left_join() for usage

left_outer_join(right_stream_mts, join_func=None, interp_func=<function StreamMultiTimeSeries.<lambda>>)

join two stream-multi-time-series based on a temporal left outer join strategy and optionally interpolate missing values

Parameters
right_stream_mts : StreamMultiTimeSeries

the stream-multi-time-series to align with

join_func : func, optional

function to join two values (default is to join into a list where left is index 0 and right is index 1)

interp_func : func or interpolator, optional

the right stream-multi-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is fill with None)

Returns
StreamMultiTimeSeries

a new stream-multi-time-series

Notes

see left_outer_join() for usage

map(func)

produce a new stream-multi-time-series where each observation’s value in this stream-multi-time-series is mapped to a new observation value

Parameters
func : func

value mapping function

Returns
StreamMultiTimeSeries

a new stream-multi-time-series with its values re-mapped

Notes

see map() for usage

map_with_key(func)

produce a new stream-multi-time-series where each observation’s value in this stream-multi-time-series is mapped, given a key and value function, to a new observation value

Parameters
func : func

value mapping function which given a key and value, returns a value

Returns
StreamMultiTimeSeries

a new stream-multi-time-series with its values re-mapped
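
As a rough illustration of what a key-aware mapping does, the sketch below applies a `(key, value)` function to every observation of every keyed series. It does not use tspy: the function name `map_with_key_sketch` and the dict-of-lists representation are assumptions chosen only to show the shape of `func`.

```python
def map_with_key_sketch(series_by_key, func):
    """Map each value through func(key, value), keeping time-ticks unchanged."""
    return {
        key: [(time_tick, func(key, value)) for time_tick, value in observations]
        for key, observations in series_by_key.items()
    }


# two keyed series, each a list of (time_tick, value) tuples
series_by_key = {"a": [(0, 1), (2, 3)], "b": [(1, 2)]}
# a per-key scaling factor, which is why the function needs the key
scale = {"a": 10, "b": 100}

scaled = map_with_key_sketch(series_by_key, lambda key, value: value * scale[key])
print(scaled)
```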

partition(num_partitions=-1, partition_polling_interval=1000)

partition this stream-multi-time-series such that all partitions can be run on separate threads in parallel

Parameters
num_partitions : int, optional

number of partitions

partition_polling_interval : int, optional

how often to poll for new values on each partition (default is 1000 ms)

Returns
StreamMultiTimeSeries

a new partitioned stream-multi-time-series

peek()

Optionally get the most recent values in the queue without flushing the queue.

Returns
MultiTimeSeries

a multi-time-series containing the most recent resolved observations. If no observations exist, return None

poll(polling_interval=-1)

Get the most recent values in the queue. If no values exist, this method will block and poll at the rate of the given polling_interval. Once values have been resolved, poll will flush the queue up to the last value’s time-tick.

Parameters
polling_interval : int, optional

how often to poll for values if none exist in the queue (default is 0ms)

Returns
MultiTimeSeries

a multi-time-series containing the most recent resolved observations.
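
The difference between peek and poll (inspect versus inspect-and-flush) can be sketched with the standard library alone. The class below is a hypothetical stand-in, not tspy's implementation: `PeekPollBuffer`, its attribute names, and the millisecond `polling_interval_ms` parameter are assumptions, and it returns plain lists rather than a MultiTimeSeries.

```python
import queue
import time


class PeekPollBuffer:
    """Hypothetical buffer illustrating peek/poll semantics over a queue."""

    def __init__(self, source):
        self._source = source      # queue.Queue of (key, (time_tick, value))
        self._pending = []         # observations resolved but not yet flushed

    def _drain(self):
        # move everything currently in the source queue into the buffer
        while True:
            try:
                self._pending.append(self._source.get_nowait())
            except queue.Empty:
                return

    def peek(self):
        # look at the resolved observations without flushing them
        self._drain()
        return list(self._pending) or None

    def poll(self, polling_interval_ms=0):
        # block until something is available, then flush what is returned
        self._drain()
        while not self._pending:
            time.sleep(polling_interval_ms / 1000)
            self._drain()
        values, self._pending = self._pending, []
        return values


q = queue.Queue()
buf = PeekPollBuffer(q)
empty_peek = buf.peek()        # nothing has arrived yet

q.put(("a", (0, 0)))
q.put(("b", (1, 1)))
first_peek = buf.peek()        # sees both observations, keeps them buffered
polled = buf.poll()            # returns the same observations and flushes them
after_poll = buf.peek()        # buffer is empty again
```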

resample(periodicity, interpolator)

produce a new stream-multi-time-series by resampling the current stream-multi-time-series to a given periodicity

Parameters
periodicity : int

the period to resample to

interpolator : func or interpolator

the interpolator method to be used when a value doesn’t exist at a given time-tick

Returns
StreamMultiTimeSeries

a new stream-multi-time-series

Notes

see resample() for usage
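
A small plain-Python sketch can show how periodicity and interpolator interact. It is written under assumptions and is not the library's algorithm: `resample_sketch`, the `(time_tick, value)` tuples, and the interpolator signature `(time_tick, observations)` are all hypothetical, and `previous_value` is just one possible fill strategy.

```python
def previous_value(time_tick, observations):
    """Example interpolator: the latest value strictly before time_tick."""
    earlier = [value for tick, value in observations if tick < time_tick]
    return earlier[-1] if earlier else None


def resample_sketch(observations, periodicity, interpolator):
    """Emit one observation every `periodicity` time-ticks.

    observations must be sorted by time-tick. Existing values are reused;
    the interpolator fills time-ticks that have no observation.
    """
    by_tick = dict(observations)
    first, last = observations[0][0], observations[-1][0]
    return [
        (tick, by_tick[tick] if tick in by_tick
         else interpolator(tick, observations))
        for tick in range(first, last + 1, periodicity)
    ]


observations = [(0, 1.0), (3, 4.0), (4, 5.0)]
resampled = resample_sketch(observations, 2, previous_value)
print(resampled)
```

Time-tick 2 has no observation, so the interpolator supplies the previous value; time-tick 3 is dropped because it does not fall on the new periodicity.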

right_join(right_stream_mts, join_func=None, interp_func=<function StreamMultiTimeSeries.<lambda>>)

join two stream-multi-time-series based on a temporal right join strategy and optionally interpolate missing values

Parameters
right_stream_mts : StreamMultiTimeSeries

the stream-multi-time-series to align with

join_func : func, optional

function to join two values (default is to join into a list where left is index 0 and right is index 1)

interp_func : func or interpolator, optional

the left stream-multi-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is fill with None)

Returns
StreamMultiTimeSeries

a new stream-multi-time-series

Notes

see right_join() for usage

right_outer_join(right_stream_mts, join_func=None, interp_func=<function StreamMultiTimeSeries.<lambda>>)

join two stream-multi-time-series based on a temporal right outer join strategy and optionally interpolate missing values

Parameters
right_stream_mts : StreamMultiTimeSeries

the stream-multi-time-series to align with

join_func : func, optional

function to join two values (default is to join into a list where left is index 0 and right is index 1)

interp_func : func or interpolator, optional

the left stream-multi-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is fill with None)

Returns
StreamMultiTimeSeries

a new stream-multi-time-series

Notes

see right_outer_join() for usage

run()

run the streaming pipeline

Notes

if the stream is backed by an infinite source, this method will not return until the program is explicitly killed

segment(window, step=1)

produce a new segment-stream-multi-time-series by performing a sliding-based segmentation over each time-series.

Parameters
window : int

number of observations per window

step : int, optional

step size to slide (default is 1)

Returns
SegmentStreamMultiTimeSeries

a new segment-stream-multi-time-series

Notes

see segment() for usage

Segment size is guaranteed to be equal to the given window value
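
The blocking, fixed-size behaviour of a sliding segmentation can be sketched as a generator that emits nothing until a full window has arrived. This is a plain-Python illustration for a single series, not tspy's implementation; `sliding_segments` and the `(time_tick, value)` tuples are assumed names and shapes.

```python
def sliding_segments(observations, window, step=1):
    """Yield lists of exactly `window` observations, sliding by `step`."""
    buffer = []
    to_skip = 0
    for observation in observations:
        if to_skip > 0:
            # step is larger than window: drop observations between windows
            to_skip -= 1
            continue
        buffer.append(observation)
        if len(buffer) == window:
            # a full window is available, so the segment can resolve
            yield list(buffer)
            to_skip = max(step - window, 0)
            buffer = buffer[step:]


observations = [(t, t * 10) for t in range(7)]
segments = list(sliding_segments(observations, window=3, step=2))
for segment in segments:
    print(segment)
```

Only the observations of the current partial window are held in memory, which is in line with the low memory footprint described for the class.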

segment_by_anchor(func, left_delta, right_delta)

produce a new segment-stream-multi-time-series by performing an anchor-based segmentation over each time-series. An anchor point is defined as any value that satisfies the filter function. When an anchor point is determined, the segment is built from left_delta time-ticks to the left of the point and right_delta time-ticks to the right of the point.

Parameters
func : func

the filter function that identifies anchor points

left_delta : int

number of time-ticks to the left of the anchor point

right_delta : int

right delta time ticks to the right of the anchor point

Returns
SegmentStreamMultiTimeSeries

a new segment-stream-multi-time-series

Notes

see segment_by_anchor() for usage
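
The anchor-based segmentation described above can be sketched in plain Python for a single series. This is an illustration only: `anchor_segments` is a hypothetical helper, observations are assumed to be `(time_tick, value)` tuples, and treating both ends of the interval as inclusive is an assumption that the source does not confirm.

```python
def anchor_segments(observations, is_anchor, left_delta, right_delta):
    """Build one segment around every observation whose value is an anchor."""
    segments = []
    for anchor_tick, anchor_value in observations:
        if is_anchor(anchor_value):
            low = anchor_tick - left_delta
            high = anchor_tick + right_delta
            # assumption: both interval bounds are inclusive
            segments.append(
                [(tick, value) for tick, value in observations
                 if low <= tick <= high]
            )
    return segments


values = [1, 2, 9, 3, 4, 8, 5]
observations = list(enumerate(values))   # time-ticks 0..6

# anchor on any value greater than 7, one time-tick on either side
segments = anchor_segments(observations, lambda v: v > 7, 1, 1)
for segment in segments:
    print(segment)
```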

segment_by_time(window, step=None)

produce a new segment-stream-multi-time-series by performing a time-based segmentation over each time-series

Parameters
window : int

time-tick length of window

step : int, optional

time-tick length of step

Returns
SegmentStreamMultiTimeSeries

a new segment-stream-multi-time-series

Notes

see segment_by_time() for usage
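
Unlike a sliding segmentation over a fixed number of observations, a time-based segmentation fixes the time-tick length of each window, so segments may hold different numbers of observations. The sketch below illustrates this for one series; it is not tspy's implementation. `time_segments` is a hypothetical name, and two behaviours are assumptions: a step of None is treated as equal to window, and windows are half-open `[start, start + window)`.

```python
def time_segments(observations, window, step=None):
    """Group (time_tick, value) tuples into windows of `window` time-ticks."""
    step = window if step is None else step   # assumption: default tumbles
    if not observations:
        return []
    start = observations[0][0]
    last_tick = observations[-1][0]
    segments = []
    # only emit windows whose full time range has already been seen
    while start + window - 1 <= last_tick:
        segments.append(
            [(tick, value) for tick, value in observations
             if start <= tick < start + window]
        )
        start += step
    return segments


ticks = [0, 1, 4, 5, 9, 10, 11]
observations = [(t, t) for t in ticks]
segments = time_segments(observations, window=5)
for segment in segments:
    print([tick for tick, _ in segment])
```

The window starting at time-tick 10 is not emitted because its end (time-tick 14) has not been reached, mirroring the blocking nature of segmentation on a stream.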

to_multi_observation_stream()

Returns
MultiObservationStream

a stream iterator of (key, observation)

with_trs(granularity=datetime.timedelta(0, 0, 1000), start_time=datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc))

create a new stream-multi-time-series with its timestamps mapped based on a granularity and start_time. In the scope of this method, granularity refers to the resolution at which time_ticks are expressed, and start_time refers to the zoned date-time at which the stream-multi-time-series starts.

Parameters
granularity : datetime.timedelta, optional

the granularity for use in stream-multi-time-series TRS (default is 1ms)

start_time : datetime, optional

the starting date-time of the stream-multi-time-series (default is 1970-01-01 UTC)

Returns
StreamMultiTimeSeries

a new stream-multi-time-series with its time_ticks mapped based on a new TRS.

Notes

time_ticks will be mapped as follows: (current_time_tick - start_time) / granularity

if the source stream-multi-time-series does not have a time-reference-system associated with it, this method will throw an exception
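
The mapping formula in the note can be checked with the standard datetime module. The sketch below is an illustration, not tspy code: `to_time_tick` is a hypothetical helper, it assumes the current time-tick is available as a timezone-aware datetime, and the use of floor division to obtain an integer time-tick is an assumption. The defaults mirror the signature above, where `datetime.timedelta(0, 0, 1000)` is 1000 microseconds, that is, 1 ms.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_time_tick(timestamp, granularity=timedelta(milliseconds=1),
                 start_time=EPOCH):
    """Apply (timestamp - start_time) / granularity as an integer tick."""
    # timedelta // timedelta yields an int (assumed rounding behaviour)
    return (timestamp - start_time) // granularity


one_second_in = datetime(1970, 1, 1, 0, 0, 1, tzinfo=timezone.utc)
default_tick = to_time_tick(one_second_in)

# hourly granularity counted from a custom start time
hourly_tick = to_time_tick(
    datetime(2020, 1, 2, tzinfo=timezone.utc),
    granularity=timedelta(hours=1),
    start_time=datetime(2020, 1, 1, tzinfo=timezone.utc),
)
print(default_tick, hourly_tick)
```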