From 577da11664d9b16ecd627654552c91dc079d36e0 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Tue, 21 Feb 2023 12:04:45 -0800 Subject: [PATCH 1/5] storage: better error for missing min version file We require the min version filename to open a store. If it doesn't exist, we assume the store doesn't exist which can lead to a confusing message if just the min version file is missing. This commit makes this message more useful. Informs #97337 Release note: none Epic: none --- pkg/storage/pebble.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 796f3f6f8b22..d6ae114bebcb 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -1067,8 +1067,14 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) { } opts.ErrorIfNotExists = true } else { - if opts.ErrorIfNotExists { - return nil, errors.Errorf("pebble: database %q does not exist", cfg.StorageConfig.Dir) + if opts.ErrorIfNotExists || opts.ReadOnly { + // Make sure the message is not confusing if the store does exist but + // there is no min version file. + filename := unencryptedFS.PathJoin(cfg.Dir, MinVersionFilename) + return nil, errors.Errorf( + "pebble: database %q does not exist (missing required file %q)", + cfg.StorageConfig.Dir, filename, + ) } // If there is no min version file, there should be no store. If there is // one, it's either 1) a store from a very old version (which we don't want From 15351c059256333372f25bca1f82e34715b4eea3 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Tue, 21 Feb 2023 12:06:12 -0800 Subject: [PATCH 2/5] storage: write min version file with checkpoint We now write out the min version file with checkpoints generated through the storage layer. This allows checkpoints to be opened with the cockroach storage layer (not just with the low level pebble tool). 
Informs #97337 Release note: None Epic: none --- pkg/storage/engine_test.go | 21 ++++++++++++++++----- pkg/storage/pebble.go | 14 ++++++++++++-- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/pkg/storage/engine_test.go b/pkg/storage/engine_test.go index 1b418a58067e..199a3f2e3670 100644 --- a/pkg/storage/engine_test.go +++ b/pkg/storage/engine_test.go @@ -1004,15 +1004,26 @@ func TestCreateCheckpoint(t *testing.T) { assert.NoError(t, err) defer db.Close() - dir = filepath.Join(dir, "checkpoint") + checkpointDir := filepath.Join(dir, "checkpoint") assert.NoError(t, err) - assert.NoError(t, db.CreateCheckpoint(dir, nil)) - assert.DirExists(t, dir) - m, err := filepath.Glob(dir + "/*") + assert.NoError(t, db.CreateCheckpoint(checkpointDir, nil)) + assert.DirExists(t, checkpointDir) + m, err := filepath.Glob(checkpointDir + "/*") assert.NoError(t, err) assert.True(t, len(m) > 0) - if err := db.CreateCheckpoint(dir, nil); !testutils.IsError(err, "exists") { + + // Verify that we can open the checkpoint. + db2, err := Open( + context.Background(), + Filesystem(checkpointDir), + cluster.MakeTestingClusterSettings(), + MustExist) + require.NoError(t, err) + db2.Close() + + // Verify that creating another checkpoint in the same directory fails. + if err := db.CreateCheckpoint(checkpointDir, nil); !testutils.IsError(err, "exists") { t.Fatal(err) } } diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index d6ae114bebcb..0630276ad323 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -1950,11 +1950,21 @@ func (p *Pebble) CreateCheckpoint(dir string, spans []roachpb.Span) error { return err } + // Write out the min version file. + if err := writeMinVersionFile(p.unencryptedFS, dir, p.MinVersion()); err != nil { + return errors.Wrapf(err, "writing min version file for checkpoint") + } + // TODO(#90543, cockroachdb/pebble#2285): move spans info to Pebble manifest. 
if len(spans) > 0 { - return fs.SafeWriteToFile(p.fs, dir, p.fs.PathJoin(dir, "checkpoint.txt"), - checkpointSpansNote(spans)) + if err := fs.SafeWriteToFile( + p.fs, dir, p.fs.PathJoin(dir, "checkpoint.txt"), + checkpointSpansNote(spans), + ); err != nil { + return err + } } + return nil } From 9d9e84e07f8db8964c4d09b325bb03b1b5c7eabe Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 21 Feb 2023 16:23:43 -0800 Subject: [PATCH 3/5] cli: remove double quotes from a test file name This commit removes double quotes from a recently introduced test file since apparently Windows doesn't like them which breaks the bincheck run. Epic: None Release note: None --- .../explain-bundle/bundle/stats-defaultdb.public.order.sql | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename "pkg/cli/testdata/explain-bundle/bundle/stats-defaultdb.public.\"order\".sql" => pkg/cli/testdata/explain-bundle/bundle/stats-defaultdb.public.order.sql (100%) diff --git "a/pkg/cli/testdata/explain-bundle/bundle/stats-defaultdb.public.\"order\".sql" b/pkg/cli/testdata/explain-bundle/bundle/stats-defaultdb.public.order.sql similarity index 100% rename from "pkg/cli/testdata/explain-bundle/bundle/stats-defaultdb.public.\"order\".sql" rename to pkg/cli/testdata/explain-bundle/bundle/stats-defaultdb.public.order.sql From 13acc589a0a35c40f71362e59da7befa80caa91a Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Sun, 22 Jan 2023 14:19:00 -0500 Subject: [PATCH 4/5] obsservice: support embedding in CRDB nodes This patch adds a special value to the --obsservice-addr CRDB flag: --obsservice-addr=embed means that the respective node will run the Obs Service RPC service internally and the events exporter will be configured to connect to this internal service. 
Release note: None Fixes: CRDB-19741 Epic: CRDB-16791 --- pkg/base/constants.go | 4 + pkg/cli/cliflags/flags.go | 3 +- pkg/obs/BUILD.bazel | 1 + pkg/obs/event_exporter.go | 138 +++++++++++++------- pkg/obsservice/obslib/ingest/BUILD.bazel | 1 + pkg/obsservice/obslib/ingest/ingest_test.go | 132 +++++++++++-------- pkg/obsservice/obslib/ingest/main_test.go | 2 +- pkg/server/BUILD.bazel | 5 + pkg/server/initial_sql.go | 7 + pkg/server/server.go | 78 ++++++----- pkg/server/server_obs_service.go | 84 ++++++++++++ pkg/server/tenant.go | 22 ++-- pkg/server/testserver.go | 15 ++- pkg/testutils/lint/lint_test.go | 1 + 14 files changed, 352 insertions(+), 141 deletions(-) create mode 100644 pkg/server/server_obs_service.go diff --git a/pkg/base/constants.go b/pkg/base/constants.go index aee0ff8b6763..e25f6f9d01f1 100644 --- a/pkg/base/constants.go +++ b/pkg/base/constants.go @@ -52,4 +52,8 @@ const ( // MinRangeMaxBytes is the minimum value for range max bytes. MinRangeMaxBytes = 64 << 10 // 64 KB + + // ObsServiceEmbedFlagValue is the special value of the --obsservice-addr flag + // configuring the CRDB node to run the Obs Service internally. + ObsServiceEmbedFlagValue = "embed" ) diff --git a/pkg/cli/cliflags/flags.go b/pkg/cli/cliflags/flags.go index 3f84967b5727..e028b470214a 100644 --- a/pkg/cli/cliflags/flags.go +++ b/pkg/cli/cliflags/flags.go @@ -1743,7 +1743,8 @@ commands, WARNING for client commands.`, EnvVar: "", Description: `Address of an OpenTelemetry OTLP sink such as the Observability Service or the OpenTelemetry Collector. If set, telemetry -events are exported to this address.`, +events are exported to this address. 
The special value "embed" causes +the Cockroach node to run the Observability Service internally.`, } BuildTag = FlagInfo{ diff --git a/pkg/obs/BUILD.bazel b/pkg/obs/BUILD.bazel index a34ab7e04264..9449e021c242 100644 --- a/pkg/obs/BUILD.bazel +++ b/pkg/obs/BUILD.bazel @@ -10,6 +10,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/obs", visibility = ["//visibility:public"], deps = [ + "//pkg/base", "//pkg/obsservice/obspb", "//pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1:logs_service", "//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common", diff --git a/pkg/obs/event_exporter.go b/pkg/obs/event_exporter.go index 99750b09ce41..714c0c06711c 100644 --- a/pkg/obs/event_exporter.go +++ b/pkg/obs/event_exporter.go @@ -15,6 +15,7 @@ import ( "net" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/obsservice/obspb" otel_collector_pb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1" otel_pb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/common/v1" @@ -34,15 +35,31 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -// EventsExporterInterface abstracts exporting events to the Observability Service. It is -// implemented by EventsExporter. +// EventsExporterInterface abstracts exporting events to the Observability +// Service. It is implemented by EventsExporter. type EventsExporterInterface interface { - // Start starts the exporter, periodically flushing the buffered events to the - // sink. - Start(ctx context.Context, nodeInfo NodeInfo, stop *stop.Stopper) error + // SetNodeInfo initializes the node information that will be included in every + // batch of events that gets exported. This information is not passed to + // NewEventsExporter() to allow the EventsExporter to be constructed before + // the node ID is known. 
+ SetNodeInfo(NodeInfo) + + // SetDialer configures the dialer to be used when opening network connections. + SetDialer(dialer func(ctx context.Context, _ string) (net.Conn, error)) + + // Start starts the goroutine that will periodically flush the events to the + // configured sink. + // + // Flushes are triggered by the configured flush interval and by the buffer size + // threshold. + Start(context.Context, *stop.Stopper) error + // SendEvent buffers an event to be sent. // // SendEvent does not block. If the buffer is full, old events are dropped. + // + // SendEvent can be called before Start(). Such events will be buffered + // (within the buffering limits) and sent after Start() is eventually called. SendEvent(ctx context.Context, typ obspb.EventType, event otel_logs_pb.LogRecord) } @@ -51,18 +68,39 @@ type NoopEventsExporter struct{} var _ EventsExporterInterface = &NoopEventsExporter{} -func (d NoopEventsExporter) Start( - ctx context.Context, nodeInfo NodeInfo, stop *stop.Stopper, -) error { +// SetNodeInfo is part of the EventsExporterInterface. +func (nop NoopEventsExporter) SetNodeInfo(NodeInfo) {} + +// Start is part of the EventsExporterInterface. +func (d NoopEventsExporter) Start(ctx context.Context, stop *stop.Stopper) error { return nil } -// SendEvent implements the EventsExporterInterface. -func (d NoopEventsExporter) SendEvent( - ctx context.Context, typ obspb.EventType, event otel_logs_pb.LogRecord, +// SetDialer is part of the EventsExporterInterface. +func (nop NoopEventsExporter) SetDialer( + dialer func(ctx context.Context, _ string) (net.Conn, error), ) { } +// SendEvent is part of the EventsExporterInterface. +func (nop NoopEventsExporter) SendEvent(context.Context, obspb.EventType, otel_logs_pb.LogRecord) {} + +// EventExporterTestingKnobs can be passed to Server to adjust flushing for the +// EventExporter. 
+type EventExporterTestingKnobs struct { + // FlushInterval, if set, overrides the default trigger interval for the + // EventExporter. + FlushInterval time.Duration + // FlushTriggerByteSize, if set, overrides the default trigger value for the + // EventExporter. + FlushTriggerByteSize uint64 +} + +var _ base.ModuleTestingKnobs = &EventExporterTestingKnobs{} + +// ModuleTestingKnobs implements the ModuleTestingKnobs interface. +func (e *EventExporterTestingKnobs) ModuleTestingKnobs() {} + // EventsExporter is a buffered client for the OTLP logs gRPC service. It is // used to export events to the Observability Service (possibly through an // OpenTelemetry Collector). @@ -78,6 +116,8 @@ type EventsExporter struct { tr *tracing.Tracer targetAddr string + dialer func(ctx context.Context, _ string) (net.Conn, error) + // flushInterval is the duration after which a flush is triggered. // 0 disables this trigger. flushInterval time.Duration @@ -103,6 +143,19 @@ type EventsExporter struct { conn *grpc.ClientConn } +// ValidateOTLPTargetAddr validates the target address filling the possible +// missing port with the default. +func ValidateOTLPTargetAddr(targetAddr string) (string, error) { + otlpHost, otlpPort, err := addr.SplitHostPort(targetAddr, "4317" /* defaultPort */) + if err != nil { + return "", errors.Newf("invalid OTLP host in --obsservice-addr=%s", targetAddr) + } + if otlpHost == "" { + return "", errors.Newf("missing OTLP host in --obsservice-addr=%s", targetAddr) + } + return net.JoinHostPort(otlpHost, otlpPort), nil +} + // NewEventsExporter creates an EventsExporter. 
// // Start() needs to be called before the EventsExporter actually exports any @@ -143,19 +196,7 @@ func NewEventsExporter( triggerSizeBytes uint64, maxBufferSizeBytes uint64, memMonitor *mon.BytesMonitor, -) (*EventsExporter, error) { - otlpHost, otlpPort, err := addr.SplitHostPort(targetAddr, "4317" /* defaultPort */) - if err != nil { - return nil, errors.Newf("invalid OTLP host in --obsservice-addr=%s", targetAddr) - } - if otlpHost == "" { - return nil, errors.Newf("missing OTLP host in --obsservice-addr=%s", targetAddr) - } - if otlpPort == "" { - otlpPort = "4317" - } - targetAddr = net.JoinHostPort(otlpHost, otlpPort) - +) *EventsExporter { s := &EventsExporter{ clock: clock, tr: tr, @@ -174,8 +215,7 @@ func NewEventsExporter( }, } s.buf.mu.memAccount = memMonitor.MakeBoundAccount() - - return s, nil + return s } // NodeInfo groups the information identifying a node that will be included in @@ -189,19 +229,16 @@ type NodeInfo struct { BinaryVersion string } -// Start starts the goroutine that will periodically flush the events to the -// configured sink. -// -// Flushes are triggered by the configured flush interval and by the buffer size -// threshold. -// -// nodeInfo represents the node information that will be included in every batch -// of events that gets exported. This information is passed to Start() rather -// than to NewEventsExporter() to allow the EventsExporter to be constructed before +// SetDialer configures the dialer to be used when opening network connections. +func (s *EventsExporter) SetDialer(dialer func(ctx context.Context, _ string) (net.Conn, error)) { + s.dialer = dialer +} + +// SetNodeInfo initializes the node information that will be included in every +// batch of events that gets exported. This information is not passed to +// NewEventsExporter() to allow the EventsExporter to be constructed before // the node ID is known. 
-func (s *EventsExporter) Start( - ctx context.Context, nodeInfo NodeInfo, stopper *stop.Stopper, -) error { +func (s *EventsExporter) SetNodeInfo(nodeInfo NodeInfo) { s.resource = otel_res_pb.Resource{ Attributes: []*otel_pb.KeyValue{ { @@ -218,10 +255,21 @@ func (s *EventsExporter) Start( }, }, } +} - // Note that Dial is non-blocking. +// Start starts the goroutine that will periodically flush the events to the +// configured sink. +// +// Flushes are triggered by the configured flush interval and by the buffer size +// threshold. +func (s *EventsExporter) Start(ctx context.Context, stopper *stop.Stopper) error { // TODO(andrei): Add support for TLS / mutual TLS. - conn, err := grpc.Dial(s.targetAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} + if s.dialer != nil { + opts = append(opts, grpc.WithContextDialer(s.dialer)) + } + // Note that Dial is non-blocking. + conn, err := grpc.Dial(s.targetAddr, opts...) if err != nil { return err } @@ -410,14 +458,14 @@ var unrecognizedEventEveryN = log.Every(time.Minute) var unrecognizedEventPayloadEveryN = log.Every(time.Minute) // SendEvent buffers an event to be sent. +// +// SendEvent does not block. If the buffer is full, old events are dropped. +// +// SendEvent can be called before Start(). Such events will be buffered +// (within the buffering limits) and sent after Start() is eventually called. func (s *EventsExporter) SendEvent( ctx context.Context, typ obspb.EventType, event otel_logs_pb.LogRecord, ) { - // Return early if there's no sink configured. - if s.otelClient == nil { - return - } - // Make sure there's room for the new event. If there isn't, we'll drop // events from the front of the buffer (the oldest), until there is room. 
newEventSize, err := sizeOfEvent(event) diff --git a/pkg/obsservice/obslib/ingest/BUILD.bazel b/pkg/obsservice/obslib/ingest/BUILD.bazel index 167d49286d9a..91d88b9c8908 100644 --- a/pkg/obsservice/obslib/ingest/BUILD.bazel +++ b/pkg/obsservice/obslib/ingest/BUILD.bazel @@ -29,6 +29,7 @@ go_test( embed = [":ingest"], deps = [ "//pkg/base", + "//pkg/obs", "//pkg/obsservice/obslib/migrations", "//pkg/obsservice/obspb", "//pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1:logs_service", diff --git a/pkg/obsservice/obslib/ingest/ingest_test.go b/pkg/obsservice/obslib/ingest/ingest_test.go index 5a79ad2c9366..b0888e4ce757 100644 --- a/pkg/obsservice/obslib/ingest/ingest_test.go +++ b/pkg/obsservice/obslib/ingest/ingest_test.go @@ -10,12 +10,14 @@ package ingest import ( "context" + gosql "database/sql" "net" "net/url" "testing" "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/obs" "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/migrations" "github.com/cockroachdb/cockroach/pkg/obsservice/obspb" logspb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1" @@ -132,61 +134,89 @@ func TestEventIngestionIntegration(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - // Allocate a port for the ingestion service to work around a circular - // dependency: CRDB needs to be told what the port is, but we can only create - // the event ingester after having started CRDB (because the ingester wants a - // reference to CRDB). 
- otlpListener, err := net.Listen("tcp", "127.0.0.1:0") - require.NoError(t, err) - defer func() { - _ = otlpListener.Close() - }() + testutils.RunTrueAndFalse(t, "embed", func(t *testing.T, embed bool) { + var obsAddr string - s, sqlDB, _ := serverutils.StartServer(t, - base.TestServerArgs{ - ObsServiceAddr: otlpListener.Addr().String(), - }, - ) - defer s.Stopper().Stop(ctx) - pgURL, cleanupFunc := sqlutils.PGUrl( - t, s.ServingSQLAddr(), - "TestPersistEvents", url.User(username.RootUser), - ) - defer cleanupFunc() + var s serverutils.TestServerInterface + var sqlDB *gosql.DB + if !embed { + // Allocate a port for the ingestion service to work around a circular + // dependency: CRDB needs to be told what the port is, but we can only create + // the event ingester after having started CRDB (because the ingester wants a + // reference to CRDB). + otlpListener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer func() { + _ = otlpListener.Close() + }() + obsAddr = otlpListener.Addr().String() + s, sqlDB, _ = serverutils.StartServer(t, + base.TestServerArgs{ + ObsServiceAddr: obsAddr, + Knobs: base.TestingKnobs{ + EventExporter: &obs.EventExporterTestingKnobs{ + // Flush every message. + FlushTriggerByteSize: 1, + }, + }, + }, + ) + defer s.Stopper().Stop(ctx) - config, err := pgxpool.ParseConfig(pgURL.String()) - require.NoError(t, err) - config.ConnConfig.Database = "defaultdb" - pool, err := pgxpool.ConnectConfig(ctx, config) - require.NoError(t, err) - defer pool.Close() - require.NoError(t, migrations.RunDBMigrations(ctx, config.ConnConfig)) + pgURL, cleanupFunc := sqlutils.PGUrl( + t, s.ServingSQLAddr(), "TestPersistEvents", url.User(username.RootUser), + ) + defer cleanupFunc() - // Start the ingestion in the background. 
- obsStop := stop.NewStopper() - defer obsStop.Stop(ctx) - e, err := MakeEventIngester(ctx, config) - require.NoError(t, err) - defer e.Close() - grpcServer := grpc.NewServer() - defer grpcServer.Stop() - logspb.RegisterLogsServiceServer(grpcServer, &e) - go func() { - _ = grpcServer.Serve(otlpListener) - }() - - // Perform a schema change and check that we get an event. - _, err = sqlDB.Exec("create table t()") - require.NoError(t, err) + config, err := pgxpool.ParseConfig(pgURL.String()) + require.NoError(t, err) + config.ConnConfig.Database = "crdb_observability" + pool, err := pgxpool.ConnectConfig(ctx, config) + require.NoError(t, err) + defer pool.Close() + require.NoError(t, migrations.RunDBMigrations(ctx, config.ConnConfig)) - // Wait for an event to be ingested. - testutils.SucceedsSoon(t, func() error { - r := pool.QueryRow(ctx, "select count(1) from cluster_events where event_type='create_table'") - var count int - require.NoError(t, r.Scan(&count)) - if count < 1 { - return errors.Newf("no events yet") + // Start the ingestion in the background. + obsStop := stop.NewStopper() + defer obsStop.Stop(ctx) + e, err := MakeEventIngester(ctx, config) + require.NoError(t, err) + defer e.Close() + grpcServer := grpc.NewServer() + defer grpcServer.Stop() + logspb.RegisterLogsServiceServer(grpcServer, &e) + go func() { + _ = grpcServer.Serve(otlpListener) + }() + } else { + s, sqlDB, _ = serverutils.StartServer(t, + base.TestServerArgs{ + ObsServiceAddr: base.ObsServiceEmbedFlagValue, + Knobs: base.TestingKnobs{ + EventExporter: &obs.EventExporterTestingKnobs{ + // Flush every message. + FlushTriggerByteSize: 1, + }, + }, + }, + ) + defer s.Stopper().Stop(ctx) } - return nil + + // Perform a schema change and check that we get an event. + _, err := sqlDB.Exec("create table t()") + require.NoError(t, err) + + // Wait for an event to be ingested. 
+ testutils.SucceedsSoon(t, func() error { + r := sqlDB.QueryRow("SELECT count(*) FROM crdb_observability.cluster_events WHERE event_type='create_table'") + var count int + require.NoError(t, r.Scan(&count)) + if count < 1 { + return errors.Newf("no events yet") + } + return nil + }) }) + } diff --git a/pkg/obsservice/obslib/ingest/main_test.go b/pkg/obsservice/obslib/ingest/main_test.go index dd64ce03af4e..4aa0f8075a11 100644 --- a/pkg/obsservice/obslib/ingest/main_test.go +++ b/pkg/obsservice/obslib/ingest/main_test.go @@ -6,7 +6,7 @@ // // http://www.apache.org/licenses/LICENSE-2.0 -package ingest +package ingest_test import ( "os" diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 20110d123001..425475030993 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -57,6 +57,7 @@ go_library( "server_controller_orchestration.go", "server_controller_sql.go", "server_http.go", + "server_obs_service.go", "server_sql.go", "server_systemlog_gc.go", "session_writer.go", @@ -144,6 +145,9 @@ go_library( "//pkg/multitenant/tenantcapabilities/tenantcapabilitieswatcher", "//pkg/multitenant/tenantcostmodel", "//pkg/obs", + "//pkg/obsservice/obslib/ingest", + "//pkg/obsservice/obslib/migrations", + "//pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1:logs_service", "//pkg/roachpb", "//pkg/rpc", "//pkg/rpc/nodedialer", @@ -321,6 +325,7 @@ go_library( "@com_github_gorilla_mux//:mux", "@com_github_grpc_ecosystem_grpc_gateway//runtime:go_default_library", "@com_github_grpc_ecosystem_grpc_gateway//utilities:go_default_library", + "@com_github_jackc_pgx_v4//pgxpool", "@com_github_marusama_semaphore//:semaphore", "@com_github_nightlyone_lockfile//:lockfile", "@com_github_nytimes_gziphandler//:gziphandler", diff --git a/pkg/server/initial_sql.go b/pkg/server/initial_sql.go index 45c8d7629060..86fdf05516bd 100644 --- a/pkg/server/initial_sql.go +++ b/pkg/server/initial_sql.go @@ -14,6 +14,7 @@ import ( "context" "fmt" + 
"github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -30,6 +31,12 @@ import ( func (s *Server) RunInitialSQL( ctx context.Context, startSingleNode bool, adminUser, adminPassword string, ) error { + if s.cfg.ObsServiceAddr == base.ObsServiceEmbedFlagValue { + if err := s.startEmbeddedObsService(ctx); err != nil { + return err + } + } + newCluster := s.InitialStart() && s.NodeID() == kvstorage.FirstNodeID if !newCluster { // The initial SQL code only runs the first time the cluster is initialized. diff --git a/pkg/server/server.go b/pkg/server/server.go index d5382c00f7f5..f9722f990deb 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -178,7 +178,7 @@ type Server struct { tenantCapabilitiesWatcher tenantcapabilities.Watcher - // pgL is the SQL listener. + // pgL is the SQL listener for pgwire connections coming over the network. pgL net.Listener // loopbackPgL is the SQL listener for internal pgwire connections. loopbackPgL *netutil.LoopbackListener @@ -920,26 +920,51 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { }) registry.AddMetricStruct(kvProber.Metrics()) - // Create the Obs Server. We'll start it later, once we know our node ID. + flushInterval := 5 * time.Second + flushTriggerBytesSize := uint64(1 << 20) // 1MB + if cfg.TestingKnobs.EventExporter != nil { + knobs := cfg.TestingKnobs.EventExporter.(*obs.EventExporterTestingKnobs) + if knobs.FlushInterval != time.Duration(0) { + flushInterval = knobs.FlushInterval + } + if knobs.FlushTriggerByteSize != 0 { + flushTriggerBytesSize = knobs.FlushTriggerByteSize + } + } + + // Create the EventExporter, which will export events to the Obs Service. + // We'll start it later, once we know our node ID. 
var eventsExporter obs.EventsExporterInterface if cfg.ObsServiceAddr != "" { - ee, err := obs.NewEventsExporter( - cfg.ObsServiceAddr, - timeutil.DefaultTimeSource{}, - cfg.Tracer, - 5*time.Second, // maxStaleness - 1<<20, // triggerSizeBytes - 1MB - 10*1<<20, // maxBufferSizeBytes - 10MB - sqlMonitorAndMetrics.rootSQLMemoryMonitor, // memMonitor - this is not "SQL" usage, but we don't have another memory pool - ) - if err != nil { - log.Errorf(ctx, "failed to create events exporter: %s", err) + if cfg.ObsServiceAddr == base.ObsServiceEmbedFlagValue { + ee := obs.NewEventsExporter( + "", // targetAddr - we'll configure a custom dialer connecting to the local node later + timeutil.DefaultTimeSource{}, + cfg.Tracer, + flushInterval, + flushTriggerBytesSize, + 10*1<<20, // maxBufferSizeBytes - 10MB + sqlMonitorAndMetrics.rootSQLMemoryMonitor, // memMonitor - this is not "SQL" usage, but we don't have another memory pool + ) + eventsExporter = ee } else { + targetAddr, err := obs.ValidateOTLPTargetAddr(cfg.ObsServiceAddr) + if err != nil { + return nil, err + } + ee := obs.NewEventsExporter( + targetAddr, + timeutil.DefaultTimeSource{}, + cfg.Tracer, + flushInterval, + flushTriggerBytesSize, + 10*1<<20, // maxBufferSizeBytes - 10MB + sqlMonitorAndMetrics.rootSQLMemoryMonitor, // memMonitor - this is not "SQL" usage, but we don't have another memory pool + ) log.Infof(ctx, "will export events over OTLP to: %s", cfg.ObsServiceAddr) eventsExporter = ee } - } - if eventsExporter == nil { + } else { eventsExporter = &obs.NoopEventsExporter{} } @@ -1227,20 +1252,6 @@ func (li listenerInfo) Iter() map[string]string { } } -// Start calls PreStart() and AcceptClient() in sequence. -// This is suitable for use e.g. in tests. -// This mirrors the implementation of (*SQLServerWrapper).Start. -// TODO(knz): Find a way to implement this method only once for both. 
-func (s *Server) Start(ctx context.Context) error { - if err := s.PreStart(ctx); err != nil { - return err - } - if err := s.AcceptInternalClients(ctx); err != nil { - return err - } - return s.AcceptClients(ctx) -} - // PreStart starts the server on the specified port, starts gossip and // initializes the node using the engines from the server's context. // @@ -1892,12 +1903,15 @@ func (s *Server) PreStart(ctx context.Context) error { s.startDiagnostics(workersCtx) } - if err := s.eventsExporter.Start(ctx, obs.NodeInfo{ + s.eventsExporter.SetNodeInfo(obs.NodeInfo{ ClusterID: state.clusterID, NodeID: int32(state.nodeID), BinaryVersion: build.BinaryVersion(), - }, s.stopper); err != nil { - return errors.Wrap(err, "failed to start the event exporter") + }) + if s.cfg.ObsServiceAddr != base.ObsServiceEmbedFlagValue { + if err := s.eventsExporter.Start(ctx, s.stopper); err != nil { + return errors.Wrapf(err, "failed to start events exporter") + } } // Connect the engines to the disk stats map constructor. diff --git a/pkg/server/server_obs_service.go b/pkg/server/server_obs_service.go new file mode 100644 index 000000000000..57de217c42ff --- /dev/null +++ b/pkg/server/server_obs_service.go @@ -0,0 +1,84 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package server + +import ( + "context" + "net" + + "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/ingest" + "github.com/cockroachdb/cockroach/pkg/obsservice/obslib/migrations" + logspb "github.com/cockroachdb/cockroach/pkg/obsservice/obspb/opentelemetry-proto/collector/logs/v1" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/netutil" + "github.com/jackc/pgx/v4/pgxpool" + "google.golang.org/grpc" +) + +// startEmbeddedObsService creates the schema for the Observability Service (if +// it doesn't exist already), starts the internal RPC service for event +// ingestion and hooks up the event exporter to talk to the local service. +func (s *Server) startEmbeddedObsService(ctx context.Context) error { + // Create the Obs Service schema. + loopbackConfig, err := pgxpool.ParseConfig("") + if err != nil { + return err + } + loopbackConfig.ConnConfig.User = "root" + loopbackConfig.ConnConfig.Database = "crdb_observability" + loopbackConfig.ConnConfig.DialFunc = func(ctx context.Context, network, addr string) (net.Conn, error) { + return s.loopbackPgL.Connect(ctx) + } + log.Infof(ctx, "running migrations for embedded Observability Service") + if err := migrations.RunDBMigrations(ctx, loopbackConfig.ConnConfig); err != nil { + return err + } + + // Create the internal ingester RPC server. + embeddedObsSvc, err := ingest.MakeEventIngester(ctx, loopbackConfig) + if err != nil { + return err + } + // We'll use an RPC server serving on a "loopback" interface implemented with + // in-memory pipes. 
+ grpcServer := grpc.NewServer() + logspb.RegisterLogsServiceServer(grpcServer, &embeddedObsSvc) + rpcLoopbackL := netutil.NewLoopbackListener(ctx, s.stopper) + if err := s.stopper.RunAsyncTask( + ctx, "obssvc-loopback-quiesce", func(ctx context.Context) { + <-s.stopper.ShouldQuiesce() + grpcServer.Stop() + embeddedObsSvc.Close() + }, + ); err != nil { + embeddedObsSvc.Close() + return err + } + if err := s.stopper.RunAsyncTask( + ctx, "obssvc-listener", func(ctx context.Context) { + netutil.FatalIfUnexpected(grpcServer.Serve(rpcLoopbackL)) + }); err != nil { + return err + } + + // Now that the ingester is listening for RPCs, we can hook up the exporter to + // it and start the exporter. Note that in the non-embedded case, Start() has + // already been called. + s.eventsExporter.SetDialer(func(ctx context.Context, _ string) (net.Conn, error) { + conn, err := rpcLoopbackL.Connect(ctx) + return conn, err + }) + log.Infof(ctx, "starting event exporting talking to local event ingester") + if err := s.eventsExporter.Start(ctx, s.stopper); err != nil { + return err + } + return nil +} diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 760ac16e36ec..aa9171dc88fa 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -713,11 +713,12 @@ func (s *SQLServerWrapper) PreStart(ctx context.Context) error { if instanceID == 0 { log.Fatalf(ctx, "expected SQLInstanceID to be initialized after preStart") } - if err := s.eventsExporter.Start(ctx, obs.NodeInfo{ + s.eventsExporter.SetNodeInfo(obs.NodeInfo{ ClusterID: clusterID, NodeID: int32(instanceID), BinaryVersion: build.BinaryVersion(), - }, s.stopper); err != nil { + }) + if err := s.eventsExporter.Start(ctx, s.stopper); err != nil { return errors.Wrap(err, "failed to start the event exporter") } @@ -1099,7 +1100,13 @@ func makeTenantSQLServerArgs( // cluster ID is known, with a Start() call. 
var eventsExporter obs.EventsExporterInterface if baseCfg.ObsServiceAddr != "" { - ee, err := obs.NewEventsExporter( + if baseCfg.ObsServiceAddr == base.ObsServiceEmbedFlagValue { + // TODO(andrei): Add support for this option for tenants - at least for + // shared-process tenants where the event exporting should be hooked up to + // the ingester running in the host process. + return sqlServerArgs{}, errors.New("--obsservice-addr=embed is not currently supported for tenants") + } + ee := obs.NewEventsExporter( baseCfg.ObsServiceAddr, timeutil.DefaultTimeSource{}, baseCfg.Tracer, @@ -1108,13 +1115,8 @@ func makeTenantSQLServerArgs( 10*1<<20, // maxBufferSizeBytes - 10MB monitorAndMetrics.rootSQLMemoryMonitor, // memMonitor - this is not "SQL" usage, but we don't have another memory pool ) - if err != nil { - log.Errorf(startupCtx, "failed to create events exporter: %s", err) - } else { - eventsExporter = ee - } - } - if eventsExporter == nil { + eventsExporter = ee + } else { eventsExporter = &obs.NoopEventsExporter{} } diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index dddcf7f1310a..544c38fa1885 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -598,9 +598,22 @@ func (ts *TestServer) maybeStartDefaultTestTenant(ctx context.Context) error { // Use TestServer.Stopper().Stop() to shutdown the server after the test // completes. func (ts *TestServer) Start(ctx context.Context) error { - if err := ts.Server.Start(ctx); err != nil { + if err := ts.Server.PreStart(ctx); err != nil { return err } + if err := ts.Server.AcceptInternalClients(ctx); err != nil { + return err + } + // In tests we need some, but not all of RunInitialSQL functionality. 
+ if err := ts.Server.RunInitialSQL( + ctx, false /* startSingleNode */, "" /* adminUser */, "", /* adminPassword */ + ); err != nil { + return err + } + if err := ts.Server.AcceptClients(ctx); err != nil { + return err + } + if err := ts.maybeStartDefaultTestTenant(ctx); err != nil { // We're failing the call to this function but we've already started // the TestServer above. Stop it here to avoid leaking the server. diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go index 512f445dfa8d..8c93b93098ad 100644 --- a/pkg/testutils/lint/lint_test.go +++ b/pkg/testutils/lint/lint_test.go @@ -924,6 +924,7 @@ func TestLint(t *testing.T) { ":!rpc/context.go", ":!rpc/nodedialer/nodedialer_test.go", ":!util/grpcutil/grpc_util_test.go", + ":!server/server_obs_service.go", ":!server/testserver.go", ":!util/tracing/*_test.go", ":!ccl/sqlproxyccl/tenantdirsvr/test_directory_svr.go", From e7019b0a4225970ca224452b9a6975535a7b8b7d Mon Sep 17 00:00:00 2001 From: e-mbrown Date: Tue, 10 Jan 2023 14:16:45 +0700 Subject: [PATCH 5/5] sql: remove extra type hint from `SHOW CREATE VIEW` Views created with user-provided type hints were given extra type annotations. The format functions for `AnnotateTypeExpr` and `Array` were both annotating the expression. This commit alters the format function of `AnnotateTypeExpr` so it doesn't duplicate annotations for arrays.
Release note: None --- pkg/sql/logictest/testdata/logic_test/views | 21 +++++++++++++++++++++ pkg/sql/sem/tree/expr.go | 7 +++++++ 2 files changed, 28 insertions(+) diff --git a/pkg/sql/logictest/testdata/logic_test/views b/pkg/sql/logictest/testdata/logic_test/views index 37e710c2d264..8c35a3ad6736 100644 --- a/pkg/sql/logictest/testdata/logic_test/views +++ b/pkg/sql/logictest/testdata/logic_test/views @@ -140,6 +140,27 @@ CREATE DATABASE test2 statement ok SET DATABASE = test2 +statement ok +CREATE VIEW t1 AS SELECT ARRAY[]:::STRING[]; +CREATE VIEW t2 AS SELECT ARRAY[1,2,3]:::INT[]; + + +query TT colnames +SHOW CREATE VIEW t1 +---- +table_name create_statement +t1 CREATE VIEW public.t1 ( + "array" + ) AS SELECT ARRAY[]:::STRING[] + +query TT colnames +SHOW CREATE VIEW t2 +---- +table_name create_statement +t2 CREATE VIEW public.t2 ( + "array" + ) AS SELECT ARRAY[1:::INT8, 2:::INT8, 3:::INT8]:::INT8[] + query II colnames,rowsort SELECT * FROM test.v1 ---- diff --git a/pkg/sql/sem/tree/expr.go b/pkg/sql/sem/tree/expr.go index eaecbf0ce31f..c8d56c0cfec6 100644 --- a/pkg/sql/sem/tree/expr.go +++ b/pkg/sql/sem/tree/expr.go @@ -1595,6 +1595,13 @@ func (node *AnnotateTypeExpr) Format(ctx *FmtCtx) { switch node.SyntaxMode { case AnnotateShort: exprFmtWithParen(ctx, node.Expr) + // The Array format function handles adding type annotations for this case. + // We short circuit here to prevent double type annotation. + if arrayExpr, ok := node.Expr.(*Array); ok { + if ctx.HasFlags(FmtParsable) && arrayExpr.typ != nil { + return + } + } ctx.WriteString(":::") ctx.FormatTypeReference(node.Type)