Skip to content

Commit

Permalink
added ent to ce downgrade changes (#19311)
Browse files Browse the repository at this point in the history
* added ent to ce downgrade changes

* added changelog

* added busl headers
  • Loading branch information
aahel authored Oct 20, 2023
1 parent b1871fd commit 1280f45
Show file tree
Hide file tree
Showing 7 changed files with 1,242 additions and 22 deletions.
3 changes: 3 additions & 0 deletions .changelog/19311.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
raft: Fix panic during downgrade from enterprise to oss.
```
89 changes: 68 additions & 21 deletions agent/consul/fsm/commands_ce.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package fsm

import (
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -152,7 +153,11 @@ func init() {
func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"fsm", "register"}, time.Now())
var req structs.RegisterRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeRegistrationReq(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted register request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}

Expand All @@ -167,7 +172,11 @@ func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
func (c *FSM) applyDeregister(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"fsm", "deregister"}, time.Now())
var req structs.DeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeDeregistrationReq(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted deregister request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}

Expand Down Expand Up @@ -195,7 +204,11 @@ func (c *FSM) applyDeregister(buf []byte, index uint64) interface{} {

func (c *FSM) applyKVSOperation(buf []byte, index uint64) interface{} {
var req structs.KVSRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeKVSRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted KV request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "kvs"}, time.Now(),
Expand Down Expand Up @@ -240,7 +253,11 @@ func (c *FSM) applyKVSOperation(buf []byte, index uint64) interface{} {

func (c *FSM) applySessionOperation(buf []byte, index uint64) interface{} {
var req structs.SessionRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeSessionRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted session request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "session"}, time.Now(),
Expand Down Expand Up @@ -299,7 +316,11 @@ func (c *FSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interface{} {
// state store.
func (c *FSM) applyPreparedQueryOperation(buf []byte, index uint64) interface{} {
var req structs.PreparedQueryRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodePreparedQueryRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted prepared query request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}

Expand All @@ -318,7 +339,7 @@ func (c *FSM) applyPreparedQueryOperation(buf []byte, index uint64) interface{}

func (c *FSM) applyTxn(buf []byte, index uint64) interface{} {
var req structs.TxnRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeTxnRequest(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"fsm", "txn"}, time.Now())
Expand Down Expand Up @@ -485,7 +506,7 @@ func (c *FSM) applyConnectCALeafOperation(buf []byte, index uint64) interface{}

func (c *FSM) applyACLTokenSetOperation(buf []byte, index uint64) interface{} {
var req structs.ACLTokenBatchSetRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeACLTokenBatchSetRequest(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "token"}, time.Now(),
Expand Down Expand Up @@ -523,7 +544,7 @@ func (c *FSM) applyACLTokenBootstrap(buf []byte, index uint64) interface{} {

func (c *FSM) applyACLPolicySetOperation(buf []byte, index uint64) interface{} {
var req structs.ACLPolicyBatchSetRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeACLPolicyBatchSetRequest(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "policy"}, time.Now(),
Expand All @@ -544,10 +565,12 @@ func (c *FSM) applyACLPolicyDeleteOperation(buf []byte, index uint64) interface{
}

func (c *FSM) applyConfigEntryOperation(buf []byte, index uint64) interface{} {
req := structs.ConfigEntryRequest{
Entry: &structs.ProxyConfigEntry{},
}
if err := structs.Decode(buf, &req); err != nil {
req := structs.ConfigEntryRequest{}
if err := decodeConfigEntryOperationRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted config entry request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}

Expand Down Expand Up @@ -594,7 +617,7 @@ func (c *FSM) applyConfigEntryOperation(buf []byte, index uint64) interface{} {

func (c *FSM) applyACLRoleSetOperation(buf []byte, index uint64) interface{} {
var req structs.ACLRoleBatchSetRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeACLRoleBatchSetRequest(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "role"}, time.Now(),
Expand All @@ -616,7 +639,7 @@ func (c *FSM) applyACLRoleDeleteOperation(buf []byte, index uint64) interface{}

func (c *FSM) applyACLBindingRuleSetOperation(buf []byte, index uint64) interface{} {
var req structs.ACLBindingRuleBatchSetRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeACLBindingRuleBatchSetRequest(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "bindingrule"}, time.Now(),
Expand All @@ -638,7 +661,7 @@ func (c *FSM) applyACLBindingRuleDeleteOperation(buf []byte, index uint64) inter

func (c *FSM) applyACLAuthMethodSetOperation(buf []byte, index uint64) interface{} {
var req structs.ACLAuthMethodBatchSetRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeACLAuthMethodBatchSetRequest(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "authmethod"}, time.Now(),
Expand All @@ -649,7 +672,11 @@ func (c *FSM) applyACLAuthMethodSetOperation(buf []byte, index uint64) interface

func (c *FSM) applyACLAuthMethodDeleteOperation(buf []byte, index uint64) interface{} {
var req structs.ACLAuthMethodBatchDeleteRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeACLAuthMethodBatchDeleteRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted acl auth method delete request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "authmethod"}, time.Now(),
Expand Down Expand Up @@ -706,7 +733,11 @@ func (c *FSM) applySystemMetadataOperation(buf []byte, index uint64) interface{}

func (c *FSM) applyPeeringWrite(buf []byte, index uint64) interface{} {
var req pbpeering.PeeringWriteRequest
if err := structs.DecodeProto(buf, &req); err != nil {
if err := decodePeeringWriteRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted peering write request")
return nil
}
panic(fmt.Errorf("failed to decode peering write request: %v", err))
}

Expand All @@ -718,7 +749,11 @@ func (c *FSM) applyPeeringWrite(buf []byte, index uint64) interface{} {

func (c *FSM) applyPeeringDelete(buf []byte, index uint64) interface{} {
var req pbpeering.PeeringDeleteRequest
if err := structs.DecodeProto(buf, &req); err != nil {
if err := decodePeeringDeleteRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted peering delete request")
return nil
}
panic(fmt.Errorf("failed to decode peering delete request: %v", err))
}

Expand Down Expand Up @@ -758,7 +793,11 @@ func (c *FSM) applyPeeringTerminate(buf []byte, index uint64) interface{} {

func (c *FSM) applyPeeringTrustBundleWrite(buf []byte, index uint64) interface{} {
var req pbpeering.PeeringTrustBundleWriteRequest
if err := structs.DecodeProto(buf, &req); err != nil {
if err := decodePeeringTrustBundleWriteRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted peering trust bundle write request")
return nil
}
panic(fmt.Errorf("failed to decode peering trust bundle write request: %v", err))
}

Expand All @@ -770,7 +809,11 @@ func (c *FSM) applyPeeringTrustBundleWrite(buf []byte, index uint64) interface{}

func (c *FSM) applyPeeringTrustBundleDelete(buf []byte, index uint64) interface{} {
var req pbpeering.PeeringTrustBundleDeleteRequest
if err := structs.DecodeProto(buf, &req); err != nil {
if err := decodePeeringTrustBundleDeleteRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted peering trust bundle delete request")
return nil
}
panic(fmt.Errorf("failed to decode peering trust bundle delete request: %v", err))
}

Expand All @@ -790,7 +833,11 @@ func (f *FSM) applyResourceOperation(buf []byte, idx uint64) any {

func (c *FSM) applyManualVirtualIPs(buf []byte, index uint64) interface{} {
var req state.ServiceVirtualIP
if err := structs.Decode(buf, &req); err != nil {
if err := decodeServiceVirtualIPRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted virtual ip request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}

Expand Down
145 changes: 145 additions & 0 deletions agent/consul/fsm/decode_ce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

//go:build !consulent
// +build !consulent

package fsm

import (
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/private/pbpeering"
)

func decodeRegistrationReq(buf []byte, req *structs.RegisterRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeRegistration(buf, req)
}

func decodeDeregistrationReq(buf []byte, req *structs.DeregisterRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeDeregistration(buf, req)
}

func decodeKVSRequest(buf []byte, req *structs.KVSRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeKVS(buf, req)
}

func decodeSessionRequest(buf []byte, req *structs.SessionRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}

return decodeSession(buf, req)
}

func decodePreparedQueryRequest(buf []byte, req *structs.PreparedQueryRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodePreparedQuery(buf, req)
}

func decodeTxnRequest(buf []byte, req *structs.TxnRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeTxn(buf, req)
}

func decodeACLTokenBatchSetRequest(buf []byte, req *structs.ACLTokenBatchSetRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeACLTokenBatchSet(buf, req)

}

func decodeACLPolicyBatchSetRequest(buf []byte, req *structs.ACLPolicyBatchSetRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeACLPolicyBatchSet(buf, req)

}

func decodeACLRoleBatchSetRequest(buf []byte, req *structs.ACLRoleBatchSetRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeACLRoleBatchSet(buf, req)
}

func decodeACLBindingRuleBatchSetRequest(buf []byte, req *structs.ACLBindingRuleBatchSetRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeACLBindingRuleBatchSet(buf, req)
}

func decodeACLAuthMethodBatchSetRequest(buf []byte, req *structs.ACLAuthMethodBatchSetRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeACLAuthMethodBatchSet(buf, req)
}

func decodeACLAuthMethodBatchDeleteRequest(buf []byte, req *structs.ACLAuthMethodBatchDeleteRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}

return decodeACLAuthMethodBatchDelete(buf, req)
}

func decodeServiceVirtualIPRequest(buf []byte, req *state.ServiceVirtualIP) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeServiceVirtualIP(buf, req)
}

func decodePeeringWriteRequest(buf []byte, req *pbpeering.PeeringWriteRequest) error {
if !structs.CEDowngrade {
return structs.DecodeProto(buf, req)
}
return decodePeeringWrite(buf, req)
}

func decodePeeringDeleteRequest(buf []byte, req *pbpeering.PeeringDeleteRequest) error {
if !structs.CEDowngrade {
return structs.DecodeProto(buf, req)
}

return decodePeeringDelete(buf, req)
}

func decodePeeringTrustBundleWriteRequest(buf []byte, req *pbpeering.PeeringTrustBundleWriteRequest) error {
if !structs.CEDowngrade {
return structs.DecodeProto(buf, req)
}
return decodePeeringTrustBundleWrite(buf, req)
}

func decodePeeringTrustBundleDeleteRequest(buf []byte, req *pbpeering.PeeringTrustBundleDeleteRequest) error {
if !structs.CEDowngrade {
return structs.DecodeProto(buf, req)
}
return decodePeeringTrustBundleDelete(buf, req)
}

func decodeConfigEntryOperationRequest(buf []byte, req *structs.ConfigEntryRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}

return decodeConfigEntryOperation(buf, req)
}
Loading

0 comments on commit 1280f45

Please sign in to comment.