0%

完整 Kubeflow 使用教學 - 開發 ML 模型、進行分散式訓練與部署服務

簡介

一個 深度學習模型 (Deep Learning Model) 的開發牽涉到許多的步驟,包含從一開始的資料收集分析與處理、模型的開發與訓練
,到最後將模型進行部署並提供服務使用。這其中牽涉到許多開發環境與開發工具的轉換,對開發人員是一個不小的負擔。而 Kubeflow 便是一個建立在 Kubernetes 之上的模型開發平台,提供開發模型所需的所有工具,並且藉由 Kubernetes 達到資源、網路的彈性控管。

今天要來介紹的便是如何使用 Kubeflow 來完成深度學習模型開發、分散式訓練以及部署模型服務的典型應用情境,如下圖。

事前準備與前言

本篇教學牽涉使用 GKE (google kubernetes engine)

  1. 部署 Kubeflow
  2. 使用 Kubeflow/Jupyterhub 開啟 Notebook 開發模型
  3. 使用 Kubeflow/tf-operator 部署 TFJob 進行模型訓練
  4. 使用 Kubeflow/KFServing 部署訓練好的模型

本篇教學用到的程式碼與 yaml 檔,可以在我的 Github Repo 找到,歡迎參考。

另外本篇也大量參考此 範例,也可以至此參考。

使用 GKE 部署 Kubeflow

本篇文章範例將使用 Google Kubernetes Engine (GKE) 來部署 Kubernetes 與安裝 Kubeflow,同時將會使用 Google Cloud Storage (GCS) 來儲存訓練好之模型。

使用 GKE 部署 Kubernetes

GKE 在最近的版本中加入了 Workload Identity 的功能,使得Kubernetes 集群中的 Pod 可以使用代表 Google service accountKubernetes service account 來使用其他的Google 雲端服務。詳細的介紹可以參考這邊: Workload Identity

因此在我們創建 Kubernetes 集群的設定中有幾點要注意。

  1. 選擇 1.15版本 ,創建4個節點,每一個包含 2 vcpu / 7.5GB mem

  1. 開啟 GCS (儲存空間) 的 API 存取權

  1. 於「可用性、網路、安全性和其他功能」的選項中開啟 Workload Identity

部署 Kubeflow

我們使用此文件部署 Kubeflow ,此版本為無認證功能的版本。

Kubeflow 目前透過子專案 kfctl 來發布與安裝 Kubeflow

  1. 下載 kfctl v1.0 發布版本 Kubeflow releases page
  2. 解壓縮 tar -xvf kfctl_v0.7.1_<platform>.tar.gz
  3. 設定環境變數,方便後續部署。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    # 以利後需使用kfctl指令
    $ export PATH=$PATH:"<path-to-kfctl>"

    # 部署將會使用這個設定檔
    $ export CONFIG_URI="https://raw.githubusercontent.com/kubeflow/manifests/v0.7-branch/kfdef/kfctl_k8s_istio.0.7.1.yaml"

    # Kubeflow部署過程生成的設定檔存放的資料夾名,可以設定成`my-kubeflow`或是`kf-test`
    $ export KF_NAME=<your choice of name for the Kubeflow deployment>

    # 放置Kubeflow專案的資料夾路徑
    $ export BASE_DIR=<path to a base directory>

    # 此次部署Kubeflow的的完整路徑
    $ export KF_DIR=${BASE_DIR}/${KF_NAME}
  4. 使用官方的 kfctl 即可方便的一個指令部署。

1
2
3
4
5
6
# 來到Kubeflow專案底下
$ mkdir -p ${KF_DIR}

$ cd ${KF_DIR}

$ kfctl apply -V -f ${CONFIG_URI}
  1. 檢驗部署完成

等候大約幾分鐘後,即會看到以下部署成功的字樣

1
2
INFO[0193] Applied the configuration Successfully!       
filename="cmd/apply.go:72"

