Skip to content

Commit

Permalink
*: support get-old-kv in watch
Browse files Browse the repository at this point in the history
  • Loading branch information
xiang90 committed Jul 5, 2016
1 parent c8c5f41 commit 4b453d5
Show file tree
Hide file tree
Showing 12 changed files with 381 additions and 231 deletions.
11 changes: 10 additions & 1 deletion Documentation/dev-guide/apispec/swagger/rpc.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@
"preserveKVs": {
"type": "boolean",
"format": "boolean",
"description": "If preserveKVs is set, the deleted KVs will be preserved.\nThe preserved KVs will be returned as response.\nIt requires read permission to read the deleted KVs."
"description": "If preserveKVs is set, the deleted KVs will be preserved for delete events\nThe preserved KVs will be returned as response.\nIt requires read permission to read the deleted KVs."
},
"range_end": {
"type": "string",
Expand Down Expand Up @@ -1199,6 +1199,11 @@
},
"description": "filters filter the events at server side before it sends back to the watcher."
},
"getOldKV": {
"type": "boolean",
"format": "boolean",
"description": "if getOldKV is set, created watcher will always try to get the KV before the event happens."
},
"key": {
"type": "string",
"format": "byte",
Expand Down Expand Up @@ -1273,6 +1278,10 @@
"$ref": "#/definitions/mvccpbKeyValue",
"description": "kv holds the KeyValue for the event.\nA PUT event contains current kv pair.\nA PUT event with kv.Version=1 indicates the creation of a key.\nA DELETE/EXPIRE event contains the deleted key with\nits modification revision set to the revision of deletion."
},
"oldKV": {
"$ref": "#/definitions/mvccpbKeyValue",
"description": "oldkv holds the KeyValue before the event happens when requested."
},
"type": {
"$ref": "#/definitions/EventEventType",
"description": "type is the kind of event. If type is a PUT, it indicates\nnew data has been stored to the key. If type is a DELETE,\nit indicates the key was deleted."
Expand Down
10 changes: 10 additions & 0 deletions clientv3/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type Op struct {
// for range, watch
rev int64

getOldKV bool

// for delete
preserveKVs bool

Expand Down Expand Up @@ -276,3 +278,11 @@ func WithProgressNotify() OpOption {
op.progressNotify = true
}
}

// WithGetOldKV gets the old kv before the event happens. If the old KV is already compacted,
// nothing will be returned.
func WithGetOldKV() OpOption {
return func(op *Op) {
op.getOldKV = true
}
}
6 changes: 5 additions & 1 deletion clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,10 @@ type watchRequest struct {
key string
end string
rev int64
// progressNotify is for progress updates.
// progressNotify is for progress updates
progressNotify bool
// get kv before the event happens
getOldKV bool
// retc receives a chan WatchResponse once the watcher is established
retc chan chan WatchResponse
}
Expand Down Expand Up @@ -209,6 +211,7 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
end: string(ow.end),
rev: ow.rev,
progressNotify: ow.progressNotify,
getOldKV: ow.getOldKV,
retc: retc,
}

