diff --git a/exporter/chronicleexporter/exporter.go b/exporter/chronicleexporter/exporter.go
new file mode 100644
index 000000000..b70cee5cf
--- /dev/null
+++ b/exporter/chronicleexporter/exporter.go
@@ -0,0 +1,377 @@
+// Copyright observIQ, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package chronicleexporter
+
+import (
+	"bytes"
+	"compress/gzip"
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"sync"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api"
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/consumer"
+	"go.opentelemetry.io/collector/consumer/consumererror"
+	"go.opentelemetry.io/collector/exporter"
+	"go.opentelemetry.io/collector/pdata/plog"
+	"go.uber.org/zap"
+	"golang.org/x/oauth2"
+	"golang.org/x/oauth2/google"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/credentials"
+	"google.golang.org/grpc/credentials/oauth"
+	grpcgzip "google.golang.org/grpc/encoding/gzip"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/encoding/protojson"
+)
+
+const (
+	// OAuth2 scopes requested for the gRPC and HTTPS protocols respectively.
+	grpcScope = "https://www.googleapis.com/auth/malachite-ingestion"
+	httpScope = "https://www.googleapis.com/auth/cloud-platform"
+
+	// baseEndpoint is the endpoint used by the default configuration.
+	baseEndpoint = "malachiteingestion-pa.googleapis.com"
+)
+
+// chronicleExporter pushes logs to Chronicle over gRPC or HTTPS, depending
+// on cfg.Protocol.
+type chronicleExporter struct {
+	cfg                     *Config
+	logger                  *zap.Logger
+	marshaler               logMarshaler
+	collectorID, exporterID string
+
+	// fields used for gRPC
+	grpcClient api.IngestionServiceV2Client
+	grpcConn   *grpc.ClientConn
+	wg         sync.WaitGroup
+	cancel     context.CancelFunc
+	metrics    *exporterMetrics
+
+	// fields used for HTTP
+	httpClient *http.Client
+}
+
+// newExporter validates the customer and collector IDs (both must be UUIDs)
+// and builds an exporter. No connection is made until Start is called.
+func newExporter(cfg *Config, params exporter.Settings, collectorID, exporterID string) (*chronicleExporter, error) {
+	customerID, err := uuid.Parse(cfg.CustomerID)
+	if err != nil {
+		return nil, fmt.Errorf("parse customer ID: %w", err)
+	}
+
+	marshaller, err := newProtoMarshaler(*cfg, params.TelemetrySettings, customerID[:])
+	if err != nil {
+		return nil, fmt.Errorf("create proto marshaller: %w", err)
+	}
+
+	uuidCID, err := uuid.Parse(collectorID)
+	if err != nil {
+		return nil, fmt.Errorf("parse collector ID: %w", err)
+	}
+
+	return &chronicleExporter{
+		cfg:         cfg,
+		logger:      params.Logger,
+		metrics:     newExporterMetrics(uuidCID[:], customerID[:], exporterID, cfg.Namespace),
+		marshaler:   marshaller,
+		collectorID: collectorID,
+		exporterID:  exporterID,
+	}, nil
+}
+
+// Start loads Google credentials and prepares the transport for the
+// configured protocol: an OAuth2 HTTP client for HTTPS, otherwise a gRPC
+// client plus, when CollectAgentMetrics is set, the host-metrics goroutine.
+func (ce *chronicleExporter) Start(_ context.Context, _ component.Host) error {
+	creds, err := loadGoogleCredentials(ce.cfg)
+	if err != nil {
+		return fmt.Errorf("load Google credentials: %w", err)
+	}
+
+	if ce.cfg.Protocol == protocolHTTPS {
+		ce.httpClient = oauth2.NewClient(context.Background(), creds.TokenSource)
+		return nil
+	}
+
+	opts := []grpc.DialOption{
+		// Apply OAuth tokens for each RPC call
+		grpc.WithPerRPCCredentials(oauth.TokenSource{TokenSource: creds.TokenSource}),
+		grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
+	}
+	conn, err := grpc.NewClient(ce.cfg.Endpoint+":443", opts...)
+	if err != nil {
+		return fmt.Errorf("dial: %w", err)
+	}
+	ce.grpcConn = conn
+	ce.grpcClient = api.NewIngestionServiceV2Client(conn)
+
+	if ce.cfg.CollectAgentMetrics {
+		ctx, cancel := context.WithCancel(context.Background())
+		ce.cancel = cancel
+		ce.wg.Add(1)
+		go ce.startHostMetricsCollection(ctx)
+	}
+
+	return nil
+}
+
+// Shutdown releases whatever Start set up. It is safe to call when Start
+// failed or never ran: nil clients and unexpected transport types are
+// skipped instead of panicking.
+func (ce *chronicleExporter) Shutdown(context.Context) error {
+	// An oauth2.Transport with a nil Base sends through http.DefaultTransport,
+	// so its idle connections are released as well.
+	defer closeIdleConnections(http.DefaultTransport)
+	if ce.cfg.Protocol == protocolHTTPS {
+		if ce.httpClient == nil {
+			return nil
+		}
+		if t, ok := ce.httpClient.Transport.(*oauth2.Transport); ok {
+			closeIdleConnections(t.Base)
+		}
+		return nil
+	}
+	if ce.cancel != nil {
+		ce.cancel()
+		ce.wg.Wait()
+	}
+	if ce.grpcConn != nil {
+		if err := ce.grpcConn.Close(); err != nil {
+			return fmt.Errorf("connection close: %w", err)
+		}
+	}
+	return nil
+}
+
+// closeIdleConnections closes idle connections on rt when it supports doing
+// so (as *http.Transport does); nil and other round trippers are ignored.
+func closeIdleConnections(rt http.RoundTripper) {
+	if c, ok := rt.(interface{ CloseIdleConnections() }); ok {
+		c.CloseIdleConnections()
+	}
+}
+
+// Capabilities reports that the exporter does not mutate the data it is given.
+func (ce *chronicleExporter) Capabilities() consumer.Capabilities {
+	return consumer.Capabilities{MutatesData: false}
+}
+
+// loadGoogleCredentials resolves credentials scoped for cfg.Protocol, in order
+// of precedence: inline JSON (cfg.Creds), a JSON file (cfg.CredsFilePath),
+// then Application Default Credentials.
+func loadGoogleCredentials(cfg *Config) (*google.Credentials, error) {
+	scope := grpcScope
+	if cfg.Protocol == protocolHTTPS {
+		scope = httpScope
+	}
+
+	switch {
+	case cfg.Creds != "":
+		return google.CredentialsFromJSON(context.Background(), []byte(cfg.Creds), scope)
+	case cfg.CredsFilePath != "":
+		credsData, err := os.ReadFile(cfg.CredsFilePath)
+		if err != nil {
+			return nil, fmt.Errorf("read credentials file: %w", err)
+		}
+
+		if len(credsData) == 0 {
+			return nil, errors.New("credentials file is empty")
+		}
+
+		return google.CredentialsFromJSON(context.Background(), credsData, scope)
+	default:
+		return google.FindDefaultCredentials(context.Background(), scope)
+	}
+}
+
+// logsDataPusher marshals ld into gRPC batches and uploads them in order,
+// stopping at the first failure.
+func (ce *chronicleExporter) logsDataPusher(ctx context.Context, ld plog.Logs) error {
+	payloads, err := ce.marshaler.MarshalRawLogs(ctx, ld)
+	if err != nil {
+		return fmt.Errorf("marshal logs: %w", err)
+	}
+
+	for _, payload := range payloads {
+		if err := ce.uploadToChronicle(ctx, payload); err != nil {
+			return fmt.Errorf("upload to chronicle: %w", err)
+		}
+	}
+
+	return nil
+}
+
+// uploadToChronicle sends one batch over gRPC and records it in the exporter
+// metrics on success. Errors with non-transient status codes are returned as
+// permanent so they are not retried.
+func (ce *chronicleExporter) uploadToChronicle(ctx context.Context, request *api.BatchCreateLogsRequest) error {
+	totalLogs := int64(len(request.GetBatch().GetEntries()))
+
+	_, err := ce.grpcClient.BatchCreateLogs(ctx, request, ce.buildOptions()...)
+	if err != nil {
+		errCode := status.Code(err)
+		switch errCode {
+		// These errors are potentially transient
+		case codes.Canceled,
+			codes.Unavailable,
+			codes.DeadlineExceeded,
+			codes.ResourceExhausted,
+			codes.Aborted:
+			return fmt.Errorf("upload logs to chronicle: %w", err)
+		default:
+			return consumererror.NewPermanent(fmt.Errorf("upload logs to chronicle: %w", err))
+		}
+	}
+
+	ce.metrics.addSentLogs(totalLogs)
+	ce.metrics.updateLastSuccessfulUpload()
+	return nil
+}
+
+// buildOptions returns the per-call gRPC options derived from the config.
+func (ce *chronicleExporter) buildOptions() []grpc.CallOption {
+	var opts []grpc.CallOption
+
+	if ce.cfg.Compression == grpcgzip.Name {
+		opts = append(opts, grpc.UseCompressor(grpcgzip.Name))
+	}
+
+	return opts
+}
+
+// startHostMetricsCollection collects host metrics every five minutes and
+// uploads them over gRPC until ctx is cancelled. It must run in its own
+// goroutine, after ce.wg.Add(1).
+func (ce *chronicleExporter) startHostMetricsCollection(ctx context.Context) {
+	defer ce.wg.Done()
+
+	ticker := time.NewTicker(5 * time.Minute)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			err := ce.metrics.collectHostMetrics()
+			if err != nil {
+				ce.logger.Error("Failed to collect host metrics", zap.Error(err))
+			}
+			request := ce.metrics.getAndReset()
+			_, err = ce.grpcClient.BatchCreateEvents(ctx, request, ce.buildOptions()...)
+			if err != nil {
+				ce.logger.Error("Failed to upload host metrics", zap.Error(err))
+			}
+		}
+	}
+}
+
+// logsHTTPDataPusher marshals ld into one request per log type and uploads
+// each over HTTPS, stopping at the first failure.
+func (ce *chronicleExporter) logsHTTPDataPusher(ctx context.Context, ld plog.Logs) error {
+	payloads, err := ce.marshaler.MarshalRawLogsForHTTP(ctx, ld)
+	if err != nil {
+		return fmt.Errorf("marshal logs: %w", err)
+	}
+
+	for logType, payload := range payloads {
+		if err := ce.uploadToChronicleHTTP(ctx, payload, logType); err != nil {
+			return fmt.Errorf("upload to chronicle: %w", err)
+		}
+	}
+
+	return nil
+}
+
+// buildEndpoint returns the DataPlane URL that imports logs of logType:
+// https://{location}-{endpoint}/v1alpha/projects/{project}/locations/{location}/instances/{customerID}/logTypes/{logType}/logs:import
+func buildEndpoint(cfg *Config, logType string) string {
+	// Arguments, in order: location, endpoint, version, project, location, instance, log type.
+	formatString := "https://%s-%s/%s/projects/%s/locations/%s/instances/%s/logTypes/%s/logs:import"
+	return fmt.Sprintf(formatString, cfg.Location, cfg.Endpoint, "v1alpha", cfg.Project, cfg.Location, cfg.CustomerID, logType)
+}
+
+// uploadToChronicleHTTP posts logs as JSON (gzip-compressed when configured)
+// to the import endpoint for logType. Non-OK responses are logged and
+// returned as errors; 4xx statuses other than 408 and 429 are marked
+// permanent so they are not retried, mirroring the gRPC path.
+func (ce *chronicleExporter) uploadToChronicleHTTP(ctx context.Context, logs *api.ImportLogsRequest, logType string) error {
+	data, err := protojson.Marshal(logs)
+	if err != nil {
+		return fmt.Errorf("marshal protobuf logs to JSON: %w", err)
+	}
+
+	var body io.Reader
+
+	if ce.cfg.Compression == grpcgzip.Name {
+		var b bytes.Buffer
+		gz := gzip.NewWriter(&b)
+		if _, err := gz.Write(data); err != nil {
+			return fmt.Errorf("gzip write: %w", err)
+		}
+		if err := gz.Close(); err != nil {
+			return fmt.Errorf("gzip close: %w", err)
+		}
+		body = &b
+	} else {
+		body = bytes.NewBuffer(data)
+	}
+
+	request, err := http.NewRequestWithContext(ctx, "POST", buildEndpoint(ce.cfg, logType), body)
+	if err != nil {
+		return fmt.Errorf("create request: %w", err)
+	}
+
+	if ce.cfg.Compression == grpcgzip.Name {
+		request.Header.Set("Content-Encoding", "gzip")
+	}
+
+	request.Header.Set("Content-Type", "application/json")
+
+	resp, err := ce.httpClient.Do(request)
+	if err != nil {
+		return fmt.Errorf("send request to Chronicle: %w", err)
+	}
+	defer resp.Body.Close()
+
+	// Always read the body so the connection can be reused.
+	respBody, err := io.ReadAll(resp.Body)
+	if resp.StatusCode == http.StatusOK {
+		return nil
+	}
+
+	if err != nil {
+		ce.logger.Warn("Failed to read response body", zap.Error(err))
+	} else {
+		ce.logger.Warn("Received non-OK response from Chronicle", zap.String("status", resp.Status), zap.ByteString("response", respBody))
+	}
+
+	statusErr := fmt.Errorf("received non-OK response from Chronicle: %s", resp.Status)
+	clientErr := resp.StatusCode >= 400 && resp.StatusCode < 500
+	if clientErr && resp.StatusCode != http.StatusRequestTimeout && resp.StatusCode != http.StatusTooManyRequests {
+		return consumererror.NewPermanent(statusErr)
+	}
+	return statusErr
+}
diff --git a/exporter/chronicleexporter/exporter_test.go b/exporter/chronicleexporter/exporter_test.go
new file mode 100644
index 000000000..805148540
--- /dev/null
+++ b/exporter/chronicleexporter/exporter_test.go
@@ -0,0 +1,171 @@
+// Copyright observIQ, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package chronicleexporter
+
+import (
+	"context"
+	"errors"
+	"testing"
+
+	"github.com/golang/mock/gomock"
+	"github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api"
+	"github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api/mocks"
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/consumer/consumererror"
+	"go.uber.org/zap"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// TestLogsDataPusher drives logsDataPusher with a mocked marshaler and gRPC client: success, transient/permanent upload errors, marshal error, no batches.
+func TestLogsDataPusher(t *testing.T) {
+	// Configuration and context shared by every test case
+	cfg := Config{Endpoint: baseEndpoint}
+	ctx := context.Background()
+
+	testCases := []struct {
+		desc          string
+		setupExporter func() *chronicleExporter
+		setupMocks    func(*mocks.MockIngestionServiceV2Client)
+		expectedErr   string
+		permanentErr  bool
+	}{
+		{
+			desc: "successful push to Chronicle",
+			setupExporter: func() *chronicleExporter {
+				mockClient := mocks.NewMockIngestionServiceV2Client(gomock.NewController(t))
+				marshaller := NewMockMarshaler(t)
+				marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{{}}, nil)
+				return &chronicleExporter{
+					cfg:        &cfg,
+					metrics:    newExporterMetrics([]byte{}, []byte{}, "", cfg.Namespace),
+					logger:     zap.NewNop(),
+					grpcClient: mockClient,
+					marshaler:  marshaller,
+				}
+			},
+			setupMocks: func(mockClient *mocks.MockIngestionServiceV2Client) {
+				mockClient.EXPECT().BatchCreateLogs(gomock.Any(), gomock.Any(), gomock.Any()).Return(&api.BatchCreateLogsResponse{}, nil)
+			},
+			expectedErr: "",
+		},
+		{
+			desc: "upload to Chronicle fails (transient)",
+			setupExporter: func() *chronicleExporter {
+				mockClient := mocks.NewMockIngestionServiceV2Client(gomock.NewController(t))
+				marshaller := NewMockMarshaler(t)
+				marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{{}}, nil)
+				return &chronicleExporter{
+					cfg:        &cfg,
+					metrics:    newExporterMetrics([]byte{}, []byte{}, "", cfg.Namespace),
+					logger:     zap.NewNop(),
+					grpcClient: mockClient,
+					marshaler:  marshaller,
+				}
+			},
+			setupMocks: func(mockClient *mocks.MockIngestionServiceV2Client) {
+				// Simulate an error returned from the Chronicle service
+				err := status.Error(codes.Unavailable, "service unavailable")
+				mockClient.EXPECT().BatchCreateLogs(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, err)
+			},
+			expectedErr: "service unavailable",
+		},
+		{
+			desc: "upload to Chronicle fails (permanent)",
+			setupExporter: func() *chronicleExporter {
+				mockClient := mocks.NewMockIngestionServiceV2Client(gomock.NewController(t))
+				marshaller := NewMockMarshaler(t)
+				marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{{}}, nil)
+				return &chronicleExporter{
+					cfg:        &cfg,
+					metrics:    newExporterMetrics([]byte{}, []byte{}, "", cfg.Namespace),
+					logger:     zap.NewNop(),
+					grpcClient: mockClient,
+					marshaler:  marshaller,
+				}
+			},
+			setupMocks: func(mockClient *mocks.MockIngestionServiceV2Client) {
+				err := status.Error(codes.InvalidArgument, "Invalid argument detected.")
+				mockClient.EXPECT().BatchCreateLogs(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, err)
+			},
+			expectedErr:  "Invalid argument detected.",
+			permanentErr: true,
+		},
+		{
+			desc: "marshaler error",
+			setupExporter: func() *chronicleExporter {
+				mockClient := mocks.NewMockIngestionServiceV2Client(gomock.NewController(t))
+				marshaller := NewMockMarshaler(t)
+				// Simulate an error during log marshaling
+				marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return(nil, errors.New("marshal error"))
+				return &chronicleExporter{
+					cfg:        &cfg,
+					metrics:    newExporterMetrics([]byte{}, []byte{}, "", cfg.Namespace),
+					logger:     zap.NewNop(),
+					grpcClient: mockClient,
+					marshaler:  marshaller,
+				}
+			},
+			setupMocks: func(_ *mocks.MockIngestionServiceV2Client) {
+				// No need to setup mocks for the client as the error occurs before the client is used
+			},
+			expectedErr: "marshal error",
+		},
+		{
+			desc: "empty log records",
+			setupExporter: func() *chronicleExporter {
+				mockClient := mocks.NewMockIngestionServiceV2Client(gomock.NewController(t))
+				marshaller := NewMockMarshaler(t)
+				// Return an empty slice to simulate no logs to push
+				marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{}, nil)
+				return &chronicleExporter{
+					cfg:        &cfg,
+					metrics:    newExporterMetrics([]byte{}, []byte{}, "", cfg.Namespace),
+					logger:     zap.NewNop(),
+					grpcClient: mockClient,
+					marshaler:  marshaller,
+				}
+			},
+			setupMocks: func(_ *mocks.MockIngestionServiceV2Client) {
+				// Expect no calls to BatchCreateLogs since there are no logs to push
+			},
+			expectedErr: "",
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.desc, func(t *testing.T) {
+			exporter := tc.setupExporter()
+			tc.setupMocks(exporter.grpcClient.(*mocks.MockIngestionServiceV2Client))
+
+			// Create a dummy plog.Logs to pass to logsDataPusher
+			logs := mockLogs(mockLogRecord("Test body", map[string]any{"key1": "value1"}))
+
+			err := exporter.logsDataPusher(ctx, logs)
+
+			if tc.expectedErr == "" {
+				require.NoError(t, err)
+			} else {
+				require.ErrorContains(t, err, tc.expectedErr)
+				if tc.permanentErr {
+					require.True(t, consumererror.IsPermanent(err), "Expected error to be permanent")
+				} else {
+					require.False(t, consumererror.IsPermanent(err), "Expected error to be transient")
+				}
+			}
+		})
+	}
+}
diff --git a/exporter/chronicleexporter/factory.go b/exporter/chronicleexporter/factory.go index 34a1dfee5..6dae68bca 100644 --- a/exporter/chronicleexporter/factory.go +++ b/exporter/chronicleexporter/factory.go @@ -16,12 +16,15 @@ package chronicleexporter import ( "context" + "errors" - "github.com/observiq/bindplane-otel-collector/exporter/chronicleexporter/internal/metadata" + "github.com/google/uuid" + "github.com/observiq/bindplane-agent/exporter/chronicleexporter/internal/metadata" "go.opentelemetry.io/collector/component" 
"go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" + semconv "go.opentelemetry.io/collector/semconv/v1.5.0" ) // NewFactory creates a new Chronicle exporter factory. @@ -32,29 +35,17 @@ func NewFactory() exporter.Factory { exporter.WithLogs(createLogsExporter, metadata.LogsStability)) } -const ( - defaultEndpoint = "malachiteingestion-pa.googleapis.com" - defaultBatchLogCountLimitGRPC = 1000 - defaultBatchRequestSizeLimitGRPC = 1048576 - defaultBatchLogCountLimitHTTP = 1000 - defaultBatchRequestSizeLimitHTTP = 1048576 -) - // createDefaultConfig creates the default configuration for the exporter. func createDefaultConfig() component.Config { return &Config{ - Protocol: protocolGRPC, - TimeoutConfig: exporterhelper.NewDefaultTimeoutConfig(), - QueueConfig: exporterhelper.NewDefaultQueueConfig(), - BackOffConfig: configretry.NewDefaultBackOffConfig(), - OverrideLogType: true, - Compression: noCompression, - CollectAgentMetrics: true, - Endpoint: defaultEndpoint, - BatchLogCountLimitGRPC: defaultBatchLogCountLimitGRPC, - BatchRequestSizeLimitGRPC: defaultBatchRequestSizeLimitGRPC, - BatchLogCountLimitHTTP: defaultBatchLogCountLimitHTTP, - BatchRequestSizeLimitHTTP: defaultBatchRequestSizeLimitHTTP, + Protocol: protocolGRPC, + TimeoutConfig: exporterhelper.NewDefaultTimeoutConfig(), + QueueConfig: exporterhelper.NewDefaultQueueConfig(), + BackOffConfig: configretry.NewDefaultBackOffConfig(), + OverrideLogType: true, + Endpoint: baseEndpoint, + Compression: noCompression, + CollectAgentMetrics: true, } } @@ -63,25 +54,38 @@ func createLogsExporter( ctx context.Context, params exporter.Settings, cfg component.Config, -) (exp exporter.Logs, err error) { - c := cfg.(*Config) - if c.Protocol == protocolHTTPS { - exp, err = newHTTPExporter(c, params) +) (exporter.Logs, error) { + chronicleCfg, ok := cfg.(*Config) + if !ok { + return nil, errors.New("invalid config type") + } + + 
var cID string + sid, ok := params.Resource.Attributes().Get(semconv.AttributeServiceInstanceID) + if ok { + cID = sid.AsString() } else { - exp, err = newGRPCExporter(c, params) + cID = uuid.New().String() } + + exp, err := newExporter(chronicleCfg, params, cID, params.ID.String()) if err != nil { return nil, err } + + pusher := exp.logsDataPusher + if chronicleCfg.Protocol == protocolHTTPS { + pusher = exp.logsHTTPDataPusher + } return exporterhelper.NewLogs( ctx, params, - c, - exp.ConsumeLogs, + chronicleCfg, + pusher, exporterhelper.WithCapabilities(exp.Capabilities()), - exporterhelper.WithTimeout(c.TimeoutConfig), - exporterhelper.WithQueue(c.QueueConfig), - exporterhelper.WithRetry(c.BackOffConfig), + exporterhelper.WithTimeout(chronicleCfg.TimeoutConfig), + exporterhelper.WithQueue(chronicleCfg.QueueConfig), + exporterhelper.WithRetry(chronicleCfg.BackOffConfig), exporterhelper.WithStart(exp.Start), exporterhelper.WithShutdown(exp.Shutdown), )