Skip to content

Commit

Permalink
added newrelic close buffer (#577)
Browse files Browse the repository at this point in the history
* added newrelic close buffer

* made necessary fixes

* made necessary fixes

* unexport client from client.go
  • Loading branch information
armstrmi authored Feb 28, 2022
1 parent 4c1a539 commit 9129334
Show file tree
Hide file tree
Showing 3 changed files with 380 additions and 103 deletions.
112 changes: 112 additions & 0 deletions operator/builtin/output/newrelic/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package newrelic

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"

"github.com/observiq/stanza/v2/version"
otelerrors "github.com/open-telemetry/opentelemetry-log-collection/errors"
)

// Client is an interface for sending a log payload to new relic
type client interface {
SendPayload(context.Context, LogPayload) error
TestConnection(context.Context) error
}

// client is the standard implementation of the Client interface
type nroClient struct {
endpoint *url.URL
headers http.Header
httpClient *http.Client
}

// NewClient creates a standard client for sending logs to new relic
func newClient(endpoint *url.URL, headers http.Header) client {
return &nroClient{
endpoint: endpoint,
headers: headers,
httpClient: &http.Client{},
}
}

// SendPayload creates an http request from a log payload and sends it to new relic
func (c *nroClient) SendPayload(ctx context.Context, payload LogPayload) error {
req, err := c.createRequest(ctx, payload)
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}

res, err := c.httpClient.Do(req)
if err != nil {
return err
}

return c.checkResponse(res)
}

// TestConnection tests the connection to the new relic api
func (c *nroClient) TestConnection(ctx context.Context) error {
logs := make([]*LogMessage, 0, 0)
payload := LogPayload{{
Common: LogPayloadCommon{
Attributes: map[string]interface{}{
"plugin": map[string]interface{}{
"type": "stanza",
"version": version.GetVersion(),
},
},
},
Logs: logs,
}}

err := c.SendPayload(ctx, payload)
if err != nil {
return fmt.Errorf("failed to send empty payload: %w", err)
}

return nil
}

// createRequest creates a new http.Request with the given context and log payload
func (c *nroClient) createRequest(ctx context.Context, payload LogPayload) (*http.Request, error) {
var buf bytes.Buffer
wr := gzip.NewWriter(&buf)
enc := json.NewEncoder(wr)
if err := enc.Encode(payload); err != nil {
return nil, otelerrors.Wrap(err, "encode payload")
}
if err := wr.Close(); err != nil {
return nil, err
}

req, err := http.NewRequestWithContext(ctx, "POST", c.endpoint.String(), &buf)
if err != nil {
return nil, err
}
req.Header = c.headers

return req, nil
}

// checkResponse checks a response from the new relic api
func (c *nroClient) checkResponse(res *http.Response) error {
defer func() {
_ = res.Body.Close()
}()

if !(res.StatusCode >= 200 && res.StatusCode < 300) {
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return otelerrors.NewError("unexpected status code", "", "status", res.Status)
}
return otelerrors.NewError("unexpected status code", "", "status", res.Status, "body", string(body))
}
return nil
}
106 changes: 30 additions & 76 deletions operator/builtin/output/newrelic/newrelic.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package newrelic

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"sync"
Expand Down Expand Up @@ -80,9 +76,7 @@ func (c NewRelicOutputConfig) Build(bc operator.BuildContext) ([]operator.Operat
OutputOperator: outputOperator,
buffer: buffer,
flusher: flusher,
client: &http.Client{},
headers: headers,
url: url,
client: newClient(url, headers),
timeout: c.Timeout.Raw(),
messageField: c.MessageField,
ctx: ctx,
Expand Down Expand Up @@ -115,9 +109,7 @@ type NewRelicOutput struct {
buffer buffer.Buffer
flusher *flusher.Flusher

client *http.Client
url *url.URL
headers http.Header
client client
timeout time.Duration
messageField entry.Field

Expand Down Expand Up @@ -146,8 +138,22 @@ func (nro *NewRelicOutput) Stop() error {
nro.cancel()
nro.wg.Wait()
nro.flusher.Stop()
// TODO deal with buffer Drain
_, err := nro.buffer.Close()

entries, err := nro.buffer.Close()
if err != nil {
return fmt.Errorf("failed to close buffer: %w", err)
}

if len(entries) != 0 {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

err = nro.sendEntries(ctx, entries)
if err != nil {
return err
}
}

return err
}

Expand All @@ -156,21 +162,21 @@ func (nro *NewRelicOutput) Process(ctx context.Context, entry *entry.Entry) erro
return nro.buffer.Add(ctx, entry)
}

func (nro *NewRelicOutput) testConnection() error {
ctx, cancel := context.WithTimeout(context.Background(), nro.timeout)
defer cancel()

req, err := nro.newRequest(ctx, nil)
func (nro *NewRelicOutput) sendEntries(ctx context.Context, entries []*entry.Entry) error {
payload := LogPayloadFromEntries(entries, nro.messageField)
err := nro.client.SendPayload(ctx, payload)
if err != nil {
return err
return fmt.Errorf("Failed to send entries: %s", err)
}

res, err := nro.client.Do(req)
if err != nil {
return err
}
return nil
}

func (nro *NewRelicOutput) testConnection() error {
ctx, cancel := context.WithTimeout(context.Background(), nro.timeout)
defer cancel()

return nro.handleResponse(res)
return nro.client.TestConnection(ctx)
}

func (nro *NewRelicOutput) feedFlusher(ctx context.Context) {
Expand All @@ -185,59 +191,7 @@ func (nro *NewRelicOutput) feedFlusher(ctx context.Context) {
}

nro.flusher.Do(ctx, func(flushCtx context.Context) error {
req, err := nro.newRequest(flushCtx, entries)
if err != nil {
// drop these logs because we couldn't creat a request and a retry won't help
nro.Errorw("Failed to create request from payload", zap.Error(err))
return nil
}

res, err := nro.client.Do(req)
if err != nil {
return err
}

return nro.handleResponse(res)
return nro.sendEntries(flushCtx, entries)
})
}
}

// newRequest creates a new http.Request with the given context and entries
func (nro *NewRelicOutput) newRequest(ctx context.Context, entries []*entry.Entry) (*http.Request, error) {
payload := LogPayloadFromEntries(entries, nro.messageField)

var buf bytes.Buffer
wr := gzip.NewWriter(&buf)
enc := json.NewEncoder(wr)
if err := enc.Encode(payload); err != nil {
return nil, otelerrors.Wrap(err, "encode payload")
}
if err := wr.Close(); err != nil {
return nil, err
}

req, err := http.NewRequestWithContext(ctx, "POST", nro.url.String(), &buf)
if err != nil {
return nil, err
}
req.Header = nro.headers

return req, nil
}

func (nro *NewRelicOutput) handleResponse(res *http.Response) error {
defer func() {
if err := res.Body.Close(); err != nil {
nro.Errorf(err.Error())
}
}()

if !(res.StatusCode >= 200 && res.StatusCode < 300) {
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return otelerrors.NewError("unexpected status code", "", "status", res.Status)
}
return otelerrors.NewError("unexpected status code", "", "status", res.Status, "body", string(body))
}
return nil
}
Loading

0 comments on commit 9129334

Please sign in to comment.