Skip to content

Commit

Permalink
Improve Model Endpoint and Batch Job's Logging UX (#164)
Browse files Browse the repository at this point in the history
* Update lazy-log package

* Refactor stream logs API

* Introducing ContainerLogsView component

* Support logging for pyfunc image builder and batch job

* Fix batch job's image builder log. Support prefixing log with pod & container name

* Add batch job executor log

* Dockerfile: Add git so we Yarn installation can succeed

* Use node:14 as node-builder base image

* Colorized the pod + container in log

* Use actions/setup-node@v2 and node v14

* Update react-lazylog package to use gojekfarm to sovle yarn install issue

https://github.com/yarnpkg/yarn/issues/7212\#issuecomment-493720324

* We still need react-lazylog's prepare script.

so we revert to roman's react-lazylog version and use yarn install --network-concurrency 1

* Refactor stackdriver log

* Fix API's unit test first

* Add more test to cluster and log_service

* Fix UI wording

* Update swagger

* Make sure color lib turned on

* Add build-essential and etc isntallation on Mlflows' Dockerfile

* Update API test

* Use gojekfarm's react-lazylog fork

* Update how to close channel; getContainerLogs async

* Use request.Context() for termintation

* Fix API test

* Modularize pprof routes into a spearate function

* Address aria's review

* Use unbuffered channel for sending log line

* Periodically update component list and address reviews

* Simplify component refresh & log api context cancellation
  • Loading branch information
ariefrahmansyah authored Jul 29, 2021
1 parent c0a72fc commit 8740175
Show file tree
Hide file tree
Showing 33 changed files with 2,424 additions and 1,541 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ jobs:
steps:
- name: Checkout to the target branch
uses: actions/checkout@v2
- uses: actions/setup-node@v1
- uses: actions/setup-node@v2
with:
node-version: 12
node-version: 14
- name: Install dependencies
run: make init-dep-ui
- name: Lint UI files
Expand Down Expand Up @@ -64,9 +64,9 @@ jobs:
steps:
- name: Checkout to the target branch
uses: actions/checkout@v2
- uses: actions/setup-node@v1
- uses: actions/setup-node@v2
with:
node-version: 12
node-version: 14
- name: Install dependencies
run: make init-dep-ui
- name: Test UI files
Expand All @@ -86,9 +86,9 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-node@v1
- uses: actions/setup-node@v2
with:
node-version: 12
node-version: 14
- name: Install dependencies
run: make init-dep-ui
- name: Build UI static files
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ RUN go build -o bin/merlin_api ./cmd/api
# ============================================================
# Build stage 2: Build UI
# ============================================================
FROM node:14-alpine as node-builder
FROM node:14 as node-builder
WORKDIR /src/ui
COPY ui .
RUN yarn
RUN yarn install --network-concurrency 1
RUN yarn run build

# ============================================================
Expand Down
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ all: setup init-dep lint test clean build run
.PHONY: setup
setup:
@echo "> Setting up tools ..."
@test -x ${GOPATH}/bin/goimports || go get -u golang.org/x/tools/cmd/goimports
@test -x ${GOPATH}/bin/golint || go get -u golang.org/x/lint/golint
@test -x ${GOPATH}/bin/gotest || go get -u github.com/rakyll/gotest

Expand All @@ -24,7 +25,7 @@ init-dep: init-dep-ui init-dep-api
.PHONY: init-dep-ui
init-dep-ui:
@echo "> Initializing UI dependencies ..."
@cd ${UI_PATH} && yarn
@cd ${UI_PATH} && yarn install --network-concurrency 1

.PHONY: init-dep-api
init-dep-api:
Expand Down
66 changes: 37 additions & 29 deletions api/api/log_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package api