可以檢查所有 Kubeflow namespace 底下的 pod 來確認是否部署成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
$ kubectl get po -n kubeflow
NAME READY STATUS RESTARTS AGE
admission-webhook-deployment-6cf94887bf-t4zpv 1/1 Running 0 10h
application-controller-stateful-set-0 1/1 Running 0 10h
argo-ui-7fd48cdc66-dt6mf 1/1 Running 0 10h
centraldashboard-64c4cb4c6-fp6z8 1/1 Running 0 10h
jupyter-web-app-deployment-5f9bbcb94b-pvn67 1/1 Running 0 10h
katib-controller-5645cb8675-kk86l 1/1 Running 1 10h
katib-db-57d7c5b64b-x6qxp 1/1 Running 0 10h
katib-manager-5679669f96-vvf92 1/1 Running 1 10h
katib-ui-68c6747887-2fbjw 1/1 Running 0 10h
metadata-db-7f78b5fbf8-cfngw 1/1 Running 0 10h
metadata-deployment-6d47b67dcd-lzcng 1/1 Running 0 10h
metadata-deployment-6d47b67dcd-rzdlv 1/1 Running 2 10h
metadata-envoy-deployment-75d958657-nb9cg 1/1 Running 0 10h
metadata-grpc-deployment-9d79d848-csnmj 1/1 Running 4 10h
metadata-grpc-deployment-9d79d848-zkm7z 1/1 Running 3 10h
metadata-ui-547b468655-7l9m2 1/1 Running 0 10h
minio-d96d4f4cf-9tml2 1/1 Running 0 10h
ml-pipeline-86cf8589b9-662mq 1/1 Running 0 10h
ml-pipeline-ml-pipeline-visualizationserver-d494fcb84-g9kdr 1/1 Running 0 10h
ml-pipeline-persistenceagent-5c4d6c5b54-qfjm6 1/1 Running 0 10h
ml-pipeline-scheduledworkflow-7fbfb9c745-7tbcj 1/1 Running 0 10h
ml-pipeline-ui-55cd89c8fc-gf8sc 1/1 Running 0 10h
ml-pipeline-viewer-controller-deployment-68c746f8d5-hvzrs 1/1 Running 0 10h
mysql-74578b646b-tbsmk 1/1 Running 0 10h
notebook-controller-deployment-6f45dbcd6d-k6jrw 1/1 Running 0 10h
profiles-deployment-6f7ddff899-n4hwh 2/2 Running 0 10h
pytorch-operator-7fcb66d589-xb7bp 1/1 Running 0 10h
seldon-operator-controller-manager-0 1/1 Running 1 10h
spartakus-volunteer-ddc6fcd5f-pqs6z 1/1 Running 0 10h
tensorboard-55d9cd67bc-59nqx 0/1 Pending 0 10h
tf-job-operator-76dd589674-dkpqp 1/1 Running 0 10h
workflow-controller-645f7bd769-nznh4 1/1 Running 0 10h

因為此版本會同時部署 istio ,因此也可以檢查 istio 相關資源是否成功部署。

1
$ kubectl get po -n istio-system

創建 Service Account

Service Account 的目的是使得 Kubernetes 內部的 pod 可以直接使用 Google Service Account 來存取 Google 雲端服務。以下 Service Account 名稱可以自己替換。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 創建 Google Service Account
$ gcloud iam service-accounts create gcp-sa # $GSA_NAME

# 創建 Kubernetes Service Account
$ kubectl create serviceaccount --namespace kubeflow k8s-sa # $KSA_NAME

# 將上述兩個 Service Account 綁上關聯
$ gcloud iam service-accounts add-iam-policy-binding \
--role roles/iam.workloadIdentityUser \
--member "serviceAccount:[PROJECT_ID].svc.id.goog[kubeflow/[KSA_NAME]]" \
[GSA_NAME]@[PROJECT_ID].iam.gserviceaccount.com

