tspy.data_structures.stream_time_series.StreamTimeSeries module
-
class
tspy.data_structures.stream_time_series.StreamTimeSeries.
StreamTimeSeries
(tsc, j_stream_time_series, trs=None) Bases: object
This is the data structure for handling time-series in motion. A Stream-Time-Series can be thought of as a FIFO queue with a peek and a poll method. A distinguishing feature of Stream-Time-Series is its rich set of segmentation functions, which are blocking by nature: a segment will not resolve until a full window has been observed. Because of this, Stream-Time-Series has a very low memory footprint, even when executing on large amounts of data.
Examples
create a simple queue
>>> import queue
>>> observation_queue = queue.Queue()
create a simple stream-time-series from a queue
>>> import tspy
>>> sts = tspy.stream_time_series.queue(observation_queue)
create a background thread adding to the queue
>>> from threading import Thread
>>> from time import sleep
>>> def thread_function():
...     c = 0
...     while True:
...         observation = tspy.observation(c, c)
...         c = c + 1
...         observation_queue.put_nowait(observation)
...         sleep(1)
>>> thread = Thread(target=thread_function)
>>> thread.start()
continuously get values from the queue
>>> for ts in sts:
...     print(ts)
- Attributes
- trs : TRS
a time-reference-system (TRS)
Methods
add_sink(data_sink): add a data-sink to this piece of the streaming pipeline.
fillna(interpolator[, null_value]): produce a new stream-time-series which is the result of filling all null values.
filter(func): produce a new stream-time-series which is the result of filtering by each observation’s value given a filter function.
flatmap(func): produce a new stream-time-series where each observation’s value in this stream-time-series is mapped to 0 to N new values.
full_join(right_time_series[, join_func, …]): join two stream-time-series based on a temporal full join strategy and optionally interpolate missing values
inner_join(right_time_series[, join_func]): join two stream-time-series based on a temporal inner join strategy
interval_join(right_stream_ts, filter_func, …): join two stream-time-series where observations in the right stream lie within an interval of this stream.
left_join(right_time_series[, join_func, …]): join two stream-time-series based on a temporal left join strategy and optionally interpolate missing values
left_outer_join(right_time_series[, …]): join two stream-time-series based on a temporal left outer join strategy and optionally interpolate missing values
map(func): produce a new stream-time-series where each observation’s value in this stream-time-series is mapped to a new observation value
peek(): Optionally get the most recent values in the queue without flushing the queue.
poll([polling_interval]): Get the most recent values in the queue.
resample(periodicity, func): produce a new stream-time-series by resampling the current stream-time-series to a given periodicity
right_join(right_time_series[, join_func, …]): join two stream-time-series based on a temporal right join strategy and optionally interpolate missing values
right_outer_join(right_time_series[, …]): join two stream-time-series based on a temporal right outer join strategy and optionally interpolate missing values
run(): run the streaming pipeline
segment(window[, step]): produce a new segment-time-series by performing a sliding-based segmentation over the time-series
segment_by_anchor(anchor_op, left_delta, …): produce a new segment-time-series by performing an anchor-based segmentation over the time-series.
segment_by_time(window[, step]): produce a new segment-time-series by performing a time-based segmentation over the time-series
to_observation_stream(): get a stream iterator of observations
with_trs([granularity, start_time]): create a new stream-time-series with its timestamps mapped based on a granularity and start_time.
-
add_sink
(data_sink) add a data-sink to this piece of the streaming pipeline.
- Parameters
- data_sink : DataSink
the data-sink to output this piece of the pipeline to
- Returns
StreamTimeSeries
a new stream-time-series
Examples
create a stream-time-series from a queue
>>> import queue
>>> import tspy
>>> q = queue.Queue()
>>> ts = tspy.stream_time_series.queue(q)
create a datasink
>>> from tspy.io.DataSink import DataSink
>>> class MySink(DataSink):
...     def dump(self, observations):
...         print(observations)
add the datasink to the time-series
>>> ts_with_sink = ts.add_sink(MySink())
-
fillna
(interpolator, null_value=None) produce a new stream-time-series which is the result of filling all null values.
- Parameters
- interpolator : func or interpolator
the interpolator method to be used when a value is null
- null_value : any, optional
denotes a null value; for instance, if null_value = NaN, NaN values would be filled
- Returns
StreamTimeSeries
a new stream-time-series
Notes
see fillna() for usage
-
filter
(func) produce a new stream-time-series which is the result of filtering by each observation’s value given a filter function.
- Parameters
- func : func
the filter function applied to each observation’s value
- Returns
StreamTimeSeries
a new stream-time-series
Notes
see filter() for usage
-
flatmap
(func) produce a new stream-time-series where each observation’s value in this stream-time-series is mapped to 0 to N new values.
- Parameters
- func : func
value mapping function which returns a list of values
- Returns
StreamTimeSeries
a new stream-time-series with its values flat-mapped
Notes
see flatmap() for usage
an observation’s time-tick will be duplicated if a single value maps to multiple values
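The time-tick duplication described in the note can be illustrated without tspy. The following is a minimal plain-Python sketch, assuming observations are modelled as (time_tick, value) tuples; the helper name flatmap_observations is hypothetical and is not part of the library.

```python
def flatmap_observations(observations, func):
    """Map each value to zero or more values, keeping the original time-tick.

    Hypothetical helper: observations are plain (time_tick, value) tuples,
    not tspy observations.
    """
    result = []
    for time_tick, value in observations:
        # func returns a list; every element inherits the source time-tick
        for new_value in func(value):
            result.append((time_tick, new_value))
    return result


observations = [(0, "a b"), (1, ""), (2, "c")]
flat = flatmap_observations(observations, lambda text: text.split())
print(flat)
```

The value at time-tick 0 maps to two values, so time-tick 0 appears twice in the result; the value at time-tick 1 maps to zero values and disappears.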
-
full_join
(right_time_series, join_func=None, left_interp_func=<function StreamTimeSeries.<lambda>>, right_interp_func=<function StreamTimeSeries.<lambda>>) join two stream-time-series based on a temporal full join strategy and optionally interpolate missing values
- Parameters
- right_time_series : StreamTimeSeries
the stream-time-series to align with
- join_func : func, optional
function to join two values (default is join to list where left is index 0, right is index 1)
- left_interp_func : func or interpolator, optional
the left stream-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is fill with None)
- right_interp_func : func or interpolator, optional
the right stream-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is fill with None)
- Returns
StreamTimeSeries
a new stream-time-series
Notes
see full_join() for usage
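The documented defaults (join to a two-element list, fill missing sides with None) can be sketched in plain Python. This is an illustration of the temporal full join idea only, assuming each series is a dict from time-tick to value and each interpolator is a function of the time-tick; both are simplifications made here, and the real interpolator signature may differ. It also works on complete data rather than on a stream.

```python
def full_join(left, right, join_func=None, left_interp=None, right_interp=None):
    """Batch sketch of a temporal full join over two {time_tick: value} dicts."""
    # Defaults mirror the documented behaviour: list join, fill with None
    join_func = join_func or (lambda left_value, right_value: [left_value, right_value])
    left_interp = left_interp or (lambda time_tick: None)
    right_interp = right_interp or (lambda time_tick: None)

    joined = []
    # A full join keeps every time-tick seen on either side
    for time_tick in sorted(set(left) | set(right)):
        left_value = left[time_tick] if time_tick in left else left_interp(time_tick)
        right_value = right[time_tick] if time_tick in right else right_interp(time_tick)
        joined.append((time_tick, join_func(left_value, right_value)))
    return joined


left = {1: "l1", 2: "l2"}
right = {2: "r2", 3: "r3"}
joined = full_join(left, right)
summed = full_join({1: 1, 2: 2}, {2: 20, 3: 30},
                   join_func=lambda a, b: a + b,
                   left_interp=lambda t: 0,
                   right_interp=lambda t: 0)
```

Restricting the loop to time-ticks present on the left, on the right, or on both would give the left, right and inner variants respectively.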
-
inner_join
(right_time_series, join_func=None) join two stream-time-series based on a temporal inner join strategy
- Parameters
- right_time_series : StreamTimeSeries
the stream-time-series to join with
- join_func : func, optional
function to join two values at a given time-tick. If None is given, the joined value will be a list (default is None)
- Returns
StreamTimeSeries
a new stream-time-series
Notes
see inner_join() for usage
-
interval_join
(right_stream_ts, filter_func, left_delta, right_delta) join two stream-time-series where observations in the right stream lie within an interval of this stream. The interval is defined by a point (discovered from the filter_func) with left_delta time-ticks to the left and right_delta time-ticks to the right.
- Parameters
- right_stream_ts : StreamTimeSeries
the stream-time-series to align with
- filter_func : func
function which, given a value, will return whether to form an interval
- left_delta : int
how many time-ticks to the left of the interval-point
- right_delta : int
how many time-ticks to the right of the interval-point
- Returns
StreamTimeSeries
a new stream-time-series
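One possible reading of the interval join can be sketched in plain Python. The sketch assumes observations are (time_tick, value) tuples, that both ends of the interval are inclusive, and that each result pairs the interval-point value with the list of matching right-hand observations; none of these details are stated above, so treat them as assumptions rather than the library's behaviour.

```python
def interval_join(left, right, filter_func, left_delta, right_delta):
    """Batch sketch: collect right observations around each interval-point."""
    joined = []
    for time_tick, value in left:
        # An interval-point is any left value accepted by filter_func
        if filter_func(value):
            start = time_tick - left_delta
            end = time_tick + right_delta
            # Assumption: interval bounds are inclusive on both sides
            matches = [(t, v) for t, v in right if start <= t <= end]
            joined.append((time_tick, (value, matches)))
    return joined


left = [(5, "ok"), (10, "error"), (20, "ok")]
right = [(7, 0.1), (9, 0.4), (11, 0.9), (15, 0.2)]
result = interval_join(left, right, lambda v: v == "error",
                       left_delta=2, right_delta=1)
```

Here only the observation at time-tick 10 forms an interval, covering time-ticks 8 through 11, so two right-hand observations are matched.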
-
left_join
(right_time_series, join_func=None, interp_func=<function StreamTimeSeries.<lambda>>) join two stream-time-series based on a temporal left join strategy and optionally interpolate missing values
- Parameters
- right_time_series : StreamTimeSeries
the stream-time-series to align with
- join_func : func, optional
function to join two values (default is join to list where left is index 0, right is index 1)
- interp_func : func or interpolator, optional
the right stream-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is fill with None)
- Returns
StreamTimeSeries
a new stream-time-series
Notes
see left_join() for usage
-
left_outer_join
(right_time_series, join_func=None, interp_func=<function StreamTimeSeries.<lambda>>) join two stream-time-series based on a temporal left outer join strategy and optionally interpolate missing values
- Parameters
- right_time_series : StreamTimeSeries
the stream-time-series to align with
- join_func : func, optional
function to join two values (default is join to list where left is index 0, right is index 1)
- interp_func : func or interpolator, optional
the right stream-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is fill with None)
- Returns
StreamTimeSeries
a new stream-time-series
Notes
see left_outer_join() for usage
-
map
(func) produce a new stream-time-series where each observation’s value in this stream-time-series is mapped to a new observation value
- Parameters
- func : func
value mapping function
- Returns
StreamTimeSeries
a new stream-time-series with its values re-mapped
Notes
see map() for usage
-
peek
() Optionally get the most recent values in the queue without flushing the queue.
- Returns
TimeSeries
a time-series containing the most recent resolved observations. If no observations exist, None is returned
-
poll
(polling_interval=-1) Get the most recent values in the queue. If no values exist, this method will block and poll at the rate of the given polling_interval. Once values have been resolved, poll will flush the queue up to the last value’s time-tick
- Parameters
- polling_interval : int, optional
how often to poll for values if none exist in the queue (default is 0ms)
- Returns
TimeSeries
a time-series containing the most recent resolved observations.
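The difference between peek and poll can be modelled in plain Python. The sketch below is not how tspy implements them: the class TinyStream and its put method are hypothetical names used only to contrast "read without flushing" with "read and flush", and the blocking behaviour of poll is left out.

```python
from collections import deque


class TinyStream:
    """Hypothetical stand-in for a buffer of resolved (time_tick, value) pairs."""

    def __init__(self):
        self._buffer = deque()

    def put(self, time_tick, value):
        self._buffer.append((time_tick, value))

    def peek(self):
        # Return what is currently buffered without removing it;
        # None signals that no observations exist yet
        return list(self._buffer) if self._buffer else None

    def poll(self):
        # Return what is currently buffered and flush it.
        # Simplification: an empty buffer yields [] instead of blocking
        resolved = list(self._buffer)
        self._buffer.clear()
        return resolved


stream = TinyStream()
stream.put(0, "a")
stream.put(1, "b")

peeked = stream.peek()      # buffer is left untouched
polled = stream.poll()      # same observations, buffer is flushed
after_poll = stream.peek()  # nothing left to see
```

Calling peek repeatedly returns the same observations, whereas each observation is returned by poll at most once.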
-
resample
(periodicity, func) produce a new stream-time-series by resampling the current stream-time-series to a given periodicity
- Parameters
- periodicity : int
the period to resample to
- func : func or interpolator
the interpolator method to be used when a value doesn’t exist at a given time-tick
- Returns
StreamTimeSeries
a new stream-time-series
Notes
see resample() for usage
-
right_join
(right_time_series, join_func=None, interp_func=<function StreamTimeSeries.<lambda>>) join two stream-time-series based on a temporal right join strategy and optionally interpolate missing values
- Parameters
- right_time_series : StreamTimeSeries
the stream-time-series to align with
- join_func : func, optional
function to join two values (default is join to list where left is index 0, right is index 1)
- interp_func : func or interpolator, optional
the left stream-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is fill with None)
- Returns
StreamTimeSeries
a new stream-time-series
Notes
see right_join() for usage
-
right_outer_join
(right_time_series, join_func=None, interp_func=<function StreamTimeSeries.<lambda>>) join two stream-time-series based on a temporal right outer join strategy and optionally interpolate missing values
- Parameters
- right_time_series : StreamTimeSeries
the stream-time-series to align with
- join_func : func, optional
function to join two values (default is join to list where left is index 0, right is index 1)
- interp_func : func or interpolator, optional
the left stream-time-series interpolator method to be used when a value doesn’t exist at a given time-tick (default is fill with None)
- Returns
StreamTimeSeries
a new stream-time-series
Notes
see right_outer_join() for usage
-
run
() run the streaming pipeline
Notes
if the stream is backed by an infinite source, this will not return until the program is explicitly killed
-
segment
(window, step=1) produce a new segment-time-series by performing a sliding-based segmentation over the time-series
- Parameters
- window : int
number of observations per window
- step : int, optional
step size to slide (default is 1)
- Returns
SegmentStreamTimeSeries
a new segment-stream-time-series
Notes
see segment() for usage
Segment size is guaranteed to be equal to the given window value
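The blocking nature of sliding segmentation, and why it needs little memory, can be sketched with a plain-Python generator. This is an illustration only, assuming observations arrive as (time_tick, value) tuples from any iterable; the function name sliding_segments is hypothetical and is not the library's implementation.

```python
from collections import deque


def sliding_segments(observations, window, step=1):
    """Yield fixed-size segments, holding at most `window` observations."""
    buffer = deque(maxlen=window)  # older observations fall out automatically
    seen = 0
    for observation in observations:
        buffer.append(observation)
        seen += 1
        # Nothing is emitted until a full window has been observed;
        # after that, one segment is emitted every `step` observations
        if seen >= window and (seen - window) % step == 0:
            yield list(buffer)


observations = [(t, t * 10) for t in range(5)]
segments = list(sliding_segments(observations, window=3, step=2))
```

With five observations, window=3 and step=2, two segments resolve: one covering time-ticks 0 to 2 and one covering time-ticks 2 to 4. Every segment has exactly three observations.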
-
segment_by_anchor
(anchor_op, left_delta, right_delta) produce a new segment-time-series by performing an anchor-based segmentation over the time-series. An anchor point is defined as any value that satisfies the filter function. When an anchor point is determined, the segment is built from left_delta time-ticks to the left of the point and right_delta time-ticks to the right of the point.
- Parameters
- anchor_op : func
the anchor-point filter function
- left_delta : int
left delta time-ticks to the left of the anchor point
- right_delta : int
right delta time-ticks to the right of the anchor point
- Returns
SegmentStreamTimeSeries
a new segment-stream-time-series
Notes
see segment_by_anchor() for usage
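Anchor-based segmentation can be sketched in plain Python as follows. The sketch assumes (time_tick, value) tuples, inclusive window bounds, and complete data rather than a stream; these are assumptions made for illustration, and the function below is a hypothetical model, not the library method.

```python
def segment_by_anchor(observations, anchor_op, left_delta, right_delta):
    """Batch sketch: build one segment around every anchor point."""
    segments = []
    for anchor_tick, anchor_value in observations:
        # An anchor point is any value accepted by anchor_op
        if anchor_op(anchor_value):
            start = anchor_tick - left_delta
            end = anchor_tick + right_delta
            # Assumption: both ends of the window are inclusive
            segments.append([(t, v) for t, v in observations if start <= t <= end])
    return segments


observations = [(0, 1.0), (1, 1.2), (2, 9.5), (3, 1.1), (4, 0.9), (5, 1.0)]
segments = segment_by_anchor(observations, lambda v: v > 5,
                             left_delta=1, right_delta=2)
```

The only anchor is the spike at time-tick 2, so a single segment covering time-ticks 1 through 4 is produced.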
-
segment_by_time
(window, step=None) produce a new segment-time-series by performing a time-based segmentation over the time-series
- Parameters
- window : int
time-tick length of window
- step : int, optional
time-tick length of step
- Returns
SegmentStreamTimeSeries
a new segment-stream-time-series
Notes
see segment_by_time() for usage
-
to_observation_stream
()
- Returns
ObservationStream
a stream iterator of observations
-
with_trs
(granularity=datetime.timedelta(0, 0, 1000), start_time=datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)) create a new stream-time-series with its timestamps mapped based on a granularity and start_time. In the scope of this method, granularity refers to the granularity at which to see time_ticks and start_time refers to the zoned date-time at which to start your stream-time-series
- Parameters
- granularity : datetime.timedelta, optional
the granularity for use in the stream-time-series TRS (default is 1ms)
- start_time : datetime, optional
the starting date-time of the stream-time-series (default is 1970-01-01 UTC)
- Returns
StreamTimeSeries
a new stream-time-series with its time_ticks mapped based on a new TRS.
Notes
see with_trs() for usage
time_ticks will be mapped as follows: (current_time_tick - start_time) / granularity
if the source stream-time-series does not have a time-reference-system associated with it, this method will throw an exception
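The mapping formula in the notes can be worked through with the standard datetime module. This is a sketch of the arithmetic only: the helper to_time_tick is a hypothetical name, and the use of floor division to obtain an integer time-tick is an assumption, since the rounding behaviour is not stated above.

```python
from datetime import datetime, timedelta, timezone


def to_time_tick(timestamp, start_time, granularity):
    """Apply (timestamp - start_time) / granularity, floored to an integer."""
    # timedelta // timedelta yields an int (assumption: ticks are floored)
    return (timestamp - start_time) // granularity


epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
stamp = datetime(1970, 1, 1, 0, 1, 30, tzinfo=timezone.utc)  # 90 s after epoch

tick_ms = to_time_tick(stamp, epoch, timedelta(milliseconds=1))
tick_min = to_time_tick(stamp, epoch, timedelta(minutes=1))
print(tick_ms, tick_min)
```

The same instant is time-tick 90000 at a granularity of 1ms and time-tick 1 at a granularity of one minute, which shows how the choice of granularity changes the resolution at which time_ticks are seen.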