Skip to content

Python API

kpops.api

clean

clean(
    pipeline_path: Path,
    dotenv: list[Path] | None = None,
    config: Path = Path(),
    steps: set[str] | None = None,
    filter_type: FilterType = FilterType.INCLUDE,
    environment: str | None = None,
    dry_run: bool = True,
    verbose: bool = True,
    parallel: bool = False,
)

Clean pipeline steps.

PARAMETER DESCRIPTION
pipeline_path

Path to pipeline definition yaml file.

TYPE: Path

dotenv

Paths to dotenv files.

TYPE: list[Path] | None DEFAULT: None

config

Path to the dir containing config.yaml files.

TYPE: Path DEFAULT: Path()

steps

Set of steps (components) to apply the command on.

TYPE: set[str] | None DEFAULT: None

filter_type

Whether the components named in steps should be included or excluded.

TYPE: FilterType DEFAULT: INCLUDE

dry_run

Whether to dry run the command or execute it.

TYPE: bool DEFAULT: True

environment

The environment to generate and deploy the pipeline to.

TYPE: str | None DEFAULT: None

verbose

Enable verbose printing.

TYPE: bool DEFAULT: True

parallel

Enable or disable parallel execution of pipeline steps.

TYPE: bool DEFAULT: False

Source code in kpops/api/__init__.py
def clean(
    pipeline_path: Path,
    dotenv: list[Path] | None = None,
    config: Path = Path(),
    steps: set[str] | None = None,
    filter_type: FilterType = FilterType.INCLUDE,
    environment: str | None = None,
    dry_run: bool = True,
    verbose: bool = True,
    parallel: bool = False,
):
    """Clean pipeline steps.

    Sequential mode walks the components in reverse pipeline order; parallel
    mode awaits the reversed execution graph built by
    ``Pipeline.build_execution_graph``.

    :param pipeline_path: Path to pipeline definition yaml file.
    :param dotenv: Paths to dotenv files.
    :param config: Path to the dir containing config.yaml files.
    :param steps: Set of steps (components) to apply the command on.
    :param filter_type: Whether the components named in `steps` are included
        or excluded.
    :param environment: The environment used to generate the pipeline.
    :param dry_run: Whether to dry run the command or execute it.
    :param verbose: Enable verbose printing.
    :param parallel: Enable or disable parallel execution of pipeline steps.
    """
    pipeline = generate(
        pipeline_path=pipeline_path,
        dotenv=dotenv,
        config=config,
        steps=steps,
        filter_type=filter_type,
        environment=environment,
        verbose=verbose,
    )

    async def clean_one(component: PipelineComponent) -> None:
        log_action("Clean", component)
        await component.clean(dry_run)

    async def clean_all() -> None:
        if not parallel:
            for component in reversed(pipeline.components):
                await clean_one(component)
            return
        await pipeline.build_execution_graph(clean_one, reverse=True)

    asyncio.run(clean_all())

deploy

deploy(
    pipeline_path: Path,
    dotenv: list[Path] | None = None,
    config: Path = Path(),
    steps: set[str] | None = None,
    filter_type: FilterType = FilterType.INCLUDE,
    environment: str | None = None,
    dry_run: bool = True,
    verbose: bool = True,
    parallel: bool = False,
)

Deploy pipeline steps.

PARAMETER DESCRIPTION
pipeline_path

Path to pipeline definition yaml file.

TYPE: Path

dotenv

Paths to dotenv files.

TYPE: list[Path] | None DEFAULT: None

config

Path to the dir containing config.yaml files.

TYPE: Path DEFAULT: Path()

steps

Set of steps (components) to apply the command on.

TYPE: set[str] | None DEFAULT: None

filter_type

Whether the components named in steps should be included or excluded.

TYPE: FilterType DEFAULT: INCLUDE

dry_run

Whether to dry run the command or execute it.

TYPE: bool DEFAULT: True

environment

The environment to generate and deploy the pipeline to.

TYPE: str | None DEFAULT: None

verbose

Enable verbose printing.

TYPE: bool DEFAULT: True

parallel

Enable or disable parallel execution of pipeline steps.

TYPE: bool DEFAULT: False

