diff --git a/exporter/chronicleexporter/exporter.go b/exporter/chronicleexporter/exporter.go index 52b2a8fc4..b70cee5cf 100644 --- a/exporter/chronicleexporter/exporter.go +++ b/exporter/chronicleexporter/exporter.go @@ -18,9 +18,13 @@ import ( "bytes" "compress/gzip" "context" + "errors" "fmt" "io" "net/http" + "os" + "sync" + "time" "github.com/google/uuid" "github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api" @@ -31,6 +35,7 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" "golang.org/x/oauth2" + "golang.org/x/oauth2/google" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" @@ -48,21 +53,23 @@ const ( ) type chronicleExporter struct { - cfg *Config - set component.TelemetrySettings - marshaler logMarshaler - exporterID string + cfg *Config + logger *zap.Logger + marshaler logMarshaler + collectorID, exporterID string // fields used for gRPC grpcClient api.IngestionServiceV2Client grpcConn *grpc.ClientConn - metrics *hostMetricsReporter + wg sync.WaitGroup + cancel context.CancelFunc + metrics *exporterMetrics // fields used for HTTP httpClient *http.Client } -func newExporter(cfg *Config, params exporter.Settings, exporterID string) (*chronicleExporter, error) { +func newExporter(cfg *Config, params exporter.Settings, collectorID, exporterID string) (*chronicleExporter, error) { customerID, err := uuid.Parse(cfg.CustomerID) if err != nil { return nil, fmt.Errorf("parse customer ID: %w", err) @@ -73,27 +80,38 @@ func newExporter(cfg *Config, params exporter.Settings, exporterID string) (*chr return nil, fmt.Errorf("create proto marshaller: %w", err) } + uuidCID, err := uuid.Parse(collectorID) + if err != nil { + return nil, fmt.Errorf("parse collector ID: %w", err) + } + return &chronicleExporter{ - cfg: cfg, - set: params.TelemetrySettings, - marshaler: marshaller, - exporterID: exporterID, + cfg: cfg, + logger: params.Logger, + metrics: newExporterMetrics(uuidCID[:], customerID[:], exporterID, cfg.Namespace), + marshaler: marshaller, + collectorID: collectorID, + exporterID: exporterID, }, nil } -func (ce *chronicleExporter) Start(ctx context.Context, _ component.Host) error { - ts, err := tokenSource(ctx, ce.cfg) +func (ce *chronicleExporter) Start(_ context.Context, _ component.Host) error { + creds, err := loadGoogleCredentials(ce.cfg) if err != nil { return fmt.Errorf("load Google credentials: %w", err) } if ce.cfg.Protocol == protocolHTTPS { - ce.httpClient = oauth2.NewClient(context.Background(), ts) + ce.httpClient = oauth2.NewClient(context.Background(), creds.TokenSource) return nil } - endpoint, dialOpts := grpcClientParams(ce.cfg.Endpoint, ts) - conn, err := grpc.NewClient(endpoint, dialOpts...) + opts := []grpc.DialOption{ + // Apply OAuth tokens for each RPC call + grpc.WithPerRPCCredentials(oauth.TokenSource{TokenSource: creds.TokenSource}), + grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")), + } + conn, err := grpc.NewClient(ce.cfg.Endpoint+":443", opts...) if err != nil { return fmt.Errorf("dial: %w", err) } @@ -101,16 +119,10 @@ func (ce *chronicleExporter) Start(ctx context.Context, _ component.Host) error ce.grpcClient = api.NewIngestionServiceV2Client(conn) if ce.cfg.CollectAgentMetrics { - f := func(ctx context.Context, request *api.BatchCreateEventsRequest) error { - _, err := ce.grpcClient.BatchCreateEvents(ctx, request) - return err - } - metrics, err := newHostMetricsReporter(ce.cfg, ce.set, ce.exporterID, f) - if err != nil { - return fmt.Errorf("create metrics reporter: %w", err) - } - ce.metrics = metrics - ce.metrics.start() + ctx, cancel := context.WithCancel(context.Background()) + ce.cancel = cancel + ce.wg.Add(1) + go ce.startHostMetricsCollection(ctx) } return nil @@ -125,8 +137,9 @@ func (ce *chronicleExporter) Shutdown(context.Context) error { } return nil } - if ce.metrics != nil { - ce.metrics.shutdown() + if ce.cancel != nil { + ce.cancel() + ce.wg.Wait() } if ce.grpcConn != nil { if err := ce.grpcConn.Close(); err != nil { @@ -140,6 +153,31 @@ func (ce *chronicleExporter) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} } +func loadGoogleCredentials(cfg *Config) (*google.Credentials, error) { + scope := grpcScope + if cfg.Protocol == protocolHTTPS { + scope = httpScope + } + + switch { + case cfg.Creds != "": + return google.CredentialsFromJSON(context.Background(), []byte(cfg.Creds), scope) + case cfg.CredsFilePath != "": + credsData, err := os.ReadFile(cfg.CredsFilePath) + if err != nil { + return nil, fmt.Errorf("read credentials file: %w", err) + } + + if len(credsData) == 0 { + return nil, errors.New("credentials file is empty") + } + + return google.CredentialsFromJSON(context.Background(), credsData, scope) + default: + return google.FindDefaultCredentials(context.Background(), scope) + } +} + func (ce *chronicleExporter) logsDataPusher(ctx context.Context, ld plog.Logs) error { payloads, err := ce.marshaler.MarshalRawLogs(ctx, ld) if err != nil { @@ -148,7 +186,7 @@ func (ce *chronicleExporter) logsDataPusher(ctx context.Context, ld plog.Logs) e for _, payload := range payloads { if err := ce.uploadToChronicle(ctx, payload); err != nil { - return err + return fmt.Errorf("upload to chronicle: %w", err) } } @@ -156,10 +194,7 @@ func (ce *chronicleExporter) logsDataPusher(ctx context.Context, ld plog.Logs) e } func (ce *chronicleExporter) uploadToChronicle(ctx context.Context, request *api.BatchCreateLogsRequest) error { - if ce.metrics != nil { - totalLogs := int64(len(request.GetBatch().GetEntries())) - defer ce.metrics.recordSent(totalLogs) - } + totalLogs := int64(len(request.GetBatch().GetEntries())) _, err := ce.grpcClient.BatchCreateLogs(ctx, request, ce.buildOptions()...) if err != nil { @@ -177,6 +212,8 @@ func (ce *chronicleExporter) uploadToChronicle(ctx context.Context, request *api } } + ce.metrics.addSentLogs(totalLogs) + ce.metrics.updateLastSuccessfulUpload() return nil } @@ -190,6 +227,30 @@ func (ce *chronicleExporter) buildOptions() []grpc.CallOption { return opts } +func (ce *chronicleExporter) startHostMetricsCollection(ctx context.Context) { + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + + defer ce.wg.Done() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + err := ce.metrics.collectHostMetrics() + if err != nil { + ce.logger.Error("Failed to collect host metrics", zap.Error(err)) + } + request := ce.metrics.getAndReset() + _, err = ce.grpcClient.BatchCreateEvents(ctx, request, ce.buildOptions()...) + if err != nil { + ce.logger.Error("Failed to upload host metrics", zap.Error(err)) + } + } + } +} + func (ce *chronicleExporter) logsHTTPDataPusher(ctx context.Context, ld plog.Logs) error { payloads, err := ce.marshaler.MarshalRawLogsForHTTP(ctx, ld) if err != nil { @@ -205,6 +266,14 @@ func (ce *chronicleExporter) logsHTTPDataPusher(ctx context.Context, ld plog.Log return nil } +// This uses the DataPlane URL for the request +// URL for the request: https://{region}-chronicle.googleapis.com/{version}/projects/{project}/location/{region}/instances/{customerID}/logTypes/{logtype}/logs:import +func buildEndpoint(cfg *Config, logType string) string { + // Location Endpoint Version Project Location Instance LogType + formatString := "https://%s-%s/%s/projects/%s/locations/%s/instances/%s/logTypes/%s/logs:import" + return fmt.Sprintf(formatString, cfg.Location, cfg.Endpoint, "v1alpha", cfg.Project, cfg.Location, cfg.CustomerID, logType) +} + func (ce *chronicleExporter) uploadToChronicleHTTP(ctx context.Context, logs *api.ImportLogsRequest, logType string) error { data, err := protojson.Marshal(logs) @@ -228,7 +297,7 @@ func (ce *chronicleExporter) uploadToChronicleHTTP(ctx context.Context, logs *ap body = bytes.NewBuffer(data) } - request, err := http.NewRequestWithContext(ctx, "POST", httpEndpoint(ce.cfg, logType), body) + request, err := http.NewRequestWithContext(ctx, "POST", buildEndpoint(ce.cfg, logType), body) if err != nil { return fmt.Errorf("create request: %w", err) } @@ -246,38 +315,14 @@ func (ce *chronicleExporter) uploadToChronicleHTTP(ctx context.Context, logs *ap defer resp.Body.Close() respBody, err := io.ReadAll(resp.Body) - if err == nil && resp.StatusCode == http.StatusOK { - return nil - } - - if err != nil { - ce.set.Logger.Warn("Failed to read response body", zap.Error(err)) - } else { - ce.set.Logger.Warn("Received non-OK response from Chronicle", zap.String("status", resp.Status), zap.ByteString("response", respBody)) - } - - // TODO interpret with https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/errorutil/http.go - statusErr := fmt.Errorf("upload logs to chronicle: %s", resp.Status) - switch resp.StatusCode { - case http.StatusInternalServerError, http.StatusServiceUnavailable: // potentially transient - return statusErr - default: - return consumererror.NewPermanent(statusErr) - } -} - -// Override for testing -var grpcClientParams = func(cfgEndpoint string, ts oauth2.TokenSource) (string, []grpc.DialOption) { - return cfgEndpoint + ":443", []grpc.DialOption{ - grpc.WithPerRPCCredentials(oauth.TokenSource{TokenSource: ts}), - grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")), + if resp.StatusCode != http.StatusOK { + if err != nil { + ce.logger.Warn("Failed to read response body", zap.Error(err)) + } else { + ce.logger.Warn("Received non-OK response from Chronicle", zap.String("status", resp.Status), zap.ByteString("response", respBody)) + } + return fmt.Errorf("received non-OK response from Chronicle: %s", resp.Status) } -} -// This uses the DataPlane URL for the request -// URL for the request: https://{region}-chronicle.googleapis.com/{version}/projects/{project}/location/{region}/instances/{customerID} -// Override for testing -var httpEndpoint = func(cfg *Config, logType string) string { - formatString := "https://%s-%s/v1alpha/projects/%s/locations/%s/instances/%s/logTypes/%s/logs:import" - return fmt.Sprintf(formatString, cfg.Location, cfg.Endpoint, cfg.Project, cfg.Location, cfg.CustomerID, logType) + return nil }