$ kubectl annotate serviceaccount \
--namespace kubeflow \
[KSA_NAME] \
iam.gke.io/gcp-service-account=[GSA_NAME]@[PROJECT_ID].iam.gserviceaccount.com

Kubeflow 部署完成

至此 Kubeflow 即部署完成,你可以透過 Port-forward 來透過 istio-gateway 使用 Kubeflow UI。

1
2
# 透過localhost:8080連上
kubectl port-forward svc/istio-ingressgateway -n istio-system 8080:80

使用 Notebook Servers 來打開 Jupyter Notebook 進行模型開發。

Jupyter Notebook

使用 tf-operator 進行分散式訓練

tf-operator 與 TFJob 介紹

tf-operator 可以說是 Kubeflow 裡面最早的服務,作為一個operator 其任務便是部署與管理一個 tensorflow 分散式訓練任務的生命週期。一個 tensorflow 分散式訓練可以分為幾個角色,

  1. PS: 儲存模型的參數,接收來自 Worker 的參數更新
  2. Worker: 實際進行模型訓練的角色,每一次的 epoch 結束都會將更新的參數上傳至 PS ,並取得新的參數進行訓練。
  3. Chief: 一樣也是 Worker 會進行模型訓練,但是同時負責儲存訓練好的模型或是 checkpoint 的角色,如果沒有特別指定則預設會是 Worker0

而使用者可以透過 tfjob CRD 來定義一組分散式訓練,包含每一個角色所使用的 image 、資源與副本數量。

服務發現

如果曾經使用實體機部署過分散式訓練,有一個很大的重點在於如何讓上述的三個角色在開始執行後能夠發現彼此。

tensorflow distributed training 文件中提到,必須建立一個環境變數 TF_Config,是先填寫每一個角色的所在的 host 與 port,如此一來 tensorflow 才能建立起 clusterspecsession 來開始訓練。

而當環境轉移到 Kuburnetes 上時,因為每一個角色以 pod 存在,並透過 service 建立服務與外界互動,因此在實體機上面必須手動設定位址這件事情,我們直接交由 service 與 kubernetes 內部的服務發現運作即可。

簡單來說,tf-operator 除了創建每一個角色的 pods/services 外,同時也會幫你在每一個 pod 的環境變數加上 TF_CONFIG 來記錄所有角色的位置,使得每一個角色可以參考此記錄來建立 tf.train.clusterspec

1
2
3
4
5
6
7
8
# 舉例: 
# 下面的網路位置為從 tf-operator 幫你附上的 env 取得
# 這些位址代表 kubernetes 內部的服務名稱
# 由 Kubernetes 內部的 DNS 維護與進行服務發現
cluster = tf.train.ClusterSpec({
"worker": ["worker0.example.com:2222","worker1.example.com:2222","worker2.example.com:2222"],
"ps": ["ps0.example.com:2222","ps1.example.com:2222"]
})

而當 Chief 正常結束 ( exit 0 ) ,tf-operator會判定此訓練成功結束,將根據資源釋放。

下圖為一個分散式訓練的示意圖:

進行 Mnist 分散式訓練

這邊我們使用大家都非常熟悉的手寫數字辨識範例,但是做了一點修改成為分散式訓練的版本: model.py

為了後續將會使用 TFServing 來進行 Inference ,我們做的修改包含

  1. tf.estimator.train_and_evaluate 來進行訓練
  2. classifier.export_savedmodel 儲存模型

Note: 這篇教學不包含將模型程式碼打包成 docker image,詳細文件可以參考原文。此處我們直接使用範例包好的 image 。

部署 TFJob 進行訓練

首先我們先創建一個 GCS Bucket,將在最後作為儲存模型的空間。

1
gsutil mb gs://$BUCKET/

接著至 Google Cloud Console 的 GCS 頁面,編輯你的 BUCKET 權限加入剛剛的 Google Service Account ( 以先前的例子來說即為 gcp-sa@XXX ) 成為 管理員

