Skip to content

Commit

Permalink
fix: Shut down zombie goroutine in chronicleexporter (#2029)
Browse files Browse the repository at this point in the history
* Properly shut down chronicleexporter zombie goroutine

* Fix lint

* Fix the same problem for the GRPC workflow
  • Loading branch information
mrsillydog authored and Caleb-Hurshman committed Dec 17, 2024
1 parent 6972db8 commit a1f887d
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 105 deletions.
192 changes: 116 additions & 76 deletions exporter/chronicleexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,24 @@ import (
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"net/http"
"os"
"sync"
"time"

"github.com/google/uuid"
"github.com/observiq/bindplane-otel-collector/exporter/chronicleexporter/protos/api"
"github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
Expand All @@ -43,24 +48,28 @@ import (
const (
grpcScope = "https://www.googleapis.com/auth/malachite-ingestion"
httpScope = "https://www.googleapis.com/auth/cloud-platform"

baseEndpoint = "malachiteingestion-pa.googleapis.com"
)

type chronicleExporter struct {
cfg *Config
set component.TelemetrySettings
marshaler logMarshaler
exporterID string
cfg *Config
logger *zap.Logger
marshaler logMarshaler
collectorID, exporterID string

// fields used for gRPC
grpcClient api.IngestionServiceV2Client
grpcConn *grpc.ClientConn
metrics *hostMetricsReporter
wg sync.WaitGroup
cancel context.CancelFunc
metrics *exporterMetrics

// fields used for HTTP
httpClient *http.Client
}

func newExporter(cfg *Config, params exporter.Settings, exporterID string) (*chronicleExporter, error) {
func newExporter(cfg *Config, params exporter.Settings, collectorID, exporterID string) (*chronicleExporter, error) {
customerID, err := uuid.Parse(cfg.CustomerID)
if err != nil {
return nil, fmt.Errorf("parse customer ID: %w", err)
Expand All @@ -71,44 +80,49 @@ func newExporter(cfg *Config, params exporter.Settings, exporterID string) (*chr
return nil, fmt.Errorf("create proto marshaller: %w", err)
}

uuidCID, err := uuid.Parse(collectorID)
if err != nil {
return nil, fmt.Errorf("parse collector ID: %w", err)
}

return &chronicleExporter{
cfg: cfg,
set: params.TelemetrySettings,
marshaler: marshaller,
exporterID: exporterID,
cfg: cfg,
logger: params.Logger,
metrics: newExporterMetrics(uuidCID[:], customerID[:], exporterID, cfg.Namespace),
marshaler: marshaller,
collectorID: collectorID,
exporterID: exporterID,
}, nil
}

func (ce *chronicleExporter) Start(ctx context.Context, _ component.Host) error {
ts, err := tokenSource(ctx, ce.cfg)
func (ce *chronicleExporter) Start(_ context.Context, _ component.Host) error {
creds, err := loadGoogleCredentials(ce.cfg)
if err != nil {
return fmt.Errorf("load Google credentials: %w", err)
}

if ce.cfg.Protocol == protocolHTTPS {
ce.httpClient = oauth2.NewClient(context.Background(), ts)
ce.httpClient = oauth2.NewClient(context.Background(), creds.TokenSource)
return nil
}

endpoint, dialOpts := grpcClientParams(ce.cfg.Endpoint, ts)
conn, err := grpc.NewClient(endpoint, dialOpts...)
opts := []grpc.DialOption{
// Apply OAuth tokens for each RPC call
grpc.WithPerRPCCredentials(oauth.TokenSource{TokenSource: creds.TokenSource}),
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
}
conn, err := grpc.NewClient(ce.cfg.Endpoint+":443", opts...)
if err != nil {
return fmt.Errorf("dial: %w", err)
}
ce.grpcConn = conn
ce.grpcClient = api.NewIngestionServiceV2Client(conn)

if ce.cfg.CollectAgentMetrics {
f := func(ctx context.Context, request *api.BatchCreateEventsRequest) error {
_, err := ce.grpcClient.BatchCreateEvents(ctx, request)
return err
}
metrics, err := newHostMetricsReporter(ce.cfg, ce.set, ce.exporterID, f)
if err != nil {
return fmt.Errorf("create metrics reporter: %w", err)
}
ce.metrics = metrics
ce.metrics.start()
ctx, cancel := context.WithCancel(context.Background())
ce.cancel = cancel
ce.wg.Add(1)
go ce.startHostMetricsCollection(ctx)
}

return nil
Expand All @@ -123,8 +137,9 @@ func (ce *chronicleExporter) Shutdown(context.Context) error {
}
return nil
}
if ce.metrics != nil {
ce.metrics.shutdown()
if ce.cancel != nil {
ce.cancel()
ce.wg.Wait()
}
if ce.grpcConn != nil {
if err := ce.grpcConn.Close(); err != nil {
Expand All @@ -138,6 +153,31 @@ func (ce *chronicleExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func loadGoogleCredentials(cfg *Config) (*google.Credentials, error) {
scope := grpcScope
if cfg.Protocol == protocolHTTPS {
scope = httpScope
}

switch {
case cfg.Creds != "":
return google.CredentialsFromJSON(context.Background(), []byte(cfg.Creds), scope)
case cfg.CredsFilePath != "":
credsData, err := os.ReadFile(cfg.CredsFilePath)
if err != nil {
return nil, fmt.Errorf("read credentials file: %w", err)
}

if len(credsData) == 0 {
return nil, errors.New("credentials file is empty")
}

return google.CredentialsFromJSON(context.Background(), credsData, scope)
default:
return google.FindDefaultCredentials(context.Background(), scope)
}
}

func (ce *chronicleExporter) logsDataPusher(ctx context.Context, ld plog.Logs) error {
payloads, err := ce.marshaler.MarshalRawLogs(ctx, ld)
if err != nil {
Expand All @@ -146,24 +186,16 @@ func (ce *chronicleExporter) logsDataPusher(ctx context.Context, ld plog.Logs) e

for _, payload := range payloads {
if err := ce.uploadToChronicle(ctx, payload); err != nil {
return err
return fmt.Errorf("upload to chronicle: %w", err)
}
}

return nil
}

func (ce *chronicleExporter) uploadToChronicle(ctx context.Context, request *api.BatchCreateLogsRequest) error {
if ce.metrics != nil {
totalLogs := int64(len(request.GetBatch().GetEntries()))
defer ce.metrics.recordSent(totalLogs)
}
if ce.metrics != nil {
totalLogs := int64(len(request.GetBatch().GetEntries()))
defer ce.metrics.recordSent(totalLogs)
}
totalLogs := int64(len(request.GetBatch().GetEntries()))

_, err := ce.grpcClient.BatchCreateLogs(ctx, request, ce.buildOptions()...)
_, err := ce.grpcClient.BatchCreateLogs(ctx, request, ce.buildOptions()...)
if err != nil {
errCode := status.Code(err)
Expand All @@ -180,6 +212,8 @@ func (ce *chronicleExporter) uploadToChronicle(ctx context.Context, request *api
}
}

ce.metrics.addSentLogs(totalLogs)
ce.metrics.updateLastSuccessfulUpload()
return nil
}

Expand All @@ -193,23 +227,53 @@ func (ce *chronicleExporter) buildOptions() []grpc.CallOption {
return opts
}

func (ce *chronicleExporter) startHostMetricsCollection(ctx context.Context) {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()

defer ce.wg.Done()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
err := ce.metrics.collectHostMetrics()
if err != nil {
ce.logger.Error("Failed to collect host metrics", zap.Error(err))
}
request := ce.metrics.getAndReset()
_, err = ce.grpcClient.BatchCreateEvents(ctx, request, ce.buildOptions()...)
if err != nil {
ce.logger.Error("Failed to upload host metrics", zap.Error(err))
}
}
}
}

func (ce *chronicleExporter) logsHTTPDataPusher(ctx context.Context, ld plog.Logs) error {
payloads, err := ce.marshaler.MarshalRawLogsForHTTP(ctx, ld)
if err != nil {
return fmt.Errorf("marshal logs: %w", err)
}

for logType, logTypePayloads := range payloads {
for _, payload := range logTypePayloads {
if err := ce.uploadToChronicleHTTP(ctx, payload, logType); err != nil {
return err
}
for logType, payload := range payloads {
if err := ce.uploadToChronicleHTTP(ctx, payload, logType); err != nil {
return fmt.Errorf("upload to chronicle: %w", err)
}
}

return nil
}

// This uses the DataPlane URL for the request
// URL for the request: https://{region}-chronicle.googleapis.com/{version}/projects/{project}/location/{region}/instances/{customerID}/logTypes/{logtype}/logs:import
func buildEndpoint(cfg *Config, logType string) string {
// Location Endpoint Version Project Location Instance LogType
formatString := "https://%s-%s/%s/projects/%s/locations/%s/instances/%s/logTypes/%s/logs:import"
return fmt.Sprintf(formatString, cfg.Location, cfg.Endpoint, "v1alpha", cfg.Project, cfg.Location, cfg.CustomerID, logType)
}

func (ce *chronicleExporter) uploadToChronicleHTTP(ctx context.Context, logs *api.ImportLogsRequest, logType string) error {

data, err := protojson.Marshal(logs)
Expand All @@ -233,7 +297,7 @@ func (ce *chronicleExporter) uploadToChronicleHTTP(ctx context.Context, logs *ap
body = bytes.NewBuffer(data)
}

request, err := http.NewRequestWithContext(ctx, "POST", httpEndpoint(ce.cfg, logType), body)
request, err := http.NewRequestWithContext(ctx, "POST", buildEndpoint(ce.cfg, logType), body)
if err != nil {
return fmt.Errorf("create request: %w", err)
}
Expand All @@ -251,38 +315,14 @@ func (ce *chronicleExporter) uploadToChronicleHTTP(ctx context.Context, logs *ap
defer resp.Body.Close()

respBody, err := io.ReadAll(resp.Body)
if err == nil && resp.StatusCode == http.StatusOK {
return nil
}

if err != nil {
ce.set.Logger.Warn("Failed to read response body", zap.Error(err))
} else {
ce.set.Logger.Warn("Received non-OK response from Chronicle", zap.String("status", resp.Status), zap.ByteString("response", respBody))
}

// TODO interpret with https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/errorutil/http.go
statusErr := fmt.Errorf("upload logs to chronicle: %s", resp.Status)
switch resp.StatusCode {
case http.StatusInternalServerError, http.StatusServiceUnavailable: // potentially transient
return statusErr
default:
return consumererror.NewPermanent(statusErr)
}
}

// Override for testing
var grpcClientParams = func(cfgEndpoint string, ts oauth2.TokenSource) (string, []grpc.DialOption) {
return cfgEndpoint + ":443", []grpc.DialOption{
grpc.WithPerRPCCredentials(oauth.TokenSource{TokenSource: ts}),
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
if resp.StatusCode != http.StatusOK {
if err != nil {
ce.logger.Warn("Failed to read response body", zap.Error(err))
} else {
ce.logger.Warn("Received non-OK response from Chronicle", zap.String("status", resp.Status), zap.ByteString("response", respBody))
}
return fmt.Errorf("received non-OK response from Chronicle: %s", resp.Status)
}
}

// This uses the DataPlane URL for the request
// URL for the request: https://{region}-chronicle.googleapis.com/{version}/projects/{project}/location/{region}/instances/{customerID}
// Override for testing
var httpEndpoint = func(cfg *Config, logType string) string {
formatString := "https://%s-%s/v1alpha/projects/%s/locations/%s/instances/%s/logTypes/%s/logs:import"
return fmt.Sprintf(formatString, cfg.Location, cfg.Endpoint, cfg.Project, cfg.Location, cfg.CustomerID, logType)
return nil
}
Loading

0 comments on commit a1f887d

Please sign in to comment.