Skip to content

Commit

Permalink
feat(all): add common flags for telemetry (#494)
Browse files Browse the repository at this point in the history
  • Loading branch information
ijsong authored Jul 28, 2023
2 parents 4a138fa + fcacd1a commit 63355e9
Show file tree
Hide file tree
Showing 11 changed files with 226 additions and 172 deletions.
24 changes: 24 additions & 0 deletions cmd/varlogadm/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/kakao/varlog/internal/flags"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/util/log"
"github.com/kakao/varlog/pkg/util/telemetry"
)

func newAdminApp() *cli.App {
Expand Down Expand Up @@ -60,6 +61,14 @@ func newStartCommand() *cli.Command {
flags.LogFileCompression,
flags.LogHumanReadable,
flags.LogLevel,

// telemetry
flags.TelemetryExporter,
flags.TelemetryExporterStopTimeout,
flags.TelemetryOTLPEndpoint,
flags.TelemetryOTLPInsecure,
flags.TelemetryHost,
flags.TelemetryRuntime,
},
}
}
Expand All @@ -84,6 +93,21 @@ func start(c *cli.Context) error {
_ = logger.Sync()
}()

meterProviderOpts, err := flags.ParseTelemetryFlags(context.Background(), c, "adm", clusterID.String())
if err != nil {
return err
}
mp, stop, err := telemetry.NewMeterProvider(meterProviderOpts...)
if err != nil {
return err
}
telemetry.SetGlobalMeterProvider(mp)
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), c.Duration(flags.TelemetryExporterStopTimeout.Name))
defer cancel()
_ = stop(ctx)
}()

mrMgr, err := mrmanager.New(context.TODO(),
mrmanager.WithAddresses(c.StringSlice(flagMetadataRepository.Name)...),
mrmanager.WithInitialMRConnRetryCount(c.Int(flagInitMRConnRetryCount.Name)),
Expand Down
31 changes: 27 additions & 4 deletions cmd/varlogmr/metadata_repository.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/kakao/varlog/internal/metarepos"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/util/log"
"github.com/kakao/varlog/pkg/util/telemetry"
"github.com/kakao/varlog/pkg/util/units"
)

Expand Down Expand Up @@ -53,10 +55,27 @@ func start(c *cli.Context) error {
return err
}

raftAddr := c.String(flagRaftAddr.Name)
nodeID := types.NewNodeIDFromURL(raftAddr)
meterProviderOpts, err := flags.ParseTelemetryFlags(context.Background(), c, "mr", nodeID.String())
if err != nil {
return err
}
mp, stop, err := telemetry.NewMeterProvider(meterProviderOpts...)
if err != nil {
return err
}
telemetry.SetGlobalMeterProvider(mp)
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), c.Duration(flags.TelemetryExporterStopTimeout.Name))
defer cancel()
_ = stop(ctx)
}()

