KFP Cookbook⚓︎
Pipeline Examples⚓︎
Simple example of a pipeline made up of two lightweight components:
from kfp import dsl
from kfp.dsl import Output, Dataset, Input, component, pipeline
from kfp import compiler
from kfp.client import Client
@component(packages_to_install=['pandas', 'pyarrow', 'numpy'])
def write_random(name: str, out: Output[Dataset]):
    """Create a single-column DataFrame of 100 random floats and write it as Parquet.

    :param name: column name for the generated data.
    :param out: dataset artifact the Parquet file is written to.
    """
    # Imports live inside the body: the component runs in its own container
    # where only the listed packages are installed.
    import numpy as np
    import pandas as pd

    random_values = np.random.random(100)
    frame = pd.DataFrame({name: random_values})
    frame.to_parquet(out.path)
@component(packages_to_install=['pandas', 'pyarrow'])
def calc_sum(data_in: Input[Dataset]) -> float:
    """Read a Parquet dataset and return the sum over all of its values."""
    import pandas as pd

    frame = pd.read_parquet(data_in.path)
    # First sum per column, then sum the column totals into one scalar.
    column_totals = frame.sum()
    return column_totals.sum()
@pipeline
def example_pipeline(column_name: str) -> float:
    """Write a random dataset, then sum all of its values and return the total."""
    writer = write_random(name=column_name)
    summer = calc_sum(data_in=writer.outputs['out'])
    return summer.output
