Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
83529: DOC-4629: Update SQL diagrams surrounding storage parameters r=RichardJCai a=nickvigilante

Fixes DOC-4629

Release note: None

83678: sql: Use one BytesMonitor for all IE created by IEFactory  r=RichardJCai a=RichardJCai

Previously InternalExecutor's created via IEFactory would
create a monitor that would never be closed.

Now we have one monitor for all IEs that is closed with server
is closed.

Release note: None

83795: ci: ensure jemalloc is configured _without_ MADV_FREE r=srosenberg a=srosenberg

Previously, jemalloc would default to MADV_FREE which could lead
to surprising results; i.e., no reduction in RSS until memory pressure.
This change ensures that jemalloc is always compiled with MADV_FREE
disabled, thus using MADV_DONTNEED.
See the corresponding issue for further details.

Release note: None
Resolves: #83790

84007: changefeedccl: Add timeout to testfeed library. r=miretskiy a=miretskiy

Add timeout to testfeed sink implementations to timeout
if no messages are received for too long.

Informs #83946

Release Notes: None

84046: json: make checkLength assertion error "regular" r=mgartner a=mgartner

The error returned in `checkLength` is now a "regular" error instead of
an assertion error.

Fixes #77024

Release note: None

Co-authored-by: Nick Vigilante <[email protected]>
Co-authored-by: richardjcai <[email protected]>
Co-authored-by: Stan Rosenberg <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
  • Loading branch information
6 people committed Jul 8, 2022
6 parents 18f6450 + 92a3eb6 + 8afd9f1 + 52a3a0a + d71549a + 723ff81 commit 4d8d0b5
Show file tree
Hide file tree
Showing 36 changed files with 459 additions and 164 deletions.
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,13 @@ $(JEMALLOC_DIR)/Makefile: $(C_DEPS_DIR)/jemalloc-rebuild $(JEMALLOC_SRC_DIR)/con
mkdir -p $(JEMALLOC_DIR)
@# NOTE: If you change the configure flags below, bump the version in
@# $(C_DEPS_DIR)/jemalloc-rebuild. See above for rationale.
cd $(JEMALLOC_DIR) && $(JEMALLOC_SRC_DIR)/configure $(xconfigure-flags) --enable-prof
@# NOTE: we disable MADV_FREE; see https://github.com/cockroachdb/cockroach/issues/83790
export je_cv_madv_free="no" && cd $(JEMALLOC_DIR) && $(JEMALLOC_SRC_DIR)/configure $(xconfigure-flags) --enable-prof
JEMALLOC_MADV_FREE_ENABLED=$$(grep -E "^je_cv_madv_free=no$$" $(JEMALLOC_DIR)/config.log | awk -F'=' '{print $$2}'); \
if [[ "$$JEMALLOC_MADV_FREE_ENABLED" != "no" ]]; then \
echo "NOTE: using MADV_FREE with jemalloc can lead to surprising results; see https://github.com/cockroachdb/cockroach/issues/83790"; \
exit 1; \
fi

$(KRB5_SRC_DIR)/src/configure.in: | bin/.submodules-initialized

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

---

CockroachDB is a cloud-native distributed SQL database designed to build,
scale, and manage modern, data-intensive applications.
CockroachDB is a cloud-native distributed SQL database designed to build,
scale, and manage modern, data-intensive applications.

