feat: Update SSAPI receiver to accept UTC timestamps (BPS-293) #2088

Merged (5 commits) on Jan 7, 2025
Changes from 3 commits
6 changes: 3 additions & 3 deletions receiver/splunksearchapireceiver/README.md
@@ -21,8 +21,8 @@ Unlike other receivers, the SSAPI receiver is not built to collect live data. In
| token_type | string | `(no default)` | Specifies the type of token used to authenticate to Splunk using `auth_token`. Accepted values are "Bearer" or "Splunk". |
| job_poll_interval | duration | `5s` | The receiver uses an API call to determine if a search has completed. Specifies how long to wait between polling for search job completion. |
| searches.query | string | `required (no default)` | The Splunk search to run to retrieve the desired events. Queries must start with `search` and should not contain additional commands, nor any time fields (e.g. `earliesttime`) |
| searches.earliest_time | string | `required (no default)` | The earliest timestamp to collect logs. Only logs that occurred at or after this timestamp will be collected. Must be in ISO 8601 or RFC3339 format. |
| searches.latest_time | string | `required (no default)` | The latest timestamp to collect logs. Only logs that occurred at or before this timestamp will be collected. Must be in ISO 8601 or RFC3339 format. |
| searches.earliest_time | string | `required (no default)` | The earliest timestamp to collect logs. Only logs that occurred at or after this timestamp will be collected. Must be in 'yyyy-MM-ddTHH:mm:ss' format (UTC). |
| searches.latest_time | string | `required (no default)` | The latest timestamp to collect logs. Only logs that occurred at or before this timestamp will be collected. Must be in 'yyyy-MM-ddTHH:mm:ss' format (UTC). |
| searches.event_batch_size | int | `100` | The amount of events to query from Splunk for a single request. |
| storage | component | `required (no default)` | The component ID of a storage extension which can be used when polling for `logs`. The storage extension prevents duplication of data after an exporter error by remembering which events were previously exported. |

@@ -100,4 +100,4 @@ service:
receivers: [splunksearchapi]
exporters: [googlecloud]
```
You are now ready to migrate events from Splunk to Google Cloud Logging.
You are now ready to migrate events from Splunk to Google Cloud Logging.
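
For context on the new config format: the `yyyy-MM-ddTHH:mm:ss` pattern documented above corresponds, in Go terms, to a reference layout like `2006-01-02T15:04:05`. The exact value of the shared `rehydration.TimeFormat` constant is not shown in this diff, so the sketch below is an assumption rather than the receiver's actual implementation:

```go
package main

import (
	"fmt"
	"time"
)

// Assumed Go layout for the documented 'yyyy-MM-ddTHH:mm:ss' pattern; the
// real rehydration.TimeFormat constant is not visible in this diff.
const timeFormat = "2006-01-02T15:04:05"

func main() {
	// A valid searches.earliest_time value under the new format.
	earliest, err := time.Parse(timeFormat, "2024-10-30T04:00:00")
	if err != nil {
		fmt.Println("parse failed:", err)
		return
	}
	// With no zone in the layout, time.Parse returns a UTC time, which
	// matches the README's note that values are interpreted as UTC.
	fmt.Println(earliest) // 2024-10-30 04:00:00 +0000 UTC
}
```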
9 changes: 5 additions & 4 deletions receiver/splunksearchapireceiver/config.go
@@ -20,6 +20,7 @@ import (
"strings"
"time"

"github.com/observiq/bindplane-otel-collector/internal/rehydration"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
)
@@ -113,14 +114,14 @@ func (cfg *Config) Validate() error {
}

// parse time strings to time.Time
_, err := time.Parse(time.RFC3339, search.EarliestTime)
_, err := time.Parse(rehydration.TimeFormat, search.EarliestTime)
if err != nil {
return errors.New("earliest_time failed to parse as RFC3339")
return errors.New("earliest_time failed to parse")
}

_, err = time.Parse(time.RFC3339, search.LatestTime)
_, err = time.Parse(rehydration.TimeFormat, search.LatestTime)
if err != nil {
return errors.New("latest_time failed to parse as RFC3339")
return errors.New("latest_time failed to parse")
}

}
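A practical side effect of swapping `time.RFC3339` for the shared rehydration layout in `Validate`: configs that still carry the old RFC3339-style values (with fractional seconds and a trailing `Z`) will now be rejected at startup with the shortened error messages above. A rough illustration, again assuming the constant is the bare seconds-precision layout:

```go
package main

import (
	"fmt"
	"time"
)

// Assumed value of rehydration.TimeFormat; not shown in this diff.
const timeFormat = "2006-01-02T15:04:05"

func main() {
	// Old-style value: the trailing ".000Z" is extra text for the bare
	// layout, so the parse fails and Validate would surface
	// "earliest_time failed to parse".
	if _, err := time.Parse(timeFormat, "2024-10-30T04:00:00.000Z"); err != nil {
		fmt.Println("old RFC3339 value rejected:", err)
	}

	// New-style value accepted by the updated validation.
	if _, err := time.Parse(timeFormat, "2024-10-30T04:00:00"); err == nil {
		fmt.Println("new UTC value accepted")
	}
}
```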
74 changes: 37 additions & 37 deletions receiver/splunksearchapireceiver/config_test.go
@@ -42,8 +42,8 @@ func TestValidate(t *testing.T) {
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00:00",
LatestTime: "2024-10-30T14:00:00",
},
},
errExpected: true,
@@ -57,8 +57,8 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00:00",
LatestTime: "2024-10-30T14:00:00",
},
},
errExpected: true,
@@ -72,8 +72,8 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00:00",
LatestTime: "2024-10-30T14:00:00",
},
},
errExpected: true,
@@ -87,8 +87,8 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00:00",
LatestTime: "2024-10-30T14:00:00",
},
},
errExpected: true,
@@ -103,8 +103,8 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00:00",
LatestTime: "2024-10-30T14:00:00",
},
},
errExpected: true,
@@ -121,8 +121,8 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00:00",
LatestTime: "2024-10-30T14:00:00",
},
},
errExpected: true,
@@ -136,8 +136,8 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00:00",
LatestTime: "2024-10-30T14:00:00",
},
},
errExpected: true,
@@ -160,8 +160,8 @@
storage: "file_storage",
searches: []Search{
{
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00:00",
LatestTime: "2024-10-30T14:00:00",
},
},
errExpected: true,
@@ -176,7 +176,7 @@
searches: []Search{
{
Query: "search index=_internal",
LatestTime: "2024-10-30T14:00:00.000Z",
LatestTime: "2024-10-30T14:00:00",
},
},
errExpected: true,
@@ -192,11 +192,11 @@
{
Query: "search index=_internal",
EarliestTime: "-1hr",
LatestTime: "2024-10-30T14:00:00.000Z",
LatestTime: "2024-10-30T14:00:00",
},
},
errExpected: true,
errText: "earliest_time failed to parse as RFC3339",
errText: "earliest_time failed to parse",
},
{
desc: "Missing latest_time",
@@ -207,7 +207,7 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
EarliestTime: "2024-10-30T04:00:00",
},
},
errExpected: true,
@@ -222,12 +222,12 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
EarliestTime: "2024-10-30T04:00:00",
LatestTime: "-1hr",
},
},
errExpected: true,
errText: "latest_time failed to parse as RFC3339",
errText: "latest_time failed to parse",
},
{
desc: "Invalid query chaining",
@@ -238,8 +238,8 @@
searches: []Search{
{
Query: "search index=_internal | stats count by sourcetype",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00:00",
LatestTime: "2024-10-30T14:00:00",
},
},
errExpected: true,
@@ -254,8 +254,8 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00:00",
LatestTime: "2024-10-30T14:00:00",
},
},
errExpected: false,
@@ -269,8 +269,8 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00:00",
LatestTime: "2024-10-30T14:00:00",
},
},
errExpected: false,
@@ -284,13 +284,13 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00:00",
LatestTime: "2024-10-30T14:00:00",
},
{
Query: "search index=_audit",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00:00",
LatestTime: "2024-10-30T14:00:00",
},
},
errExpected: false,
@@ -304,8 +304,8 @@
searches: []Search{
{
Query: "search index=_internal",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00:00",
LatestTime: "2024-10-30T14:00:00",
Limit: 10,
},
},
@@ -319,9 +319,9 @@
storage: "file_storage",
searches: []Search{
{
Query: "search index=_internal earliest=2024-10-30T04:00:00.000Z latest=2024-10-30T14:00:00.000Z",
EarliestTime: "2024-10-30T04:00:00.000Z",
LatestTime: "2024-10-30T14:00:00.000Z",
Query: "search index=_internal earliest=2024-10-30T04:00:00 latest=2024-10-30T14:00:00",
EarliestTime: "2024-10-30T04:00:00",
LatestTime: "2024-10-30T14:00:00",
},
},
errExpected: true,
1 change: 1 addition & 0 deletions receiver/splunksearchapireceiver/go.mod
@@ -3,6 +3,7 @@ module github.com/observiq/bindplane-otel-collector/receiver/splunksearchapirece
go 1.22.7

require (
github.com/observiq/bindplane-otel-collector/internal/rehydration v1.67.1
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.116.0
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.116.0
4 changes: 4 additions & 0 deletions receiver/splunksearchapireceiver/go.sum
@@ -67,6 +67,10 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/observiq/bindplane-otel-collector/internal/rehydration v1.67.1 h1:X+uOWmvP0wphb/hAZhvNmyyaHDjG78ALV2t7Afx6pJg=
github.com/observiq/bindplane-otel-collector/internal/rehydration v1.67.1/go.mod h1:A4f0dvwsSmyGE+n/CzKLwjKqLbYvVsDuEuM0eNkeg8E=
github.com/observiq/bindplane-otel-collector/internal/testutils v1.67.1 h1:sKiB1mY9Vr9jb3L83F7XeIdPaojs9/qY5sZsoHDpD2g=
github.com/observiq/bindplane-otel-collector/internal/testutils v1.67.1/go.mod h1:fvKN6ExdQDlebj7vsHXdlU1ail8L2lPfj319Ow/r5o8=
github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.116.0 h1:0MkHxKsm35xWibXRs4hPdQldI7DOhr0pil4cbQ9rOkY=
github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.116.0/go.mod h1:ueNOmFMNZYsp+o20TP/L92ieSmK88tyvmnAr5uEurhg=
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.116.0 h1:va5jloH1Uwrnx8rymcG7ZqLJ49/zWGMz5Dy/iMm1JzI=
8 changes: 4 additions & 4 deletions receiver/splunksearchapireceiver/integration_test.go
@@ -39,8 +39,8 @@ func TestSplunkResultsPaginationFailure(t *testing.T) {
cfg.Searches = []Search{
{
Query: "search index=otel",
EarliestTime: "2024-11-14T00:00:00.000Z",
LatestTime: "2024-11-14T23:59:59.000Z",
EarliestTime: "2024-11-14T00:00:00",
LatestTime: "2024-11-14T23:59:59",
EventBatchSize: 5,
},
}
@@ -104,8 +104,8 @@ func TestExporterFailure(t *testing.T) {
cfg.Searches = []Search{
{
Query: "search index=otel",
EarliestTime: "2024-11-14T00:00:00.000Z",
LatestTime: "2024-11-14T23:59:59.000Z",
EarliestTime: "2024-11-14T00:00:00",
LatestTime: "2024-11-14T23:59:59",
EventBatchSize: 3,
},
}
24 changes: 17 additions & 7 deletions receiver/splunksearchapireceiver/receiver.go
@@ -20,6 +20,8 @@ import (
"fmt"
"time"

"github.com/observiq/bindplane-otel-collector/internal/rehydration"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
@@ -120,8 +122,16 @@ func (ssapir *splunksearchapireceiver) runQueries(ctx context.Context) {
}

// parse time strings to time.Time
earliestTime, _ := time.Parse(time.RFC3339, search.EarliestTime)
latestTime, _ := time.Parse(time.RFC3339, search.LatestTime)
earliestTime, err := time.Parse(rehydration.TimeFormat, search.EarliestTime)
if err != nil {
ssapir.logger.Error("error parsing earliest time", zap.Error(err))
return
}
latestTime, err := time.Parse(rehydration.TimeFormat, search.LatestTime)
if err != nil {
ssapir.logger.Error("error parsing earliest time", zap.Error(err))
return
}

// create search in Splunk
searchID, err := ssapir.createSplunkSearch(search)
@@ -155,18 +165,18 @@
limitReached = true
break
}
// convert log timestamp to ISO 8601 (UTC() makes RFC 3339 into ISO 8601)

logTimestamp, err := time.Parse(time.RFC3339, splunkLog.Time)
if err != nil {
ssapir.logger.Error("error parsing log timestamp", zap.Error(err))
break
}
if logTimestamp.UTC().Before(earliestTime.UTC()) {
ssapir.logger.Info("skipping log entry - timestamp before earliestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("earliestTime", earliestTime.UTC()))
if logTimestamp.UTC().Before(earliestTime) {
ssapir.logger.Info("skipping log entry - timestamp before earliestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("earliestTime", earliestTime))
break
}
if logTimestamp.UTC().After(latestTime.UTC()) {
ssapir.logger.Info("skipping log entry - timestamp after latestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("latestTime", latestTime.UTC()))
if logTimestamp.UTC().After(latestTime) {
ssapir.logger.Info("skipping log entry - timestamp after latestTime", zap.Time("time", logTimestamp.UTC()), zap.Time("latestTime", latestTime))
continue
}
log := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
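
To summarize the time handling that `runQueries` now performs: the configured bounds are parsed with the shared layout (and therefore land in UTC, since the layout carries no zone), while each Splunk event timestamp is still parsed as RFC3339 and compared after conversion to UTC. The helper below is illustrative only, with the same assumption about `rehydration.TimeFormat`:

```go
package main

import (
	"fmt"
	"time"
)

// Assumed value of rehydration.TimeFormat; not shown in this diff.
const timeFormat = "2006-01-02T15:04:05"

// withinWindow reports whether a Splunk event timestamp (RFC3339) falls
// inside the configured [earliest, latest] window (new UTC config format).
func withinWindow(eventTime, earliest, latest string) (bool, error) {
	e, err := time.Parse(timeFormat, earliest) // already UTC: layout has no zone
	if err != nil {
		return false, fmt.Errorf("earliest_time failed to parse: %w", err)
	}
	l, err := time.Parse(timeFormat, latest)
	if err != nil {
		return false, fmt.Errorf("latest_time failed to parse: %w", err)
	}
	t, err := time.Parse(time.RFC3339, eventTime) // Splunk events remain zoned
	if err != nil {
		return false, fmt.Errorf("event timestamp failed to parse: %w", err)
	}
	ts := t.UTC()
	return !ts.Before(e) && !ts.After(l), nil
}

func main() {
	ok, err := withinWindow("2024-10-30T06:15:00.000-04:00", "2024-10-30T04:00:00", "2024-10-30T14:00:00")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(ok) // true: 10:15 UTC sits inside the 04:00 to 14:00 UTC window
}
```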