opts := []metarepos.Option{
metarepos.WithClusterID(cid),
metarepos.WithRPCAddress(c.String(flagRPCAddr.Name)),
metarepos.WithRaftAddress(c.String(flagRaftAddr.Name)),
metarepos.WithRaftAddress(raftAddr),
metarepos.WithDebugAddress(c.String(flagDebugAddr.Name)),
metarepos.WithRaftDirectory(c.String(flagRaftDir.Name)),
metarepos.WithReplicationFactor(c.Int(flagReplicationFactor.Name)),
Expand Down Expand Up @@ -123,10 +142,14 @@ func initCLI() *cli.App {
flagReportCommitterWriteBufferSize.StringFlag(false, units.ToByteSizeString(metarepos.DefaultReportCommitterWriteBufferSize)),
flagMaxTopicsCount,
flagMaxLogStreamsCountPerTopic,
flagTelemetryCollectorName.StringFlag(false, metarepos.DefaultTelemetryCollectorName),
flagTelemetryCollectorEndpoint.StringFlag(false, metarepos.DefaultTelmetryCollectorEndpoint),

//flagLogDir.StringFlag(false, metarepos.DefaultLogDir),
// telemetry
flags.TelemetryExporter,
flags.TelemetryExporterStopTimeout,
flags.TelemetryOTLPEndpoint,
flags.TelemetryOTLPInsecure,
flags.TelemetryHost,
flags.TelemetryRuntime,

// logger options
flags.LogDir,
Expand Down
13 changes: 7 additions & 6 deletions cmd/varlogsn/cli.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package main

import (
"time"

"github.com/urfave/cli/v2"

"github.com/kakao/varlog/internal/flags"
Expand Down Expand Up @@ -87,10 +85,13 @@ func newStartCommand() *cli.Command {
flags.LogHumanReadable,
flags.LogLevel,

flagExporterType.StringFlag(false, "noop"),
flagExporterStopTimeout.DurationFlag(false, 5*time.Second),
flagOTLPExporterInsecure.BoolFlag(),
flagOTLPExporterEndpoint.StringFlag(false, ""),
// telemetry
flags.TelemetryExporter,
flags.TelemetryExporterStopTimeout,
flags.TelemetryOTLPEndpoint,
flags.TelemetryOTLPInsecure,
flags.TelemetryHost,
flags.TelemetryRuntime,
},
}
}
22 changes: 0 additions & 22 deletions cmd/varlogsn/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,26 +195,4 @@ var (
EnvVars: []string{"STORAGE_METRICS_LOG_INTERVAL"},
Value: storage.DefaultMetricsLogInterval,
}

// flags for telemetry.
flagExporterType = flags.FlagDesc{
Name: "exporter-type",
Usage: "exporter type: stdout, otlp or noop",
Envs: []string{"EXPORTER_TYPE"},
}
flagExporterStopTimeout = flags.FlagDesc{
Name: "expoter-stop-timeout",
Usage: "timeout for stopping exporter",
Envs: []string{"EXPORTER_STOP_TIMEOUT"},
}
flagOTLPExporterInsecure = flags.FlagDesc{
Name: "exporter-otlp-insecure",
Usage: "disable client transport security for the OTLP exporter",
Envs: []string{"EXPORTER_OTLP_INSECURE"},
}
flagOTLPExporterEndpoint = flags.FlagDesc{
Name: "exporter-otlp-endpoint",
Usage: "the endpoint that exporter connects",
Envs: []string{"EXPORTER_OTLP_ENDPOINT"},
}
)
64 changes: 7 additions & 57 deletions cmd/varlogsn/varlogsn.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,9 @@ import (
"math"
"os"
"os/signal"
"strings"
"syscall"

"github.com/urfave/cli/v2"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/metric"
metricsdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
_ "go.uber.org/automaxprocs"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -100,15 +94,19 @@ func start(c *cli.Context) error {

logger = logger.Named("sn").With(zap.Uint32("cid", uint32(clusterID)), zap.Int32("snid", int32(storageNodeID)))

mp, stop, err := initTelemetry(context.Background(), c, storageNodeID)
meterProviderOpts, err := flags.ParseTelemetryFlags(context.Background(), c, "sn", storageNodeID.String())
if err != nil {
return err
}
mp, stop, err := telemetry.NewMeterProvider(meterProviderOpts...)
if err != nil {
return err
}
telemetry.SetGlobalMeterProvider(mp)
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), c.Duration(flagExporterStopTimeout.Name))
ctx, cancel := context.WithTimeout(context.Background(), c.Duration(flags.TelemetryExporterStopTimeout.Name))
defer cancel()
stop(ctx)
_ = stop(ctx)
}()

storageOpts, err := parseStorageOptions(c)
Expand Down Expand Up @@ -178,54 +176,6 @@ func start(c *cli.Context) error {
return g.Wait()
}

func initTelemetry(ctx context.Context, c *cli.Context, snid types.StorageNodeID) (metric.MeterProvider, telemetry.StopMeterProvider, error) {
var (
err error
exporter metricsdk.Exporter
shutdown telemetry.ShutdownExporter
)

res, err := resource.New(ctx,
resource.WithFromEnv(),
resource.WithHost(),
resource.WithAttributes(
semconv.ServiceName("sn"),
semconv.ServiceNamespace("varlog"),
semconv.ServiceInstanceID(snid.String()),
))
if err != nil {
return nil, nil, err
}

meterProviderOpts := []telemetry.MeterProviderOption{
telemetry.WithResource(res),
telemetry.WithRuntimeInstrumentation(),
}
switch strings.ToLower(c.String(flagExporterType.Name)) {
case "stdout":
exporter, shutdown, err = telemetry.NewStdoutExporter()
case "otlp":
var opts []otlpmetricgrpc.Option
if c.Bool(flagOTLPExporterInsecure.Name) {
opts = append(opts, otlpmetricgrpc.WithInsecure())
}
if !c.IsSet(flagOTLPExporterEndpoint.Name) {
return nil, nil, errors.New("no exporter endpoint")
}
opts = append(opts, otlpmetricgrpc.WithEndpoint(c.String(flagOTLPExporterEndpoint.Name)))
exporter, shutdown, err = telemetry.NewOLTPExporter(context.Background(), opts...)
}
if err != nil {
return nil, nil, err
}

if exporter != nil {
meterProviderOpts = append(meterProviderOpts, telemetry.WithExporter(exporter, shutdown))
}

return telemetry.NewMeterProvider(meterProviderOpts...)
}