有了模型程式碼,且創建好物件儲存空間後後,接下來我們使用以下 yaml 檔來部署一個 TFJob 進行分散式訓練。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
apiVersion: kubeflow.org/v1
kind: TFJob
metadata:
name: mnist-train-dist
namespace: kubeflow
spec:
tfReplicaSpecs: # 分成三種角色各自填寫
Chief:
replicas: 1
template:
metadata:
annotations:
sidecar.istio.io/inject: "false"
spec:
containers:
- command:
- /usr/bin/python
- /opt/model.py
- --tf-model-dir=$(modelDir)
- --tf-export-dir=$(exportDir)
- --tf-train-steps=$(trainSteps)
- --tf-batch-size=$(batchSize)
- --tf-learning-rate=$(learningRate)
env:
- name: modelDir
value: gs://${BUCKET}/my-model
- name: exportDir
value: gs://${BUCKET}/my-model/export
- name: trainSteps
value: "200"
- name: batchSize
value: "100"
- name: learningRate
value: "0.01"
image: gcr.io/kubeflow-examples/mnist/model:build-1202842504546750464
name: tensorflow
workingDir: /opt
restartPolicy: OnFailure
serviceAccount: k8s-sa
Ps:
replicas: 1
template:
metadata:
annotations:
sidecar.istio.io/inject: "false"
spec:
containers:
- command:
- /usr/bin/python
- /opt/model.py
- --tf-model-dir=$(modelDir)
- --tf-export-dir=$(exportDir)
- --tf-train-steps=$(trainSteps)
- --tf-batch-size=$(batchSize)
- --tf-learning-rate=$(learningRate)
env:
- name: modelDir
value: gs://${BUCKET}/my-model
- name: exportDir
value: gs://${BUCKET}/my-model/export
- name: trainSteps
value: "200"
- name: batchSize
value: "100"
- name: learningRate
value: "0.01"
image: gcr.io/kubeflow-examples/mnist/model:build-1202842504546750464
name: tensorflow
workingDir: /opt
restartPolicy: OnFailure
serviceAccount: k8s-sa
Worker:
replicas: 2
template:
metadata:
annotations:
sidecar.istio.io/inject: "false"
spec:
containers:
- command:
- /usr/bin/python
- /opt/model.py
- --tf-model-dir=$(modelDir)
- --tf-export-dir=$(exportDir)
- --tf-train-steps=$(trainSteps)
- --tf-batch-size=$(batchSize)
- --tf-learning-rate=$(learningRate)
env:
- name: modelDir
value: gs://${BUCKET}/my-model
- name: exportDir
value: gs://${BUCKET}/my-model/export
- name: trainSteps
value: "200"
- name: batchSize
value: "100"
- name: learningRate
value: "0.01"
image: gcr.io/kubeflow-examples/mnist/model:build-1202842504546750464
name: tensorflow
workingDir: /opt
restartPolicy: OnFailure
serviceAccount: k8s-sa

將上面的 TFJob 儲存成 mnist_tfjob.yaml ,進行部署

1
2
3
4
5
6
7
8
9
10
11
12
$ kubectl apply -f mnist_tfjob.yaml

# 將會看到 tf-operator 自動幫你創建了相對應的 TFJob 與 pods

$ kubectl get tfjob -n kubeflow
mnist-train-dist Running 1s

$ kubectl get pod -n kubeflow
mnist-train-dist-chief-0 0/1 ContainerCreating 0 1s
mnist-train-dist-ps-0 0/1 ContainerCreating 0 1s
mnist-train-dist-worker-0 0/1 ContainerCreating 0 1s
mnist-train-dist-worker-1 0/1 ContainerCreating 0 1s

且每一個 pod 會多一個環境變數 TF_CONFIG

1
2
3
4
5
6
7
8
9
10
11
$ kubectl get pod/mnist-train-dist-worker-0 -n kubeflow -oyaml

