Skip to content

Commit

Permalink
chore: separate http and grpc exporters (#2050)
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored and Caleb-Hurshman committed Dec 17, 2024
1 parent e04e460 commit 6972db8
Show file tree
Hide file tree
Showing 6 changed files with 341 additions and 28 deletions.
28 changes: 11 additions & 17 deletions exporter/chronicleexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package chronicleexporter

import (
"context"
"errors"

"github.com/observiq/bindplane-otel-collector/exporter/chronicleexporter/internal/metadata"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -64,30 +63,25 @@ func createLogsExporter(
ctx context.Context,
params exporter.Settings,
cfg component.Config,
) (exporter.Logs, error) {
chronicleCfg, ok := cfg.(*Config)
if !ok {
return nil, errors.New("invalid config type")
) (exp exporter.Logs, err error) {
c := cfg.(*Config)
if c.Protocol == protocolHTTPS {
exp, err = newHTTPExporter(c, params)
} else {
exp, err = newGRPCExporter(c, params)
}

exp, err := newExporter(chronicleCfg, params, params.ID.String())
if err != nil {
return nil, err
}

pusher := exp.logsDataPusher
if chronicleCfg.Protocol == protocolHTTPS {
pusher = exp.logsHTTPDataPusher
}
return exporterhelper.NewLogs(
ctx,
params,
chronicleCfg,
pusher,
c,
exp.ConsumeLogs,
exporterhelper.WithCapabilities(exp.Capabilities()),
exporterhelper.WithTimeout(chronicleCfg.TimeoutConfig),
exporterhelper.WithQueue(chronicleCfg.QueueConfig),
exporterhelper.WithRetry(chronicleCfg.BackOffConfig),
exporterhelper.WithTimeout(c.TimeoutConfig),
exporterhelper.WithQueue(c.QueueConfig),
exporterhelper.WithRetry(c.BackOffConfig),
exporterhelper.WithStart(exp.Start),
exporterhelper.WithShutdown(exp.Shutdown),
)
Expand Down
160 changes: 160 additions & 0 deletions exporter/chronicleexporter/grpc_exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package chronicleexporter

import (
"context"
"fmt"
"net/http"

"github.com/observiq/bindplane-otel-collector/exporter/chronicleexporter/protos/api"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/plog"
"golang.org/x/oauth2"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/oauth"
grpcgzip "google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/status"
)

const grpcScope = "https://www.googleapis.com/auth/malachite-ingestion"

type grpcExporter struct {
cfg *Config
set component.TelemetrySettings
exporterID string
marshaler *protoMarshaler

client api.IngestionServiceV2Client
conn *grpc.ClientConn
metrics *hostMetricsReporter
}

func newGRPCExporter(cfg *Config, params exporter.Settings) (*grpcExporter, error) {
marshaler, err := newProtoMarshaler(*cfg, params.TelemetrySettings)
if err != nil {
return nil, fmt.Errorf("create proto marshaler: %w", err)
}
return &grpcExporter{
cfg: cfg,
set: params.TelemetrySettings,
exporterID: params.ID.String(),
marshaler: marshaler,
}, nil
}

func (exp *grpcExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (exp *grpcExporter) Start(ctx context.Context, _ component.Host) error {
ts, err := tokenSource(ctx, exp.cfg)
if err != nil {
return fmt.Errorf("load Google credentials: %w", err)
}
endpoint, dialOpts := grpcClientParams(exp.cfg.Endpoint, ts)
conn, err := grpc.NewClient(endpoint, dialOpts...)
if err != nil {
return fmt.Errorf("dial: %w", err)
}
exp.conn = conn
exp.client = api.NewIngestionServiceV2Client(conn)

if exp.cfg.CollectAgentMetrics {
f := func(ctx context.Context, request *api.BatchCreateEventsRequest) error {
_, err := exp.client.BatchCreateEvents(ctx, request)
return err
}
metrics, err := newHostMetricsReporter(exp.cfg, exp.set, exp.exporterID, f)
if err != nil {
return fmt.Errorf("create metrics reporter: %w", err)
}
exp.metrics = metrics
exp.metrics.start()
}

return nil
}

func (exp *grpcExporter) Shutdown(context.Context) error {
defer http.DefaultTransport.(*http.Transport).CloseIdleConnections()
if exp.metrics != nil {
exp.metrics.shutdown()
}
if exp.conn != nil {
if err := exp.conn.Close(); err != nil {
return fmt.Errorf("connection close: %s", err)
}
}
return nil
}

func (exp *grpcExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
payloads, err := exp.marshaler.MarshalRawLogs(ctx, ld)
if err != nil {
return fmt.Errorf("marshal logs: %w", err)
}
for _, payload := range payloads {
if err := exp.uploadToChronicle(ctx, payload); err != nil {
return err
}
}
return nil
}

func (exp *grpcExporter) uploadToChronicle(ctx context.Context, request *api.BatchCreateLogsRequest) error {
if exp.metrics != nil {
totalLogs := int64(len(request.GetBatch().GetEntries()))
defer exp.metrics.recordSent(totalLogs)
}
_, err := exp.client.BatchCreateLogs(ctx, request, exp.buildOptions()...)
if err != nil {
errCode := status.Code(err)
switch errCode {
// These errors are potentially transient
// TODO interpret with https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/errorutil/grpc.go
case codes.Canceled,
codes.Unavailable,
codes.DeadlineExceeded,
codes.ResourceExhausted,
codes.Aborted:
return fmt.Errorf("upload logs to chronicle: %w", err)
default:
return consumererror.NewPermanent(fmt.Errorf("upload logs to chronicle: %w", err))
}
}
return nil
}

func (exp *grpcExporter) buildOptions() []grpc.CallOption {
opts := make([]grpc.CallOption, 0)
if exp.cfg.Compression == grpcgzip.Name {
opts = append(opts, grpc.UseCompressor(grpcgzip.Name))
}
return opts
}

// Override for testing
var grpcClientParams = func(cfgEndpoint string, ts oauth2.TokenSource) (string, []grpc.DialOption) {
return cfgEndpoint + ":443", []grpc.DialOption{
grpc.WithPerRPCCredentials(oauth.TokenSource{TokenSource: ts}),
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
}
}
161 changes: 161 additions & 0 deletions exporter/chronicleexporter/http_exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package chronicleexporter

import (
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"net/http"

"github.com/observiq/bindplane-otel-collector/exporter/chronicleexporter/protos/api"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
"golang.org/x/oauth2"
grpcgzip "google.golang.org/grpc/encoding/gzip"
"google.golang.org/protobuf/encoding/protojson"
)

const httpScope = "https://www.googleapis.com/auth/cloud-platform"

type httpExporter struct {
cfg *Config
set component.TelemetrySettings
marshaler *protoMarshaler
client *http.Client
}

func newHTTPExporter(cfg *Config, params exporter.Settings) (*httpExporter, error) {
marshaler, err := newProtoMarshaler(*cfg, params.TelemetrySettings)
if err != nil {
return nil, fmt.Errorf("create proto marshaler: %w", err)
}
return &httpExporter{
cfg: cfg,
set: params.TelemetrySettings,
marshaler: marshaler,
}, nil
}

func (exp *httpExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (exp *httpExporter) Start(ctx context.Context, _ component.Host) error {
ts, err := tokenSource(ctx, exp.cfg)
if err != nil {
return fmt.Errorf("load Google credentials: %w", err)
}
exp.client = oauth2.NewClient(context.Background(), ts)
return nil
}

func (exp *httpExporter) Shutdown(context.Context) error {
defer http.DefaultTransport.(*http.Transport).CloseIdleConnections()
t := exp.client.Transport.(*oauth2.Transport)
if t.Base != nil {
t.Base.(*http.Transport).CloseIdleConnections()
}
return nil
}

func (exp *httpExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
payloads, err := exp.marshaler.MarshalRawLogsForHTTP(ctx, ld)
if err != nil {
return fmt.Errorf("marshal logs: %w", err)
}
for logType, logTypePayloads := range payloads {
for _, payload := range logTypePayloads {
if err := exp.uploadToChronicleHTTP(ctx, payload, logType); err != nil {
return fmt.Errorf("upload to chronicle: %w", err)
}
}
}
return nil
}

func (exp *httpExporter) uploadToChronicleHTTP(ctx context.Context, logs *api.ImportLogsRequest, logType string) error {
data, err := protojson.Marshal(logs)
if err != nil {
return fmt.Errorf("marshal protobuf logs to JSON: %w", err)
}

var body io.Reader
if exp.cfg.Compression == grpcgzip.Name {
var b bytes.Buffer
gz := gzip.NewWriter(&b)
if _, err := gz.Write(data); err != nil {
return fmt.Errorf("gzip write: %w", err)
}
if err := gz.Close(); err != nil {
return fmt.Errorf("gzip close: %w", err)
}
body = &b
} else {
body = bytes.NewBuffer(data)
}

request, err := http.NewRequestWithContext(ctx, "POST", httpEndpoint(exp.cfg, logType), body)
if err != nil {
return fmt.Errorf("create request: %w", err)
}

if exp.cfg.Compression == grpcgzip.Name {
request.Header.Set("Content-Encoding", "gzip")
}

request.Header.Set("Content-Type", "application/json")

resp, err := exp.client.Do(request)
if err != nil {
return fmt.Errorf("send request to Chronicle: %w", err)
}
defer resp.Body.Close()

respBody, err := io.ReadAll(resp.Body)
if err == nil && resp.StatusCode == http.StatusOK {
return nil
}

if err != nil {
exp.set.Logger.Warn("Failed to read response body", zap.Error(err))
} else {
exp.set.Logger.Warn("Received non-OK response from Chronicle", zap.String("status", resp.Status), zap.ByteString("response", respBody))
}

// TODO interpret with https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/errorutil/http.go
statusErr := errors.New(resp.Status)
switch resp.StatusCode {
case http.StatusInternalServerError, http.StatusServiceUnavailable: // potentially transient
return statusErr
default:
return consumererror.NewPermanent(statusErr)
}
}

// This uses the DataPlane URL for the request
// URL for the request: https://{region}-chronicle.googleapis.com/{version}/projects/{project}/location/{region}/instances/{customerID}
// Override for testing
var httpEndpoint = func(cfg *Config, logType string) string {
formatString := "https://%s-%s/v1alpha/projects/%s/locations/%s/instances/%s/logTypes/%s/logs:import"
return fmt.Sprintf(formatString, cfg.Location, cfg.Endpoint, cfg.Project, cfg.Location, cfg.CustomerID, logType)
}
4 changes: 2 additions & 2 deletions exporter/chronicleexporter/http_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestHTTPExporter(t *testing.T) {
return logs
}(),
expectedRequests: 1,
expectedErr: "upload logs to chronicle: 503 Service Unavailable",
expectedErr: "upload to chronicle: 503 Service Unavailable",
permanentErr: false,
},
{
Expand All @@ -144,7 +144,7 @@ func TestHTTPExporter(t *testing.T) {
return logs
}(),
expectedRequests: 1,
expectedErr: "Permanent error: upload logs to chronicle: 401 Unauthorized",
expectedErr: "upload to chronicle: Permanent error: 401 Unauthorized",
permanentErr: true,
},
}
Expand Down
Loading

0 comments on commit 6972db8

Please sign in to comment.