tspy.data_structures.stream_time_series.StreamTimeSeries module

class tspy.data_structures.stream_time_series.StreamTimeSeries.StreamTimeSeries(tsc, j_stream_time_series, trs=None)

Bases: object

This is the data structure for handling time-series in motion. A Stream-Time-Series can be thought of as a FIFO queue with peek and poll methods. A distinguishing quality of Stream-Time-Series is that it supports a rich set of segmentation functions that are blocking by nature (a segment will not resolve until a full window has arrived). Because of this, Stream-Time-Series also keeps a low memory footprint when executing on large amounts of data.

Examples

create a simple queue

>>> import queue
>>> observation_queue = queue.Queue()

create a simple stream-time-series from a queue

>>> import tspy
>>> sts = tspy.stream_time_series.queue(observation_queue)

create a background thread adding to the queue

>>> from threading import Thread
>>> from time import sleep
>>> def thread_function():
...     c = 0
...     while True:
...         observation = tspy.observation(c, c)
...         c = c + 1
...         observation_queue.put_nowait(observation)
...         sleep(1)
>>> thread = Thread(target=thread_function, daemon=True)
>>> thread.start()

continuously get values from the queue

>>> for ts in sts:
...     print(ts)

Attributes
trs: TRS

the time-reference-system (TRS) of this stream-time-series

Methods

add_sink(data_sink)

add a data-sink to this piece of the streaming pipeline.

fillna(interpolator[, null_value])

produce a new stream-time-series which is the result of filling all null values.

filter(func)

produce a new stream-time-series which is the result of filtering by each observation’s value given a filter function.

flatmap(func)

produce a new stream-time-series where each observation’s value in this stream-time-series is mapped to 0 to N new values.

full_join(right_time_series[, join_func, …])

join two stream-time-series based on a temporal full join strategy and optionally interpolate missing values

inner_join(right_time_series[, join_func])

join two stream-time-series based on a temporal inner join strategy

interval_join(right_stream_ts, filter_func, …)

join two stream-time-series where observations in the right stream lie within an interval of this stream.

left_join(right_time_series[, join_func, …])

join two stream-time-series based on a temporal left join strategy and optionally interpolate missing values

left_outer_join(right_time_series[, …])

join two stream-time-series based on a temporal left outer join strategy and optionally interpolate missing values

map(func)

produce a new stream-time-series where each observation’s value in this stream-time-series is mapped to a new observation value

peek()

Get the most recent values in the queue, if any exist, without flushing the queue.

poll([polling_interval])

Get the most recent values in the queue.

resample(periodicity, func)

produce a new stream-time-series by resampling the current stream-time-series to a given periodicity

right_join(right_time_series[, join_func, …])

join two stream-time-series based on a temporal right join strategy and optionally interpolate missing values

right_outer_join(right_time_series[, …])

join two stream-time-series based on a temporal right outer join strategy and optionally interpolate missing values

run()

run the streaming pipeline

segment(window[, step])

produce a new segment-time-series by performing a sliding-based segmentation over the time-series

segment_by_anchor(anchor_op, left_delta, …)

produce a new segment-time-series from performing an anchor-based segmentation over the time-series.

segment_by_time(window[, step])

produce a new segment-time-series by performing a time-based segmentation over the time-series

to_observation_stream()

produce a stream iterator of observations from this stream-time-series.

with_trs([granularity, start_time])

create a new stream-time-series with its timestamps mapped based on a granularity and start_time.

add_sink(data_sink)

add a data-sink to this piece of the streaming pipeline.

Parameters
data_sink : DataSink

the data-sink to output this piece of the pipeline to

Returns
StreamTimeSeries

a new stream-time-series

Examples

create a stream-time-series from a queue

>>> import queue
>>> import tspy
>>> q = queue.Queue()
>>> ts = tspy.stream_time_series.queue(q)

create a datasink

>>> from tspy.io.DataSink import DataSink
>>> class MySink(DataSink):
...     def dump(self, observations):
...         print(observations)

add the datasink to the time-series

>>> ts_with_sink = ts.add_sink(MySink())

fillna(interpolator, null_value=None)

produce a new stream-time-series which is the result of filling all null values.

Parameters
interpolator : func or interpolator

the interpolator method to be used when a value is null

null_value : any, optional

the value that denotes null; for instance, if null_value is NaN, NaN values are filled (default is None)

Returns
StreamTimeSeries

a new stream-time-series

Notes

see fillna() for usage
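To make the null_value semantics concrete, here is a minimal pure-Python sketch over a finite list of (time_tick, value) pairs. It does not use tspy: the names fillna_sketch and fill_previous and the (history, time_tick) interpolator signature are hypothetical. Note the special case for NaN, which never compares equal to itself.

```python
import math
from typing import Any, Callable, List, Optional, Tuple

# Hypothetical observation shape for this sketch: (time_tick, value).
Observation = Tuple[int, Any]


def fill_previous(history: List[Observation], time_tick: int) -> Any:
    """Hypothetical interpolator: reuse the most recent non-null value, if any."""
    return history[-1][1] if history else None


def fillna_sketch(
    observations: List[Observation],
    interpolator: Callable[[List[Observation], int], Any],
    null_value: Optional[Any] = None,
) -> List[Observation]:
    """Replace every value that denotes null with the interpolator's result."""

    def is_null(value: Any) -> bool:
        if null_value is None:
            return value is None
        # NaN != NaN, so test NaN-ness explicitly instead of using ==.
        if isinstance(null_value, float) and math.isnan(null_value):
            return isinstance(value, float) and math.isnan(value)
        return value == null_value

    history: List[Observation] = []  # non-null observations seen so far
    filled: List[Observation] = []
    for time_tick, value in observations:
        if is_null(value):
            value = interpolator(history, time_tick)
        else:
            history.append((time_tick, value))
        filled.append((time_tick, value))
    return filled


nan = float("nan")
print(fillna_sketch([(0, 1.0), (1, nan), (2, 3.0), (3, nan)], fill_previous, null_value=nan))
# [(0, 1.0), (1, 1.0), (2, 3.0), (3, 3.0)]
```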

filter(func)

produce a new stream-time-series which is the result of filtering by each observation’s value given a filter function.

Parameters
func : func

filter function applied to each observation’s value; observations for which it returns True are kept

Returns
StreamTimeSeries

a new stream-time-series

Notes

see filter() for usage

flatmap(func)

produce a new stream-time-series where each observation’s value in this stream-time-series is mapped to 0 to N new values.

Parameters
func : func

value mapping function which returns a list of values

Returns
StreamTimeSeries

a new stream-time-series with its values flat-mapped

Notes

see flatmap() for usage

an observation’s time-tick is duplicated when its value maps to multiple values
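The time-tick duplication noted above can be seen in a small pure-Python sketch (flatmap_sketch is a hypothetical name, not the tspy implementation). The observation at time-tick 1 maps to zero values and disappears, while the one at time-tick 0 maps to two values that share its time-tick.

```python
from typing import Any, Callable, Iterable, List, Tuple

Observation = Tuple[int, Any]  # hypothetical (time_tick, value) shape


def flatmap_sketch(
    observations: Iterable[Observation],
    func: Callable[[Any], List[Any]],
) -> List[Observation]:
    """Map each value to 0..N values, repeating the source time-tick for each."""
    result: List[Observation] = []
    for time_tick, value in observations:
        for new_value in func(value):
            result.append((time_tick, new_value))
    return result


print(flatmap_sketch([(0, "a b"), (1, ""), (2, "c")], str.split))
# [(0, 'a'), (0, 'b'), (2, 'c')]
```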

full_join(right_time_series, join_func=None, left_interp_func=<function StreamTimeSeries.<lambda>>, right_interp_func=<function StreamTimeSeries.<lambda>>)

join two stream-time-series based on a temporal full join strategy and optionally interpolate missing values

Parameters
right_time_series : StreamTimeSeries

the stream-time-series to align with

join_func : func, optional

function to join two values (default is to join into a list where the left value is at index 0 and the right value at index 1)

left_interp_func : func or interpolator, optional

the left stream-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is to fill with None)

right_interp_func : func or interpolator, optional

the right stream-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is to fill with None)

Returns
StreamTimeSeries

a new stream-time-series

Notes

see full_join() for usage
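The following pure-Python sketch shows what a temporal full join produces on two finite lists of (time_tick, value) pairs that have already arrived. It is hypothetical: full_join_sketch, the dict-based representation, and the (series, time_tick) interpolator signature are assumptions, and it assumes each time-tick appears at most once per side. A real streaming join must additionally buffer observations until both sides have advanced past a time-tick.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Observation = Tuple[int, Any]  # hypothetical (time_tick, value) shape
Interpolator = Callable[[Dict[int, Any], int], Any]  # hypothetical signature


def _value_at(series: Dict[int, Any], tick: int, interp: Optional[Interpolator]) -> Any:
    """Exact value if present, else the interpolator's result, else None."""
    if tick in series:
        return series[tick]
    return interp(series, tick) if interp is not None else None


def full_join_sketch(
    left: List[Observation],
    right: List[Observation],
    join_func: Optional[Callable[[Any, Any], Any]] = None,
    left_interp: Optional[Interpolator] = None,
    right_interp: Optional[Interpolator] = None,
) -> List[Observation]:
    """Emit one joined observation for every time-tick seen on either side."""
    join = join_func or (lambda l, r: [l, r])  # default: [left value, right value]
    left_map, right_map = dict(left), dict(right)
    return [
        (tick, join(_value_at(left_map, tick, left_interp),
                    _value_at(right_map, tick, right_interp)))
        for tick in sorted(left_map.keys() | right_map.keys())
    ]


print(full_join_sketch([(1, "a"), (3, "c")], [(2, "B"), (3, "C")]))
# [(1, ['a', None]), (2, [None, 'B']), (3, ['c', 'C'])]
```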

inner_join(right_time_series, join_func=None)

join two stream-time-series based on a temporal inner join strategy

Parameters
right_time_series : StreamTimeSeries

the stream-time-series to join with

join_func : func, optional

function to join two values at a given time-tick. If None, the joined values are placed in a list (default is None)

Returns
StreamTimeSeries

a new stream-time-series

Notes

see inner_join() for usage
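For comparison with the full join, a minimal pure-Python sketch of the inner join keeps only time-ticks present on both sides. As before, inner_join_sketch is hypothetical, operates on finite lists, and assumes at most one observation per time-tick on the right side.

```python
from typing import Any, Callable, List, Optional, Tuple

Observation = Tuple[int, Any]  # hypothetical (time_tick, value) shape


def inner_join_sketch(
    left: List[Observation],
    right: List[Observation],
    join_func: Optional[Callable[[Any, Any], Any]] = None,
) -> List[Observation]:
    """Keep only time-ticks present on both sides and join their values."""
    join = join_func or (lambda l, r: [l, r])  # default: values collected in a list
    right_map = dict(right)  # assumes at most one observation per time-tick
    return [(tick, join(value, right_map[tick])) for tick, value in left if tick in right_map]


left = [(1, 2.0), (3, 4.0)]
right = [(2, 10.0), (3, 20.0)]
print(inner_join_sketch(left, right))                      # [(3, [4.0, 20.0])]
print(inner_join_sketch(left, right, lambda l, r: l + r))  # [(3, 24.0)]
```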

interval_join(right_stream_ts, filter_func, left_delta, right_delta)

join two stream-time-series where observations in the right stream lie within an interval of this stream. The interval is defined by a point (any observation whose value satisfies filter_func) extending left_delta time-ticks to the left and right_delta time-ticks to the right.

Parameters
right_stream_ts : StreamTimeSeries

the stream-time-series to align with

filter_func : func

function which, given a value, returns whether to form an interval around that observation

left_delta : int

how many time-ticks the interval extends to the left of the interval-point

right_delta : int

how many time-ticks the interval extends to the right of the interval-point

Returns
StreamTimeSeries

a new stream-time-series
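A hypothetical pure-Python sketch of the interval semantics over finite lists follows. The output shape (interval-point time-tick, (left value, right values in the interval)) and the inclusive interval bounds are assumptions for illustration; this page does not specify exactly what interval_join emits.

```python
from typing import Any, Callable, List, Tuple

Observation = Tuple[int, Any]  # hypothetical (time_tick, value) shape


def interval_join_sketch(
    left: List[Observation],
    right: List[Observation],
    filter_func: Callable[[Any], bool],
    left_delta: int,
    right_delta: int,
) -> List[Tuple[int, Tuple[Any, List[Any]]]]:
    """For each left observation passing filter_func, gather the right values whose
    time-ticks fall in [tick - left_delta, tick + right_delta] (inclusive here)."""
    result = []
    for tick, value in left:
        if not filter_func(value):
            continue
        lo, hi = tick - left_delta, tick + right_delta
        in_interval = [v for t, v in right if lo <= t <= hi]
        result.append((tick, (value, in_interval)))
    return result


alarms = [(0, "ok"), (5, "alarm"), (10, "ok")]
readings = [(t, t * 1.5) for t in range(12)]
print(interval_join_sketch(alarms, readings, lambda v: v == "alarm", 2, 1))
# [(5, ('alarm', [4.5, 6.0, 7.5, 9.0]))]
```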

left_join(right_time_series, join_func=None, interp_func=<function StreamTimeSeries.<lambda>>)

join two stream-time-series based on a temporal left join strategy and optionally interpolate missing values

Parameters
right_time_series : StreamTimeSeries

the stream-time-series to align with

join_func : func, optional

function to join two values (default is to join into a list where the left value is at index 0 and the right value at index 1)

interp_func : func or interpolator, optional

the right stream-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is to fill with None)

Returns
StreamTimeSeries

a new stream-time-series

Notes

see left_join() for usage
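In a left join, interp_func fills gaps on the right side only, and right-only time-ticks are dropped. The pure-Python sketch below illustrates this on finite lists; left_join_sketch, the previous interpolator, and its (series, time_tick) signature are hypothetical.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Observation = Tuple[int, Any]  # hypothetical (time_tick, value) shape
Interpolator = Callable[[Dict[int, Any], int], Any]  # hypothetical signature


def previous(series: Dict[int, Any], tick: int) -> Any:
    """Hypothetical interpolator: most recent value strictly before `tick`, else None."""
    earlier = [t for t in series if t < tick]
    return series[max(earlier)] if earlier else None


def left_join_sketch(
    left: List[Observation],
    right: List[Observation],
    join_func: Optional[Callable[[Any, Any], Any]] = None,
    interp_func: Optional[Interpolator] = None,
) -> List[Observation]:
    """Keep every left time-tick; fill a missing right value via interp_func (or None)."""
    join = join_func or (lambda l, r: [l, r])
    right_map = dict(right)
    result = []
    for tick, value in left:
        if tick in right_map:
            right_value = right_map[tick]
        elif interp_func is not None:
            right_value = interp_func(right_map, tick)
        else:
            right_value = None  # default: fill with None
        result.append((tick, join(value, right_value)))
    return result


left = [(1, "a"), (2, "b"), (3, "c")]
right = [(2, "B"), (4, "D")]
print(left_join_sketch(left, right))
# [(1, ['a', None]), (2, ['b', 'B']), (3, ['c', None])]
print(left_join_sketch(left, right, interp_func=previous))
# [(1, ['a', None]), (2, ['b', 'B']), (3, ['c', 'B'])]
```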

left_outer_join(right_time_series, join_func=None, interp_func=<function StreamTimeSeries.<lambda>>)

join two stream-time-series based on a temporal left outer join strategy and optionally interpolate missing values

Parameters
right_time_series : StreamTimeSeries

the stream-time-series to align with

join_func : func, optional

function to join two values (default is to join into a list where the left value is at index 0 and the right value at index 1)

interp_func : func or interpolator, optional

the right stream-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is to fill with None)

Returns
StreamTimeSeries

a new stream-time-series

Notes

see left_outer_join() for usage

map(func)

produce a new stream-time-series where each observation’s value in this stream-time-series is mapped to a new observation value

Parameters
func : func

value mapping function

Returns
StreamTimeSeries

a new stream-time-series with its values re-mapped

Notes

see map() for usage

peek()

Get the most recent values in the queue, if any exist, without flushing the queue.

Returns
TimeSeries

a time-series containing the most recent resolved observations, or None if no observations exist

poll(polling_interval=-1)

Get the most recent values in the queue. If no values exist, this method blocks and polls at the rate of the given polling_interval. Once values have been resolved, poll flushes the queue up to the last value’s time-tick.

Parameters
polling_interval : int, optional

how often to poll for values if none exist in the queue (default is 0ms)

Returns
TimeSeries

a time-series containing the most recent resolved observations.
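The difference between peek and poll can be illustrated with a toy FIFO buffer. ToyStream is a hypothetical class, not the tspy API: it only mimics the documented contract (peek is non-blocking, returns None when empty, and leaves the buffer intact; poll blocks until data exists, then flushes). Treating the polling interval as milliseconds follows the "0ms" default above and is otherwise an assumption.

```python
import time
from collections import deque
from typing import Any, Deque, List, Optional, Tuple

Observation = Tuple[int, Any]  # hypothetical (time_tick, value) shape


class ToyStream:
    """Hypothetical FIFO buffer mimicking the documented peek/poll contract."""

    def __init__(self) -> None:
        self._buffer: Deque[Observation] = deque()

    def put(self, observation: Observation) -> None:
        self._buffer.append(observation)

    def peek(self) -> Optional[List[Observation]]:
        # Non-blocking: copy what is buffered and leave the buffer intact.
        return list(self._buffer) if self._buffer else None

    def poll(self, polling_interval_ms: int = 0) -> List[Observation]:
        # Block, sleeping between checks, until at least one observation exists.
        while not self._buffer:
            time.sleep(max(polling_interval_ms, 0) / 1000.0)
        # Flush everything buffered, up to the last observation's time-tick.
        flushed: List[Observation] = []
        while self._buffer:
            flushed.append(self._buffer.popleft())
        return flushed


stream = ToyStream()
print(stream.peek())  # None
stream.put((0, 1.0))
stream.put((1, 2.0))
print(stream.peek())  # [(0, 1.0), (1, 2.0)]
print(stream.poll())  # [(0, 1.0), (1, 2.0)]
print(stream.peek())  # None
```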

resample(periodicity, func)

produce a new stream-time-series by resampling the current stream-time-series to a given periodicity

Parameters
periodicity : int

the period to resample to

func : func or interpolator

the interpolator method to be used when a value doesn’t exist at a given time-tick

Returns
StreamTimeSeries

a new stream-time-series

Notes

see resample() for usage
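A minimal pure-Python sketch of resampling a finite series follows. Two choices here are assumptions rather than documented tspy behavior: output time-ticks start at the first observed time-tick, and the interpolator is a swapped-in linear one (linear and resample_sketch are hypothetical names).

```python
import bisect
from typing import Callable, List, Optional, Tuple

Observation = Tuple[int, float]  # hypothetical (time_tick, value) shape


def linear(observations: List[Observation], tick: int) -> Optional[float]:
    """Hypothetical interpolator: exact value if present, else linear between neighbours."""
    ticks = [t for t, _ in observations]
    i = bisect.bisect_left(ticks, tick)
    if i < len(ticks) and ticks[i] == tick:
        return observations[i][1]
    if i == 0 or i == len(ticks):
        return None  # outside the observed range: nothing to interpolate between
    (t0, v0), (t1, v1) = observations[i - 1], observations[i]
    return v0 + (v1 - v0) * (tick - t0) / (t1 - t0)


def resample_sketch(
    observations: List[Observation],
    periodicity: int,
    func: Callable[[List[Observation], int], Optional[float]],
) -> List[Tuple[int, Optional[float]]]:
    """Emit one observation every `periodicity` ticks, starting at the first tick."""
    if not observations:
        return []
    first, last = observations[0][0], observations[-1][0]
    return [(t, func(observations, t)) for t in range(first, last + 1, periodicity)]


print(resample_sketch([(0, 0.0), (3, 3.0), (4, 8.0)], 2, linear))
# [(0, 0.0), (2, 2.0), (4, 8.0)]
```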

right_join(right_time_series, join_func=None, interp_func=<function StreamTimeSeries.<lambda>>)

join two stream-time-series based on a temporal right join strategy and optionally interpolate missing values

Parameters
right_time_series : StreamTimeSeries

the stream-time-series to align with

join_func : func, optional

function to join two values (default is to join into a list where the left value is at index 0 and the right value at index 1)

interp_func : func or interpolator, optional

the left stream-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is to fill with None)

Returns
StreamTimeSeries

a new stream-time-series

Notes

see right_join() for usage

right_outer_join(right_time_series, join_func=None, interp_func=<function StreamTimeSeries.<lambda>>)

join two stream-time-series based on a temporal right outer join strategy and optionally interpolate missing values

Parameters
right_time_series : StreamTimeSeries

the stream-time-series to align with

join_func : func, optional

function to join two values (default is to join into a list where the left value is at index 0 and the right value at index 1)

interp_func : func or interpolator, optional

the left stream-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is to fill with None)

Returns
StreamTimeSeries

a new stream-time-series

Notes

see right_outer_join() for usage

run()

run the streaming pipeline

Notes

if the stream is backed by an infinite source, this method never returns; it runs until the program is explicitly terminated

segment(window, step=1)

produce a new segment-time-series by performing a sliding-based segmentation over the time-series

Parameters
window : int

number of observations per window

step : int, optional

step size to slide (default is 1)

Returns
SegmentStreamTimeSeries

a new segment-stream-time-series

Notes

see segment() for usage

Segment size is guaranteed to be equal to given window value
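The blocking, fixed-size behavior described above can be sketched as a Python generator over a possibly unbounded iterable. segment_sketch is a hypothetical name, not the tspy implementation: it yields a window only once `window` items have arrived, so every segment has exactly `window` items and a partial trailing window is never emitted.

```python
from collections import deque
from typing import Deque, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def segment_sketch(observations: Iterable[T], window: int, step: int = 1) -> Iterator[List[T]]:
    """Count-based sliding windows; each yielded list has exactly `window` items."""
    buffer: Deque[T] = deque()
    to_skip = 0  # items to discard when step > window
    for obs in observations:
        if to_skip:
            to_skip -= 1
            continue
        buffer.append(obs)
        if len(buffer) == window:  # block until the window is full
            yield list(buffer)
            for _ in range(min(step, window)):
                buffer.popleft()
            to_skip = max(step - window, 0)


print(list(segment_sketch(range(5), window=3)))          # [[0, 1, 2], [1, 2, 3], [2, 3, 4]]
print(list(segment_sketch(range(7), window=3, step=2)))  # [[0, 1, 2], [2, 3, 4], [4, 5, 6]]
```

Because it is lazy, the same generator also works over an endless source such as itertools.count(), holding at most `window` items in memory.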

segment_by_anchor(anchor_op, left_delta, right_delta)

produce a new segment-time-series by performing an anchor-based segmentation over the time-series. An anchor point is any observation whose value satisfies anchor_op. When an anchor point is found, the segment is built from left_delta time-ticks to the left of the point through right_delta time-ticks to the right of the point.

Parameters
anchor_op : func

the function that determines whether a value is an anchor point

left_delta : int

number of time-ticks to the left of the anchor point

right_delta : int

number of time-ticks to the right of the anchor point

Returns
SegmentStreamTimeSeries

a new segment-stream-time-series

Notes

see segment_by_anchor() for usage
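A pure-Python sketch of anchor-based segmentation over a finite list follows; segment_by_anchor_sketch is hypothetical and treats both interval bounds as inclusive, which is an assumption. In a stream, such a segment can only resolve once a time-tick past anchor + right_delta has arrived; this batch version sidesteps that. Note that overlapping segments may share observations (time-tick 2 below).

```python
from typing import Any, Callable, List, Tuple

Observation = Tuple[int, Any]  # hypothetical (time_tick, value) shape


def segment_by_anchor_sketch(
    observations: List[Observation],
    anchor_op: Callable[[Any], bool],
    left_delta: int,
    right_delta: int,
) -> List[List[Observation]]:
    """One segment per anchor: observations within
    [anchor_tick - left_delta, anchor_tick + right_delta] (inclusive here)."""
    segments = []
    for anchor_tick, value in observations:
        if anchor_op(value):
            lo, hi = anchor_tick - left_delta, anchor_tick + right_delta
            segments.append([(t, v) for t, v in observations if lo <= t <= hi])
    return segments


obs = list(enumerate([1, 5, 2, 9, 3, 4]))  # time-ticks 0..5
print(segment_by_anchor_sketch(obs, lambda v: v > 4, 1, 1))
# [[(0, 1), (1, 5), (2, 2)], [(2, 2), (3, 9), (4, 3)]]
```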

segment_by_time(window, step=None)

produce a new segment-time-series by performing a time-based segmentation over the time-series

Parameters
window : int

time-tick length of the window

step : int, optional

time-tick length of the step

Returns
SegmentStreamTimeSeries

a new segment-stream-time-series

Notes

see segment_by_time() for usage
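A pure-Python sketch of time-based segmentation over a finite list follows. Several details are assumptions: windows are half-open [start, start + window), aligned to the first time-tick, and emitted only once a time-tick at or past the window's end has been seen (so they can receive no more data). This page does not say what step=None means, so the hypothetical segment_by_time_sketch requires step explicitly.

```python
from typing import Any, List, Tuple

Observation = Tuple[int, Any]  # hypothetical (time_tick, value) shape


def segment_by_time_sketch(
    observations: List[Observation], window: int, step: int
) -> List[List[Observation]]:
    """Time-based windows [start, start + window), with start advancing by `step`."""
    if step <= 0:
        raise ValueError("step must be positive")
    if not observations:
        return []
    start, last_tick = observations[0][0], observations[-1][0]
    segments = []
    # A window is complete once some time-tick >= its end has arrived.
    while start + window <= last_tick:
        end = start + window
        segments.append([(t, v) for t, v in observations if start <= t < end])
        start += step
    return segments


obs = [(t, t * 10) for t in range(7)]
print(segment_by_time_sketch(obs, window=3, step=3))
# [[(0, 0), (1, 10), (2, 20)], [(3, 30), (4, 40), (5, 50)]]
```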

to_observation_stream()

produce a stream iterator of observations from this stream-time-series.

Returns
ObservationStream

a stream iterator of observations

with_trs(granularity=datetime.timedelta(0, 0, 1000), start_time=datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc))

create a new stream-time-series with its timestamps mapped based on a granularity and start_time. Here, granularity is the duration represented by a single time-tick, and start_time is the zoned date-time at which the stream-time-series starts.

Parameters
granularity : datetime.timedelta, optional

the granularity for use in stream-time-series TRS (default is 1ms)

start_time : datetime, optional

the starting date-time of the stream-time-series (default is 1970-01-01 UTC)

Returns
StreamTimeSeries

a new stream-time-series with its time_ticks mapped based on a new TRS.

Notes

see with_trs() for usage

time_ticks are mapped as (current_time_tick - start_time) / granularity

if the source stream-time-series does not have a time-reference-system associated with it, this method will raise an exception
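The time-tick mapping above can be checked by hand with the standard datetime module. The helper to_time_tick is hypothetical (it is not part of tspy); it applies the mapping to a zoned datetime using the same defaults as with_trs, and flooring fractional ticks with // is an assumption.

```python
from datetime import datetime, timedelta, timezone


def to_time_tick(
    moment: datetime,
    granularity: timedelta = timedelta(milliseconds=1),
    start_time: datetime = datetime(1970, 1, 1, tzinfo=timezone.utc),
) -> int:
    """(moment - start_time) / granularity, floored to a whole number of ticks."""
    return (moment - start_time) // granularity


print(to_time_tick(datetime(1970, 1, 1, 0, 0, 1, tzinfo=timezone.utc)))  # 1000
print(to_time_tick(
    datetime(2020, 1, 1, 1, 30, tzinfo=timezone.utc),
    granularity=timedelta(minutes=1),
    start_time=datetime(2020, 1, 1, tzinfo=timezone.utc),
))  # 90
```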