import (
"bufio"
"fmt"
"net/http"

Expand All @@ -34,6 +33,15 @@ type LogController struct {

// ReadLog parses log requests and fetches logs.
func (l *LogController) ReadLog(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

// Make sure that the writer supports flushing.
flusher, ok := w.(http.Flusher)
if !ok {
InternalServerError("Streaming unsupported!").WriteTo(w)
return
}

var query service.LogQuery
err := decoder.Decode(&query, r.URL.Query())
if err != nil {
Expand All @@ -42,38 +50,38 @@ func (l *LogController) ReadLog(w http.ResponseWriter, r *http.Request) {
return
}

res, err := l.LogService.ReadLog(&query)
if err != nil {
log.Errorf("Error while retrieving log %v", err)
InternalServerError(fmt.Sprintf("Error while retrieving log for container %s: %s", query.Name, err)).WriteTo(w)
return
}
logLineCh := make(chan string)
stopCh := make(chan struct{})

// send status code and content-type
w.Header().Set("Content-Type", "plain/text; charset=UTF-8")
w.WriteHeader(http.StatusOK)
// Set the headers related to event streaming.
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Transfer-Encoding", "chunked")

// stream the response body
defer res.Close()
buff := bufio.NewReader(res)
for {
line, readErr := buff.ReadString('\n')
_, writeErr := w.Write([]byte(line))
if writeErr != nil {
// connection from caller is closed
return
}
go func() {
for {
select {
case <-ctx.Done():
close(stopCh)
return
case logLine := <-logLineCh:
// Write to the ResponseWriter
_, err := w.Write(([]byte(logLine)))
if err != nil {
InternalServerError(err.Error()).WriteTo(w)
return
}

// send the response over network
// although it's not guaranteed to reach client if it sits behind proxy
flusher, ok := w.(http.Flusher)
if ok {
flusher.Flush()
// Send the response over network
// although it's not guaranteed to reach client if it sits behind proxy
flusher.Flush()
}
}
}()

if readErr != nil {
// unable to read log from container anymore most likely EOF
return
}
if err := l.LogService.StreamLogs(logLineCh, stopCh, &query); err != nil {
InternalServerError(err.Error()).WriteTo(w)
return
}
}
4 changes: 1 addition & 3 deletions api/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,7 @@ func NewRouter(appCtx AppContext) *mux.Router {
}

rawRoutes := []RawRoutes{
{
http.MethodGet, "/logs", http.HandlerFunc(logController.ReadLog), "ReadLogs",
},
{http.MethodGet, "/logs", http.HandlerFunc(logController.ReadLog), "ReadLogs"},
}

var authzMiddleware *middleware.Authorizer
Expand Down
28 changes: 14 additions & 14 deletions api/cluster/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,25 @@ func (cf *containerFetcher) GetContainers(namespace string, labelSelector string
containers := make([]*models.Container, 0)
for _, pod := range podList.Items {
for _, c := range pod.Spec.Containers {
container := &models.Container{
Name: c.Name,
PodName: pod.Name,
Namespace: pod.Namespace,
Cluster: cf.metadata.ClusterName,
GcpProject: cf.metadata.GcpProject,
}
container := models.NewContainer(
c.Name,
pod.Name,
pod.Namespace,
cf.metadata.ClusterName,
cf.metadata.GcpProject,
)

containers = append(containers, container)
}

for _, ic := range pod.Spec.InitContainers {
container := &models.Container{
Name: ic.Name,
PodName: pod.Name,
Namespace: pod.Namespace,
Cluster: cf.metadata.ClusterName,
GcpProject: cf.metadata.GcpProject,
}
container := models.NewContainer(
ic.Name,
pod.Name,
pod.Namespace,
cf.metadata.ClusterName,
cf.metadata.GcpProject,
)

containers = append(containers, container)
}
Expand Down
14 changes: 14 additions & 0 deletions api/cluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
package cluster

import (
"io"
"time"

kfsv1alpha2 "github.com/kubeflow/kfserving/pkg/apis/serving/v1alpha2"
kfservice "github.com/kubeflow/kfserving/pkg/client/clientset/versioned/typed/serving/v1alpha2"
"github.com/kubeflow/kfserving/pkg/constants"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
Expand All @@ -35,6 +37,10 @@ import (
type Controller interface {
Deploy(modelService *models.Service) (*models.Service, error)
Delete(modelService *models.Service) (*models.Service, error)

ListPods(namespace, labelSelector string) (*v1.PodList, error)
StreamPodLogs(namespace, podName string, opts *v1.PodLogOptions) (io.ReadCloser, error)

ContainerFetcher
}

Expand Down Expand Up @@ -219,3 +225,11 @@ func (k *controller) waitInferenceServiceReady(service *kfsv1alpha2.InferenceSer
}
}
}

func (c *controller) ListPods(namespace, labelSelector string) (*v1.PodList, error) {
return c.clusterClient.Pods(namespace).List(metav1.ListOptions{LabelSelector: labelSelector})
}

func (c *controller) StreamPodLogs(namespace, podName string, opts *v1.PodLogOptions) (io.ReadCloser, error) {
return c.clusterClient.Pods(namespace).GetLogs(podName, opts).Stream()
}
58 changes: 58 additions & 0 deletions api/cluster/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,3 +698,61 @@ func isIn(container v1.Container, containers []*models.Container, podName string
}
return false
}

func Test_controller_ListPods(t *testing.T) {
namespace := "test-namespace"

v1Client := fake.NewSimpleClientset()
v1Client.PrependReactor(listMethod, podResource, func(action ktesting.Action) (bool, runtime.Object, error) {
return true, &v1.PodList{
Items: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Name: "test-model-1-predictor-default-a",
Labels: map[string]string{
"serving.knative.dev/service": "test-model-1-predictor-default",
},
},
Spec: v1.PodSpec{
InitContainers: []v1.Container{
{Name: "storage-initializer"},
},
Containers: []v1.Container{
{Name: "kfserving-container"},
{Name: "queue-proxy"},
{Name: "inferenceservice-logger"},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "test-model-1-predictor-default-b",
Labels: map[string]string{
"serving.knative.dev/service": "test-model-1-predictor-default",
},
},
Spec: v1.PodSpec{
InitContainers: []v1.Container{
{Name: "storage-initializer"},
},
Containers: []v1.Container{
{Name: "kfserving-container"},
{Name: "queue-proxy"},
{Name: "inferenceservice-logger"},
},
},
},
}}, nil
})

ctl := &controller{
clusterClient: v1Client.CoreV1(),
}

podList, err := ctl.ListPods(namespace, "serving.knative.dev/service=test-model-1-predictor-default")

assert.Nil(t, err)
assert.Equal(t, 2, len(podList.Items))
assert.Equal(t, "test-model-1-predictor-default-a", podList.Items[0].ObjectMeta.Name)
assert.Equal(t, "test-model-1-predictor-default-b", podList.Items[1].ObjectMeta.Name)
}
52 changes: 51 additions & 1 deletion api/cluster/mocks/controller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 8740175

Please sign in to comment.