Skip to content

Commit

Permalink
sampling: fix pubsub implementation (#5126)
Browse files Browse the repository at this point in the history
* sampling: fix pubsub implementation

The initial implementation was written as a ~quick hack, with the
expectation that it would be replaced by the Changes API. It was
broken due to its ignorance of data streams, and multi-shard indices.
Sequence numbers are only comparable within a single shard.

Given that there is no known delivery date for the Changes API,
we propose to instead revise the pubsub implementation to address
the problems by:

 - enforcing single-shard indices for sampled trace data streams
 - searching (now single-shard) backing indices individually

In addition, we now use global checkpoints to bound searches, and
use PIT (point in time) for paging through results. Querying underlying
indices and global checkpoints requires an additional "monitor" index
privilege.

* sampling/pubsub: remove PIT again

Simplify by just using direct searches with a rnage on _seq_no,
using the most recently observed _seq_no value as the lower bound.
We can do this within the loop as well (i.e. until there are no
more results, or we've observed the global checkpoint.)

* sampling/pubsub: only query get metric from _stats

* pubsub: force-refresh indices

Refresh indices after observing an updated global checkpoint
to ensure document visibility is correct up to the observed
global checkpoint.

* Update changelog

* systemtest: fix spurious test failure
  • Loading branch information
axw authored May 26, 2021
1 parent 73e63c0 commit 94e3201
Show file tree
Hide file tree
Showing 9 changed files with 619 additions and 297 deletions.
7 changes: 7 additions & 0 deletions apmpackage/apm/0.2.0/data_stream/sampled_traces/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,10 @@ title: APM tail-sampled traces
type: traces
dataset: sampled
ilm_policy: traces-apm.sampled-default_policy
elasticsearch:
index_template:
settings:
# Create a single shard per index, so we can use
# global checkpoints as a way of limiting search
# results.
number_of_shards: 1
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ https://github.com/elastic/apm-server/compare/7.13\...master[View commits]
==== Bug fixes
* Don't auto-disable ILM due to a failure to communicate with Elasticsearch {pull}5264[5264]
* Fix panic due to misaligned 64-bit access on 32-bit architectures {pull}5277[5277]
* Fixed tail-based sampling pubsub to use _seq_no correctly {pull}5126[5126]
* Fix document grouping of translated OpenTelemetry Java metrics {pull}5309[5309]
* OpenTelemetry: record array attributes as labels {pull}5286[5286]
* model/modeldecoder: fix 32-bit timestamp decoding {pull}5308[5308]
Expand Down
4 changes: 4 additions & 0 deletions systemtest/sampling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package systemtest_test

import (
"context"
"errors"
"fmt"
"strings"
"testing"
Expand Down Expand Up @@ -226,6 +227,9 @@ func refreshPeriodically(t *testing.T, interval time.Duration, index ...string)
case <-ticker.C:
}
if _, err := systemtest.Elasticsearch.Do(ctx, &request, nil); err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
return err
}
}
Expand Down
86 changes: 86 additions & 0 deletions x-pack/apm-server/sampling/pubsub/checkpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package pubsub

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"

"github.com/pkg/errors"

"github.com/elastic/go-elasticsearch/v7/esapi"

"github.com/elastic/apm-server/elasticsearch"
)

// getGlobalCheckpoints returns the current global checkpoint for each index
// underlying dataStream. Each index is required to have a single (primary) shard.
func getGlobalCheckpoints(
ctx context.Context,
client elasticsearch.Client,
dataStream string,
) (map[string]int64, error) {
indexGlobalCheckpoints := make(map[string]int64)
resp, err := esapi.IndicesStatsRequest{
Index: []string{dataStream},
Level: "shards",
// By default all metrics are returned; query just the "get" metric,
// which is very cheap.
Metric: []string{"get"},
}.Do(ctx, client)
if err != nil {
return nil, errors.New("index stats request failed")
}
defer resp.Body.Close()
if resp.IsError() {
switch resp.StatusCode {
case http.StatusNotFound:
// Data stream does not yet exist.
return indexGlobalCheckpoints, nil
}
message, _ := ioutil.ReadAll(resp.Body)
return nil, fmt.Errorf("index stats request failed: %s", message)
}

var stats dataStreamStats
if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
return nil, err
}

for index, indexStats := range stats.Indices {
if n := len(indexStats.Shards); n > 1 {
return nil, fmt.Errorf("expected 1 shard, got %d for index %q", n, index)
}
for _, shardStats := range indexStats.Shards {
for _, shardStats := range shardStats {
if shardStats.Routing.Primary {
indexGlobalCheckpoints[index] = shardStats.SeqNo.GlobalCheckpoint
break
}
}
}
}
return indexGlobalCheckpoints, nil
}

type dataStreamStats struct {
Indices map[string]indexStats `json:"indices"`
}

type indexStats struct {
Shards map[string][]shardStats `json:"shards"`
}

type shardStats struct {
Routing struct {
Primary bool `json:"primary"`
} `json:"routing"`
SeqNo struct {
GlobalCheckpoint int64 `json:"global_checkpoint"`
} `json:"seq_no"`
}
3 changes: 2 additions & 1 deletion x-pack/apm-server/sampling/pubsub/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ const dataStreamIndexTemplate = `{
"data_stream": {},
"template": {
"settings": {
"index.lifecycle.name": %q
"index.lifecycle.name": %q,
"index.number_of_shards": 1
},
"mappings": {
"properties": {
Expand Down
Loading

0 comments on commit 94e3201

Please sign in to comment.