Expand Down Expand Up @@ -682,6 +685,7 @@ func (wr *watchRequest) toPB() *pb.WatchRequest {
Key: []byte(wr.key),
RangeEnd: []byte(wr.end),
ProgressNotify: wr.progressNotify,
GetOldKV: wr.getOldKV,
}
cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
return &pb.WatchRequest{RequestUnion: cr}
Expand Down
4 changes: 3 additions & 1 deletion etcdctl/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ Watch watches events stream on keys or prefixes, [key or prefix, range_end) if `
- prefix -- watch on a prefix if prefix is set.
- get-old-key -- get the old kv before the event happens.
- rev -- the revision to start watching. Specifying a revision is useful for observing past events.
#### Input Format
Expand All @@ -245,7 +247,7 @@ watch [options] <key or prefix>\n
##### Simple reply
- \<event\>\n\<key\>\n\<value\>\n\<event\>\n\<next_key\>\n\<next_value\>\n...
- \<event\>[\n\<old_key\>\n\<old_value\>]\n\<key\>\n\<value\>\n\<event\>\n\<next_key\>\n\<next_value\>\n...
- Additional error string if WATCH failed. Exit code is non-zero.
Expand Down
3 changes: 3 additions & 0 deletions etcdctl/ctlv3/command/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ func (s *simplePrinter) Txn(resp v3.TxnResponse) {
func (s *simplePrinter) Watch(resp v3.WatchResponse) {
for _, e := range resp.Events {
fmt.Println(e.Type)
if e.OldKV != nil {
printKV(s.isHex, e.OldKV)
}
printKV(s.isHex, e.Kv)
}
}
Expand Down
6 changes: 6 additions & 0 deletions etcdctl/ctlv3/command/watch_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var (
watchRev int64
watchPrefix bool
watchInteractive bool
watchGetOldKey bool
)

// NewWatchCommand returns the cobra command for "watch".
Expand All @@ -42,6 +43,7 @@ func NewWatchCommand() *cobra.Command {
cmd.Flags().BoolVarP(&watchInteractive, "interactive", "i", false, "Interactive mode")
cmd.Flags().BoolVar(&watchPrefix, "prefix", false, "Watch on a prefix if prefix is set")
cmd.Flags().Int64Var(&watchRev, "rev", 0, "Revision to start watching")
cmd.Flags().BoolVar(&watchGetOldKey, "get-old-key", false, "get the old kv before the event happens")

return cmd
}
Expand All @@ -68,6 +70,10 @@ func watchCommandFunc(cmd *cobra.Command, args []string) {
if watchPrefix {
opts = append(opts, clientv3.WithPrefix())
}
if watchGetOldKey {
opts = append(opts, clientv3.WithGetOldKV())
}

c := mustClientFromCmd(cmd)
wc := c.Watch(context.TODO(), key, opts...)
printWatchCh(wc)
Expand Down
35 changes: 29 additions & 6 deletions etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type watchServer struct {
clusterID int64
memberID int64
raftTimer etcdserver.RaftTimer
watchable mvcc.Watchable
watchable mvcc.WatchableKV
}

func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
Expand Down Expand Up @@ -82,13 +82,18 @@ type serverWatchStream struct {
memberID int64
raftTimer etcdserver.RaftTimer

watchable mvcc.WatchableKV

gRPCStream pb.Watch_WatchServer
watchStream mvcc.WatchStream
ctrlStream chan *pb.WatchResponse

// progress tracks the watchID that stream might need to send
// progress to.
// TOOD: combine progress and getOldKV into a single struct?
progress map[mvcc.WatchID]bool
getOldKV map[mvcc.WatchID]bool

// mu protects progress
mu sync.Mutex

Expand All @@ -101,14 +106,18 @@ type serverWatchStream struct {

func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
sws := serverWatchStream{
clusterID: ws.clusterID,
memberID: ws.memberID,
raftTimer: ws.raftTimer,
clusterID: ws.clusterID,
memberID: ws.memberID,
raftTimer: ws.raftTimer,

watchable: ws.watchable,

gRPCStream: stream,
watchStream: ws.watchable.NewWatchStream(),
// chan for sending control response like watcher created and canceled.
ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
progress: make(map[mvcc.WatchID]bool),
getOldKV: make(map[mvcc.WatchID]bool),
closec: make(chan struct{}),
}

Expand Down Expand Up @@ -181,8 +190,13 @@ func (sws *serverWatchStream) recvLoop() error {
rev = wsrev + 1
}
id := sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev, filters...)
if id != -1 && creq.ProgressNotify {
sws.progress[id] = true
if id != -1 {
if creq.ProgressNotify {
sws.progress[id] = true
}
if creq.GetOldKV {
sws.getOldKV[id] = true
}
}
wr := &pb.WatchResponse{
Header: sws.newResponseHeader(wsrev),
Expand All @@ -207,6 +221,7 @@ func (sws *serverWatchStream) recvLoop() error {
}
sws.mu.Lock()
delete(sws.progress, mvcc.WatchID(id))
delete(sws.getOldKV, mvcc.WatchID(id))
sws.mu.Unlock()
}
}
Expand Down Expand Up @@ -253,6 +268,14 @@ func (sws *serverWatchStream) sendLoop() {
events := make([]*mvccpb.Event, len(evs))
for i := range evs {
events[i] = &evs[i]

if sws.getOldKV[wresp.WatchID] {
opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1}
r, err := sws.watchable.Range(evs[i].Kv.Key, nil, opt)
if err == nil && len(r.KVs) != 0 {
events[i].OldKV = &(r.KVs[0])
}
}
}

wr := &pb.WatchResponse{
Expand Down
Loading

0 comments on commit 4b453d5

Please sign in to comment.