Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support index cleaner for rollover indices and add integration tests #1689

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ storage-integration-test: go-gen
go clean -testcache
bash -c "set -e; set -o pipefail; $(GOTEST) $(STORAGE_PKGS) | $(COLORIZE)"

.PHONY: index-cleaner-integration-test
index-cleaner-integration-test: docker-images-elastic
# Expire tests results for storage integration tests since the environment might change
# even though the code remains the same.
go clean -testcache
bash -c "set -e; set -o pipefail; $(GOTEST) -tags index_cleaner $(STORAGE_PKGS) | $(COLORIZE)"

all-pkgs:
@echo $(ALL_PKGS) | tr ' ' '\n' | sort

Expand Down
28 changes: 22 additions & 6 deletions plugin/storage/es/esCleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ def main():
print('HOSTNAME ... specifies which Elasticsearch hosts URL to search and delete indices from.')
print('TIMEOUT ... number of seconds to wait for master node response.'.format(TIMEOUT))
print('INDEX_PREFIX ... specifies index prefix.')
print('ARCHIVE ... specifies whether to remove archive indices (default false).')
print('ARCHIVE ... specifies whether to remove archive indices (only works for rollover) (default false).')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a change in behaviour, or did it never work for archive (with daily indices)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is just a comment change.

It worked and works for archive - but it is only supported for archive managed by rollover. If there is one archive we do not touch it.

print('ROLLOVER ... specifies whether to remove indices created by rollover (default false).')
print('ES_USERNAME ... The username required by Elasticsearch.')
print('ES_PASSWORD ... The password required by Elasticsearch.')
print('ES_TLS ... enable TLS (default false).')
Expand Down Expand Up @@ -44,9 +45,12 @@ def main():
prefix += '-'

if str2bool(os.getenv("ARCHIVE", 'false')):
filter_archive_indices(ilo, prefix)
filter_archive_indices_rollover(ilo, prefix)
else:
filter_main_indices(ilo, prefix)
if str2bool(os.getenv("ROLLOVER", 'false')):
filter_main_indices_rollover(ilo, prefix)
else:
filter_main_indices(ilo, prefix)

empty_list(ilo, 'No indices to delete')

Expand All @@ -58,17 +62,29 @@ def main():


def filter_main_indices(ilo, prefix):
ilo.filter_by_regex(kind='prefix', value=prefix + "jaeger")
ilo.filter_by_regex(kind='regex', value=prefix + "jaeger-(span|service|dependencies)-\d{4}-\d{2}-\d{2}")
empty_list(ilo, "No indices to delete")
# This excludes archive index as we use source='name'
# source `creation_date` would include archive index
ilo.filter_by_age(source='name', direction='older', timestring='%Y-%m-%d', unit='days', unit_count=int(sys.argv[1]))


def filter_archive_indices(ilo, prefix):
# Remove only archive indices when aliases are used
def filter_main_indices_rollover(ilo, prefix):
ilo.filter_by_regex(kind='regex', value=prefix + "jaeger-(span|service)-\d{6}")
empty_list(ilo, "No indices to delete")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these required after each filter, or just at the end? Does it cause problems calling the filter_by_alias on an empty list?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it's required otherwise it fails on empty cluster

# do not remove active write indices
ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-write'], exclude=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If filtering by the regex now, are these two aliases filters required?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the comment above. We need to exclude current active write indices.

empty_list(ilo, "No indices to delete")
ilo.filter_by_alias(aliases=[prefix + 'jaeger-service-write'], exclude=True)
empty_list(ilo, "No indices to delete")
ilo.filter_by_age(source='creation_date', direction='older', unit='days', unit_count=int(sys.argv[1]))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering about a particular scenario - if rollover criteria is based on index size (for example) in low activity environment, it might be possible that the same index is in use for more days that the unit_count, in which case it would remove the index - even though it is in current use?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right removing by size does not really make sense.



def filter_archive_indices_rollover(ilo, prefix):
# Remove only rollover archive indices
# Do not remove active write archive index
ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-archive-write'], exclude=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to filter by regex here aswell to be consistent with the change in filter_main_indices_rollover?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unfortunatelly this is not filter_by_regex method

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what happened - the comment should have been associated with two lines down - so changing the ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-archive-read']) to be ilo.filter_by_regex(kind='regex', value=prefix + "jaeger-(span|service)-\d{6}") ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I do not understand what you mean.

  1. we use regex to find indices which should be removed
  2. but we also need to exclude indices which are associated with write alias - to do that we use filter for an alias.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just mean that filter_main_indices and filter_main_indices_rollover are both using filter_by_regex to identify the indices to be removed - whereas filter_archive_indices_rollover uses ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-archive-read']) - just suggesting that it would be better if filter_archive_indices_rollover also uses filter_by_regex for consistency.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that sounds better #1693