Source code in kpops/api/__init__.py
def deploy(
    pipeline_path: Path,
    dotenv: list[Path] | None = None,
    config: Path = Path(),
    steps: set[str] | None = None,
    filter_type: FilterType = FilterType.INCLUDE,
    environment: str | None = None,
    dry_run: bool = True,
    verbose: bool = True,
    parallel: bool = False,
):
    """Deploy pipeline steps.

    Sequential mode walks the components in pipeline order; parallel mode
    awaits the execution graph built by ``Pipeline.build_execution_graph``.

    :param pipeline_path: Path to pipeline definition yaml file.
    :param dotenv: Paths to dotenv files.
    :param config: Path to the dir containing config.yaml files.
    :param steps: Set of steps (components) to apply the command on.
    :param filter_type: Whether the components named in `steps` are included
        or excluded.
    :param environment: The environment to generate and deploy the pipeline to.
    :param dry_run: Whether to dry run the command or execute it.
    :param verbose: Enable verbose printing.
    :param parallel: Enable or disable parallel execution of pipeline steps.
    """
    pipeline = generate(
        pipeline_path=pipeline_path,
        dotenv=dotenv,
        config=config,
        steps=steps,
        filter_type=filter_type,
        environment=environment,
        verbose=verbose,
    )

    async def deploy_one(component: PipelineComponent) -> None:
        log_action("Deploy", component)
        await component.deploy(dry_run)

    async def deploy_all() -> None:
        if not parallel:
            for component in pipeline.components:
                await deploy_one(component)
            return
        await pipeline.build_execution_graph(deploy_one)

    asyncio.run(deploy_all())

destroy

destroy(
    pipeline_path: Path,
    dotenv: list[Path] | None = None,
    config: Path = Path(),
    steps: set[str] | None = None,
    filter_type: FilterType = FilterType.INCLUDE,
    environment: str | None = None,
    dry_run: bool = True,
    verbose: bool = True,
    parallel: bool = False,
)

Destroy pipeline steps.

PARAMETER DESCRIPTION
pipeline_path

Path to pipeline definition yaml file.

TYPE: Path

dotenv

Paths to dotenv files.

TYPE: list[Path] | None DEFAULT: None

config

Path to the dir containing config.yaml files.

TYPE: Path DEFAULT: Path()

steps

Set of steps (components) to apply the command on.

TYPE: set[str] | None DEFAULT: None

filter_type

Whether the components named in steps should be included or excluded.

TYPE: FilterType DEFAULT: INCLUDE

dry_run

Whether to dry run the command or execute it.

TYPE: bool DEFAULT: True

environment

The environment to generate and deploy the pipeline to.

TYPE: str | None DEFAULT: None

verbose

Enable verbose printing.

TYPE: bool DEFAULT: True

parallel

Enable or disable parallel execution of pipeline steps.

TYPE: bool DEFAULT: False

Source code in kpops/api/__init__.py
def destroy(
    pipeline_path: Path,
    dotenv: list[Path] | None = None,
    config: Path = Path(),
    steps: set[str] | None = None,
    filter_type: FilterType = FilterType.INCLUDE,
    environment: str | None = None,
    dry_run: bool = True,
    verbose: bool = True,
    parallel: bool = False,
):
    """Destroy pipeline steps.

    Sequential mode walks the components in reverse pipeline order; parallel
    mode awaits the reversed execution graph built by
    ``Pipeline.build_execution_graph``.

    :param pipeline_path: Path to pipeline definition yaml file.
    :param dotenv: Paths to dotenv files.
    :param config: Path to the dir containing config.yaml files.
    :param steps: Set of steps (components) to apply the command on.
    :param filter_type: Whether the components named in `steps` are included
        or excluded.
    :param environment: The environment used to generate the pipeline.
    :param dry_run: Whether to dry run the command or execute it.
    :param verbose: Enable verbose printing.
    :param parallel: Enable or disable parallel execution of pipeline steps.
    """
    pipeline = generate(
        pipeline_path=pipeline_path,
        dotenv=dotenv,
        config=config,
        steps=steps,
        filter_type=filter_type,
        environment=environment,
        verbose=verbose,
    )

    async def destroy_one(component: PipelineComponent) -> None:
        log_action("Destroy", component)
        await component.destroy(dry_run)

    async def destroy_all() -> None:
        if not parallel:
            for component in reversed(pipeline.components):
                await destroy_one(component)
            return
        await pipeline.build_execution_graph(destroy_one, reverse=True)

    asyncio.run(destroy_all())

generate

generate(
    pipeline_path: Path,
    dotenv: list[Path] | None = None,
    config: Path = Path(),
    steps: set[str] | None = None,
    filter_type: FilterType = FilterType.INCLUDE,
    environment: str | None = None,
    verbose: bool = False,
    operation_mode: OperationMode = OperationMode.MANAGED,
) -> Pipeline

