PyStream API

This page describes the API of the important components in PyStream.

Classes

class pystream.Pipeline(input_generator: Callable[[], Any] | None = None, use_profiler: bool = False)

The pipeline constructor

Parameters:
  • input_generator (Optional[Callable[[], Any]], optional) – Function used to generate input data when you want the pipeline to run autonomously. If None, the input must be given by invoking the forward method. Defaults to None.

  • use_profiler (bool, optional) – Whether to attach a profiler to the pipeline. Defaults to False.

add(stage: Callable[[T], T] | Stage, name: str | None = None) None

Add a stage to the pipeline

The stage is of type StageCallable, which is Union[Callable[[T], T], Stage]. Thus, a stage can be defined in two ways:

(1) A stage can be a function that takes input data (of any type) and returns output data of the same type.

(2) A stage can be a class that inherits from the abstract pystream.Stage class, in which the __call__ and cleanup methods must be defined. Use this if the stage needs a special cleanup procedure.

Parameters:
  • stage (StageCallable) – the stage to be added

  • name (Optional[str]) – the stage name. If None, a default stage name is given, i.e. Stage_i, where i is the stage sequence number. Defaults to None.

as_stage() Stage

Get the base pipeline executor, which can be treated as a stage. Useful if you want to create a pipeline inside another pipeline

Raises:

PipelineUndefined – raised if neither the serialize nor the parallelize method has been invoked.

Returns:

the base pipeline executor

Return type:

Stage

cleanup() None

Stop and clean up the pipeline. Does nothing if the pipeline has not been initialized

forward(data: ~typing.Any = <pystream.data.pipeline_data.InputGeneratorRequest object>) bool

Forward data into the pipeline

Parameters:

data (Any) – the data. If no data is given, data generated from the input generator is pushed instead.

Raises:

PipelineUndefined – raised if neither the serialize nor the parallelize method has been invoked.

Returns:

True if the data has been forwarded successfully, False otherwise.

Return type:

bool

get_profiles() Tuple[Dict[str, float], Dict[str, float]]

Get profiles data

Returns:

a tuple containing the latency data (in seconds) and the throughput data (in data/second), respectively. Each is a dict whose keys are the stage names.

Return type:

Tuple[Dict[str, float], Dict[str, float]]

get_results() Any

Get the latest results from the pipeline

Raises:

PipelineUndefined – raised if neither the serialize nor the parallelize method has been invoked.

Returns:

the last data from the pipeline. The same data cannot be read twice. If no new data is available, None is returned.

Return type:

Any

parallelize(block_input: bool = True, input_timeout: float = 10, block_output: bool = False, output_timeout: float = 10) Pipeline

Turn the pipeline into an independent-stage pipeline. Each stage will live in a different thread and work asynchronously. However, the data will be passed to the stages in the same order as they were defined

Parameters:
  • block_input (bool, optional) – Whether to make the forward method block, for up to input_timeout seconds, when the first stage is busy. Defaults to True.

  • input_timeout (float, optional) – Blocking timeout for the forward method in seconds. Defaults to 10.

  • block_output (bool, optional) – Whether to make the get_results method block when no data is available from the last stage. Defaults to False.

  • output_timeout (float, optional) – Blocking timeout for the get_results method in seconds. Defaults to 10.

Returns:

this pipeline itself

Return type:

Pipeline

serialize() Pipeline

Turn the pipeline into a serial pipeline. All stages will run sequentially in blocking mode.

Returns:

this pipeline itself

Return type:

Pipeline

start_loop(period: float = 0.01) None

Start the pipeline in autonomous mode. Data generated from the input generator will be pushed into the pipeline once per period.

Parameters:

period (float, optional) – Period of pushing the data, in seconds. Defaults to 0.01.

stop_loop() None

Stop the autonomous operation of the pipeline

class pystream.Stage

Base class for the pipeline stage. As an example, stages that have a cleanup routine should be defined as a subclass of this class.

Useful property:
name (str):

the stage name. If not defined by the child instance, the name will be assigned automatically after the pipeline is constructed. Defaults to “”

abstract __call__(data: T) T

Main process of the stage.

Parameters:

data (T) – Input data

Returns:

the output data, which should be of the same type as the input data

Return type:

T

abstract cleanup() None

Cleanup method for the stage. This method is invoked during the pipeline cleanup step

Functions

pystream.get_profiler_db_folder() str

Get the folder for the profiler SQLite database

Returns:

path to the directory that contains the database

Return type:

str

pystream.set_profiler_db_folder(folder_path: str) None

Set the folder for the profiler SQLite database. This should be called before instantiating the Pipeline

Parameters:

folder_path (str) – path to the directory that contains the database

Constants

pystream.logger = <Logger PyStream (INFO)>

The PyStream logger object from the logging package

pystream.MAIN_PIPELINE_NAME = 'MainPipeline'

The name of the main pipeline in the profiling result