func parseStorageOptions(c *cli.Context) (opts []storage.Option, err error) {
l0CompactionFileThreshold, err := getStorageDBFlagValues(c.IntSlice(flagStorageL0CompactionFileThreshold.Name))
if err != nil {
Expand Down
136 changes: 136 additions & 0 deletions internal/flags/telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package flags

import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/urfave/cli/v2"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
metricsdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"

"github.com/kakao/varlog/pkg/util/telemetry"
)

const (
CategoryTelemetry = "Telemetry:"

TelemetryExporterNOOP = "noop"
TelemetryExporterStdout = "stdout"
TelemetryExporterOTLP = "otlp"

DefaultTelemetryOTLPEndpoint = "localhost:4317"

DefaultTelemetryStopTimeout = 3 * time.Second
)

var (
TelemetryExporter = &cli.StringFlag{
Name: "telemetry-exporter",
Category: CategoryTelemetry,
Aliases: []string{"exporter-type"},
Usage: fmt.Sprintf("Exporter type: %s, %s or %s.", TelemetryExporterNOOP, TelemetryExporterStdout, TelemetryExporterOTLP),
EnvVars: []string{"TELEMETRY_EXPORTER", "EXPORTER_TYPE"},
Value: TelemetryExporterNOOP,
Action: func(_ *cli.Context, value string) error {
switch strings.ToLower(value) {
case TelemetryExporterNOOP, TelemetryExporterStdout, TelemetryExporterOTLP:
return nil
default:
return fmt.Errorf("invalid value \"%s\" for flag --telemetry-exporter", value)
}
},
}
TelemetryOTLPEndpoint = &cli.StringFlag{
Name: "telemetry-otlp-endpoint",
Category: CategoryTelemetry,
Aliases: []string{"exporter-otlp-endpoint"},
Usage: "Endpoint for OTLP exporter.",
EnvVars: []string{"TELEMETRY_OTLP_ENDPOINT", "EXPORTER_OTLP_ENDPOINT"},
Value: DefaultTelemetryOTLPEndpoint,
Action: func(c *cli.Context, value string) error {
if c.String(TelemetryExporter.Name) != TelemetryExporterOTLP || value != "" {
return nil
}
return errors.New("no value for flag --telemetry-otlp-endpoint")
},
}
TelemetryOTLPInsecure = &cli.BoolFlag{
Name: "telemetry-otlp-insecure",
Category: CategoryTelemetry,
Aliases: []string{"exporter-otlp-insecure"},
Usage: "Disable gRPC client transport security for OTLP exporter.",
EnvVars: []string{"TELEMETRY_OTLP_INSECURE", "EXPORTER_OTLP_INSECURE"},
}
TelemetryExporterStopTimeout = &cli.DurationFlag{
Name: "telemetry-exporter-stop-timeout",
Category: CategoryTelemetry,
Aliases: []string{"expoter-stop-timeout"},
Usage: "Timeout for stopping OTLP exporter.",
EnvVars: []string{"TELEMETRY_EXPORTER_STOP_TIMEOUT", "EXPORTER_STOP_TIMEOUT"},
Value: DefaultTelemetryStopTimeout,
}
TelemetryHost = &cli.BoolFlag{
Name: "telemetry-host",
Category: CategoryTelemetry,
Usage: "Export host metrics.",
EnvVars: []string{"TELEMETRY_HOST"},
}
TelemetryRuntime = &cli.BoolFlag{
Name: "telemetry-runtime",
Category: CategoryTelemetry,
Usage: "Export runtime metrics.",
EnvVars: []string{"TELEMETRY_RUNTIME"},
}
)

func ParseTelemetryFlags(ctx context.Context, c *cli.Context, serviceName, serviceInstanceID string) (opts []telemetry.MeterProviderOption, err error) {
const serviceNamespace = "varlog"

res, err := resource.New(ctx,
resource.WithFromEnv(),
resource.WithHost(),
resource.WithAttributes(
semconv.ServiceName(serviceName),
semconv.ServiceNamespace(serviceNamespace),
semconv.ServiceInstanceID(serviceInstanceID),
))
if err != nil {
return nil, err
}
opts = append(opts, telemetry.WithResource(res))

var exporter metricsdk.Exporter

switch strings.ToLower(c.String(TelemetryExporter.Name)) {
case TelemetryExporterStdout:
exporter, err = telemetry.NewStdoutExporter()
case TelemetryExporterOTLP:
var opts []otlpmetricgrpc.Option
if c.Bool(TelemetryOTLPInsecure.Name) {
opts = append(opts, otlpmetricgrpc.WithInsecure())
}
opts = append(opts, otlpmetricgrpc.WithEndpoint(c.String(TelemetryOTLPEndpoint.Name)))
exporter, err = telemetry.NewOLTPExporter(context.Background(), opts...)
case TelemetryExporterNOOP:
exporter = nil
}
if err != nil {
return nil, err
}

opts = append(opts, telemetry.WithExporter(exporter))

if c.Bool(TelemetryHost.Name) {
opts = append(opts, telemetry.WithHostInstrumentation())
}
if c.Bool(TelemetryRuntime.Name) {
opts = append(opts, telemetry.WithRuntimeInstrumentation())
}

return opts, nil
}
Loading

0 comments on commit 63355e9

Please sign in to comment.