- [What is CockroachDB?](#what-is-cockroachdb)
- [Docs](#docs)
Expand Down
10 changes: 8 additions & 2 deletions c-deps/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,14 @@ configure_make(
"//conditions:default": [],
}),
env = select({
"//build/toolchains:dev": {"AR": ""},
"//conditions:default": {},
"//build/toolchains:dev": {
"AR": "",
# NOTE: we disable MADV_FREE; see https://github.com/cockroachdb/cockroach/issues/83790
"je_cv_madv_free": "no",
},
"//conditions:default": {
"je_cv_madv_free": "no",
},
}),
lib_source = "@jemalloc//:all",
out_static_libs = select({
Expand Down
8 changes: 8 additions & 0 deletions c-deps/buildcdeps.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ for CONFIG in $CONFIGS; do
bazel build --config ci --config cross$CONFIG --//build/toolchains:prebuild_cdeps_flag $(echo "$TARGETS" | python3 -c 'import sys; input = sys.stdin.read().strip(); print(" ".join("//c-deps:{}_foreign".format(w) for w in input.split(" ")))')
BAZEL_BIN=$(bazel info bazel-bin --config ci --config cross$CONFIG)
for TARGET in $TARGETS; do
# verify jemalloc was configured without madv_free
if [[ $TARGET == libjemalloc ]]; then
JEMALLOC_MADV_FREE_ENABLED=$((grep -E "^je_cv_madv_free=no$" $BAZEL_BIN/c-deps/${TARGET}_foreign_foreign_cc/Configure.log | awk -F"=" '{print $2}') || true)
if [[ "$JEMALLOC_MADV_FREE_ENABLED" != "no" ]]; then
echo "NOTE: using MADV_FREE with jemalloc can lead to surprising results; see https://github.com/cockroachdb/cockroach/issues/83790"
exit 1
fi
fi
bundle $CONFIG $BAZEL_BIN/c-deps/${TARGET}_foreign
done
done
Expand Down
2 changes: 1 addition & 1 deletion c-deps/jemalloc-rebuild
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Bump the version below when changing jemalloc configure flags. Search for "BUILD
ARTIFACT CACHING" in build/common.mk for rationale.

5
6
4 changes: 4 additions & 0 deletions docs/generated/sql/bnf/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ FILES = [
"alter_table_locality_stmt",
"alter_table_owner_stmt",
"alter_table_partition_by",
"alter_table_reset_storage_param",
"alter_table_set_schema_stmt",
"alter_table_set_storage_param",
"alter_table_stmt",
"alter_tenant_csetting_stmt",
"alter_type",
Expand Down Expand Up @@ -75,6 +77,7 @@ FILES = [
"create_ddl_stmt",
"create_extension_stmt",
"create_index_stmt",
"create_index_with_storage_param",
"create_inverted_index_stmt",
"create_role_stmt",
"create_schedule_for_backup_stmt",
Expand All @@ -83,6 +86,7 @@ FILES = [
"create_stats_stmt",
"create_stmt",
"create_table_as_stmt",
"create_table_with_storage_param",
"create_table_stmt",
"create_type",
"create_view_stmt",
Expand Down
3 changes: 3 additions & 0 deletions docs/generated/sql/bnf/alter_table_reset_storage_param.bnf
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
alter_onetable_stmt ::=
'ALTER' 'TABLE' table_name 'RESET' '(' storage_parameter_key ( ( ',' storage_parameter_key ) )* ')'
| 'ALTER' 'TABLE' 'IF' 'EXISTS' table_name 'RESET' '(' storage_parameter_key ( ( ',' storage_parameter_key ) )* ')'
3 changes: 3 additions & 0 deletions docs/generated/sql/bnf/alter_table_set_storage_param.bnf
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
alter_onetable_stmt ::=
'ALTER' 'TABLE' table_name 'SET' '(' storage_parameter_key '=' var_value ) ) )* ')'
| 'ALTER' 'TABLE' 'IF' 'EXISTS' table_name 'SET' '(' storage_parameter_key '=' var_value ) ) )* ')'
5 changes: 5 additions & 0 deletions docs/generated/sql/bnf/create_index_with_storage_param.bnf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
create_index_stmt ::=
'CREATE' 'INDEX' opt_index_name 'ON' table_name '(' index_params ')' ( 'WITH' '(' ( ( ( storage_parameter_key '=' var_value ) ) ( ( ',' ( storage_parameter_key '=' var_value ) ) )* ) ')' )
| 'CREATE' 'INDEX' 'IF' 'NOT' 'EXISTS' index_name 'ON' table_name '(' index_params ')' ( 'WITH' '(' ( ( ( storage_parameter_key '=' var_value ) ) ( ( ',' ( storage_parameter_key '=' var_value ) ) )* ) ')' )
| 'CREATE' 'INVERTED' 'INDEX' opt_index_name 'ON' table_name '(' index_params ')' ( 'WITH' '(' ( ( ( storage_parameter_key '=' var_value ) ) ( ( ',' ( storage_parameter_key '=' var_value ) ) )* ) ')' )
| 'CREATE' 'INVERTED' 'INDEX' 'IF' 'NOT' 'EXISTS' index_name 'ON' table_name '(' index_params ')' ( 'WITH' '(' ( ( ( storage_parameter_key '=' var_value ) ) ( ( ',' ( storage_parameter_key '=' var_value ) ) )* ) ')' )
3 changes: 3 additions & 0 deletions docs/generated/sql/bnf/create_table_with_storage_param.bnf
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
create_table_stmt ::=
'CREATE' 'TABLE' table_name '(' ( table_definition | ) ')' ( ( 'WITH' '(' ( ( ( storage_parameter_key '=' var_value ) ) ( ( ',' ( storage_parameter_key '=' var_value ) ) )* ) ')' ) )
| 'CREATE' 'TABLE' 'IF' 'NOT' 'EXISTS' table_name '(' ( table_definition | ) ')' ( ( 'WITH' '(' ( ( ( storage_parameter_key '=' var_value ) ) ( ( ',' ( storage_parameter_key '=' var_value ) ) )* ) ')' ) )
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ go_test(
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/contextutil",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/hlc",
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1115,7 +1115,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
expectedValues[j] = fmt.Sprintf(`foo: [%d]->{"after": {"val": %d}}`, j, j)
expectedValues[j+numRowsPerTable] = fmt.Sprintf(`bar: [%d]->{"after": {"val": %d}}`, j, j)
}
return assertPayloadsBaseErr(testFeed, expectedValues, false, false)
return assertPayloadsBaseErr(context.Background(), testFeed, expectedValues, false, false)
})

