Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…db#97433

95679: sql: remove extra type hint from `SHOW CREATE VIEW` r=rafiss a=e-mbrown

Resolves: cockroachdb#87886

Views created with user provided type hints were given
extra type annotations. The format functions for `AnnotateTypeExpr`
and `Array` were both annotating the expression. This commit alters
the format function of `AnnotateTypeExpr` so it doesnt duplicate
annotations for arrays.


Release note: None

95703: obsservice: support embedding in CRDB nodes r=andreimatei a=andreimatei

This patch adds a special value to the --obsservice-addr CRDB flag:
--obsservice-addr=embed means that an the respective node will run the
Obs Service RPC service internally and the events exporter will be
configured to connect to this internal service.

Release note: None
Fixes cockroachdb#88194 
Epic: CRDB-16791

97410: storage: write min version file with checkpoint r=RaduBerinde a=RaduBerinde

#### storage: better error for missing min version file

We require the min version filename to open a store. If it doesn't
exist, we assume the store doesn't exist which can lead to a confusing
message if just the min version file is missing. This commit makes
this message more useful.

Informs cockroachdb#97337

Release note: none
Epic: none

#### storage: write min version file with checkpoint

We now write out the min version file with checkpoints generated
through the storage layer. This allows checkpoints to be opened with
the cockroach storage layer (not just with the low level pebble tool).

Fixes cockroachdb#97337

Release note: None
Epic: none


97433: cli: remove double quotes from a test file name r=yuzefovich a=yuzefovich

This commit removes double quotes from a recently introduced test file since apparently windows doesn't like them which [breaks](https://github.com/cockroachdb/cockroach/actions/runs/4236059742) the bincheck run.

Epic: None

Release note: None

Co-authored-by: e-mbrown <[email protected]>
Co-authored-by: Andrei Matei <[email protected]>
Co-authored-by: Radu Berinde <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
5 people committed Feb 22, 2023
5 parents 3a532ac + e7019b0 + 13acc58 + 15351c0 + 9d9e84e commit 2431b4b
Show file tree
Hide file tree
Showing 19 changed files with 416 additions and 150 deletions.
4 changes: 4 additions & 0 deletions pkg/base/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,8 @@ const (

// MinRangeMaxBytes is the minimum value for range max bytes.
MinRangeMaxBytes = 64 << 10 // 64 KB

// ObsServiceEmbedFlagValue is the special value of the --obsservice-addr flag
// configuring the CRDB node to run the Obs Service internally.
ObsServiceEmbedFlagValue = "embed"
)
3 changes: 2 additions & 1 deletion pkg/cli/cliflags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1743,7 +1743,8 @@ commands, WARNING for client commands.`,
EnvVar: "",
Description: `Address of an OpenTelemetry OTLP sink such as the
Observability Service or the OpenTelemetry Collector. If set, telemetry
events are exported to this address.`,
events are exported to this address. The special value "embed" causes
the Cockroach node to run the Observability Service internally.`,
}

