Skip to content

Commit

Permalink
Update for libbeat "eslegclient" changes
Browse files Browse the repository at this point in the history
Closes #3433
  • Loading branch information
axw committed Mar 7, 2020
1 parent 3f6dcd1 commit 1b4acd0
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 11 deletions.
9 changes: 5 additions & 4 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"

Expand Down Expand Up @@ -207,15 +208,15 @@ func (bt *beater) registerPipelineCallback(b *beat.Beat) error {

// ensure setup cmd is working properly
b.OverwritePipelinesCallback = func(esConfig *common.Config) error {
esClient, err := elasticsearch.NewConnectedClient(esConfig)
conn, err := eslegclient.NewConnectedClient(esConfig)
if err != nil {
return err
}
return pipeline.RegisterPipelines(esClient, overwrite, path)
return pipeline.RegisterPipelines(conn, overwrite, path)
}
// ensure pipelines are registered when new ES connection is established.
_, err := elasticsearch.RegisterConnectCallback(func(esClient *elasticsearch.Client) error {
return pipeline.RegisterPipelines(esClient, overwrite, path)
_, err := elasticsearch.RegisterConnectCallback(func(conn *eslegclient.Connection) error {
return pipeline.RegisterPipelines(conn, overwrite, path)
})
return err
}
Expand Down
8 changes: 4 additions & 4 deletions ingest/pipeline/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ import (

logs "github.com/elastic/apm-server/log"

"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/logp"
es "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/paths"
)

func RegisterPipelines(esClient *es.Client, overwrite bool, path string) error {
func RegisterPipelines(conn *eslegclient.Connection, overwrite bool, path string) error {
logger := logp.NewLogger(logs.Pipelines)
pipelines, err := loadPipelinesFromJSON(path)
if err != nil {
Expand All @@ -37,13 +37,13 @@ func RegisterPipelines(esClient *es.Client, overwrite bool, path string) error {
var exists bool
for _, p := range pipelines {
if !overwrite {
exists, err = esClient.Connection.PipelineExists(p.Id)
exists, err = conn.PipelineExists(p.Id)
if err != nil {
return err
}
}
if overwrite || !exists {
_, _, err := esClient.Connection.CreatePipeline(p.Id, nil, p.Body)
_, _, err := conn.CreatePipeline(p.Id, nil, p.Body)
if err != nil {
logger.Errorf("Pipeline registration failed for %s.", p.Id)
return err
Expand Down
6 changes: 3 additions & 3 deletions ingest/pipeline/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ import (

"github.com/elastic/apm-server/tests/loader"
"github.com/elastic/beats/v7/libbeat/common"
es "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
)

func TestRegisterPipelines(t *testing.T) {
esClients, err := es.NewElasticsearchClients(getFakeESConfig(9200))
esClients, err := eslegclient.NewClients(getFakeESConfig(9200))
require.NoError(t, err)
esClient := &esClients[0]
path, err := loader.FindFile("..", "ingest", "pipeline", "definition.json")
Expand All @@ -49,7 +49,7 @@ func TestRegisterPipelines(t *testing.T) {
assert.NoError(t, err)

// invalid esClient
invalidClients, err := es.NewElasticsearchClients(getFakeESConfig(1234))
invalidClients, err := eslegclient.NewClients(getFakeESConfig(1234))
require.NoError(t, err)
err = RegisterPipelines(&invalidClients[0], true, path)
assert.Error(t, err)
Expand Down

0 comments on commit 1b4acd0

Please sign in to comment.