Generate enriched pipeline representation.

PARAMETER DESCRIPTION
pipeline_path

Path to pipeline definition yaml file.

TYPE: Path

dotenv

Paths to dotenv files.

TYPE: list[Path] | None DEFAULT: None

config

Path to the dir containing config.yaml files.

TYPE: Path DEFAULT: Path()

steps

Set of steps (components) to apply the command on.

TYPE: set[str] | None DEFAULT: None

filter_type

Whether the components named in steps should be included or excluded.

TYPE: FilterType DEFAULT: INCLUDE

environment

The environment to generate and deploy the pipeline to.

TYPE: str | None DEFAULT: None

verbose

Enable verbose printing.

TYPE: bool DEFAULT: False

operation_mode

How KPOps should operate.

TYPE: OperationMode DEFAULT: MANAGED

RETURNS DESCRIPTION
Pipeline

Generated Pipeline object.

Source code in kpops/api/__init__.py
def generate(
    pipeline_path: Path,
    dotenv: list[Path] | None = None,
    config: Path = Path(),
    steps: set[str] | None = None,
    filter_type: FilterType = FilterType.INCLUDE,
    environment: str | None = None,
    verbose: bool = False,
    operation_mode: OperationMode = OperationMode.MANAGED,
) -> Pipeline:
    """Generate enriched pipeline representation.

    When `steps` is non-empty, the generated pipeline is filtered in place
    with the predicate built by `filter_type` before it is returned.

    :param pipeline_path: Path to pipeline definition yaml file.
    :param dotenv: Paths to dotenv files.
    :param config: Path to the dir containing config.yaml files.
    :param steps: Set of steps (components) to apply the command on.
    :param filter_type: Whether the components named in `steps` are included
        or excluded.
    :param environment: The environment to generate and deploy the pipeline to.
    :param verbose: Enable verbose printing.
    :param operation_mode: How KPOps should operate.
    :return: Generated `Pipeline` object.
    """
    kpops_config = KpopsConfig.create(
        config, dotenv, environment, verbose, operation_mode
    )
    pipeline = _create_pipeline(pipeline_path, kpops_config, environment)
    log.info(f"Picked up pipeline '{pipeline_path.parent.name}'")

    # Nothing to filter: hand back the full pipeline.
    if not steps:
        return pipeline

    log.debug(
        f"KPOPS_PIPELINE_STEPS is defined with values: {steps} and filter type of {filter_type.value}"
    )
    pipeline.filter(filter_type.create_default_step_names_filter_predicate(steps))
    log.info(f"Filtered pipeline:\n{pipeline.step_names}")
    return pipeline

init

init(path: Path, config_include_optional: bool = False)

Initiate a default empty project.

PARAMETER DESCRIPTION
path

Directory in which the project should be initiated.

TYPE: Path

config_include_optional

Whether to include non-required settings in the generated config file.

TYPE: bool DEFAULT: False

Source code in kpops/api/__init__.py
def init(
    path: Path,
    config_include_optional: bool = False,
):
    """Initiate a default empty project.

    A missing `path` is created (its parent directory must already exist,
    because ``parents=False``). If `path` exists but is not an empty
    directory — including when it is a regular file — a warning is logged
    and no project is generated.

    :param path: Directory in which the project should be initiated.
    :param config_include_optional: Whether to include non-required settings
        in the generated config file.
    """
    if not path.exists():
        path.mkdir(parents=False)
    elif not path.is_dir() or any(path.iterdir()):
        # Check is_dir() first: iterdir() on a regular file raises
        # NotADirectoryError instead of producing this friendly warning.
        log.warning("Please provide a path to an empty directory.")
        return
    init_project(path, config_include_optional)

manifest_clean

