Merge #66768
66768: roachtest: add test for authentication under network partition r=rafiss,knz a=knz

Supersedes #66550

Co-authored-by: Rafi Shamim <[email protected]>
craig[bot] and rafiss committed Sep 14, 2021
2 parents a59ea3e + ffc4ef9 commit a043a34
Showing 1 changed file with 276 additions and 0 deletions.

pkg/cmd/roachtest/tests/network.go (+276, -0)
@@ -13,15 +13,24 @@ package tests
import (
"bytes"
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync/atomic"
"time"

toxiproxy "github.com/Shopify/toxiproxy/client"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
_ "github.com/lib/pq" // register postgres driver
"github.com/stretchr/testify/require"
)

// runNetworkSanity is just a sanity check to make sure we're setting up toxiproxy
@@ -97,6 +106,265 @@ select age, message from [ show trace for session ];
m.Wait()
}

// runNetworkAuthentication creates a network black hole to the leaseholder
// of system.users, and then validates that the time required to create
// new connections to the cluster afterwards remains under a reasonable limit.
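// The test pins the liveness range and the system tables consulted during
// authentication (including system.users) to node 1, starts SQL clients
// against the other nodes, then black-holes node 1 with iptables and fails
// if any client sees more than one connection error.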
func runNetworkAuthentication(ctx context.Context, t test.Test, c cluster.Cluster) {
n := c.Spec().NodeCount
serverNodes, clientNode := c.Range(1, n-1), c.Node(n)

c.Put(ctx, t.Cockroach(), "./cockroach", c.All())

t.L().Printf("starting nodes to initialize TLS certs...")
// NB: we need to start twice: if we used c.Start() separately on
// node 1 and on nodes 2-3, the start logic would find that the certs
// don't exist yet on nodes 2 and 3 and would re-create a separate set
// of certs, which we don't want. Starting all nodes at once ensures
// that they use coherent certs.
c.Start(ctx, serverNodes, option.StartArgs("--secure"))
require.NoError(t, c.StopE(ctx, serverNodes))

t.L().Printf("restarting nodes...")
c.Start(ctx, c.Node(1), option.StartArgs(
"--secure",
"--args=--locality=node=1",
"--args=--accept-sql-without-tls",
// For troubleshooting the test, the engineer can add the following
// environment variables to make the rebalancing faster.
// However, they should be removed for the production version
// of the test, because they make the cluster recover from a failure
// in a way that is unrealistically fast.
// "--env=COCKROACH_SCAN_INTERVAL=200ms",
// "--env=COCKROACH_SCAN_MAX_IDLE_TIME=20ms",
))
c.Start(ctx, c.Range(2, n-1), option.StartArgs(
"--secure",
"--args=--locality=node=other",
"--args=--accept-sql-without-tls",
// See comment above about env vars.
// "--env=COCKROACH_SCAN_INTERVAL=200ms",
// "--env=COCKROACH_SCAN_MAX_IDLE_TIME=20ms",
))

t.L().Printf("retrieving server addresses...")
serverAddrs, err := c.InternalAddr(ctx, serverNodes)
require.NoError(t, err)

t.L().Printf("fetching certs...")
certsDir := "/home/ubuntu/certs"
localCertsDir, err := filepath.Abs("./network-certs")
require.NoError(t, err)
require.NoError(t, os.RemoveAll(localCertsDir))
require.NoError(t, c.Get(ctx, t.L(), certsDir, localCertsDir, c.Node(1)))
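// The cockroach CLI refuses to use key files that are readable by group
// or others, so restrict the fetched files to owner-only permissions (0600).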
require.NoError(t, filepath.Walk(localCertsDir, func(path string, info os.FileInfo, err error) error {
// Don't change permissions for the certs directory.
if path == localCertsDir {
return nil
}
if err != nil {
return err
}
return os.Chmod(path, os.FileMode(0600))
}))

t.L().Printf("connecting to cluster from roachtest...")
db, err := c.ConnE(ctx, 1)
require.NoError(t, err)
defer db.Close()

// Wait for up-replication. This will also print a progress message.
WaitFor3XReplication(t, db)

t.L().Printf("creating test user...")
_, err = db.Exec(`CREATE USER testuser WITH PASSWORD 'password' VALID UNTIL '2060-01-01'`)
require.NoError(t, err)
_, err = db.Exec(`GRANT admin TO testuser`)
require.NoError(t, err)

const expectedLeaseholder = 1
lh := fmt.Sprintf("%d", expectedLeaseholder)

t.L().Printf("configuring zones to move ranges to node 1...")
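// Each iteration of the loop below issues a statement of the form (with
// node 1 as the expected leaseholder):
//
//   ALTER RANGE liveness CONFIGURE ZONE USING
//     lease_preferences = '[[+node=1]]', constraints = '{"+node=1": 1}'
//
// which prefers the lease on, and places at least one replica on, the node
// started with --locality=node=1.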
for _, zone := range []string{
`RANGE liveness`,
`RANGE meta`,
`RANGE system`,
`RANGE default`,
`DATABASE system`,
} {
zoneCmd := `ALTER ` + zone + ` CONFIGURE ZONE USING lease_preferences = '[[+node=` + lh + `]]', constraints = '{"+node=` + lh + `": 1}'`
t.L().Printf("SQL: %s", zoneCmd)
_, err = db.Exec(zoneCmd)
require.NoError(t, err)
}

t.L().Printf("waiting for leases to move...")
{
tStart := timeutil.Now()
for ok := false; !ok; time.Sleep(time.Second) {
if timeutil.Since(tStart) > 30*time.Second {
t.L().Printf("still waiting for leases to move")
// The leases have not moved yet, so display some progress.
dumpRangesCmd := fmt.Sprintf(`./cockroach sql --certs-dir %s -e 'TABLE crdb_internal.ranges'`, certsDir)
t.L().Printf("SQL: %s", dumpRangesCmd)
err := c.RunE(ctx, c.Node(1), dumpRangesCmd)
require.NoError(t, err)
}

const waitLeases = `
SELECT $1::INT = ALL (
SELECT lease_holder
FROM crdb_internal.ranges
WHERE (start_pretty = '/System/NodeLiveness' AND end_pretty = '/System/NodeLivenessMax')
OR (database_name = 'system' AND table_name IN ('users', 'role_members', 'role_options'))
)`
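// The query reports true only once the expected node holds the lease both
// for the liveness range and for the system tables consulted during
// authentication (users, role_members, role_options).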
t.L().Printf("SQL: %s", waitLeases)
require.NoError(t, db.QueryRow(waitLeases, expectedLeaseholder).Scan(&ok))
}
}

cancelTestCtx, cancelTest := context.WithCancel(ctx)

// Channel to expedite the end of the waiting below
// in case an error occurs.
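// The buffer has one slot per client goroutine started below (one per
// server node other than the expected leaseholder), and each goroutine
// sends at most once, so sends never block.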
woopsCh := make(chan struct{}, len(serverNodes)-1)

m := c.NewMonitor(ctx)

var numConns uint32

for i := 1; i <= c.Spec().NodeCount-1; i++ {
if i == expectedLeaseholder {
continue
}

// Ensure that every goroutine below gets a different copy of i.
server := i

// Start a client loop for the server "i".
m.Go(func(ctx context.Context) error {
errCount := 0
for attempt := 0; ; attempt++ {
select {
case <-ctx.Done():
// The monitor has decided that this goroutine needs to go away, presumably
// because another goroutine encountered an error.
t.L().Printf("server %d: stopping connections due to error", server)

// Expedite the wait below. This is not strictly required for correctness,
// and makes the test faster to terminate in case of failure.
woopsCh <- struct{}{}

// Stop this goroutine too.
return ctx.Err()

case <-cancelTestCtx.Done():
// The main goroutine below is instructing this client
// goroutine to terminate gracefully before the test terminates.
t.L().Printf("server %d: stopping connections due to end of test", server)
return nil

case <-time.After(500 * time.Millisecond):
// Wait 500ms between connection attempts.
}

// Construct a connection URL to server i.
url := fmt.Sprintf("postgres://testuser:password@%s/defaultdb?sslmode=require", serverAddrs[server-1])

// Attempt a client connection to that server.
t.L().Printf("server %d, attempt %d; url: %s\n", server, attempt, url)
b, err := c.RunWithBuffer(ctx, t.L(), clientNode, "time", "-p", "./cockroach", "sql",
"--url", url, "--certs-dir", certsDir, "-e", "'SELECT 1'")

// Report the results of execution.
t.L().Printf("server %d, attempt %d, result:\n%s\n", server, attempt, b)
// Tell the main goroutine that at least one connection attempt
// has completed.
atomic.AddUint32(&numConns, 1)

if err != nil {
if errCount == 0 {
// We tolerate the first error as acceptable.
t.L().Printf("server %d, attempt %d (1st ERROR, TOLERATE): %v", server, attempt, err)
errCount++
continue
}
// Any error beyond the first is unacceptable.
t.L().Printf("server %d, attempt %d (2nd ERROR, BAD): %v", server, attempt, err)

// Expedite the wait below. This is not strictly required for correctness,
// and makes the test faster to terminate in case of failure.
woopsCh <- struct{}{}
return err
}
}
})
}
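// At this point every surviving server node has a client goroutine opening
// fresh connections against it; the partition created below must not cause
// more than one failed attempt per client.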

// Main test goroutine. Run the body of the test, including the
// network partition, inside a sub-function. This ensures that the
// network partition is resolved by the time the monitor finishes
// waiting on the servers.
func() {
t.L().Printf("waiting for clients to start connecting...")
testutils.SucceedsSoon(t, func() error {
select {
case <-woopsCh:
t.Fatal("connection error before network partition")
default:
}
if atomic.LoadUint32(&numConns) == 0 {
return errors.New("no connection yet")
}
return nil
})

t.L().Printf("blocking networking on node 1...")
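// 26257 is the default port cockroach listens on for both intra-cluster
// RPC and SQL traffic, so dropping inbound and outbound TCP to that port
// on the leaseholder black-holes all of its node-to-node traffic while
// leaving SSH untouched and the node reachable by the test harness.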
const netConfigCmd = `
# ensure any failure fails the entire script.
set -e;
# Set the default filter policies to ACCEPT.
sudo iptables -P INPUT ACCEPT;
sudo iptables -P OUTPUT ACCEPT;
# Drop any node-to-node crdb traffic.
sudo iptables -A INPUT -p tcp --dport 26257 -j DROP;
sudo iptables -A OUTPUT -p tcp --dport 26257 -j DROP;
sudo iptables-save
`
t.L().Printf("partitioning using iptables; config cmd:\n%s", netConfigCmd)
require.NoError(t, c.RunE(ctx, c.Node(expectedLeaseholder), netConfigCmd))

// (Attempt to) restore iptables when the test ends, so that the
// cluster can be investigated afterwards.
defer func() {
const restoreNet = `
set -e;
sudo iptables -D INPUT -p tcp --dport 26257 -j DROP;
sudo iptables -D OUTPUT -p tcp --dport 26257 -j DROP;
sudo iptables-save
`
t.L().Printf("restoring iptables; config cmd:\n%s", restoreNet)
require.NoError(t, c.RunE(ctx, c.Node(expectedLeaseholder), restoreNet))
}()

t.L().Printf("waiting while clients attempt to connect...")
select {
case <-time.After(20 * time.Second):
case <-woopsCh:
}

// Terminate all the async goroutines.
cancelTest()
}()

// Test finished.
m.Wait()
}

func runNetworkTPCC(ctx context.Context, t test.Test, origC cluster.Cluster, nodes int) {
n := origC.Spec().NodeCount
serverNodes, workerNode := origC.Range(1, n-1), origC.Node(n)
@@ -246,6 +514,14 @@ func registerNetwork(r registry.Registry) {
runNetworkSanity(ctx, t, c, numNodes)
},
})
r.Add(registry.TestSpec{
Name: fmt.Sprintf("network/authentication/nodes=%d", numNodes),
Owner: registry.OwnerServer,
Cluster: r.MakeClusterSpec(numNodes),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runNetworkAuthentication(ctx, t, c)
},
})
r.Add(registry.TestSpec{
Name: fmt.Sprintf("network/tpcc/nodes=%d", numNodes),
Owner: registry.OwnerKV,
