From 83e2621bb100c90e4936aa8514131c8f2be43a2b Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Thu, 25 Jul 2019 15:40:34 +0200 Subject: [PATCH 1/2] Support index cleaning for rollover Signed-off-by: Pavol Loffay --- Makefile | 7 + plugin/storage/es/esCleaner.py | 33 ++- .../storage/integration/elasticsearch_test.go | 3 - .../integration/es_index_cleaner_test.go | 201 ++++++++++++++++++ scripts/travis/es-integration-test.sh | 8 +- 5 files changed, 243 insertions(+), 9 deletions(-) create mode 100644 plugin/storage/integration/es_index_cleaner_test.go diff --git a/Makefile b/Makefile index c29341b8148..a23e5d08a44 100644 --- a/Makefile +++ b/Makefile @@ -96,6 +96,13 @@ storage-integration-test: go-gen go clean -testcache bash -c "set -e; set -o pipefail; $(GOTEST) $(STORAGE_PKGS) | $(COLORIZE)" +.PHONY: index-cleaner-integration-test +index-cleaner-integration-test: docker-images-elastic + # Expire tests results for storage integration tests since the environment might change + # even though the code remains the same. + go clean -testcache + bash -c "set -e; set -o pipefail; $(GOTEST) -tags index_cleaner $(STORAGE_PKGS) | $(COLORIZE)" + all-pkgs: @echo $(ALL_PKGS) | tr ' ' '\n' | sort diff --git a/plugin/storage/es/esCleaner.py b/plugin/storage/es/esCleaner.py index 050cb7a41cd..8a943f2552e 100755 --- a/plugin/storage/es/esCleaner.py +++ b/plugin/storage/es/esCleaner.py @@ -15,7 +15,8 @@ def main(): print('HOSTNAME ... specifies which Elasticsearch hosts URL to search and delete indices from.') print('TIMEOUT ... number of seconds to wait for master node response.'.format(TIMEOUT)) print('INDEX_PREFIX ... specifies index prefix.') - print('ARCHIVE ... specifies whether to remove archive indices (default false).') + print('ARCHIVE ... specifies whether to remove archive indices (only works for rollover) (default false).') + print('ROLLOVER ... specifies whether to remove indices created by rollover (default false).') print('ES_USERNAME ... The username required by Elasticsearch.') print('ES_PASSWORD ... The password required by Elasticsearch.') print('ES_TLS ... enable TLS (default false).') @@ -46,7 +47,10 @@ def main(): if str2bool(os.getenv("ARCHIVE", 'false')): filter_archive_indices(ilo, prefix) else: - filter_main_indices(ilo, prefix) + if str2bool(os.getenv("ROLLOVER", 'false')): + filter_main_indices_rollover(ilo, prefix) + else: + filter_main_indices(ilo, prefix) empty_list(ilo, 'No indices to delete') @@ -60,13 +64,36 @@ def main(): def filter_main_indices(ilo, prefix): ilo.filter_by_regex(kind='prefix', value=prefix + "jaeger") empty_list(ilo, "No indices to delete") + ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-read'], exclude=True) + ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-write'], exclude=True) + ilo.filter_by_alias(aliases=[prefix + 'jaeger-service-read'], exclude=True) + ilo.filter_by_alias(aliases=[prefix + 'jaeger-service-write'], exclude=True) + ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-archive-read'], exclude=True) + ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-archive-write'], exclude=True) + empty_list(ilo, "No indices to delete") # This excludes archive index as we use source='name' # source `creation_date` would include archive index ilo.filter_by_age(source='name', direction='older', timestring='%Y-%m-%d', unit='days', unit_count=int(sys.argv[1])) +def filter_main_indices_rollover(ilo, prefix): + ilo.filter_by_alias(aliases=[prefix + 'jaeger-*-read']) + empty_list(ilo, "No indices to delete") + # do not remove active write indices + ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-write'], exclude=True) + empty_list(ilo, "No indices to delete") + ilo.filter_by_alias(aliases=[prefix + 'jaeger-service-write'], exclude=True) + empty_list(ilo, "No indices to delete") + ilo.filter_by_regex(kind='prefix', value=prefix + 'jaeger-span-archive*', exclude=True) + empty_list(ilo, "No indices to delete") + # This excludes archive index as we use source='name' + # source `creation_date` would include archive index + # TODO it might be useful to allow filter_by_space + ilo.filter_by_age(source='creation_date', direction='older', unit='days', unit_count=int(sys.argv[1])) + + def filter_archive_indices(ilo, prefix): - # Remove only archive indices when aliases are used + # Remove only archive indices when aliases/rollover are used # Do not remove active write archive index ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-archive-write'], exclude=True) ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-archive-read']) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 619a67ff507..bf4548f3687 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -41,8 +41,6 @@ const ( queryPort = "9200" queryHostPort = host + ":" + queryPort queryURL = "http://" + queryHostPort - username = "elastic" // the elasticsearch default username - password = "changeme" // the elasticsearch default password indexPrefix = "integration-test" tagKeyDeDotChar = "@" maxSpanAge = time.Hour * 72 @@ -59,7 +57,6 @@ type ESStorageIntegration struct { func (s *ESStorageIntegration) initializeES(allTagsAsFields, archive bool) error { rawClient, err := elastic.NewClient( elastic.SetURL(queryURL), - elastic.SetBasicAuth(username, password), elastic.SetSniff(false)) if err != nil { return err diff --git a/plugin/storage/integration/es_index_cleaner_test.go b/plugin/storage/integration/es_index_cleaner_test.go new file mode 100644 index 00000000000..6805e9047b8 --- /dev/null +++ b/plugin/storage/integration/es_index_cleaner_test.go @@ -0,0 +1,201 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build index_cleaner + +package integration + +import ( + "context" + "fmt" + "os/exec" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/olivere/elastic.v5" +) + +// 1. removes daily indices, but does not remove rollover or archive +// 2. removes rollover but does not remove daily or archive + +const ( + archiveIndexName = "jaeger-span-archive" + dependenciesIndexName = "jaeger-dependencies-2019-01-01" + spanIndexName = "jaeger-span-2019-01-01" + serviceIndexName = "jaeger-service-2019-01-01" + indexCleanerImage = "jaegertracing/jaeger-es-index-cleaner:latest" + rolloverImage = "jaegertracing/jaeger-es-rollover:latest" + rolloverNowEnvVar = "CONDITIONS='{\"max_age\":\"0s\"}'" +) + +func TestIndexCleaner_doNotFailOnEmptyStorage(t *testing.T) { + client, err := createESClient() + require.NoError(t, err) + _, err = client.DeleteIndex("*").Do(context.Background()) + require.NoError(t, err) + + tests := []struct{ + envs []string + }{ + {envs:[]string{"ROLLOVER=false"}}, + {envs:[]string{"ROLLOVER=true"}}, + {envs:[]string{"ARCHIVE=true"}}, + } + for _, test := range tests { + err := runEsCleaner(7, test.envs) + require.NoError(t, err) + } +} + +func TestIndexCleaner(t *testing.T) { + client, err := createESClient() + require.NoError(t, err) + + tests := []struct{ + name string + envVars []string + expectedIndices []string + }{ + { + name: "RemoveDailyIndices", + envVars: []string{}, + expectedIndices: []string{ + archiveIndexName, + "jaeger-span-000001", "jaeger-service-000001", "jaeger-span-000002", "jaeger-service-000002", + "jaeger-span-archive-000001", "jaeger-span-archive-000002", + }, + }, + { + name: "RemoveRolloverIndices", + envVars: []string{"ROLLOVER=true"}, + expectedIndices: []string{ + archiveIndexName, spanIndexName, serviceIndexName, dependenciesIndexName, + "jaeger-span-000002", "jaeger-service-000002", + "jaeger-span-archive-000001", "jaeger-span-archive-000002", + }, + }, + { + name: "RemoveArchiveIndices", + envVars: []string{"ARCHIVE=true"}, + expectedIndices: []string{ + archiveIndexName, spanIndexName, serviceIndexName, dependenciesIndexName, + "jaeger-span-000001", "jaeger-service-000001", "jaeger-span-000002", "jaeger-service-000002", + "jaeger-span-archive-000002", + }, + }, + } + for _, test := range tests { + t.Run(fmt.Sprintf("IndexCleanerRunWith[%v]_no_prefix", test.envVars), func(t *testing.T) { + runIndexCleanerTest(t, client, "", test.expectedIndices, test.envVars) + }) + t.Run(fmt.Sprintf("%s_prefix", test.name), func(t *testing.T) { + runIndexCleanerTest(t, client, indexPrefix, test.expectedIndices, append(test.envVars, "INDEX_PREFIX="+indexPrefix)) + }) + } +} + +func runIndexCleanerTest(t *testing.T, client *elastic.Client, prefix string, expectedIndices, envVars []string) { + // make sure ES is clean + _, err := client.DeleteIndex("*").Do(context.Background()) + require.NoError(t, err) + + err = createAllIndices(client, prefix) + require.NoError(t, err) + err = runEsCleaner(0, envVars) + require.NoError(t, err) + + indices, err := client.IndexNames() + require.NoError(t, err) + if prefix != "" { + prefix = prefix + "-" + } + var expected []string + for _, index := range expectedIndices { + expected = append(expected, prefix + index) + } + assert.ElementsMatch(t, indices, expected) +} + +func createAllIndices(client *elastic.Client, prefix string) error { + prefixWithSeparator := prefix + if prefix != "" { + prefixWithSeparator = prefixWithSeparator + "-" + } + // create daily indices and archive index + err := createEsIndices(client, []string{ + prefixWithSeparator + spanIndexName, prefixWithSeparator + serviceIndexName, + prefixWithSeparator + dependenciesIndexName, prefixWithSeparator + archiveIndexName}) + if err != nil { + return err + } + // create rollover archive index and roll alias to the new index + err = runEsRollover("init", []string{"ARCHIVE=true", "INDEX_PREFIX=" + prefix}) + if err != nil { + return err + } + err = runEsRollover("rollover", []string{"ARCHIVE=true", "INDEX_PREFIX=" + prefix, rolloverNowEnvVar}) + if err != nil { + return err + } + // create rollover main indices and roll over to the new index + err = runEsRollover("init", []string{"ARCHIVE=false", "INDEX_PREFIX=" + prefix}) + if err != nil { + return err + } + err = runEsRollover("rollover", []string{"ARCHIVE=false", "INDEX_PREFIX=" + prefix, rolloverNowEnvVar}) + if err != nil { + return err + } + return nil +} + +func createEsIndices(client *elastic.Client, indices []string) error { + for _, index := range indices { + if _, err := client.CreateIndex(index).Do(context.Background()); err != nil { + return err + } + } + return nil +} + +func runEsCleaner(days int, envs []string) error { + var dockerEnv string + for _, e := range envs { + dockerEnv += fmt.Sprintf(" -e %s", e) + } + args := fmt.Sprintf("sudo docker run %s --net=host %s %d http://%s", dockerEnv, indexCleanerImage, days, queryHostPort) + cmd := exec.Command("/bin/sh", "-c", args) + out, err := cmd.CombinedOutput() + fmt.Println(string(out)) + return err +} + +func runEsRollover(action string, envs []string) error { + var dockerEnv string + for _, e := range envs { + dockerEnv += fmt.Sprintf(" -e %s", e) + } + args := fmt.Sprintf("sudo docker run %s --rm --net=host %s %s http://%s", dockerEnv, rolloverImage, action, queryHostPort) + cmd := exec.Command("/bin/sh", "-c", args) + out, err := cmd.CombinedOutput() + fmt.Println(string(out)) + return err +} + +func createESClient() (*elastic.Client, error) { + return elastic.NewClient( + elastic.SetURL(queryURL), + elastic.SetSniff(false)) +} diff --git a/scripts/travis/es-integration-test.sh b/scripts/travis/es-integration-test.sh index dad6bb87c4c..ca6cd54a10d 100755 --- a/scripts/travis/es-integration-test.sh +++ b/scripts/travis/es-integration-test.sh @@ -3,7 +3,9 @@ set -e docker pull docker.elastic.co/elasticsearch/elasticsearch:5.6.12 -CID=$(docker run -d -p 9200:9200 -e "http.host=0.0.0.0" -e "transport.host=127.0.0.1" docker.elastic.co/elasticsearch/elasticsearch:5.6.12) -export STORAGE=elasticsearch -make storage-integration-test +CID=$(docker run -d -p 9200:9200 -e "http.host=0.0.0.0" -e "transport.host=127.0.0.1" -e "xpack.security.enabled=false" -e "xpack.monitoring.enabled=false" docker.elastic.co/elasticsearch/elasticsearch:5.6.12) + +STORAGE=elasticsearch make storage-integration-test +make index-cleaner-integration-test + docker kill $CID From 5f28d638273db56b90180e55c15c743a7e1be7ad Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Fri, 26 Jul 2019 15:13:56 +0200 Subject: [PATCH 2/2] Simplify regexes Signed-off-by: Pavol Loffay --- plugin/storage/es/esCleaner.py | 23 +++++----------- .../integration/es_index_cleaner_test.go | 27 +++++++++++++++---- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/plugin/storage/es/esCleaner.py b/plugin/storage/es/esCleaner.py index 8a943f2552e..4c051ecc402 100755 --- a/plugin/storage/es/esCleaner.py +++ b/plugin/storage/es/esCleaner.py @@ -45,7 +45,7 @@ def main(): prefix += '-' if str2bool(os.getenv("ARCHIVE", 'false')): - filter_archive_indices(ilo, prefix) + filter_archive_indices_rollover(ilo, prefix) else: if str2bool(os.getenv("ROLLOVER", 'false')): filter_main_indices_rollover(ilo, prefix) @@ -62,14 +62,7 @@ def main(): def filter_main_indices(ilo, prefix): - ilo.filter_by_regex(kind='prefix', value=prefix + "jaeger") - empty_list(ilo, "No indices to delete") - ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-read'], exclude=True) - ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-write'], exclude=True) - ilo.filter_by_alias(aliases=[prefix + 'jaeger-service-read'], exclude=True) - ilo.filter_by_alias(aliases=[prefix + 'jaeger-service-write'], exclude=True) - ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-archive-read'], exclude=True) - ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-archive-write'], exclude=True) + ilo.filter_by_regex(kind='regex', value=prefix + "jaeger-(span|service|dependencies)-\d{4}-\d{2}-\d{2}") empty_list(ilo, "No indices to delete") # This excludes archive index as we use source='name' # source `creation_date` would include archive index @@ -77,25 +70,21 @@ def filter_main_indices(ilo, prefix): def filter_main_indices_rollover(ilo, prefix): - ilo.filter_by_alias(aliases=[prefix + 'jaeger-*-read']) + ilo.filter_by_regex(kind='regex', value=prefix + "jaeger-(span|service)-\d{6}") empty_list(ilo, "No indices to delete") # do not remove active write indices ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-write'], exclude=True) empty_list(ilo, "No indices to delete") ilo.filter_by_alias(aliases=[prefix + 'jaeger-service-write'], exclude=True) empty_list(ilo, "No indices to delete") - ilo.filter_by_regex(kind='prefix', value=prefix + 'jaeger-span-archive*', exclude=True) - empty_list(ilo, "No indices to delete") - # This excludes archive index as we use source='name' - # source `creation_date` would include archive index - # TODO it might be useful to allow filter_by_space ilo.filter_by_age(source='creation_date', direction='older', unit='days', unit_count=int(sys.argv[1])) -def filter_archive_indices(ilo, prefix): - # Remove only archive indices when aliases/rollover are used +def filter_archive_indices_rollover(ilo, prefix): + # Remove only rollover archive indices # Do not remove active write archive index ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-archive-write'], exclude=True) + empty_list(ilo, "No indices to delete") ilo.filter_by_alias(aliases=[prefix + 'jaeger-span-archive-read']) ilo.filter_by_age(source='creation_date', direction='older', unit='days', unit_count=int(sys.argv[1])) diff --git a/plugin/storage/integration/es_index_cleaner_test.go b/plugin/storage/integration/es_index_cleaner_test.go index 6805e9047b8..acc98255145 100644 --- a/plugin/storage/integration/es_index_cleaner_test.go +++ b/plugin/storage/integration/es_index_cleaner_test.go @@ -27,9 +27,6 @@ import ( "gopkg.in/olivere/elastic.v5" ) -// 1. removes daily indices, but does not remove rollover or archive -// 2. removes rollover but does not remove daily or archive - const ( archiveIndexName = "jaeger-span-archive" dependenciesIndexName = "jaeger-dependencies-2019-01-01" @@ -59,6 +56,26 @@ func TestIndexCleaner_doNotFailOnEmptyStorage(t *testing.T) { } } +func TestIndexCleaner_doNotFailOnFullStorage(t *testing.T) { + client, err := createESClient() + require.NoError(t, err) + tests := []struct{ + envs []string + }{ + {envs:[]string{"ROLLOVER=false"}}, + {envs:[]string{"ROLLOVER=true"}}, + {envs:[]string{"ARCHIVE=true"}}, + } + for _, test := range tests { + _, err = client.DeleteIndex("*").Do(context.Background()) + require.NoError(t, err) + err := createAllIndices(client, "") + require.NoError(t, err) + err = runEsCleaner(1500, test.envs) + require.NoError(t, err) + } +} + func TestIndexCleaner(t *testing.T) { client, err := createESClient() require.NoError(t, err) @@ -97,7 +114,7 @@ func TestIndexCleaner(t *testing.T) { }, } for _, test := range tests { - t.Run(fmt.Sprintf("IndexCleanerRunWith[%v]_no_prefix", test.envVars), func(t *testing.T) { + t.Run(fmt.Sprintf("%s_no_prefix", test.name), func(t *testing.T) { runIndexCleanerTest(t, client, "", test.expectedIndices, test.envVars) }) t.Run(fmt.Sprintf("%s_prefix", test.name), func(t *testing.T) { @@ -125,7 +142,7 @@ func runIndexCleanerTest(t *testing.T, client *elastic.Client, prefix string, ex for _, index := range expectedIndices { expected = append(expected, prefix + index) } - assert.ElementsMatch(t, indices, expected) + assert.ElementsMatch(t, indices, expected, fmt.Sprintf("indices found: %v, expected: %v", indices, expected)) } func createAllIndices(client *elastic.Client, prefix string) error {