PyStream API

Staged Pipeline

class pystream.Pipeline(input_generator: Callable[[], Any] | None = None, use_profiler: bool = False)

The pipeline constructor

Parameters:
  • input_generator (Optional[Callable[[], Any]], optional) – Function used to generate input data if you want the pipeline to run autonomously. If None, input must be supplied by invoking the forward method. Defaults to None.

  • use_profiler (bool, optional) – Whether to attach a profiler to the pipeline. Defaults to False.
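
A minimal construction sketch (not taken from the library documentation; generate_input is a hypothetical example):

    import random

    import pystream

    def generate_input():
        # Hypothetical input source: one random float per request.
        return random.random()

    # Can run autonomously from generate_input; profiling enabled.
    pipeline = pystream.Pipeline(input_generator=generate_input, use_profiler=True)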

add(stage: Callable[[T], T] | Stage, name: str | None = None) None

Add a stage to the pipeline.

The stage has type StageCallable, which is Union[Callable[[T], T], Stage]. Thus, a stage can be defined in two ways:

(1) A stage can be a function that takes input data (of any type) and returns output data of the same type.

(2) A stage can be a class that inherits from the abstract pystream.Stage class; the __call__ and cleanup methods must be defined there. Use this if the stage needs a special cleanup procedure.

Parameters:
  • stage (StageCallable) – the stage to be added

  • name (Optional[str]) – the stage name. If None, a default name is assigned, i.e. Stage_i where i is the stage sequence number. Defaults to None.
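
A sketch of both stage styles (hypothetical stages, assuming pystream.Stage as documented below):

    import pystream

    def double(x: float) -> float:
        # Function stage: input and output have the same type.
        return x * 2

    class Offset(pystream.Stage):
        # Class stage with an explicit cleanup routine.
        offset = 1.0

        def __call__(self, data: float) -> float:
            return data + self.offset

        def cleanup(self) -> None:
            # Release any resources held by the stage here.
            pass

    pipeline = pystream.Pipeline()
    pipeline.add(double, name="double")    # function stage
    pipeline.add(Offset(), name="offset")  # Stage subclass instance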

cleanup() None

Stop and clean up the pipeline. Does nothing if the pipeline has not been initialized.

forward(data: Any = <pystream.data.pipeline_data.InputGeneratorRequest object>) bool

Forward data into the pipeline

Parameters:

data (Any) – the data to be forwarded. If not provided, data generated from the input generator will be pushed instead.

Raises:

PipelineUndefined – raised if neither the serialize nor the parallelize method has been invoked.

Returns:

True if the data has been forwarded successfully, False otherwise.

Return type:

bool
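
A usage sketch, continuing the hypothetical pipeline from the add example above:

    # serialize() or parallelize() must be called first, otherwise
    # PipelineUndefined is raised.
    pipeline.serialize()
    ok = pipeline.forward(3.0)  # push explicit data
    ok = pipeline.forward()     # push data from the input generator instead
                                # (assumes an input_generator was provided)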

get_profiles() Tuple[Dict[str, float], Dict[str, float]]

Get profiling data

Returns:

the latency and throughput data, respectively. Each is a dict whose keys are the stage names.

Return type:

Tuple[Dict[str, float], Dict[str, float]]
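
A sketch of reading the profiles (assumes the pipeline was constructed with use_profiler=True):

    latency, throughput = pipeline.get_profiles()
    for stage_name, value in latency.items():
        print(stage_name, "latency:", value, "throughput:", throughput[stage_name])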

get_results() Any

Get latest results from the pipeline

Raises:

PipelineUndefined – raised if neither the serialize nor the parallelize method has been invoked.

Returns:

the latest data from the pipeline. The same data cannot be read twice. If no new data is available, None is returned.

Return type:

Any
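
A polling sketch for the hypothetical pipeline above:

    result = pipeline.get_results()
    if result is None:
        # No new data yet, or the previous result was already read.
        pass
    else:
        print("latest result:", result)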

parallelize(block_input: bool = True, input_timeout: float = 10) Pipeline

Turn the pipeline into an independent-stage pipeline. Each stage lives in its own thread and works asynchronously. However, data is still passed through the stages in the order they were defined.

Parameters:
  • block_input (bool, optional) – Whether the forward method blocks, up to the timeout specified in input_timeout, when the first stage is busy. Defaults to True.

  • input_timeout (float, optional) – Blocking timeout for the forward method in seconds. Defaults to 10.

Returns:

this pipeline itself

Return type:

Pipeline
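
A sketch using the hypothetical stages defined earlier; since parallelize returns the pipeline itself, it can be chained:

    pipeline = pystream.Pipeline()
    pipeline.add(double)
    pipeline.add(Offset())
    # Each stage now runs in its own thread; forward() blocks for up to
    # 5 seconds when the first stage is busy.
    pipeline.parallelize(block_input=True, input_timeout=5).forward(3.0)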

serialize() Pipeline

Turn the pipeline into a serial pipeline. All stages run sequentially in blocking mode.

Returns:

this pipeline itself

Return type:

Pipeline
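
An end-to-end serial sketch with the hypothetical double stage from the add example:

    pipeline = pystream.Pipeline()
    pipeline.add(double)
    pipeline.serialize()           # stages run sequentially in the caller's thread
    pipeline.forward(3.0)
    print(pipeline.get_results())  # expected: 6.0 with the double stage
    pipeline.cleanup()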

start_loop(period: float = 0.01) None

Start the pipeline in autonomous mode. Data generated by the input generator is pushed into the pipeline once per defined period.

Parameters:

period (float, optional) – Period at which the data is pushed. Defaults to 0.01.

stop_loop() None

Stop the autonomous operation of the pipeline
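
An autonomous-mode sketch, assuming the hypothetical generate_input and double defined in the earlier examples:

    import time

    pipeline = pystream.Pipeline(input_generator=generate_input)
    pipeline.add(double)
    pipeline.parallelize()
    pipeline.start_loop(period=0.1)  # push generated data once every period=0.1
    time.sleep(1.0)
    print(pipeline.get_results())
    pipeline.stop_loop()
    pipeline.cleanup()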

class pystream.Stage

Base class for a pipeline stage. For example, stages that need a cleanup routine should be defined as subclasses of this class.

Useful property:

name (str) – the stage name. If not defined by the child instance, the name will be assigned automatically after the pipeline is constructed. Defaults to “”.

abstract __call__(data: T) T

Main process of the stage.

Parameters:

data (T) – Input data

Returns:

the output data, which should be of the same type as the input data

Return type:

T

abstract cleanup() None

Cleanup method for the stage. This method is invoked during the pipeline cleanup step.
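
A sketch of a Stage subclass that owns a resource and releases it in cleanup() (hypothetical file-logging stage):

    import pystream

    class FileLoggerStage(pystream.Stage):
        def __call__(self, data: str) -> str:
            # Lazily open the log file on first use.
            if not hasattr(self, "_log"):
                self._log = open("stage.log", "a")
            self._log.write(str(data) + "\n")
            return data  # output type matches input type

        def cleanup(self) -> None:
            # Invoked during the pipeline cleanup step; release the resource.
            if hasattr(self, "_log"):
                self._log.close()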

Functional Pipeline

pystream.functional.func_serial(funcs: List[Callable[[Any], Any]]) Callable[[Any], Any]

Create a function composed of functions that are executed serially

Parameters:

funcs (List[Callable[[Any], Any]]) – the list of functions to be executed. Each function takes exactly one argument, which can be anything, and returns its processing result. The output of function i is used as the input argument of function i + 1.

Returns:

A function that executes the serial chain. It takes one argument, x, which is the input data passed to funcs[0], and returns the output of the last function in funcs.

Return type:

Callable[[Any], Any]
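
A sketch with hypothetical add_one and square functions:

    from pystream.functional import func_serial

    def add_one(x):
        return x + 1

    def square(x):
        return x * x

    chain = func_serial([add_one, square])
    print(chain(2))  # square(add_one(2)) == 9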

pystream.functional.func_parallel_thread(funcs: List[Callable[[Any], Any]], executor: ThreadPoolExecutor = <concurrent.futures.thread.ThreadPoolExecutor object>) Callable[[Any], Any]

Create a function composed of functions that are executed in parallel using ThreadPoolExecutor from the concurrent.futures module.

Each input function must take only one mandatory data argument. In the wrapper function returned here, the same data object is passed to and shared by all of the input functions, and that same object is returned by the wrapper. Therefore, the input functions must modify the input data in place to produce meaningful results.

If no executor is provided, a shared default ThreadPoolExecutor is used. This executor is not shut down automatically, so make sure that the functions passed here can exit properly. To be safe, pass your own executor.

Parameters:
  • funcs (List[Callable[[Any], Any]]) – the list of functions to be executed. Each function takes exactly one argument, which can be anything. The outputs of the functions are not used.

  • executor (ThreadPoolExecutor, optional) – ThreadPoolExecutor instance from concurrent.futures that handles the threads. By default, an executor managed by this package is used.

Returns:

A function that executes the parallelized input functions. It takes one argument, x, which is passed to all of the input functions, and returns the same object x after execution.

Return type:

Callable[[Any], Any]
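
A sketch with hypothetical in-place workers and a user-supplied executor, as recommended above:

    from concurrent.futures import ThreadPoolExecutor

    from pystream.functional import func_parallel_thread

    # Return values are ignored; the shared dict must be modified in place.
    def fill_a(data):
        data["a"] = 1

    def fill_b(data):
        data["b"] = 2

    executor = ThreadPoolExecutor(max_workers=2)
    parallel = func_parallel_thread([fill_a, fill_b], executor=executor)

    result = parallel({})  # the same dict object is returned
    print(result)          # {'a': 1, 'b': 2}
    executor.shutdown()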