KFP Cookbook

Pipeline Examples

Simple example of a pipeline made up of two lightweight components:

from kfp import dsl
from kfp.dsl import Output, Dataset, Input, component, pipeline
from kfp import compiler
from kfp.client import Client



@component(packages_to_install=['pandas', 'pyarrow', 'numpy'])
def write_random(name: str, out: Output[Dataset]):
    import pandas as pd
    import numpy as np

    df = pd.DataFrame({name: np.random.random(100)})
    df.to_parquet(out.path)

@component(packages_to_install=['pandas', 'pyarrow'])
def calc_sum(data_in: Input[Dataset]) -> float:
    import pandas as pd
    df = pd.read_parquet(data_in.path)
    return df.sum().sum()

@pipeline
def example_pipeline(column_name: str) -> float:
    task_random = write_random(name=column_name)
    task_sum = calc_sum(data_in=task_random.outputs['out'])
    return task_sum.output


compiler.Compiler().compile(example_pipeline, 'pipeline.yaml')


client = Client()
run = client.create_run_from_pipeline_package(
    'pipeline.yaml',
    arguments={
        'column_name': 'random',
    },
)

How do I save secret credentials in my pipelines?

You can use kfp-kubernetes to mount a secret as a volume or use it as an environment variable. Make sure to install it first, e.g. pip install kfp[kubernetes] .

Example of exposing a secret as environment variables (this way, the secret values do not end up in the pipeline YAML):

from kfp import dsl
from kfp import kubernetes
import kfp

@dsl.component
def comp():
    import os
    for key in ['key_id', 'key_secret']:
        print(key, os.environ[key])


@dsl.pipeline
def pipeline_secret_env():
    task = comp()
    kubernetes.use_secret_as_env(
        task,
        secret_name='my-secret',
        secret_key_to_env={'user': 'key_id', 'password': 'key_secret'})

client = kfp.Client()
client.create_run_from_pipeline_func(pipeline_secret_env)

where the matching secret was created with kubectl create secret generic my-secret -n my-namespace --from-literal=password=test1234 --from-literal=user=user@example.com (which creates a secret with two keys user and password in the namespace my-namespace).
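The direction of the secret_key_to_env mapping is easy to mix up: its keys are the keys inside the Kubernetes secret, and its values are the names of the environment variables the component sees. The following plain-Python sketch only mimics that mapping for illustration. resolve_env is a hypothetical helper and not part of kfp-kubernetes; the secret data is the example data from the kubectl command above.

```python
def resolve_env(secret_data: dict, secret_key_to_env: dict) -> dict:
    """Mimic the mapping: secret key -> environment variable name."""
    return {
        env_name: secret_data[secret_key]
        for secret_key, env_name in secret_key_to_env.items()
    }


# Example data matching the kubectl command above.
secret_data = {'user': 'user@example.com', 'password': 'test1234'}

# Same mapping as in pipeline_secret_env.
env = resolve_env(secret_data, {'user': 'key_id', 'password': 'key_secret'})
print(env)
```

So inside the component, os.environ['key_id'] holds the value of the secret key user, and os.environ['key_secret'] holds the value of the secret key password.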

I want to get the run ID in a component

Unfortunately, in kfp v2 the environment variable KFP_RUN_ID is not set in the container. You can use the following workaround (either copy the complete component or use the relevant part):

from kfp import dsl

@dsl.component
def get_run_id(name: str) -> str:
    from ast import literal_eval
    import os
    import json

    argo_template = json.loads(os.getenv('ARGO_TEMPLATE'))

    input_parameters = argo_template['inputs']['parameters'][0]
    input_params = literal_eval(input_parameters['value'])
    command_list = input_params['containers'][0]['command']

    # get the index of '--run_id'
    index = command_list.index('--run_id')

    # get the value after '--run_id'
    run_id = command_list[index + 1]

    print(f'run_id: {run_id}')

    return run_id
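
To see which structure the workaround relies on, the parsing logic can be tried outside of a cluster. This is a minimal sketch: extract_run_id is a hypothetical helper, and the sample value is a heavily shortened, made-up stand-in for ARGO_TEMPLATE (the parameter name 'pod-spec' and the command contents are assumptions). Unlike the component above, it returns None instead of raising when '--run_id' is missing.

```python
import json
from ast import literal_eval
from typing import Optional


def extract_run_id(argo_template_json: str) -> Optional[str]:
    """Return the value following '--run_id' in the first container command, or None."""
    argo_template = json.loads(argo_template_json)
    first_param = argo_template['inputs']['parameters'][0]
    # the parameter value is itself a string holding a dict literal
    pod_spec = literal_eval(first_param['value'])
    command_list = pod_spec['containers'][0]['command']

    if '--run_id' not in command_list:
        return None
    index = command_list.index('--run_id')
    # guard against '--run_id' being the last element
    if index + 1 >= len(command_list):
        return None
    return command_list[index + 1]


# Hypothetical, shortened stand-in for the real ARGO_TEMPLATE content.
sample = json.dumps({
    'inputs': {
        'parameters': [
            {
                'name': 'pod-spec',
                'value': json.dumps(
                    {'containers': [{'command': ['driver', '--run_id', 'abc-123']}]}
                ),
            },
        ]
    }
})

run_id = extract_run_id(sample)
print(run_id)
```

Since the layout of ARGO_TEMPLATE is an implementation detail of the backend, it is worth printing the variable once in your own cluster before relying on this parsing.
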

kfp/pipelines#10496, which has already been merged, makes this process easier. The functionality is only available in KFP SDK releases after 2.7.0. With it, the environment variable KFP_RUN_ID can be set for a pipeline component as follows:

from kfp import dsl, kubernetes


@dsl.component
def comp():
    import os
    print(os.environ["KFP_RUN_ID"])


@dsl.pipeline
def example_pipeline():
    task = comp()
    kubernetes.use_field_path_as_env(
        task,
        env_name="KFP_RUN_ID",
        field_path="metadata.labels['pipeline/runid']"
    )

MinIO

I want to write a pandas DataFrame into MinIO

If key and secret are defined as environment variables (as they are in notebooks), you can use this:

import os

DATA_PATH = 'bucket_name/folder/file.parquet'
storage_options={
        "key": os.environ.get('AWS_ACCESS_KEY_ID'),
        "secret": os.environ.get('AWS_SECRET_ACCESS_KEY'),
        "client_kwargs": {"endpoint_url": 'http://minio.minio'}}

df.to_parquet(f's3://{DATA_PATH}', storage_options=storage_options)

Make sure to use s3:// as the protocol (not minio://).
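
Because os.environ.get returns None for a variable that is not set, missing credentials only show up later as a storage error. A small helper can fail early instead. This is a sketch under the assumptions of the snippet above: minio_storage_options is a hypothetical name, and the default endpoint is the http://minio.minio URL used in this section.

```python
import os


def minio_storage_options(endpoint_url: str = 'http://minio.minio') -> dict:
    """Build storage_options from the environment, failing early if credentials are missing."""
    required = ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY')
    missing = [name for name in required if not os.environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")

    return {
        'key': os.environ['AWS_ACCESS_KEY_ID'],
        'secret': os.environ['AWS_SECRET_ACCESS_KEY'],
        'client_kwargs': {'endpoint_url': endpoint_url},
    }
```

It can then be used as df.to_parquet(f's3://{DATA_PATH}', storage_options=minio_storage_options()).
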

In a pipeline, you can populate these environment variables from a secret:

from kfp import dsl
import kfp
from kfp import kubernetes


@dsl.component(packages_to_install=['pandas', 'pyarrow', 'fsspec', 's3fs'])
def comp():
    import os
    import pandas as pd

    # small example DataFrame (pd.util.testing was removed in pandas 2.0)
    df = pd.DataFrame({'a': [1, 2, 3], 'b': [0.1, 0.2, 0.3]})

    DATA_PATH = 'integration-tests-temporary/from_pipeline.parquet'
    storage_options={
            "key": os.environ.get('AWS_ACCESS_KEY_ID'),
            "secret": os.environ.get('AWS_SECRET_ACCESS_KEY'),
            "client_kwargs": {"endpoint_url": 'http://minio.minio'}}

    df.to_parquet(f's3://{DATA_PATH}', storage_options=storage_options)


@dsl.pipeline
def pipeline_secret_env():
    task = comp()
    kubernetes.use_secret_as_env(
        task,
        secret_name='s3creds',
        secret_key_to_env={
            'AWS_ACCESS_KEY_ID': 'AWS_ACCESS_KEY_ID',
            'AWS_SECRET_ACCESS_KEY': 'AWS_SECRET_ACCESS_KEY',
        }
    )


client = kfp.Client()
client.create_run_from_pipeline_func(pipeline_secret_env)

I want to read a pandas DataFrame from MinIO

import os
import pandas as pd

DATA_PATH = 'bucket_name/folder/file.parquet'
storage_options={
        "key": os.getenv('AWS_ACCESS_KEY_ID'),
        "secret": os.getenv('AWS_SECRET_ACCESS_KEY'),
        "client_kwargs": {"endpoint_url": 'http://minio.minio'}}
df = pd.read_parquet(f's3://{DATA_PATH}', storage_options=storage_options)
df.head()

I want to run a pipeline daily, but some parts should only be calculated once a month

Question: Getting data from a database is very costly, and the data gets updated only once a month anyway. How can I run my pipeline daily, but make sure that the data is pulled from the database only once a month?

Answer: Give the components that should only run occasionally an additional input argument that is used solely for cache invalidation. Then add a component whose output only changes when a re-run is needed, e.g. once a new month has started, and pass that output to the extra argument.
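
The idea behind the extra argument can be sketched without KFP: a cached step is only re-run when one of its inputs changes, so a string that stays constant within a month keeps the cache valid for that month. cache_key is a hypothetical helper used only for this illustration, and the dates are made up.

```python
import datetime as dt


def cache_key(now: dt.datetime, granularity: str) -> str:
    """Return a string that only changes when the given time unit rolls over."""
    formats = {
        'minute': '%Y-%m-%dT%H:%M',
        'hour': '%Y-%m-%dT%H',
        'day': '%Y-%m-%d',
        'month': '%Y-%m',
    }
    return now.strftime(formats[granularity])


first = dt.datetime(2024, 3, 1, 8, 0)
later = dt.datetime(2024, 3, 28, 17, 30)
next_month = dt.datetime(2024, 4, 1, 8, 0)

# Same month: identical key, so a cached result would be reused.
print(cache_key(first, 'month') == cache_key(later, 'month'))       # True
# New month: the key changes, so the expensive step would run again.
print(cache_key(first, 'month') == cache_key(next_month, 'month'))  # False
# A daily key changes between these two dates.
print(cache_key(first, 'day') == cache_key(later, 'day'))           # False
```
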

Example

The following pipeline has one component that is re-run every minute and one that is re-run every hour:

import kfp
from kfp import dsl, compiler, Client
from kfp.dsl import Output, Metrics, component

@dsl.component
def add(a: float, b: float) -> float:
    return a + b

@dsl.component
def log(a: float, b: float, c: float, metric: Output[Metrics]):
    metric.log_metric("a", float(a))
    metric.log_metric("b", float(b))
    metric.log_metric("c", float(c))

@dsl.component
def get_data(cache: str) -> float:
    """simulate an expensive operation getting or crunching some data

    :arg cache: is used for cache invalidation
    """
    import random
    return int(1000 * random.random())

@dsl.component
def generate_minute() -> str:
    import datetime as dt
    return dt.datetime.now().strftime('%Y-%m-%dT%H:%M')

@dsl.component
def generate_hour() -> str:
    import datetime as dt
    return dt.datetime.now().strftime('%Y-%m-%dT%H:XX')

@dsl.component
def generate_day() -> str:
    import datetime as dt
    return dt.datetime.now().strftime('%Y-%m-%d')

@dsl.component
def generate_month() -> str:
    import datetime as dt
    return dt.datetime.now().strftime('%Y-%m-XX')

@dsl.pipeline
def test_pipeline():
    # setting enable_caching=False is important here for this to work,
    # otherwise minute and hour are also always cached
    minute = generate_minute().set_caching_options(enable_caching=False)
    hour = generate_hour().set_caching_options(enable_caching=False)

    data1 = get_data(cache=minute.output)
    data2 = get_data(cache=hour.output)

    result = add(a=data1.output, b=data2.output)

    log(a=data1.output, b=data2.output, c=result.output)

compiler.Compiler().compile(test_pipeline, 'pipeline.yaml')
run = Client().create_run_from_pipeline_package('pipeline.yaml')

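Make sure not to pass enable_caching=True or enable_caching=False when creating the run: a run-level setting applies to all tasks and overrides the per-task caching options set in the pipeline.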

I want to get notified when my pipeline finished/failed

ExitHandler can be used to run a component when the pipeline is exiting.

An example that uses a Teams webhook to send a message to a channel (replace the URL):

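from kfp import dsl, compiler, Client
from kfp.dsl import container_component, ContainerSpec, PipelineTaskFinalStatus
import json

# `add` and `log` are the components defined in the caching example above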

@container_component
def notify(
    title: str = 'Pipeline Status',
    message: str = 'Your Pipeline finished',
):
    return ContainerSpec(
        image='alpine/curl',
        command=[
            'curl',
            '-X',
            'POST',
            '-H',
            'Content-Type: application/json',
            '-d',
            json.dumps({"title": str(title), "text": str(message)}),
            'https://justaddai.webhook.office.com/webhookb2/XXXXXX',
        ],
    )

@dsl.pipeline
def test_pipeline(
    a: float = 42,
    b: float = 23,
):
    notifier = notify(message='test')
    with dsl.ExitHandler(exit_task=notifier):
        c = add(a=a, b=b)
        d = add(a=c.output, b=42.0)
        notify(title='pipeline')
        # `log` expects a, b and c (see its definition above)
        log(a=c.output, b=42.0, c=d.output)

client = Client()
compiler.Compiler().compile(test_pipeline, 'pipeline.yaml')
run = client.create_run_from_pipeline_package('pipeline.yaml', enable_caching=False)

Getting the status of a pipeline is not yet supported by the open source KFP backend.

ExitHandlers only support tasks with .ignore_upstream_failure() since KFP SDK v2.3 (only released with Kubeflow 1.8).
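
Before wiring the webhook into a pipeline, it can help to check the URL and payload from plain Python. The sketch below builds the same POST request as the curl command above using only the standard library. build_notification is a hypothetical helper and https://example.com/webhook is a placeholder URL; the request is only constructed here, not sent.

```python
import json
import urllib.request


def build_notification(webhook_url: str, title: str, message: str) -> urllib.request.Request:
    """Build a JSON POST request with the same body as the curl command."""
    payload = json.dumps({'title': title, 'text': message}).encode('utf-8')
    return urllib.request.Request(
        webhook_url,
        data=payload,
        headers={'Content-Type': 'application/json'},
        method='POST',
    )


request = build_notification('https://example.com/webhook', 'Pipeline Status', 'test')
print(request.get_method(), request.full_url)

# To actually send it, replace the placeholder URL and call:
# urllib.request.urlopen(request, timeout=10)
```
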

I want to start pipelines from my local machine

You can also start a pipeline from outside the cluster (or use the KFP client in any other way). For that to work, you need a valid session cookie to authenticate. The following snippet shows how to create an authenticated client (get_istio_auth_session is defined further below):

from kfp.client import Client
import os

domain = os.getenv('KUBEFLOW_DOMAIN')
username = os.getenv('KUBEFLOW_USERNAME')
password = os.getenv('KUBEFLOW_PASSWORD')
namespace = os.environ.get('KUBEFLOW_NAMESPACE', None) or username.split("@")[0].replace(".", "-")

auth_session = get_istio_auth_session(
    url=f"https://{domain}/",
    username=username,
    password=password,
)

client = Client(host=f"https://{domain}/pipeline", namespace=namespace, cookies=auth_session["session_cookie"])
print(client.list_experiments())

If you are using a self-signed certificate, you need to add verify_ssl=False to the Client constructor.

The above code expects the following environment variables to be set:

  • KUBEFLOW_USERNAME
  • KUBEFLOW_PASSWORD
  • KUBEFLOW_DOMAIN
  • KUBEFLOW_NAMESPACE (optional), only if it doesn't match the part before the @ in the username
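
The default namespace rule used above (the part before the @, with dots replaced by dashes) can be checked in isolation. derive_namespace is a hypothetical helper that mirrors the expression in the snippet; the usernames and the namespace team-ml are made-up examples.

```python
from typing import Optional


def derive_namespace(username: str, override: Optional[str] = None) -> str:
    """Use the explicit namespace if given, else derive it from the username."""
    if override:
        return override
    return username.split('@')[0].replace('.', '-')


print(derive_namespace('jane.doe@example.com'))             # jane-doe
print(derive_namespace('jane.doe@example.com', 'team-ml'))  # team-ml
```
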

If you are using Dex for authentication, you can use the following code snippet to get the session cookie (it is also available in the Kubeflow documentation):

import re
import requests
from urllib.parse import urlsplit


def get_istio_auth_session(url: str, username: str, password: str) -> dict:
    """
    Determine if the specified URL is secured by Dex and try to obtain a session cookie.
    WARNING: only Dex `staticPasswords` and `LDAP` authentication are currently supported
             (we default to using `staticPasswords` if both are enabled)

    :param url: Kubeflow server URL, including protocol
    :param username: Dex `staticPasswords` or `LDAP` username
    :param password: Dex `staticPasswords` or `LDAP` password
    :return: auth session information
    """
    # define the default return object
    auth_session = {
        "endpoint_url": url,  # KF endpoint URL
        "redirect_url": None,  # KF redirect URL, if applicable
        "dex_login_url": None,  # Dex login URL (for POST of credentials)
        "is_secured": None,  # True if KF endpoint is secured
        "session_cookie": None  # Resulting session cookies in the form "key1=value1; key2=value2"
    }

    # use a persistent session (for cookies)
    with requests.Session() as s:

        ################
        # Determine if Endpoint is Secured
        ################
        resp = s.get(url, allow_redirects=True, verify=False)
        if resp.status_code != 200:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for GET against: {url}"
            )

        auth_session["redirect_url"] = resp.url

        # if we were NOT redirected, then the endpoint is UNSECURED
        if len(resp.history) == 0:
            auth_session["is_secured"] = False
            return auth_session
        else:
            auth_session["is_secured"] = True

        ################
        # Get Dex Login URL
        ################
        redirect_url_obj = urlsplit(auth_session["redirect_url"])

        # if we are at `/auth?=xxxx` path, we need to select an auth type
        if re.search(r"/auth$", redirect_url_obj.path):
            #######
            # TIP: choose the default auth type by including ONE of the following
            #######

            # OPTION 1: set "staticPasswords" as default auth type
            redirect_url_obj = redirect_url_obj._replace(
                path=re.sub(r"/auth$", "/auth/local", redirect_url_obj.path)
            )
            # OPTION 2: set "ldap" as default auth type
            # redirect_url_obj = redirect_url_obj._replace(
            #     path=re.sub(r"/auth$", "/auth/ldap", redirect_url_obj.path)
            # )

        # if we are at `/auth/xxxx/login` path, then no further action is needed (we can use it for login POST)
        if re.search(r"/auth/.*/login$", redirect_url_obj.path):
            auth_session["dex_login_url"] = redirect_url_obj.geturl()

        # else, we need to be redirected to the actual login page
        else:
            # this GET should redirect us to the `/auth/xxxx/login` path
            resp = s.get(redirect_url_obj.geturl(), allow_redirects=True, verify=False)
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for GET against: {redirect_url_obj.geturl()}"
                )

            # set the login url
            auth_session["dex_login_url"] = resp.url

        ################
        # Attempt Dex Login
        ################
        resp = s.post(
            auth_session["dex_login_url"],
            data={"login": username, "password": password},
            allow_redirects=True
        )
        if len(resp.history) == 0:
            raise RuntimeError(
                f"Login credentials were probably invalid - "
                f"No redirect after POST to: {auth_session['dex_login_url']}"
            )

        # store the session cookies in a "key1=value1; key2=value2" string
        auth_session["session_cookie"] = "; ".join([f"{c.name}={c.value}" for c in s.cookies])

    return auth_session

How do I access the return value of a component that has multiple outputs?

If a component does not just have one return value but also additional outputs defined in the function header, you can no longer simply use the .output attribute of the task to access the return value. In this case, the return value can be accessed and passed to the next task via task.outputs['Output'].

from kfp.dsl import Output, Dataset, component, pipeline
from kfp.client import Client
from typing import Dict

@component(packages_to_install=['pandas', 'pyarrow', 'numpy'])
def write_random_df(name: str, df_out: Output[Dataset]) -> Dict:
    import pandas as pd
    import numpy as np

    df = pd.DataFrame({name: np.random.random(100)})
    df.to_parquet(df_out.path)
    return df.describe().to_dict()

@component()
def print_df_info_dictionary(df_info_dict: Dict) -> Dict:
    print(df_info_dict)
    return df_info_dict

@pipeline
def example_pipeline(column_name: str):
    write_random_df_task = write_random_df(name=column_name)
    print_df_info_task = print_df_info_dictionary(
        df_info_dict=write_random_df_task.outputs['Output']
    )

client = Client()
run = client.create_run_from_pipeline_func(
    example_pipeline,
    arguments={'column_name': 'random'},
)