diff --git a/config/config.go b/config/config.go index c58f16d7..2bb78f6b 100644 --- a/config/config.go +++ b/config/config.go @@ -11,13 +11,14 @@ import ( type ( RegistryConfig struct { - Environment string `mapstructure:"environment"` AuthConfig AuthConfig `mapstructure:"auth_config"` + LogConfig LogConfig `mapstructure:"log_config"` SkynetConfig SkynetConfig `mapstructure:"skynet_config"` - Host string `mapstructure:"host"` + Environment string `mapstructure:"environment"` DNSAddress string `mapstructure:"dns_address"` SkynetPortalURL string `mapstructure:"skynet_portal_url"` SigningSecret string `mapstructure:"signing_secret"` + Host string `mapstructure:"host"` Port uint `mapstructure:"port"` Debug bool `mapstructure:"debug"` } @@ -31,6 +32,14 @@ type ( ApiKey string `mapstructure:"api_key"` CustomUserAgent string `mapstructure:"custom_user_agent"` } + + LogConfig struct { + Service string `mapstructure:"service"` + Endpoint string `mapstructure:"endpoint"` + AuthMethod string `mapstructure:"auth_method"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` + } ) func (r *RegistryConfig) Address() string { @@ -60,6 +69,13 @@ func LoadFromENV() (*RegistryConfig, error) { AuthConfig: AuthConfig{ SupportedServices: make(map[string]bool), }, + LogConfig: LogConfig{ + Service: viper.GetString("LOG_SERVICE_NAME"), + Endpoint: viper.GetString("LOG_SERVICE_HOST"), + AuthMethod: viper.GetString("LOG_SERVICE_AUTH_KIND"), + Username: viper.GetString("LOG_SERVICE_USER"), + Password: viper.GetString("LOG_SERVICE_PASSWORD"), + }, } for _, service := range strings.Split(viper.GetString("SUPPORTED_SERVICES"), ",") { @@ -82,11 +98,18 @@ func LoadFromENV() (*RegistryConfig, error) { func (r *RegistryConfig) Endpoint() string { switch r.Environment { - case "dev", "devel", "development", "local": + case Dev, Local: return fmt.Sprintf("http://%s:%d", r.Host, r.Port) - case "stage", "production": + case Prod, Stage: return fmt.Sprintf("https://%s", r.DNSAddress) default: return fmt.Sprintf("http://%s:%d", r.Host, r.Port) } } + +const ( + Prod = "production" + Stage = "stage" + Dev = "development" + Local = "local" +) diff --git a/env-vars.example b/env-vars.example index f60e063d..5e30ff7f 100644 --- a/env-vars.example +++ b/env-vars.example @@ -6,3 +6,6 @@ OPEN_REGISTRY_SKYNET_PORTAL_URL=https://siasky.net OPEN_REGISTRY_SIGNING_SECRET="3tYnaKp@^%hbQA%J&x3cX!r2#mK%EBfAbTvPMv5CU2DP7bAoQGnUfT2&dW" OPEN_REGISTRY_SUPPORTED_SERVICES=github,token OPEN_REGISTRY_ENVIRONMENT=local +OPEN_REGISTRY_LOG_SERVICE_NAME=grafana-loki +OPEN_REGISTRY_LOG_SERVICE_HOST=http://0.0.0.0:9880/app.log +OPEN_REGISTRY_LOG_SERVICE_AUTH_KIND=basic diff --git a/go.mod b/go.mod index ff34cdbc..75998ec7 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/spf13/viper v1.8.1 github.com/whyrusleeping/tar-utils v0.0.0-20201201191210-20a61371de5b golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 + github.com/google/uuid v1.3.0 ) require ( diff --git a/go.sum b/go.sum index 268dffa6..5ef77553 100644 --- a/go.sum +++ b/go.sum @@ -229,6 +229,8 @@ github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLe github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= diff --git a/main.go b/main.go index 179e948d..8dbeaf60 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,6 @@ package main import ( - "log" "os" "github.com/containerish/OpenRegistry/auth" @@ -11,6 +10,7 @@ import ( "github.com/containerish/OpenRegistry/router" "github.com/containerish/OpenRegistry/skynet" "github.com/containerish/OpenRegistry/telemetry" + fluentbit "github.com/containerish/OpenRegistry/telemetry/fluent-bit" "github.com/fatih/color" "github.com/labstack/echo/v4" ) @@ -34,13 +34,19 @@ func main() { authSvc := auth.New(localCache, cfg) skynetClient := skynet.NewClient(cfg) - l := telemetry.SetupLogger() - reg, err := registry.NewRegistry(skynetClient, l, localCache, e.Logger) + log := telemetry.SetupLogger() + fluentBitCollector, err := fluentbit.New(cfg) + if err != nil { + color.Red("error initializing fluentbit collector: %s\n", err) + os.Exit(1) + } + + reg, err := registry.NewRegistry(skynetClient, log, localCache, e.Logger, fluentBitCollector) if err != nil { e.Logger.Errorf("error creating new container registry: %s", err) return } router.Register(e, reg, authSvc, localCache) - log.Println(e.Start(cfg.Address())) + log.Fatal().Msgf("error starting server: %s\n", e.Start(cfg.Address())) } diff --git a/registry/v2/blobs.go b/registry/v2/blobs.go index 61fc16c4..18b7b6e2 100644 --- a/registry/v2/blobs.go +++ b/registry/v2/blobs.go @@ -37,12 +37,14 @@ func (b *blobs) HEAD(ctx echo.Context) error { "skynet": "skynet link not found", } errMsg := b.errorResponse(RegistryErrorCodeManifestBlobUnknown, err.Error(), details) + b.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } size, ok := b.registry.skynet.Metadata(layerRef.Skylink) if !ok { errMsg := b.errorResponse(RegistryErrorCodeManifestBlobUnknown, "Manifest does not exist", nil) + b.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } @@ -63,6 +65,7 @@ func (b *blobs) UploadBlob(ctx echo.Context) error { "stream upload after first write are not allowed", nil, ) + b.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } @@ -85,11 +88,13 @@ func (b *blobs) UploadBlob(ctx echo.Context) error { "contentRange": contentRange, } errMsg := b.errorResponse(RegistryErrorCodeBlobUploadUnknown, err.Error(), details) + b.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusRequestedRangeNotSatisfiable, errMsg) } if start != len(b.uploads[uuid]) { errMsg := b.errorResponse(RegistryErrorCodeBlobUploadUnknown, "content range mismatch", nil) + b.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusRequestedRangeNotSatisfiable, errMsg) } @@ -101,6 +106,7 @@ func (b *blobs) UploadBlob(ctx echo.Context) error { "error while creating new buffer from existing blobs", nil, ) + b.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusInternalServerError, errMsg) } // 10 ctx.Request().Body.Close() diff --git a/registry/v2/registry.go b/registry/v2/registry.go index 66111ccc..fb3e2b1a 100644 --- a/registry/v2/registry.go +++ b/registry/v2/registry.go @@ -15,6 +15,7 @@ import ( skynetsdk "github.com/NebulousLabs/go-skynet/v2" "github.com/containerish/OpenRegistry/cache" "github.com/containerish/OpenRegistry/skynet" + fluentbit "github.com/containerish/OpenRegistry/telemetry/fluent-bit" "github.com/containerish/OpenRegistry/types" "github.com/docker/distribution/uuid" "github.com/labstack/echo/v4" @@ -26,16 +27,19 @@ func NewRegistry( logger zerolog.Logger, c cache.Store, echoLogger echo.Logger, + fluentBit fluentbit.FluentBit, ) (Registry, error) { r := ®istry{ - log: logger, - debug: true, - skynet: skynetClient, + log: logger, + debug: true, + fluentbit: fluentBit, + skynet: skynetClient, b: blobs{ - mutex: sync.Mutex{}, - contents: map[string][]byte{}, - uploads: map[string][]byte{}, - layers: map[string][]string{}, + mutex: sync.Mutex{}, + contents: map[string][]byte{}, + uploads: map[string][]byte{}, + layers: map[string][]string{}, + fluentbit: fluentBit, }, localCache: c, echoLogger: echoLogger, @@ -52,14 +56,25 @@ func NewRegistry( func (r *registry) Catalog(ctx echo.Context) error { bz, err := r.localCache.ListAll() if err != nil { + logMsg := echo.Map{ + "error": err.Error(), + } + + bz, err = json.Marshal(logMsg) + if err == nil { + r.fluentbit.Send(bz) + } + return ctx.JSON(http.StatusBadRequest, echo.Map{ "error": err.Error(), }) } + var md []types.Metadata err = json.Unmarshal(bz, &md) if err != nil { errMsg := r.errorResponse(RegistryErrorCodeTagInvalid, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusInternalServerError, errMsg) } @@ -97,6 +112,7 @@ func (r *registry) DeleteTagOrManifest(ctx echo.Context) error { "digest": ref, } errMsg := r.errorResponse(RegistryErrorCodeManifestUnknown, err.Error(), details) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } @@ -113,25 +129,43 @@ func (r *registry) DeleteLayer(ctx echo.Context) error { bz, err := r.localCache.Get([]byte(namespace)) if err != nil { errMsg := r.errorResponse(RegistryErrorCodeBlobUnknown, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } if err = json.Unmarshal(bz, &m); err != nil { errMsg := r.errorResponse(RegistryErrorCodeBlobUnknown, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusInternalServerError, errMsg) } } err = r.localCache.DeleteLayer(namespace, dig) if err != nil { - return ctx.JSON(http.StatusInternalServerError, echo.Map{ - "error": err.Error(), - }) + logMsg := echo.Map{ + "error": err.Error(), + "caller": "DeleteLayer", + } + + bz, err := json.Marshal(logMsg) + if err == nil { + r.fluentbit.Send(bz) + } + + return ctx.JSONBlob(http.StatusInternalServerError, bz) } if err = r.localCache.DeleteDigest(dig); err != nil { - return ctx.JSON(http.StatusInternalServerError, echo.Map{ - "error": err.Error(), - }) + logMsg := echo.Map{ + "error": err.Error(), + "caller": "DeleteLayer", + } + + bz, err := json.Marshal(logMsg) + if err == nil { + r.fluentbit.Send(bz) + } + + return ctx.JSONBlob(http.StatusInternalServerError, bz) } return ctx.NoContent(http.StatusAccepted) @@ -146,13 +180,20 @@ func (r *registry) MonolithicUpload(ctx echo.Context) error { bz, err := io.ReadAll(ctx.Request().Body) if err != nil { errMsg := r.errorResponse(RegistryErrorCodeBlobUploadInvalid, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } ctx.Request().Body.Close() link, err := r.skynet.Upload(namespace, digest, bz) if err != nil { - return err + detail := echo.Map{ + "error": err.Error(), + "caller": "MonolithicUpload", + } + errMsg := r.errorResponse(RegistryErrorCodeBlobUploadInvalid, err.Error(), detail) + r.fluentbit.Send(errMsg) + return ctx.JSONBlob(http.StatusInternalServerError, bz) } metadata := types.Metadata{ @@ -167,8 +208,10 @@ func (r *registry) MonolithicUpload(ctx echo.Context) error { err = r.localCache.Update([]byte(namespace), metadata.Bytes()) if err != nil { errMsg := r.errorResponse(RegistryErrorCodeBlobUploadInvalid, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } + locationHeader := link ctx.Response().Header().Set("Location", locationHeader) return ctx.NoContent(http.StatusCreated) @@ -183,6 +226,7 @@ func (r *registry) CompleteUpload(ctx echo.Context) error { bz, err := io.ReadAll(ctx.Request().Body) if err != nil { errMsg := r.errorResponse(RegistryErrorCodeDigestInvalid, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } _ = ctx.Request().Body.Close() @@ -197,7 +241,7 @@ func (r *registry) CompleteUpload(ctx echo.Context) error { "headerDigest": dig, "serverSideDigest": ourHash, "bodyDigest": digest(bz), } errMsg := r.errorResponse(RegistryErrorCodeDigestInvalid, "digest mismatch", details) - r.debugf(details) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } @@ -205,10 +249,12 @@ func (r *registry) CompleteUpload(ctx echo.Context) error { skylink, err := r.skynet.Upload(blobNamespace, dig, buf.Bytes()) if err != nil { errMsg := r.errorResponse(RegistryErrorCodeBlobUploadInvalid, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusRequestedRangeNotSatisfiable, errMsg) } if err := r.localCache.SetDigest(ourHash, skylink); err != nil { errMsg := r.errorResponse(RegistryErrorCodeBlobUnknown, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusInternalServerError, errMsg) } @@ -227,6 +273,7 @@ func (r *registry) CompleteUpload(ctx echo.Context) error { if err = r.localCache.Update([]byte(namespace), val.Bytes()); err != nil { errMsg := r.errorResponse(RegistryErrorCodeUnsupported, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusInternalServerError, errMsg) } @@ -257,8 +304,8 @@ func (r *registry) ManifestExists(ctx echo.Context) error { "error": err.Error(), } - r.debugf(logMsg(details)) errMsg := r.errorResponse(RegistryErrorCodeManifestBlobUnknown, err.Error(), details) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } @@ -270,16 +317,14 @@ func (r *registry) ManifestExists(ctx echo.Context) error { } r.debugf(lm) errMsg := r.errorResponse(RegistryErrorCodeManifestBlobUnknown, "Manifest does not exist", nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } bz, err := r.localCache.Get([]byte(namespace)) if err != nil { errMsg := r.errorResponse(RegistryErrorCodeManifestInvalid, err.Error(), nil) - lm := logMsg{ - "errorGetCache": fmt.Sprintf("%s\n", errMsg), - } - r.debugf(lm) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } @@ -290,12 +335,14 @@ func (r *registry) ManifestExists(ctx echo.Context) error { "errorUnmarshal": fmt.Sprintf("%s\n", errMsg), } r.debugf(lm) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } manifest, err := md.GetManifestByRef(ref) if err != nil { errMsg := r.errorResponse(RegistryErrorCodeManifestUnknown, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } @@ -306,6 +353,7 @@ func (r *registry) ManifestExists(ctx echo.Context) error { } r.debugf(details) errMsg := r.errorResponse(RegistryErrorCodeManifestInvalid, "manifest digest does not match", nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } @@ -332,30 +380,35 @@ func (r *registry) PullManifest(ctx echo.Context) error { bz, err := r.localCache.Get([]byte(namespace)) if err != nil { errMsg := r.errorResponse(RegistryErrorCodeManifestUnknown, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } var md types.Metadata if err = json.Unmarshal(bz, &md); err != nil { errMsg := r.errorResponse(RegistryErrorCodeManifestUnknown, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } skynetLink, err := r.localCache.ResolveManifestRef(namespace, ref) if err != nil { errMsg := r.errorResponse(RegistryErrorCodeManifestUnknown, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } resp, err := r.skynet.Download(skynetLink) if err != nil { errMsg := r.errorResponse(RegistryErrorCodeManifestInvalid, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } bz, err = io.ReadAll(resp) if err != nil { errMsg := r.errorResponse(RegistryErrorCodeManifestInvalid, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } _ = resp.Close() @@ -363,6 +416,7 @@ func (r *registry) PullManifest(ctx echo.Context) error { manifest, err := md.GetManifestByRef(ref) if err != nil { errMsg := r.errorResponse(RegistryErrorCodeManifestUnknown, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } @@ -381,6 +435,7 @@ func (r *registry) PushManifest(ctx echo.Context) error { bz, err := io.ReadAll(ctx.Request().Body) if err != nil { errMsg := r.errorResponse(RegistryErrorCodeManifestInvalid, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } ctx.Request().Body.Close() @@ -390,6 +445,7 @@ func (r *registry) PushManifest(ctx echo.Context) error { var manifest ManifestList if err = json.Unmarshal(bz, &manifest); err != nil { errMsg := r.errorResponse(RegistryErrorCodeBlobUnknown, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } @@ -397,6 +453,7 @@ func (r *registry) PushManifest(ctx echo.Context) error { skylink, err := r.skynet.Upload(mfNamespace, dig, bz) if err != nil { errMsg := r.errorResponse(RegistryErrorCodeManifestBlobUnknown, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } @@ -418,6 +475,7 @@ func (r *registry) PushManifest(ctx echo.Context) error { if err = r.localCache.Update([]byte(namespace), metadata.Bytes()); err != nil { errMsg := r.errorResponse(RegistryErrorCodeManifestInvalid, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } @@ -437,12 +495,14 @@ func (r *registry) ListTags(ctx echo.Context) error { l, err := r.localCache.ListWithPrefix([]byte(namespace)) if err != nil { errMsg := r.errorResponse(RegistryErrorCodeTagInvalid, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } var md types.Metadata err = json.Unmarshal(l, &md) if err != nil { errMsg := r.errorResponse(RegistryErrorCodeTagInvalid, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } var tags []string @@ -453,6 +513,7 @@ func (r *registry) ListTags(ctx echo.Context) error { n, err := strconv.ParseInt(limit, 10, 32) if err != nil { errMsg := r.errorResponse(RegistryErrorCodeTagInvalid, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } if n > 0 { @@ -462,6 +523,7 @@ func (r *registry) ListTags(ctx echo.Context) error { tags = []string{} } } + sort.Strings(tags) return ctx.JSON(http.StatusOK, echo.Map{ "name": namespace, @@ -483,6 +545,7 @@ func (r *registry) PullLayer(ctx echo.Context) error { skynetLink, err := r.localCache.GetSkynetURL(namespace, clientDigest) if err != nil { errMsg := r.errorResponse(RegistryErrorCodeBlobUnknown, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } layerRef = &types.LayerRef{ @@ -497,6 +560,7 @@ func (r *registry) PullLayer(ctx echo.Context) error { } e := fmt.Errorf("skylink is empty").Error() errMsg := r.errorResponse(RegistryErrorCodeBlobUnknown, e, detail) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } @@ -506,12 +570,14 @@ func (r *registry) PullLayer(ctx echo.Context) error { "error": err.Error(), } errMsg := r.errorResponse(RegistryErrorCodeBlobUnknown, err.Error(), detail) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } bz, err := io.ReadAll(resp) if err != nil { errMsg := r.errorResponse(RegistryErrorCodeBlobUploadInvalid, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusInternalServerError, errMsg) } _ = resp.Close() @@ -527,6 +593,7 @@ func (r *registry) PullLayer(ctx echo.Context) error { "client digest is different than computed digest", details, ) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } @@ -546,7 +613,6 @@ func (r *registry) PushImage(ctx echo.Context) error { } func (r *registry) StartUpload(ctx echo.Context) error { - namespace := ctx.Param("username") + "/" + ctx.Param("imagename") clientDigest := ctx.QueryParam("digest") @@ -561,7 +627,18 @@ func (r *registry) StartUpload(ctx echo.Context) error { bz, err := io.ReadAll(ctx.Request().Body) if err != nil { - panic(err) + details := map[string]interface{}{ + "clientDigest": clientDigest, + "namespace": namespace, + } + errMsg := r.errorResponse( + RegistryErrorCodeBlobUploadInvalid, + "error while reading request body", + details, + ) + r.fluentbit.Send(errMsg) + return ctx.JSONBlob(http.StatusNotFound, errMsg) + } ctx.Request().Body.Close() // why defer? body is already read :) dig := digest(bz) @@ -576,6 +653,7 @@ func (r *registry) StartUpload(ctx echo.Context) error { "client digest does not meet computed digest", details, ) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusBadRequest, errMsg) } @@ -587,6 +665,7 @@ func (r *registry) StartUpload(ctx echo.Context) error { "digest": dig, } r.debugf(lm) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusRequestedRangeNotSatisfiable, errMsg) } @@ -605,6 +684,7 @@ func (r *registry) StartUpload(ctx echo.Context) error { link := r.getHttpUrlFromSkylink(skylink) if err = r.localCache.Update([]byte(namespace), val.Bytes()); err != nil { errMsg := r.errorResponse(RegistryErrorCodeUnsupported, err.Error(), nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusInternalServerError, errMsg) } ctx.Response().Header().Set("Location", link) @@ -642,6 +722,7 @@ func (r *registry) UploadProgress(ctx echo.Context) error { ctx.Response().Header().Set("Docker-Upload-UUID", uuid) return ctx.NoContent(http.StatusNoContent) } + locationHeader := fmt.Sprintf("/v2/%s/blobs/uploads/%s", namespace, uuid) ctx.Response().Header().Set("Location", locationHeader) ctx.Response().Header().Set("Range", fmt.Sprintf("bytes=0-%d", size)) @@ -660,6 +741,7 @@ func (r *registry) PushLayer(ctx echo.Context) error { // Must have a path of form /v2/{name}/blobs/{upload,sha256:} if len(elem) < 4 { errMsg := r.errorResponse(RegistryErrorCodeNameInvalid, "blobs must be attached to a repo", nil) + r.fluentbit.Send(errMsg) return ctx.JSONBlob(http.StatusNotFound, errMsg) } diff --git a/registry/v2/types.go b/registry/v2/types.go index 7f147fa7..526770e4 100644 --- a/registry/v2/types.go +++ b/registry/v2/types.go @@ -5,6 +5,7 @@ import ( "github.com/containerish/OpenRegistry/cache" "github.com/containerish/OpenRegistry/skynet" + fluentbit "github.com/containerish/OpenRegistry/telemetry/fluent-bit" "github.com/labstack/echo/v4" "github.com/rs/zerolog" ) @@ -91,14 +92,16 @@ type ( localCache cache.Store echoLogger echo.Logger mu *sync.RWMutex + fluentbit fluentbit.FluentBit } blobs struct { - mutex sync.Mutex - contents map[string][]byte - uploads map[string][]byte - layers map[string][]string - registry *registry + mutex sync.Mutex + contents map[string][]byte + uploads map[string][]byte + layers map[string][]string + fluentbit fluentbit.FluentBit + registry *registry } logMsg map[string]interface{} diff --git a/telemetry/fluent-bit/fluent-bit.toml.example b/telemetry/fluent-bit/fluent-bit.toml.example new file mode 100644 index 00000000..3871d586 --- /dev/null +++ b/telemetry/fluent-bit/fluent-bit.toml.example @@ -0,0 +1,19 @@ +[INPUT] + name http + host 0.0.0.0 + port 9880 + +[OUTPUT] + name stdout + match * + +[OUTPUT] + name loki + match * + line_format json + host logs-prod-us-central1.grafana.net + port 443 + tls.verify on + tls on + http_user + http_passwd diff --git a/telemetry/fluent-bit/fluent_bit.go b/telemetry/fluent-bit/fluent_bit.go new file mode 100644 index 00000000..09deca20 --- /dev/null +++ b/telemetry/fluent-bit/fluent_bit.go @@ -0,0 +1,147 @@ +package fluentbit + +import ( + "bytes" + "context" + "net/http" + "sync" + "time" + + "github.com/containerish/OpenRegistry/config" + "github.com/google/uuid" +) + +type ( + FluentBit interface { + Send(logBytes []byte) + } + + fluentBit struct { + wg sync.WaitGroup + client *http.Client + retryMessages map[string]retryLogMsg + gate chan struct{} + config *config.RegistryConfig + } + + retryLogMsg struct { + content []byte + count int64 + done bool + } +) + +func New(config *config.RegistryConfig) (FluentBit, error) { + httpClient := &http.Client{ + Timeout: time.Duration(time.Second * 30), + } + + fbClient := &fluentBit{ + client: httpClient, + config: config, + wg: sync.WaitGroup{}, + gate: make(chan struct{}, 5), + } + + go fbClient.retry() + + return fbClient, nil +} + +func (fb *fluentBit) Send(logBytes []byte) { + // don't send logs to grafana from local instances of OpenRegistry + if fb.config.Environment == config.Dev || fb.config.Environment == config.Local { + return + } + + body := bytes.NewBuffer(logBytes) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodPost, fb.config.LogConfig.Endpoint, body) + if err != nil { + fb.queueForRetry(logBytes) + return + } + + logConfig := fb.config.LogConfig + + // set basic auth creds if auth is enabled + if logConfig.AuthMethod != "" { + req.SetBasicAuth(logConfig.Username, logConfig.Password) + } + + req.Header.Set("content-type", "application/json") + resp, err := fb.client.Do(req) + if err != nil { + fb.queueForRetry(logBytes) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusCreated { + fb.queueForRetry(logBytes) + } +} + +func (fb *fluentBit) queueForRetry(logBytes []byte) { + id := uuid.New() + + fb.retryMessages[id.String()] = retryLogMsg{ + content: logBytes, + count: 0, + done: false, + } + +} + +func (fb *fluentBit) retry() { + ticker := time.NewTicker(time.Second * 5) // sort of retry every 5 seconds + + // lets not do more than 5 req/second just to not flood our free instance of grafana cloud + for range ticker.C { + for id, logMsg := range fb.retryMessages { + fb.gate <- struct{}{} + fb.wg.Add(1) + go fb.retrier(logMsg.content, id) + } + } +} + +func (fb *fluentBit) retrier(logBytes []byte, id string) { + defer func() { + fb.wg.Done() + <-fb.gate + }() + + // TODO - (@jay-dee7) what to do then? maybe have a different way to ship these logs? like via promtail? + if msg, ok := fb.retryMessages[id]; ok && msg.count > 3 { + delete(fb.retryMessages, id) + return + } + + body := bytes.NewBuffer(logBytes) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodPost, fb.config.LogConfig.Endpoint, body) + if err != nil { + fb.queueForRetry(logBytes) + return + } + + req.Header.Set("content-type", "application/json") + resp, err := fb.client.Do(req) + if err != nil { + fb.queueForRetry(logBytes) + return + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusCreated { + delete(fb.retryMessages, id) + return + } + + item := fb.retryMessages[id] + item.count++ + fb.retryMessages[id] = item +} diff --git a/telemetry/log.go b/telemetry/log.go index f28eb9e2..8c8dc267 100644 --- a/telemetry/log.go +++ b/telemetry/log.go @@ -11,6 +11,7 @@ import ( func SetupLogger() zerolog.Logger { zerolog.TimeFieldFormat = zerolog.TimeFormatUnix zerolog.SetGlobalLevel(zerolog.DebugLevel) + l := zerolog.New(os.Stdout) l.With().Caller().Logger() @@ -18,8 +19,10 @@ func SetupLogger() zerolog.Logger { } func EchoLogger() echo.MiddlewareFunc { - logFmt := "method=${method}, uri=${uri}, status=${status} " + - "latency=${latency}, bytes_in=${bytes_in}, bytes_out=${bytes_out}\n" + logFmt := `{"time":"${time_rfc3339_nano}","id":"${id}","remote_ip":"${remote_ip}",` + + `"host":"${host}","method":"${method}","uri":"${uri}","user_agent":"${user_agent}",` + + `"status":${status},"error":"${error}","latency":${latency},"latency_human":"${latency_human}"` + + `,"bytes_in":${bytes_in},"bytes_out":${bytes_out}}` + "\n" return middleware.LoggerWithConfig(middleware.LoggerConfig{ Skipper: func(echo.Context) bool {