...
- name: TF_CONFIG
value: '{"cluster":{
"chief":["mnist-train-dist-chief-0.kubeflow.svc:2222"],
"ps":["mnist-train-dist-ps-0.kubeflow.svc:2222"],
"worker":["mnist-train-dist-worker-0.kubeflow.svc:2222","mnist-train-dist-worker-1.kubeflow.svc:2222"]},
"task":{"type":"worker","index":0},
"environment":"cloud"}'
...

觀察 Chief 的 log,可以看到成功訓練完成,且將訓練好的模型上傳至 GCS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
INFO:tensorflow:Start Tensorflow server.
2020-01-20 11:59:30.410103: I tensorflow/core/platform/cpu_feature_guard.cc:140] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
2020-01-20 11:59:30.411281: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job chief -> {0 -> localhost:2222}
2020-01-20 11:59:30.411312: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job ps -> {0 -> mnist-train-dist-ps-0.kubeflow.svc:2222}
2020-01-20 11:59:30.411327: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job worker -> {0 -> mnist-train-dist-worker-0.kubeflow.svc:2222, 1 -> mnist-train-dist-worker-1.kubeflow.svc:2222}
2020-01-20 11:59:30.412206: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:333] Started server with target: grpc://localhost:2222
INFO:tensorflow:Skipping training since max_steps has already saved.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Signatures INCLUDED in export for Classify: None
INFO:tensorflow:Signatures INCLUDED in export for Regress: None
INFO:tensorflow:Signatures INCLUDED in export for Predict: ['serving_default', 'classes']
INFO:tensorflow:Restoring parameters from gs://${BUCKET}/my-model/model.ckpt-203
INFO:tensorflow:Assets added to graph.
INFO:tensorflow:No assets to write.
INFO:tensorflow:SavedModel written to: gs://${BUCKET}/my-model/export/temp-1579521577/saved_model.pb
Successfully downloaded train-images-idx3-ubyte.gz 9912422 bytes.
Extracting /tmp/data/train-images-idx3-ubyte.gz
Successfully downloaded train-labels-idx1-ubyte.gz 28881 bytes.
Extracting /tmp/data/train-labels-idx1-ubyte.gz
Successfully downloaded t10k-images-idx3-ubyte.gz 1648877 bytes.
Extracting /tmp/data/t10k-images-idx3-ubyte.gz
Successfully downloaded t10k-labels-idx1-ubyte.gz 4542 bytes.
Extracting /tmp/data/t10k-labels-idx1-ubyte.gz
Train and evaluate
Training done
Export saved model
Done exporting the model

最後在 GCS 上面可以看到剛剛成功訓練完的模型,包含一個 .pb 檔與參數。

至此便完成了使用 Kubeflow 執行分散式訓練任務。
當然如果你的應用情境是單機訓練,也可以使用 TFJob ,但是只需填寫 Worker spec 並且設定 replcas 為1即可。

Inference

訓練模型的最終目的就是讓模型自己找出模型內最佳參數,使得預測結果盡量符合訓練集。而訓練完成後我們便可以使用該模型,輸入不在訓練集內的同類型資料,可能是一段文字或是一張圖片,並期望得到一個準確的預測結果或是分類,這就稱為 Inference。

比方說輸入一個圖片,模型會產出一個描述圖片的文字。下圖是一個結合 CNN 與 RNN 的 Image Caption Generator 模型

TFServing 與使用

如何將訓練完的模型上線,其重要性不亞於訓練本身,畢竟模型可以被使用才有價值。但是隨著時間推移與資訊的更新,模型的準確率可能下降因為不符合現在的趨勢,模型可能每過一段時間就要再訓練一次再重新部署。也就如上面的圖,模型會不斷進行training並推出v1, v2…。

而為了優化整個部署與 Inference 流程,Google 推出了Tensorflow Serving,來解決模型部署問題。

