Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(serverv2): remove unused interface methods, honuor context #22394

Merged
merged 6 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions runtime/v2/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"cosmossdk.io/core/store"
"cosmossdk.io/server/v2/stf"
storev2 "cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/proof"
)

// NewKVStoreService creates a new KVStoreService.
Expand All @@ -29,34 +28,12 @@ type Store interface {
// version. Must error when the version does not exist.
StateAt(version uint64) (store.ReaderMap, error)

// SetInitialVersion sets the initial version of the store.
SetInitialVersion(uint64) error

// WorkingHash writes the provided changeset to the state and returns
// the working hash of the state.
WorkingHash(changeset *store.Changeset) (store.Hash, error)

// Commit commits the provided changeset and returns the new state root of the state.
Commit(changeset *store.Changeset) (store.Hash, error)

// Query is a key/value query directly to the underlying database. This skips the appmanager
Query(storeKey []byte, version uint64, key []byte, prove bool) (storev2.QueryResult, error)

// GetStateStorage returns the SS backend.
GetStateStorage() storev2.VersionedWriter

// GetStateCommitment returns the SC backend.
GetStateCommitment() storev2.Committer

// LoadVersion loads the RootStore to the given version.
LoadVersion(version uint64) error

// LoadLatestVersion behaves identically to LoadVersion except it loads the
// latest version implicitly.
LoadLatestVersion() error

// LastCommitID returns the latest commit ID
LastCommitID() (proof.CommitID, error)
}

// StoreLoader allows for custom loading of the store, this is useful when upgrading the store from a previous version
Expand Down
8 changes: 4 additions & 4 deletions server/v2/api/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
serverv2 "cosmossdk.io/server/v2"
"cosmossdk.io/server/v2/api"
"cosmossdk.io/server/v2/api/grpc/gogoreflection"
)

Expand Down Expand Up @@ -197,13 +198,13 @@ func (s *Server[T]) Config() any {
return s.config
}