BuildTag = FlagInfo{
Expand Down
1 change: 1 addition & 0 deletions pkg/obs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/obs",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/obsservice/obspb",
"//pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1:logs_service",
"//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common",
Expand Down
138 changes: 93 additions & 45 deletions pkg/obs/event_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"net"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/obsservice/obspb"
otel_collector_pb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1"
otel_pb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/common/v1"
Expand All @@ -34,15 +35,31 @@ import (
"google.golang.org/grpc/credentials/insecure"
)

// EventsExporterInterface abstracts exporting events to the Observability Service. It is
// implemented by EventsExporter.
// EventsExporterInterface abstracts exporting events to the Observability
// Service. It is implemented by EventsExporter.
type EventsExporterInterface interface {
// Start starts the exporter, periodically flushing the buffered events to the
// sink.
Start(ctx context.Context, nodeInfo NodeInfo, stop *stop.Stopper) error
// SetNodeInfo initializes the node information that will be included in every
// batch of events that gets exported. This information is not passed to
// NewEventsExporter() to allow the EventsExporter to be constructed before
// the node ID is known.
SetNodeInfo(NodeInfo)

// SetDialer configures the dialer to be used when opening network connections.
SetDialer(dialer func(ctx context.Context, _ string) (net.Conn, error))

// Start starts the goroutine that will periodically flush the events to the
// configured sink.
//
// Flushes are triggered by the configured flush interval and by the buffer size
// threshold.
Start(context.Context, *stop.Stopper) error

// SendEvent buffers an event to be sent.
//
// SendEvent does not block. If the buffer is full, old events are dropped.
//
// SendEvent can be called before Start(). Such events will be buffered
// (within the buffering limits) and sent after Start() is eventually called.
SendEvent(ctx context.Context, typ obspb.EventType, event otel_logs_pb.LogRecord)
}

Expand All @@ -51,18 +68,39 @@ type NoopEventsExporter struct{}

var _ EventsExporterInterface = &NoopEventsExporter{}

func (d NoopEventsExporter) Start(
ctx context.Context, nodeInfo NodeInfo, stop *stop.Stopper,
) error {
// SetNodeInfo is part of the EventsExporterInterface.
func (nop NoopEventsExporter) SetNodeInfo(NodeInfo) {}

// Start is part of the EventsExporterInterface.
func (d NoopEventsExporter) Start(ctx context.Context, stop *stop.Stopper) error {
return nil
}

// SendEvent implements the EventsExporterInterface.
func (d NoopEventsExporter) SendEvent(
ctx context.Context, typ obspb.EventType, event otel_logs_pb.LogRecord,
// SetDialer is part of the EventsExporterInterface.
func (nop NoopEventsExporter) SetDialer(
dialer func(ctx context.Context, _ string) (net.Conn, error),
) {
}

// SendEvent is part of the EventsExporterInterface.
func (nop NoopEventsExporter) SendEvent(context.Context, obspb.EventType, otel_logs_pb.LogRecord) {}

// EventExporterTestingKnobs can be passed to Server to adjust flushing for the
// EventExporter.
type EventExporterTestingKnobs struct {
// FlushInterval, if set, overrides the default trigger interval for the
// EventExporter.
FlushInterval time.Duration
// FlushTriggerByteSize, if set, overrides the default trigger value for the
// EventExporter.
FlushTriggerByteSize uint64
}

var _ base.ModuleTestingKnobs = &EventExporterTestingKnobs{}

// ModuleTestingKnobs implements the ModuleTestingKnobs interface.
func (e *EventExporterTestingKnobs) ModuleTestingKnobs() {}

// EventsExporter is a buffered client for the OTLP logs gRPC service. It is
// used to export events to the Observability Service (possibly through an
// OpenTelemetry Collector).
Expand All @@ -78,6 +116,8 @@ type EventsExporter struct {
tr *tracing.Tracer
targetAddr string

dialer func(ctx context.Context, _ string) (net.Conn, error)

// flushInterval is the duration after which a flush is triggered.
// 0 disables this trigger.
flushInterval time.Duration
Expand All @@ -103,6 +143,19 @@ type EventsExporter struct {
conn *grpc.ClientConn
}

// ValidateOTLPTargetAddr validates the target address filling the possible
// missing port with the default.
func ValidateOTLPTargetAddr(targetAddr string) (string, error) {
otlpHost, otlpPort, err := addr.SplitHostPort(targetAddr, "4317" /* defaultPort */)
if err != nil {
return "", errors.Newf("invalid OTLP host in --obsservice-addr=%s", targetAddr)
}
if otlpHost == "" {
return "", errors.Newf("missing OTLP host in --obsservice-addr=%s", targetAddr)
}
return net.JoinHostPort(otlpHost, otlpPort), nil
}

// NewEventsExporter creates an EventsExporter.
//
// Start() needs to be called before the EventsExporter actually exports any
Expand Down Expand Up @@ -143,19 +196,7 @@ func NewEventsExporter(
triggerSizeBytes uint64,
maxBufferSizeBytes uint64,
memMonitor *mon.BytesMonitor,
) (*EventsExporter, error) {
otlpHost, otlpPort, err := addr.SplitHostPort(targetAddr, "4317" /* defaultPort */)
if err != nil {
return nil, errors.Newf("invalid OTLP host in --obsservice-addr=%s", targetAddr)
}
if otlpHost == "" {
return nil, errors.Newf("missing OTLP host in --obsservice-addr=%s", targetAddr)
}
if otlpPort == "" {
otlpPort = "4317"
}
targetAddr = net.JoinHostPort(otlpHost, otlpPort)

) *EventsExporter {
s := &EventsExporter{
clock: clock,
tr: tr,
Expand All @@ -174,8 +215,7 @@ func NewEventsExporter(
},
}
s.buf.mu.memAccount = memMonitor.MakeBoundAccount()

return s, nil
return s
}

// NodeInfo groups the information identifying a node that will be included in
Expand All @@ -189,19 +229,16 @@ type NodeInfo struct {
BinaryVersion string
}

// Start starts the goroutine that will periodically flush the events to the
// configured sink.
//
// Flushes are triggered by the configured flush interval and by the buffer size
// threshold.
//
// nodeInfo represents the node information that will be included in every batch
// of events that gets exported. This information is passed to Start() rather
// than to NewEventsExporter() to allow the EventsExporter to be constructed before
// SetDialer configures the dialer to be used when opening network connections.
func (s *EventsExporter) SetDialer(dialer func(ctx context.Context, _ string) (net.Conn, error)) {
s.dialer = dialer
}

// SetNodeInfo initializes the node information that will be included in every
// batch of events that gets exported. This information is not passed to
// NewEventsExporter() to allow the EventsExporter to be constructed before
// the node ID is known.
func (s *EventsExporter) Start(
ctx context.Context, nodeInfo NodeInfo, stopper *stop.Stopper,
) error {
func (s *EventsExporter) SetNodeInfo(nodeInfo NodeInfo) {
s.resource = otel_res_pb.Resource{
Attributes: []*otel_pb.KeyValue{
{
Expand All @@ -218,10 +255,21 @@ func (s *EventsExporter) Start(
},
},
}
}

// Note that Dial is non-blocking.
// Start starts the goroutine that will periodically flush the events to the
// configured sink.
//
// Flushes are triggered by the configured flush interval and by the buffer size
// threshold.
func (s *EventsExporter) Start(ctx context.Context, stopper *stop.Stopper) error {
// TODO(andrei): Add support for TLS / mutual TLS.
conn, err := grpc.Dial(s.targetAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
if s.dialer != nil {
opts = append(opts, grpc.WithContextDialer(s.dialer))
}
// Note that Dial is non-blocking.
conn, err := grpc.Dial(s.targetAddr, opts...)
if err != nil {
return err
}
Expand Down Expand Up @@ -410,14 +458,14 @@ var unrecognizedEventEveryN = log.Every(time.Minute)
var unrecognizedEventPayloadEveryN = log.Every(time.Minute)

// SendEvent buffers an event to be sent.
//
// SendEvent does not block. If the buffer is full, old events are dropped.
//
// SendEvent can be called before Start(). Such events will be buffered
// (within the buffering limits) and sent after Start() is eventually called.
func (s *EventsExporter) SendEvent(
ctx context.Context, typ obspb.EventType, event otel_logs_pb.LogRecord,
) {
// Return early if there's no sink configured.
if s.otelClient == nil {
return
}

// Make sure there's room for the new event. If there isn't, we'll drop
// events from the front of the buffer (the oldest), until there is room.
newEventSize, err := sizeOfEvent(event)
Expand Down
1 change: 1 addition & 0 deletions pkg/obsservice/obslib/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_test(
embed = [":ingest"],
deps = [
"//pkg/base",
"//pkg/obs",
"//pkg/obsservice/obslib/migrations",
"//pkg/obsservice/obspb",
"//pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1:logs_service",
Expand Down
Loading

0 comments on commit 2431b4b

Please sign in to comment.