defer func() {
Expand Down
33 changes: 32 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5241,7 +5241,7 @@ func TestChangefeedCheckpointSchemaChange(t *testing.T) {
`foo: [1]->{"after": {"a": 1}}`,
`foo: [2]->{"after": {"a": 2}}`,
}
msgs, err := readNextMessages(foo, len(expected))
msgs, err := readNextMessages(context.Background(), foo, len(expected))
require.NoError(t, err)

var msgsFormatted []string
Expand Down Expand Up @@ -6782,3 +6782,34 @@ func TestChangefeedFailedTelemetryLogs(t *testing.T) {
require.Equal(t, failLogs[0].NumTables, int32(1))
}, feedTestForceSink("pubsub"))
}

func TestChangefeedTestTimesOut(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
nada := feed(t, f, "CREATE CHANGEFEED FOR foo WITH resolved='100ms'")
defer closeFeed(t, nada)

expectResolvedTimestamp(t, nada) // Make sure feed is running.

const expectTimeout = 500 * time.Millisecond
var observedError error
require.NoError(t,
testutils.SucceedsWithinError(func() error {
observedError = withTimeout(
nada, expectTimeout,
func(ctx context.Context) error {
return assertPayloadsBaseErr(
ctx, nada, []string{`nada: [2]->{"after": {}}`}, false, false)
})
return nil
}, 20*expectTimeout))

require.Error(t, observedError)
}

cdcTest(t, testFn)
}
35 changes: 31 additions & 4 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
Expand Down Expand Up @@ -82,9 +83,14 @@ func waitForSchemaChange(
})
}

