Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add batching capabilities to Chunking FSM #2

Merged
merged 3 commits into from
Oct 2, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/hashicorp/raft"
)

func chunkData(t *testing.T) ([]byte, []raft.Log) {
func chunkData(t *testing.T) ([]byte, []*raft.Log) {
data := make([]byte, 6000000)
n, err := rand.Read(data)
if err != nil && err != io.EOF {
Expand All @@ -22,14 +22,17 @@ func chunkData(t *testing.T) ([]byte, []raft.Log) {
t.Fatalf("expected 6000k bytes to test with, read %d", n)
}

logs := make([]raft.Log, 0)
logs := make([]*raft.Log, 0)
dur := time.Second

var index uint64
applyFunc := func(l raft.Log, d time.Duration) raft.ApplyFuture {
if d != dur {
t.Fatalf("expected d to be %v, got %v", time.Second, dur)
}
logs = append(logs, l)
index++
l.Index = index
logs = append(logs, &l)
return raft.ApplyFuture(nil)
}

Expand Down
127 changes: 113 additions & 14 deletions fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

var _ raft.FSM = (*ChunkingFSM)(nil)
var _ raft.ConfigurationStore = (*ChunkingConfigurationStore)(nil)
var _ raft.BatchingFSM = (*ChunkingBatchingFSM)(nil)

type ChunkingSuccess struct {
Response interface{}
Expand All @@ -28,6 +29,11 @@ type ChunkingFSM struct {
lastTerm uint64
}

type ChunkingBatchingFSM struct {
*ChunkingFSM
underlyingBatchingFSM raft.BatchingFSM
}

type ChunkingConfigurationStore struct {
*ChunkingFSM
underlyingConfigurationStore raft.ConfigurationStore
Expand All @@ -44,6 +50,20 @@ func NewChunkingFSM(underlying raft.FSM, store ChunkStorage) *ChunkingFSM {
return ret
}

func NewChunkingBatchingFSM(underlying raft.BatchingFSM, store ChunkStorage) *ChunkingBatchingFSM {
ret := &ChunkingBatchingFSM{
ChunkingFSM: &ChunkingFSM{
underlying: underlying,
store: store,
},
underlyingBatchingFSM: underlying,
}
if store == nil {
ret.ChunkingFSM.store = NewInmemChunkStorage()
}
return ret
}

func NewChunkingConfigurationStore(underlying raft.ConfigurationStore, store ChunkStorage) *ChunkingConfigurationStore {
ret := &ChunkingConfigurationStore{
ChunkingFSM: &ChunkingFSM{
Expand All @@ -58,30 +78,23 @@ func NewChunkingConfigurationStore(underlying raft.ConfigurationStore, store Chu
return ret
}

// Apply applies the log, handling chunking as needed. The return value will
// either be an error or whatever is returned from the underlying Apply.
func (c *ChunkingFSM) Apply(l *raft.Log) interface{} {
// Not chunking or wrong type, pass through
if l.Type != raft.LogCommand || l.Extensions == nil {
return c.underlying.Apply(l)
}

func (c *ChunkingFSM) applyChunk(l *raft.Log) (*raft.Log, error) {
if l.Term != c.lastTerm {
// Term has changed. A raft library client that was applying chunks
// should get an error that it's no longer the leader and bail, and
// then any client of (Consul, Vault, etc.) should then retry the full
// chunking operation automatically, which will be under a different
// opnum. So it should be safe in this case to clear the map.
if err := c.store.RestoreChunks(nil); err != nil {
return err
return nil, err
}
c.lastTerm = l.Term
}

// Get chunk info from extensions
var ci types.ChunkInfo
if err := proto.Unmarshal(l.Extensions, &ci); err != nil {
return errwrap.Wrapf("error unmarshaling chunk info: {{err}}", err)
return nil, errwrap.Wrapf("error unmarshaling chunk info: {{err}}", err)
}

// Store the current chunk and find out if all chunks have arrived
Expand All @@ -93,19 +106,20 @@ func (c *ChunkingFSM) Apply(l *raft.Log) interface{} {
Data: l.Data,
})
if err != nil {
return err
return nil, err
}
if !done {
return nil
return nil, nil
}

// All chunks are here; get the full set and clear storage of the op
chunks, err := c.store.FinalizeOp(ci.OpNum)
if err != nil {
return err
return nil, err
}

finalData := make([]byte, 0, len(chunks)*raft.SuggestedMaxDataSize)

for _, chunk := range chunks {
finalData = append(finalData, chunk.Data...)
}
Expand All @@ -119,7 +133,27 @@ func (c *ChunkingFSM) Apply(l *raft.Log) interface{} {
Extensions: ci.NextExtensions,
}

return ChunkingSuccess{Response: c.underlying.Apply(logToApply)}
return logToApply, nil
}

// Apply applies the log, handling chunking as needed. The return value will
// either be an error or whatever is returned from the underlying Apply.
func (c *ChunkingFSM) Apply(l *raft.Log) interface{} {
// Not chunking or wrong type, pass through
if l.Type != raft.LogCommand || l.Extensions == nil {
return c.underlying.Apply(l)
}

logToApply, err := c.applyChunk(l)
if err != nil {
return err
}

if logToApply != nil {
return ChunkingSuccess{Response: c.underlying.Apply(logToApply)}
}

return nil
}

func (c *ChunkingFSM) Snapshot() (raft.FSMSnapshot, error) {
Expand Down Expand Up @@ -157,3 +191,68 @@ func (c *ChunkingFSM) RestoreState(state *State) error {
func (c *ChunkingConfigurationStore) StoreConfiguration(index uint64, configuration raft.Configuration) {
c.underlyingConfigurationStore.StoreConfiguration(index, configuration)
}

// ApplyBatch applies the logs, handling chunking as needed. The return value will
// be an array containing either be an error or whatever is returned from the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor wordsalad here

// underlying Apply for each log.
func (c *ChunkingBatchingFSM) ApplyBatch(logs []*raft.Log) []interface{} {
// responses has a response for each log; their slice index should match.
responses := make([]interface{}, len(logs))

// sentLogs keeps track of which logs we sent. The key is the raft Index
// associated with the log and the value is true if this is a finalized set
// of chunks.
sentLogs := make(map[uint64]bool)

// sendLogs is the subset of logs that we need to pass onto the underlying
// FSM.
sendLogs := make([]*raft.Log, 0, len(logs))

for i, l := range logs {
// Not chunking or wrong type, pass through
if l.Type != raft.LogCommand || l.Extensions == nil {
sendLogs = append(sendLogs, l)
sentLogs[l.Index] = false
continue
}

logToApply, err := c.applyChunk(l)
if err != nil {
responses[i] = err
continue
}

if logToApply != nil {
sendLogs = append(sendLogs, logToApply)
sentLogs[l.Index] = true
}
}

// Send remaining logs to the underlying FSM.
var sentResponses []interface{}
if len(sendLogs) > 0 {
sentResponses = c.underlyingBatchingFSM.ApplyBatch(sendLogs)
}

var sentCounter int
for j, l := range logs {
// If the response is already set we errored above and should continue
// onto the next.
if responses[j] != nil {
continue
}

var resp interface{}
if chunked, ok := sentLogs[l.Index]; ok {
resp = sentResponses[sentCounter]
if chunked {
resp = ChunkingSuccess{Response: sentResponses[sentCounter]}
}
sentCounter++
}

responses[j] = resp
}

return responses
}
110 changes: 107 additions & 3 deletions fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,19 @@ import (
"github.com/hashicorp/raft"
)

type MockBatchFSM struct {
*MockFSM
}

func (m *MockBatchFSM) ApplyBatch(logs []*raft.Log) []interface{} {
responses := make([]interface{}, len(logs))
for i, l := range logs {
responses[i] = m.Apply(l)
}

return responses
}

type MockFSM struct {
logs [][]byte
}
Expand All @@ -32,7 +45,7 @@ func TestFSM_Basic(t *testing.T) {
data, logs := chunkData(t)

for i, l := range logs {
r := f.Apply(&l)
r := f.Apply(l)
switch r.(type) {
case nil:
if i == len(logs)-1 {
Expand Down Expand Up @@ -72,7 +85,7 @@ func TestFSM_StateHandling(t *testing.T) {
if i == len(logs)-1 {
break
}
r := f.Apply(&l)
r := f.Apply(l)
switch r.(type) {
case nil:
case error:
Expand Down Expand Up @@ -118,7 +131,7 @@ func TestFSM_StateHandling(t *testing.T) {
t.Fatal(diff)
}

r := f.Apply(&(logs[len(logs)-1]))
r := f.Apply(logs[len(logs)-1])
rRaw, ok := r.(ChunkingSuccess)
if !ok {
t.Fatalf("wrong type back: %T, value is %#v", r, r)
Expand Down Expand Up @@ -160,3 +173,94 @@ func TestFSM_StateHandling(t *testing.T) {
t.Fatal(diff)
}
}

func TestBatchingFSM(t *testing.T) {
m := &MockBatchFSM{
MockFSM: new(MockFSM),
}
f := NewChunkingBatchingFSM(m, nil)
_, logs := chunkData(t)

responses := f.ApplyBatch(logs)
for i, r := range responses {
switch r.(type) {
case nil:
if i == len(logs)-1 {
t.Fatal("got nil, expected ChunkingSuccess")
}
case error:
t.Fatal(r.(error))
case ChunkingSuccess:
if i != len(logs)-1 {
t.Fatal("got int back before apply should have happened")
}
if r.(ChunkingSuccess).Response.(int) != 1 {
t.Fatalf("unexpected number of logs back: %d", r.(int))
}
default:
t.Fatal("unexpected return value")
}
}
}

func TestBatchingFSM_MixedData(t *testing.T) {
m := &MockBatchFSM{
MockFSM: new(MockFSM),
}
f := NewChunkingBatchingFSM(m, nil)
_, logs := chunkData(t)

lastSeen := 0
for i := range logs {
batch := make([]*raft.Log, len(logs))
for j := 0; j < len(logs); j++ {
index := uint64((i * len(logs)) + j)
if i == j {
l := logs[i]
l.Index = index
batch[j] = l
} else {
batch[j] = &raft.Log{
Index: index,
Data: []byte("test"),
Type: raft.LogCommand,
}
}
}

responses := f.ApplyBatch(batch)
for j, r := range responses {
switch r.(type) {
case nil:
if j != i {
t.Fatal("got unexpected nil")
}
case error:
t.Fatal(r.(error))
case int:
if j == i {
t.Fatal("got unexpected int")
}
if r.(int) != lastSeen+1 {
t.Fatalf("unexpected number of logs back: %d, expected %d", r.(int), lastSeen+1)
}

lastSeen++
case ChunkingSuccess:
if i != len(logs)-1 && j != i {
t.Fatal("got int back before apply should have happened")
}
if r.(ChunkingSuccess).Response.(int) != lastSeen+1 {
t.Fatalf("unexpected number of logs back: %d", r.(ChunkingSuccess).Response.(int))
}
lastSeen++
default:
t.Fatal("unexpected return value")
}
}
}
if lastSeen != 11*12+1 {
t.Fatalf("unexpected total logs processed: %d", lastSeen)
}

}