feat: Update SSAPI receiver to accept UTC timestamps (BPS-293) #2088

Merged · 5 commits · Jan 7, 2025
2 changes: 1 addition & 1 deletion go.mod
@@ -371,7 +371,7 @@ require (
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
github.com/observiq/bindplane-otel-collector/counter v1.68.0 // indirect
github.com/observiq/bindplane-otel-collector/expr v1.68.0 // indirect
github.com/observiq/bindplane-otel-collector/internal/rehydration v1.62.0 // indirect
github.com/observiq/bindplane-otel-collector/internal/rehydration v1.68.0 // indirect
github.com/okta/okta-sdk-golang/v2 v2.20.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/googlemanagedprometheusexporter v0.116.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension v0.116.0 // indirect
6 changes: 3 additions & 3 deletions receiver/splunksearchapireceiver/README.md
@@ -21,8 +21,8 @@ Unlike other receivers, the SSAPI receiver is not built to collect live data. In
| token_type | string | `(no default)` | Specifies the type of token used to authenticate to Splunk using `auth_token`. Accepted values are "Bearer" or "Splunk". |
| job_poll_interval | duration | `5s` | The receiver uses an API call to determine if a search has completed. Specifies how long to wait between polling for search job completion. |
| searches.query | string | `required (no default)` | The Splunk search to run to retrieve the desired events. Queries must start with `search` and should not contain additional commands, nor any time fields (e.g. `earliesttime`) |
| searches.earliest_time | string | `required (no default)` | The earliest timestamp to collect logs. Only logs that occurred at or after this timestamp will be collected. Must be in ISO 8601 or RFC3339 format. |
| searches.latest_time | string | `required (no default)` | The latest timestamp to collect logs. Only logs that occurred at or before this timestamp will be collected. Must be in ISO 8601 or RFC3339 format. |
| searches.earliest_time | string | `required (no default)` | The earliest timestamp to collect logs. Only logs that occurred at or after this timestamp will be collected. Must be in 'yyyy-MM-ddTHH:mm' format (UTC). |
| searches.latest_time | string | `required (no default)` | The latest timestamp to collect logs. Only logs that occurred at or before this timestamp will be collected. Must be in 'yyyy-MM-ddTHH:mm' format (UTC). |
| searches.event_batch_size | int | `100` | The amount of events to query from Splunk for a single request. |
| storage | component | `required (no default)` | The component ID of a storage extension which can be used when polling for `logs`. The storage extension prevents duplication of data after an exporter error by remembering which events were previously exported. |

@@ -100,4 +100,4 @@ service:
receivers: [splunksearchapi]
exporters: [googlecloud]
```
You are now ready to migrate events from Splunk to Google Cloud Logging.
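For reference, the documented 'yyyy-MM-ddTHH:mm' format corresponds to the Go reference layout `2006-01-02T15:04`. A minimal sketch of how such a value parses, assuming that layout matches the shared `rehydration.TimeFormat` constant the receiver now uses:

```go
package main

import (
	"fmt"
	"time"
)

// Assumed value of the shared rehydration.TimeFormat constant,
// matching the 'yyyy-MM-ddTHH:mm' format documented above.
const timeFormat = "2006-01-02T15:04"

func main() {
	// Parse a config value such as searches.earliest_time.
	earliest, err := time.Parse(timeFormat, "2024-10-30T04:00")
	if err != nil {
		panic(err)
	}
	// With no zone in the layout, time.Parse returns a UTC time,
	// which is why the README notes the values are interpreted as UTC.
	fmt.Println(earliest) // 2024-10-30 04:00:00 +0000 UTC
}
```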
9 changes: 5 additions & 4 deletions receiver/splunksearchapireceiver/config.go
@@ -20,6 +20,7 @@ import (
"strings"
"time"

"github.com/observiq/bindplane-otel-collector/internal/rehydration"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
)
@@ -113,14 +114,14 @@ func (cfg *Config) Validate() error {
}

// parse time strings to time.Time
_, err := time.Parse(time.RFC3339, search.EarliestTime)
_, err := time.Parse(rehydration.TimeFormat, search.EarliestTime)
if err != nil {
return errors.New("earliest_time failed to parse as RFC3339")
return errors.New("earliest_time failed to parse")
}

_, err = time.Parse(time.RFC3339, search.LatestTime)
_, err = time.Parse(rehydration.TimeFormat, search.LatestTime)
if err != nil {
return errors.New("latest_time failed to parse as RFC3339")
return errors.New("latest_time failed to parse")
}

}
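To make the new validation path concrete, here is a small standalone sketch of the same checks, using a hypothetical helper and again assuming `rehydration.TimeFormat` is the compact layout `2006-01-02T15:04`:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Stand-in for rehydration.TimeFormat; the exact constant value is assumed.
const timeFormat = "2006-01-02T15:04"

// validateSearchWindow is a hypothetical helper mirroring the checks in
// Config.Validate: both bounds must parse with the shared layout.
func validateSearchWindow(earliest, latest string) error {
	if _, err := time.Parse(timeFormat, earliest); err != nil {
		return errors.New("earliest_time failed to parse")
	}
	if _, err := time.Parse(timeFormat, latest); err != nil {
		return errors.New("latest_time failed to parse")
	}
	return nil
}

func main() {
	fmt.Println(validateSearchWindow("2024-10-30T04:00", "2024-10-30T14:00")) // <nil>
	fmt.Println(validateSearchWindow("-1hr", "2024-10-30T14:00"))             // earliest_time failed to parse
}
```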
74 changes: 37 additions & 37 deletions receiver/splunksearchapireceiver/config_test.go
@@ -42,8 +42,8 @@ func TestValidate(t *testing.T) {
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00",
LatestTime: "2024-10-30T14:00",
},
},
errExpected: true,
@@ -57,8 +57,8 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00",
LatestTime: "2024-10-30T14:00",
},
},
errExpected: true,
@@ -72,8 +72,8 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00",
LatestTime: "2024-10-30T14:00",
},
},
errExpected: true,
@@ -87,8 +87,8 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00",
LatestTime: "2024-10-30T14:00",
},
},
errExpected: true,
@@ -103,8 +103,8 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00",
LatestTime: "2024-10-30T14:00",
},
},
errExpected: true,
@@ -121,8 +121,8 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00",
LatestTime: "2024-10-30T14:00",
},
},
errExpected: true,
@@ -136,8 +136,8 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00",
LatestTime: "2024-10-30T14:00",
},
},
errExpected: true,
@@ -160,8 +160,8 @@
storage: "file_storage",
searches: []Search{
{
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00",
LatestTime: "2024-10-30T14:00",
},
},
errExpected: true,
@@ -176,7 +176,7 @@
searches: []Search{
{
Query: "search index=_internal",
LatestTime: "2024-10-30T14:00:00.000Z",
LatestTime: "2024-10-30T14:00",
},
},
errExpected: true,
@@ -192,11 +192,11 @@
{
Query: "search index=_internal",
EarliestTime: "-1hr",
LatestTime: "2024-10-30T14:00:00.000Z",
LatestTime: "2024-10-30T14:00",
},
},
errExpected: true,
errText: "earliest_time failed to parse as RFC3339",
errText: "earliest_time failed to parse",
},
{
desc: "Missing latest_time",
@@ -207,7 +207,7 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
EarliestTime: "2024-10-30T04:00",
},
},
errExpected: true,
@@ -222,12 +222,12 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
EarliestTime: "2024-10-30T04:00",
LatestTime: "-1hr",
},
},
errExpected: true,
errText: "latest_time failed to parse as RFC3339",
errText: "latest_time failed to parse",
},
{
desc: "Invalid query chaining",
@@ -238,8 +238,8 @@
searches: []Search{
{
Query: "search index=_internal | stats count by sourcetype",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00",
LatestTime: "2024-10-30T14:00",
},
},
errExpected: true,
@@ -254,8 +254,8 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00",
LatestTime: "2024-10-30T14:00",
},
},
errExpected: false,
@@ -269,8 +269,8 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00",
LatestTime: "2024-10-30T14:00",
},
},
errExpected: false,
@@ -284,13 +284,13 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00",
LatestTime: "2024-10-30T14:00",
},
{
Query: "search index=_audit",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00",
LatestTime: "2024-10-30T14:00",
},
},
errExpected: false,
@@ -304,8 +304,8 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00",
LatestTime: "2024-10-30T14:00",
Limit: 10,
},
},
@@ -319,9 +319,9 @@
storage: "file_storage",
searches: []Search{
{
Query: "search index=_internal earliest=2024-10-30T04:00:00.000Z latest=2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
Query: "search index=_internal earliest=2024-10-30T04:00 latest=2024-10-30T14:00",
EarliestTime: "2024-10-30T04:00",
LatestTime: "2024-10-30T14:00",
},
},
errExpected: true,
5 changes: 5 additions & 0 deletions receiver/splunksearchapireceiver/go.mod
@@ -3,6 +3,7 @@ module github.com/observiq/bindplane-otel-collector/receiver/splunksearchapirece
go 1.22.7

require (
github.com/observiq/bindplane-otel-collector/internal/rehydration v1.68.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.116.0
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.116.0
@@ -87,3 +88,7 @@ require (
google.golang.org/grpc v1.68.1 // indirect
google.golang.org/protobuf v1.35.2 // indirect
)

replace github.com/observiq/bindplane-otel-collector/internal/rehydration => ../../internal/rehydration

replace github.com/observiq/bindplane-otel-collector/internal/testutils => ../../internal/testutils
8 changes: 4 additions & 4 deletions receiver/splunksearchapireceiver/integration_test.go
@@ -39,8 +39,8 @@ func TestSplunkResultsPaginationFailure(t *testing.T) {
cfg.Searches = []Search{
{
Query: "search index=otel",
EarliestTime: "2024-11-14T00:00:00.000Z",
LatestTime: "2024-11-14T23:59:59.000Z",
EarliestTime: "2024-11-14T00:00",
LatestTime: "2024-11-14T23:59",
EventBatchSize: 5,
},
}
@@ -104,8 +104,8 @@ func TestExporterFailure(t *testing.T) {
cfg.Searches = []Search{
{
Query: "search index=otel",
EarliestTime: "2024-11-14T00:00:00.000Z",
LatestTime: "2024-11-14T23:59:59.000Z",
EarliestTime: "2024-11-14T00:00",
LatestTime: "2024-11-14T23:59",
EventBatchSize: 3,
},
}
24 changes: 17 additions & 7 deletions receiver/splunksearchapireceiver/receiver.go
@@ -20,6 +20,8 @@ import (
"fmt"
"time"

"github.com/observiq/bindplane-otel-collector/internal/rehydration"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
@@ -120,8 +122,16 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) {
}

// parse time strings to time.Time
earliestTime, _ := time.Parse(time.RFC3339, search.EarliestTime)
latestTime, _ := time.Parse(time.RFC3339, search.LatestTime)
earliestTime, err := time.Parse(rehydration.TimeFormat, search.EarliestTime)
if err != nil {
ssapir.logger.Error("error parsing earliest time", zap.Error(err))
return
}
latestTime, err := time.Parse(rehydration.TimeFormat, search.LatestTime)
if err != nil {
ssapir.logger.Error("error parsing earliest time", zap.Error(err))
return
}

// create search in Splunk
searchID, err := ssapir.createSplunkSearch(search)
@@ -155,18 +165,18 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) {
limitReached = true
break
}
// convert log timestamp to ISO 8601 (UTC() makes RFC 3339 into ISO 8601)

logTimestamp, err := time.Parse(time.RFC3339, splunkLog.Time)
if err != nil {
ssapir.logger.Error("error parsing log timestamp", zap.Error(err))
break
}
if logTimestamp.UTC().Before(earliestTime.UTC()) {
ssapir.logger.Info("skipping log entry - timestamp before earliestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("earliestTime", earliestTime.UTC()))
if logTimestamp.UTC().Before(earliestTime) {
ssapir.logger.Info("skipping log entry - timestamp before earliestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("earliestTime", earliestTime))
break
}
if logTimestamp.UTC().After(latestTime.UTC()) {
ssapir.logger.Info("skipping log entry - timestamp after latestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("latestTime", latestTime.UTC()))
if logTimestamp.UTC().After(latestTime) {
ssapir.logger.Info("skipping log entry - timestamp after latestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("latestTime", latestTime))
continue
}
log := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
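The filtering logic above parses each Splunk event time as RFC 3339 and compares it, in UTC, against the configured window. A self-contained sketch of that comparison, assuming `rehydration.TimeFormat` is the compact UTC layout; the helper and its names are illustrative, not the receiver's API:

```go
package main

import (
	"fmt"
	"time"
)

// Stand-in for rehydration.TimeFormat; the exact constant value is assumed.
const timeFormat = "2006-01-02T15:04"

// withinWindow mirrors the filtering in runQueries: the Splunk event time
// arrives as RFC 3339, while the window bounds use the compact UTC layout.
func withinWindow(eventTime, earliest, latest string) (bool, error) {
	ts, err := time.Parse(time.RFC3339, eventTime)
	if err != nil {
		return false, fmt.Errorf("error parsing log timestamp: %w", err)
	}
	lo, err := time.Parse(timeFormat, earliest)
	if err != nil {
		return false, fmt.Errorf("error parsing earliest time: %w", err)
	}
	hi, err := time.Parse(timeFormat, latest)
	if err != nil {
		return false, fmt.Errorf("error parsing latest time: %w", err)
	}
	// Compare in UTC: keep events at or after earliest and at or before latest.
	utc := ts.UTC()
	return !utc.Before(lo) && !utc.After(hi), nil
}

func main() {
	ok, _ := withinWindow("2024-10-30T06:30:00-04:00", "2024-10-30T04:00", "2024-10-30T14:00")
	fmt.Println(ok) // true: 10:30 UTC falls inside the 04:00-14:00 UTC window
}
```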