Skip to content

Commit

Permalink
Project import generated by Copybara.
Browse files Browse the repository at this point in the history
FolderOrigin-RevId: /usr/local/google/home/robjs/copybara/temp/folder-destination9858892572349391303/.
  • Loading branch information
Googler authored and robshakir committed Mar 19, 2024
1 parent 255e855 commit 7c2aef8
Show file tree
Hide file tree
Showing 30 changed files with 5,325 additions and 852 deletions.
1 change: 0 additions & 1 deletion .github/workflows/ci-cpp-build-gnmi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ jobs:
runs-on: ubuntu-latest
env:
BAZEL: bazelisk-linux-amd64
USE_BAZEL_VERSION: 6.4.0
steps:
- uses: actions/checkout@v2
with:
Expand Down
93 changes: 77 additions & 16 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
log "github.com/golang/glog"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"
"bitbucket.org/creachadair/stringset"
"github.com/openconfig/gnmi/ctree"
"github.com/openconfig/gnmi/errlist"
"github.com/openconfig/gnmi/latency"
Expand All @@ -50,18 +51,22 @@ var (

// ErrStale is the error returned if an update is stale.
ErrStale = errors.New("update is stale")
// ErrFuture is the error returned if an update is too far in the future.
ErrFuture = errors.New("update is too far in the future")
)

// A Target hosts an indexed cache of state for a single target.
type Target struct {
name string // name of the target
t *ctree.Tree // actual cache of target data
client func(*ctree.Leaf) // Function to pass all cache updates to.
sync bool // denotes whether this cache is in sync with target
meta *metadata.Metadata // metadata associated with target
lat *latency.Latency // latency measurements
tsmu sync.Mutex // protects latest timestamp
ts time.Time // latest timestamp for an update
name string // name of the target
t *ctree.Tree // actual cache of target data
client func(*ctree.Leaf) // Function to pass all cache updates to.
sync bool // denotes whether this cache is in sync with target
meta *metadata.Metadata // metadata associated with target
lat *latency.Latency // latency measurements
tsmu sync.Mutex // protects latest timestamp
ts time.Time // latest timestamp for an update
excludedMeta stringset.Set // set of metadata not to generate update for
futureThreshold time.Duration // how far in the future an update can be accepted
}

// Name returns the name of the target.
Expand All @@ -76,6 +81,8 @@ type options struct {
latencyWindows []time.Duration
avgLatencyPrecision time.Duration
serverName string
excludedUpdateMeta stringset.Set
futureThreshold time.Duration
}

// Option defines the function prototype to set options for creating a Cache.
Expand Down Expand Up @@ -114,6 +121,24 @@ func WithServerName(serverName string) Option {
}
}

// WithExcludedMeta returns an Option to exclude a set of metadata from
// generating updates in the Cache. These metadatas are still maintained
// internally and available for other uses.
func WithExcludedMeta(excluded []string) Option {
return func(o *options) {
o.excludedUpdateMeta = stringset.New(excluded...)
}
}

// WithFutureThreshold returns an Option to set the threshold for how far the
// timestamp of an update is in the future that still allows the update to be
// accepted in Cache.
func WithFutureThreshold(futureThreshold time.Duration) Option {
return func(o *options) {
o.futureThreshold = futureThreshold
}
}