manifest_clean(
    pipeline_path: Path,
    dotenv: list[Path] | None = None,
    config: Path = Path(),
    steps: set[str] | None = None,
    filter_type: FilterType = FilterType.INCLUDE,
    environment: str | None = None,
    verbose: bool = True,
    operation_mode: OperationMode = OperationMode.MANIFEST,
) -> Iterator[tuple[KubernetesManifest, ...]]
Source code in kpops/api/__init__.py
def manifest_clean(
    pipeline_path: Path,
    dotenv: list[Path] | None = None,
    config: Path = Path(),
    steps: set[str] | None = None,
    filter_type: FilterType = FilterType.INCLUDE,
    environment: str | None = None,
    verbose: bool = True,
    operation_mode: OperationMode = OperationMode.MANIFEST,
) -> Iterator[tuple[KubernetesManifest, ...]]:
    """Generate the pipeline and yield each component's clean manifests.

    This is a generator: the pipeline is only generated once iteration
    starts. Components are visited in pipeline order.

    :param pipeline_path: Path to pipeline definition yaml file.
    :param dotenv: Paths to dotenv files.
    :param config: Path to the dir containing config.yaml files.
    :param steps: Set of steps (components) to apply the command on.
    :param filter_type: Whether the components named in `steps` are included
        or excluded.
    :param environment: The environment used to generate the pipeline.
    :param verbose: Enable verbose printing.
    :param operation_mode: How KPOps should operate.
    :return: Iterator over the result of ``manifest_clean()`` per component.
    """
    pipeline = generate(
        pipeline_path=pipeline_path,
        dotenv=dotenv,
        config=config,
        steps=steps,
        filter_type=filter_type,
        environment=environment,
        verbose=verbose,
        operation_mode=operation_mode,
    )
    yield from (component.manifest_clean() for component in pipeline.components)

manifest_deploy

manifest_deploy(
    pipeline_path: Path,
    dotenv: list[Path] | None = None,
    config: Path = Path(),
    steps: set[str] | None = None,
    filter_type: FilterType = FilterType.INCLUDE,
    environment: str | None = None,
    verbose: bool = True,
    operation_mode: OperationMode = OperationMode.MANIFEST,
) -> Iterator[tuple[KubernetesManifest, ...]]
Source code in kpops/api/__init__.py
def manifest_deploy(
    pipeline_path: Path,
    dotenv: list[Path] | None = None,
    config: Path = Path(),
    steps: set[str] | None = None,
    filter_type: FilterType = FilterType.INCLUDE,
    environment: str | None = None,
    verbose: bool = True,
    operation_mode: OperationMode = OperationMode.MANIFEST,
) -> Iterator[tuple[KubernetesManifest, ...]]:
    """Generate the pipeline and yield each component's deploy manifests.

    This is a generator: the pipeline is only generated once iteration
    starts. Components are visited in pipeline order.

    :param pipeline_path: Path to pipeline definition yaml file.
    :param dotenv: Paths to dotenv files.
    :param config: Path to the dir containing config.yaml files.
    :param steps: Set of steps (components) to apply the command on.
    :param filter_type: Whether the components named in `steps` are included
        or excluded.
    :param environment: The environment used to generate the pipeline.
    :param verbose: Enable verbose printing.
    :param operation_mode: How KPOps should operate.
    :return: Iterator over the result of ``manifest_deploy()`` per component.
    """
    pipeline = generate(
        pipeline_path=pipeline_path,
        dotenv=dotenv,
        config=config,
        steps=steps,
        filter_type=filter_type,
        environment=environment,
        verbose=verbose,
        operation_mode=operation_mode,
    )
    yield from (component.manifest_deploy() for component in pipeline.components)

manifest_destroy

manifest_destroy(
    pipeline_path: Path,
    dotenv: list[Path] | None = None,
    config: Path = Path(),
    steps: set[str] | None = None,
    filter_type: FilterType = FilterType.INCLUDE,
    environment: str | None = None,
    verbose: bool = True,
    operation_mode: OperationMode = OperationMode.MANIFEST,
) -> Iterator[tuple[KubernetesManifest, ...]]
Source code in kpops/api/__init__.py
def manifest_destroy(
    pipeline_path: Path,
    dotenv: list[Path] | None = None,
    config: Path = Path(),
    steps: set[str] | None = None,
    filter_type: FilterType = FilterType.INCLUDE,
    environment: str | None = None,
    verbose: bool = True,
    operation_mode: OperationMode = OperationMode.MANIFEST,
) -> Iterator[tuple[KubernetesManifest, ...]]:
    """Generate the pipeline and yield each component's destroy manifests.

    This is a generator: the pipeline is only generated once iteration
    starts. Components are visited in pipeline order.

    :param pipeline_path: Path to pipeline definition yaml file.
    :param dotenv: Paths to dotenv files.
    :param config: Path to the dir containing config.yaml files.
    :param steps: Set of steps (components) to apply the command on.
    :param filter_type: Whether the components named in `steps` are included
        or excluded.
    :param environment: The environment used to generate the pipeline.
    :param verbose: Enable verbose printing.
    :param operation_mode: How KPOps should operate.
    :return: Iterator over the result of ``manifest_destroy()`` per component.
    """
    pipeline = generate(
        pipeline_path=pipeline_path,
        dotenv=dotenv,
        config=config,
        steps=steps,
        filter_type=filter_type,
        environment=environment,
        verbose=verbose,
        operation_mode=operation_mode,
    )
    yield from (component.manifest_destroy() for component in pipeline.components)

