feat: SSAPI Receiver (BPS-289) (#1951)
* chore: Update modules to v1.67.0

* fix: QRadar README typo (#2028)

Fix README typo

* fix: Shut down zombie goroutine in chronicleexporter (#2029)

* Properly shut down chronicleexporter zombie goroutine

* Fix lint

* Fix the same problem for the GRPC workflow

* ssapi mvp

* lint

* tls

* WIP

* ticker, other pr feedback

* pagination functionality

* break if results earlier than earliest_time

* fix lint

* check for earliest/latest in query

* config unit tests

* package comment

* feat(chronicleexporter): Support dynamic namespace and ingestion labels  (#1939)

* add namespace and ingestion logs initial commit

* working except ingestion labels

* ingestion labels from attributes

* use proper log entry batch

* namespace and ingestion logs no config overwrite

* delete OverrideNamespace and OverrideIngestionLabels

* PR changes

* fix unit tests

* modify tests again

* marshal changes

* readme and namespace check

* debug logs

* rm unnecessary clauses

* fix error wording

* rm space

* wip

* client tests

* checkpoint methods

* WIP

* functional checkpoint

* debug logs, rm print

* loadCheckpoint return error

* splunk failure test

* WIP

* encode req body

* stricter query validation

* storage config test

* lint, tidy

* return error on export fail

* tidy

* receiver tests

* receiver tests

* lint

* fix TestCheckpoint

* rename structs

* exporter fail test

* fix search checkpointing

* auth token

* lint

* fix struct name

* rm prints, fix error messages

* fix tests

* default batch size

* log end of export

* readme

* how-to

* how-to example config

* change how-to conf values

* change test batch size

* fix test case

* fix client test

* fix rebase errors

* tidy

* feat: Enforce request maximum size and number of logs (#2033)

* feat: Enforce request maximum size and number of logs

* Fix lint

* Refactor to be more go-idiomatic

* Update Chronicle exporter readme with new flags

* fix: Delete empty values iterates through nested arrays (#2034)

* delete empty values processor iterates through slices

* log body implementation

* pr review

* initial feedback

* chore: Minor cleanup of chronicle exporter names (#2046)

* chore: Save component.TelemetrySettings on chronicle exporter (#2047)

* chore: Minor cleanup of chronicle exporter names

* chore: Chronicle exporter - save component.TelemetrySettings

* safe shutdown()

* chore: Localize chronicle exporter's metrics concerns (#2048)

chore: Pull metrics-specific concerns into hostMetricsReporter

* rm err check from time parsing

* chore: Add debug logging (#2042)

Add debug logging

* chore: Add new tests for chronicle exporter with http and grpc servers (#2049)

* ctx check, doc notes

* chore: Rename to `bindplane-otel-collector` (#2043)

* rename to bindplane-otel-collector

* fix website links

* update report card link

* fix: Shut down zombie goroutine in chronicleexporter (#2029)

* Properly shut down chronicleexporter zombie goroutine

* Fix lint

* Fix the same problem for the GRPC workflow

* chore: Save component.TelemetrySettings on chronicle exporter (#2047)

* chore: Minor cleanup of chronicle exporter names

* chore: Chronicle exporter - save component.TelemetrySettings

* chore: Localize chronicle exporter's metrics concerns (#2048)

chore: Pull metrics-specific concerns into hostMetricsReporter

* chore: Add new tests for chronicle exporter with http and grpc servers (#2049)

* fix: Rebase cleanup (#2063)

rebase cleanup

* chore: separate http and grpc exporters (#2050)

* fix: Shut down zombie goroutine in chronicleexporter (#2029)

* Properly shut down chronicleexporter zombie goroutine

* Fix lint

* Fix the same problem for the GRPC workflow

* ssapi mvp

* initial feedback

* chore: Save component.TelemetrySettings on chronicle exporter (#2047)

* chore: Minor cleanup of chronicle exporter names

* chore: Chronicle exporter - save component.TelemetrySettings

* chore: Localize chronicle exporter's metrics concerns (#2048)

chore: Pull metrics-specific concerns into hostMetricsReporter

* chore: Add new tests for chronicle exporter with http and grpc servers (#2049)

* chore: Save component.TelemetrySettings on chronicle exporter (#2047)

* chore: Minor cleanup of chronicle exporter names

* chore: Chronicle exporter - save component.TelemetrySettings

* chore: Localize chronicle exporter's metrics concerns (#2048)

chore: Pull metrics-specific concerns into hostMetricsReporter

* chore: Add new tests for chronicle exporter with http and grpc servers (#2049)

* fix: Shut down zombie goroutine in chronicleexporter (#2029)

* Properly shut down chronicleexporter zombie goroutine

* Fix lint

* Fix the same problem for the GRPC workflow

* fix rebase stuff

---------

Co-authored-by: Dakota Paasman <[email protected]>
Co-authored-by: Sam Hazlehurst <[email protected]>
Co-authored-by: Ian Adams <[email protected]>
Co-authored-by: Justin Voss <[email protected]>
Co-authored-by: Daniel Jaglowski <[email protected]>
6 people authored Dec 18, 2024
1 parent 7776a7e commit be7ff1b
Showing 18 changed files with 2,599 additions and 0 deletions.
2 changes: 2 additions & 0 deletions factories/receivers.go
@@ -23,6 +23,7 @@ import (
"github.com/observiq/bindplane-otel-collector/receiver/pluginreceiver"
"github.com/observiq/bindplane-otel-collector/receiver/routereceiver"
"github.com/observiq/bindplane-otel-collector/receiver/sapnetweaverreceiver"
"github.com/observiq/bindplane-otel-collector/receiver/splunksearchapireceiver"
"github.com/observiq/bindplane-otel-collector/receiver/telemetrygeneratorreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/activedirectorydsreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/aerospikereceiver"
@@ -157,6 +158,7 @@ var defaultReceivers = []receiver.Factory{
	sapnetweaverreceiver.NewFactory(),
	simpleprometheusreceiver.NewFactory(),
	snmpreceiver.NewFactory(),
	splunksearchapireceiver.NewFactory(),
	splunkhecreceiver.NewFactory(),
	sqlqueryreceiver.NewFactory(),
	sqlserverreceiver.NewFactory(),
3 changes: 3 additions & 0 deletions go.mod
@@ -34,6 +34,7 @@ require (
	github.com/observiq/bindplane-otel-collector/receiver/pluginreceiver v1.67.0
	github.com/observiq/bindplane-otel-collector/receiver/routereceiver v1.67.0
	github.com/observiq/bindplane-otel-collector/receiver/sapnetweaverreceiver v1.67.0
	github.com/observiq/bindplane-otel-collector/receiver/splunksearchapireceiver v1.67.0
	github.com/observiq/bindplane-otel-collector/receiver/telemetrygeneratorreceiver v1.67.0
	github.com/oklog/ulid/v2 v2.1.0
	github.com/open-telemetry/opamp-go v0.17.0
@@ -875,6 +876,8 @@ replace github.com/observiq/bindplane-otel-collector/internal/report => ./intern

replace github.com/observiq/bindplane-otel-collector/internal/measurements => ./internal/measurements

replace github.com/observiq/bindplane-otel-collector/receiver/splunksearchapireceiver => ./receiver/splunksearchapireceiver

// Does not build with windows and only used in configschema executable
// Relevant issue https://github.com/mattn/go-ieproxy/issues/45
replace github.com/mattn/go-ieproxy => github.com/mattn/go-ieproxy v0.0.1
103 changes: 103 additions & 0 deletions receiver/splunksearchapireceiver/README.md
@@ -0,0 +1,103 @@
# Splunk Search API Receiver
This receiver collects Splunk events using the [Splunk Search API](https://docs.splunk.com/Documentation/Splunk/9.3.1/RESTREF/RESTsearch).

## Supported Pipelines
- Logs

## Prerequisites
- Splunk admin credentials
- Configured storage extension

## Use Case
Unlike other receivers, the SSAPI receiver is not built to collect live data. Instead, it collects a finite set of historical data and transfers it to a destination, preserving each event's original timestamp. For this reason, the SSAPI receiver only needs to run until all Splunk events have been migrated, which is indicated by the log message: "all search results exported". Until that message (or some other error) is logged, avoid stopping the collector, as doing so will interfere with the receiver's ability to protect against writing duplicate events.
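
How the receiver avoids duplicates is worth spelling out: it periodically records its progress through each search in the configured storage extension and reloads that record on startup. The sketch below only illustrates the idea; the `storageClient` interface, `searchCheckpoint` struct, and `checkpointKey` name are assumptions made for the example, not the receiver's actual types.

```go
// Package sketch illustrates checkpointing against a storage extension.
// It is not the receiver's real implementation.
package sketch

import (
	"context"
	"encoding/json"
	"fmt"
)

// storageClient stands in for the client the receiver obtains from the
// storage extension named in its `storage` config field.
type storageClient interface {
	Get(ctx context.Context, key string) ([]byte, error)
	Set(ctx context.Context, key string, value []byte) error
}

// searchCheckpoint is a hypothetical progress record for a single search.
type searchCheckpoint struct {
	Search string `json:"search"` // the query this checkpoint belongs to
	Offset int    `json:"offset"` // number of events already exported
}

// checkpointKey is an assumed storage key, used here for illustration only.
const checkpointKey = "splunk_search_checkpoint"

// saveCheckpoint persists progress so a restart resumes rather than re-exporting events.
func saveCheckpoint(ctx context.Context, sc storageClient, cp searchCheckpoint) error {
	data, err := json.Marshal(cp)
	if err != nil {
		return fmt.Errorf("marshal checkpoint: %w", err)
	}
	return sc.Set(ctx, checkpointKey, data)
}

// loadCheckpoint restores progress; a missing record means starting from offset 0.
func loadCheckpoint(ctx context.Context, sc storageClient) (searchCheckpoint, error) {
	data, err := sc.Get(ctx, checkpointKey)
	if err != nil || data == nil {
		return searchCheckpoint{}, err
	}
	var cp searchCheckpoint
	if err := json.Unmarshal(data, &cp); err != nil {
		return searchCheckpoint{}, fmt.Errorf("unmarshal checkpoint: %w", err)
	}
	return cp, nil
}
```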

## Configuration
| Field | Type | Default | Description |
|---------------------|----------|-------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| endpoint | string | `required (no default)` | The endpoint of the Splunk instance to collect from. |
| splunk_username | string | `(no default)` | Specifies the username used to authenticate to Splunk using basic auth. |
| splunk_password | string | `(no default)` | Specifies the password used to authenticate to Splunk using basic auth. |
| auth_token | string | `(no default)` | Specifies the token used to authenticate to Splunk using token auth. Mutually exclusive with basic auth using `splunk_username` and `splunk_password`. |
| token_type | string | `(no default)` | Specifies the type of token used to authenticate to Splunk using `auth_token`. Accepted values are "Bearer" or "Splunk". |
| job_poll_interval | duration | `5s` | The receiver polls a Splunk API endpoint to determine whether a search job has completed; this field specifies how long to wait between polls (see the sketch below this table). |
| searches.query | string | `required (no default)` | The Splunk search to run to retrieve the desired events. Queries must begin with the `search` command and must not contain additional commands or any time fields (e.g. `earliesttime`). |
| searches.earliest_time | string | `required (no default)` | The earliest timestamp to collect logs. Only logs that occurred at or after this timestamp will be collected. Must be in ISO 8601 or RFC3339 format. |
| searches.latest_time | string | `required (no default)` | The latest timestamp to collect logs. Only logs that occurred at or before this timestamp will be collected. Must be in ISO 8601 or RFC3339 format. |
| searches.event_batch_size | int | `100` | The number of events to query from Splunk in a single request. |
| storage | component | `required (no default)` | The component ID of a storage extension which can be used when polling for `logs`. The storage extension prevents duplication of data after an exporter error by remembering which events were previously exported. |
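
Taken together, `job_poll_interval` and `searches.event_batch_size` drive a create, poll, and page loop against the Splunk Search API. The sketch below illustrates that loop using the `splunkSearchAPIClient` interface defined in this change's `client.go`; the `runSearch` function and the `jobSID`, `isDone`, `resultCount`, and `handleResults` helpers are stand-ins for illustration, not the receiver's actual implementation.

```go
package splunksearchapireceiver

import (
	"fmt"
	"time"
)

// runSearch is an illustrative sketch of one search's lifecycle, assuming it
// sits alongside client.go so the client types are in scope.
func runSearch(client splunkSearchAPIClient, query string, pollInterval time.Duration, batchSize int) error {
	created, err := client.CreateSearchJob(query)
	if err != nil {
		return fmt.Errorf("create search job: %w", err)
	}
	sid := jobSID(created) // assumed accessor for the job's search ID (SID)

	// Poll at job_poll_interval until Splunk reports the job has finished.
	for {
		status, err := client.GetJobStatus(sid)
		if err != nil {
			return fmt.Errorf("get job status: %w", err)
		}
		if isDone(status) { // assumed helper that inspects the job's state
			break
		}
		time.Sleep(pollInterval)
	}

	// Page through results event_batch_size at a time until a short page is returned.
	for offset := 0; ; offset += batchSize {
		results, err := client.GetSearchResults(sid, offset, batchSize)
		if err != nil {
			return fmt.Errorf("get search results: %w", err)
		}
		handleResults(results) // assumed: convert events to plog and send downstream
		if resultCount(results) < batchSize {
			break
		}
	}
	return nil
}

// Stand-in helpers, assumed for illustration only; the real response fields
// live in the receiver's model types.
func jobSID(CreateJobResponse) string     { return "" }
func isDone(SearchJobStatusResponse) bool { return true }
func resultCount(SearchResults) int       { return 0 }
func handleResults(SearchResults)         {}
```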

### Example Configuration
```yaml
receivers:
  splunksearchapi:
    endpoint: "https://splunk-c4-0.example.localnet:8089"
    tls:
      insecure_skip_verify: true
    splunk_username: "user"
    splunk_password: "pass"
    job_poll_interval: 5s
    searches:
      - query: 'search index=my_index'
        earliest_time: "2024-11-01T01:00:00.000-05:00"
        latest_time: "2024-11-30T23:59:59.999-05:00"
        event_batch_size: 500
    storage: file_storage

extensions:
  file_storage:
    directory: "./local/storage"
```
## How To
### Migrate historical events to Google Cloud Logging
1. Identify the Splunk index to migrate events from. Create a Splunk search to capture the events from that index. This will be the `searches.query` you pass to the receiver.
   - Example: `search index=my_index`
   - Note: queries must begin with the explicit `search` command and must not include additional commands or any time fields (e.g. `earliesttime`).
2. Determine the timeframe you want to migrate events from, and set the `searches.earliest_time` and `searches.latest_time` config fields accordingly.
   - To migrate events from December 2024, EST (UTC-5):
     - `earliest_time: "2024-12-01T00:00:00.000-05:00"`
     - `latest_time: "2024-12-31T23:59:59.999-05:00"`
   - Note: By default, Google Cloud Logging will not accept logs with a timestamp older than 30 days. Contact Google to modify this rule.
3. Repeat steps 1 & 2 for each index you wish to collect from.
4. Configure a storage extension to store checkpointing data for the receiver.
5. Configure the rest of the receiver fields according to your Splunk environment.
6. Add a `googlecloud` exporter to your config. Configure the exporter to send to a GCP project where your service account has the Logging Admin role. To check the permissions of service accounts in your project, go to the [IAM page](https://console.cloud.google.com/iam-admin/iam).
7. Disable the `sending_queue` field on the GCP exporter. The sending queue introduces an asynchronous step to the pipeline, which would jeopardize the receiver's ability to checkpoint correctly and recover from errors. For the same reason, avoid using any asynchronous processors (e.g., the batch processor).

After following these steps, your configuration should look something like this:
```yaml
receivers:
  splunksearchapi:
    endpoint: "https://splunk-c4-0.example.localnet:8089"
    tls:
      insecure_skip_verify: true
    splunk_username: "user"
    splunk_password: "pass"
    job_poll_interval: 5s
    searches:
      - query: 'search index=my_index'
        earliest_time: "2024-12-01T00:00:00.000-05:00"
        latest_time: "2024-12-31T23:59:59.999-05:00"
        event_batch_size: 500
    storage: file_storage
exporters:
  googlecloud:
    project: "my-gcp-project"
    log:
      default_log_name: "splunk-events"
    sending_queue:
      enabled: false
extensions:
  file_storage:
    directory: "./local/storage"
service:
  extensions: [file_storage]
  pipelines:
    logs:
      receivers: [splunksearchapi]
      exporters: [googlecloud]
```
You are now ready to migrate events from Splunk to Google Cloud Logging.
175 changes: 175 additions & 0 deletions receiver/splunksearchapireceiver/client.go
@@ -0,0 +1,175 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package splunksearchapireceiver contains the Splunk Search API receiver.
package splunksearchapireceiver

import (
	"bytes"
	"context"
	"encoding/json"
	"encoding/xml"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"

	"go.opentelemetry.io/collector/component"
	"go.uber.org/zap"
)

type splunkSearchAPIClient interface {
	CreateSearchJob(search string) (CreateJobResponse, error)
	GetJobStatus(searchID string) (SearchJobStatusResponse, error)
	GetSearchResults(searchID string, offset int, batchSize int) (SearchResults, error)
}

type defaultSplunkSearchAPIClient struct {
	client    *http.Client
	endpoint  string
	logger    *zap.Logger
	username  string
	password  string
	authToken string
	tokenType string
}

func newDefaultSplunkSearchAPIClient(ctx context.Context, settings component.TelemetrySettings, conf Config, host component.Host) (*defaultSplunkSearchAPIClient, error) {
	client, err := conf.ClientConfig.ToClient(ctx, host, settings)
	if err != nil {
		return nil, err
	}

	return &defaultSplunkSearchAPIClient{
		client:    client,
		endpoint:  conf.Endpoint,
		logger:    settings.Logger,
		username:  conf.Username,
		password:  conf.Password,
		authToken: conf.AuthToken,
		tokenType: conf.TokenType,
	}, nil
}

func (c *defaultSplunkSearchAPIClient) CreateSearchJob(search string) (CreateJobResponse, error) {
	endpoint := fmt.Sprintf("%s/services/search/jobs", c.endpoint)

	// Require starttime, endtime, and timeformat in the dispatched query
	// (case-insensitive check); reject any query that lacks them.
	lowerSearch := strings.ToLower(search)
	if !strings.Contains(lowerSearch, "starttime=") || !strings.Contains(lowerSearch, "endtime=") || !strings.Contains(lowerSearch, "timeformat=") {
		return CreateJobResponse{}, fmt.Errorf("search query must contain starttime, endtime, and timeformat")
	}

	reqBody := fmt.Sprintf(`search=%s`, url.QueryEscape(search))
	resp, err := c.doSplunkRequest("POST", endpoint, bytes.NewBuffer([]byte(reqBody)))
	if err != nil {
		return CreateJobResponse{}, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusCreated {
		return CreateJobResponse{}, fmt.Errorf("create search job: %d", resp.StatusCode)
	}

	var jobResponse CreateJobResponse
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return CreateJobResponse{}, fmt.Errorf("read search job create response: %w", err)
	}

	err = xml.Unmarshal(body, &jobResponse)
	if err != nil {
		return CreateJobResponse{}, fmt.Errorf("unmarshal search job create response: %w", err)
	}
	return jobResponse, nil
}

func (c *defaultSplunkSearchAPIClient) GetJobStatus(sid string) (SearchJobStatusResponse, error) {
	endpoint := fmt.Sprintf("%s/services/search/v2/jobs/%s", c.endpoint, sid)

	resp, err := c.doSplunkRequest("GET", endpoint, nil)
	if err != nil {
		return SearchJobStatusResponse{}, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return SearchJobStatusResponse{}, fmt.Errorf("get search job status: %d", resp.StatusCode)
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return SearchJobStatusResponse{}, fmt.Errorf("read search job status response: %w", err)
	}
	var jobStatusResponse SearchJobStatusResponse
	err = xml.Unmarshal(body, &jobStatusResponse)
	if err != nil {
		return SearchJobStatusResponse{}, fmt.Errorf("unmarshal search job status response: %w", err)
	}

	return jobStatusResponse, nil
}

func (c *defaultSplunkSearchAPIClient) GetSearchResults(sid string, offset int, batchSize int) (SearchResults, error) {
	endpoint := fmt.Sprintf("%s/services/search/v2/jobs/%s/results?output_mode=json&offset=%d&count=%d", c.endpoint, sid, offset, batchSize)
	resp, err := c.doSplunkRequest("GET", endpoint, nil)
	if err != nil {
		return SearchResults{}, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return SearchResults{}, fmt.Errorf("get search job results: %d", resp.StatusCode)
	}

	var searchResults SearchResults
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return SearchResults{}, fmt.Errorf("read search job results response: %w", err)
	}
	err = json.Unmarshal(body, &searchResults)
	if err != nil {
		return SearchResults{}, fmt.Errorf("unmarshal search job results response: %w", err)
	}

	return searchResults, nil
}

func (c *defaultSplunkSearchAPIClient) doSplunkRequest(method, endpoint string, body io.Reader) (*http.Response, error) {
	req, err := http.NewRequest(method, endpoint, body)
	if err != nil {
		return nil, fmt.Errorf("new http request: %w", err)
	}
	err = c.setSplunkRequestAuth(req)
	if err != nil {
		return nil, fmt.Errorf("set splunk request auth: %w", err)
	}
	resp, err := c.client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("client do request: %w", err)
	}
	return resp, nil
}

func (c *defaultSplunkSearchAPIClient) setSplunkRequestAuth(req *http.Request) error {
	if c.authToken != "" {
		if strings.EqualFold(c.tokenType, TokenTypeBearer) {
			req.Header.Set("Authorization", "Bearer "+c.authToken)
		} else if strings.EqualFold(c.tokenType, TokenTypeSplunk) {
			req.Header.Set("Authorization", "Splunk "+c.authToken)
		}
	} else {
		req.SetBasicAuth(c.username, c.password)
	}
	return nil
}