Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for integration using Elasticsearch queries #5934

Merged
merged 2 commits into from
Aug 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 15 additions & 64 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@
package beater

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -377,15 +374,25 @@ func (s *serverRunner) run() error {

fleetManaged := s.beat.Manager != nil && s.beat.Manager.Enabled()
if !fleetManaged && s.config.DataStreams.Enabled && s.config.DataStreams.WaitForIntegration {
// TODO(axw) we should also try querying Elasticsearch in parallel
// (e.g. check for an index template created), for the case where
// there is no Kibana configuration.
if !s.config.Kibana.Enabled {
return errors.New("cannot wait for integration without Kibana config")
var esClient elasticsearch.Client
if cfg := elasticsearchOutputConfig(s.beat); cfg != nil {
esConfig := elasticsearch.DefaultConfig()
err := cfg.Unpack(&esConfig)
if err != nil {
return err
}
esClient, err = elasticsearch.NewClient(esConfig)
if err != nil {
return err
}
}
if kibanaClient == nil && esClient == nil {
return errors.New("cannot wait for integration without either Kibana or Elasticsearch config")
}
if err := waitForIntegration(
s.runServerContext,
kibanaClient,
esClient,
s.config.DataStreams.WaitForIntegrationInterval,
s.tracer,
s.logger,
Expand Down Expand Up @@ -700,59 +707,3 @@ func (p *reporterBatchProcessor) ProcessBatch(ctx context.Context, batch *model.
disableTracing, _ := ctx.Value(disablePublisherTracingKey{}).(bool)
return p.reporter(ctx, publish.PendingReq{Transformable: batch, Trace: !disableTracing})
}

// waitForIntegration waits for the APM integration to be installed by querying Kibana,
// or for the context to be cancelled.
func waitForIntegration(
ctx context.Context,
kibanaClient kibana_client.Client,
interval time.Duration,
tracer *apm.Tracer,
logger *logp.Logger,
) error {
logger.Info("waiting for integration package to be installed")
tx := tracer.StartTransaction("wait_for_integration", "init")
ctx = apm.ContextWithTransaction(ctx, tx)
var ticker *time.Ticker
for {
if ticker == nil {
ticker = time.NewTicker(interval)
defer ticker.Stop()
} else {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}
}
if checkIntegrationInstalled(ctx, kibanaClient, logger) {
return nil
}
}
}

func checkIntegrationInstalled(ctx context.Context, kibanaClient kibana_client.Client, logger *logp.Logger) bool {
resp, err := kibanaClient.Send(ctx, "GET", "/api/fleet/epm/packages/apm", nil, nil, nil)
if err != nil {
logger.Errorf("error querying integration package status: %s", err)
return false
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
logger.Errorf("unexpected status querying integration package status: %s (%s)", resp.Status, bytes.TrimSpace(body))
return false
}
var result struct {
Response struct {
Status string `json:"status"`
} `json:"response"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
logger.Errorf("error decoding integration package response: %s", err)
return false
}
logger.Infof("integration package status: %s", result.Response.Status)
return result.Response.Status == "installed"
}
9 changes: 5 additions & 4 deletions beater/config/data_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ type DataStreamsConfig struct {
// WaitForIntegration controls whether APM Server waits for the Fleet
// integration package to be installed before indexing events.
//
// This requires a connection to Kibana, and is ignored when running
// under Elastic Agent; it is intended for running APM Server standalone,
// relying on Fleet to install the integration for creating Elasticsearch
// index templates, ILM policies, and ingest pipelines.
// This config is ignored when running under Elastic Agent; it is intended
// for running APM Server standalone, relying on Fleet to install the integration
// for creating Elasticsearch index templates, ILM policies, and ingest pipelines.
//
// This configuration requires either a connection to Kibana or Elasticsearch.
WaitForIntegration bool `config:"wait_for_integration"`

// WaitForIntegrationInterval holds the interval for checks when waiting
Expand Down
46 changes: 36 additions & 10 deletions beater/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ import (
"net/http/httptest"
"net/url"
"os"
"path"
"reflect"
"runtime"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -523,11 +525,7 @@ func TestServerConfigReload(t *testing.T) {
assert.Error(t, err)
}

func TestServerWaitForIntegration(t *testing.T) {
if testing.Short() {
t.Skip("skipping slow test")
}

func TestServerWaitForIntegrationKibana(t *testing.T) {
var requests int
mux := http.NewServeMux()
mux.HandleFunc("/api/status", func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -553,15 +551,43 @@ func TestServerWaitForIntegration(t *testing.T) {
"kibana.enabled": true,
"kibana.host": srv.URL,
})
_, err := setupServer(t, cfg, nil, nil)
require.NoError(t, err)
assert.Equal(t, 3, requests)
}

func TestServerWaitForIntegrationElasticsearch(t *testing.T) {
var mu sync.Mutex
templateRequests := make(map[string]int)
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-Elastic-Product", "Elasticsearch")
})
mux.HandleFunc("/_index_template/", func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
defer mu.Unlock()
template := path.Base(r.URL.Path)
templateRequests[template]++
if template == "traces-apm" && templateRequests[template] == 1 {
w.WriteHeader(404)
}
})
srv := httptest.NewServer(mux)
defer srv.Close()

beat, cfg := newBeat(t, cfg, nil, nil)
tb, err := newTestBeater(t, beat, cfg, nil)
cfg := common.MustNewConfigFrom(map[string]interface{}{
"data_streams.enabled": true,
"data_streams.wait_for_integration_interval": "100ms",
})
var beatConfig beat.BeatConfig
err := beatConfig.Output.Unpack(common.MustNewConfigFrom(map[string]interface{}{
"elasticsearch.hosts": []string{srv.URL},
}))
require.NoError(t, err)
tb.start()

_, err = tb.waitListenAddr(30 * time.Second)
_, err = setupServer(t, cfg, &beatConfig, nil)
require.NoError(t, err)
assert.Equal(t, 3, requests)
assert.Equal(t, 2, templateRequests["traces-apm"])
}

type chanClient struct {
Expand Down
150 changes: 150 additions & 0 deletions beater/waitintegration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package beater

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/pkg/errors"
"go.elastic.co/apm"
"golang.org/x/sync/errgroup"

"github.com/elastic/beats/v7/libbeat/logp"

"github.com/elastic/apm-server/elasticsearch"
"github.com/elastic/apm-server/kibana"
"github.com/elastic/go-elasticsearch/v7/esapi"
)

// waitForIntegration waits for the APM integration to be installed by querying Kibana,
// or for the context to be cancelled.
func waitForIntegration(
ctx context.Context,
kibanaClient kibana.Client,
esClient elasticsearch.Client,
interval time.Duration,
tracer *apm.Tracer,
logger *logp.Logger,
) error {
logger.Info("waiting for integration package to be installed")
tx := tracer.StartTransaction("wait_for_integration", "init")
ctx = apm.ContextWithTransaction(ctx, tx)
var ticker *time.Ticker
for {
if ticker == nil {
// We start the ticker on the first iteration, rather than
// before the loop, so we don't have to wait for a tick
// (5 seconds by default) before peforming the first check.
ticker = time.NewTicker(interval)
defer ticker.Stop()
} else {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}
axw marked this conversation as resolved.
Show resolved Hide resolved
}
if kibanaClient != nil {
installed, err := checkIntegrationInstalledKibana(ctx, kibanaClient, logger)
if err != nil {
logger.Errorf("error querying Kibana for integration package status: %s", err)
} else {
if installed {
return nil
}
// We were able to query Kibana, but the package is not yet installed.
// We should continue querying the package status via Kibana, as it is
// more authoritative than checking for index template installation.
continue
}
}
if esClient != nil {
installed, err := checkIntegrationInstalledElasticsearch(ctx, esClient, logger)
if err != nil {
logger.Errorf("error querying Elasticsearch for integration index templates: %s", err)
} else if installed {
return nil
}
}
}
}

// checkIntegrationInstalledKibana checks if the APM integration package
// is installed by querying Kibana.
func checkIntegrationInstalledKibana(ctx context.Context, kibanaClient kibana.Client, logger *logp.Logger) (bool, error) {
resp, err := kibanaClient.Send(ctx, "GET", "/api/fleet/epm/packages/apm", nil, nil, nil)
if err != nil {
return false, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
return false, fmt.Errorf("unexpected HTTP status: %s (%s)", resp.Status, bytes.TrimSpace(body))
}
var result struct {
Response struct {
Status string `json:"status"`
} `json:"response"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return false, errors.Wrap(err, "error decoding integration package response")
}
logger.Infof("integration package status: %s", result.Response.Status)
return result.Response.Status == "installed", nil
}

func checkIntegrationInstalledElasticsearch(ctx context.Context, esClient elasticsearch.Client, logger *logp.Logger) (bool, error) {
// TODO(axw) generate the list of expected index templates.
templates := []string{
"traces-apm",
"traces-sampled",
"metrics-apm.app",
"metrics-apm.internal",
"logs-apm.error",
}
// IndicesGetIndexTemplateRequest accepts a slice of template names,
// but the REST API expects just one index template name. Query them
// in parallel.
axw marked this conversation as resolved.
Show resolved Hide resolved
g, ctx := errgroup.WithContext(ctx)
for _, template := range templates {
template := template // copy for closure
g.Go(func() error {
req := esapi.IndicesGetIndexTemplateRequest{Name: []string{template}}
resp, err := req.Do(ctx, esClient)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.IsError() {
body, _ := ioutil.ReadAll(resp.Body)
return fmt.Errorf("unexpected HTTP status: %s (%s)", resp.Status(), bytes.TrimSpace(body))
}
return nil
})
}
err := g.Wait()
return err == nil, err
}
3 changes: 2 additions & 1 deletion docs/configuration-process.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -201,5 +201,6 @@ Defaults to false.
[[data_streams.wait_for_integration]]
[float]
==== `wait_for_integration`
Wait for the `apm` Fleet integration to be installed by Kibana. Requires <<kibana-enabled>>.
Wait for the `apm` Fleet integration to be installed by Kibana. Requires either <<kibana-enabled>>
or for the <<elasticsearch-output, Elasticsearch output>> to be configured.
Defaults to true.