manifest_reset

manifest_reset(
    pipeline_path: Path,
    dotenv: list[Path] | None = None,
    config: Path = Path(),
    steps: set[str] | None = None,
    filter_type: FilterType = FilterType.INCLUDE,
    environment: str | None = None,
    verbose: bool = True,
    operation_mode: OperationMode = OperationMode.MANIFEST,
) -> Iterator[tuple[KubernetesManifest, ...]]
Source code in kpops/api/__init__.py
def manifest_reset(
    pipeline_path: Path,
    dotenv: list[Path] | None = None,
    config: Path = Path(),
    steps: set[str] | None = None,
    filter_type: FilterType = FilterType.INCLUDE,
    environment: str | None = None,
    verbose: bool = True,
    operation_mode: OperationMode = OperationMode.MANIFEST,
) -> Iterator[tuple[KubernetesManifest, ...]]:
    """Generate the pipeline and yield each component's reset manifests.

    This is a generator: the pipeline is only generated once iteration
    starts. Components are visited in pipeline order.

    :param pipeline_path: Path to pipeline definition yaml file.
    :param dotenv: Paths to dotenv files.
    :param config: Path to the dir containing config.yaml files.
    :param steps: Set of steps (components) to apply the command on.
    :param filter_type: Whether the components named in `steps` are included
        or excluded.
    :param environment: The environment used to generate the pipeline.
    :param verbose: Enable verbose printing.
    :param operation_mode: How KPOps should operate.
    :return: Iterator over the result of ``manifest_reset()`` per component.
    """
    pipeline = generate(
        pipeline_path=pipeline_path,
        dotenv=dotenv,
        config=config,
        steps=steps,
        filter_type=filter_type,
        environment=environment,
        verbose=verbose,
        operation_mode=operation_mode,
    )
    yield from (component.manifest_reset() for component in pipeline.components)

reset

reset(
    pipeline_path: Path,
    dotenv: list[Path] | None = None,
    config: Path = Path(),
    steps: set[str] | None = None,
    filter_type: FilterType = FilterType.INCLUDE,
    environment: str | None = None,
    dry_run: bool = True,
    verbose: bool = True,
    parallel: bool = False,
)

Reset pipeline steps.

PARAMETER DESCRIPTION
pipeline_path

Path to pipeline definition yaml file.

TYPE: Path

dotenv

Paths to dotenv files.

TYPE: list[Path] | None DEFAULT: None

config

Path to the dir containing config.yaml files.

TYPE: Path DEFAULT: Path()

steps

Set of steps (components) to apply the command on.

TYPE: set[str] | None DEFAULT: None

filter_type

Whether the components named in steps should be included or excluded.

TYPE: FilterType DEFAULT: INCLUDE

dry_run

Whether to dry run the command or execute it.

TYPE: bool DEFAULT: True

environment

The environment to generate and deploy the pipeline to.

TYPE: str | None DEFAULT: None

verbose

Enable verbose printing.

TYPE: bool DEFAULT: True

parallel

Enable or disable parallel execution of pipeline steps.

TYPE: bool DEFAULT: False

Source code in kpops/api/__init__.py
def reset(
    pipeline_path: Path,
    dotenv: list[Path] | None = None,
    config: Path = Path(),
    steps: set[str] | None = None,
    filter_type: FilterType = FilterType.INCLUDE,
    environment: str | None = None,
    dry_run: bool = True,
    verbose: bool = True,
    parallel: bool = False,
):
    """Reset pipeline steps.

    Sequential mode walks the components in reverse pipeline order; parallel
    mode awaits the reversed execution graph built by
    ``Pipeline.build_execution_graph``.

    :param pipeline_path: Path to pipeline definition yaml file.
    :param dotenv: Paths to dotenv files.
    :param config: Path to the dir containing config.yaml files.
    :param steps: Set of steps (components) to apply the command on.
    :param filter_type: Whether the components named in `steps` are included
        or excluded.
    :param environment: The environment used to generate the pipeline.
    :param dry_run: Whether to dry run the command or execute it.
    :param verbose: Enable verbose printing.
    :param parallel: Enable or disable parallel execution of pipeline steps.
    """
    pipeline = generate(
        pipeline_path=pipeline_path,
        dotenv=dotenv,
        config=config,
        steps=steps,
        filter_type=filter_type,
        environment=environment,
        verbose=verbose,
    )

    async def reset_one(component: PipelineComponent) -> None:
        log_action("Reset", component)
        await component.reset(dry_run)

    async def reset_all() -> None:
        if not parallel:
            for component in reversed(pipeline.components):
                await reset_one(component)
            return
        await pipeline.build_execution_graph(reset_one, reverse=True)

    asyncio.run(reset_all())

