diff --git a/cmd/varlogadm/cli.go b/cmd/varlogadm/cli.go index 933627905..b10ddae42 100644 --- a/cmd/varlogadm/cli.go +++ b/cmd/varlogadm/cli.go @@ -13,6 +13,7 @@ import ( "github.com/kakao/varlog/internal/flags" "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/pkg/util/log" + "github.com/kakao/varlog/pkg/util/telemetry" ) func newAdminApp() *cli.App { @@ -60,6 +61,14 @@ func newStartCommand() *cli.Command { flags.LogFileCompression, flags.LogHumanReadable, flags.LogLevel, + + // telemetry + flags.TelemetryExporter, + flags.TelemetryExporterStopTimeout, + flags.TelemetryOTLPEndpoint, + flags.TelemetryOTLPInsecure, + flags.TelemetryHost, + flags.TelemetryRuntime, }, } } @@ -84,6 +93,21 @@ func start(c *cli.Context) error { _ = logger.Sync() }() + meterProviderOpts, err := flags.ParseTelemetryFlags(context.Background(), c, "adm", clusterID.String()) + if err != nil { + return err + } + mp, stop, err := telemetry.NewMeterProvider(meterProviderOpts...) + if err != nil { + return err + } + telemetry.SetGlobalMeterProvider(mp) + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), c.Duration(flags.TelemetryExporterStopTimeout.Name)) + defer cancel() + _ = stop(ctx) + }() + mrMgr, err := mrmanager.New(context.TODO(), mrmanager.WithAddresses(c.StringSlice(flagMetadataRepository.Name)...), mrmanager.WithInitialMRConnRetryCount(c.Int(flagInitMRConnRetryCount.Name)), diff --git a/cmd/varlogmr/metadata_repository.go b/cmd/varlogmr/metadata_repository.go index 6052476fb..3eac6f00e 100644 --- a/cmd/varlogmr/metadata_repository.go +++ b/cmd/varlogmr/metadata_repository.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "os" "os/signal" @@ -14,6 +15,7 @@ import ( "github.com/kakao/varlog/internal/metarepos" "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/pkg/util/log" + "github.com/kakao/varlog/pkg/util/telemetry" "github.com/kakao/varlog/pkg/util/units" ) @@ -53,10 +55,27 @@ func start(c *cli.Context) error { return err } + raftAddr := c.String(flagRaftAddr.Name) + nodeID := types.NewNodeIDFromURL(raftAddr) + meterProviderOpts, err := flags.ParseTelemetryFlags(context.Background(), c, "mr", nodeID.String()) + if err != nil { + return err + } + mp, stop, err := telemetry.NewMeterProvider(meterProviderOpts...) + if err != nil { + return err + } + telemetry.SetGlobalMeterProvider(mp) + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), c.Duration(flags.TelemetryExporterStopTimeout.Name)) + defer cancel() + _ = stop(ctx) + }() + opts := []metarepos.Option{ metarepos.WithClusterID(cid), metarepos.WithRPCAddress(c.String(flagRPCAddr.Name)), - metarepos.WithRaftAddress(c.String(flagRaftAddr.Name)), + metarepos.WithRaftAddress(raftAddr), metarepos.WithDebugAddress(c.String(flagDebugAddr.Name)), metarepos.WithRaftDirectory(c.String(flagRaftDir.Name)), metarepos.WithReplicationFactor(c.Int(flagReplicationFactor.Name)), @@ -123,10 +142,14 @@ func initCLI() *cli.App { flagReportCommitterWriteBufferSize.StringFlag(false, units.ToByteSizeString(metarepos.DefaultReportCommitterWriteBufferSize)), flagMaxTopicsCount, flagMaxLogStreamsCountPerTopic, - flagTelemetryCollectorName.StringFlag(false, metarepos.DefaultTelemetryCollectorName), - flagTelemetryCollectorEndpoint.StringFlag(false, metarepos.DefaultTelmetryCollectorEndpoint), - //flagLogDir.StringFlag(false, metarepos.DefaultLogDir), + // telemetry + flags.TelemetryExporter, + flags.TelemetryExporterStopTimeout, + flags.TelemetryOTLPEndpoint, + flags.TelemetryOTLPInsecure, + flags.TelemetryHost, + flags.TelemetryRuntime, // logger options flags.LogDir, diff --git a/cmd/varlogsn/cli.go b/cmd/varlogsn/cli.go index c63760202..ce11b5b53 100644 --- a/cmd/varlogsn/cli.go +++ b/cmd/varlogsn/cli.go @@ -1,8 +1,6 @@ package main import ( - "time" - "github.com/urfave/cli/v2" "github.com/kakao/varlog/internal/flags" @@ -87,10 +85,13 @@ func newStartCommand() *cli.Command { flags.LogHumanReadable, flags.LogLevel, - flagExporterType.StringFlag(false, "noop"), - flagExporterStopTimeout.DurationFlag(false, 5*time.Second), - flagOTLPExporterInsecure.BoolFlag(), - flagOTLPExporterEndpoint.StringFlag(false, ""), + // telemetry + flags.TelemetryExporter, + flags.TelemetryExporterStopTimeout, + flags.TelemetryOTLPEndpoint, + flags.TelemetryOTLPInsecure, + flags.TelemetryHost, + flags.TelemetryRuntime, }, } } diff --git a/cmd/varlogsn/flags.go b/cmd/varlogsn/flags.go index 5d61be69a..14c98037b 100644 --- a/cmd/varlogsn/flags.go +++ b/cmd/varlogsn/flags.go @@ -195,26 +195,4 @@ var ( EnvVars: []string{"STORAGE_METRICS_LOG_INTERVAL"}, Value: storage.DefaultMetricsLogInterval, } - - // flags for telemetry. - flagExporterType = flags.FlagDesc{ - Name: "exporter-type", - Usage: "exporter type: stdout, otlp or noop", - Envs: []string{"EXPORTER_TYPE"}, - } - flagExporterStopTimeout = flags.FlagDesc{ - Name: "expoter-stop-timeout", - Usage: "timeout for stopping exporter", - Envs: []string{"EXPORTER_STOP_TIMEOUT"}, - } - flagOTLPExporterInsecure = flags.FlagDesc{ - Name: "exporter-otlp-insecure", - Usage: "disable client transport security for the OTLP exporter", - Envs: []string{"EXPORTER_OTLP_INSECURE"}, - } - flagOTLPExporterEndpoint = flags.FlagDesc{ - Name: "exporter-otlp-endpoint", - Usage: "the endpoint that exporter connects", - Envs: []string{"EXPORTER_OTLP_ENDPOINT"}, - } ) diff --git a/cmd/varlogsn/varlogsn.go b/cmd/varlogsn/varlogsn.go index 353247674..7c59c9559 100644 --- a/cmd/varlogsn/varlogsn.go +++ b/cmd/varlogsn/varlogsn.go @@ -7,15 +7,9 @@ import ( "math" "os" "os/signal" - "strings" "syscall" "github.com/urfave/cli/v2" - "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" - "go.opentelemetry.io/otel/metric" - metricsdk "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/resource" - semconv "go.opentelemetry.io/otel/semconv/v1.20.0" _ "go.uber.org/automaxprocs" "go.uber.org/multierr" "go.uber.org/zap" @@ -100,15 +94,19 @@ func start(c *cli.Context) error { logger = logger.Named("sn").With(zap.Uint32("cid", uint32(clusterID)), zap.Int32("snid", int32(storageNodeID))) - mp, stop, err := initTelemetry(context.Background(), c, storageNodeID) + meterProviderOpts, err := flags.ParseTelemetryFlags(context.Background(), c, "sn", storageNodeID.String()) + if err != nil { + return err + } + mp, stop, err := telemetry.NewMeterProvider(meterProviderOpts...) if err != nil { return err } telemetry.SetGlobalMeterProvider(mp) defer func() { - ctx, cancel := context.WithTimeout(context.Background(), c.Duration(flagExporterStopTimeout.Name)) + ctx, cancel := context.WithTimeout(context.Background(), c.Duration(flags.TelemetryExporterStopTimeout.Name)) defer cancel() - stop(ctx) + _ = stop(ctx) }() storageOpts, err := parseStorageOptions(c) @@ -178,54 +176,6 @@ func start(c *cli.Context) error { return g.Wait() } -func initTelemetry(ctx context.Context, c *cli.Context, snid types.StorageNodeID) (metric.MeterProvider, telemetry.StopMeterProvider, error) { - var ( - err error - exporter metricsdk.Exporter - shutdown telemetry.ShutdownExporter - ) - - res, err := resource.New(ctx, - resource.WithFromEnv(), - resource.WithHost(), - resource.WithAttributes( - semconv.ServiceName("sn"), - semconv.ServiceNamespace("varlog"), - semconv.ServiceInstanceID(snid.String()), - )) - if err != nil { - return nil, nil, err - } - - meterProviderOpts := []telemetry.MeterProviderOption{ - telemetry.WithResource(res), - telemetry.WithRuntimeInstrumentation(), - } - switch strings.ToLower(c.String(flagExporterType.Name)) { - case "stdout": - exporter, shutdown, err = telemetry.NewStdoutExporter() - case "otlp": - var opts []otlpmetricgrpc.Option - if c.Bool(flagOTLPExporterInsecure.Name) { - opts = append(opts, otlpmetricgrpc.WithInsecure()) - } - if !c.IsSet(flagOTLPExporterEndpoint.Name) { - return nil, nil, errors.New("no exporter endpoint") - } - opts = append(opts, otlpmetricgrpc.WithEndpoint(c.String(flagOTLPExporterEndpoint.Name))) - exporter, shutdown, err = telemetry.NewOLTPExporter(context.Background(), opts...) - } - if err != nil { - return nil, nil, err - } - - if exporter != nil { - meterProviderOpts = append(meterProviderOpts, telemetry.WithExporter(exporter, shutdown)) - } - - return telemetry.NewMeterProvider(meterProviderOpts...) -} - func parseStorageOptions(c *cli.Context) (opts []storage.Option, err error) { l0CompactionFileThreshold, err := getStorageDBFlagValues(c.IntSlice(flagStorageL0CompactionFileThreshold.Name)) if err != nil { diff --git a/internal/flags/telemetry.go b/internal/flags/telemetry.go new file mode 100644 index 000000000..3b96d371b --- /dev/null +++ b/internal/flags/telemetry.go @@ -0,0 +1,136 @@ +package flags + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + "github.com/urfave/cli/v2" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + metricsdk "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.20.0" + + "github.com/kakao/varlog/pkg/util/telemetry" +) + +const ( + CategoryTelemetry = "Telemetry:" + + TelemetryExporterNOOP = "noop" + TelemetryExporterStdout = "stdout" + TelemetryExporterOTLP = "otlp" + + DefaultTelemetryOTLPEndpoint = "localhost:4317" + + DefaultTelemetryStopTimeout = 3 * time.Second +) + +var ( + TelemetryExporter = &cli.StringFlag{ + Name: "telemetry-exporter", + Category: CategoryTelemetry, + Aliases: []string{"exporter-type"}, + Usage: fmt.Sprintf("Exporter type: %s, %s or %s.", TelemetryExporterNOOP, TelemetryExporterStdout, TelemetryExporterOTLP), + EnvVars: []string{"TELEMETRY_EXPORTER", "EXPORTER_TYPE"}, + Value: TelemetryExporterNOOP, + Action: func(_ *cli.Context, value string) error { + switch strings.ToLower(value) { + case TelemetryExporterNOOP, TelemetryExporterStdout, TelemetryExporterOTLP: + return nil + default: + return fmt.Errorf("invalid value \"%s\" for flag --telemetry-exporter", value) + } + }, + } + TelemetryOTLPEndpoint = &cli.StringFlag{ + Name: "telemetry-otlp-endpoint", + Category: CategoryTelemetry, + Aliases: []string{"exporter-otlp-endpoint"}, + Usage: "Endpoint for OTLP exporter.", + EnvVars: []string{"TELEMETRY_OTLP_ENDPOINT", "EXPORTER_OTLP_ENDPOINT"}, + Value: DefaultTelemetryOTLPEndpoint, + Action: func(c *cli.Context, value string) error { + if c.String(TelemetryExporter.Name) != TelemetryExporterOTLP || value != "" { + return nil + } + return errors.New("no value for flag --telemetry-otlp-endpoint") + }, + } + TelemetryOTLPInsecure = &cli.BoolFlag{ + Name: "telemetry-otlp-insecure", + Category: CategoryTelemetry, + Aliases: []string{"exporter-otlp-insecure"}, + Usage: "Disable gRPC client transport security for OTLP exporter.", + EnvVars: []string{"TELEMETRY_OTLP_INSECURE", "EXPORTER_OTLP_INSECURE"}, + } + TelemetryExporterStopTimeout = &cli.DurationFlag{ + Name: "telemetry-exporter-stop-timeout", + Category: CategoryTelemetry, + Aliases: []string{"expoter-stop-timeout"}, + Usage: "Timeout for stopping OTLP exporter.", + EnvVars: []string{"TELEMETRY_EXPORTER_STOP_TIMEOUT", "EXPORTER_STOP_TIMEOUT"}, + Value: DefaultTelemetryStopTimeout, + } + TelemetryHost = &cli.BoolFlag{ + Name: "telemetry-host", + Category: CategoryTelemetry, + Usage: "Export host metrics.", + EnvVars: []string{"TELEMETRY_HOST"}, + } + TelemetryRuntime = &cli.BoolFlag{ + Name: "telemetry-runtime", + Category: CategoryTelemetry, + Usage: "Export runtime metrics.", + EnvVars: []string{"TELEMETRY_RUNTIME"}, + } +) + +func ParseTelemetryFlags(ctx context.Context, c *cli.Context, serviceName, serviceInstanceID string) (opts []telemetry.MeterProviderOption, err error) { + const serviceNamespace = "varlog" + + res, err := resource.New(ctx, + resource.WithFromEnv(), + resource.WithHost(), + resource.WithAttributes( + semconv.ServiceName(serviceName), + semconv.ServiceNamespace(serviceNamespace), + semconv.ServiceInstanceID(serviceInstanceID), + )) + if err != nil { + return nil, err + } + opts = append(opts, telemetry.WithResource(res)) + + var exporter metricsdk.Exporter + + switch strings.ToLower(c.String(TelemetryExporter.Name)) { + case TelemetryExporterStdout: + exporter, err = telemetry.NewStdoutExporter() + case TelemetryExporterOTLP: + var opts []otlpmetricgrpc.Option + if c.Bool(TelemetryOTLPInsecure.Name) { + opts = append(opts, otlpmetricgrpc.WithInsecure()) + } + opts = append(opts, otlpmetricgrpc.WithEndpoint(c.String(TelemetryOTLPEndpoint.Name))) + exporter, err = telemetry.NewOLTPExporter(context.Background(), opts...) + case TelemetryExporterNOOP: + exporter = nil + } + if err != nil { + return nil, err + } + + opts = append(opts, telemetry.WithExporter(exporter)) + + if c.Bool(TelemetryHost.Name) { + opts = append(opts, telemetry.WithHostInstrumentation()) + } + if c.Bool(TelemetryRuntime.Name) { + opts = append(opts, telemetry.WithRuntimeInstrumentation()) + } + + return opts, nil +} diff --git a/internal/metarepos/raft_metadata_repository.go b/internal/metarepos/raft_metadata_repository.go index 87155f493..931eda9b8 100644 --- a/internal/metarepos/raft_metadata_repository.go +++ b/internal/metarepos/raft_metadata_repository.go @@ -276,7 +276,6 @@ func (mr *RaftMetadataRepository) Close() error { // FIXME (jun, pharrell): Stop gracefully mr.server.Stop() - mr.tmStub.close(context.TODO()) return nil } diff --git a/internal/metarepos/telemetry.go b/internal/metarepos/telemetry.go index 2f00d6bd7..3ee95176b 100644 --- a/internal/metarepos/telemetry.go +++ b/internal/metarepos/telemetry.go @@ -2,71 +2,23 @@ package metarepos import ( "context" - "strings" - "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/metric" - metricsdk "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/resource" - semconv "go.opentelemetry.io/otel/semconv/v1.20.0" "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/pkg/util/telemetry" ) type telemetryStub struct { - mp metric.MeterProvider - stop telemetry.StopMeterProvider - mb *metricsBag + mp metric.MeterProvider + mb *metricsBag } func newTelemetryStub(ctx context.Context, name string, nodeID types.NodeID, endpoint string) (*telemetryStub, error) { - // resources - res, err := resource.New(ctx, - resource.WithFromEnv(), - resource.WithHost(), - resource.WithAttributes( - semconv.ServiceName("mr"), - semconv.ServiceNamespace("varlog"), - semconv.ServiceInstanceID(nodeID.String()), - )) - if err != nil { - return nil, err - } - - var ( - exporter metricsdk.Exporter - shutdown telemetry.ShutdownExporter - meterProviderOpts = []telemetry.MeterProviderOption{ - telemetry.WithResource(res), - telemetry.WithHostInstrumentation(), - telemetry.WithRuntimeInstrumentation(), - } - ) - - switch strings.ToLower(name) { - case "stdout": - exporter, shutdown, err = telemetry.NewStdoutExporter() - case "otlp": - exporter, shutdown, err = telemetry.NewOLTPExporter(ctx, otlpmetricgrpc.WithInsecure(), otlpmetricgrpc.WithEndpoint(endpoint)) - } - if err != nil { - return nil, err - } - if exporter != nil { - meterProviderOpts = append(meterProviderOpts, telemetry.WithExporter(exporter, shutdown)) - } - - mp, stop, err := telemetry.NewMeterProvider(meterProviderOpts...) - if err != nil { - return nil, err - } - - telemetry.SetGlobalMeterProvider(mp) + mp := telemetry.GetGlobalMeterProvider() ts := &telemetryStub{ - mp: mp, - stop: stop, + mp: mp, } meter := ts.mp.Meter("varlog.mr") @@ -75,10 +27,6 @@ func newTelemetryStub(ctx context.Context, name string, nodeID types.NodeID, end return ts, nil } -func (ts *telemetryStub) close(ctx context.Context) { - ts.stop(ctx) -} - func newNopTelmetryStub() *telemetryStub { ts, _ := newTelemetryStub(context.Background(), "nop", types.InvalidNodeID, "") return ts diff --git a/pkg/util/telemetry/config.go b/pkg/util/telemetry/config.go index dc90a2207..9989ab7cc 100644 --- a/pkg/util/telemetry/config.go +++ b/pkg/util/telemetry/config.go @@ -7,9 +7,8 @@ import ( ) type meterProviderConfig struct { - resource *resource.Resource - exporter metricsdk.Exporter - shutdownExporter ShutdownExporter + resource *resource.Resource + exporter metricsdk.Exporter hostInstrumentation bool @@ -38,10 +37,9 @@ func WithResource(resource *resource.Resource) MeterProviderOption { } // WithExporter sets exporter and its shutdown function. -func WithExporter(exporter metricsdk.Exporter, shutdownExporter ShutdownExporter) MeterProviderOption { +func WithExporter(exporter metricsdk.Exporter) MeterProviderOption { return func(cfg *meterProviderConfig) { cfg.exporter = exporter - cfg.shutdownExporter = shutdownExporter } } diff --git a/pkg/util/telemetry/exporter.go b/pkg/util/telemetry/exporter.go index ff8aa7c77..54c6acf5a 100644 --- a/pkg/util/telemetry/exporter.go +++ b/pkg/util/telemetry/exporter.go @@ -3,30 +3,20 @@ package telemetry import ( "context" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" metricsdk "go.opentelemetry.io/otel/sdk/metric" ) -type ShutdownExporter func(context.Context) - -func NewStdoutExporter(opts ...stdoutmetric.Option) (metricsdk.Exporter, ShutdownExporter, error) { +func NewStdoutExporter(opts ...stdoutmetric.Option) (metricsdk.Exporter, error) { exp, err := stdoutmetric.New(opts...) - return exp, func(context.Context) {}, err + return exp, err } -func NewOLTPExporter(ctx context.Context, clientOpts ...otlpmetricgrpc.Option) (metricsdk.Exporter, ShutdownExporter, error) { +func NewOLTPExporter(ctx context.Context, clientOpts ...otlpmetricgrpc.Option) (metricsdk.Exporter, error) { exp, err := otlpmetricgrpc.New(ctx, clientOpts...) if err != nil { - return nil, func(context.Context) {}, err - } - - shutdown := func(ctx context.Context) { - if err := exp.Shutdown(ctx); err != nil { - otel.Handle(err) - } + return nil, err } - - return exp, shutdown, err + return exp, err } diff --git a/pkg/util/telemetry/metric_provider.go b/pkg/util/telemetry/metric_provider.go index d37e46d36..99cd8bee0 100644 --- a/pkg/util/telemetry/metric_provider.go +++ b/pkg/util/telemetry/metric_provider.go @@ -2,6 +2,7 @@ package telemetry import ( "context" + "errors" "go.opentelemetry.io/contrib/instrumentation/host" "go.opentelemetry.io/contrib/instrumentation/runtime" @@ -13,15 +14,17 @@ import ( // StopMeterProvider is the type for stop function of meter provider. // It stops both meter provider and exporter. -type StopMeterProvider func(context.Context) +type StopMeterProvider func(context.Context) error // NewMeterProvider creates a new meter provider and its stop function. func NewMeterProvider(opts ...MeterProviderOption) (metric.MeterProvider, StopMeterProvider, error) { cfg := newMeterProviderConfig(opts) + stop := func(ctx context.Context) error { return nil } + if cfg.exporter == nil { mp := noopmetric.NewMeterProvider() - return mp, func(context.Context) {}, nil + return mp, stop, nil } reader := metricsdk.NewPeriodicReader(cfg.exporter) @@ -33,18 +36,18 @@ func NewMeterProvider(opts ...MeterProviderOption) (metric.MeterProvider, StopMe if cfg.hostInstrumentation { if err := initHostInstrumentation(mp); err != nil { - return nil, func(context.Context) {}, err + return nil, stop, err } } if cfg.runtimeInstrumentation { if err := initRuntimeInstrumentation(mp, cfg.runtimeInstrumentationOpts); err != nil { - return nil, func(context.Context) {}, err + return nil, stop, err } } - stop := func(ctx context.Context) { - _ = mp.Shutdown(ctx) + stop = func(ctx context.Context) error { + return errors.Join(mp.Shutdown(ctx), cfg.exporter.Shutdown(ctx)) } return mp, stop, nil @@ -61,3 +64,7 @@ func initRuntimeInstrumentation(mp metric.MeterProvider, opts []runtime.Option) func SetGlobalMeterProvider(mp metric.MeterProvider) { otel.SetMeterProvider(mp) } + +func GetGlobalMeterProvider() metric.MeterProvider { + return otel.GetMeterProvider() +}