pypeliner.runners.base

Base runners module; contains the base runner and the base stream runner.

Classes

BaseProcessor

Base class for processors.

BaseReader

A base class for readers.

RunnerConfiguration

A configuration class used to configure the runner behaviour.

BaseRunner

Base class for runners; it can be used with bulk readers to run processors in a linear pipeline.

BaseStreamRunner

Base class for runners that use stream readers.

Functions

exec_timer([name, decimal_places])

When a function is wrapped with this decorator, it will time the function execution.

Module Contents

class pypeliner.runners.base.BaseProcessor

Base class for processors.

Attributes:

PROCESSOR_NAME: verbose name for the processor.

PROCESSOR_NAME = ''
__call__(state: Any) → Any
__str__() → str

Return str(self).

abstract process(state: Any) → Any

The process method defines the processing operations for the current processor. Calling it directly is not recommended unless the processor is used as a standalone unit.

Args:

state: processor input state.

Returns:

processed state.
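A minimal sketch of a custom processor, assuming `__call__` simply delegates to `process` (both methods are listed above, but their bodies are not shown). The `BaseProcessor` here is a hypothetical stand-in so the example runs without pypeliner installed; in real code you would subclass the library's class instead. `UppercaseProcessor` is a hypothetical example name.

```python
from abc import ABC, abstractmethod
from typing import Any


class BaseProcessor(ABC):
    """Hypothetical stand-in mirroring the documented interface."""

    PROCESSOR_NAME = ''

    def __call__(self, state: Any) -> Any:
        # Assumption: calling the processor runs its process() method.
        return self.process(state)

    def __str__(self) -> str:
        # Assumption: the string form is the verbose processor name.
        return self.PROCESSOR_NAME

    @abstractmethod
    def process(self, state: Any) -> Any:
        ...


class UppercaseProcessor(BaseProcessor):
    """Hypothetical processor that upper-cases every string in a list."""

    PROCESSOR_NAME = 'Uppercase'

    def process(self, state: Any) -> Any:
        return [item.upper() for item in state]


processor = UppercaseProcessor()
print(str(processor))         # Uppercase
print(processor(['a', 'b']))  # ['A', 'B']
```

Subclasses only need to implement `process`; the runner (or a standalone caller) then invokes the processor object directly.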

class pypeliner.runners.base.BaseReader

A base class for readers.

abstract read() → Any

An abstract method that defines reading logic.

Returns:

data to be processed.
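As a rough illustration, a bulk reader only has to implement `read()` and return the data to process. The `BaseReader` below is a hypothetical stand-in for the documented class, and `JsonStringReader` is a hypothetical example reader that parses an in-memory JSON string.

```python
import json
from abc import ABC, abstractmethod
from typing import Any


class BaseReader(ABC):
    """Hypothetical stand-in for the documented BaseReader."""

    @abstractmethod
    def read(self) -> Any:
        ...


class JsonStringReader(BaseReader):
    """Hypothetical bulk reader that parses a JSON string."""

    def __init__(self, raw: str) -> None:
        self.raw = raw

    def read(self) -> Any:
        # Return the parsed data; this becomes the runner's initial state.
        return json.loads(self.raw)


reader = JsonStringReader('{"values": [1, 2, 3]}')
print(reader.read())  # {'values': [1, 2, 3]}
```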

class pypeliner.runners.base.RunnerConfiguration

A configuration class used to configure the runner behaviour.

Attributes:

pre_processors: A list of processors that will run before each processor.
post_processors: A list of processors that will run after each processor.
deep_copy_state: Whether to deep-copy the state. Defaults to True.
verbose: Whether to print the running status to the terminal. Defaults to False.
run_timers: Whether to print the execution time of each processor. Defaults to False.
timers_decimal_places: The number of decimal places for the measured time when run_timers is True; it has no effect when run_timers is False. Defaults to 3.

pre_processors: List[pypeliner.processors.base.BaseProcessor] | None = None
post_processors: List[pypeliner.processors.base.BaseProcessor] | None = None
deep_copy_state: bool = True
verbose: bool = False
run_timers: bool = False
timers_decimal_places: int = 3
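The `name: type = default` layout above suggests a dataclass-style configuration. The stand-in below reproduces the documented fields and defaults so the example runs on its own; whether the real `RunnerConfiguration` is a dataclass is an assumption, and the processor lists are typed loosely as `Any`. In practice you would import the real class from pypeliner.

```python
from dataclasses import dataclass
from typing import Any, List, Optional


@dataclass
class RunnerConfiguration:
    """Hypothetical stand-in reproducing the documented fields and defaults."""

    pre_processors: Optional[List[Any]] = None   # run before each processor
    post_processors: Optional[List[Any]] = None  # run after each processor
    deep_copy_state: bool = True
    verbose: bool = False
    run_timers: bool = False
    timers_decimal_places: int = 3  # ignored unless run_timers is True


# Enable verbose output and per-processor timing with 2 decimal places;
# every other field keeps its documented default.
config = RunnerConfiguration(verbose=True, run_timers=True, timers_decimal_places=2)
print(config.deep_copy_state)  # True
```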

pypeliner.runners.base.exec_timer(name: str = '', decimal_places: int = 3)

When a function is wrapped with this decorator, it will time the function execution.

Args:

name: verbose function name; if not specified, the function's __name__ attribute is used as the default.
decimal_places: the number of decimal places for the measured time.

Returns:

same as the function return value.
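A minimal sketch of a decorator with the same parameters as exec_timer, assuming it is applied as a decorator factory (`@exec_timer(...)`, with parentheses) and that it prints the elapsed wall-clock time measured with `time.perf_counter`. The message format is hypothetical; the real function may report the time differently.

```python
import functools
import time
from typing import Any, Callable


def exec_timer(name: str = '', decimal_places: int = 3) -> Callable:
    """Sketch of a timing decorator factory with the documented parameters."""

    def decorator(func: Callable) -> Callable:
        label = name or func.__name__  # fall back to the function's __name__

        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            result = func(*args, **kwargs)
            elapsed = time.perf_counter() - start
            # Assumption: the elapsed time is printed; the real format may differ.
            print(f'{label} took {elapsed:.{decimal_places}f}s')
            return result  # same as the wrapped function's return value

        return wrapper

    return decorator


@exec_timer(decimal_places=2)
def add(a: int, b: int) -> int:
    return a + b


print(add(2, 3))  # prints the timing line, then 5
```

Using `functools.wraps` keeps the wrapped function's `__name__`, so the default label stays meaningful even when decorators are stacked.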

class pypeliner.runners.base.BaseRunner(processors: List[pypeliner.processors.base.BaseProcessor], reader: pypeliner.readers.base.BaseReader, configuration: pypeliner.runner_configuration.RunnerConfiguration | None = None)

Base class for runners; it can be used with bulk readers to run processors in a linear pipeline.

Args:

processors: processor objects to be run by this runner.
reader: reader object used to read the data.
configuration: configuration object to be used.

decorate_processor(processor: pypeliner.processors.base.BaseProcessor) → pypeliner.processors.base.BaseProcessor
display_verbose_message(message: str) → str

A method that prints a verbose message.

Args:

message: The verbose message to be displayed.

Returns:

None

run_hooks(state, processors: List[pypeliner.processors.base.BaseProcessor]) → Any
run_processors_loop(initial_state: Any) → None

A method that defines the loop for running the processors linearly, one after another.

Returns:

Processed state.

run()

A method used to read the state and start the processing loop.

Returns:

processed state.
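The following sketch illustrates how a linear run loop with pre- and post-processor hooks could fit together, based only on the descriptions above. `SketchRunner`, `Processor` and `ListReader` are hypothetical names. Where the real `BaseRunner` copies the state, and how it uses `decorate_processor` and timers, is not documented here, so those details are assumptions (timers are omitted).

```python
import copy
from typing import Any, Callable, List, Optional

# Hypothetical alias: any callable that maps a state to a new state.
Processor = Callable[[Any], Any]


class SketchRunner:
    """Hypothetical, simplified illustration of a linear processing loop."""

    def __init__(self, processors: List[Processor], reader: Any,
                 pre_processors: Optional[List[Processor]] = None,
                 post_processors: Optional[List[Processor]] = None,
                 deep_copy_state: bool = True, verbose: bool = False) -> None:
        self.processors = processors
        self.reader = reader  # any object exposing a read() method
        self.pre_processors = pre_processors or []
        self.post_processors = post_processors or []
        self.deep_copy_state = deep_copy_state
        self.verbose = verbose

    def run_hooks(self, state: Any, hooks: List[Processor]) -> Any:
        # Apply each hook in order, threading the state through.
        for hook in hooks:
            state = hook(state)
        return state

    def run_processors_loop(self, initial_state: Any) -> Any:
        # Assumption: the state is copied once, before the loop starts.
        state = copy.deepcopy(initial_state) if self.deep_copy_state else initial_state
        for processor in self.processors:
            if self.verbose:
                print(f'Running {processor}')
            state = self.run_hooks(state, self.pre_processors)   # before each processor
            state = processor(state)
            state = self.run_hooks(state, self.post_processors)  # after each processor
        return state

    def run(self) -> Any:
        # Read the whole input once (bulk reader), then process it.
        return self.run_processors_loop(self.reader.read())


class ListReader:
    """Hypothetical bulk reader returning a fixed list."""

    def read(self) -> List[int]:
        return [3, 1, 2]


runner = SketchRunner(processors=[sorted, lambda xs: [x * 10 for x in xs]],
                      reader=ListReader())
print(runner.run())  # [10, 20, 30]
```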

class pypeliner.runners.base.BaseStreamRunner(processors: List[pypeliner.processors.base.BaseProcessor], reader: pypeliner.readers.base.BaseReader, configuration: pypeliner.runner_configuration.RunnerConfiguration | None = None)

Bases: BaseRunner

Base class for runners that use stream readers.

run() → Any

A method that uses the stream reader to update the state, runs the processors loop, and yields the result. This method keeps reading until the reader stops providing new state updates.

Returns:

processed state.
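A stream runner differs from the bulk runner in that `run()` repeatedly reads, processes and yields. The sketch below assumes, hypothetically, that the stream reader returns `None` once it has no new state; the actual stop condition used by `BaseStreamRunner` is not specified above. `QueueReader` and `stream_run` are illustrative names only.

```python
from typing import Any, Callable, Iterator, List, Optional


class QueueReader:
    """Hypothetical stream reader: returns the next item, or None when exhausted."""

    def __init__(self, items: List[Any]) -> None:
        self._items = list(items)

    def read(self) -> Optional[Any]:
        return self._items.pop(0) if self._items else None


def stream_run(processors: List[Callable[[Any], Any]],
               reader: QueueReader) -> Iterator[Any]:
    """Sketch of a streaming run(): read, process, yield, until the reader stops."""
    while True:
        state = reader.read()
        if state is None:  # assumption: None signals that no new state is available
            return
        for processor in processors:
            state = processor(state)
        yield state


results = list(stream_run([lambda x: x + 1, lambda x: x * 2], QueueReader([1, 2, 3])))
print(results)  # [4, 6, 8]
```

Because the runner is a generator, callers can consume processed states one at a time instead of waiting for the whole input.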