使用方式非常簡單,有了剛剛的 .pb 模型檔後,即可直接使用 Tensorflow 提供的 Docker image 來部署該模型並提供一個服務端口,使得使用者可以直接透過該端口傳入一個請求,可能是一個圖片或是文字來得到預測結果。

一個簡單的範例,模型也僅僅是使用 Tensorflow 寫一個將矩陣中的每一個元素 除以2加上1 的模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 下載 tensorflow serving image
docker pull tensorflow/serving

git clone https://github.com/tensorflow/serving
# 使用 repo 中已經提的範例
TESTDATA="$(pwd)/serving/tensorflow_serving/servables/tensorflow/testdata"

# Start TensorFlow Serving container and open the REST API port
docker run -t --rm -p 8501:8501 \
-v "$TESTDATA/saved_model_half_plus_two_cpu:/models/half_plus_two" \
-e MODEL_NAME=half_plus_two \
tensorflow/serving &

# Query the model using the predict API
curl -d '{"instances": [1.0, 2.0, 5.0]}' \
-X POST http://localhost:8501/v1/models/half_plus_two:predict

# Returns => { "predictions": [2.5, 3.0, 4.5] }

上面的 $TESTDATA/saved_model_half_plus_two_cpu 指到的目錄底下即是一個模型的 .pb 檔與參數。

以上就是如何使用 TFServing 與 Docker 來部署模型提供服務。更多介紹可以查看 官方文件

KFServing

KFServing 介紹

既然 Tensorflow 已經可以做到模型部署,又為什麼需要 Kubeflow 提供的 KFServing 呢?原因如下,

  1. 直接透過 Docker 部署服務沒辦法維護服務的可用性與彈性。 KFServing 搭配 Kubernetes ,服務將以 pod/service 存在,即可直接交由 Kubernetes 來維護 pod 生命週期,並且可以做到 auto-sacling ( 支援 scale down to 0 ) 應付流量,或是 rolling update 來 0-down-time 的更新模型。

  2. Tensorflow Serving 只能支援 Tensorflow 產出的模型,但是框架如此之多, KFServing 作為工具箱即盡力支援每一種框架。

  3. KFServing 底層由 Knative 與 istio 實作,因此可以做到同時部署兩個版本的模型進行 金絲雀部署 (canary deployment) 進行 A/B test

  4. 一個輸入可能是一張圖或是一段文字,在輸入給模型前需要進行前處理轉成 np.array 或是 tensor ,而原始的作法需要加上一個簡易的 HTTP Server,來做資料前處理在輸入進模型。而 KFServing 提供使用者簡單將前處理的函式實作完後,交由 KFServing 部署,並建立一個 pipeline 來串起前處理與實際的模型使用。如下圖,

事前準備

如同部署一個 TFJob 一樣,使用 KFServing 來部署剛剛訓練好的模型也只需要編寫一個 CRD spec,且更簡單的是只需提供 GCS 的位址, KFServing 會幫你下載模型檔案建立起 pod/service 提供服務。

在部署前須先在在 kubeflow namespace 標上 label

1
$ kubectl label namespace kubeflow serving.kubeflow.org/inferenceservice=enabled

Note:
出於某種原因 GCS 上面的資料夾可能出現同名的檔案,導致接下來的部署出錯,需要先手動刪除,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
gs://${BUCKET}/my-model/export/:
gs://${BUCKET}/my-model/export/ # 多餘的,需刪除

gs://${BUCKET}/my-model/export/1579521445/:
gs://${BUCKET}/my-model/export/1579521445/ # 多餘的,需刪除
gs://${BUCKET}/my-model/export/1579521445/saved_model.pb

gs://${BUCKET}/my-model/export/1579521445/variables/:
gs://${BUCKET}/my-model/export/1579521445/variables/ # 多餘的,需刪除
gs://${BUCKET}/my-model/export/1579521445/variables/variables.data-00000-of-00001
gs://${BUCKET}/my-model/export/1579521445/variables/variables.index

