tspy.data_structures.stream_multi_time_series.StreamMultiTimeSeries module
-
class tspy.data_structures.stream_multi_time_series.StreamMultiTimeSeries.StreamMultiTimeSeries(tsc, j_stream_mts, trs=None)
Bases: object
A data structure for handling multiple time-series in motion, where each time-series is identified by a unique key. A stream-multi-time-series can be thought of as a FIFO queue with a peek and a poll method. A distinguishing quality of a stream-multi-time-series is its rich set of segmentation functions, which are blocking by nature: a segment is not resolved until a full window has been uncovered. Because of this design, a stream-multi-time-series keeps an extremely low memory footprint even when executing on large amounts of data.
Examples
create a simple queue
>>> import queue
>>> key_observation_queue = queue.Queue()
create a simple stream-multi-time-series from a queue
>>> import tspy
>>> smts = tspy.stream_multi_time_series.queue(key_observation_queue)
create a background thread adding to the queue
>>> from threading import Thread
>>> from time import sleep
>>> def thread_function():
...     c = 0
...     while True:
...         observation = tspy.observation(c, c)
...         key = "a" if c % 2 == 0 else "b"
...         c = c + 1
...         key_observation_queue.put_nowait((key, observation))
...         sleep(1)
>>> thread = Thread(target=thread_function)
>>> thread.start()
continuously get values from the queue
>>> for mts in smts:
...     print(mts)
Methods
add_sink(multi_data_sink): add a multi-data-sink to this piece of the streaming pipeline.
fillna(interpolator[, null_value]): produce a new stream-multi-time-series which is the result of filling all null values.
filter(func): produce a new stream-multi-time-series which is the result of filtering by each observation’s value given a filter function.
flatmap(func): produce a new stream-multi-time-series where each observation’s value in this stream-multi-time-series is mapped to 0 to N new values.
full_join(right_stream_mts[, join_func, …]): join two stream-multi-time-series based on a temporal full join strategy and optionally interpolate missing values
inner_join(right_stream_mts[, join_func]): join two stream-multi-time-series based on a temporal inner join strategy
interval_join(right_stream_mts, filter_func, …): join two stream-multi-time-series where observations in the right stream lie within an interval of this stream.
left_join(right_stream_mts[, join_func, …]): join two stream-multi-time-series based on a temporal left join strategy and optionally interpolate missing values
left_outer_join(right_stream_mts[, …]): join two stream-multi-time-series based on a temporal left outer join strategy and optionally interpolate missing values
map(func): produce a new stream-multi-time-series where each observation’s value in this stream-multi-time-series is mapped to a new observation value
map_with_key(func): produce a new stream-multi-time-series where each observation’s value in this stream-multi-time-series is mapped, given a key and value function, to a new observation value
partition([num_partitions, …]): partition this stream-multi-time-series such that all partitions can be run on separate threads in parallel
peek(): Optionally get the most recent values in the queue without flushing the queue.
poll([polling_interval]): Get the most recent values in the queue.
resample(periodicity, interpolator): produce a new stream-multi-time-series by resampling the current stream-multi-time-series to a given periodicity
right_join(right_stream_mts[, join_func, …]): join two stream-multi-time-series based on a temporal right join strategy and optionally interpolate missing values
right_outer_join(right_stream_mts[, …]): join two stream-multi-time-series based on a temporal right outer join strategy and optionally interpolate missing values
run(): run the streaming pipeline
segment(window[, step]): produce a new segment-stream-multi-time-series by performing a sliding-based segmentation over each time-series.
segment_by_anchor(func, left_delta, right_delta): produce a new segment-stream-multi-time-series by performing an anchor-based segmentation over each time-series.
segment_by_time(window[, step]): produce a new segment-stream-multi-time-series by performing a time-based segmentation over each time-series
to_multi_observation_stream(): return a stream iterator of (key, observation)
with_trs([granularity, start_time]): create a new stream-multi-time-series with its timestamps mapped based on a granularity and start_time.
-
add_sink(multi_data_sink)
add a multi-data-sink to this piece of the streaming pipeline.
- Parameters
- multi_data_sink : MultiDataSink
the multi-data-sink to output this piece of the pipeline to
- Returns
StreamMultiTimeSeries
a new stream-multi-time-series
Examples
create a stream-multi-time-series from a queue
>>> import queue
>>> import tspy
>>> q = queue.Queue()
>>> mts = tspy.stream_multi_time_series.queue(q)
create a datasink
>>> from tspy.io.MultiDataSink import MultiDataSink
>>> class MySink(MultiDataSink):
...     def dump(self, observations_dict):
...         print(observations_dict)
add the datasink to the time-series
>>> mts_with_sink = mts.add_sink(MySink())
-
fillna(interpolator, null_value=None)
produce a new stream-multi-time-series which is the result of filling all null values.
- Parameters
- interpolator : func or interpolator
the interpolator method to be used when a value is null
- null_value : any, optional
denotes a null value; for instance, if null_value = NaN, NaN values would be filled
- Returns
StreamMultiTimeSeries
a new stream-multi-time-series
Notes
see fillna() for usage
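The filling behaviour described above can be sketched in plain Python. This is an illustration only and does not call tspy: `fill_nulls`, `previous_value`, and the `(history, time_tick)` interpolator signature are hypothetical names chosen for the sketch, and each series is modelled as a list of `(time_tick, value)` tuples per key.

```python
def fill_nulls(series_by_key, interpolator, null_value=None):
    """Replace every value equal to null_value, key by key (illustrative only)."""
    filled = {}
    for key, observations in series_by_key.items():
        history = []  # observations already emitted for this key
        for time_tick, value in observations:
            if value == null_value:
                # Hypothetical interpolator signature: (history, time_tick) -> value
                value = interpolator(history, time_tick)
            history.append((time_tick, value))
        filled[key] = history
    return filled


def previous_value(history, time_tick):
    """Carry the last known value forward; None when there is no history yet."""
    return history[-1][1] if history else None


raw = {
    "a": [(0, 1.0), (1, None), (2, 3.0)],
    "b": [(0, None), (1, 5)],
}
filled = fill_nulls(raw, previous_value)
```

A leading null stays null in this sketch because a previous-value interpolator has nothing to carry forward.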
-
filter(func)
produce a new stream-multi-time-series which is the result of filtering by each observation’s value given a filter function.
- Parameters
- func : func
the filter function applied to each observation’s value
- Returns
StreamMultiTimeSeries
a new stream-multi-time-series
Notes
see filter() for usage
-
flatmap(func)
produce a new stream-multi-time-series where each observation’s value in this stream-multi-time-series is mapped to 0 to N new values.
- Parameters
- func : func
value mapping function which returns a list of values
- Returns
StreamMultiTimeSeries
a new stream-multi-time-series with its values flat-mapped
Notes
see flatmap() for usage
an observation’s time-tick will be duplicated if a single value maps to multiple values
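The time-tick duplication mentioned in the note is easiest to see on a small example. The following is a plain-Python sketch, not tspy code; `flatmap_series` is a hypothetical helper and each series is modelled as a list of `(time_tick, value)` tuples per key.

```python
def flatmap_series(series_by_key, func):
    """Map each value to zero or more values, repeating the original time-tick."""
    return {
        key: [
            (time_tick, new_value)
            for time_tick, value in observations
            for new_value in func(value)  # func returns a list of values
        ]
        for key, observations in series_by_key.items()
    }


words = {"a": [(0, "x y"), (1, ""), (2, "z")]}
# "x y" maps to two values, "" maps to none, "z" maps to one.
flat = flatmap_series(words, lambda value: value.split())
```

Here time-tick 0 appears twice in the result and time-tick 1 disappears, since its value mapped to an empty list.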
-
full_join(right_stream_mts, join_func=None, left_interp_func=<function StreamMultiTimeSeries.<lambda>>, right_interp_func=<function StreamMultiTimeSeries.<lambda>>)
join two stream-multi-time-series based on a temporal full join strategy and optionally interpolate missing values
- Parameters
- right_stream_mts : StreamMultiTimeSeries
the stream-multi-time-series to align with
- join_func : func, optional
function to join two values (default is join to list where left is index 0, right is index 1)
- left_interp_func : func or interpolator, optional
the left stream-multi-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is fill with None)
- right_interp_func : func or interpolator, optional
the right stream-multi-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is fill with None)
- Returns
StreamMultiTimeSeries
a new stream-multi-time-series
Notes
see full_join() for usage
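As a rough illustration of the temporal full join and its defaults, the sketch below joins one pair of series in plain Python; it is not tspy code. The helper name `full_join_sketch` and the interpolator signature (a function of the time-tick) are assumptions made for the example, and a multi-time-series version would apply the same logic per key.

```python
def full_join_sketch(left, right, join_func=None,
                     left_interp=lambda time_tick: None,
                     right_interp=lambda time_tick: None):
    """Full join of two lists of (time_tick, value); illustrative only."""
    if join_func is None:
        # Documented default: join to a list, left at index 0, right at index 1.
        join_func = lambda left_value, right_value: [left_value, right_value]
    left_map, right_map = dict(left), dict(right)
    joined = []
    for time_tick in sorted(left_map.keys() | right_map.keys()):
        # A side with no value at this time-tick is filled by its interpolator.
        left_value = left_map[time_tick] if time_tick in left_map else left_interp(time_tick)
        right_value = right_map[time_tick] if time_tick in right_map else right_interp(time_tick)
        joined.append((time_tick, join_func(left_value, right_value)))
    return joined


left = [(1, "a"), (2, "b")]
right = [(2, "x"), (3, "y")]
joined = full_join_sketch(left, right)
```

With the default interpolators, time-ticks present on only one side are kept and the missing side is filled with None.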
-
inner_join(right_stream_mts, join_func=None)
join two stream-multi-time-series based on a temporal inner join strategy
- Parameters
- right_stream_mts : StreamMultiTimeSeries
the stream-multi-time-series to join with
- join_func : func, optional
function to join 2 values at a given time-tick. If None is given, the joined value will be a list (default is None)
- Returns
StreamMultiTimeSeries
a new stream-multi-time-series
Notes
see inner_join() for usage
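A temporal inner join keeps only the time-ticks present on both sides. The plain-Python sketch below shows this for one pair of series, with and without a join function; `inner_join_sketch` is a hypothetical helper and no tspy calls are involved.

```python
def inner_join_sketch(left, right, join_func=None):
    """Inner join of two lists of (time_tick, value); illustrative only."""
    if join_func is None:
        # With no join function, the joined value is a two-element list.
        join_func = lambda left_value, right_value: [left_value, right_value]
    right_map = dict(right)
    return [
        (time_tick, join_func(value, right_map[time_tick]))
        for time_tick, value in left
        if time_tick in right_map  # drop time-ticks missing on the right
    ]


left = [(1, 10), (2, 20), (3, 30)]
right = [(2, 1), (3, 2), (4, 3)]
as_lists = inner_join_sketch(left, right)
as_sums = inner_join_sketch(left, right, lambda l, r: l + r)
```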
-
interval_join(right_stream_mts, filter_func, left_delta, right_delta)
join two stream-multi-time-series where observations in the right stream lie within an interval of this stream. The interval is defined by a point (discovered from the filter_func) with left_delta time-ticks to the left and right_delta time-ticks to the right.
- Parameters
- right_stream_mts : StreamMultiTimeSeries
the stream-multi-time-series to align with
- filter_func : func
function which, given a value, returns whether to form an interval
- left_delta : int
how many time-ticks to the left of the interval-point
- right_delta : int
how many time-ticks to the right of the interval-point
- Returns
StreamMultiTimeSeries
a new stream-multi-time-series
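The interval construction can be sketched on a single pair of series in plain Python. This is not tspy code: `interval_join_sketch` is a hypothetical helper, and both the inclusive interval bounds and the output shape (each interval-point paired with the right-hand observations it captured) are assumptions made for illustration.

```python
def interval_join_sketch(left, right, filter_func, left_delta, right_delta):
    """Collect right observations around each left interval-point; illustrative only."""
    joined = []
    for time_tick, value in left:
        if not filter_func(value):
            continue  # this left observation does not form an interval
        low, high = time_tick - left_delta, time_tick + right_delta
        # Assumption: both interval bounds are inclusive.
        matches = [(rt, rv) for rt, rv in right if low <= rt <= high]
        joined.append((time_tick, matches))
    return joined


events = [(5, "ok"), (10, "error"), (20, "error")]
metrics = [(7, 0.1), (9, 0.2), (11, 0.3), (14, 0.4), (19, 0.5)]
around_errors = interval_join_sketch(
    events, metrics, lambda value: value == "error", left_delta=2, right_delta=1
)
```

The error at time-tick 10 captures the metrics in [8, 11], and the error at 20 captures those in [18, 21].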
-
left_join(right_stream_mts, join_func=None, interp_func=<function StreamMultiTimeSeries.<lambda>>)
join two stream-multi-time-series based on a temporal left join strategy and optionally interpolate missing values
- Parameters
- right_stream_mts : StreamMultiTimeSeries
the stream-multi-time-series to align with
- join_func : func, optional
function to join two values (default is join to list where left is index 0, right is index 1)
- interp_func : func or interpolator, optional
the right stream-multi-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is fill with None)
- Returns
StreamMultiTimeSeries
a new stream-multi-time-series
Notes
see left_join() for usage
-
left_outer_join(right_stream_mts, join_func=None, interp_func=<function StreamMultiTimeSeries.<lambda>>)
join two stream-multi-time-series based on a temporal left outer join strategy and optionally interpolate missing values
- Parameters
- right_stream_mts : StreamMultiTimeSeries
the stream-multi-time-series to align with
- join_func : func, optional
function to join two values (default is join to list where left is index 0, right is index 1)
- interp_func : func or interpolator, optional
the right stream-multi-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is fill with None)
- Returns
StreamMultiTimeSeries
a new stream-multi-time-series
Notes
see left_outer_join() for usage
-
map(func)
produce a new stream-multi-time-series where each observation’s value in this stream-multi-time-series is mapped to a new observation value
- Parameters
- func : func
value mapping function
- Returns
StreamMultiTimeSeries
a new stream-multi-time-series with its values re-mapped
Notes
see map() for usage
-
map_with_key(func)
produce a new stream-multi-time-series where each observation’s value in this stream-multi-time-series is mapped, given a key and value function, to a new observation value
- Parameters
- func : func
value mapping function which, given a key and a value, returns a value
- Returns
StreamMultiTimeSeries
a new stream-multi-time-series with its values re-mapped
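A key-aware mapping is useful when different keys need different treatment. The sketch below is plain Python rather than tspy; `map_with_key_sketch`, `to_celsius`, and the key names are hypothetical, and each series is modelled as a list of `(time_tick, value)` tuples per key.

```python
def map_with_key_sketch(series_by_key, func):
    """Apply func(key, value) to every observation value; illustrative only."""
    return {
        key: [(time_tick, func(key, value)) for time_tick, value in observations]
        for key, observations in series_by_key.items()
    }


def to_celsius(key, value):
    """Convert only the series whose key marks it as Fahrenheit."""
    return (value - 32) * 5 / 9 if key == "fahrenheit" else value


readings = {"celsius": [(0, 20.0)], "fahrenheit": [(0, 68.0)]}
normalized = map_with_key_sketch(readings, to_celsius)
```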
-
partition(num_partitions=-1, partition_polling_interval=1000)
partition this stream-multi-time-series such that all partitions can be run on separate threads in parallel
- Parameters
- num_partitions : int, optional
number of partitions (default is -1)
- partition_polling_interval : int, optional
how often to poll for new values on each partition (default is 1000 ms)
- Returns
StreamMultiTimeSeries
a new partitioned stream-multi-time-series
-
peek()
Optionally get the most recent values in the queue without flushing the queue.
- Returns
MultiTimeSeries
a multi-time-series containing the most recent resolved observations, or None if no observations exist
-
poll(polling_interval=-1)
Get the most recent values in the queue. If no values exist, this method will block and poll at the rate of the given polling_interval. Once values have been resolved, poll will flush the queue up to the time-tick of the last value.
- Parameters
- polling_interval : int, optional
how often to poll for values if none exist in the queue (default is 0ms)
- Returns
MultiTimeSeries
a multi-time-series containing the most recent resolved observations.
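The difference between peek and poll can be illustrated with a small plain-Python buffer. This is a simplified sketch, not tspy code: `KeyedBuffer` is a hypothetical class, its `poll` does not block the way the documented method does, and resolved observations are modelled as a dict of `(time_tick, value)` lists per key.

```python
from collections import deque


class KeyedBuffer:
    """Hypothetical stand-in for a stream buffer; not part of tspy."""

    def __init__(self):
        self._items = deque()  # (key, time_tick, value), oldest first

    def put(self, key, time_tick, value):
        self._items.append((key, time_tick, value))

    def peek(self):
        # Return what is buffered without removing it; None when empty.
        if not self._items:
            return None
        grouped = {}
        for key, time_tick, value in self._items:
            grouped.setdefault(key, []).append((time_tick, value))
        return grouped

    def poll(self):
        # Simplification: return immediately instead of blocking, then flush.
        resolved = self.peek()
        self._items.clear()
        return resolved


buffer = KeyedBuffer()
buffer.put("a", 0, 0)
buffer.put("b", 1, 1)
peeked = buffer.peek()      # the buffer is left untouched
polled = buffer.poll()      # the buffer is flushed
after_poll = buffer.peek()  # nothing is left to see
```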
-
resample(periodicity, interpolator)
produce a new stream-multi-time-series by resampling the current stream-multi-time-series to a given periodicity
- Parameters
- periodicity : int
the period to resample to
- interpolator : func or interpolator
the interpolator method to be used when a value doesn’t exist at a given time-tick
- Returns
StreamMultiTimeSeries
a new stream-multi-time-series
Notes
see resample() for usage
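Resampling to a fixed periodicity can be sketched for a single series in plain Python. This is not tspy code: `resample_sketch`, `previous_value`, and the `(observations, time_tick)` interpolator signature are assumptions made for the example, as is the choice to start the resampled grid at the first observed time-tick.

```python
def resample_sketch(observations, periodicity, interpolator):
    """Resample a list of (time_tick, value) onto a regular grid; illustrative only."""
    if not observations:
        return []
    known = dict(observations)
    start, end = observations[0][0], observations[-1][0]
    resampled = []
    for time_tick in range(start, end + 1, periodicity):
        if time_tick in known:
            value = known[time_tick]
        else:
            # No observation at this grid point: ask the interpolator.
            value = interpolator(observations, time_tick)
        resampled.append((time_tick, value))
    return resampled


def previous_value(observations, time_tick):
    """Hypothetical interpolator: latest value strictly before time_tick."""
    earlier = [value for tick, value in observations if tick < time_tick]
    return earlier[-1] if earlier else None


series = [(0, 1.0), (3, 4.0), (4, 5.0)]
resampled = resample_sketch(series, 2, previous_value)
```

The observation at time-tick 3 does not fall on the period-2 grid, so it does not appear in the output, while time-tick 2 is filled by the interpolator.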
-
right_join(right_stream_mts, join_func=None, interp_func=<function StreamMultiTimeSeries.<lambda>>)
join two stream-multi-time-series based on a temporal right join strategy and optionally interpolate missing values
- Parameters
- right_stream_mts : StreamMultiTimeSeries
the stream-multi-time-series to align with
- join_func : func, optional
function to join two values (default is join to list where left is index 0, right is index 1)
- interp_func : func or interpolator, optional
the left stream-multi-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is fill with None)
- Returns
StreamMultiTimeSeries
a new stream-multi-time-series
Notes
see right_join() for usage
-
right_outer_join(right_stream_mts, join_func=None, interp_func=<function StreamMultiTimeSeries.<lambda>>)
join two stream-multi-time-series based on a temporal right outer join strategy and optionally interpolate missing values
- Parameters
- right_stream_mts : StreamMultiTimeSeries
the stream-multi-time-series to align with
- join_func : func, optional
function to join two values (default is join to list where left is index 0, right is index 1)
- interp_func : func or interpolator, optional
the left stream-multi-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is fill with None)
- Returns
StreamMultiTimeSeries
a new stream-multi-time-series
Notes
see right_outer_join() for usage
-
run()
run the streaming pipeline
Notes
if the stream is backed by an infinite source, this call will not return until the program is explicitly killed
-
segment(window, step=1)
produce a new segment-stream-multi-time-series by performing a sliding-based segmentation over each time-series.
- Parameters
- window : int
number of observations per window
- step : int, optional
step size to slide (default is 1)
- Returns
SegmentStreamMultiTimeSeries
a new segment-stream-multi-time-series
Notes
see segment() for usage
Segment size is guaranteed to be equal to the given window value
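The blocking behaviour and the fixed segment size can be sketched with a plain-Python generator over one series. This is an illustration, not tspy code; `sliding_segments` is a hypothetical helper that holds at most `window` observations in memory at a time.

```python
from collections import deque


def sliding_segments(stream, window, step=1):
    """Yield full windows of observations as they resolve; illustrative only."""
    buffer = deque(maxlen=window)  # only the latest `window` observations are kept
    seen = 0
    for observation in stream:
        buffer.append(observation)
        seen += 1
        # Nothing is emitted until a full window exists, then every `step` arrivals.
        if seen >= window and (seen - window) % step == 0:
            yield list(buffer)


stream = [(tick, tick * 10) for tick in range(5)]
segments = list(sliding_segments(stream, window=3, step=2))
too_short = list(sliding_segments(stream[:2], window=3))
```

Every emitted segment holds exactly three observations, and a stream with fewer observations than the window yields nothing.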
-
segment_by_anchor(func, left_delta, right_delta)
produce a new segment-stream-multi-time-series by performing an anchor-based segmentation over each time-series. An anchor point is defined as any value that satisfies the filter function. When an anchor point is determined, the segment is built from left_delta time ticks to the left of the point and right_delta time ticks to the right of the point.
- Parameters
- func : func
the filter anchor point function
- left_delta : int
left delta time ticks to the left of the anchor point
- right_delta : int
right delta time ticks to the right of the anchor point
- Returns
SegmentStreamMultiTimeSeries
a new segment-stream-multi-time-series
Notes
see segment_by_anchor() for usage
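Anchor-based segmentation can be sketched on a finite series in plain Python. This is not tspy code: `segment_by_anchor_sketch` is a hypothetical helper, it works on a complete list rather than a live stream, and treating both ends of the segment as inclusive is an assumption.

```python
def segment_by_anchor_sketch(observations, func, left_delta, right_delta):
    """Build one segment around each anchor point; illustrative only."""
    segments = []
    for anchor_tick, value in observations:
        if not func(value):
            continue  # not an anchor point
        low, high = anchor_tick - left_delta, anchor_tick + right_delta
        # Assumption: the segment includes both boundary time-ticks.
        segments.append([(tick, v) for tick, v in observations if low <= tick <= high])
    return segments


observations = [(1, 0.1), (2, 0.2), (3, 9.9), (4, 0.3), (6, 0.4)]
spikes = segment_by_anchor_sketch(observations, lambda v: v > 5, left_delta=1, right_delta=2)
```

The only anchor is the value 9.9 at time-tick 3, so the single segment spans time-ticks 2 through 5.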
-
segment_by_time(window, step=None)
produce a new segment-stream-multi-time-series by performing a time-based segmentation over each time-series
- Parameters
- window : int
time-tick length of window
- step : int, optional
time-tick length of step
- Returns
SegmentStreamMultiTimeSeries
a new segment-stream-multi-time-series
Notes
see segment_by_time() for usage
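Unlike sliding-based segmentation, time-based windows are measured in time-ticks, so the number of observations per segment can vary. The plain-Python sketch below illustrates this on a finite series. It is not tspy code: `segment_by_time_sketch` is a hypothetical helper, and the half-open windows, the start at the first observed time-tick, and the fallback of `step` to `window` when `step` is None are all assumptions.

```python
def segment_by_time_sketch(observations, window, step=None):
    """Cut a list of (time_tick, value) into time-based windows; illustrative only."""
    if not observations:
        return []
    step = window if step is None else step  # assumption: non-overlapping by default
    first, last = observations[0][0], observations[-1][0]
    segments = []
    start = first
    while start <= last:
        # Assumption: each window covers the half-open range [start, start + window).
        segments.append([(t, v) for t, v in observations if start <= t < start + window])
        start += step
    return segments


observations = [(0, "a"), (1, "b"), (5, "c"), (6, "d"), (11, "e")]
by_time = segment_by_time_sketch(observations, window=5)
```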
-
to_multi_observation_stream()
- Returns
MultiObservationStream
a stream iterator of (key, observation)
-
with_trs(granularity=datetime.timedelta(0, 0, 1000), start_time=datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc))
create a new stream-multi-time-series with its timestamps mapped based on a granularity and start_time. In the scope of this method, granularity refers to the granularity at which to see time_ticks, and start_time refers to the zoned date-time at which to start your stream-multi-time-series
- Parameters
- granularity : datetime.timedelta, optional
the granularity for use in the stream-multi-time-series TRS (default is 1ms)
- start_time : datetime, optional
the starting date-time of the stream-multi-time-series (default is 1970-01-01 UTC)
- Returns
StreamMultiTimeSeries
a new stream-multi-time-series with its time_ticks mapped based on a new TRS.
Notes
time_ticks will be mapped as follows: (current_time_tick - start_time) / granularity
if the source stream-multi-time-series does not have a time-reference-system associated with it, this method will throw an exception
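The mapping formula in the notes can be worked through with the standard datetime module. The sketch below is plain Python, not tspy code; `to_time_tick` is a hypothetical helper, and using floor division to obtain an integer time-tick is an assumption about how the division is rounded.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_time_tick(timestamp, granularity=timedelta(milliseconds=1), start_time=EPOCH):
    """Apply (timestamp - start_time) / granularity; illustrative only."""
    # Dividing one timedelta by another with // gives an integer count.
    return (timestamp - start_time) // granularity


# One second after the epoch at the default 1 ms granularity.
ticks_ms = to_time_tick(datetime(1970, 1, 1, 0, 0, 1, tzinfo=timezone.utc))

# One minute after a custom start time at 1 s granularity.
ticks_s = to_time_tick(
    datetime(2020, 1, 1, 0, 1, tzinfo=timezone.utc),
    granularity=timedelta(seconds=1),
    start_time=datetime(2020, 1, 1, tzinfo=timezone.utc),
)
```

Note that the signature default `datetime.timedelta(0, 0, 1000)` is 1000 microseconds, which is the same 1 ms granularity used above.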