From 4cc17b3397a5257a711759dfcdef5dbdc44dda18 Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Mon, 9 Dec 2024 13:15:11 -0500 Subject: [PATCH] feat: Enforce request maximum size and number of logs --- exporter/chronicleexporter/config.go | 47 +- exporter/chronicleexporter/config_test.go | 152 ++- exporter/chronicleexporter/exporter.go | 8 +- exporter/chronicleexporter/factory.go | 25 +- exporter/chronicleexporter/factory_test.go | 20 +- exporter/chronicleexporter/go.mod | 2 +- exporter/chronicleexporter/marshal.go | 133 ++- exporter/chronicleexporter/marshal_test.go | 1020 +++++++++++++++-- .../chronicleexporter/mock_log_marshaler.go | 12 +- 9 files changed, 1203 insertions(+), 216 deletions(-) diff --git a/exporter/chronicleexporter/config.go b/exporter/chronicleexporter/config.go index db191433b..61380ebe6 100644 --- a/exporter/chronicleexporter/config.go +++ b/exporter/chronicleexporter/config.go @@ -85,6 +85,26 @@ type Config struct { // Forwarder is the forwarder that will be used when the protocol is https. Forwarder string `mapstructure:"forwarder"` + + // BatchLogCountLimitGRPC is the maximum number of logs that can be sent in a single batch to Chronicle via the GRPC protocol + // This field is defaulted to 1000, as that is the default Chronicle backend limit. + // All batched logs beyond the backend limit will be dropped. 
Any batches with more logs than this limit will be split into multiple batches BatchLogCountLimitGRPC int `mapstructure:"batch_log_count_limit_grpc"` + + // BatchRequestSizeLimitGRPC is the maximum batch request size, in bytes, that can be sent to Chronicle via the GRPC protocol + // This field is defaulted to 1048576 as that is the default Chronicle backend limit + // Setting this option to a value above the Chronicle backend limit may result in rejected log batch requests + BatchRequestSizeLimitGRPC int `mapstructure:"batch_request_size_limit_grpc"` + + // BatchLogCountLimitHTTP is the maximum number of logs that can be sent in a single batch to Chronicle via the HTTP protocol + // This field is defaulted to 1000, as that is the default Chronicle backend limit. + // All batched logs beyond the backend limit will be dropped. Any batches with more logs than this limit will be split into multiple batches + BatchLogCountLimitHTTP int `mapstructure:"batch_log_count_limit_http"` + + // BatchRequestSizeLimitHTTP is the maximum batch request size, in bytes, that can be sent to Chronicle via the HTTP protocol + // This field is defaulted to 1048576 as that is the default Chronicle backend limit + // Setting this option to a value above the Chronicle backend limit may result in rejected log batch requests + BatchRequestSizeLimitHTTP int `mapstructure:"batch_request_size_limit_http"` } // Validate checks if the configuration is valid. 
@@ -110,10 +130,6 @@ func (cfg *Config) Validate() error { return fmt.Errorf("endpoint should not contain a protocol: %s", cfg.Endpoint) } - if cfg.Protocol != protocolHTTPS && cfg.Protocol != protocolGRPC { - return fmt.Errorf("invalid protocol: %s", cfg.Protocol) - } - if cfg.Protocol == protocolHTTPS { if cfg.Location == "" { return errors.New("location is required when protocol is https") @@ -124,7 +140,28 @@ func (cfg *Config) Validate() error { if cfg.Forwarder == "" { return errors.New("forwarder is required when protocol is https") } + if cfg.BatchLogCountLimitHTTP <= 0 { + return errors.New("positive batch count log limit is required when protocol is https") + } + + if cfg.BatchRequestSizeLimitHTTP <= 0 { + return errors.New("positive batch request size limit is required when protocol is https") + } + + return nil + } + + if cfg.Protocol == protocolGRPC { + if cfg.BatchLogCountLimitGRPC <= 0 { + return errors.New("positive batch count log limit is required when protocol is grpc") + } + + if cfg.BatchRequestSizeLimitGRPC <= 0 { + return errors.New("positive batch request size limit is required when protocol is grpc") + } + + return nil } - return nil + return fmt.Errorf("invalid protocol: %s", cfg.Protocol) } diff --git a/exporter/chronicleexporter/config_test.go b/exporter/chronicleexporter/config_test.go index d72590c43..f23c99731 100644 --- a/exporter/chronicleexporter/config_test.go +++ b/exporter/chronicleexporter/config_test.go @@ -29,44 +29,76 @@ func TestConfigValidate(t *testing.T) { { desc: "Both creds_file_path and creds are set", config: &Config{ - CredsFilePath: "/path/to/creds_file", - Creds: "creds_example", - LogType: "log_type_example", - Compression: noCompression, + CredsFilePath: "/path/to/creds_file", + Creds: "creds_example", + LogType: "log_type_example", + Compression: noCompression, + BatchLogCountLimitGRPC: DefaultBatchLogCountLimitGRPC, + BatchRequestSizeLimitGRPC: DefaultBatchRequestSizeLimitGRPC, }, expectedErr: "can only 
specify creds_file_path or creds", }, { desc: "Valid config with creds", config: &Config{ - Creds: "creds_example", - LogType: "log_type_example", - Compression: noCompression, - Protocol: protocolGRPC, + Creds: "creds_example", + LogType: "log_type_example", + Compression: noCompression, + Protocol: protocolGRPC, + BatchLogCountLimitGRPC: DefaultBatchLogCountLimitGRPC, + BatchRequestSizeLimitGRPC: DefaultBatchRequestSizeLimitGRPC, }, expectedErr: "", }, { desc: "Valid config with creds_file_path", config: &Config{ - CredsFilePath: "/path/to/creds_file", - LogType: "log_type_example", - Compression: noCompression, - Protocol: protocolGRPC, + CredsFilePath: "/path/to/creds_file", + LogType: "log_type_example", + Compression: noCompression, + Protocol: protocolGRPC, + BatchLogCountLimitGRPC: DefaultBatchLogCountLimitGRPC, + BatchRequestSizeLimitGRPC: DefaultBatchRequestSizeLimitGRPC, }, expectedErr: "", }, { desc: "Valid config with raw log field", config: &Config{ - CredsFilePath: "/path/to/creds_file", - LogType: "log_type_example", - RawLogField: `body["field"]`, - Compression: noCompression, - Protocol: protocolGRPC, + CredsFilePath: "/path/to/creds_file", + LogType: "log_type_example", + RawLogField: `body["field"]`, + Compression: noCompression, + Protocol: protocolGRPC, + BatchLogCountLimitGRPC: DefaultBatchLogCountLimitGRPC, + BatchRequestSizeLimitGRPC: DefaultBatchRequestSizeLimitGRPC, }, expectedErr: "", }, + { + desc: "Invalid batch log count limit", + config: &Config{ + Creds: "creds_example", + LogType: "log_type_example", + Compression: noCompression, + Protocol: protocolGRPC, + BatchLogCountLimitGRPC: 0, + BatchRequestSizeLimitGRPC: DefaultBatchRequestSizeLimitGRPC, + }, + expectedErr: "positive batch count log limit is required when protocol is grpc", + }, + { + desc: "Invalid batch request size limit", + config: &Config{ + Creds: "creds_example", + LogType: "log_type_example", + Compression: noCompression, + Protocol: protocolGRPC, + 
BatchLogCountLimitGRPC: DefaultBatchLogCountLimitGRPC, + BatchRequestSizeLimitGRPC: 0, + }, + expectedErr: "positive batch request size limit is required when protocol is grpc", + }, { desc: "Invalid compression type", config: &Config{ @@ -79,39 +111,89 @@ func TestConfigValidate(t *testing.T) { { desc: "Protocol is https and location is empty", config: &Config{ - CredsFilePath: "/path/to/creds_file", - LogType: "log_type_example", - Protocol: protocolHTTPS, - Compression: noCompression, - Forwarder: "forwarder_example", - Project: "project_example", + CredsFilePath: "/path/to/creds_file", + LogType: "log_type_example", + Protocol: protocolHTTPS, + Compression: noCompression, + Forwarder: "forwarder_example", + Project: "project_example", + BatchRequestSizeLimitHTTP: DefaultBatchRequestSizeLimitHTTP, + BatchLogCountLimitHTTP: DefaultBatchLogCountLimitHTTP, }, expectedErr: "location is required when protocol is https", }, { desc: "Protocol is https and forwarder is empty", config: &Config{ - CredsFilePath: "/path/to/creds_file", - LogType: "log_type_example", - Protocol: protocolHTTPS, - Compression: noCompression, - Project: "project_example", - Location: "location_example", + CredsFilePath: "/path/to/creds_file", + LogType: "log_type_example", + Protocol: protocolHTTPS, + Compression: noCompression, + Project: "project_example", + Location: "location_example", + BatchRequestSizeLimitHTTP: DefaultBatchRequestSizeLimitHTTP, + BatchLogCountLimitHTTP: DefaultBatchLogCountLimitHTTP, }, expectedErr: "forwarder is required when protocol is https", }, { desc: "Protocol is https and project is empty", config: &Config{ - CredsFilePath: "/path/to/creds_file", - LogType: "log_type_example", - Protocol: protocolHTTPS, - Compression: noCompression, - Location: "location_example", - Forwarder: "forwarder_example", + CredsFilePath: "/path/to/creds_file", + LogType: "log_type_example", + Protocol: protocolHTTPS, + Compression: noCompression, + Location: "location_example", + 
Forwarder: "forwarder_example", + BatchRequestSizeLimitHTTP: DefaultBatchRequestSizeLimitHTTP, + BatchLogCountLimitHTTP: DefaultBatchLogCountLimitHTTP, }, expectedErr: "project is required when protocol is https", }, + { + desc: "Protocol is https and http batch log count limit is 0", + config: &Config{ + CredsFilePath: "/path/to/creds_file", + LogType: "log_type_example", + Protocol: protocolHTTPS, + Compression: noCompression, + Project: "project_example", + Location: "location_example", + Forwarder: "forwarder_example", + BatchRequestSizeLimitHTTP: DefaultBatchRequestSizeLimitHTTP, + BatchLogCountLimitHTTP: 0, + }, + expectedErr: "positive batch count log limit is required when protocol is https", + }, + { + desc: "Protocol is https and http batch request size limit is 0", + config: &Config{ + CredsFilePath: "/path/to/creds_file", + LogType: "log_type_example", + Protocol: protocolHTTPS, + Compression: noCompression, + Project: "project_example", + Location: "location_example", + Forwarder: "forwarder_example", + BatchRequestSizeLimitHTTP: 0, + BatchLogCountLimitHTTP: DefaultBatchLogCountLimitHTTP, + }, + expectedErr: "positive batch request size limit is required when protocol is https", + }, + { + desc: "Valid https config", + config: &Config{ + CredsFilePath: "/path/to/creds_file", + LogType: "log_type_example", + Protocol: protocolHTTPS, + Compression: noCompression, + Project: "project_example", + Location: "location_example", + Forwarder: "forwarder_example", + BatchRequestSizeLimitHTTP: DefaultBatchRequestSizeLimitHTTP, + BatchLogCountLimitHTTP: DefaultBatchLogCountLimitHTTP, + }, + }, } for _, tc := range testCases { diff --git a/exporter/chronicleexporter/exporter.go b/exporter/chronicleexporter/exporter.go index b70cee5cf..a5dce88ed 100644 --- a/exporter/chronicleexporter/exporter.go +++ b/exporter/chronicleexporter/exporter.go @@ -257,9 +257,11 @@ func (ce *chronicleExporter) logsHTTPDataPusher(ctx context.Context, ld plog.Log return 
fmt.Errorf("marshal logs: %w", err) } - for logType, payload := range payloads { - if err := ce.uploadToChronicleHTTP(ctx, payload, logType); err != nil { - return fmt.Errorf("upload to chronicle: %w", err) + for logType, logTypePayloads := range payloads { + for _, payload := range logTypePayloads { + if err := ce.uploadToChronicleHTTP(ctx, payload, logType); err != nil { + return fmt.Errorf("upload to chronicle: %w", err) + } } } diff --git a/exporter/chronicleexporter/factory.go b/exporter/chronicleexporter/factory.go index 6dae68bca..3f84b9dda 100644 --- a/exporter/chronicleexporter/factory.go +++ b/exporter/chronicleexporter/factory.go @@ -35,17 +35,26 @@ func NewFactory() exporter.Factory { exporter.WithLogs(createLogsExporter, metadata.LogsStability)) } +const DefaultBatchLogCountLimitGRPC = 1000 +const DefaultBatchRequestSizeLimitGRPC = 1048576 +const DefaultBatchLogCountLimitHTTP = 1000 +const DefaultBatchRequestSizeLimitHTTP = 1048576 + // createDefaultConfig creates the default configuration for the exporter. 
func createDefaultConfig() component.Config { return &Config{ - Protocol: protocolGRPC, - TimeoutConfig: exporterhelper.NewDefaultTimeoutConfig(), - QueueConfig: exporterhelper.NewDefaultQueueConfig(), - BackOffConfig: configretry.NewDefaultBackOffConfig(), - OverrideLogType: true, - Endpoint: baseEndpoint, - Compression: noCompression, - CollectAgentMetrics: true, + Protocol: protocolGRPC, + TimeoutConfig: exporterhelper.NewDefaultTimeoutConfig(), + QueueConfig: exporterhelper.NewDefaultQueueConfig(), + BackOffConfig: configretry.NewDefaultBackOffConfig(), + OverrideLogType: true, + Endpoint: baseEndpoint, + Compression: noCompression, + CollectAgentMetrics: true, + BatchLogCountLimitGRPC: DefaultBatchLogCountLimitGRPC, + BatchRequestSizeLimitGRPC: DefaultBatchRequestSizeLimitGRPC, + BatchLogCountLimitHTTP: DefaultBatchLogCountLimitHTTP, + BatchRequestSizeLimitHTTP: DefaultBatchRequestSizeLimitHTTP, } } diff --git a/exporter/chronicleexporter/factory_test.go b/exporter/chronicleexporter/factory_test.go index ee88580b2..fa608141a 100644 --- a/exporter/chronicleexporter/factory_test.go +++ b/exporter/chronicleexporter/factory_test.go @@ -24,14 +24,18 @@ import ( func Test_createDefaultConfig(t *testing.T) { expectedCfg := &Config{ - TimeoutConfig: exporterhelper.NewDefaultTimeoutConfig(), - QueueConfig: exporterhelper.NewDefaultQueueConfig(), - BackOffConfig: configretry.NewDefaultBackOffConfig(), - OverrideLogType: true, - Endpoint: "malachiteingestion-pa.googleapis.com", - Compression: "none", - CollectAgentMetrics: true, - Protocol: "gRPC", + TimeoutConfig: exporterhelper.NewDefaultTimeoutConfig(), + QueueConfig: exporterhelper.NewDefaultQueueConfig(), + BackOffConfig: configretry.NewDefaultBackOffConfig(), + OverrideLogType: true, + Endpoint: "malachiteingestion-pa.googleapis.com", + Compression: "none", + CollectAgentMetrics: true, + Protocol: protocolGRPC, + BatchLogCountLimitGRPC: DefaultBatchLogCountLimitGRPC, + BatchRequestSizeLimitGRPC: 
DefaultBatchRequestSizeLimitGRPC, + BatchLogCountLimitHTTP: DefaultBatchLogCountLimitHTTP, + BatchRequestSizeLimitHTTP: DefaultBatchRequestSizeLimitHTTP, } actual := createDefaultConfig() diff --git a/exporter/chronicleexporter/go.mod b/exporter/chronicleexporter/go.mod index e7657ad90..1f8274cb6 100644 --- a/exporter/chronicleexporter/go.mod +++ b/exporter/chronicleexporter/go.mod @@ -21,6 +21,7 @@ require ( go.opentelemetry.io/collector/semconv v0.114.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 + golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 golang.org/x/oauth2 v0.24.0 google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 google.golang.org/grpc v1.68.0 @@ -83,7 +84,6 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect go.opentelemetry.io/otel/trace v1.32.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect golang.org/x/net v0.30.0 // indirect golang.org/x/sys v0.27.0 // indirect golang.org/x/text v0.19.0 // indirect diff --git a/exporter/chronicleexporter/marshal.go b/exporter/chronicleexporter/marshal.go index 7963b6c2c..967761a7f 100644 --- a/exporter/chronicleexporter/marshal.go +++ b/exporter/chronicleexporter/marshal.go @@ -29,6 +29,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -50,7 +51,7 @@ var supportedLogTypes = map[string]string{ //go:generate mockery --name logMarshaler --filename mock_log_marshaler.go --structname MockMarshaler --inpackage type logMarshaler interface { MarshalRawLogs(ctx context.Context, ld plog.Logs) ([]*api.BatchCreateLogsRequest, error) - MarshalRawLogsForHTTP(ctx context.Context, ld plog.Logs) (map[string]*api.ImportLogsRequest, error) + MarshalRawLogsForHTTP(ctx context.Context, ld plog.Logs) (map[string][]*api.ImportLogsRequest, error) } type 
protoMarshaler struct { cfg Config @@ -411,25 +412,61 @@ func (m *protoMarshaler) constructPayloads(rawLogs map[string][]*api.LogEntry, n namespace = m.cfg.Namespace } ingestionLabels := ingestionLabelsMap[logType] - payloads = append(payloads, &api.BatchCreateLogsRequest{ - Batch: &api.LogEntryBatch{ - StartTime: timestamppb.New(m.startTime), - Entries: entries, - LogType: logType, - Source: &api.EventSource{ - CollectorId: m.collectorID, - CustomerId: m.customerID, - Labels: ingestionLabels, - Namespace: namespace, - }, - }, - }) + + request := m.buildGRPCRequest(entries, logType, namespace, ingestionLabels) + + payloads = append(payloads, m.enforceMaximumsGRPCRequest(request)...) } } return payloads } -func (m *protoMarshaler) MarshalRawLogsForHTTP(ctx context.Context, ld plog.Logs) (map[string]*api.ImportLogsRequest, error) { +func (m *protoMarshaler) enforceMaximumsGRPCRequest(request *api.BatchCreateLogsRequest) []*api.BatchCreateLogsRequest { + size := proto.Size(request) + if size > m.cfg.BatchRequestSizeLimitGRPC || len(request.Batch.Entries) > m.cfg.BatchLogCountLimitGRPC { + if len(request.Batch.Entries) < 2 { + m.teleSettings.Logger.Error("Single entry exceeds max request size. Dropping entry", zap.Int("size", size)) + return []*api.BatchCreateLogsRequest{} + } + + // split request into two + entries := request.Batch.Entries + mid := len(entries) / 2 + leftHalf := entries[:mid] + rightHalf := entries[mid:] + + request.Batch.Entries = leftHalf + otherHalfRequest := m.buildGRPCRequest(rightHalf, request.Batch.LogType, request.Batch.Source.Namespace, request.Batch.Source.Labels) + + // re-enforce max size restriction on each half + enforcedRequest := m.enforceMaximumsGRPCRequest(request) + enforcedOtherHalfRequest := m.enforceMaximumsGRPCRequest(otherHalfRequest) + + return append(enforcedRequest, enforcedOtherHalfRequest...) 
+ } + + return []*api.BatchCreateLogsRequest{ + request, + } +} + +func (m *protoMarshaler) buildGRPCRequest(entries []*api.LogEntry, logType, namespace string, ingestionLabels []*api.Label) *api.BatchCreateLogsRequest { + return &api.BatchCreateLogsRequest{ + Batch: &api.LogEntryBatch{ + StartTime: timestamppb.New(m.startTime), + Entries: entries, + LogType: logType, + Source: &api.EventSource{ + CollectorId: m.collectorID, + CustomerId: m.customerID, + Labels: ingestionLabels, + Namespace: namespace, + }, + }, + } +} + +func (m *protoMarshaler) MarshalRawLogsForHTTP(ctx context.Context, ld plog.Logs) (map[string][]*api.ImportLogsRequest, error) { rawLogs, err := m.extractRawHTTPLogs(ctx, ld) if err != nil { return nil, fmt.Errorf("extract raw logs: %w", err) @@ -482,26 +519,60 @@ func buildForwarderString(cfg Config) string { return fmt.Sprintf(format, cfg.Project, cfg.Location, cfg.CustomerID, cfg.Forwarder) } -func (m *protoMarshaler) constructHTTPPayloads(rawLogs map[string][]*api.Log) map[string]*api.ImportLogsRequest { - payloads := make(map[string]*api.ImportLogsRequest, len(rawLogs)) +func (m *protoMarshaler) constructHTTPPayloads(rawLogs map[string][]*api.Log) map[string][]*api.ImportLogsRequest { + payloads := make(map[string][]*api.ImportLogsRequest, len(rawLogs)) for logType, entries := range rawLogs { if len(entries) > 0 { - payloads[logType] = - &api.ImportLogsRequest{ - // TODO: Add parent and hint - // We don't yet have solid guidance on what these should be - Parent: "", - Hint: "", - - Source: &api.ImportLogsRequest_InlineSource{ - InlineSource: &api.ImportLogsRequest_LogsInlineSource{ - Forwarder: buildForwarderString(m.cfg), - Logs: entries, - }, - }, - } + request := m.buildHTTPRequest(entries) + + payloads[logType] = m.enforceMaximumsHTTPRequest(request) } } return payloads } + +func (m *protoMarshaler) enforceMaximumsHTTPRequest(request *api.ImportLogsRequest) []*api.ImportLogsRequest { + size := proto.Size(request) + logs := 
request.GetInlineSource().Logs + if size > m.cfg.BatchRequestSizeLimitHTTP || len(logs) > m.cfg.BatchLogCountLimitHTTP { + if len(logs) < 2 { + m.teleSettings.Logger.Error("Single entry exceeds max request size. Dropping entry", zap.Int("size", size)) + return []*api.ImportLogsRequest{} + } + + // split request into two + mid := len(logs) / 2 + leftHalf := logs[:mid] + rightHalf := logs[mid:] + + request.GetInlineSource().Logs = leftHalf + otherHalfRequest := m.buildHTTPRequest(rightHalf) + + // re-enforce max size restriction on each half + enforcedRequest := m.enforceMaximumsHTTPRequest(request) + enforcedOtherHalfRequest := m.enforceMaximumsHTTPRequest(otherHalfRequest) + + return append(enforcedRequest, enforcedOtherHalfRequest...) + } + + return []*api.ImportLogsRequest{ + request, + } +} + +func (m *protoMarshaler) buildHTTPRequest(entries []*api.Log) *api.ImportLogsRequest { + return &api.ImportLogsRequest{ + // TODO: Add parent and hint + // We don't yet have solid guidance on what these should be + Parent: "", + Hint: "", + + Source: &api.ImportLogsRequest_InlineSource{ + InlineSource: &api.ImportLogsRequest_LogsInlineSource{ + Forwarder: buildForwarderString(m.cfg), + Logs: entries, + }, + }, + } +} diff --git a/exporter/chronicleexporter/marshal_test.go b/exporter/chronicleexporter/marshal_test.go index fb5694239..d7c219186 100644 --- a/exporter/chronicleexporter/marshal_test.go +++ b/exporter/chronicleexporter/marshal_test.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" + "golang.org/x/exp/rand" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -41,10 +42,12 @@ func TestProtoMarshaler_MarshalRawLogs(t *testing.T) { { name: "Single log record with expected data", cfg: Config{ - CustomerID: uuid.New().String(), - LogType: "WINEVTLOG", - RawLogField: "body", - OverrideLogType: false, + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", 
+ OverrideLogType: false, + BatchLogCountLimitGRPC: 1000, + BatchRequestSizeLimitGRPC: 5242880, }, logRecords: func() plog.Logs { return mockLogs(mockLogRecord("Test log message", map[string]any{"log_type": "WINEVTLOG", "namespace": "test", `chronicle_ingestion_label["env"]`: "prod"})) @@ -67,10 +70,12 @@ func TestProtoMarshaler_MarshalRawLogs(t *testing.T) { { name: "Single log record with expected data, no log_type, namespace, or ingestion labels", cfg: Config{ - CustomerID: uuid.New().String(), - LogType: "WINEVTLOG", - RawLogField: "body", - OverrideLogType: true, + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: true, + BatchLogCountLimitGRPC: 1000, + BatchRequestSizeLimitGRPC: 5242880, }, logRecords: func() plog.Logs { return mockLogs(mockLogRecord("Test log message", nil)) @@ -95,10 +100,12 @@ func TestProtoMarshaler_MarshalRawLogs(t *testing.T) { { name: "Multiple log records", cfg: Config{ - CustomerID: uuid.New().String(), - LogType: "WINEVTLOG", - RawLogField: "body", - OverrideLogType: false, + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitGRPC: 1000, + BatchRequestSizeLimitGRPC: 5242880, }, logRecords: func() plog.Logs { logs := plog.NewLogs() @@ -121,10 +128,12 @@ func TestProtoMarshaler_MarshalRawLogs(t *testing.T) { { name: "Log record with attributes", cfg: Config{ - CustomerID: uuid.New().String(), - LogType: "WINEVTLOG", - RawLogField: "attributes", - OverrideLogType: false, + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "attributes", + OverrideLogType: false, + BatchLogCountLimitGRPC: 1000, + BatchRequestSizeLimitGRPC: 5242880, }, logRecords: func() plog.Logs { return mockLogs(mockLogRecord("", map[string]any{"key1": "value1", "log_type": "WINEVTLOG", "namespace": "test", `chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"})) @@ -143,10 +152,12 @@ 
func TestProtoMarshaler_MarshalRawLogs(t *testing.T) { { name: "No log records", cfg: Config{ - CustomerID: uuid.New().String(), - LogType: "DEFAULT", - RawLogField: "body", - OverrideLogType: false, + CustomerID: uuid.New().String(), + LogType: "DEFAULT", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitGRPC: 1000, + BatchRequestSizeLimitGRPC: 5242880, }, logRecords: func() plog.Logs { return plog.NewLogs() // No log records added @@ -158,9 +169,11 @@ func TestProtoMarshaler_MarshalRawLogs(t *testing.T) { { name: "No log type set in config or attributes", cfg: Config{ - CustomerID: uuid.New().String(), - RawLogField: "body", - OverrideLogType: true, + CustomerID: uuid.New().String(), + RawLogField: "body", + OverrideLogType: true, + BatchLogCountLimitGRPC: 1000, + BatchRequestSizeLimitGRPC: 5242880, }, logRecords: func() plog.Logs { return mockLogs(mockLogRecord("Log without logType", map[string]any{"namespace": "test", `ingestion_label["realkey1"]`: "realvalue1", `ingestion_label["realkey2"]`: "realvalue2"})) @@ -174,10 +187,12 @@ func TestProtoMarshaler_MarshalRawLogs(t *testing.T) { { name: "Multiple log records with duplicate data, no log type in attributes", cfg: Config{ - CustomerID: uuid.New().String(), - LogType: "WINEVTLOG", - RawLogField: "body", - OverrideLogType: false, + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitGRPC: 1000, + BatchRequestSizeLimitGRPC: 5242880, }, logRecords: func() plog.Logs { logs := plog.NewLogs() @@ -205,10 +220,12 @@ func TestProtoMarshaler_MarshalRawLogs(t *testing.T) { { name: "Multiple log records with different data, no log type in attributes", cfg: Config{ - CustomerID: uuid.New().String(), - LogType: "WINEVTLOG", - RawLogField: "body", - OverrideLogType: false, + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitGRPC: 1000, + 
BatchRequestSizeLimitGRPC: 5242880, }, logRecords: func() plog.Logs { logs := plog.NewLogs() @@ -238,10 +255,12 @@ func TestProtoMarshaler_MarshalRawLogs(t *testing.T) { { name: "Override log type with attribute", cfg: Config{ - CustomerID: uuid.New().String(), - LogType: "DEFAULT", // This should be overridden by the log_type attribute - RawLogField: "body", - OverrideLogType: true, + CustomerID: uuid.New().String(), + LogType: "DEFAULT", // This should be overridden by the log_type attribute + RawLogField: "body", + OverrideLogType: true, + BatchLogCountLimitGRPC: 1000, + BatchRequestSizeLimitGRPC: 5242880, }, logRecords: func() plog.Logs { return mockLogs(mockLogRecord("Log with overridden type", map[string]any{"log_type": "windows_event.application", "namespace": "test", `ingestion_label["realkey1"]`: "realvalue1", `ingestion_label["realkey2"]`: "realvalue2"})) @@ -255,10 +274,12 @@ func TestProtoMarshaler_MarshalRawLogs(t *testing.T) { { name: "Override log type with chronicle attribute", cfg: Config{ - CustomerID: uuid.New().String(), - LogType: "DEFAULT", // This should be overridden by the chronicle_log_type attribute - RawLogField: "body", - OverrideLogType: true, + CustomerID: uuid.New().String(), + LogType: "DEFAULT", // This should be overridden by the chronicle_log_type attribute + RawLogField: "body", + OverrideLogType: true, + BatchLogCountLimitGRPC: 1000, + BatchRequestSizeLimitGRPC: 5242880, }, logRecords: func() plog.Logs { return mockLogs(mockLogRecord("Log with overridden type", map[string]any{"chronicle_log_type": "ASOC_ALERT", "chronicle_namespace": "test", `chronicle_ingestion_label["realkey1"]`: "realvalue1", `chronicle_ingestion_label["realkey2"]`: "realvalue2"})) @@ -280,10 +301,12 @@ func TestProtoMarshaler_MarshalRawLogs(t *testing.T) { { name: "Multiple log records with duplicate data, log type in attributes", cfg: Config{ - CustomerID: uuid.New().String(), - LogType: "WINEVTLOG", - RawLogField: "body", - OverrideLogType: false, + 
CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitGRPC: 1000, + BatchRequestSizeLimitGRPC: 5242880, }, logRecords: func() plog.Logs { logs := plog.NewLogs() @@ -317,10 +340,12 @@ func TestProtoMarshaler_MarshalRawLogs(t *testing.T) { { name: "Multiple log records with different data, log type in attributes", cfg: Config{ - CustomerID: uuid.New().String(), - LogType: "WINEVTLOG", - RawLogField: "body", - OverrideLogType: false, + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitGRPC: 1000, + BatchRequestSizeLimitGRPC: 5242880, }, logRecords: func() plog.Logs { logs := plog.NewLogs() @@ -369,6 +394,381 @@ func TestProtoMarshaler_MarshalRawLogs(t *testing.T) { } }, }, + { + name: "Many logs, all one batch", + cfg: Config{ + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitGRPC: 1000, + BatchRequestSizeLimitGRPC: 5242880, + }, + logRecords: func() plog.Logs { + logs := plog.NewLogs() + logRecords := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + for i := 0; i < 1000; i++ { + record1 := logRecords.AppendEmpty() + record1.Body().SetStr("Log message") + record1.Attributes().FromRaw(map[string]any{"chronicle_log_type": "WINEVTLOGS1", "chronicle_namespace": "test1", `chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"}) + } + return logs + }, + + expectations: func(t *testing.T, requests []*api.BatchCreateLogsRequest) { + // verify 1 request, with 1 batch + require.Len(t, requests, 1, "Expected a one-batch request") + batch := requests[0].Batch + require.Len(t, batch.Entries, 1000, "Expected 1000 log entries in the batch") + // verify batch for first log + require.Contains(t, batch.LogType, "WINEVTLOGS") + require.Contains(t, batch.Source.Namespace, "test") + require.Len(t, 
batch.Source.Labels, 2) + + // verify ingestion labels + for _, req := range requests { + for _, label := range req.Batch.Source.Labels { + require.Contains(t, []string{ + "key1", + "key2", + "key3", + "key4", + }, label.Key) + require.Contains(t, []string{ + "value1", + "value2", + "value3", + "value4", + }, label.Value) + } + } + }, + }, + { + name: "Single batch split into multiple because more than 1000 logs", + cfg: Config{ + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitGRPC: 1000, + BatchRequestSizeLimitGRPC: 5242880, + }, + logRecords: func() plog.Logs { + logs := plog.NewLogs() + logRecords := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + for i := 0; i < 1001; i++ { + record1 := logRecords.AppendEmpty() + record1.Body().SetStr("Log message") + record1.Attributes().FromRaw(map[string]any{"chronicle_log_type": "WINEVTLOGS1", "chronicle_namespace": "test1", `chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"}) + } + return logs + }, + + expectations: func(t *testing.T, requests []*api.BatchCreateLogsRequest) { + // verify 1 request, with 1 batch + require.Len(t, requests, 2, "Expected a two-batch request") + batch := requests[0].Batch + require.Len(t, batch.Entries, 500, "Expected 500 log entries in the first batch") + // verify batch for first log + require.Contains(t, batch.LogType, "WINEVTLOGS") + require.Contains(t, batch.Source.Namespace, "test") + require.Len(t, batch.Source.Labels, 2) + + batch2 := requests[1].Batch + require.Len(t, batch2.Entries, 501, "Expected 501 log entries in the second batch") + // verify batch for first log + require.Contains(t, batch2.LogType, "WINEVTLOGS") + require.Contains(t, batch2.Source.Namespace, "test") + require.Len(t, batch2.Source.Labels, 2) + + // verify ingestion labels + for _, req := range requests { + for _, label := range req.Batch.Source.Labels { + 
require.Contains(t, []string{ + "key1", + "key2", + "key3", + "key4", + }, label.Key) + require.Contains(t, []string{ + "value1", + "value2", + "value3", + "value4", + }, label.Value) + } + } + }, + }, + { + name: "Recursively split batch, exceeds 1000 entries multiple times", + cfg: Config{ + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitGRPC: 1000, + BatchRequestSizeLimitGRPC: 5242880, + }, + logRecords: func() plog.Logs { + logs := plog.NewLogs() + logRecords := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + for i := 0; i < 2002; i++ { + record1 := logRecords.AppendEmpty() + record1.Body().SetStr("Log message") + record1.Attributes().FromRaw(map[string]any{"chronicle_log_type": "WINEVTLOGS1", "chronicle_namespace": "test1", `chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"}) + } + return logs + }, + + expectations: func(t *testing.T, requests []*api.BatchCreateLogsRequest) { + // verify 1 request, with 1 batch + require.Len(t, requests, 4, "Expected a four-batch request") + batch := requests[0].Batch + require.Len(t, batch.Entries, 500, "Expected 500 log entries in the first batch") + // verify batch for first log + require.Contains(t, batch.LogType, "WINEVTLOGS") + require.Contains(t, batch.Source.Namespace, "test") + require.Len(t, batch.Source.Labels, 2) + + batch2 := requests[1].Batch + require.Len(t, batch2.Entries, 501, "Expected 501 log entries in the second batch") + // verify batch for first log + require.Contains(t, batch2.LogType, "WINEVTLOGS") + require.Contains(t, batch2.Source.Namespace, "test") + require.Len(t, batch2.Source.Labels, 2) + + batch3 := requests[2].Batch + require.Len(t, batch3.Entries, 500, "Expected 500 log entries in the third batch") + // verify batch for first log + require.Contains(t, batch3.LogType, "WINEVTLOGS") + require.Contains(t, batch3.Source.Namespace, "test") + 
require.Len(t, batch3.Source.Labels, 2) + + batch4 := requests[3].Batch + require.Len(t, batch4.Entries, 501, "Expected 501 log entries in the fourth batch") + // verify batch for first log + require.Contains(t, batch4.LogType, "WINEVTLOGS") + require.Contains(t, batch4.Source.Namespace, "test") + require.Len(t, batch4.Source.Labels, 2) + + // verify ingestion labels + for _, req := range requests { + for _, label := range req.Batch.Source.Labels { + require.Contains(t, []string{ + "key1", + "key2", + "key3", + "key4", + }, label.Key) + require.Contains(t, []string{ + "value1", + "value2", + "value3", + "value4", + }, label.Value) + } + } + }, + }, + { + name: "Single batch split into multiple because request size too large", + cfg: Config{ + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitGRPC: 1000, + BatchRequestSizeLimitGRPC: 5242880, + }, + logRecords: func() plog.Logs { + logs := plog.NewLogs() + logRecords := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + // create 640 logs with size 8192 bytes each - totalling 5242880 bytes. 
non-body fields put us over limit + for i := 0; i < 640; i++ { + record1 := logRecords.AppendEmpty() + body := tokenWithLength(8192) + record1.Body().SetStr(string(body)) + record1.Attributes().FromRaw(map[string]any{"chronicle_log_type": "WINEVTLOGS1", "chronicle_namespace": "test1", `chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"}) + } + return logs + }, + + expectations: func(t *testing.T, requests []*api.BatchCreateLogsRequest) { + // verify request, with 1 batch + require.Len(t, requests, 2, "Expected a two-batch request") + batch := requests[0].Batch + require.Len(t, batch.Entries, 320, "Expected 320 log entries in the first batch") + // verify batch for first log + require.Contains(t, batch.LogType, "WINEVTLOGS") + require.Contains(t, batch.Source.Namespace, "test") + require.Len(t, batch.Source.Labels, 2) + + batch2 := requests[1].Batch + require.Len(t, batch2.Entries, 320, "Expected 320 log entries in the second batch") + // verify batch for first log + require.Contains(t, batch2.LogType, "WINEVTLOGS") + require.Contains(t, batch2.Source.Namespace, "test") + require.Len(t, batch2.Source.Labels, 2) + + // verify ingestion labels + for _, req := range requests { + for _, label := range req.Batch.Source.Labels { + require.Contains(t, []string{ + "key1", + "key2", + "key3", + "key4", + }, label.Key) + require.Contains(t, []string{ + "value1", + "value2", + "value3", + "value4", + }, label.Value) + } + } + }, + }, + { + name: "Recursively split batch into multiple because request size too large", + cfg: Config{ + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitGRPC: 1000, + BatchRequestSizeLimitGRPC: 5242880, + }, + logRecords: func() plog.Logs { + logs := plog.NewLogs() + logRecords := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + // create 1280 logs with size 8192 bytes each - totalling 5242880 * 2 bytes. 
non-body fields put us over twice the limit + for i := 0; i < 1280; i++ { + record1 := logRecords.AppendEmpty() + body := tokenWithLength(8192) + record1.Body().SetStr(string(body)) + record1.Attributes().FromRaw(map[string]any{"chronicle_log_type": "WINEVTLOGS1", "chronicle_namespace": "test1", `chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"}) + } + return logs + }, + + expectations: func(t *testing.T, requests []*api.BatchCreateLogsRequest) { + // verify 1 request, with 1 batch + require.Len(t, requests, 4, "Expected a four-batch request") + batch := requests[0].Batch + require.Len(t, batch.Entries, 320, "Expected 320 log entries in the first batch") + // verify batch for first log + require.Contains(t, batch.LogType, "WINEVTLOGS") + require.Contains(t, batch.Source.Namespace, "test") + require.Len(t, batch.Source.Labels, 2) + + batch2 := requests[1].Batch + require.Len(t, batch2.Entries, 320, "Expected 320 log entries in the second batch") + // verify batch for first log + require.Contains(t, batch2.LogType, "WINEVTLOGS") + require.Contains(t, batch2.Source.Namespace, "test") + require.Len(t, batch2.Source.Labels, 2) + + batch3 := requests[2].Batch + require.Len(t, batch3.Entries, 320, "Expected 320 log entries in the third batch") + // verify batch for first log + require.Contains(t, batch3.LogType, "WINEVTLOGS") + require.Contains(t, batch3.Source.Namespace, "test") + require.Len(t, batch3.Source.Labels, 2) + + batch4 := requests[3].Batch + require.Len(t, batch4.Entries, 320, "Expected 320 log entries in the fourth batch") + // verify batch for first log + require.Contains(t, batch4.LogType, "WINEVTLOGS") + require.Contains(t, batch4.Source.Namespace, "test") + require.Len(t, batch4.Source.Labels, 2) + + // verify ingestion labels + for _, req := range requests { + for _, label := range req.Batch.Source.Labels { + require.Contains(t, []string{ + "key1", + "key2", + "key3", + "key4", + }, label.Key) + 
require.Contains(t, []string{ + "value1", + "value2", + "value3", + "value4", + }, label.Value) + } + } + }, + }, + { + name: "Unsplittable batch, single log exceeds max request size", + cfg: Config{ + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitGRPC: 1000, + BatchRequestSizeLimitGRPC: 5242880, + }, + logRecords: func() plog.Logs { + logs := plog.NewLogs() + record1 := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + body := tokenWithLength(5242881) + record1.Body().SetStr(string(body)) + record1.Attributes().FromRaw(map[string]any{"chronicle_log_type": "WINEVTLOGS1", "chronicle_namespace": "test1", `chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"}) + return logs + }, + + expectations: func(t *testing.T, requests []*api.BatchCreateLogsRequest) { + // verify 1 request, with 1 batch + require.Len(t, requests, 0, "Expected a zero requests") + }, + }, + { + name: "Multiple valid log records + unsplittable log entries", + cfg: Config{ + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitGRPC: 1000, + BatchRequestSizeLimitGRPC: 5242880, + }, + logRecords: func() plog.Logs { + logs := plog.NewLogs() + tooLargeBody := string(tokenWithLength(5242881)) + // first normal log, then impossible to split log + logRecords1 := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + record1 := logRecords1.AppendEmpty() + record1.Body().SetStr("First log message") + tooLargeRecord1 := logRecords1.AppendEmpty() + tooLargeRecord1.Body().SetStr(tooLargeBody) + // first impossible to split log, then normal log + logRecords2 := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + tooLargeRecord2 := logRecords2.AppendEmpty() + tooLargeRecord2.Body().SetStr(tooLargeBody) + record2 := logRecords2.AppendEmpty() + 
record2.Body().SetStr("Second log message") + return logs + }, + expectations: func(t *testing.T, requests []*api.BatchCreateLogsRequest) { + // this is a kind of weird edge case, the overly large logs makes the final requests quite inefficient, but it's going to be so rare that the inefficiency isn't a real concern + require.Len(t, requests, 2, "Expected two batch requests") + batch1 := requests[0].Batch + require.Len(t, batch1.Entries, 1, "Expected one log entry in the first batch") + // Verifying the first log entry data + require.Equal(t, "First log message", string(batch1.Entries[0].Data)) + + batch2 := requests[1].Batch + require.Len(t, batch2.Entries, 1, "Expected one log entry in the second batch") + // Verifying the second log entry data + require.Equal(t, "Second log message", string(batch2.Entries[0].Data)) + }, + }, } for _, tt := range tests { @@ -398,19 +798,21 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) { cfg Config labels []*api.Label logRecords func() plog.Logs - expectations func(t *testing.T, requests map[string]*api.ImportLogsRequest) + expectations func(t *testing.T, requests map[string][]*api.ImportLogsRequest) }{ { name: "Single log record with expected data", cfg: Config{ - CustomerID: uuid.New().String(), - LogType: "WINEVTLOG", - RawLogField: "body", - OverrideLogType: false, - Protocol: protocolHTTPS, - Project: "test-project", - Location: "us", - Forwarder: uuid.New().String(), + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + Protocol: protocolHTTPS, + Project: "test-project", + Location: "us", + Forwarder: uuid.New().String(), + BatchLogCountLimitHTTP: 1000, + BatchRequestSizeLimitHTTP: 5242880, }, labels: []*api.Label{ {Key: "env", Value: "prod"}, @@ -418,9 +820,9 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) { logRecords: func() plog.Logs { return mockLogs(mockLogRecord("Test log message", map[string]any{"log_type": "WINEVTLOG", 
"namespace": "test"})) }, - expectations: func(t *testing.T, requests map[string]*api.ImportLogsRequest) { + expectations: func(t *testing.T, requests map[string][]*api.ImportLogsRequest) { require.Len(t, requests, 1) - logs := requests["WINEVTLOG"].GetInlineSource().Logs + logs := requests["WINEVTLOG"][0].GetInlineSource().Logs require.Len(t, logs, 1) // Convert Data (byte slice) to string for comparison logDataAsString := string(logs[0].Data) @@ -431,10 +833,12 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) { { name: "Multiple log records", cfg: Config{ - CustomerID: uuid.New().String(), - LogType: "WINEVTLOG", - RawLogField: "body", - OverrideLogType: false, + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitHTTP: 1000, + BatchRequestSizeLimitHTTP: 5242880, }, labels: []*api.Label{ {Key: "env", Value: "staging"}, @@ -447,9 +851,9 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) { record2.Body().SetStr("Second log message") return logs }, - expectations: func(t *testing.T, requests map[string]*api.ImportLogsRequest) { + expectations: func(t *testing.T, requests map[string][]*api.ImportLogsRequest) { require.Len(t, requests, 1, "Expected a single batch request") - logs := requests["WINEVTLOG"].GetInlineSource().Logs + logs := requests["WINEVTLOG"][0].GetInlineSource().Logs require.Len(t, logs, 2, "Expected two log entries in the batch") // Verifying the first log entry data require.Equal(t, "First log message", string(logs[0].Data)) @@ -460,18 +864,20 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) { { name: "Log record with attributes", cfg: Config{ - CustomerID: uuid.New().String(), - LogType: "WINEVTLOG", - RawLogField: "attributes", - OverrideLogType: false, + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "attributes", + OverrideLogType: false, + BatchLogCountLimitHTTP: 1000, + BatchRequestSizeLimitHTTP: 5242880, 
}, labels: []*api.Label{}, logRecords: func() plog.Logs { return mockLogs(mockLogRecord("", map[string]any{"key1": "value1", "log_type": "WINEVTLOG", "namespace": "test", `chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"})) }, - expectations: func(t *testing.T, requests map[string]*api.ImportLogsRequest) { + expectations: func(t *testing.T, requests map[string][]*api.ImportLogsRequest) { require.Len(t, requests, 1) - logs := requests["WINEVTLOG"].GetInlineSource().Logs + logs := requests["WINEVTLOG"][0].GetInlineSource().Logs // Assuming the attributes are marshaled into the Data field as a JSON string expectedData := `{"key1":"value1", "log_type":"WINEVTLOG", "namespace":"test", "chronicle_ingestion_label[\"key1\"]": "value1", "chronicle_ingestion_label[\"key2\"]": "value2"}` actualData := string(logs[0].Data) @@ -481,34 +887,38 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) { { name: "No log records", cfg: Config{ - CustomerID: uuid.New().String(), - LogType: "DEFAULT", - RawLogField: "body", - OverrideLogType: false, + CustomerID: uuid.New().String(), + LogType: "DEFAULT", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitHTTP: 1000, + BatchRequestSizeLimitHTTP: 5242880, }, labels: []*api.Label{}, logRecords: func() plog.Logs { return plog.NewLogs() // No log records added }, - expectations: func(t *testing.T, requests map[string]*api.ImportLogsRequest) { + expectations: func(t *testing.T, requests map[string][]*api.ImportLogsRequest) { require.Len(t, requests, 0, "Expected no requests due to no log records") }, }, { name: "No log type set in config or attributes", cfg: Config{ - CustomerID: uuid.New().String(), - LogType: "WINEVTLOG", - RawLogField: "attributes", - OverrideLogType: false, + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "attributes", + OverrideLogType: false, + BatchLogCountLimitHTTP: 1000, + BatchRequestSizeLimitHTTP: 5242880, }, labels: 
[]*api.Label{}, logRecords: func() plog.Logs { return mockLogs(mockLogRecord("", map[string]any{"key1": "value1", "log_type": "WINEVTLOG", "namespace": "test", `chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"})) }, - expectations: func(t *testing.T, requests map[string]*api.ImportLogsRequest) { + expectations: func(t *testing.T, requests map[string][]*api.ImportLogsRequest) { require.Len(t, requests, 1) - logs := requests["WINEVTLOG"].GetInlineSource().Logs + logs := requests["WINEVTLOG"][0].GetInlineSource().Logs // Assuming the attributes are marshaled into the Data field as a JSON string expectedData := `{"key1":"value1", "log_type":"WINEVTLOG", "namespace":"test", "chronicle_ingestion_label[\"key1\"]": "value1", "chronicle_ingestion_label[\"key2\"]": "value2"}` actualData := string(logs[0].Data) @@ -518,10 +928,12 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) { { name: "Multiple log records with duplicate data, no log type in attributes", cfg: Config{ - CustomerID: uuid.New().String(), - LogType: "WINEVTLOG", - RawLogField: "body", - OverrideLogType: false, + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitHTTP: 1000, + BatchRequestSizeLimitHTTP: 5242880, }, logRecords: func() plog.Logs { logs := plog.NewLogs() @@ -533,10 +945,10 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) { record2.Attributes().FromRaw(map[string]any{"chronicle_namespace": "test1", `chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"}) return logs }, - expectations: func(t *testing.T, requests map[string]*api.ImportLogsRequest) { + expectations: func(t *testing.T, requests map[string][]*api.ImportLogsRequest) { // verify one request for log type in config require.Len(t, requests, 1, "Expected a single batch request") - logs := requests["WINEVTLOG"].GetInlineSource().Logs + logs := 
requests["WINEVTLOG"][0].GetInlineSource().Logs // verify batch source labels require.Len(t, logs[0].Labels, 2) require.Len(t, logs, 2, "Expected two log entries in the batch") @@ -549,10 +961,12 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) { { name: "Multiple log records with different data, no log type in attributes", cfg: Config{ - CustomerID: uuid.New().String(), - LogType: "WINEVTLOG", - RawLogField: "body", - OverrideLogType: false, + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitHTTP: 1000, + BatchRequestSizeLimitHTTP: 5242880, }, logRecords: func() plog.Logs { logs := plog.NewLogs() @@ -564,10 +978,10 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) { record2.Attributes().FromRaw(map[string]any{`chronicle_ingestion_label["key3"]`: "value3", `chronicle_ingestion_label["key4"]`: "value4"}) return logs }, - expectations: func(t *testing.T, requests map[string]*api.ImportLogsRequest) { + expectations: func(t *testing.T, requests map[string][]*api.ImportLogsRequest) { // verify one request for one log type require.Len(t, requests, 1, "Expected a single batch request") - logs := requests["WINEVTLOG"].GetInlineSource().Logs + logs := requests["WINEVTLOG"][0].GetInlineSource().Logs require.Len(t, logs, 2, "Expected two log entries in the batch") require.Equal(t, "", logs[0].EnvironmentNamespace) // verify batch source labels @@ -582,34 +996,38 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) { { name: "Override log type with attribute", cfg: Config{ - CustomerID: uuid.New().String(), - LogType: "DEFAULT", // This should be overridden by the log_type attribute - RawLogField: "body", - OverrideLogType: true, + CustomerID: uuid.New().String(), + LogType: "DEFAULT", // This should be overridden by the log_type attribute + RawLogField: "body", + OverrideLogType: true, + BatchLogCountLimitHTTP: 1000, + BatchRequestSizeLimitHTTP: 5242880, }, 
logRecords: func() plog.Logs { return mockLogs(mockLogRecord("Log with overridden type", map[string]any{"log_type": "windows_event.application", "namespace": "test", `ingestion_label["realkey1"]`: "realvalue1", `ingestion_label["realkey2"]`: "realvalue2"})) }, - expectations: func(t *testing.T, requests map[string]*api.ImportLogsRequest) { + expectations: func(t *testing.T, requests map[string][]*api.ImportLogsRequest) { require.Len(t, requests, 1) - logs := requests["WINEVTLOG"].GetInlineSource().Logs + logs := requests["WINEVTLOG"][0].GetInlineSource().Logs require.NotEqual(t, len(logs), 0) }, }, { name: "Override log type with chronicle attribute", cfg: Config{ - CustomerID: uuid.New().String(), - LogType: "DEFAULT", // This should be overridden by the chronicle_log_type attribute - RawLogField: "body", - OverrideLogType: true, + CustomerID: uuid.New().String(), + LogType: "DEFAULT", // This should be overridden by the chronicle_log_type attribute + RawLogField: "body", + OverrideLogType: true, + BatchLogCountLimitHTTP: 1000, + BatchRequestSizeLimitHTTP: 5242880, }, logRecords: func() plog.Logs { return mockLogs(mockLogRecord("Log with overridden type", map[string]any{"chronicle_log_type": "ASOC_ALERT", "chronicle_namespace": "test", `chronicle_ingestion_label["realkey1"]`: "realvalue1", `chronicle_ingestion_label["realkey2"]`: "realvalue2"})) }, - expectations: func(t *testing.T, requests map[string]*api.ImportLogsRequest) { + expectations: func(t *testing.T, requests map[string][]*api.ImportLogsRequest) { require.Len(t, requests, 1) - logs := requests["ASOC_ALERT"].GetInlineSource().Logs + logs := requests["ASOC_ALERT"][0].GetInlineSource().Logs require.Equal(t, "test", logs[0].EnvironmentNamespace, "Expected namespace to be overridden by attribute") expectedLabels := map[string]string{ "realkey1": "realvalue1", @@ -623,10 +1041,12 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) { { name: "Multiple log records with duplicate data, log type in 
attributes", cfg: Config{ - CustomerID: uuid.New().String(), - LogType: "WINEVTLOG", - RawLogField: "body", - OverrideLogType: false, + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitHTTP: 1000, + BatchRequestSizeLimitHTTP: 5242880, }, logRecords: func() plog.Logs { logs := plog.NewLogs() @@ -639,10 +1059,10 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) { record2.Attributes().FromRaw(map[string]any{"chronicle_log_type": "WINEVTLOGS", "chronicle_namespace": "test1", `chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"}) return logs }, - expectations: func(t *testing.T, requests map[string]*api.ImportLogsRequest) { + expectations: func(t *testing.T, requests map[string][]*api.ImportLogsRequest) { // verify 1 request, 2 batches for same log type require.Len(t, requests, 1, "Expected a single batch request") - logs := requests["WINEVTLOGS"].GetInlineSource().Logs + logs := requests["WINEVTLOGS"][0].GetInlineSource().Logs require.Len(t, logs, 2, "Expected two log entries in the batch") // verify variables require.Equal(t, "test1", logs[0].EnvironmentNamespace) @@ -659,10 +1079,12 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) { { name: "Multiple log records with different data, log type in attributes", cfg: Config{ - CustomerID: uuid.New().String(), - LogType: "WINEVTLOG", - RawLogField: "body", - OverrideLogType: false, + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitHTTP: 1000, + BatchRequestSizeLimitHTTP: 5242880, }, logRecords: func() plog.Logs { logs := plog.NewLogs() @@ -676,7 +1098,7 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) { return logs }, - expectations: func(t *testing.T, requests map[string]*api.ImportLogsRequest) { + expectations: func(t *testing.T, requests map[string][]*api.ImportLogsRequest) { 
expectedLabels := map[string]string{ "key1": "value1", "key2": "value2", @@ -686,7 +1108,7 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) { // verify 2 requests, with 1 batch for different log types require.Len(t, requests, 2, "Expected a two batch request") - logs1 := requests["WINEVTLOGS1"].GetInlineSource().Logs + logs1 := requests["WINEVTLOGS1"][0].GetInlineSource().Logs require.Len(t, logs1, 1, "Expected one log entries in the batch") // verify variables for first log require.Equal(t, logs1[0].EnvironmentNamespace, "test1") @@ -695,7 +1117,7 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) { require.Equal(t, expectedLabels[key], label.Value, "Expected ingestion label to be overridden by attribute") } - logs2 := requests["WINEVTLOGS2"].GetInlineSource().Logs + logs2 := requests["WINEVTLOGS2"][0].GetInlineSource().Logs require.Len(t, logs2, 1, "Expected one log entries in the batch") // verify variables for second log require.Equal(t, logs2[0].EnvironmentNamespace, "test2") @@ -705,6 +1127,358 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) { } }, }, + { + name: "Many log records all one batch", + cfg: Config{ + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitHTTP: 1000, + BatchRequestSizeLimitHTTP: 5242880, + }, + logRecords: func() plog.Logs { + logs := plog.NewLogs() + logRecords := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + for i := 0; i < 1000; i++ { + record1 := logRecords.AppendEmpty() + record1.Body().SetStr("First log message") + record1.Attributes().FromRaw(map[string]any{"chronicle_log_type": "WINEVTLOGS1", "chronicle_namespace": "test1", `chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"}) + } + + return logs + }, + + expectations: func(t *testing.T, requests map[string][]*api.ImportLogsRequest) { + expectedLabels := map[string]string{ + "key1": "value1", + 
"key2": "value2", + } + // verify 1 requests + require.Len(t, requests, 1, "Expected a one batch request") + + logs1 := requests["WINEVTLOGS1"][0].GetInlineSource().Logs + require.Len(t, logs1, 1000, "Expected one thousand log entries in the batch") + // verify variables for first log + require.Equal(t, logs1[0].EnvironmentNamespace, "test1") + require.Len(t, logs1[0].Labels, 2) + for key, label := range logs1[0].Labels { + require.Equal(t, expectedLabels[key], label.Value, "Expected ingestion label to be overridden by attribute") + } + }, + }, + { + name: "Many log records split into two batches", + cfg: Config{ + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitHTTP: 1000, + BatchRequestSizeLimitHTTP: 5242880, + }, + logRecords: func() plog.Logs { + logs := plog.NewLogs() + logRecords := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + for i := 0; i < 1001; i++ { + record1 := logRecords.AppendEmpty() + record1.Body().SetStr("First log message") + record1.Attributes().FromRaw(map[string]any{"chronicle_log_type": "WINEVTLOGS1", "chronicle_namespace": "test1", `chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"}) + } + + return logs + }, + + expectations: func(t *testing.T, requests map[string][]*api.ImportLogsRequest) { + expectedLabels := map[string]string{ + "key1": "value1", + "key2": "value2", + } + // verify 1 request log type + require.Len(t, requests, 1, "Expected one log type for the requests") + winEvtLogRequests := requests["WINEVTLOGS1"] + require.Len(t, winEvtLogRequests, 2, "Expected two batches") + + logs1 := winEvtLogRequests[0].GetInlineSource().Logs + require.Len(t, logs1, 500, "Expected 500 log entries in the first batch") + // verify variables for first log + require.Equal(t, logs1[0].EnvironmentNamespace, "test1") + require.Len(t, logs1[0].Labels, 2) + for key, label := range logs1[0].Labels { + 
require.Equal(t, expectedLabels[key], label.Value, "Expected ingestion label to be overridden by attribute") + } + + logs2 := winEvtLogRequests[1].GetInlineSource().Logs + require.Len(t, logs2, 501, "Expected 501 log entries in the second batch") + // verify variables for first log + require.Equal(t, logs2[0].EnvironmentNamespace, "test1") + require.Len(t, logs2[0].Labels, 2) + for key, label := range logs2[0].Labels { + require.Equal(t, expectedLabels[key], label.Value, "Expected ingestion label to be overridden by attribute") + } + }, + }, + { + name: "Recursively split batch multiple times because too many logs", + cfg: Config{ + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitHTTP: 1000, + BatchRequestSizeLimitHTTP: 5242880, + }, + logRecords: func() plog.Logs { + logs := plog.NewLogs() + logRecords := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + for i := 0; i < 2002; i++ { + record1 := logRecords.AppendEmpty() + record1.Body().SetStr("First log message") + record1.Attributes().FromRaw(map[string]any{"chronicle_log_type": "WINEVTLOGS1", "chronicle_namespace": "test1", `chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"}) + } + + return logs + }, + + expectations: func(t *testing.T, requests map[string][]*api.ImportLogsRequest) { + expectedLabels := map[string]string{ + "key1": "value1", + "key2": "value2", + } + // verify 1 request log type + require.Len(t, requests, 1, "Expected one log type for the requests") + winEvtLogRequests := requests["WINEVTLOGS1"] + require.Len(t, winEvtLogRequests, 4, "Expected four batches") + + logs1 := winEvtLogRequests[0].GetInlineSource().Logs + require.Len(t, logs1, 500, "Expected 500 log entries in the first batch") + // verify variables for first log + require.Equal(t, logs1[0].EnvironmentNamespace, "test1") + require.Len(t, logs1[0].Labels, 2) + for key, label := range 
logs1[0].Labels { + require.Equal(t, expectedLabels[key], label.Value, "Expected ingestion label to be overridden by attribute") + } + + logs2 := winEvtLogRequests[1].GetInlineSource().Logs + require.Len(t, logs2, 501, "Expected 501 log entries in the second batch") + // verify variables for first log + require.Equal(t, logs2[0].EnvironmentNamespace, "test1") + require.Len(t, logs2[0].Labels, 2) + for key, label := range logs2[0].Labels { + require.Equal(t, expectedLabels[key], label.Value, "Expected ingestion label to be overridden by attribute") + } + + logs3 := winEvtLogRequests[2].GetInlineSource().Logs + require.Len(t, logs3, 500, "Expected 500 log entries in the third batch") + // verify variables for first log + require.Equal(t, logs3[0].EnvironmentNamespace, "test1") + require.Len(t, logs3[0].Labels, 2) + for key, label := range logs3[0].Labels { + require.Equal(t, expectedLabels[key], label.Value, "Expected ingestion label to be overridden by attribute") + } + + logs4 := winEvtLogRequests[3].GetInlineSource().Logs + require.Len(t, logs4, 501, "Expected 501 log entries in the fourth batch") + // verify variables for first log + require.Equal(t, logs4[0].EnvironmentNamespace, "test1") + require.Len(t, logs4[0].Labels, 2) + for key, label := range logs4[0].Labels { + require.Equal(t, expectedLabels[key], label.Value, "Expected ingestion label to be overridden by attribute") + } + }, + }, + { + name: "Many log records split into two batches because request size too large", + cfg: Config{ + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitHTTP: 1000, + BatchRequestSizeLimitHTTP: 5242880, + }, + logRecords: func() plog.Logs { + logs := plog.NewLogs() + logRecords := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + // 8192 * 640 = 5242880 + body := tokenWithLength(8192) + for i := 0; i < 640; i++ { + record1 := logRecords.AppendEmpty() + 
record1.Body().SetStr(string(body)) + record1.Attributes().FromRaw(map[string]any{"chronicle_log_type": "WINEVTLOGS1", "chronicle_namespace": "test1", `chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"}) + } + + return logs + }, + + expectations: func(t *testing.T, requests map[string][]*api.ImportLogsRequest) { + expectedLabels := map[string]string{ + "key1": "value1", + "key2": "value2", + } + // verify 1 request log type + require.Len(t, requests, 1, "Expected one log type for the requests") + winEvtLogRequests := requests["WINEVTLOGS1"] + require.Len(t, winEvtLogRequests, 2, "Expected two batches") + + logs1 := winEvtLogRequests[0].GetInlineSource().Logs + require.Len(t, logs1, 320, "Expected 320 log entries in the first batch") + // verify variables for first log + require.Equal(t, logs1[0].EnvironmentNamespace, "test1") + require.Len(t, logs1[0].Labels, 2) + for key, label := range logs1[0].Labels { + require.Equal(t, expectedLabels[key], label.Value, "Expected ingestion label to be overridden by attribute") + } + + logs2 := winEvtLogRequests[1].GetInlineSource().Logs + require.Len(t, logs2, 320, "Expected 320 log entries in the second batch") + // verify variables for first log + require.Equal(t, logs2[0].EnvironmentNamespace, "test1") + require.Len(t, logs2[0].Labels, 2) + for key, label := range logs2[0].Labels { + require.Equal(t, expectedLabels[key], label.Value, "Expected ingestion label to be overridden by attribute") + } + }, + }, + { + name: "Recursively split into batches because request size too large", + cfg: Config{ + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitHTTP: 2000, + BatchRequestSizeLimitHTTP: 5242880, + }, + logRecords: func() plog.Logs { + logs := plog.NewLogs() + logRecords := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + // 8192 * 1280 = 5242880 * 2 + body := tokenWithLength(8192) + 
for i := 0; i < 1280; i++ { + record1 := logRecords.AppendEmpty() + record1.Body().SetStr(string(body)) + record1.Attributes().FromRaw(map[string]any{"chronicle_log_type": "WINEVTLOGS1", "chronicle_namespace": "test1", `chronicle_ingestion_label["key1"]`: "value1", `chronicle_ingestion_label["key2"]`: "value2"}) + } + + return logs + }, + + expectations: func(t *testing.T, requests map[string][]*api.ImportLogsRequest) { + expectedLabels := map[string]string{ + "key1": "value1", + "key2": "value2", + } + // verify 1 request log type + require.Len(t, requests, 1, "Expected one log type for the requests") + winEvtLogRequests := requests["WINEVTLOGS1"] + require.Len(t, winEvtLogRequests, 4, "Expected four batches") + + logs1 := winEvtLogRequests[0].GetInlineSource().Logs + require.Len(t, logs1, 320, "Expected 320 log entries in the first batch") + // verify variables for first log + require.Equal(t, logs1[0].EnvironmentNamespace, "test1") + require.Len(t, logs1[0].Labels, 2) + for key, label := range logs1[0].Labels { + require.Equal(t, expectedLabels[key], label.Value, "Expected ingestion label to be overridden by attribute") + } + + logs2 := winEvtLogRequests[1].GetInlineSource().Logs + require.Len(t, logs2, 320, "Expected 320 log entries in the second batch") + // verify variables for first log + require.Equal(t, logs2[0].EnvironmentNamespace, "test1") + require.Len(t, logs2[0].Labels, 2) + for key, label := range logs2[0].Labels { + require.Equal(t, expectedLabels[key], label.Value, "Expected ingestion label to be overridden by attribute") + } + + logs3 := winEvtLogRequests[2].GetInlineSource().Logs + require.Len(t, logs3, 320, "Expected 320 log entries in the third batch") + // verify variables for first log + require.Equal(t, logs3[0].EnvironmentNamespace, "test1") + require.Len(t, logs3[0].Labels, 2) + for key, label := range logs3[0].Labels { + require.Equal(t, expectedLabels[key], label.Value, "Expected ingestion label to be overridden by attribute") + } + + 
logs4 := winEvtLogRequests[3].GetInlineSource().Logs + require.Len(t, logs4, 320, "Expected 320 log entries in the fourth batch") + // verify variables for first log + require.Equal(t, logs4[0].EnvironmentNamespace, "test1") + require.Len(t, logs4[0].Labels, 2) + for key, label := range logs4[0].Labels { + require.Equal(t, expectedLabels[key], label.Value, "Expected ingestion label to be overridden by attribute") + } + }, + }, + { + name: "Unsplittable log record, single log exceeds request size limit", + cfg: Config{ + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitHTTP: 1000, + BatchRequestSizeLimitHTTP: 100000, + }, + labels: []*api.Label{ + {Key: "env", Value: "staging"}, + }, + logRecords: func() plog.Logs { + logs := plog.NewLogs() + record1 := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + record1.Body().SetStr(string(tokenWithLength(100000))) + return logs + }, + expectations: func(t *testing.T, requests map[string][]*api.ImportLogsRequest) { + require.Len(t, requests, 1, "Expected one log type") + require.Len(t, requests["WINEVTLOG"], 0, "Expected WINEVTLOG log type to have zero requests") + }, + }, + { + name: "Unsplittable log record, single log exceeds request size limit, mixed with okay logs", + cfg: Config{ + CustomerID: uuid.New().String(), + LogType: "WINEVTLOG", + RawLogField: "body", + OverrideLogType: false, + BatchLogCountLimitHTTP: 1000, + BatchRequestSizeLimitHTTP: 100000, + }, + labels: []*api.Label{ + {Key: "env", Value: "staging"}, + }, + logRecords: func() plog.Logs { + logs := plog.NewLogs() + tooLargeBody := string(tokenWithLength(100001)) + // first normal log, then impossible to split log + logRecords1 := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + record1 := logRecords1.AppendEmpty() + record1.Body().SetStr("First log message") + tooLargeRecord1 := logRecords1.AppendEmpty() + 
tooLargeRecord1.Body().SetStr(tooLargeBody) + // first impossible to split log, then normal log + logRecords2 := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + tooLargeRecord2 := logRecords2.AppendEmpty() + tooLargeRecord2.Body().SetStr(tooLargeBody) + record2 := logRecords2.AppendEmpty() + record2.Body().SetStr("Second log message") + return logs + }, + expectations: func(t *testing.T, requests map[string][]*api.ImportLogsRequest) { + require.Len(t, requests, 1, "Expected one log type") + winEvtLogRequests := requests["WINEVTLOG"] + require.Len(t, winEvtLogRequests, 2, "Expected WINEVTLOG log type to have two requests") + + logs1 := winEvtLogRequests[0].GetInlineSource().Logs + require.Len(t, logs1, 1, "Expected 1 log entry in the first batch") + require.Equal(t, string(logs1[0].Data), "First log message") + + logs2 := winEvtLogRequests[1].GetInlineSource().Logs + require.Len(t, logs2, 1, "Expected 1 log entry in the second batch") + require.Equal(t, string(logs2[0].Data), "Second log message") + }, + }, } for _, tt := range tests { @@ -725,6 +1499,15 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) { } } +func tokenWithLength(length int) []byte { + charset := "abcdefghijklmnopqrstuvwxyz" + b := make([]byte, length) + for i := range b { + b[i] = charset[rand.Intn(len(charset))] + } + return b +} + func mockLogRecord(body string, attributes map[string]any) plog.LogRecord { lr := plog.NewLogRecord() lr.Body().SetStr(body) @@ -920,5 +1703,4 @@ func Benchmark_getRawField(b *testing.B) { } }) } - } diff --git a/exporter/chronicleexporter/mock_log_marshaler.go b/exporter/chronicleexporter/mock_log_marshaler.go index 6c3fd6aa6..9550d1155 100644 --- a/exporter/chronicleexporter/mock_log_marshaler.go +++ b/exporter/chronicleexporter/mock_log_marshaler.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.46.1. DO NOT EDIT. +// Code generated by mockery v2.50.0. DO NOT EDIT. 
package chronicleexporter @@ -48,23 +48,23 @@ func (_m *MockMarshaler) MarshalRawLogs(ctx context.Context, ld plog.Logs) ([]*a } // MarshalRawLogsForHTTP provides a mock function with given fields: ctx, ld -func (_m *MockMarshaler) MarshalRawLogsForHTTP(ctx context.Context, ld plog.Logs) (map[string]*api.ImportLogsRequest, error) { +func (_m *MockMarshaler) MarshalRawLogsForHTTP(ctx context.Context, ld plog.Logs) (map[string][]*api.ImportLogsRequest, error) { ret := _m.Called(ctx, ld) if len(ret) == 0 { panic("no return value specified for MarshalRawLogsForHTTP") } - var r0 map[string]*api.ImportLogsRequest + var r0 map[string][]*api.ImportLogsRequest var r1 error - if rf, ok := ret.Get(0).(func(context.Context, plog.Logs) (map[string]*api.ImportLogsRequest, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, plog.Logs) (map[string][]*api.ImportLogsRequest, error)); ok { return rf(ctx, ld) } - if rf, ok := ret.Get(0).(func(context.Context, plog.Logs) map[string]*api.ImportLogsRequest); ok { + if rf, ok := ret.Get(0).(func(context.Context, plog.Logs) map[string][]*api.ImportLogsRequest); ok { r0 = rf(ctx, ld) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(map[string]*api.ImportLogsRequest) + r0 = ret.Get(0).(map[string][]*api.ImportLogsRequest) } }