# 使用以下指令刪除
$ gsutil rm gs://${BUCKET}/my-model/export/
...

使用 KFServing 部署模型

我們的模型是 Mnist 手寫數字辨識模型,因此我們需要一個前處理的步驟將圖片透過 openCV 來做處理,以下是前處理的程式碼,需要按照 KFServing 提供的 framework 來實作成一個 Module。

前處理的程式碼 image_transformer.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import kfserving
from typing import List, Dict
from PIL import Image
import numpy as np
import io
import base64
import cv2

def image_transform(instance):
byte_array = base64.b64decode(instance['image_bytes']['b64'])
image = Image.open(io.BytesIO(byte_array))
#img = cv2.imread(image, cv2.IMREAD_GRAYSCALE)
g = cv2.resize(255 - np.asarray(image), (28, 28))
g = g.flatten() / 255.0
return g.tolist()


class ImageTransformer(kfserving.KFModel): # 繼承 kfserving.KFModel
def __init__(self, name: str, predictor_host: str):
super().__init__(name)
self.predictor_host = predictor_host
self._key = None

def preprocess(self, inputs: Dict) -> Dict: # 實作 preprocess 函式
return {'instances': [image_transform(instance) for instance in inputs['instances']]}

主程式 main.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import kfserving
import argparse
from .image_transformer import ImageTransformer

DEFAULT_MODEL_NAME = "model"

parser = argparse.ArgumentParser(parents=[kfserving.kfserver.parser])
parser.add_argument('--model_name', default=DEFAULT_MODEL_NAME,
help='The name that the model is served under.')
parser.add_argument('--predictor_host',
help='The URL for the model predict function', required=True)

args, _ = parser.parse_known_args()

if __name__ == "__main__":
transformer = ImageTransformer(
args.model_name, predictor_host=args.predictor_host)
kfserver = kfserving.KFServer()
kfserver.start(models=[transformer])

最後打包成 Docker image ,細節可以參考我的 github repo ,下面直接用我已經包好的 image 即可。

有了模型以及前處理的 Docker image,我們就可以完成 KFServing 的部署 spec。

將以下檔案存成 mnist_inference.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
apiVersion: "serving.kubeflow.org/v1alpha2"
kind: "InferenceService"
metadata:
name: "mnist"
namespace: "kubeflow"
spec:
default:
predictor: # 模型
minReplicas: 1
serviceAccountName: k8s-sa # 必須填寫 SA
tensorflow:
storageUri: "gs://${BUCKET}/my-model/export" # 只需提供模型位置
transformer: # 前處理
minReplicas: 1
custom:
container:
image: jackfantasy/image-transformer:v1 # 將上述的前處理程式碼打包的 images
name: kfserving-container

部署該檔案,並等待該部署成功。

1
2
3
4
5
6
7
8
9
10
11
$ kubectl apply -f mnist_inference.yaml

$ kubectl get inferenceservice -n kubeflow
NAME URL READY DEFAULT TRAFFIC CANARY TRAFFIC AGE
mnist http://mnist.kubeflow.example.com/v1/models/mnist True 100 2m11s 91s

$ kubectl get pods -l serving.kubeflow.org/inferenceservice=mnist -n kubeflow

NAME READY STATUS RESTARTS AGE
mnist-predictor-default-w7bb5-deployment-78fff767c9-czwgp 2/2 Running 0 94s
mnist-transformer-default-2fmkf-deployment-7bc5c85594-qn6v4 2/2 Running 0 94s

使用模型進行 Inference

將測試用的圖片轉成base64,將下圖存成 0.png

1
$ cat 0.png | base64 # 將結果複製到下面的 input.json 中之 b64 的值

將以下檔案存成 input.json

