Skip to content

Commit

Permalink
Wait for integration using Elasticsearch queries (#5934) (#5937)
Browse files Browse the repository at this point in the history
* Wait for integration using Elasticsearch queries

Extend `data_streams.wait_for_integration` to use either
Kibana or Elasticsearch queries, depending on what is
available. If Kibana config is defined, and Kibana can
be successfully queried, then it will be used. Otherwise
if the Elasticsearch output is configured, Elasticsearch
queries will be made to check the presence of index templates
which imply the integration has been installed.

* Add comment, remove comment

(cherry picked from commit 7e7da2d)

Co-authored-by: Andrew Wilkins <[email protected]>
  • Loading branch information
mergify[bot] and axw authored Aug 13, 2021
1 parent a10e68f commit 26a6bea
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 79 deletions.
79 changes: 15 additions & 64 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@
package beater

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -377,15 +374,25 @@ func (s *serverRunner) run() error {

fleetManaged := s.beat.Manager != nil && s.beat.Manager.Enabled()
if !fleetManaged && s.config.DataStreams.Enabled && s.config.DataStreams.WaitForIntegration {
// TODO(axw) we should also try querying Elasticsearch in parallel
// (e.g. check for an index template created), for the case where
// there is no Kibana configuration.
if !s.config.Kibana.Enabled {
return errors.New("cannot wait for integration without Kibana config")
var esClient elasticsearch.Client
if cfg := elasticsearchOutputConfig(s.beat); cfg != nil {
esConfig := elasticsearch.DefaultConfig()
err := cfg.Unpack(&esConfig)
if err != nil {
return err
}
esClient, err = elasticsearch.NewClient(esConfig)
if err != nil {
return err
}
}
if kibanaClient == nil && esClient == nil {
return errors.New("cannot wait for integration without either Kibana or Elasticsearch config")
}
if err := waitForIntegration(
s.runServerContext,
kibanaClient,
esClient,
s.config.DataStreams.WaitForIntegrationInterval,
s.tracer,
s.logger,
Expand Down Expand Up @@ -699,59 +706,3 @@ func (p *reporterBatchProcessor) ProcessBatch(ctx context.Context, batch *model.
disableTracing, _ := ctx.Value(disablePublisherTracingKey{}).(bool)
return p.reporter(ctx, publish.PendingReq{Transformable: batch, Trace: !disableTracing})
}

// waitForIntegration waits for the APM integration to be installed by querying Kibana,
// or for the context to be cancelled.
func waitForIntegration(
ctx context.Context,
kibanaClient kibana_client.Client,
interval time.Duration,
tracer *apm.Tracer,
logger *logp.Logger,
) error {
logger.Info("waiting for integration package to be installed")
tx := tracer.StartTransaction("wait_for_integration", "init")
ctx = apm.ContextWithTransaction(ctx, tx)
var ticker *time.Ticker
for {
if ticker == nil {
ticker = time.NewTicker(interval)
defer ticker.Stop()
} else {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}
}
if checkIntegrationInstalled(ctx, kibanaClient, logger) {
return nil
}
}
}

func checkIntegrationInstalled(ctx context.Context, kibanaClient kibana_client.Client, logger *logp.Logger) bool {
resp, err := kibanaClient.Send(ctx, "GET", "/api/fleet/epm/packages/apm", nil, nil, nil)
if err != nil {
logger.Errorf("error querying integration package status: %s", err)
return false
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
logger.Errorf("unexpected status querying integration package status: %s (%s)", resp.Status, bytes.TrimSpace(body))
return false
}
var result struct {
Response struct {
Status string `json:"status"`
} `json:"response"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
logger.Errorf("error decoding integration package response: %s", err)
return false
}
logger.Infof("integration package status: %s", result.Response.Status)
return result.Response.Status == "installed"
}
9 changes: 5 additions & 4 deletions beater/config/data_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ type DataStreamsConfig struct {
// WaitForIntegration controls whether APM Server waits for the Fleet
// integration package to be installed before indexing events.
//
// This requires a connection to Kibana, and is ignored when running
// under Elastic Agent; it is intended for running APM Server standalone,
// relying on Fleet to install the integration for creating Elasticsearch
// index templates, ILM policies, and ingest pipelines.
// This config is ignored when running under Elastic Agent; it is intended
// for running APM Server standalone, relying on Fleet to install the integration
// for creating Elasticsearch index templates, ILM policies, and ingest pipelines.
//
// This configuration requires either a connection to Kibana or Elasticsearch.
WaitForIntegration bool `config:"wait_for_integration"`

// WaitForIntegrationInterval holds the interval for checks when waiting
Expand Down
46 changes: 36 additions & 10 deletions beater/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ import (
"net/http/httptest"
"net/url"
"os"
"path"
"reflect"
"runtime"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -523,11 +525,7 @@ func TestServerConfigReload(t *testing.T) {
assert.Error(t, err)
}

func TestServerWaitForIntegration(t *testing.T) {
if testing.Short() {
t.Skip("skipping slow test")
}

func TestServerWaitForIntegrationKibana(t *testing.T) {
var requests int
mux := http.NewServeMux()
mux.HandleFunc("/api/status", func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -553,15 +551,43 @@ func TestServerWaitForIntegration(t *testing.T) {
"kibana.enabled": true,
"kibana.host": srv.URL,
})
_, err := setupServer(t, cfg, nil, nil)
require.NoError(t, err)
assert.Equal(t, 3, requests)
}

func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
var mu sync.Mutex
templateRequests := make(map[string]int)
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-Elastic-Product", "Elasticsearch")
})
mux.HandleFunc("/_index_template/", func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
defer mu.Unlock()
template := path.Base(r.URL.Path)
templateRequests[template]++
if template == "traces-apm" && templateRequests[template] == 1 {
w.WriteHeader(404)
}
})
srv := httptest.NewServer(mux)
defer srv.Close()

beat, cfg := newBeat(t, cfg, nil, nil)
tb, err := newTestBeater(t, beat, cfg, nil)
cfg := common.MustNewConfigFrom(map[string]interface{}{
"data_streams.enabled": true,
"data_streams.wait_for_integration_interval": "100ms",
})
var beatConfig beat.BeatConfig
err := beatConfig.Output.Unpack(common.MustNewConfigFrom(map[string]interface{}{
"elasticsearch.hosts": []string{srv.URL},
}))
require.NoError(t, err)
tb.start()

_, err = tb.waitListenAddr(30 * time.Second)
_, err = setupServer(t, cfg, &beatConfig, nil)
require.NoError(t, err)
assert.Equal(t, 3, requests)
assert.Equal(t, 2, templateRequests["traces-apm"])
}

type chanClient struct {
Expand Down
150 changes: 150 additions & 0 deletions beater/waitintegration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package beater

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/pkg/errors"
"go.elastic.co/apm"
"golang.org/x/sync/errgroup"

"github.com/elastic/beats/v7/libbeat/logp"

"github.com/elastic/apm-server/elasticsearch"
"github.com/elastic/apm-server/kibana"
"github.com/elastic/go-elasticsearch/v7/esapi"
)

// waitForIntegration waits for the APM integration to be installed by querying Kibana,
// or for the context to be cancelled.
func waitForIntegration(
ctx context.Context,
kibanaClient kibana.Client,
esClient elasticsearch.Client,
interval time.Duration,
tracer *apm.Tracer,
logger *logp.Logger,
) error {
logger.Info("waiting for integration package to be installed")
tx := tracer.StartTransaction("wait_for_integration", "init")
ctx = apm.ContextWithTransaction(ctx, tx)
var ticker *time.Ticker
for {
if ticker == nil {
// We start the ticker on the first iteration, rather than
// before the loop, so we don't have to wait for a tick
// (5 seconds by default) before peforming the first check.
ticker = time.NewTicker(interval)
defer ticker.Stop()
} else {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}
}
if kibanaClient != nil {
installed, err := checkIntegrationInstalledKibana(ctx, kibanaClient, logger)
if err != nil {
logger.Errorf("error querying Kibana for integration package status: %s", err)
} else {
if installed {
return nil
}
// We were able to query Kibana, but the package is not yet installed.
// We should continue querying the package status via Kibana, as it is
// more authoritative than checking for index template installation.
continue
}
}
if esClient != nil {
installed, err := checkIntegrationInstalledElasticsearch(ctx, esClient, logger)
if err != nil {
logger.Errorf("error querying Elasticsearch for integration index templates: %s", err)
} else if installed {
return nil
}
}
}
}

// checkIntegrationInstalledKibana checks if the APM integration package
// is installed by querying Kibana.
func checkIntegrationInstalledKibana(ctx context.Context, kibanaClient kibana.Client, logger *logp.Logger) (bool, error) {
resp, err := kibanaClient.Send(ctx, "GET", "/api/fleet/epm/packages/apm", nil, nil, nil)
if err != nil {
return false, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
return false, fmt.Errorf("unexpected HTTP status: %s (%s)", resp.Status, bytes.TrimSpace(body))
}
var result struct {
Response struct {
Status string `json:"status"`
} `json:"response"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return false, errors.Wrap(err, "error decoding integration package response")
}
logger.Infof("integration package status: %s", result.Response.Status)
return result.Response.Status == "installed", nil
}

func checkIntegrationInstalledElasticsearch(ctx context.Context, esClient elasticsearch.Client, logger *logp.Logger) (bool, error) {
// TODO(axw) generate the list of expected index templates.
templates := []string{
"traces-apm",
"traces-sampled",
"metrics-apm.app",
"metrics-apm.internal",
"logs-apm.error",
}
// IndicesGetIndexTemplateRequest accepts a slice of template names,
// but the REST API expects just one index template name. Query them
// in parallel.
g, ctx := errgroup.WithContext(ctx)
for _, template := range templates {
template := template // copy for closure
g.Go(func() error {
req := esapi.IndicesGetIndexTemplateRequest{Name: []string{template}}
resp, err := req.Do(ctx, esClient)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.IsError() {
body, _ := ioutil.ReadAll(resp.Body)
return fmt.Errorf("unexpected HTTP status: %s (%s)", resp.Status(), bytes.TrimSpace(body))
}
return nil
})
}
err := g.Wait()
return err == nil, err
}
3 changes: 2 additions & 1 deletion docs/configuration-process.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -201,5 +201,6 @@ Defaults to false.
[[data_streams.wait_for_integration]]
[float]
==== `wait_for_integration`
Wait for the `apm` Fleet integration to be installed by Kibana. Requires <<kibana-enabled>>.
Wait for the `apm` Fleet integration to be installed by Kibana. Requires either <<kibana-enabled>>
or for the <<elasticsearch-output, Elasticsearch output>> to be configured.
Defaults to true.

0 comments on commit 26a6bea

Please sign in to comment.