Skip to content

Commit

Permalink
chore: Add new tests for chronicle exporter with http and grpc servers (
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored and Caleb-Hurshman committed Dec 17, 2024
1 parent 37c45e1 commit f1caa87
Showing 1 changed file with 37 additions and 43 deletions.
80 changes: 37 additions & 43 deletions exporter/chronicleexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"io"
"net/http"
"os"

"github.com/google/uuid"
"github.com/observiq/bindplane-otel-collector/exporter/chronicleexporter/protos/api"
Expand Down Expand Up @@ -80,23 +79,19 @@ func newExporter(cfg *Config, params exporter.Settings, exporterID string) (*chr
}, nil
}

func (ce *chronicleExporter) Start(_ context.Context, _ component.Host) error {
creds, err := loadGoogleCredentials(ce.cfg)
func (ce *chronicleExporter) Start(ctx context.Context, _ component.Host) error {
ts, err := tokenSource(ctx, ce.cfg)
if err != nil {
return fmt.Errorf("load Google credentials: %w", err)
}

if ce.cfg.Protocol == protocolHTTPS {
ce.httpClient = oauth2.NewClient(context.Background(), creds.TokenSource)
ce.httpClient = oauth2.NewClient(context.Background(), ts)
return nil
}

opts := []grpc.DialOption{
// Apply OAuth tokens for each RPC call
grpc.WithPerRPCCredentials(oauth.TokenSource{TokenSource: creds.TokenSource}),
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
}
conn, err := grpc.NewClient(ce.cfg.Endpoint+":443", opts...)
endpoint, dialOpts := grpcClientParams(ce.cfg.Endpoint, ts)
conn, err := grpc.NewClient(endpoint, dialOpts...)
if err != nil {
return fmt.Errorf("dial: %w", err)
}
Expand Down Expand Up @@ -143,31 +138,6 @@ func (ce *chronicleExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func loadGoogleCredentials(cfg *Config) (*google.Credentials, error) {
scope := grpcScope
if cfg.Protocol == protocolHTTPS {
scope = httpScope
}

switch {
case cfg.Creds != "":
return google.CredentialsFromJSON(context.Background(), []byte(cfg.Creds), scope)
case cfg.CredsFilePath != "":
credsData, err := os.ReadFile(cfg.CredsFilePath)
if err != nil {
return nil, fmt.Errorf("read credentials file: %w", err)
}

if len(credsData) == 0 {
return nil, errors.New("credentials file is empty")
}

return google.CredentialsFromJSON(context.Background(), credsData, scope)
default:
return google.FindDefaultCredentials(context.Background(), scope)
}
}

func (ce *chronicleExporter) logsDataPusher(ctx context.Context, ld plog.Logs) error {
payloads, err := ce.marshaler.MarshalRawLogs(ctx, ld)
if err != nil {
Expand Down Expand Up @@ -281,14 +251,38 @@ func (ce *chronicleExporter) uploadToChronicleHTTP(ctx context.Context, logs *ap
defer resp.Body.Close()

respBody, err := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
if err != nil {
ce.set.Logger.Warn("Failed to read response body", zap.Error(err))
} else {
ce.set.Logger.Warn("Received non-OK response from Chronicle", zap.String("status", resp.Status), zap.ByteString("response", respBody))
}
return fmt.Errorf("received non-OK response from Chronicle: %s", resp.Status)
if err == nil && resp.StatusCode == http.StatusOK {
return nil
}

return nil
if err != nil {
ce.set.Logger.Warn("Failed to read response body", zap.Error(err))
} else {
ce.set.Logger.Warn("Received non-OK response from Chronicle", zap.String("status", resp.Status), zap.ByteString("response", respBody))
}

// TODO interpret with https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/errorutil/http.go
statusErr := fmt.Errorf("upload logs to chronicle: %s", resp.Status)
switch resp.StatusCode {
case http.StatusInternalServerError, http.StatusServiceUnavailable: // potentially transient
return statusErr
default:
return consumererror.NewPermanent(statusErr)
}
}

// Override for testing
var grpcClientParams = func(cfgEndpoint string, ts oauth2.TokenSource) (string, []grpc.DialOption) {
return cfgEndpoint + ":443", []grpc.DialOption{
grpc.WithPerRPCCredentials(oauth.TokenSource{TokenSource: ts}),
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
}
}

// This uses the DataPlane URL for the request
// URL for the request: https://{region}-chronicle.googleapis.com/{version}/projects/{project}/location/{region}/instances/{customerID}
// Override for testing
var httpEndpoint = func(cfg *Config, logType string) string {
formatString := "https://%s-%s/v1alpha/projects/%s/locations/%s/instances/%s/logTypes/%s/logs:import"
return fmt.Sprintf(formatString, cfg.Location, cfg.Endpoint, cfg.Project, cfg.Location, cfg.CustomerID, logType)
}

0 comments on commit f1caa87

Please sign in to comment.