func (s *Server[T]) Start(context.Context) error {
func (s *Server[T]) Start(ctx context.Context) error {
if !s.config.Enable {
s.logger.Info(fmt.Sprintf("%s server is disabled via config", s.Name()))
return nil
}

listener, err := net.Listen("tcp", s.config.Address)
listener, err := (&net.ListenConfig{}).Listen(ctx, "tcp", s.config.Address)
if err != nil {
return fmt.Errorf("failed to listen on address %s: %w", s.config.Address, err)
}
Expand All @@ -222,8 +223,7 @@ func (s *Server[T]) Stop(ctx context.Context) error {
}

s.logger.Info("stopping gRPC server...", "address", s.config.Address)
s.grpcSrv.GracefulStop()
return nil
return api.DoUntilCtxExpired(ctx, s.grpcSrv.GracefulStop)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider adding a configurable shutdown timeout.

While using DoUntilCtxExpired with GracefulStop is good for context-aware shutdown, consider adding a configurable timeout in the server config to prevent potentially long shutdown times with hanging connections.

Add a timeout configuration to the Config struct:

type Config struct {
    // ... existing fields ...
    ShutdownTimeout time.Duration `mapstructure:"shutdown_timeout"`
}

Then use it in the Stop method:

func (s *Server[T]) Stop(ctx context.Context) error {
    if !s.config.Enable {
        return nil
    }

    s.logger.Info("stopping gRPC server...", "address", s.config.Address)
    
    // Create a timeout context if shutdown timeout is configured
    if s.config.ShutdownTimeout > 0 {
        var cancel context.CancelFunc
        ctx, cancel = context.WithTimeout(ctx, s.config.ShutdownTimeout)
        defer cancel()
    }
    
    return api.DoUntilCtxExpired(ctx, s.grpcSrv.GracefulStop)
}

}

// GetGRPCServer returns the underlying gRPC server.
Expand Down
22 changes: 22 additions & 0 deletions server/v2/api/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package api

import "context"

// DoUntilCtxExpired runs the given function until the context is expired or
// the function exits.
// This forces context to be honored.
func DoUntilCtxExpired(ctx context.Context, f func()) error {
done := make(chan struct{})
go func() {
defer close(done)

f()
}()
Comment on lines +10 to +14

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism

select {
case <-ctx.Done():
return ctx.Err()
case <-done:
return nil
}
}
Comment on lines +8 to +22
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix potential goroutine leak on context cancellation

The current implementation has a potential goroutine leak. When the context is cancelled, the goroutine continues running without any way to stop it. This could lead to resource leaks in long-running applications.

Consider adding a way to signal the goroutine to stop:

 func DoUntilCtxExpired(ctx context.Context, f func()) error {
 	done := make(chan struct{})
+	// Create a child context that we can cancel
+	runCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
 	go func() {
 		defer close(done)
-
-		f()
+		// Pass the cancellable context to the function
+		go func() {
+			f()
+			cancel()
+		}()
+
+		// Wait for either completion or cancellation
+		<-runCtx.Done()
 	}()

 	select {
 	case <-ctx.Done():
+		// Cancel the running function
+		cancel()
 		return ctx.Err()
 	case <-done:
 		return nil
 	}
 }

Committable suggestion was skipped due to low confidence.

35 changes: 35 additions & 0 deletions server/v2/api/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package api

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestDoUntilCtxExpired(t *testing.T) {
t.Run("success", func(t *testing.T) {
ctx := context.Background()

funcRan := false
err := DoUntilCtxExpired(ctx, func() {
funcRan = true
})
require.NoError(t, err)
require.True(t, funcRan)
})
Comment on lines +12 to +21
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider enhancing test robustness.

While the test correctly verifies the basic functionality, consider these improvements:

  1. Use a timeout context to prevent potential test hangs
  2. Add execution time assertions
  3. Document the test scenario
 t.Run("success", func(t *testing.T) {
-    ctx := context.Background()
+    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+    defer cancel()
+
+    start := time.Now()
     funcRan := false
     err := DoUntilCtxExpired(ctx, func() {
         funcRan = true
     })
     require.NoError(t, err)
     require.True(t, funcRan)
+    require.Less(t, time.Since(start), 5*time.Second)
 })
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
t.Run("success", func(t *testing.T) {
ctx := context.Background()
funcRan := false
err := DoUntilCtxExpired(ctx, func() {
funcRan = true
})
require.NoError(t, err)
require.True(t, funcRan)
})
t.Run("success", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
start := time.Now()
funcRan := false
err := DoUntilCtxExpired(ctx, func() {
funcRan = true
})
require.NoError(t, err)
require.True(t, funcRan)
require.Less(t, time.Since(start), 5*time.Second)
})


t.Run("context expired", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

funcRan := false
err := DoUntilCtxExpired(ctx, func() {
cancel()
funcRan = true
<-time.After(time.Second)
})
require.ErrorIs(t, err, context.Canceled)
require.True(t, funcRan)
})
Comment on lines +23 to +34
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix potential test flakiness and goroutine leak.

The current implementation has several issues:

  1. Hard-coded 1-second delay makes tests unnecessarily slow
  2. time.After creates a goroutine leak
  3. No assertion that the function actually attempted to wait
 t.Run("context expired", func(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
 
+    waited := make(chan struct{})
     funcRan := false
     err := DoUntilCtxExpired(ctx, func() {
         cancel()
         funcRan = true
-        <-time.After(time.Second)
+        close(waited)
+        <-ctx.Done()
     })
+    <-waited
     require.ErrorIs(t, err, context.Canceled)
     require.True(t, funcRan)
 })
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
t.Run("context expired", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
funcRan := false
err := DoUntilCtxExpired(ctx, func() {
cancel()
funcRan = true
<-time.After(time.Second)
})
require.ErrorIs(t, err, context.Canceled)
require.True(t, funcRan)
})
t.Run("context expired", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
waited := make(chan struct{})
funcRan := false
err := DoUntilCtxExpired(ctx, func() {
cancel()
funcRan = true
close(waited)
<-ctx.Done()
})
<-waited
require.ErrorIs(t, err, context.Canceled)
require.True(t, funcRan)
})

}
Loading