Skip to content

Commit

Permalink
feat: Enforce request maximum size and number of logs
Browse files Browse the repository at this point in the history
  • Loading branch information
mrsillydog committed Dec 9, 2024
1 parent ffe69f0 commit 4cc17b3
Show file tree
Hide file tree
Showing 9 changed files with 1,203 additions and 216 deletions.
47 changes: 42 additions & 5 deletions exporter/chronicleexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,26 @@ type Config struct {

// Forwarder is the forwarder that will be used when the protocol is https.
Forwarder string `mapstructure:"forwarder"`

// BatchLogCountLimitGRPC is the maximum number of logs that can be sent in a single batch to Chronicle via the GRPC protocol
// This field is defaulted to 1000, as that is the default Chronicle backend limit.
// All batched logs beyond the backend limit will be dropped. Any batches with more logs than this limit will be split into multiple batches
BatchLogCountLimitGRPC int `mapstructure:"batch_log_count_limit_grpc"`

// BatchRequestSizeLimitGRPC is the maximum batch request size, in bytes, that can be sent to Chronicle via the GRPC protocol
// This field is defaulted to 1048576 as that is the default Chronicle backend limit
// Setting this option to a value above the Chronicle backend limit may result in rejected log batch requests
BatchRequestSizeLimitGRPC int `mapstructure:"batch_request_size_limit_grpc"`

// BatchLogCountLimitHTTP is the maximum number of logs that can be sent in a single batch to Chronicle via the HTTP protocol
// This field is defaulted to 1000, as that is the default Chronicle backend limit.
// All batched logs beyond the backend limit will be dropped. Any batches with more logs than this limit will be split into multiple batches
BatchLogCountLimitHTTP int `mapstructure:"batch_log_count_limit_grpc"`

// BatchRequestSizeLimitHTTP is the maximum batch request size, in bytes, that can be sent to Chronicle via the HTTP protocol
// This field is defaulted to 1048576 as that is the default Chronicle backend limit
// Setting this option to a value above the Chronicle backend limit may result in rejected log batch requests
BatchRequestSizeLimitHTTP int `mapstructure:"batch_request_size_limit_grpc"`
}

// Validate checks if the configuration is valid.
Expand All @@ -110,10 +130,6 @@ func (cfg *Config) Validate() error {
return fmt.Errorf("endpoint should not contain a protocol: %s", cfg.Endpoint)
}

if cfg.Protocol != protocolHTTPS && cfg.Protocol != protocolGRPC {
return fmt.Errorf("invalid protocol: %s", cfg.Protocol)
}