kpops.pipeline.Pipeline

Bases: BaseModel

Pipeline representation.

Source code in kpops/pipeline/__init__.py
class Pipeline(BaseModel):
    """Pipeline representation.

    Components are stored in ``_component_index`` (component id -> component,
    in insertion order). ``_graph`` connects component ids and topic ids with
    edges ``input topic -> component -> output topic``.
    """

    _component_index: dict[str, PipelineComponent] = {}
    _graph: nx.DiGraph = nx.DiGraph()

    model_config = ConfigDict(arbitrary_types_allowed=True)

    @property
    def step_names(self) -> list[str]:
        """Names of all components, in pipeline order."""
        return [step.name for step in self.components]

    @computed_field(title="Components")
    @property
    def components(self) -> list[SerializeAsAny[PipelineComponent]]:
        """Components in pipeline order, as a new list on every access."""
        return list(self._component_index.values())

    @property
    def last(self) -> PipelineComponent:
        """Last component in pipeline order.

        :raises IndexError: If the pipeline has no components.
        """
        return self.components[-1]

    def add(self, component: PipelineComponent) -> None:
        """Register `component` in the pipeline and wire it into the graph.

        :param component: Component to append; its ``id`` must be unused.
        :raises ValidationError: If a component with the same id already exists.
        """
        existing = self._component_index.get(component.id)
        if existing is None:
            self._component_index[component.id] = component
            self.__add_to_graph(component)
            return
        msg = f"Pipeline steps must have unique id, '{component.id}' already exists."
        raise ValidationError(msg)

    def remove(self, component_id: str) -> None:
        """Remove a component from the component index.

        Its node stays in ``_graph``; ``build_execution_graph`` skips ids that
        are no longer in the index.

        :raises KeyError: If no component with `component_id` exists.
        """
        del self._component_index[component_id]

    def get(self, component_id: str) -> PipelineComponent | None:
        """Look up a component by id.

        :returns: The component, or ``None`` if no component has this id.
        """
        return self._component_index.get(component_id, None)

    def find(self, predicate: ComponentFilterPredicate) -> Iterator[PipelineComponent]:
        """Find pipeline components matching a custom predicate.

        Components are checked lazily, in pipeline order.

        :param predicate: Filter function returning whether a component matches
        :returns: Iterator of components matching the predicate
        """
        yield from (candidate for candidate in self.components if predicate(candidate))

    def filter(self, predicate: ComponentFilterPredicate) -> None:
        """Filter pipeline components using a custom predicate.

        :param predicate: Filter function returning whether a component
            should be kept (truthy) or removed (falsy)
        """
        # ``components`` returns a fresh list, so removing from the index
        # while looping over it is safe.
        for component in self.components:
            if predicate(component):
                continue
            self.remove(component.id)

    def validate(self) -> None:  # pyright: ignore [reportIncompatibleMethodOverride]
        """Check that the pipeline graph is a DAG.

        :raises ValueError: If the graph contains a cycle.
        """
        check = self.__validate_graph
        check()

    def to_yaml(self) -> str:
        """Serialize the pipeline's components to a YAML string."""
        dumped = self.model_dump(mode="json", by_alias=True, exclude_none=True)
        return yaml.dump(dumped["components"], Dumper=CustomSafeDumper)

    def build_execution_graph(
        self,
        runner: Callable[[PipelineComponent], Coroutine[Any, Any, None]],
        /,
        reverse: bool = False,
    ) -> Awaitable[None]:
        """Build an awaitable that applies `runner` to all components in dependency order.

        The graph is split into topological generations: every node of a
        generation only has predecessors in earlier generations. Components of
        one generation run concurrently; generations run one after the other.

        :param runner: Coroutine function invoked once per component.
        :param reverse: Run the generations last-to-first instead.
        :returns: Awaitable that finishes once every component was processed.
        :raises ValueError: If the pipeline graph is not a DAG.
        """

        async def run_parallel_tasks(
            coroutines: list[Coroutine[Any, Any, None]],
        ) -> None:
            tasks: list[asyncio.Task[None]] = [
                asyncio.create_task(coro) for coro in coroutines
            ]
            await asyncio.gather(*tasks)

        async def run_graph_tasks(pending_tasks: list[Awaitable[None]]) -> None:
            for pending_task in pending_tasks:
                await pending_task

        # Topological generations are required for correct ordering. BFS layers
        # place a node at its *shortest* distance from a source, so a component
        # reachable both from a source topic and through a longer chain of
        # components could be scheduled before one of its own predecessors.
        # Validate first so a cycle fails before any runner coroutine exists.
        self.__validate_graph()
        generations: list[list[str]] = list(nx.topological_generations(self._graph))

        sorted_tasks: list[Awaitable[None]] = []
        for generation in generations:
            if parallel_tasks := self.__get_parallel_tasks_from(generation, runner):
                sorted_tasks.append(run_parallel_tasks(parallel_tasks))

        if reverse:
            sorted_tasks.reverse()

        return run_graph_tasks(sorted_tasks)

    def __getitem__(self, component_id: str) -> PipelineComponent:
        """Return the component with `component_id`.

        :raises ValueError: If no component has this id.
        """
        try:
            return self._component_index[component_id]
        except KeyError as exc:
            msg = f"Component {component_id} not found"
            raise ValueError(msg) from exc

    def __bool__(self) -> bool:
        """Whether the pipeline contains at least one component."""
        return bool(self._component_index)

    def __iter__(self) -> Iterator[PipelineComponent]:  # pyright: ignore [reportIncompatibleMethodOverride]
        """Iterate over the components in pipeline order."""
        yield from self._component_index.values()

    def __len__(self) -> int:
        """Number of components in the pipeline."""
        # Same count as len(self.components) without building a list.
        return len(self._component_index)

    def __add_to_graph(self, component: PipelineComponent):
        """Add the component node plus edges from its inputs and to its outputs."""
        self._graph.add_node(component.id)

        for input_topic in component.inputs:
            self.__add_input(input_topic.id, component.id)

        for output_topic in component.outputs:
            self.__add_output(output_topic.id, component.id)

    def __add_output(self, topic_id: str, source: str) -> None:
        """Add edge ``source component -> output topic``."""
        self._graph.add_node(topic_id)
        self._graph.add_edge(source, topic_id)

    def __add_input(self, topic_id: str, target: str) -> None:
        """Add edge ``input topic -> target component``."""
        self._graph.add_node(topic_id)
        self._graph.add_edge(topic_id, target)

    def __get_parallel_tasks_from(
        self,
        layer: list[str],
        runner: Callable[[PipelineComponent], Coroutine[Any, Any, None]],
    ) -> list[Coroutine[Any, Any, None]]:
        """Create `runner` coroutines for the component nodes in `layer`.

        Topic nodes, and component nodes removed from the index, are skipped.
        """

        def gen_parallel_tasks():
            for node_in_layer in layer:
                # check if component, skip topics
                if (component := self._component_index.get(node_in_layer)) is not None:
                    yield runner(component)

        return list(gen_parallel_tasks())

    def __validate_graph(self) -> None:
        """Raise ``ValueError`` if the graph contains a cycle."""
        if not nx.is_directed_acyclic_graph(self._graph):
            msg = "Pipeline is not a valid DAG."
            raise ValueError(msg)

