Skip to content

Commit

Permalink
update timestamp if exists
Browse files Browse the repository at this point in the history
Signed-off-by: kevindiu <[email protected]>
  • Loading branch information
kevindiu committed Sep 19, 2023
1 parent 4ada1d4 commit 4186158
Show file tree
Hide file tree
Showing 10 changed files with 612 additions and 367 deletions.
740 changes: 413 additions & 327 deletions apis/grpc/v1/payload/payload.pb.go

Large diffs are not rendered by default.

90 changes: 82 additions & 8 deletions apis/grpc/v1/payload/payload_vtproto.pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,10 +576,11 @@ func (m *Update_Config) CloneVT() *Update_Config {
return (*Update_Config)(nil)
}
r := &Update_Config{
SkipStrictExistCheck: m.SkipStrictExistCheck,
Filters: m.Filters.CloneVT(),
Timestamp: m.Timestamp,
DisableBalancedUpdate: m.DisableBalancedUpdate,
SkipStrictExistCheck: m.SkipStrictExistCheck,
Filters: m.Filters.CloneVT(),
Timestamp: m.Timestamp,
DisableBalancedUpdate: m.DisableBalancedUpdate,
UpdateTimestampIfExists: m.UpdateTimestampIfExists,
}
if len(m.unknownFields) > 0 {
r.unknownFields = make([]byte, len(m.unknownFields))
Expand Down Expand Up @@ -698,10 +699,11 @@ func (m *Upsert_Config) CloneVT() *Upsert_Config {
return (*Upsert_Config)(nil)
}
r := &Upsert_Config{
SkipStrictExistCheck: m.SkipStrictExistCheck,
Filters: m.Filters.CloneVT(),
Timestamp: m.Timestamp,
DisableBalancedUpdate: m.DisableBalancedUpdate,
SkipStrictExistCheck: m.SkipStrictExistCheck,
Filters: m.Filters.CloneVT(),
Timestamp: m.Timestamp,
DisableBalancedUpdate: m.DisableBalancedUpdate,
UpdateTimestampIfExists: m.UpdateTimestampIfExists,
}
if len(m.unknownFields) > 0 {
r.unknownFields = make([]byte, len(m.unknownFields))
Expand Down Expand Up @@ -2432,6 +2434,9 @@ func (this *Update_Config) EqualVT(that *Update_Config) bool {
if this.DisableBalancedUpdate != that.DisableBalancedUpdate {
return false
}
if this.UpdateTimestampIfExists != that.UpdateTimestampIfExists {
return false
}
return string(this.unknownFields) == string(that.unknownFields)
}

Expand Down Expand Up @@ -2589,6 +2594,9 @@ func (this *Upsert_Config) EqualVT(that *Upsert_Config) bool {
if this.DisableBalancedUpdate != that.DisableBalancedUpdate {
return false
}
if this.UpdateTimestampIfExists != that.UpdateTimestampIfExists {
return false
}
return string(this.unknownFields) == string(that.unknownFields)
}

Expand Down Expand Up @@ -5173,6 +5181,16 @@ func (m *Update_Config) MarshalToSizedBufferVT(dAtA []byte) (int, error) {
i -= len(m.unknownFields)
copy(dAtA[i:], m.unknownFields)
}
if m.UpdateTimestampIfExists {
i--
if m.UpdateTimestampIfExists {
dAtA[i] = 1
} else {
dAtA[i] = 0
}
i--
dAtA[i] = 0x28
}
if m.DisableBalancedUpdate {
i--
if m.DisableBalancedUpdate {
Expand Down Expand Up @@ -5480,6 +5498,16 @@ func (m *Upsert_Config) MarshalToSizedBufferVT(dAtA []byte) (int, error) {
i -= len(m.unknownFields)
copy(dAtA[i:], m.unknownFields)
}
if m.UpdateTimestampIfExists {
i--
if m.UpdateTimestampIfExists {
dAtA[i] = 1
} else {
dAtA[i] = 0
}
i--
dAtA[i] = 0x28
}
if m.DisableBalancedUpdate {
i--
if m.DisableBalancedUpdate {
Expand Down Expand Up @@ -8211,6 +8239,9 @@ func (m *Update_Config) SizeVT() (n int) {
if m.DisableBalancedUpdate {
n += 2
}
if m.UpdateTimestampIfExists {
n += 2
}
n += len(m.unknownFields)
return n
}
Expand Down Expand Up @@ -8316,6 +8347,9 @@ func (m *Upsert_Config) SizeVT() (n int) {
if m.DisableBalancedUpdate {
n += 2
}
if m.UpdateTimestampIfExists {
n += 2
}
n += len(m.unknownFields)
return n
}
Expand Down Expand Up @@ -11926,6 +11960,26 @@ func (m *Update_Config) UnmarshalVT(dAtA []byte) error {
}
}
m.DisableBalancedUpdate = bool(v != 0)
case 5:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field UpdateTimestampIfExists", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflow
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
m.UpdateTimestampIfExists = bool(v != 0)
default:
iNdEx = preIndex
skippy, err := skip(dAtA[iNdEx:])
Expand Down Expand Up @@ -12575,6 +12629,26 @@ func (m *Upsert_Config) UnmarshalVT(dAtA []byte) error {
}
}
m.DisableBalancedUpdate = bool(v != 0)
case 5:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field UpdateTimestampIfExists", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflow
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
m.UpdateTimestampIfExists = bool(v != 0)
default:
iNdEx = preIndex
skippy, err := skip(dAtA[iNdEx:])
Expand Down
48 changes: 38 additions & 10 deletions apis/swagger/v1/vald/apis/proto/v1/vald/filter.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
"title": "apis/proto/v1/vald/filter.proto",
"version": "version not set"
},
"consumes": ["application/json"],
"produces": ["application/json"],
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"paths": {
"/insert/object": {
"post": {
Expand Down Expand Up @@ -35,7 +39,9 @@
}
}
],
"tags": ["Filter"]
"tags": [
"Filter"
]
}
},
"/insert/object/multiple": {
Expand Down Expand Up @@ -66,7 +72,9 @@
}
}
],
"tags": ["Filter"]
"tags": [
"Filter"
]
}
},
"/search/object": {
Expand Down Expand Up @@ -97,7 +105,9 @@
}
}
],
"tags": ["Filter"]
"tags": [
"Filter"
]
}
},
"/search/object/multiple": {
Expand Down Expand Up @@ -128,7 +138,9 @@
}
}
],
"tags": ["Filter"]
"tags": [
"Filter"
]
}
},
"/update/object": {
Expand Down Expand Up @@ -159,7 +171,9 @@
}
}
],
"tags": ["Filter"]
"tags": [
"Filter"
]
}
},
"/update/object/multiple": {
Expand Down Expand Up @@ -190,7 +204,9 @@
}
}
],
"tags": ["Filter"]
"tags": [
"Filter"
]
}
},
"/upsert/object": {
Expand Down Expand Up @@ -221,7 +237,9 @@
}
}
],
"tags": ["Filter"]
"tags": [
"Filter"
]
}
},
"/upsert/object/multiple": {
Expand Down Expand Up @@ -252,7 +270,9 @@
}
}
],
"tags": ["Filter"]
"tags": [
"Filter"
]
}
}
},
Expand Down Expand Up @@ -649,6 +669,10 @@
"disableBalancedUpdate": {
"type": "boolean",
"description": "A flag to disable balanced update (split remove -\u003e insert operation)\nduring update operation."
},
"updateTimestampIfExists": {
"type": "boolean",
"description": "A flag to enable timestamp update when the same index exists."
}
},
"description": "Represent the update configuration."
Expand Down Expand Up @@ -703,6 +727,10 @@
"disableBalancedUpdate": {
"type": "boolean",
"description": "A flag to disable balanced update (split remove -\u003e insert operation)\nduring update operation."
},
"updateTimestampIfExists": {
"type": "boolean",
"description": "A flag to enable timestamp update when the same index exists."
}
},
"description": "Represent the upsert configuration."
Expand Down
20 changes: 16 additions & 4 deletions apis/swagger/v1/vald/apis/proto/v1/vald/update.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
"title": "apis/proto/v1/vald/update.proto",
"version": "version not set"
},
"consumes": ["application/json"],
"produces": ["application/json"],
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"paths": {
"/update": {
"post": {
Expand Down Expand Up @@ -35,7 +39,9 @@
}
}
],
"tags": ["Update"]
"tags": [
"Update"
]
}
},
"/update/multiple": {
Expand Down Expand Up @@ -66,7 +72,9 @@
}
}
],
"tags": ["Update"]
"tags": [
"Update"
]
}
}
},
Expand Down Expand Up @@ -271,6 +279,10 @@
"disableBalancedUpdate": {
"type": "boolean",
"description": "A flag to disable balanced update (split remove -\u003e insert operation)\nduring update operation."
},
"updateTimestampIfExists": {
"type": "boolean",
"description": "A flag to enable timestamp update when the same index exists."
}
},
"description": "Represent the update configuration."
Expand Down
20 changes: 16 additions & 4 deletions apis/swagger/v1/vald/apis/proto/v1/vald/upsert.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
"title": "apis/proto/v1/vald/upsert.proto",
"version": "version not set"
},
"consumes": ["application/json"],
"produces": ["application/json"],
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"paths": {
"/upsert": {
"post": {
Expand Down Expand Up @@ -35,7 +39,9 @@
}
}
],
"tags": ["Upsert"]
"tags": [
"Upsert"
]
}
},
"/upsert/multiple": {
Expand Down Expand Up @@ -66,7 +72,9 @@
}
}
],
"tags": ["Upsert"]
"tags": [
"Upsert"
]
}
}
},
Expand Down Expand Up @@ -271,6 +279,10 @@
"disableBalancedUpdate": {
"type": "boolean",
"description": "A flag to disable balanced update (split remove -\u003e insert operation)\nduring update operation."
},
"updateTimestampIfExists": {
"type": "boolean",
"description": "A flag to enable timestamp update when the same index exists."
}
},
"description": "Represent the upsert configuration."
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/core/ngt/handler/grpc/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ func (s *server) Update(ctx context.Context, req *payload.Update_Request) (res *
log.Warn(err)
return nil, err
}
err = s.ngt.UpdateWithTime(uuid, vec.GetVector(), req.GetConfig().GetTimestamp())
cfg := req.GetConfig()
err = s.ngt.UpdateWithTime(uuid, vec.GetVector(), cfg.GetTimestamp(), cfg.GetUpdateTimestampIfExists())
if err != nil {
var attrs []attribute.KeyValue
if errors.Is(err, errors.ErrObjectIDNotFound(vec.GetId())) {
Expand Down
Loading

0 comments on commit 4186158

Please sign in to comment.