PyStream API
This page describes the API of the main PyStream components.
Classes
- class pystream.Pipeline(input_generator: Callable[[], Any] | None = None, use_profiler: bool = False)
The pipeline constructor
- Parameters:
input_generator (Optional[Callable[[], Any]], optional) – Function used to generate input data if you want the pipeline to run autonomously. If None, input must be supplied by invoking the forward method. Defaults to None.
use_profiler (bool, optional) – Whether to attach a profiler to the pipeline. Defaults to False.
- add(stage: Callable[[T], T] | Stage, name: str | None = None) None
Add a stage into the pipeline
The stage is of type StageCallable, i.e. Union[Callable[[T], T], Stage]. Thus, a stage can be defined in two ways:
(1) A function that takes input data (of any type) and returns output data of the same type.
(2) A class that inherits from the abstract pystream.Stage class, implementing the __call__ and cleanup methods. Use this if the stage needs a special cleanup procedure.
- Parameters:
stage (StageCallable) – the stage to be added
name (Optional[str]) – the stage name. If None, a default name is assigned, i.e. Stage_i where i is the stage sequence number. Defaults to None.
- as_stage() Stage
Get the base pipeline executor, which can be treated as a stage. Useful if you want to create a pipeline inside a pipeline.
- Raises:
PipelineUndefined – raised if neither serialize nor parallelize has been invoked.
- Returns:
the base pipeline executor
- Return type:
Stage
- cleanup() None
Stop and clean up the pipeline. Does nothing if the pipeline has not been initialized
- forward(data: ~typing.Any = <pystream.data.pipeline_data.InputGeneratorRequest object>) bool
Forward data into the pipeline
- Parameters:
data (Any) – the data. If not given, data generated from the input generator is pushed instead.
- Raises:
PipelineUndefined – raised if neither serialize nor parallelize has been invoked.
- Returns:
True if the data has been forwarded successfully, False otherwise.
- Return type:
bool
- get_profiles() Tuple[Dict[str, float], Dict[str, float]]
Get profiles data
- Returns:
dictionaries of the latency (in seconds) and throughput (in data/second), respectively. Each is a dict keyed by stage name.
- Return type:
Tuple[Dict[str, float], Dict[str, float]]
- get_results() Any
Get latest results from the pipeline
- Raises:
PipelineUndefined – raised if method serialize or parallelize has not been invoked.
- Returns:
the last data from the pipeline. The same data cannot be read twice. If new data is not available, None is returned.
- Return type:
Any
- parallelize(block_input: bool = True, input_timeout: float = 10, block_output: bool = False, output_timeout: float = 10) Pipeline
Turn the pipeline into an independent-stage pipeline. Each stage lives in a different thread and works asynchronously. However, data is passed through the stages in the order they were defined
- Parameters:
block_input (bool, optional) – Whether the forward method blocks when the first stage is busy, up to the timeout specified in input_timeout. Defaults to True.
input_timeout (float, optional) – Blocking timeout for the forward method in seconds. Defaults to 10.
block_output (bool, optional) – Whether the get_results method blocks when no data is available from the last stage. Defaults to False.
output_timeout (float, optional) – Blocking timeout for the get_results method in seconds. Defaults to 10.
- Returns:
this pipeline itself
- Return type:
Pipeline
- serialize() Pipeline
Turn the pipeline into a serial pipeline. All stages run sequentially in blocking mode.
- Returns:
this pipeline itself
- Return type:
Pipeline
- start_loop(period: float = 0.01) None
Start the pipeline in autonomous mode. Data generated from the input generator is pushed into the pipeline at each defined period.
- Parameters:
period (float, optional) – Period at which data is pushed, in seconds. Defaults to 0.01.
- stop_loop() None
Stop the autonomous operation of the pipeline
- class pystream.Stage
Base class for a pipeline stage. For example, stages that need a cleanup routine should be defined as subclasses of this class.
- Useful property:
- name (str):
the stage name. If not defined by the child instance, the name is assigned automatically after the pipeline is constructed. Defaults to “”
- abstract __call__(data: T) T
Main process of the stage.
- Parameters:
data (T) – Input data
- Returns:
output data, should be in the same type as input data
- Return type:
T
- abstract cleanup() None
Cleanup method for the stage. This method is invoked during the pipeline cleanup step.
Functions
- pystream.get_profiler_db_folder() str
Get the folder for the profiler SQLite database
- Returns:
path to directory that contains the database
- Return type:
str
- pystream.set_profiler_db_folder(folder_path: str) None
Set the folder for the profiler SQLite database. This should be called before the Pipeline instantiation
- Parameters:
folder_path (str) – path to directory that contains the database
Constants
- pystream.logger = <Logger PyStream (INFO)>
The PyStream logger object from the logging package
- pystream.MAIN_PIPELINE_NAME = 'MainPipeline'
The name of main pipeline in profiling result