if cfg.Protocol == protocolHTTPS {
if cfg.Location == "" {
return errors.New("location is required when protocol is https")
Expand All @@ -124,7 +140,28 @@ func (cfg *Config) Validate() error {
if cfg.Forwarder == "" {
return errors.New("forwarder is required when protocol is https")
}
if cfg.BatchLogCountLimitHTTP <= 0 {
return errors.New("positive batch count log limit is required when protocol is https")
}

if cfg.BatchRequestSizeLimitHTTP <= 0 {
return errors.New("positive batch request size limit is required when protocol is https")
}

return nil
}

if cfg.Protocol == protocolGRPC {
if cfg.BatchLogCountLimitGRPC <= 0 {
return errors.New("positive batch count log limit is required when protocol is grpc")
}

if cfg.BatchRequestSizeLimitGRPC <= 0 {
return errors.New("positive batch request size limit is required when protocol is grpc")
}

return nil
}

return nil
return fmt.Errorf("invalid protocol: %s", cfg.Protocol)
}
152 changes: 117 additions & 35 deletions exporter/chronicleexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,44 +29,76 @@ func TestConfigValidate(t *testing.T) {
{
desc: "Both creds_file_path and creds are set",
config: &Config{
CredsFilePath: "/path/to/creds_file",
Creds: "creds_example",
LogType: "log_type_example",
Compression: noCompression,
CredsFilePath: "/path/to/creds_file",
Creds: "creds_example",
LogType: "log_type_example",
Compression: noCompression,
BatchLogCountLimitGRPC: DefaultBatchLogCountLimitGRPC,
BatchRequestSizeLimitGRPC: DefaultBatchRequestSizeLimitGRPC,
},
expectedErr: "can only specify creds_file_path or creds",
},
{
desc: "Valid config with creds",
config: &Config{
Creds: "creds_example",
LogType: "log_type_example",
Compression: noCompression,
Protocol: protocolGRPC,
Creds: "creds_example",
LogType: "log_type_example",
Compression: noCompression,
Protocol: protocolGRPC,
BatchLogCountLimitGRPC: DefaultBatchLogCountLimitGRPC,
BatchRequestSizeLimitGRPC: DefaultBatchRequestSizeLimitGRPC,
},
expectedErr: "",
},
{
desc: "Valid config with creds_file_path",
config: &Config{
CredsFilePath: "/path/to/creds_file",
LogType: "log_type_example",
Compression: noCompression,
Protocol: protocolGRPC,
CredsFilePath: "/path/to/creds_file",
LogType: "log_type_example",
Compression: noCompression,
Protocol: protocolGRPC,
BatchLogCountLimitGRPC: DefaultBatchLogCountLimitGRPC,
BatchRequestSizeLimitGRPC: DefaultBatchRequestSizeLimitGRPC,
},
expectedErr: "",
},
{
desc: "Valid config with raw log field",
config: &Config{
CredsFilePath: "/path/to/creds_file",
LogType: "log_type_example",
RawLogField: `body["field"]`,
Compression: noCompression,
Protocol: protocolGRPC,
CredsFilePath: "/path/to/creds_file",
LogType: "log_type_example",
RawLogField: `body["field"]`,
Compression: noCompression,
Protocol: protocolGRPC,
BatchLogCountLimitGRPC: DefaultBatchLogCountLimitGRPC,
BatchRequestSizeLimitGRPC: DefaultBatchRequestSizeLimitGRPC,
},
expectedErr: "",
},
{
desc: "Invalid batch log count limit",
config: &Config{
Creds: "creds_example",
LogType: "log_type_example",
Compression: noCompression,
Protocol: protocolGRPC,
BatchLogCountLimitGRPC: 0,
BatchRequestSizeLimitGRPC: DefaultBatchRequestSizeLimitGRPC,
},
expectedErr: "positive batch count log limit is required when protocol is grpc",
},
{
desc: "Invalid batch request size limit",
config: &Config{
Creds: "creds_example",
LogType: "log_type_example",
Compression: noCompression,
Protocol: protocolGRPC,
BatchLogCountLimitGRPC: DefaultBatchLogCountLimitGRPC,
BatchRequestSizeLimitGRPC: 0,
},
expectedErr: "positive batch request size limit is required when protocol is grpc",
},
{
desc: "Invalid compression type",
config: &Config{
Expand All @@ -79,39 +111,89 @@ func TestConfigValidate(t *testing.T) {
{
desc: "Protocol is https and location is empty",
config: &Config{
CredsFilePath: "/path/to/creds_file",
LogType: "log_type_example",
Protocol: protocolHTTPS,
Compression: noCompression,
Forwarder: "forwarder_example",
Project: "project_example",
CredsFilePath: "/path/to/creds_file",
LogType: "log_type_example",
Protocol: protocolHTTPS,
Compression: noCompression,
Forwarder: "forwarder_example",
Project: "project_example",
BatchRequestSizeLimitHTTP: DefaultBatchRequestSizeLimitHTTP,
BatchLogCountLimitHTTP: DefaultBatchLogCountLimitHTTP,
},
expectedErr: "location is required when protocol is https",
},
{
desc: "Protocol is https and forwarder is empty",
config: &Config{
CredsFilePath: "/path/to/creds_file",
LogType: "log_type_example",
Protocol: protocolHTTPS,
Compression: noCompression,
Project: "project_example",
Location: "location_example",
CredsFilePath: "/path/to/creds_file",
LogType: "log_type_example",
Protocol: protocolHTTPS,
Compression: noCompression,
Project: "project_example",
Location: "location_example",
BatchRequestSizeLimitHTTP: DefaultBatchRequestSizeLimitHTTP,
BatchLogCountLimitHTTP: DefaultBatchLogCountLimitHTTP,
},
expectedErr: "forwarder is required when protocol is https",
},
{
desc: "Protocol is https and project is empty",
config: &Config{
CredsFilePath: "/path/to/creds_file",
LogType: "log_type_example",
Protocol: protocolHTTPS,
Compression: noCompression,
Location: "location_example",
Forwarder: "forwarder_example",
CredsFilePath: "/path/to/creds_file",
LogType: "log_type_example",
Protocol: protocolHTTPS,
Compression: noCompression,
Location: "location_example",
Forwarder: "forwarder_example",
BatchRequestSizeLimitHTTP: DefaultBatchRequestSizeLimitHTTP,
BatchLogCountLimitHTTP: DefaultBatchLogCountLimitHTTP,
},
expectedErr: "project is required when protocol is https",
},
{
desc: "Protocol is https and http batch log count limit is 0",
config: &Config{
CredsFilePath: "/path/to/creds_file",
LogType: "log_type_example",
Protocol: protocolHTTPS,
Compression: noCompression,
Project: "project_example",
Location: "location_example",
Forwarder: "forwarder_example",
BatchRequestSizeLimitHTTP: DefaultBatchRequestSizeLimitHTTP,
BatchLogCountLimitHTTP: 0,
},
expectedErr: "positive batch count log limit is required when protocol is https",
},
{
desc: "Protocol is https and http batch request size limit is 0",
config: &Config{
CredsFilePath: "/path/to/creds_file",
LogType: "log_type_example",
Protocol: protocolHTTPS,
Compression: noCompression,
Project: "project_example",
Location: "location_example",
Forwarder: "forwarder_example",
BatchRequestSizeLimitHTTP: 0,
BatchLogCountLimitHTTP: DefaultBatchLogCountLimitHTTP,
},
expectedErr: "positive batch request size limit is required when protocol is https",
},
{
desc: "Valid https config",
config: &Config{
CredsFilePath: "/path/to/creds_file",
LogType: "log_type_example",
Protocol: protocolHTTPS,
Compression: noCompression,
Project: "project_example",
Location: "location_example",
Forwarder: "forwarder_example",
BatchRequestSizeLimitHTTP: DefaultBatchRequestSizeLimitHTTP,
BatchLogCountLimitHTTP: DefaultBatchLogCountLimitHTTP,
},
},
}

for _, tc := range testCases {
Expand Down
8 changes: 5 additions & 3 deletions exporter/chronicleexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,11 @@ func (ce *chronicleExporter) logsHTTPDataPusher(ctx context.Context, ld plog.Log
return fmt.Errorf("marshal logs: %w", err)
}

for logType, payload := range payloads {
if err := ce.uploadToChronicleHTTP(ctx, payload, logType); err != nil {
return fmt.Errorf("upload to chronicle: %w", err)
for logType, logTypePayloads := range payloads {
for _, payload := range logTypePayloads {
if err := ce.uploadToChronicleHTTP(ctx, payload, logType); err != nil {
return fmt.Errorf("upload to chronicle: %w", err)
}
}
}

Expand Down
25 changes: 17 additions & 8 deletions exporter/chronicleexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,26 @@ func NewFactory() exporter.Factory {
exporter.WithLogs(createLogsExporter, metadata.LogsStability))
}

const DefaultBatchLogCountLimitGRPC = 1000
const DefaultBatchRequestSizeLimitGRPC = 1048576
const DefaultBatchLogCountLimitHTTP = 1000
const DefaultBatchRequestSizeLimitHTTP = 1048576

// createDefaultConfig creates the default configuration for the exporter.
func createDefaultConfig() component.Config {
return &Config{
Protocol: protocolGRPC,
TimeoutConfig: exporterhelper.NewDefaultTimeoutConfig(),
QueueConfig: exporterhelper.NewDefaultQueueConfig(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
OverrideLogType: true,
Endpoint: baseEndpoint,
Compression: noCompression,
CollectAgentMetrics: true,
Protocol: protocolGRPC,
TimeoutConfig: exporterhelper.NewDefaultTimeoutConfig(),
QueueConfig: exporterhelper.NewDefaultQueueConfig(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
OverrideLogType: true,
Endpoint: baseEndpoint,
Compression: noCompression,
CollectAgentMetrics: true,
BatchLogCountLimitGRPC: DefaultBatchLogCountLimitGRPC,
BatchRequestSizeLimitGRPC: DefaultBatchRequestSizeLimitGRPC,
BatchLogCountLimitHTTP: DefaultBatchLogCountLimitHTTP,
BatchRequestSizeLimitHTTP: DefaultBatchRequestSizeLimitHTTP,
}
}

Expand Down
20 changes: 12 additions & 8 deletions exporter/chronicleexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@ import (

func Test_createDefaultConfig(t *testing.T) {
expectedCfg := &Config{
TimeoutConfig: exporterhelper.NewDefaultTimeoutConfig(),
QueueConfig: exporterhelper.NewDefaultQueueConfig(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
OverrideLogType: true,
Endpoint: "malachiteingestion-pa.googleapis.com",
Compression: "none",
CollectAgentMetrics: true,
Protocol: "gRPC",
TimeoutConfig: exporterhelper.NewDefaultTimeoutConfig(),
QueueConfig: exporterhelper.NewDefaultQueueConfig(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
OverrideLogType: true,
Endpoint: "malachiteingestion-pa.googleapis.com",
Compression: "none",
CollectAgentMetrics: true,
Protocol: protocolGRPC,
BatchLogCountLimitGRPC: DefaultBatchLogCountLimitGRPC,
BatchRequestSizeLimitGRPC: DefaultBatchRequestSizeLimitGRPC,
BatchLogCountLimitHTTP: DefaultBatchLogCountLimitHTTP,
BatchRequestSizeLimitHTTP: DefaultBatchRequestSizeLimitHTTP,
}

actual := createDefaultConfig()
Expand Down
2 changes: 1 addition & 1 deletion exporter/chronicleexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
go.opentelemetry.io/collector/semconv v0.114.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
golang.org/x/oauth2 v0.24.0
google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1
google.golang.org/grpc v1.68.0
Expand Down Expand Up @@ -83,7 +84,6 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect
go.opentelemetry.io/otel/trace v1.32.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sys v0.27.0 // indirect
golang.org/x/text v0.19.0 // indirect
Expand Down
Loading

0 comments on commit 4cc17b3

Please sign in to comment.