// Cache is a structure holding state information for multiple targets.
type Cache struct {
opts options
Expand Down Expand Up @@ -255,11 +280,13 @@ func (c *Cache) Add(target string) *Target {
latOpts = &latency.Options{AvgPrecision: c.opts.avgLatencyPrecision}
}
t := &Target{
t: &ctree.Tree{},
name: target,
meta: metadata.New(),
client: c.client,
lat: latency.New(c.opts.latencyWindows, latOpts),
t: &ctree.Tree{},
name: target,
meta: metadata.New(),
client: c.client,
lat: latency.New(c.opts.latencyWindows, latOpts),
excludedMeta: c.opts.excludedUpdateMeta,
futureThreshold: c.opts.futureThreshold,
}
t.meta.SetStr(metadata.ServerName, c.opts.serverName)
c.targets[target] = t
Expand Down Expand Up @@ -344,10 +371,15 @@ func (c *Cache) GnmiUpdate(n *pb.Notification) error {
// each individual Update/Delete is sent to cache as
// a separate gnmi.Notification.
func (t *Target) GnmiUpdate(n *pb.Notification) error {
updateTS := false
if u := n.GetUpdate(); len(u) > 0 {
if p := u[0].GetPath().GetElem(); len(p) > 0 && p[0].GetName() != metadata.Root {
// Record latest timestamp from the device, excluding all 'meta' paths.
t.checkTimestamp(T(n.GetTimestamp()))
defer func(ts int64) {
if updateTS {
t.checkTimestamp(T(ts))
}
}(n.GetTimestamp())
}
}
switch {
Expand All @@ -365,6 +397,7 @@ func (t *Target) GnmiUpdate(n *pb.Notification) error {
if err != nil {
return err
}
updateTS = true
if nd != nil {
t.meta.AddInt(metadata.UpdateCount, int64(l))
t.client(nd)
Expand All @@ -389,6 +422,7 @@ func (t *Target) GnmiUpdate(n *pb.Notification) error {
errs.Add(err)
continue
}
updateTS = true
if nd != nil {
t.meta.AddInt(metadata.UpdateCount, 1)
t.client(nd)
Expand All @@ -412,6 +446,7 @@ func (t *Target) GnmiUpdate(n *pb.Notification) error {
if err != nil {
return err
}
updateTS = true
if nd != nil {
t.meta.AddInt(metadata.UpdateCount, 1)
t.client(nd)
Expand Down Expand Up @@ -490,7 +525,7 @@ func (t *Target) gnmiUpdate(n *pb.Notification) (*ctree.Leaf, error) {
if !ok {
return nil, fmt.Errorf("corrupt schema with collision for path %q, got %T", path, oldval.Value())
}
switch {
switch nts := T(n.GetTimestamp()); {
case n.GetTimestamp() < old.GetTimestamp():
// Update rejected. Timestamp < previous recorded timestamp.
t.meta.AddInt(metadata.StaleCount, 1)
Expand All @@ -505,6 +540,19 @@ func (t *Target) gnmiUpdate(n *pb.Notification) (*ctree.Leaf, error) {
t.meta.AddInt(metadata.StaleCount, 1)
return nil, ErrStale
}
case t.futureThreshold > 0 && nts.Sub(Now()) > t.futureThreshold:
if t.ts.UnixNano() <= 0 {
// This is the first accepted update as t.ts is uninitialized (assuming
// the first accepted timestamp is > 0 (1970-01-01 00:00:00 UTC).
log.Warningf("Accepting the first update with a timestamp in the future %s", prototext.Format(n))
} else if nts.Sub(t.ts) <= t.futureThreshold {
if log.V(1) {
log.Warningf("Accepting non-first update with a timestamp in the future but not exceeding the threshold comparing with latestTimestamp (%v): %s", t.ts, prototext.Format(n))
}
} else {
t.meta.AddInt(metadata.FutureCount, 1)
return nil, ErrFuture
}
}
oldval.Update(n)
// Simulate event-driven for all non-atomic updates.
Expand Down Expand Up @@ -614,9 +662,16 @@ func (t *Target) updateMeta(clients func(*ctree.Leaf)) {
latest := t.ts
t.tsmu.Unlock()
t.meta.SetInt(metadata.LatestTimestamp, latest.UnixNano())

t.lat.UpdateReset(t.meta)

t.generateMetaUpdates(clients)
}

func (t *Target) generateMetaUpdates(clients func(*ctree.Leaf)) {
for value := range metadata.TargetBoolValues {
if t.excludedMeta.Contains(value) {
continue
}
v, err := t.meta.GetBool(value)
if err != nil {
continue
Expand All @@ -634,6 +689,9 @@ func (t *Target) updateMeta(clients func(*ctree.Leaf)) {
}

for value := range metadata.TargetIntValues {
if t.excludedMeta.Contains(value) {
continue
}
v, err := t.meta.GetInt(value)
if err != nil {
continue
Expand All @@ -651,6 +709,9 @@ func (t *Target) updateMeta(clients func(*ctree.Leaf)) {
}

for value := range metadata.TargetStrValues {
if t.excludedMeta.Contains(value) {
continue
}
v, err := t.meta.GetStr(value)
if err != nil {
continue
Expand Down
Loading

0 comments on commit 7c2aef8

Please sign in to comment.