Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix es integration test 6094 #6157

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 101 additions & 7 deletions plugin/storage/es/spanstore/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package spanstore
import (
"context"
"fmt"
"sync"
"time"

"go.uber.org/zap"
Expand All @@ -21,10 +22,11 @@ import (
)

const (
spanType = "span"
serviceType = "service"
serviceCacheTTLDefault = 12 * time.Hour
indexCacheTTLDefault = 48 * time.Hour
spanType = "span"
serviceType = "service"
serviceCacheTTLDefault = 12 * time.Hour
indexCacheTTLDefault = 48 * time.Hour
defaultIndexWaitTimeout = 60 * time.Second
)

type spanWriterMetrics struct {
Expand All @@ -42,6 +44,63 @@ type SpanWriter struct {
serviceWriter serviceWriter
spanConverter dbmodel.FromDomain
spanServiceIndex spanAndServiceIndexFn
indexCache sync.Map
}

func (s *SpanWriter) ensureIndex(ctx context.Context, indexName string) error {
if _, exists := s.indexCache.Load(indexName); exists {
return nil
}

_, loaded := s.indexCache.LoadOrStore(indexName, struct{}{})
if loaded {
return nil
}

exists, err := s.client().IndexExists(indexName).Do(ctx)
if err != nil {
return fmt.Errorf("failed to check index existence: %w", err)
}

if !exists {
s.logger.Info("Creating index", zap.String("index", indexName))

// Set specific settings for the test environment
body := `{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
"index.write.wait_for_active_shards": 1
}
}`

_, err = s.client().CreateIndex(indexName).Body(body).Do(ctx)
if err != nil {
return fmt.Errorf("failed to create index with settings: %w", err)
}
s.logger.Info("Index created with settings",
zap.String("index", indexName),
zap.String("settings", body))
}

// Wait for index to be ready by checking its existence repeatedly
deadline := time.Now().Add(defaultIndexWaitTimeout)
start := time.Now()
for time.Now().Before(deadline) {
exists, err := s.client().IndexExists(indexName).Do(ctx)
if err == nil && exists {
s.logger.Info("Index is ready",
zap.String("index", indexName),
zap.Duration("took", time.Since(start)))
return nil
}
s.logger.Debug("Waiting for index to be ready",
zap.String("index", indexName),
zap.Duration("elapsed", time.Since(start)))
time.Sleep(time.Second)
}

return fmt.Errorf("timeout waiting for index %s to be ready", indexName)
}

// SpanWriterParams holds constructor parameters for NewSpanWriter
Expand Down Expand Up @@ -121,14 +180,49 @@ func getSpanAndServiceIndexFn(p SpanWriterParams) spanAndServiceIndexFn {
}

// WriteSpan writes a span and its corresponding service:operation in ElasticSearch
func (s *SpanWriter) WriteSpan(_ context.Context, span *model.Span) error {
func (s *SpanWriter) WriteSpan(ctx context.Context, span *model.Span) error {
spanIndexName, serviceIndexName := s.spanServiceIndex(span.StartTime)

// Ensure indices exist before writing
if err := s.ensureIndex(ctx, spanIndexName); err != nil {
return fmt.Errorf("failed to ensure span index: %w", err)
}
if serviceIndexName != "" {
if err := s.ensureIndex(ctx, serviceIndexName); err != nil {
return fmt.Errorf("failed to ensure service index: %w", err)
}
}

jsonSpan := s.spanConverter.FromDomainEmbedProcess(span)
if serviceIndexName != "" {
s.writeService(serviceIndexName, jsonSpan)
}
s.writeSpan(spanIndexName, jsonSpan)
s.logger.Debug("Wrote span to ES index", zap.String("index", spanIndexName))

// Write with retries
var lastErr error
for i := 0; i < 3; i++ {
err := s.writeSpanWithResult(ctx, spanIndexName, jsonSpan)
if err == nil {
return nil
}
lastErr = err
s.logger.Debug("Retrying span write",
zap.String("index", spanIndexName),
zap.Int("attempt", i+1),
zap.Error(lastErr))
time.Sleep(time.Duration(i+1) * 100 * time.Millisecond)
}

return fmt.Errorf("failed to write span after retries: %w", lastErr)
}

func (s *SpanWriter) writeSpanWithResult(_ context.Context, indexName string, jsonSpan *dbmodel.Span) error {
indexService := s.client().Index().
Index(indexName).
Type(spanType).
BodyJson(jsonSpan)

indexService.Add()
return nil
}

Expand Down