components property

components: list[SerializeAsAny[PipelineComponent]]

last property

last: PipelineComponent

step_names property

step_names: list[str]

add

add(component: PipelineComponent) -> None
Source code in kpops/pipeline/__init__.py
def add(self, component: PipelineComponent) -> None:
    """Register `component` in the pipeline and wire it into the graph.

    :param component: Component to append; its ``id`` must be unused.
    :raises ValidationError: If a component with the same id already exists.
    """
    existing = self._component_index.get(component.id)
    if existing is None:
        self._component_index[component.id] = component
        self.__add_to_graph(component)
        return
    msg = f"Pipeline steps must have unique id, '{component.id}' already exists."
    raise ValidationError(msg)

build_execution_graph

build_execution_graph(
    runner: Callable[
        [PipelineComponent], Coroutine[Any, Any, None]
    ],
    /,
    reverse: bool = False,
) -> Awaitable[None]
Source code in kpops/pipeline/__init__.py
def build_execution_graph(
    self,
    runner: Callable[[PipelineComponent], Coroutine[Any, Any, None]],
    /,
    reverse: bool = False,
) -> Awaitable[None]:
    """Build an awaitable that applies `runner` to all components in dependency order.

    The graph is split into topological generations: every node of a
    generation only has predecessors in earlier generations. Components of
    one generation run concurrently; generations run one after the other.

    :param runner: Coroutine function invoked once per component.
    :param reverse: Run the generations last-to-first instead.
    :returns: Awaitable that finishes once every component was processed.
    :raises ValueError: If the pipeline graph is not a DAG.
    """

    async def run_parallel_tasks(
        coroutines: list[Coroutine[Any, Any, None]],
    ) -> None:
        tasks: list[asyncio.Task[None]] = [
            asyncio.create_task(coro) for coro in coroutines
        ]
        await asyncio.gather(*tasks)

    async def run_graph_tasks(pending_tasks: list[Awaitable[None]]) -> None:
        for pending_task in pending_tasks:
            await pending_task

    # Topological generations are required for correct ordering. BFS layers
    # place a node at its *shortest* distance from a source, so a component
    # reachable both from a source topic and through a longer chain of
    # components could be scheduled before one of its own predecessors.
    # Validate first so a cycle fails before any runner coroutine exists.
    self.__validate_graph()
    generations: list[list[str]] = list(nx.topological_generations(self._graph))

    sorted_tasks: list[Awaitable[None]] = []
    for generation in generations:
        if parallel_tasks := self.__get_parallel_tasks_from(generation, runner):
            sorted_tasks.append(run_parallel_tasks(parallel_tasks))

    if reverse:
        sorted_tasks.reverse()

    return run_graph_tasks(sorted_tasks)

