PyStream API
Staged Pipeline
- class pystream.Pipeline(input_generator: Optional[Callable[[], Any]] = None)
The pipeline constructor
- Parameters
input_generator (Optional[Callable[[], Any]], optional) – Function used to generate input data when the pipeline runs autonomously. If None, input must be supplied by calling the forward method. Defaults to None.
- add(stage: Union[Callable[[T], T], Stage]) None
Add a stage into the pipeline
The stage has type StageCallable, which is Union[Callable[[T], T], Stage]. A stage can therefore be defined in two ways:
(1) As a function that takes input data (of any type) and returns output data of the same type.
(2) As a class that inherits from pystream.Stage, which is an abstract class. The subclass must define the __call__ and cleanup methods. Use this form if the stage needs a special cleanup procedure.
- Parameters
stage (StageCallable) – the stage to be added
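The two stage styles described above might look like the following. This is a minimal sketch: `increment` and `CountingStage` are hypothetical names, and the fallback `Stage` class is only a stand-in that mirrors the two documented abstract methods so the snippet also runs where the package is not installed.

```python
from abc import ABC, abstractmethod

try:
    # Real base class, available when the package is installed.
    from pystream import Stage
except ImportError:
    # Stand-in (assumption): mirrors the two abstract methods documented
    # for pystream.Stage so the example can run without the package.
    class Stage(ABC):
        @abstractmethod
        def __call__(self, data):
            ...

        @abstractmethod
        def cleanup(self):
            ...


def increment(data: dict) -> dict:
    """Style (1): a plain function returning data of the same type."""
    data["value"] += 1
    return data


class CountingStage(Stage):
    """Style (2): a Stage subclass that owns state needing cleanup."""

    def __init__(self) -> None:
        super().__init__()
        self.count = 0

    def __call__(self, data: dict) -> dict:
        # Record how many items this stage has processed so far.
        self.count += 1
        data["seen"] = self.count
        return data

    def cleanup(self) -> None:
        # Reset the internal state when the pipeline is cleaned up.
        self.count = 0
```

Either form would then be registered with `pipeline.add(increment)` or `pipeline.add(CountingStage())`.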
- cleanup() None
Stop and clean up the pipeline. Does nothing if the pipeline has not been initialized.
- forward(data: Any) bool
Forward data into the pipeline
- Parameters
data (Any) – the data. If data is None, data generated by the input generator will be pushed instead.
- Raises
PipelineUndefined – raised if neither serialize nor parallelize has been invoked.
- Returns
True if the data has been forwarded successfully, False otherwise.
- Return type
bool
- get_results() Any
Get latest results from the pipeline
- Raises
PipelineUndefined – raised if neither serialize nor parallelize has been invoked.
- Returns
The last data from the pipeline. The same data cannot be read twice. If no new data is available, None is returned.
- Return type
Any
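The methods documented so far can be combined into a serial run roughly as sketched below. The stage functions `double` and `add_one` and the wrapper `run_serial_demo` are hypothetical names. The wrapper imports the package lazily and is not called here, so it only runs where pystream is installed.

```python
def double(data: dict) -> dict:
    """Hypothetical stage: doubles the stored value in place."""
    data["value"] *= 2
    return data


def add_one(data: dict) -> dict:
    """Hypothetical stage: increments the stored value in place."""
    data["value"] += 1
    return data


def run_serial_demo() -> list:
    """Push three items through a serial pipeline and collect the results.

    Requires the pystream package; call this function explicitly.
    """
    import pystream

    pipeline = pystream.Pipeline()
    pipeline.add(double)
    pipeline.add(add_one)
    # forward and get_results raise PipelineUndefined unless serialize
    # (or parallelize) has been called first.
    pipeline.serialize()

    results = []
    try:
        for value in (1, 2, 3):
            # forward returns True when the data was accepted.
            if pipeline.forward({"value": value}):
                latest = pipeline.get_results()
                # None means no new result is available yet.
                if latest is not None:
                    results.append(latest)
    finally:
        pipeline.cleanup()
    return results
```

Wrapping the run in `try`/`finally` is a choice made for this sketch, so that `cleanup` is still called if a stage raises.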
- parallelize(block_input: bool = True, input_timeout: float = 10) Pipeline
Turn the pipeline into a pipeline of independent stages. Each stage lives in its own thread and works asynchronously. The data is still passed through the stages in the order in which they were added.
- Parameters
block_input (bool, optional) – Whether the forward method runs in blocking mode, with the timeout given by input_timeout. Defaults to True.
input_timeout (float, optional) – Blocking timeout for the forward method in seconds. Defaults to 10.
- Returns
this pipeline itself
- Return type
Pipeline
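A threaded run could be sketched as follows, under the assumption that results are polled with get_results after forwarding. `slow_square` and `run_parallel_demo` are hypothetical names, and the wrapper is only defined here, not called, because it needs the pystream package.

```python
import time


def slow_square(data: dict) -> dict:
    """Hypothetical stage that simulates a little work before squaring."""
    time.sleep(0.01)
    data["value"] = data["value"] ** 2
    return data


def run_parallel_demo(n_items: int = 5, wait: float = 1.0) -> list:
    """Forward a few items to a threaded pipeline and poll for results.

    Requires the pystream package; call this function explicitly.
    """
    import pystream

    pipeline = pystream.Pipeline()
    pipeline.add(slow_square)
    # Each stage now runs in its own thread; forward blocks for at most
    # input_timeout seconds when block_input is True.
    pipeline.parallelize(block_input=True, input_timeout=10)

    results = []
    try:
        for i in range(n_items):
            pipeline.forward({"value": i})
        # Poll until the deadline; get_results returns None when there is
        # nothing new to read.
        deadline = time.monotonic() + wait
        while time.monotonic() < deadline:
            latest = pipeline.get_results()
            if latest is not None:
                results.append(latest)
            time.sleep(0.005)
    finally:
        pipeline.cleanup()
    return results
```

Because get_results is documented as returning only the latest data, a polling loop like this one may not observe every forwarded item.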
- serialize() Pipeline
Turn the pipeline into a serial pipeline. All stages run sequentially, in blocking mode.
- Returns
this pipeline itself
- Return type
Pipeline
- start_loop(period: float = 0.01) None
Start the pipeline in autonomous mode. Data generated by the input generator is pushed into the pipeline once per period.
- Parameters
period (float, optional) – Period to push the data. Defaults to 0.01.
- stop_loop() None
Stop the autonomous operation of the pipeline
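Autonomous mode might be used as in the sketch below. It assumes that, as with forward, parallelize (or serialize) has to be called before start_loop, which the text above does not state. `make_counter_generator`, `passthrough`, and `run_autonomous_demo` are hypothetical names, and the wrapper is not called here because it needs the pystream package.

```python
import itertools
import time


def make_counter_generator():
    """Build a zero-argument input generator yielding increasing values."""
    counter = itertools.count()

    def generate() -> dict:
        return {"value": next(counter)}

    return generate


def passthrough(data: dict) -> dict:
    """Hypothetical stage that returns its input unchanged."""
    return data


def run_autonomous_demo(duration: float = 0.1):
    """Let the pipeline feed itself for a short while, then stop it.

    Requires the pystream package; call this function explicitly.
    """
    import pystream

    pipeline = pystream.Pipeline(input_generator=make_counter_generator())
    pipeline.add(passthrough)
    pipeline.parallelize()  # assumption: needed before start_loop
    pipeline.start_loop(period=0.01)
    try:
        time.sleep(duration)
        latest = pipeline.get_results()
    finally:
        pipeline.stop_loop()
        pipeline.cleanup()
    return latest
```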
- class pystream.Stage
Abstract class for the pipeline stage. Any stage that has to be cleaned up should be defined as a subclass of this class.
- abstract __call__(data: T) T
Main process of the stage.
- Parameters
data (T) – Input data
- Returns
Output data, which should be of the same type as the input data
- Return type
T
- abstract cleanup() None
Cleanup method for the stage. This method is invoked during the pipeline cleanup step.
Functional Pipeline
- pystream.functional.func_serial(funcs: List[Callable[[Any], Any]]) Callable[[Any], Any]
Create a function made of functions that are executed in serial
- Parameters
funcs (List[Callable[[Any], Any]]) – the list of functions to be executed. Each function takes exactly one argument, which can be anything, and returns its processing result. The output of function i is used as the input argument of function i + 1.
- Returns
A function that executes the given functions in series. It takes one argument ‘x’, the input data, which is passed to funcs[0]. It returns the output of the last function in funcs.
- Return type
Callable[[Any], Any]
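The chaining behaviour described above can be illustrated with a small standalone sketch. `serial_sketch` is a hypothetical stand-in written with functools.reduce and is not the library's implementation. With the package installed, `pystream.functional.func_serial(funcs)` would be used in its place.

```python
from functools import reduce
from typing import Any, Callable, List


def serial_sketch(funcs: List[Callable[[Any], Any]]) -> Callable[[Any], Any]:
    """Return a function that feeds x through funcs from first to last."""

    def run(x: Any) -> Any:
        # The output of function i becomes the input of function i + 1.
        return reduce(lambda value, func: func(value), funcs, x)

    return run


add_then_scale = serial_sketch([lambda x: x + 1, lambda x: x * 10])
result = add_then_scale(2)  # (2 + 1) * 10
```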
- pystream.functional.func_parallel_thread(funcs: List[Callable[[Any], Any]], executor: ThreadPoolExecutor = <concurrent.futures.thread.ThreadPoolExecutor object>) Callable[[Any], Any]
Create a function made of functions that are executed in parallel using ThreadPoolExecutor from the concurrent.futures module.
Each input function must take exactly one mandatory data argument. In the wrapper function returned by this function, the same data object is passed to, and shared by, all of the input functions, and that same object is returned by the wrapper. Therefore, the input functions must modify the input data in place to produce meaningful results.
If no executor is provided, a shared default ThreadPoolExecutor is used. This executor will not be killed by the shutdown method, so make sure that the functions passed here can exit properly. To be safe, pass your own executor.
- Parameters
funcs (List[Callable[[Any], Any]]) – the list of functions to be executed. Each function takes exactly one argument, which can be anything. The return values of the functions are not used.
executor (ThreadPoolExecutor, optional) – ThreadPoolExecutor instance from concurrent.futures that handles the threads. By default, an executor managed by this package is used.
- Returns
A function that executes the input functions in parallel. It takes one argument ‘x’, which is passed to every input function, and returns that same object ‘x’ after execution.
- Return type
Callable[[Any], Any]
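The shared-object, in-place contract described above can be illustrated with a standalone sketch built directly on concurrent.futures. `parallel_inplace_sketch`, `set_width`, and `set_height` are hypothetical names, and the sketch is not the library's implementation. In particular, re-raising worker exceptions through `Future.result()` is a choice made here, not documented behaviour. With the package installed, `pystream.functional.func_parallel_thread(funcs, executor=executor)` would be used in its place.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List


def parallel_inplace_sketch(
    funcs: List[Callable[[Any], Any]], executor: ThreadPoolExecutor
) -> Callable[[Any], Any]:
    """Return a function that runs every func on the same object x."""

    def run(x: Any) -> Any:
        # Every function receives the very same object.
        futures = [executor.submit(func, x) for func in funcs]
        for future in futures:
            # Wait for completion; return values are ignored, but an
            # exception raised in a worker is re-raised here.
            future.result()
        return x

    return run


def set_width(data: dict) -> None:
    data["width"] = 640  # modifies the shared dict in place


def set_height(data: dict) -> None:
    data["height"] = 480  # modifies the shared dict in place


# Passing our own executor, as the text recommends, lets us shut it down.
with ThreadPoolExecutor(max_workers=2) as executor:
    fill = parallel_inplace_sketch([set_width, set_height], executor)
    frame: dict = {}
    returned = fill(frame)
```

The two functions write to different keys, so they do not race with each other. Functions that touch the same fields of the shared object would need their own synchronisation.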