Skip to content

Commit

Permalink
Minor refactor of server code.
Browse files Browse the repository at this point in the history
  • Loading branch information
lthibault committed Jan 3, 2023
1 parent f090622 commit e71d2be
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 47 deletions.
12 changes: 4 additions & 8 deletions pkg/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,9 @@ func decorateLogger(env Env, vat casm.Vat) log.Logger {
return log
}

func newServerNode(j server.Joiner, ps *pubsub.PubSub) (*server.Node, error) {
func newServerNode(vat casm.Vat, j server.Joiner, ps *pubsub.PubSub) (*server.Node, error) {
// TODO: this should use lx.OnStart to benefit from the start timeout.
return j.Join(ps)
return j.Join(vat, ps)
}

func bootServer(lx fx.Lifecycle, n *server.Node) {
Expand All @@ -315,12 +315,8 @@ func onclose(c io.Closer) func(context.Context) error {
}
}

type bootstrappable interface {
Bootstrap(context.Context) error
}

func bootstrap(b bootstrappable) func(context.Context) error {
func bootstrap(n *server.Node) func(context.Context) error {
return func(ctx context.Context) error {
return b.Bootstrap(ctx)
return n.Bootstrap(ctx)
}
}
47 changes: 38 additions & 9 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
@@ -1,45 +1,55 @@
package server

import (
"context"
"time"

"github.com/lthibault/log"
"go.uber.org/fx"

casm "github.com/wetware/casm/pkg"
"github.com/wetware/casm/pkg/cluster"
"github.com/wetware/casm/pkg/cluster/pulse"
"github.com/wetware/casm/pkg/cluster/routing"
"github.com/wetware/casm/pkg/debug"
)

type RoutingConfig struct {
type ClusterConfig struct {
fx.In

Log log.Logger `optional:"true"`
TTL time.Duration `optional:"true"`
Meta pulse.Preparer `optional:"true"`
RoutingTable cluster.RoutingTable `optional:"true"`
}

func (rc RoutingConfig) Bind(r Router, ns string) (*cluster.Router, error) {
func (rc ClusterConfig) New(vat casm.Vat, r Router, log log.Logger) (Cluster, error) {
rt := rc.routingTable()

err := r.RegisterTopicValidator(ns, pulse.NewValidator(rt))
err := r.RegisterTopicValidator(vat.NS, pulse.NewValidator(rt))
if err != nil {
return nil, err
}

t, err := r.Join(ns)
return &cluster.Router{
t, err := r.Join(vat.NS)
if err != nil {
return nil, err
}

cr := &cluster.Router{
Topic: t,
Log: rc.Log,
Log: log,
TTL: rc.TTL,
Meta: rc.Meta,
RoutingTable: rt,
}, err
}

return router{
Router: cr,
r: r,
}, nil
}

func (rc RoutingConfig) routingTable() cluster.RoutingTable {
func (rc ClusterConfig) routingTable() cluster.RoutingTable {
if rc.RoutingTable == nil {
rc.RoutingTable = routing.New(time.Now())
}
Expand All @@ -62,3 +72,22 @@ func (dc DebugConfig) New() *debug.Server {
Profiles: dc.Profiles,
}
}

// Router binds the lifecycle of CASM's *cluster.Router to that of the
// local Router interface. This is needed because CASM requires us to
// seperately join the cluster topic and register a pulse.Validator on
// startup. In turn, this means we must *deregister* the validator and
// leave the cluster on shutdown.
type router struct {
*cluster.Router
r Router
}

func (r router) Close() error {
r.Stop()
return r.r.UnregisterTopicValidator(r.String())
}

func (r router) Bootstrap(ctx context.Context) error {
return r.Router.Bootstrap(ctx)
}
59 changes: 29 additions & 30 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,72 +10,71 @@ import (

casm "github.com/wetware/casm/pkg"
"github.com/wetware/casm/pkg/cluster"
"github.com/wetware/casm/pkg/cluster/routing"
"github.com/wetware/ww/pkg/anchor"
"github.com/wetware/ww/pkg/host"
"github.com/wetware/ww/pkg/pubsub"
)

// Router provides an interface for routing messages by topic, and supports
// per-message validation. It is used by the Joiner to create the cluster
// topic through which heartbeat messages are routed.
type Router interface {
Join(string, ...ps.TopicOpt) (*ps.Topic, error)
RegisterTopicValidator(topic string, val interface{}, opts ...ps.ValidatorOpt) error
UnregisterTopicValidator(topic string) error
}

type Node struct {
Vat casm.Vat
cluster *cluster.Router
pubsub interface{ UnregisterTopicValidator(string) error }
}

func (n Node) ID() routing.ID {
return n.cluster.ID()
// Cluster is a local model of the global Wetware cluster. It models the
// cluster as a PA/EL system and makes no consistency guarantees.
//
// https://en.wikipedia.org/wiki/PACELC_theorem
type Cluster interface {
Bootstrap(context.Context) error
View() cluster.View
String() string
Close() error
}

func (n Node) String() string {
return n.cluster.String()
// Node is a peer in the Wetware cluster. Manually populating Node's fields
// is NOT RECOMMENDED. Use Joiner instead.
type Node struct {
Vat casm.Vat
Cluster
}

func (n Node) Loggable() map[string]any {
return n.cluster.Loggable()
}

func (n Node) Bootstrap(ctx context.Context) error {
return n.cluster.Bootstrap(ctx)
}

func (n Node) Close() error {
n.cluster.Stop()
return n.pubsub.UnregisterTopicValidator(n.Vat.NS)
return n.Vat.Loggable()
}

// Joiner is a factory type that builds a Node from configuration,
// and joins the cluster. Joiners SHOULD NOT be reused, and should
// be promptly discarded after a call to Join.
type Joiner struct {
fx.In

Vat casm.Vat

Log log.Logger `optional:"true"`
Router RoutingConfig `optional:"true"`
Router ClusterConfig `optional:"true"`
Debugger DebugConfig `optional:"true"`
}

func (j Joiner) Join(r Router) (*Node, error) {
c, err := j.Router.Bind(r, j.Vat.NS)
// Join the cluster. Note that callers MUST call Bootstrap() on
// the returned *Node to complete the bootstrap process.
func (j Joiner) Join(vat casm.Vat, r Router) (*Node, error) {
c, err := j.Router.New(vat, r, j.Log)
if err != nil {
return nil, err
}

j.Vat.Export(host.Capability, host.Server{
vat.Export(host.Capability, host.Server{
ViewProvider: c,
PubSubProvider: j.pubsub(r),
AnchorProvider: j.anchor(),
DebugProvider: j.Debugger.New(),
})

return &Node{
Vat: j.Vat,
cluster: c,
pubsub: r,
Vat: vat,
Cluster: c,
}, nil
}

Expand Down

0 comments on commit e71d2be

Please sign in to comment.