filter

filter(predicate: ComponentFilterPredicate) -> None

Filter pipeline components using a custom predicate.

PARAMETER DESCRIPTION
predicate

Filter function that returns whether a component should be kept (truthy) or removed (falsy).

TYPE: ComponentFilterPredicate

Source code in kpops/pipeline/__init__.py
def filter(self, predicate: ComponentFilterPredicate) -> None:
    """Filter pipeline components using a custom predicate.

    :param predicate: Filter function returning whether a component
        should be kept (truthy) or removed (falsy)
    """
    # ``components`` returns a fresh list, so removing from the index
    # while looping over it is safe.
    for component in self.components:
        if predicate(component):
            continue
        self.remove(component.id)

find

find(
    predicate: ComponentFilterPredicate,
) -> Iterator[PipelineComponent]

Find pipeline components matching a custom predicate.

PARAMETER DESCRIPTION
predicate

Filter function that returns whether a component matches and should be yielded.

TYPE: ComponentFilterPredicate

RETURNS DESCRIPTION
Iterator[PipelineComponent]

Iterator of components matching the predicate

Source code in kpops/pipeline/__init__.py
def find(self, predicate: ComponentFilterPredicate) -> Iterator[PipelineComponent]:
    """Find pipeline components matching a custom predicate.

    Components are checked lazily, in pipeline order.

    :param predicate: Filter function returning whether a component matches
    :returns: Iterator of components matching the predicate
    """
    yield from (candidate for candidate in self.components if predicate(candidate))

get

get(component_id: str) -> PipelineComponent | None
Source code in kpops/pipeline/__init__.py
def get(self, component_id: str) -> PipelineComponent | None:
    """Look up a component by id.

    :returns: The component, or ``None`` if no component has this id.
    """
    return self._component_index.get(component_id, None)

remove

remove(component_id: str) -> None
Source code in kpops/pipeline/__init__.py
def remove(self, component_id: str) -> None:
    """Remove a component from the component index.

    :raises KeyError: If no component with `component_id` exists.
    """
    del self._component_index[component_id]

to_yaml

to_yaml() -> str
Source code in kpops/pipeline/__init__.py
def to_yaml(self) -> str:
    """Serialize the pipeline's components to a YAML string."""
    dumped = self.model_dump(mode="json", by_alias=True, exclude_none=True)
    return yaml.dump(dumped["components"], Dumper=CustomSafeDumper)

validate

validate() -> None
Source code in kpops/pipeline/__init__.py
def validate(self) -> None:  # pyright: ignore [reportIncompatibleMethodOverride]
    """Check that the pipeline graph is a DAG.

    :raises ValueError: If the graph contains a cycle.
    """
    check = self.__validate_graph
    check()