# Compile the pipeline to its IR YAML representation, then submit a run
# from the compiled package with the pipeline argument filled in.
compiler.Compiler().compile(example_pipeline, 'pipeline.yaml')
client = Client()
run = client.create_run_from_pipeline_package(
    'pipeline.yaml',
    arguments={
        'column_name': 'random',
    },
)
How do I save secret credentials in my pipelines?⚓︎
You can use kfp-kubernetes to mount a secret as a volume or use it as an environment variable. Make sure to install it first, e.g. pip install kfp[kubernetes] .
Example for using a secret as environment variables (this way, it won't end up in the pipeline yaml):
from kfp import dsl
from kfp import kubernetes
import kfp
@dsl.component
def comp():
    """Print the two secret values that were injected as environment variables."""
    import os

    for env_name in ('key_id', 'key_secret'):
        print(env_name, os.environ[env_name])
@dsl.pipeline
def pipeline_secret_env():
    """Run ``comp`` with the keys of the k8s secret ``my-secret`` mapped to env vars."""
    secret_task = comp()
    # Map secret key 'user' -> env var 'key_id', 'password' -> 'key_secret'.
    kubernetes.use_secret_as_env(
        secret_task,
        secret_name='my-secret',
        secret_key_to_env={'user': 'key_id', 'password': 'key_secret'})
# Submit the pipeline directly from the pipeline function (no YAML needed).
client = kfp.Client()
client.create_run_from_pipeline_func(pipeline_secret_env)
where the matching secret was created with kubectl create secret generic
my-secret -n my-namespace --from-literal=password=test1234
--from-literal=user=user@example.com (which creates a secret with two keys
user and password in the namespace my-namespace).
I want to get the run ID in a component⚓︎
Unfortunately, in kfp v2 the environment variable KFP_RUN_ID is not set in the
container. You can use the following workaround (either copy the complete
component or use the relevant part):
from kfp import dsl
@dsl.component
def get_run_id(name: str) -> str:
    """Return the KFP run ID of the current pipeline run.

    Workaround for KFP v2 not setting KFP_RUN_ID in the container: Argo
    injects the workflow template as the ARGO_TEMPLATE environment variable,
    and the container command line inside it carries a ``--run_id`` flag.

    :param name: not used by the lookup itself; can serve for cache invalidation.
    :return: the run ID string.
    :raises KeyError: if ARGO_TEMPLATE is not set (not an Argo-backed KFP
        installation).
    :raises ValueError: if the command line has no '--run_id' flag.
    """
    from ast import literal_eval
    import os
    import json

    # os.environ[...] (not os.getenv) so a missing variable fails with a clear
    # KeyError instead of json.loads(None) raising a cryptic TypeError.
    argo_template = json.loads(os.environ['ARGO_TEMPLATE'])
    input_parameters = argo_template['inputs']['parameters'][0]
    # The parameter value is a Python-literal-encoded pod spec.
    input_params = literal_eval(input_parameters['value'])
    command_list = input_params['containers'][0]['command']
    # The run ID is the value immediately following the '--run_id' flag.
    index = command_list.index('--run_id')
    run_id = command_list[index + 1]
    print(f'run_id: {run_id}')
    return run_id
KFP_RUN_ID can then be set inside the pipeline component as follows:
from kfp import dsl, kubernetes
@dsl.component
def comp():
    """Print the run ID exposed through the KFP_RUN_ID environment variable."""
    import os

    run_id = os.environ["KFP_RUN_ID"]
    print(run_id)
@dsl.pipeline
def example_pipeline():
    """Expose the run ID to ``comp`` via a downward-API field path."""
    comp_task = comp()
    # The pod label 'pipeline/runid' carries the run ID; surface it as KFP_RUN_ID.
    kubernetes.use_field_path_as_env(
        comp_task,
        env_name="KFP_RUN_ID",
        field_path="metadata.labels['pipeline/runid']"
    )
MinIO⚓︎
I want to write a pandas DataFrame into MinIO⚓︎
If key and secret are defined as environment variables (as they are in notebooks), you can use this:
import os

# s3 object path inside MinIO: <bucket>/<folder>/<file>.
DATA_PATH = 'bucket_name/folder/file.parquet'
# Credentials come from the environment; the endpoint points at the
# in-cluster MinIO service.
storage_options={
    "key": os.environ.get('AWS_ACCESS_KEY_ID'),
    "secret": os.environ.get('AWS_SECRET_ACCESS_KEY'),
    "client_kwargs": {"endpoint_url": 'http://minio.minio'}}
# NOTE(review): assumes `df` is a pandas DataFrame defined earlier.
df.to_parquet(f's3://{DATA_PATH}', storage_options=storage_options)
Make sure to use s3:// as the protocol (not minio://).
You can populate environment variables from a secret.
from kfp import dsl
import kfp
from kfp import kubernetes
@dsl.component(packages_to_install=['pandas', 'pyarrow', 'fsspec', 's3fs'])
def comp():
    """Write a small test DataFrame to MinIO over the s3 protocol."""
    import os
    import pandas as pd

    frame = pd.util.testing.makeDataFrame()
    DATA_PATH = 'integration-tests-temporary/from_pipeline.parquet'
    # Credentials are expected in the environment (injected from a secret).
    s3_options = {
        "key": os.environ.get('AWS_ACCESS_KEY_ID'),
        "secret": os.environ.get('AWS_SECRET_ACCESS_KEY'),
        "client_kwargs": {"endpoint_url": 'http://minio.minio'},
    }
    frame.to_parquet(f's3://{DATA_PATH}', storage_options=s3_options)
@dsl.pipeline
def pipeline_secret_env():
    """Run ``comp`` with AWS credentials injected from the k8s secret 's3creds'."""
    task = comp()
    # Secret keys map 1:1 onto the environment variable names pandas/s3fs read.
    kubernetes.use_secret_as_env(
        task,
        secret_name='s3creds',
        secret_key_to_env={
            'AWS_ACCESS_KEY_ID': 'AWS_ACCESS_KEY_ID',
            'AWS_SECRET_ACCESS_KEY': 'AWS_SECRET_ACCESS_KEY',
        }
    )

# Submit the pipeline directly from the pipeline function.
client = kfp.Client()
client.create_run_from_pipeline_func(pipeline_secret_env)
I want to read a pandas DataFrame from MinIO⚓︎
# s3 object path inside MinIO: <bucket>/<folder>/<file>.
DATA_PATH = 'bucket_name/folder/file.parquet'
# NOTE(review): assumes `os` and `pd` (pandas) are already imported, and that
# the AWS_* variables are present in the environment.
storage_options={
    "key": os.getenv('AWS_ACCESS_KEY_ID'),
    "secret": os.getenv('AWS_SECRET_ACCESS_KEY'),
    "client_kwargs": {"endpoint_url": 'http://minio.minio'}}
df = pd.read_parquet(f's3://{DATA_PATH}', storage_options=storage_options)
df.head()
I want to run a pipeline daily, but some parts should only be calculated once a month⚓︎
Question: Getting data from a database is very costly, and the data gets updated only once a month anyway. How can I run my pipeline daily, but make sure that data is pulled from the pipeline only once a month?
Answer: Make the components that should run only occasionally accept an extra input argument that is used solely for cache invalidation. Then create a new component whose output only changes when needed, e.g. once a new month is reached.
Example⚓︎
The following pipeline has one component that is re-run every minute and one that is re-run every hour:
import kfp
from kfp import dsl, compiler, Client
from kfp.dsl import Output, Metrics, component
@dsl.component
def add(a: float, b: float) -> float:
    """Return the sum of ``a`` and ``b``."""
    result = a + b
    return result
@dsl.component
def log(a: float, b: float, c: float, metric: Output[Metrics]):
    """Record the three input values as KFP run metrics."""
    for metric_name, metric_value in (("a", a), ("b", b), ("c", c)):
        metric.log_metric(metric_name, float(metric_value))
@dsl.component
def get_data(cache: str) -> float:
    """Simulate an expensive operation getting or crunching some data.

    :param cache: not used in the computation; its only purpose is to
        invalidate the KFP cache whenever its value changes.
    """
    import random

    value = int(1000 * random.random())
    return value
@dsl.component
def generate_minute() -> str:
    """Return a timestamp string that changes once per minute (cache invalidation)."""
    import datetime as dt

    now = dt.datetime.now()
    return now.strftime('%Y-%m-%dT%H:%M')
@dsl.component
def generate_hour() -> str:
    """Return a timestamp string that changes once per hour.

    The minute part is the literal 'XX', so the output only varies with the hour.
    """
    import datetime as dt

    now = dt.datetime.now()
    return now.strftime('%Y-%m-%dT%H:XX')
@dsl.component
def generate_day() -> str:
    """Return a date string that changes once per day (cache invalidation)."""
    import datetime as dt

    today = dt.datetime.now()
    return today.strftime('%Y-%m-%d')
@dsl.component
def generate_month() -> str:
    """Return a date string that changes once per month.

    The day part is the literal 'XX', so the output only varies with the month.
    """
    import datetime as dt

    now = dt.datetime.now()
    return now.strftime('%Y-%m-XX')
@dsl.pipeline
def test_pipeline():
    """Add two 'expensive' values, one refreshed per minute and one per hour."""
    # Caching must stay OFF for the timestamp generators — otherwise their
    # outputs would be cached too and never change.
    minute_task = generate_minute().set_caching_options(enable_caching=False)
    hour_task = generate_hour().set_caching_options(enable_caching=False)
    # get_data stays cached; it only re-runs when its `cache` input changes.
    minute_data = get_data(cache=minute_task.output)
    hour_data = get_data(cache=hour_task.output)
    total = add(a=minute_data.output, b=hour_data.output)
    log(a=minute_data.output, b=hour_data.output, c=total.output)
# Compile and submit; do NOT pass enable_caching to the run — it would
# override the per-task caching options configured inside the pipeline.
compiler.Compiler().compile(test_pipeline, 'pipeline.yaml')
run = Client().create_run_from_pipeline_package('pipeline.yaml')
Make sure not to pass enable_caching=True or enable_caching=False when creating the run, as that would override the caching options configured inside the pipeline.
I want to get notified when my pipeline finished/failed⚓︎
ExitHandler can be used to run a component when the pipeline is exiting.
An example that uses a Teams webhook to send a message to a channel (replace the URL):
from kfp.dsl import container_component, ContainerSpec, PipelineTaskFinalStatus
import json
@container_component
def notify(
    title: str = 'Pipeline Status',
    message: str = 'Your Pipeline finished',
):
    """Send a message to a Teams channel by POSTing to a webhook with curl.

    :param title: card title shown in Teams.
    :param message: card body text.
    """
    # json.dumps runs when the pipeline is built, so the payload is baked
    # into the container command; at runtime curl simply POSTs it.
    return ContainerSpec(
        image='alpine/curl',
        command=[
            'curl',
            '-X',
            'POST',
            '-H',
            'Content-Type: application/json',
            '-d',
            json.dumps({"title": str(title), "text": str(message)}),
            # Replace with your own webhook URL.
            'https://justaddai.webhook.office.com/webhookb2/XXXXXX',
        ]
    )
# NOTE(review): this snippet additionally relies on `dsl`, `add`, `log`,
# `compiler` and `Client` being imported/defined as in the earlier examples.
@dsl.pipeline
def test_pipeline(
    a: float = 42,
    b: float = 23,
):
    """Run two additions; the exit task fires when the pipeline exits."""
    # The exit task must be instantiated BEFORE entering the ExitHandler.
    notifier = notify(message='test')
    with dsl.ExitHandler(exit_task=notifier):
        c = add(a=a, b=b)
        d = add(a=c.output, b=42.0)
        # A second, ordinary notification from inside the handled group.
        notify(title='pipeline')
    # NOTE(review): the earlier `log` component takes (a, b, c, metric); this
    # call passes `result=` — confirm which `log` signature is intended.
    log(result=d.output)

client = Client()
compiler.Compiler().compile(test_pipeline, 'pipeline.yaml')
run = client.create_run_from_pipeline_package('pipeline.yaml', enable_caching=False)
Getting the status of a pipeline is not yet supported by the open source KFP backend
I want to start pipelines from my local machine⚓︎
You can also start a pipeline from outside the cluster (or use the KFP pipeline client in any other way). For that to work, you need a valid session cookie to authenticate. The following code snippet shows how to start the pipeline:
from kfp.client import Client
import os

# Connection and login settings come from the environment (see list below).
domain = os.getenv('KUBEFLOW_DOMAIN')
username = os.getenv('KUBEFLOW_USERNAME')
password = os.getenv('KUBEFLOW_PASSWORD')
# Default namespace: local part of the e-mail address, dots replaced by dashes.
namespace = os.environ.get('KUBEFLOW_NAMESPACE', None) or username.split("@")[0].replace(".", "-")

# NOTE(review): get_istio_auth_session is defined in the snippet further below.
auth_session = get_istio_auth_session(
    url=f"https://{domain}/",
    username=username,
    password=password,
)

# Authenticate the pipeline client with the Dex session cookie.
client = Client(host=f"{domain}/pipeline", namespace=namespace, cookies=auth_session["session_cookie"])
print(client.list_experiments())
If you are using a self-signed certificate, you need to add verify_ssl=False to the Client constructor.
The above code expects the following environment variables to be set:
KUBEFLOW_USERNAME, KUBEFLOW_PASSWORD, KUBEFLOW_DOMAIN, and optionally KUBEFLOW_NAMESPACE (only needed if the namespace doesn't match the part before the @ in the username).
If you are using Dex for authentication, you can use the following code snippet to get the session cookie (which you can also find in the Kubeflow documentation).
import re
import requests
from urllib.parse import urlsplit
def get_istio_auth_session(url: str, username: str, password: str) -> dict:
    """
    Determine if the specified URL is secured by Dex and try to obtain a session cookie.
    WARNING: only Dex `staticPasswords` and `LDAP` authentication are currently supported
    (we default to using `staticPasswords` if both are enabled)
    :param url: Kubeflow server URL, including protocol
    :param username: Dex `staticPasswords` or `LDAP` username
    :param password: Dex `staticPasswords` or `LDAP` password
    :return: auth session information
    """
    # define the default return object
    auth_session = {
        "endpoint_url": url,    # KF endpoint URL
        "redirect_url": None,   # KF redirect URL, if applicable
        "dex_login_url": None,  # Dex login URL (for POST of credentials)
        "is_secured": None,     # True if KF endpoint is secured
        "session_cookie": None  # Resulting session cookies in the form "key1=value1; key2=value2"
    }

    # use a persistent session (for cookies)
    with requests.Session() as s:

        ################
        # Determine if Endpoint is Secured
        ################
        # NOTE(review): verify=False disables TLS verification (needed for
        # self-signed certificates, but insecure otherwise).
        resp = s.get(url, allow_redirects=True, verify=False)
        if resp.status_code != 200:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for GET against: {url}"
            )
        auth_session["redirect_url"] = resp.url

        # if we were NOT redirected, then the endpoint is UNSECURED
        if len(resp.history) == 0:
            auth_session["is_secured"] = False
            return auth_session
        else:
            auth_session["is_secured"] = True

        ################
        # Get Dex Login URL
        ################
        redirect_url_obj = urlsplit(auth_session["redirect_url"])

        # if we are at `/auth?=xxxx` path, we need to select an auth type
        if re.search(r"/auth$", redirect_url_obj.path):

            #######
            # TIP: choose the default auth type by including ONE of the following
            #######

            # OPTION 1: set "staticPasswords" as default auth type
            redirect_url_obj = redirect_url_obj._replace(
                path=re.sub(r"/auth$", "/auth/local", redirect_url_obj.path)
            )
            # OPTION 2: set "ldap" as default auth type
            # redirect_url_obj = redirect_url_obj._replace(
            #     path=re.sub(r"/auth$", "/auth/ldap", redirect_url_obj.path)
            # )

        # if we are at `/auth/xxxx/login` path, then no further action is needed (we can use it for login POST)
        if re.search(r"/auth/.*/login$", redirect_url_obj.path):
            auth_session["dex_login_url"] = redirect_url_obj.geturl()

        # else, we need to be redirected to the actual login page
        else:
            # this GET should redirect us to the `/auth/xxxx/login` path
            resp = s.get(redirect_url_obj.geturl(), allow_redirects=True, verify=False)
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for GET against: {redirect_url_obj.geturl()}"
                )
            # set the login url
            auth_session["dex_login_url"] = resp.url

        ################
        # Attempt Dex Login
        ################
        resp = s.post(
            auth_session["dex_login_url"],
            data={"login": username, "password": password},
            allow_redirects=True
        )
        # a successful login redirects; no redirect means the credentials were rejected
        if len(resp.history) == 0:
            raise RuntimeError(
                f"Login credentials were probably invalid - "
                f"No redirect after POST to: {auth_session['dex_login_url']}"
            )
        # store the session cookies in a "key1=value1; key2=value2" string
        auth_session["session_cookie"] = "; ".join([f"{c.name}={c.value}" for c in s.cookies])

    return auth_session