func readNextMessages(f cdctest.TestFeed, numMessages int) ([]cdctest.TestFeedMessage, error) {
func readNextMessages(
ctx context.Context, f cdctest.TestFeed, numMessages int,
) ([]cdctest.TestFeedMessage, error) {
var actual []cdctest.TestFeedMessage
for len(actual) < numMessages {
if ctx.Err() != nil {
return nil, ctx.Err()
}
m, err := f.Next()
if log.V(1) {
if m != nil {
Expand Down Expand Up @@ -171,13 +177,18 @@ func assertPayloadsBase(
t testing.TB, f cdctest.TestFeed, expected []string, stripTs bool, perKeyOrdered bool,
) {
t.Helper()
require.NoError(t, assertPayloadsBaseErr(f, expected, stripTs, perKeyOrdered))
require.NoError(t,
withTimeout(f, assertPayloadsTimeout(),
func(ctx context.Context) error {
return assertPayloadsBaseErr(ctx, f, expected, stripTs, perKeyOrdered)
},
))
}

func assertPayloadsBaseErr(
f cdctest.TestFeed, expected []string, stripTs bool, perKeyOrdered bool,
ctx context.Context, f cdctest.TestFeed, expected []string, stripTs bool, perKeyOrdered bool,
) error {
actual, err := readNextMessages(f, len(expected))
actual, err := readNextMessages(ctx, f, len(expected))
if err != nil {
return err
}
Expand Down Expand Up @@ -216,6 +227,22 @@ func assertPayloadsBaseErr(
return nil
}

func assertPayloadsTimeout() time.Duration {
if util.RaceEnabled {
return 5 * time.Minute
}
return 30 * time.Second
}

func withTimeout(
f cdctest.TestFeed, timeout time.Duration, fn func(ctx context.Context) error,
) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
defer stopFeedWhenDone(ctx, f)()
return fn(ctx)
}

func assertPayloads(t testing.TB, f cdctest.TestFeed, expected []string) {
t.Helper()
assertPayloadsBase(t, f, expected, false, false)
Expand Down
56 changes: 20 additions & 36 deletions pkg/ccl/changefeedccl/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -236,47 +235,36 @@ func TestKafkaSink(t *testing.T) {
defer cleanup()

// No inflight
if err := sink.Flush(ctx); err != nil {
t.Fatal(err)
}
require.NoError(t, sink.Flush(ctx))

// Timeout
if err := sink.EmitRow(ctx, topic(`t`), []byte(`1`), nil, zeroTS, zeroTS, zeroAlloc); err != nil {
t.Fatal(err)
}
require.NoError(t,
sink.EmitRow(ctx, topic(`t`), []byte(`1`), nil, zeroTS, zeroTS, zeroAlloc))

m1 := <-p.inputCh
for i := 0; i < 2; i++ {
timeoutCtx, cancel := context.WithTimeout(ctx, time.Millisecond)
defer cancel()
if err := sink.Flush(timeoutCtx); !testutils.IsError(
err, `context deadline exceeded`,
) {
t.Fatalf(`expected "context deadline exceeded" error got: %+v`, err)
}
require.True(t, errors.Is(context.DeadlineExceeded, sink.Flush(timeoutCtx)))
}
go func() { p.successesCh <- m1 }()
if err := sink.Flush(ctx); err != nil {
t.Fatal(err)
}
require.NoError(t, sink.Flush(ctx))

// Check no inflight again now that we've sent something
if err := sink.Flush(ctx); err != nil {
t.Fatal(err)
}
require.NoError(t, sink.Flush(ctx))

// Mixed success and error.
var pool testAllocPool
if err := sink.EmitRow(ctx, topic(`t`), []byte(`2`), nil, zeroTS, zeroTS, pool.alloc()); err != nil {
t.Fatal(err)
}
require.NoError(t, sink.EmitRow(ctx,
topic(`t`), []byte(`2`), nil, zeroTS, zeroTS, pool.alloc()))
m2 := <-p.inputCh
if err := sink.EmitRow(ctx, topic(`t`), []byte(`3`), nil, zeroTS, zeroTS, pool.alloc()); err != nil {
t.Fatal(err)
}
require.NoError(t, sink.EmitRow(
ctx, topic(`t`), []byte(`3`), nil, zeroTS, zeroTS, pool.alloc()))

m3 := <-p.inputCh
if err := sink.EmitRow(ctx, topic(`t`), []byte(`4`), nil, zeroTS, zeroTS, pool.alloc()); err != nil {
t.Fatal(err)
}
require.NoError(t, sink.EmitRow(
ctx, topic(`t`), []byte(`4`), nil, zeroTS, zeroTS, pool.alloc()))

m4 := <-p.inputCh
go func() { p.successesCh <- m2 }()
go func() {
Expand All @@ -286,19 +274,15 @@ func TestKafkaSink(t *testing.T) {
}
}()
go func() { p.successesCh <- m4 }()
if err := sink.Flush(ctx); !testutils.IsError(err, `m3`) {
t.Fatalf(`expected "m3" error got: %+v`, err)
}
require.Regexp(t, "m3", sink.Flush(ctx))

// Check simple success again after error
if err := sink.EmitRow(ctx, topic(`t`), []byte(`5`), nil, zeroTS, zeroTS, pool.alloc()); err != nil {
t.Fatal(err)
}
require.NoError(t, sink.EmitRow(
ctx, topic(`t`), []byte(`5`), nil, zeroTS, zeroTS, pool.alloc()))

m5 := <-p.inputCh
go func() { p.successesCh <- m5 }()
if err := sink.Flush(ctx); err != nil {
t.Fatal(err)
}
require.NoError(t, sink.Flush(ctx))
// At the end, all of the resources has been released
require.EqualValues(t, 0, pool.used())
}
Expand Down
Loading

0 comments on commit 4d8d0b5

Please sign in to comment.