1
2
3
4
5
6
7
8
9
{
"instances": [
{
"image_bytes": {
"b64": "iVBORw0KGgoAAAANSUhEUgAAABwAAAAcCAAAAABXZoBIAAABC0lEQVR4nGNgGHCg0/NvOw8Oudqnf//+dcQq5bLk5j+g5OMlfphyDt/+/v33dxtQehuGrMcHoPA/F9FNX//+/WyCJvfi798f9714GRjKgYp8USUfAIUawCyVa3//vrBAkmJb+Pvv3xIWCEcNqC4DSbIJaGYDVI6BfRGKpMrNv3/vI7htf/8eQPAu//t3XwfBbf//7z+cEw70oTmSJUCdf+Gcor9/v0njkgQGWiuSXNh+YCAhSX4QgLFZvSuAQfTBEy455e/PXDEQQ0h+1o6/ILAPYU4mkHurDQjO/gNL/a2TQUiyXfkLBf9BxKcQRiQXMBjOg0q+uXC8cQlalDCwZ7z5+3fJoww7hoEGAMUNp28BRiGTAAAAAElFTkSuQmCC"
}
}
]
}

接著我們就可以送出請求,使用我們訓練好的mnist模型做預測吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
$ MODEL_NAME=mnist
$ CLUSTER_IP=127.0.0.1:8080 # 要另外開一個 terminal 做 port-forward,才能使用 Kubeflow 服務
$ SERVICE_HOSTNAME=$(kubectl get inferenceservice ${MODEL_NAME} -n kubeflow -o jsonpath='{.status.url}' | cut -d "/" -f 3)
$ INPUT_PATH=@./input.json

# 利用 KFServing 提供的 endpoint 來使用模型進行預測
$ curl -v -H "Host: ${SERVICE_HOSTNAME}" http://$CLUSTER_IP/v1/models/$MODEL_NAME:predict -d $INPUT_PATH

* Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to 127.0.0.1 (127.0.0.1) port 8080 (#0)
> POST /v1/models/mnist:predict HTTP/1.1
> Host: mnist.kubeflow.example.com
> User-Agent: curl/7.64.1
> Accept: */*
> Content-Length: 541
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 541 out of 541 bytes
< HTTP/1.1 200 OK
< content-length: 189
< content-type: text/html; charset=UTF-8
< date: Mon, 20 Jan 2020 20:07:11 GMT
< server: istio-envoy
< x-envoy-upstream-service-time: 47
<
* Connection #0 to host 127.0.0.1 left intact
{"predictions": [{
"predictions": [0.219836846, 5.65285372e-05, 0.183175534, 0.140095666, 0.00257251319, 0.432028651, 0.00421907, 0.00483499933, 0.0129189081, 0.00026121721],
"classes": 5 # 預測為 5,可見模型不太準確,可以再換個超參數重新訓練看看
}]}* Closing connection 0

小結

以上就是使用 Kubeflow 提供的 Notebook Servers 、 tf-operator 、 KFServing 來完成一個典型的模型開發流程。可以發現還有許多不足的地方,

  1. 比方說目前的模型不夠精準,可能需要進行 Hyperparameter Tuning , Kubeflow 提出了 Katib 服務。
  2. 開發流程的每一個步驟仍然需要手動的部署與執行,且步驟與步驟之間缺乏銜接,使用者體驗仍然不夠流暢。 Kubeflow 提出了 Pipeline 來建立一個工作流,使用者可以在一個工作流完成每一個階段的部署、執行與驗證。
  3. Jupyterhub Notebook 開發完模型後,使用者必須手動打包 Container image 來供後續使用, Kubeflow 提出了 Fairing ,使用者可以從 Notebook 直接打包 image 。

下面這張圖是完整個 Kubeflow Scope

而 Kubeflow 將在近期推出 v1.0 ,而此版本最重要的就是確保 Kubeflow Community 認為最重要的 Custom User Journey 可用且可靠。

本篇的教學已經基本涵括了大部分的 Custom User Journey , 將會在接下來幾篇針對 Pipeline 、與其餘服務做介紹與使用教學。