How do I access the return value of a component that has multiple outputs?⚓︎
If a component does not just have one return value but also additional outputs
defined in the function header, you can no longer simply use the .output
attribute of the task to access the return value. In this case, the return
value can be accessed and passed to the next task via task.outputs['Output'].
from kfp.dsl import Output, Dataset, component, pipeline
from kfp.client import Client
from typing import Dict
@component(packages_to_install=['pandas', 'pyarrow', 'numpy'])
def write_random_df(name: str, df_out: Output[Dataset]) -> Dict:
    """Write a random single-column DataFrame as Parquet and return its statistics.

    Has both an artifact output (``df_out``) and a return value, so callers
    must read the returned dict via ``task.outputs['Output']``.
    """
    import numpy as np
    import pandas as pd

    frame = pd.DataFrame({name: np.random.random(100)})
    frame.to_parquet(df_out.path)
    summary = frame.describe()
    return summary.to_dict()
@component()
def print_df_info_dictionary(df_info_dict: Dict) -> Dict:
    """Print the statistics dictionary and pass it through unchanged."""
    print(df_info_dict)
    return df_info_dict
@pipeline
def example_pipeline(column_name: str):
    """Generate a random DataFrame and print its summary statistics."""
    writer = write_random_df(name=column_name)
    # The return value of a multi-output component is accessed under 'Output'.
    print_df_info_dictionary(
        df_info_dict=writer.outputs['Output']
    )
# Submit the run directly from the pipeline function (no YAML file needed).
client = Client()
run = client.create_run_from_pipeline_func(
    example_pipeline,
    arguments={'column_name': 'random'},
)