From e71d2bec0272fe08b34ef5101ca213e01b3e0279 Mon Sep 17 00:00:00 2001 From: Louis Thibault Date: Tue, 3 Jan 2023 13:15:52 -0500 Subject: [PATCH] Minor refactor of server code. --- pkg/runtime/runtime.go | 12 +++------ pkg/server/config.go | 47 ++++++++++++++++++++++++++------- pkg/server/server.go | 59 +++++++++++++++++++++--------------------- 3 files changed, 71 insertions(+), 47 deletions(-) diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 74d69dc4..b2b870e4 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -291,9 +291,9 @@ func decorateLogger(env Env, vat casm.Vat) log.Logger { return log } -func newServerNode(j server.Joiner, ps *pubsub.PubSub) (*server.Node, error) { +func newServerNode(vat casm.Vat, j server.Joiner, ps *pubsub.PubSub) (*server.Node, error) { // TODO: this should use lx.OnStart to benefit from the start timeout. - return j.Join(ps) + return j.Join(vat, ps) } func bootServer(lx fx.Lifecycle, n *server.Node) { @@ -315,12 +315,8 @@ func onclose(c io.Closer) func(context.Context) error { } } -type bootstrappable interface { - Bootstrap(context.Context) error -} - -func bootstrap(b bootstrappable) func(context.Context) error { +func bootstrap(n *server.Node) func(context.Context) error { return func(ctx context.Context) error { - return b.Bootstrap(ctx) + return n.Bootstrap(ctx) } } diff --git a/pkg/server/config.go b/pkg/server/config.go index e57027a3..7087dd32 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -1,45 +1,55 @@ package server import ( + "context" "time" "github.com/lthibault/log" "go.uber.org/fx" + casm "github.com/wetware/casm/pkg" "github.com/wetware/casm/pkg/cluster" "github.com/wetware/casm/pkg/cluster/pulse" "github.com/wetware/casm/pkg/cluster/routing" "github.com/wetware/casm/pkg/debug" ) -type RoutingConfig struct { +type ClusterConfig struct { fx.In - Log log.Logger `optional:"true"` TTL time.Duration `optional:"true"` Meta pulse.Preparer `optional:"true"` RoutingTable cluster.RoutingTable `optional:"true"` } -func (rc RoutingConfig) Bind(r Router, ns string) (*cluster.Router, error) { +func (rc ClusterConfig) New(vat casm.Vat, r Router, log log.Logger) (Cluster, error) { rt := rc.routingTable() - err := r.RegisterTopicValidator(ns, pulse.NewValidator(rt)) + err := r.RegisterTopicValidator(vat.NS, pulse.NewValidator(rt)) if err != nil { return nil, err } - t, err := r.Join(ns) - return &cluster.Router{ + t, err := r.Join(vat.NS) + if err != nil { + return nil, err + } + + cr := &cluster.Router{ Topic: t, - Log: rc.Log, + Log: log, TTL: rc.TTL, Meta: rc.Meta, RoutingTable: rt, - }, err + } + + return router{ + Router: cr, + r: r, + }, nil } -func (rc RoutingConfig) routingTable() cluster.RoutingTable { +func (rc ClusterConfig) routingTable() cluster.RoutingTable { if rc.RoutingTable == nil { rc.RoutingTable = routing.New(time.Now()) } @@ -62,3 +72,22 @@ func (dc DebugConfig) New() *debug.Server { Profiles: dc.Profiles, } } + +// Router binds the lifecycle of CASM's *cluster.Router to that of the +// local Router interface. This is needed because CASM requires us to +// seperately join the cluster topic and register a pulse.Validator on +// startup. In turn, this means we must *deregister* the validator and +// leave the cluster on shutdown. +type router struct { + *cluster.Router + r Router +} + +func (r router) Close() error { + r.Stop() + return r.r.UnregisterTopicValidator(r.String()) +} + +func (r router) Bootstrap(ctx context.Context) error { + return r.Router.Bootstrap(ctx) +} diff --git a/pkg/server/server.go b/pkg/server/server.go index b702b5e6..d310839e 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -10,62 +10,62 @@ import ( casm "github.com/wetware/casm/pkg" "github.com/wetware/casm/pkg/cluster" - "github.com/wetware/casm/pkg/cluster/routing" "github.com/wetware/ww/pkg/anchor" "github.com/wetware/ww/pkg/host" "github.com/wetware/ww/pkg/pubsub" ) +// Router provides an interface for routing messages by topic, and supports +// per-message validation. It is used by the Joiner to create the cluster +// topic through which heartbeat messages are routed. type Router interface { Join(string, ...ps.TopicOpt) (*ps.Topic, error) RegisterTopicValidator(topic string, val interface{}, opts ...ps.ValidatorOpt) error UnregisterTopicValidator(topic string) error } -type Node struct { - Vat casm.Vat - cluster *cluster.Router - pubsub interface{ UnregisterTopicValidator(string) error } -} - -func (n Node) ID() routing.ID { - return n.cluster.ID() +// Cluster is a local model of the global Wetware cluster. It models the +// cluster as a PA/EL system and makes no consistency guarantees. +// +// https://en.wikipedia.org/wiki/PACELC_theorem +type Cluster interface { + Bootstrap(context.Context) error + View() cluster.View + String() string + Close() error } -func (n Node) String() string { - return n.cluster.String() +// Node is a peer in the Wetware cluster. Manually populating Node's fields +// is NOT RECOMMENDED. Use Joiner instead. +type Node struct { + Vat casm.Vat + Cluster } func (n Node) Loggable() map[string]any { - return n.cluster.Loggable() -} - -func (n Node) Bootstrap(ctx context.Context) error { - return n.cluster.Bootstrap(ctx) -} - -func (n Node) Close() error { - n.cluster.Stop() - return n.pubsub.UnregisterTopicValidator(n.Vat.NS) + return n.Vat.Loggable() } +// Joiner is a factory type that builds a Node from configuration, +// and joins the cluster. Joiners SHOULD NOT be reused, and should +// be promptly discarded after a call to Join. type Joiner struct { fx.In - Vat casm.Vat - Log log.Logger `optional:"true"` - Router RoutingConfig `optional:"true"` + Router ClusterConfig `optional:"true"` Debugger DebugConfig `optional:"true"` } -func (j Joiner) Join(r Router) (*Node, error) { - c, err := j.Router.Bind(r, j.Vat.NS) +// Join the cluster. Note that callers MUST call Bootstrap() on +// the returned *Node to complete the bootstrap process. +func (j Joiner) Join(vat casm.Vat, r Router) (*Node, error) { + c, err := j.Router.New(vat, r, j.Log) if err != nil { return nil, err } - j.Vat.Export(host.Capability, host.Server{ + vat.Export(host.Capability, host.Server{ ViewProvider: c, PubSubProvider: j.pubsub(r), AnchorProvider: j.anchor(), @@ -73,9 +73,8 @@ func (j Joiner) Join(r Router) (*Node, error) { }) return &Node{ - Vat: j.Vat, - cluster: c, - pubsub: r, + Vat: vat, + Cluster: c, }, nil }