Skip to content

Commit

Permalink
Separate and internalize marshalers, and test exported surface only.
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Dec 16, 2024
1 parent 7a3a1bb commit 160c0d7
Show file tree
Hide file tree
Showing 13 changed files with 2,538 additions and 2,302 deletions.
25 changes: 15 additions & 10 deletions exporter/chronicleexporter/grpc_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"fmt"
"net/http"

"github.com/google/uuid"
"github.com/observiq/bindplane-otel-collector/exporter/chronicleexporter/internal/marshal"
"github.com/observiq/bindplane-otel-collector/exporter/chronicleexporter/protos/api"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand All @@ -41,27 +41,32 @@ type grpcExporter struct {
cfg *Config
set component.TelemetrySettings
exporterID string
marshaler *protoMarshaler
marshaler *marshal.GRPC

client api.IngestionServiceV2Client
conn *grpc.ClientConn
metrics *hostMetricsReporter
}

func newGRPCExporter(cfg *Config, params exporter.Settings) (*grpcExporter, error) {
customerID, err := uuid.Parse(cfg.CustomerID)
marshaler, err := marshal.NewGRPC(marshal.Config{
CustomerID: cfg.CustomerID,
Namespace: cfg.Namespace,
LogType: cfg.LogType,
RawLogField: cfg.RawLogField,
OverrideLogType: cfg.OverrideLogType,
IngestionLabels: cfg.IngestionLabels,
BatchRequestSizeLimit: cfg.BatchRequestSizeLimitGRPC,
BatchLogCountLimit: cfg.BatchLogCountLimitGRPC,
}, params.TelemetrySettings)
if err != nil {
return nil, fmt.Errorf("parse customer ID: %w", err)
}
marshaller, err := newProtoMarshaler(*cfg, params.TelemetrySettings, customerID[:])
if err != nil {
return nil, fmt.Errorf("create proto marshaller: %w", err)
return nil, fmt.Errorf("create proto marshaler: %w", err)
}
return &grpcExporter{
cfg: cfg,
set: params.TelemetrySettings,
exporterID: params.ID.String(),
marshaler: marshaller,
marshaler: marshaler,
}, nil
}

Expand Down Expand Up @@ -112,7 +117,7 @@ func (exp *grpcExporter) Shutdown(context.Context) error {
}

func (exp *grpcExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
payloads, err := exp.marshaler.MarshalRawLogs(ctx, ld)
payloads, err := exp.marshaler.MarshalLogs(ctx, ld)
if err != nil {
return fmt.Errorf("marshal logs: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion exporter/chronicleexporter/hostmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/observiq/bindplane-otel-collector/exporter/chronicleexporter/internal/ccid"
"github.com/observiq/bindplane-otel-collector/exporter/chronicleexporter/protos/api"
"github.com/shirou/gopsutil/v3/process"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -122,7 +123,7 @@ func (hmr *hostMetricsReporter) getAndReset() *api.BatchCreateEventsRequest {
now := timestamppb.Now()
batchID := uuid.New()
source := &api.EventSource{
CollectorId: chronicleCollectorID[:],
CollectorId: ccid.ChronicleCollectorID[:],
Namespace: hmr.namespace,
CustomerId: hmr.customerID,
}
Expand Down
30 changes: 20 additions & 10 deletions exporter/chronicleexporter/http_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"io"
"net/http"

"github.com/google/uuid"
"github.com/observiq/bindplane-otel-collector/exporter/chronicleexporter/internal/marshal"
"github.com/observiq/bindplane-otel-collector/exporter/chronicleexporter/protos/api"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand All @@ -41,23 +41,33 @@ const httpScope = "https://www.googleapis.com/auth/cloud-platform"
type httpExporter struct {
cfg *Config
set component.TelemetrySettings
marshaler *protoMarshaler
marshaler *marshal.HTTP
client *http.Client
}

func newHTTPExporter(cfg *Config, params exporter.Settings) (*httpExporter, error) {
customerID, err := uuid.Parse(cfg.CustomerID)
marshaler, err := marshal.NewHTTP(marshal.HTTPConfig{
Config: marshal.Config{
CustomerID: cfg.CustomerID,
Namespace: cfg.Namespace,
LogType: cfg.LogType,
RawLogField: cfg.RawLogField,
OverrideLogType: cfg.OverrideLogType,
IngestionLabels: cfg.IngestionLabels,
BatchRequestSizeLimit: cfg.BatchRequestSizeLimitHTTP,
BatchLogCountLimit: cfg.BatchLogCountLimitHTTP,
},
Project: cfg.Project,
Location: cfg.Location,
Forwarder: cfg.Forwarder,
}, params.TelemetrySettings)
if err != nil {
return nil, fmt.Errorf("parse customer ID: %w", err)
}
marshaller, err := newProtoMarshaler(*cfg, params.TelemetrySettings, customerID[:])
if err != nil {
return nil, fmt.Errorf("create proto marshaller: %w", err)
return nil, fmt.Errorf("create proto marshaler: %w", err)
}
return &httpExporter{
cfg: cfg,
set: params.TelemetrySettings,
marshaler: marshaller,
marshaler: marshaler,
}, nil
}

Expand All @@ -84,7 +94,7 @@ func (exp *httpExporter) Shutdown(context.Context) error {
}

func (exp *httpExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
payloads, err := exp.marshaler.MarshalRawLogsForHTTP(ctx, ld)
payloads, err := exp.marshaler.MarshalLogs(ctx, ld)
if err != nil {
return fmt.Errorf("marshal logs: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions exporter/chronicleexporter/http_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestHTTPExporter(t *testing.T) {
return logs
}(),
expectedRequests: 1,
expectedErr: "upload logs to chronicle: 503 Service Unavailable",
expectedErr: "upload to chronicle: 503 Service Unavailable",
permanentErr: false,
},
{
Expand All @@ -144,7 +144,7 @@ func TestHTTPExporter(t *testing.T) {
return logs
}(),
expectedRequests: 1,
expectedErr: "Permanent error: upload logs to chronicle: 401 Unauthorized",
expectedErr: "upload to chronicle: Permanent error: 401 Unauthorized",
permanentErr: true,
},
}
Expand Down
23 changes: 23 additions & 0 deletions exporter/chronicleexporter/internal/ccid/ccid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package ccid exposes a hardcoded UUID that is used to identify bindplane agents in Chronicle.
package ccid

import (
"github.com/google/uuid"
)

// ChronicleCollectorID is a specific collector ID for Chronicle. It's used to identify bindplane agents in Chronicle.
var ChronicleCollectorID = uuid.MustParse("aaaa1111-aaaa-1111-aaaa-1111aaaa1111")
Loading

0 comments on commit 160c0d7

Please sign in to comment.