diff --git a/Tiltfile b/Tiltfile index d7cc2bf1f9a7..0a764259b0f9 100644 --- a/Tiltfile +++ b/Tiltfile @@ -330,7 +330,7 @@ def deploy_observability(): if "loki" in settings.get("deploy_observability", []): k8s_yaml(read_file("./.tiltbuild/yaml/loki.observability.yaml"), allow_duplicates = True) - k8s_resource(workload = "loki", extra_pod_selectors = [{"app": "loki"}], labels = ["observability"]) + k8s_resource(workload = "loki", port_forwards = "3100:3100", extra_pod_selectors = [{"app": "loki"}], labels = ["observability"]) if "grafana" in settings.get("deploy_observability", []): k8s_yaml(read_file("./.tiltbuild/yaml/grafana.observability.yaml"), allow_duplicates = True) diff --git a/hack/observability/grafana/values.yaml b/hack/observability/grafana/values.yaml index 117b8a8e6dd8..8b4387a8d5e7 100644 --- a/hack/observability/grafana/values.yaml +++ b/hack/observability/grafana/values.yaml @@ -1,5 +1,9 @@ # Configuration for grafana chart, see https://github.com/grafana/helm-charts/tree/main/charts/grafana +# Set a password explicitly to avoid infinite tilt reloads because +# of a random password. +adminPassword: admin + grafana.ini: # Disable the grafana login form. auth: diff --git a/hack/tools/e2e-log-sync/README.md b/hack/tools/e2e-log-sync/README.md new file mode 100644 index 000000000000..ae4c29d7d120 --- /dev/null +++ b/hack/tools/e2e-log-sync/README.md @@ -0,0 +1,33 @@ + +# E2e log sync + +## Prerequisites + +Start the Tilt development environment via `tilt up`. + +*Notes*: +* If you only want to see imported logs, disable promtail. +* If you want to drop all logs from Loki, just delete the Loki Pod in the `observability` namespace. + +## Import logs + +Imports logs into Loki: +```bash +go run ./hack/tools/e2e-log-sync --bucket=kubernetes-jenkins --controller-folder=pr-logs/pull/kubernetes-sigs_cluster-api/6150/pull-cluster-api-e2e-main/1496099075710259200/artifacts/clusters/bootstrap/controllers +``` + +## View logs + +Now the logs are available: +* via Grafana: `http://localhost:3001/explore` +* via the Loki `logcli`: + ```bash + logcli query '{app="capd-controller-manager"}' --timezone=UTC --from="2022-02-22T10:00:00Z" + ``` + +## Caveats + +* Make sure you query the correct time range. +* Right now it takes ~5m until the logs can be queried in Loki + * Maybe it's https://community.grafana.com/t/how-to-build-a-loki-cluster-and-query-logs-instantly/45995/5 + * TODO check tail --delay-for diff --git a/hack/tools/e2e-log-sync/main.go b/hack/tools/e2e-log-sync/main.go new file mode 100644 index 000000000000..b9596f774709 --- /dev/null +++ b/hack/tools/e2e-log-sync/main.go @@ -0,0 +1,231 @@ +//go:build tools +// +build tools + +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "bytes" + "context" + "encoding/json" + "flag" + "fmt" + "io/ioutil" + "net/http" + "os" + "strconv" + "strings" + "time" + + "cloud.google.com/go/storage" + "github.com/valyala/fastjson" + "google.golang.org/api/iterator" + "google.golang.org/api/option" + "k8s.io/klog/v2" +) + +var ( + // FIXME + bucket = flag.String("bucket", "kubernetes-jenkins", "Bucket to download the logs from.") + controllerFolder = flag.String("controller-folder", "pr-logs/pull/kubernetes-sigs_cluster-api/6150/pull-cluster-api-e2e-main/1496099075710259200/artifacts/clusters/bootstrap/controllers", "Folder to get the controller-logs from.") + lokiURL = flag.String("loki-url", "http://localhost:3100/loki/api/v1/push", "URL to push the logs to.") +) + +func main() { + flag.Parse() + + if err := run(); err != nil { + fmt.Printf("Error occured: %v\n", err) + os.Exit(1) + } +} + +type logData struct { + logFile string + logMetadata string +} + +func run() error { + ctx := context.Background() + client, err := storage.NewClient(ctx, option.WithoutAuthentication()) + if err != nil { + return fmt.Errorf("storage.NewClient: %v", err) + } + defer client.Close() + + logs, err := getLogData(ctx, client, *bucket, *controllerFolder) + + for _, ld := range logs { + logFile, err := downloadFile(ctx, client, *bucket, ld.logFile) + if err != nil { + return err + } + logMetadata, err := downloadFile(ctx, client, *bucket, ld.logMetadata) + if err != nil { + return err + } + + metadata := map[string]string{} + if err := json.Unmarshal(logMetadata, &metadata); err != nil { + return err + } + + klog.Infof("Uploading logs from: %s", ld.logFile) + // FIXME: batch log lines. Just pushing individual lines for now for debugging. + for _, logLine := range strings.Split(string(logFile), "\n") { + streams := Streams{} + + lineMetadata := map[string]string{} + for k, v := range metadata { + lineMetadata[k] = v + } + + s := Stream{ + Stream: lineMetadata, + } + + parsedLogLine, err := fastjson.Parse(logLine) + if err != nil { + continue + } + if !parsedLogLine.Exists("ts") { + continue + } + + tsMilli, err := parsedLogLine.Get("ts").Float64() + if err != nil { + return err + } + tsNano := tsMilli * 1000 * 1000 + + if parsedLogLine.Exists("cluster") { + cluster := parsedLogLine.Get("cluster").String() + cluster, err = strconv.Unquote(cluster) + if err != nil { + return err + } + s.Stream["cluster"] = cluster + } + if parsedLogLine.Exists("machine") { + machine := parsedLogLine.Get("machine").String() + machine, err = strconv.Unquote(machine) + if err != nil { + return err + } + s.Stream["machine"] = machine + } + + s.Values = append(s.Values, []string{ + strconv.Itoa(int(tsNano)), + logLine, + }) + + streams.Streams = append(streams.Streams, s) + + body, err := json.Marshal(streams) + if err != nil { + return err + } + + // See: https://grafana.com/docs/loki/latest/api/#post-lokiapiv1push + req, err := http.NewRequest(http.MethodPost, *lokiURL, bytes.NewBuffer(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + + fmt.Println(resp.StatusCode) + respBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + // move this into separate func + if err := resp.Body.Close(); err != nil { + return err + } + fmt.Println(string(respBody)) + } + } + + return nil +} + +type Streams struct { + Streams []Stream `json:"streams"` +} + +type Stream struct { + Stream map[string]string `json:"stream"` + Values [][]string `json:"values"` +} + +func getLogData(ctx context.Context, client *storage.Client, bucket, controllerFolder string) (map[string]logData, error) { + data := map[string]logData{} + query := &storage.Query{ + Prefix: controllerFolder, + Delimiter: "", + } + it := client.Bucket(bucket).Objects(ctx, query) + for { + attrs, err := it.Next() + if err != nil { + if err == iterator.Done { + break + } + return nil, err + } + if !strings.HasSuffix(attrs.Name, "manager.log") && !strings.HasSuffix(attrs.Name, "manager.json") { + continue + } + dir := attrs.Name[:strings.LastIndex(attrs.Name, "/")] + + ld, _ := data[dir] + if strings.HasSuffix(attrs.Name, "manager.log") { + ld.logFile = attrs.Name + } else { + ld.logMetadata = attrs.Name + } + data[dir] = ld + fmt.Println(attrs.Prefix + attrs.Name) + } + + return data, nil +} + +// downloadFile downloads an object. +func downloadFile(ctx context.Context, client *storage.Client, bucket, object string) ([]byte, error) { + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + + rc, err := client.Bucket(bucket).Object(object).NewReader(ctx) + if err != nil { + return nil, fmt.Errorf("Object(%q).NewReader: %v", object, err) + } + defer rc.Close() + + data, err := ioutil.ReadAll(rc) + if err != nil { + return nil, fmt.Errorf("ioutil.ReadAll: %v", err) + } + return data, nil +}