empty_list(ilo, "No indices to delete")
ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-archive-read'])
ilo.filter_by_age(source='creation_date', direction='older', unit='days', unit_count=int(sys.argv[1]))

Expand Down
3 changes: 0 additions & 3 deletions plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ const (
queryPort = "9200"
queryHostPort = host + ":" + queryPort
queryURL = "http://" + queryHostPort
username = "elastic" // the elasticsearch default username
password = "changeme" // the elasticsearch default password
indexPrefix = "integration-test"
tagKeyDeDotChar = "@"
maxSpanAge = time.Hour * 72
Expand All @@ -59,7 +57,6 @@ type ESStorageIntegration struct {
func (s *ESStorageIntegration) initializeES(allTagsAsFields, archive bool) error {
rawClient, err := elastic.NewClient(
elastic.SetURL(queryURL),
elastic.SetBasicAuth(username, password),
elastic.SetSniff(false))
if err != nil {
return err
Expand Down
218 changes: 218 additions & 0 deletions plugin/storage/integration/es_index_cleaner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build index_cleaner

package integration

import (
"context"
"fmt"
"os/exec"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/olivere/elastic.v5"
)

const (
archiveIndexName = "jaeger-span-archive"
dependenciesIndexName = "jaeger-dependencies-2019-01-01"
spanIndexName = "jaeger-span-2019-01-01"
serviceIndexName = "jaeger-service-2019-01-01"
indexCleanerImage = "jaegertracing/jaeger-es-index-cleaner:latest"
rolloverImage = "jaegertracing/jaeger-es-rollover:latest"
rolloverNowEnvVar = "CONDITIONS='{\"max_age\":\"0s\"}'"
)

func TestIndexCleaner_doNotFailOnEmptyStorage(t *testing.T) {
client, err := createESClient()
require.NoError(t, err)
_, err = client.DeleteIndex("*").Do(context.Background())
require.NoError(t, err)

tests := []struct{
envs []string
}{
{envs:[]string{"ROLLOVER=false"}},
{envs:[]string{"ROLLOVER=true"}},
{envs:[]string{"ARCHIVE=true"}},
}
for _, test := range tests {
err := runEsCleaner(7, test.envs)
require.NoError(t, err)
}
}

func TestIndexCleaner_doNotFailOnFullStorage(t *testing.T) {
client, err := createESClient()
require.NoError(t, err)
tests := []struct{
envs []string
}{
{envs:[]string{"ROLLOVER=false"}},
{envs:[]string{"ROLLOVER=true"}},
{envs:[]string{"ARCHIVE=true"}},
}
for _, test := range tests {
_, err = client.DeleteIndex("*").Do(context.Background())
require.NoError(t, err)
err := createAllIndices(client, "")
require.NoError(t, err)
err = runEsCleaner(1500, test.envs)
require.NoError(t, err)
}
}

func TestIndexCleaner(t *testing.T) {
client, err := createESClient()
require.NoError(t, err)

tests := []struct{
name string
envVars []string
expectedIndices []string
}{
{
name: "RemoveDailyIndices",
envVars: []string{},
expectedIndices: []string{
archiveIndexName,
"jaeger-span-000001", "jaeger-service-000001", "jaeger-span-000002", "jaeger-service-000002",
"jaeger-span-archive-000001", "jaeger-span-archive-000002",
},
},
{
name: "RemoveRolloverIndices",
envVars: []string{"ROLLOVER=true"},
expectedIndices: []string{
archiveIndexName, spanIndexName, serviceIndexName, dependenciesIndexName,
"jaeger-span-000002", "jaeger-service-000002",
"jaeger-span-archive-000001", "jaeger-span-archive-000002",
},
},
{
name: "RemoveArchiveIndices",
envVars: []string{"ARCHIVE=true"},
expectedIndices: []string{
archiveIndexName, spanIndexName, serviceIndexName, dependenciesIndexName,
"jaeger-span-000001", "jaeger-service-000001", "jaeger-span-000002", "jaeger-service-000002",
"jaeger-span-archive-000002",
},
},
}
for _, test := range tests {
t.Run(fmt.Sprintf("%s_no_prefix", test.name), func(t *testing.T) {
runIndexCleanerTest(t, client, "", test.expectedIndices, test.envVars)
})
t.Run(fmt.Sprintf("%s_prefix", test.name), func(t *testing.T) {
runIndexCleanerTest(t, client, indexPrefix, test.expectedIndices, append(test.envVars, "INDEX_PREFIX="+indexPrefix))
})
}
}

func runIndexCleanerTest(t *testing.T, client *elastic.Client, prefix string, expectedIndices, envVars []string) {
// make sure ES is clean
_, err := client.DeleteIndex("*").Do(context.Background())
require.NoError(t, err)

err = createAllIndices(client, prefix)
require.NoError(t, err)
err = runEsCleaner(0, envVars)
require.NoError(t, err)

indices, err := client.IndexNames()
require.NoError(t, err)
if prefix != "" {
prefix = prefix + "-"
}
var expected []string
for _, index := range expectedIndices {
expected = append(expected, prefix + index)
}
assert.ElementsMatch(t, indices, expected, fmt.Sprintf("indices found: %v, expected: %v", indices, expected))
}

func createAllIndices(client *elastic.Client, prefix string) error {
prefixWithSeparator := prefix
if prefix != "" {
prefixWithSeparator = prefixWithSeparator + "-"
}
// create daily indices and archive index
err := createEsIndices(client, []string{
prefixWithSeparator + spanIndexName, prefixWithSeparator + serviceIndexName,
prefixWithSeparator + dependenciesIndexName, prefixWithSeparator + archiveIndexName})
if err != nil {
return err
}
// create rollover archive index and roll alias to the new index
err = runEsRollover("init", []string{"ARCHIVE=true", "INDEX_PREFIX=" + prefix})
if err != nil {
return err
}
err = runEsRollover("rollover", []string{"ARCHIVE=true", "INDEX_PREFIX=" + prefix, rolloverNowEnvVar})
if err != nil {
return err
}
// create rollover main indices and roll over to the new index
err = runEsRollover("init", []string{"ARCHIVE=false", "INDEX_PREFIX=" + prefix})
if err != nil {
return err
}
err = runEsRollover("rollover", []string{"ARCHIVE=false", "INDEX_PREFIX=" + prefix, rolloverNowEnvVar})
if err != nil {
return err
}
return nil
}

func createEsIndices(client *elastic.Client, indices []string) error {
for _, index := range indices {
if _, err := client.CreateIndex(index).Do(context.Background()); err != nil {
return err
}
}
return nil
}

func runEsCleaner(days int, envs []string) error {
var dockerEnv string
for _, e := range envs {
dockerEnv += fmt.Sprintf(" -e %s", e)
}
args := fmt.Sprintf("sudo docker run %s --net=host %s %d http://%s", dockerEnv, indexCleanerImage, days, queryHostPort)
cmd := exec.Command("/bin/sh", "-c", args)
out, err := cmd.CombinedOutput()
fmt.Println(string(out))
return err
}

func runEsRollover(action string, envs []string) error {
var dockerEnv string
for _, e := range envs {
dockerEnv += fmt.Sprintf(" -e %s", e)
}
args := fmt.Sprintf("sudo docker run %s --rm --net=host %s %s http://%s", dockerEnv, rolloverImage, action, queryHostPort)
cmd := exec.Command("/bin/sh", "-c", args)
out, err := cmd.CombinedOutput()
fmt.Println(string(out))
return err
}

func createESClient() (*elastic.Client, error) {
return elastic.NewClient(
elastic.SetURL(queryURL),
elastic.SetSniff(false))
}
8 changes: 5 additions & 3 deletions scripts/travis/es-integration-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
set -e

docker pull docker.elastic.co/elasticsearch/elasticsearch:5.6.12
CID=$(docker run -d -p 9200:9200 -e "http.host=0.0.0.0" -e "transport.host=127.0.0.1" docker.elastic.co/elasticsearch/elasticsearch:5.6.12)
export STORAGE=elasticsearch
make storage-integration-test
CID=$(docker run -d -p 9200:9200 -e "http.host=0.0.0.0" -e "transport.host=127.0.0.1" -e "xpack.security.enabled=false" -e "xpack.monitoring.enabled=false" docker.elastic.co/elasticsearch/elasticsearch:5.6.12)

STORAGE=elasticsearch make storage-integration-test
make index-cleaner-integration-test

docker kill $CID