Skip to content

Commit

Permalink
Use telemetry from persistencekit package.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmalloc committed Mar 2, 2024
1 parent 3c696b6 commit d5116c9
Show file tree
Hide file tree
Showing 26 changed files with 60 additions and 865 deletions.
3 changes: 2 additions & 1 deletion cmd/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
"os/signal"
"syscall"

"log/slog"

"github.com/dogmatiq/example"
"github.com/dogmatiq/persistencekit/driver/memory/memoryjournal"
"github.com/dogmatiq/persistencekit/driver/memory/memorykv"
"github.com/dogmatiq/veracity"
"golang.org/x/exp/slog"
)

func main() {
Expand Down
3 changes: 2 additions & 1 deletion engineoption.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package veracity

import (
"log/slog"

"github.com/dogmatiq/enginekit/protobuf/uuidpb"
"github.com/dogmatiq/persistencekit/journal"
"github.com/dogmatiq/persistencekit/kv"
"github.com/dogmatiq/veracity/internal/engineconfig"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/slog"
"google.golang.org/grpc"
)

Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ require (
github.com/dogmatiq/example v0.0.0-20230606031437-2bd84c72050b
github.com/dogmatiq/ferrite v1.2.1
github.com/dogmatiq/marshalkit v0.7.3
github.com/dogmatiq/persistencekit v0.2.0
github.com/dogmatiq/persistencekit v0.2.1
github.com/dogmatiq/primo v0.2.0
github.com/dogmatiq/spruce v0.1.0
github.com/google/go-cmp v0.6.0
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/metric v1.24.0
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ github.com/dogmatiq/linger v1.1.0 h1:kGL9sL79qRa6Cr8PhadeJ/ptbum+b48pAaNWWlyVVKg
github.com/dogmatiq/linger v1.1.0/go.mod h1:OOWJUwTxNkFolhuVdaTYjO4FmFLjZHZ8EMc5H5qOJ7Q=
github.com/dogmatiq/marshalkit v0.7.3 h1:kBymR5txcHFBJcYvzld6kFWshqL9YqfBfnFzl9KwEaI=
github.com/dogmatiq/marshalkit v0.7.3/go.mod h1:gGiQXt9aHidlR1GIgHlZxJU8QAd004kFxU6beq+MPmI=
github.com/dogmatiq/persistencekit v0.2.0 h1:XI0ZGfE5xwX0k8fZoT3gq8tuP/Ze2ZRayWiB0zisSvY=
github.com/dogmatiq/persistencekit v0.2.0/go.mod h1:zI5wGbprKk6DnnJRHYvQI1gYGtWgKeEFoWDc2N2S5T4=
github.com/dogmatiq/persistencekit v0.2.1 h1:ExUrNrFKTyYuauvWJ8QgB5v8LRgPlBoMLDM4/yMvdNw=
github.com/dogmatiq/persistencekit v0.2.1/go.mod h1:I/1PayTDy6B21EgmUbOJR40z/Uo0Ewz2Si4RFZf3wt0=
github.com/dogmatiq/primo v0.2.0 h1:XSgal1oykHCFtHvHXdsaSDvQ2x/V/h+clDS1YIqtwHM=
github.com/dogmatiq/primo v0.2.0/go.mod h1:c1EGDvqJQSaIlTxpT1jPgXMBhOXLnQ3jtKPRH5nuUis=
github.com/dogmatiq/projectionkit v0.6.5 h1:3Ues+QL5oVtJcx4WogMA6XjJF1QyOlcx1uRmUrl2ghI=
github.com/dogmatiq/projectionkit v0.6.5/go.mod h1:FfbWIzePx6RDAl0yl/FZ/9UaGq6wkEDKvfeDi3dg4EE=
github.com/dogmatiq/spruce v0.1.0 h1:xIcWPJA33Et+qIC1RORjP+8gSNtErdcTr0eEbtFk2oU=
github.com/dogmatiq/spruce v0.1.0/go.mod h1:0+zqOtlidouuzQr2k2Od7RRFR7Sk4p373kVyTtH6ovw=
github.com/dogmatiq/sqltest v0.3.0 h1:DCwyLWfVk/ZHsqq5Itq3H/Lqsh/CIQ6nIRwI4YLywFc=
github.com/dogmatiq/sqltest v0.3.0/go.mod h1:a8Da8NhU4m3lq5Sybhiv+ZQowSnGHWTIJHFNInVtffg=
github.com/dogmatiq/testkit v0.13.11 h1:ikXg/Cxq58tzHL27JKCkVqUUElJCHcso7N/ymd3Wins=
Expand Down
8 changes: 4 additions & 4 deletions internal/cluster/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cluster
import (
"context"
"errors"
"log/slog"
"strings"
"time"

Expand All @@ -12,7 +13,6 @@ import (
"github.com/dogmatiq/veracity/internal/fsm"
"github.com/dogmatiq/veracity/internal/protobuf/protokv"
"github.com/dogmatiq/veracity/internal/signaling"
"golang.org/x/exp/slog"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -84,7 +84,7 @@ func (r *Registrar) register(ctx context.Context) error {
return err
}

r.Logger.DebugCtx(
r.Logger.DebugContext(
ctx,
"cluster node registered",
slog.String("node_id", r.Node.ID.AsString()),
Expand All @@ -102,7 +102,7 @@ func (r *Registrar) deregister(ctx context.Context) error {
return err
}

r.Logger.DebugCtx(
r.Logger.DebugContext(
ctx,
"cluster node deregistered",
slog.String("node_id", r.Node.ID.AsString()),
Expand Down Expand Up @@ -137,7 +137,7 @@ func (r *Registrar) renew(ctx context.Context) error {
return err
}

r.Logger.DebugCtx(
r.Logger.DebugContext(
ctx,
"cluster node registration renewed",
slog.String("node_id", r.Node.ID.AsString()),
Expand Down
3 changes: 2 additions & 1 deletion internal/cluster/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/dogmatiq/enginekit/protobuf/uuidpb"
"github.com/dogmatiq/persistencekit/driver/memory/memorykv"
"github.com/dogmatiq/spruce"
. "github.com/dogmatiq/veracity/internal/cluster"
"github.com/dogmatiq/veracity/internal/test"
)
Expand Down Expand Up @@ -35,7 +36,7 @@ func TestRegistry(t *testing.T) {
Keyspaces: keyspaces,
Node: deps.Node,
RenewInterval: 10 * time.Millisecond,
Logger: test.NewLogger(t),
Logger: spruce.NewLogger(t),
}

deps.MembershipChanged = make(chan MembershipChanged)
Expand Down
21 changes: 12 additions & 9 deletions internal/engineconfig/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/dogmatiq/ferrite"
"github.com/dogmatiq/persistencekit/journal"
"github.com/dogmatiq/persistencekit/kv"
"github.com/dogmatiq/veracity/internal/telemetry/instrumentedpersistence"
)

// journalStoreDSN is the DSN describing which journal store to use.
Expand Down Expand Up @@ -52,13 +51,17 @@ func (c *Config) finalizePersistence() {
panic("no key/value store is configured, set VERACITY_KV_DSN or provide the WithKeyValueStore() option")
}

c.Persistence.Journals = &instrumentedpersistence.JournalStore{
Next: c.Persistence.Journals,
Telemetry: c.Telemetry,
}
c.Persistence.Journals = journal.WithTelemetry(
c.Persistence.Journals,
c.Telemetry.TracerProvider,
c.Telemetry.MeterProvider,
c.Telemetry.Logger,
)

c.Persistence.Keyspaces = &instrumentedpersistence.KeyValueStore{
Next: c.Persistence.Keyspaces,
Telemetry: c.Telemetry,
}
c.Persistence.Keyspaces = kv.WithTelemetry(
c.Persistence.Keyspaces,
c.Telemetry.TracerProvider,
c.Telemetry.MeterProvider,
c.Telemetry.Logger,
)
}
3 changes: 2 additions & 1 deletion internal/engineconfig/telemetry.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package engineconfig

import (
"log/slog"

"github.com/dogmatiq/veracity/internal/telemetry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/slog"
)

func (c *Config) finalizeTelemetry() {
Expand Down
3 changes: 2 additions & 1 deletion internal/eventstream/append_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
. "github.com/dogmatiq/marshalkit/fixtures"
"github.com/dogmatiq/persistencekit/driver/memory/memoryjournal"
"github.com/dogmatiq/persistencekit/journal"
"github.com/dogmatiq/spruce"
"github.com/dogmatiq/veracity/internal/envelope"
. "github.com/dogmatiq/veracity/internal/eventstream"
"github.com/dogmatiq/veracity/internal/eventstream/internal/journalpb"
Expand All @@ -37,7 +38,7 @@ func TestAppend(t *testing.T) {
deps.Supervisor = &Supervisor{
Journals: deps.Journals,
Events: events,
Logger: test.NewLogger(t),
Logger: spruce.NewLogger(t),
}

deps.Events = events
Expand Down
2 changes: 1 addition & 1 deletion internal/eventstream/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package eventstream
import (
"context"
"errors"
"log/slog"

"github.com/dogmatiq/enginekit/protobuf/uuidpb"
"github.com/dogmatiq/persistencekit/journal"
"github.com/dogmatiq/veracity/internal/fsm"
"github.com/dogmatiq/veracity/internal/messaging"
"github.com/dogmatiq/veracity/internal/signaling"
"golang.org/x/exp/slog"
)

// errShuttingDown is sent in response to append requests that are not serviced
Expand Down
10 changes: 5 additions & 5 deletions internal/eventstream/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package eventstream

import (
"context"
"log/slog"
"time"

"github.com/dogmatiq/persistencekit/journal"
Expand All @@ -10,7 +11,6 @@ import (
"github.com/dogmatiq/veracity/internal/messaging"
"github.com/dogmatiq/veracity/internal/protobuf/protojournal"
"github.com/dogmatiq/veracity/internal/signaling"
"golang.org/x/exp/slog"
)

const defaultIdleTimeout = 5 * time.Minute
Expand Down Expand Up @@ -46,8 +46,8 @@ type worker struct {
// It processes requests until ctx is canceled, r.Shutdown is latched, or
// an error occurrs.
func (w *worker) Run(ctx context.Context) (err error) {
w.Logger.DebugCtx(ctx, "event stream worker started")
defer w.Logger.DebugCtx(ctx, "event stream worker stopped")
w.Logger.DebugContext(ctx, "event stream worker started")
defer w.Logger.DebugContext(ctx, "event stream worker stopped")

pos, rec, ok, err := protojournal.GetLatest[*journalpb.Record](ctx, w.Journal)
if err != nil {
Expand Down Expand Up @@ -151,7 +151,7 @@ func (w *worker) appendEvents(

if ok {
for i, e := range req.Events {
w.Logger.WarnCtx(
w.Logger.WarnContext(
ctx,
"ignored event that has already been appended to the stream",
slog.Uint64("stream_offset", uint64(rec.StreamOffsetBefore)+uint64(i)),
Expand Down Expand Up @@ -188,7 +188,7 @@ func (w *worker) appendEvents(
}

for i, e := range req.Events {
w.Logger.InfoCtx(
w.Logger.InfoContext(
ctx,
"appended event to the stream",
slog.Uint64("stream_offset", uint64(before)+uint64(i)),
Expand Down
2 changes: 1 addition & 1 deletion internal/integration/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package integration

import (
"context"
"slices"

"github.com/dogmatiq/dogma"
"github.com/dogmatiq/enginekit/protobuf/envelopepb"
Expand All @@ -16,7 +17,6 @@ import (
"github.com/dogmatiq/veracity/internal/messaging"
"github.com/dogmatiq/veracity/internal/protobuf/protojournal"
"github.com/dogmatiq/veracity/internal/signaling"
"golang.org/x/exp/slices"
"google.golang.org/protobuf/proto"
)

Expand Down
3 changes: 1 addition & 2 deletions internal/messaging/topic.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package messaging

import (
"slices"
"sync"

"golang.org/x/exp/slices"
)

// Topic is a source of messages of type T.
Expand Down
3 changes: 2 additions & 1 deletion internal/optimistic/orderedset.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package optimistic

import (
"slices"

"github.com/dogmatiq/enginekit/protobuf/uuidpb"
"golang.org/x/exp/constraints"
"golang.org/x/exp/slices"
)

// OrderedSet is a read-optimized lock-free ordered set.
Expand Down
16 changes: 11 additions & 5 deletions internal/optimistic/orderedset_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package optimistic_test

import (
"slices"
"testing"

. "github.com/dogmatiq/veracity/internal/optimistic"
"github.com/dogmatiq/veracity/internal/test"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"pgregory.net/rapid"
)

Expand All @@ -16,6 +15,13 @@ func TestOrderedSet(t *testing.T) {
rapid.Check(t, func(t *rapid.T) {
var set OrderedSet[int8, OrderedComparator[int8]]
members := map[int8]struct{}{}
keys := func() []int8 {
var keys []int8
for k := range members {
keys = append(keys, k)
}
return keys
}

t.Repeat(
map[string]func(*rapid.T){
Expand All @@ -27,7 +33,7 @@ func TestOrderedSet(t *testing.T) {
len(members),
)

sorted := maps.Keys(members)
sorted := keys()
slices.Sort(sorted)

test.Expect(
Expand Down Expand Up @@ -56,7 +62,7 @@ func TestOrderedSet(t *testing.T) {
}

m := rapid.
SampledFrom(maps.Keys(members)).
SampledFrom(keys()).
Draw(t, "member")

set.Add(m)
Expand All @@ -67,7 +73,7 @@ func TestOrderedSet(t *testing.T) {
}

m := rapid.
SampledFrom(maps.Keys(members)).
SampledFrom(keys()).
Draw(t, "member")

set.Delete(m)
Expand Down
2 changes: 1 addition & 1 deletion internal/telemetry/attribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package telemetry

import (
"fmt"
"log/slog"
"math"
"reflect"

"go.opentelemetry.io/otel/attribute"
"golang.org/x/exp/constraints"
"golang.org/x/exp/slog"
)

// Attr is a telemetry attribute.
Expand Down
3 changes: 0 additions & 3 deletions internal/telemetry/instrumentedpersistence/doc.go

This file was deleted.

23 changes: 0 additions & 23 deletions internal/telemetry/instrumentedpersistence/handle.go

This file was deleted.

Loading

0 comments on commit d5116c9

Please sign in to comment.