Skip to content

Commit

Permalink
Remove CID lists (#217)
Browse files Browse the repository at this point in the history
* refactor: remove CID lists

* refactor: use -1 instead of pointer for cid set len undef
  • Loading branch information
dirkmc authored Jun 8, 2021
1 parent db84ad0 commit 7e93538
Show file tree
Hide file tree
Showing 11 changed files with 386 additions and 58 deletions.
4 changes: 4 additions & 0 deletions channelmonitor/channelmonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,3 +611,7 @@ func (m *mockChannelState) Stages() *datatransfer.ChannelStages {
func (m *mockChannelState) ReceivedCids() []cid.Cid {
panic("implement me")
}

func (m *mockChannelState) ReceivedCidsLen() int {
panic("implement me")
}
17 changes: 13 additions & 4 deletions channels/channel_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type channelState struct {
voucherResults []internal.EncodedVoucherResult
voucherResultDecoder DecoderByTypeFunc
voucherDecoder DecoderByTypeFunc
channelCIDsReader ChannelCIDsReader
receivedCids ReceivedCidsReader

// stages tracks the timeline of events related to a data transfer, for
// traceability purposes.
Expand Down Expand Up @@ -100,13 +100,22 @@ func (c channelState) Voucher() datatransfer.Voucher {

// ReceivedCids returns the cids received so far on this channel
func (c channelState) ReceivedCids() []cid.Cid {
receivedCids, err := c.channelCIDsReader(c.ChannelID())
receivedCids, err := c.receivedCids.ToArray(c.ChannelID())
if err != nil {
log.Error(err)
}
return receivedCids
}

// ReceivedCids returns the number of cids received so far on this channel
func (c channelState) ReceivedCidsLen() int {
len, err := c.receivedCids.Len(c.ChannelID())
if err != nil {
log.Error(err)
}
return len
}

// Sender returns the peer id for the node that is sending data
func (c channelState) Sender() peer.ID { return c.sender }

Expand Down Expand Up @@ -190,7 +199,7 @@ func (c channelState) Stages() *datatransfer.ChannelStages {
return c.stages
}

func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByTypeFunc, voucherResultDecoder DecoderByTypeFunc, channelCIDsReader ChannelCIDsReader) datatransfer.ChannelState {
func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByTypeFunc, voucherResultDecoder DecoderByTypeFunc, receivedCidsReader ReceivedCidsReader) datatransfer.ChannelState {
return channelState{
selfPeer: c.SelfPeer,
isPull: c.Initiator == c.Recipient,
Expand All @@ -209,7 +218,7 @@ func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByT
voucherResults: c.VoucherResults,
voucherResultDecoder: voucherResultDecoder,
voucherDecoder: voucherDecoder,
channelCIDsReader: channelCIDsReader,
receivedCids: receivedCidsReader,
stages: c.Stages,
}
}
Expand Down
69 changes: 49 additions & 20 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ import (

type DecoderByTypeFunc func(identifier datatransfer.TypeIdentifier) (encoding.Decoder, bool)

type ChannelCIDsReader func(chid datatransfer.ChannelID) ([]cid.Cid, error)
type ReceivedCidsReader interface {
ToArray(chid datatransfer.ChannelID) ([]cid.Cid, error)
Len(chid datatransfer.ChannelID) (int, error)
}

type Notifier func(datatransfer.Event, datatransfer.ChannelState)

Expand All @@ -55,7 +58,6 @@ type Channels struct {
voucherResultDecoder DecoderByTypeFunc
stateMachines fsm.Group
migrateStateMachines func(context.Context) error
cidLists cidlists.CIDLists
seenCIDs *cidsets.CIDSetManager
}

Expand All @@ -78,7 +80,6 @@ func New(ds datastore.Batching,

seenCIDsDS := namespace.Wrap(ds, datastore.NewKey("seencids"))
c := &Channels{
cidLists: cidLists,
seenCIDs: cidsets.NewCIDSetManager(seenCIDsDS),
notifier: notifier,
voucherDecoder: voucherDecoder,
Expand Down Expand Up @@ -123,7 +124,7 @@ func (c *Channels) dispatch(eventName fsm.EventName, channel fsm.StateType) {
Timestamp: time.Now(),
}

c.notifier(evt, fromInternalChannelState(realChannel, c.voucherDecoder, c.voucherResultDecoder, c.cidLists.ReadList))
c.notifier(evt, c.fromInternalChannelState(realChannel))

// When the channel has been cleaned up, remove the caches of seen cids
if evt.Code == datatransfer.CleanupComplete {
Expand Down Expand Up @@ -180,10 +181,6 @@ func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, base
if err != nil {
return datatransfer.ChannelID{}, err
}
err = c.cidLists.CreateList(chid, nil)
if err != nil {
return datatransfer.ChannelID{}, err
}
return chid, c.stateMachines.Send(chid, datatransfer.Open)
}

Expand All @@ -197,7 +194,7 @@ func (c *Channels) InProgress() (map[datatransfer.ChannelID]datatransfer.Channel
channels := make(map[datatransfer.ChannelID]datatransfer.ChannelState, len(internalChannels))
for _, internalChannel := range internalChannels {
channels[datatransfer.ChannelID{ID: internalChannel.TransferID, Responder: internalChannel.Responder, Initiator: internalChannel.Initiator}] =
fromInternalChannelState(internalChannel, c.voucherDecoder, c.voucherResultDecoder, c.cidLists.ReadList)
c.fromInternalChannelState(internalChannel)
}
return channels, nil
}
Expand All @@ -210,7 +207,7 @@ func (c *Channels) GetByID(ctx context.Context, chid datatransfer.ChannelID) (da
if err != nil {
return nil, NewErrNotFound(chid)
}
return fromInternalChannelState(internalChannel, c.voucherDecoder, c.voucherResultDecoder, c.cidLists.ReadList), nil
return c.fromInternalChannelState(internalChannel), nil
}

// Accept marks a data transfer as accepted
Expand Down Expand Up @@ -239,11 +236,6 @@ func (c *Channels) DataQueued(chid datatransfer.ChannelID, k cid.Cid, delta uint

// Returns true if this is the first time the block has been received
func (c *Channels) DataReceived(chid datatransfer.ChannelID, k cid.Cid, delta uint64) (bool, error) {
err := c.cidLists.AppendList(chid, k)
if err != nil {
return false, err
}

return c.fireProgressEvent(chid, datatransfer.DataReceived, datatransfer.DataReceivedProgress, k, delta)
}

Expand Down Expand Up @@ -361,12 +353,12 @@ func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error) {
// blocks that have already been queued / sent / received
func (c *Channels) removeSeenCIDCaches(chid datatransfer.ChannelID) error {
progressStates := []datatransfer.EventCode{
datatransfer.DataQueuedProgress,
datatransfer.DataSentProgress,
datatransfer.DataReceivedProgress,
datatransfer.DataQueued,
datatransfer.DataSent,
datatransfer.DataReceived,
}
for _, evt := range progressStates {
sid := cidsets.SetID(chid.String() + "/" + datatransfer.Events[evt])
sid := seenCidsSetID(chid, evt)
err := c.seenCIDs.DeleteSet(sid)
if err != nil {
return err
Expand All @@ -388,7 +380,7 @@ func (c *Channels) fireProgressEvent(chid datatransfer.ChannelID, evt datatransf
}

// Check if the block has already been seen
sid := cidsets.SetID(chid.String() + "/" + datatransfer.Events[evt])
sid := seenCidsSetID(chid, evt)
seen, err := c.seenCIDs.InsertSetCID(sid, k)
if err != nil {
return false, err
Expand Down Expand Up @@ -424,3 +416,40 @@ func (c *Channels) checkChannelExists(chid datatransfer.ChannelID, code datatran
}
return nil
}

// Get the ID of the CID set for the given channel ID and event code.
// The CID set stores a unique list of queued / sent / received CIDs.
func seenCidsSetID(chid datatransfer.ChannelID, evt datatransfer.EventCode) cidsets.SetID {
return cidsets.SetID(chid.String() + "/" + datatransfer.Events[evt])
}

// Convert from the internally used channel state format to the externally exposed ChannelState
func (c *Channels) fromInternalChannelState(ch internal.ChannelState) datatransfer.ChannelState {
rcr := &receivedCidsReader{
seenCIDs: c.seenCIDs,
}
return fromInternalChannelState(ch, c.voucherDecoder, c.voucherResultDecoder, rcr)
}

// Implements the ReceivedCidsReader interface so that the internal channel
// state has access to the received CIDs.
// The interface is used (instead of passing these values directly)
// so the values can be loaded lazily. Reading all CIDs from the datastore
// is an expensive operation so we want to avoid doing it unless necessary.
// Note that the received CIDs get cleaned up when the channel completes, so
// these methods will return an empty array after that point.
type receivedCidsReader struct {
seenCIDs *cidsets.CIDSetManager
}

func (r *receivedCidsReader) ToArray(chid datatransfer.ChannelID) ([]cid.Cid, error) {
sid := seenCidsSetID(chid, datatransfer.DataReceived)
return r.seenCIDs.SetToArray(sid)
}

func (r *receivedCidsReader) Len(chid datatransfer.ChannelID) (int, error) {
sid := seenCidsSetID(chid, datatransfer.DataReceived)
return r.seenCIDs.SetLen(sid)
}

var _ ReceivedCidsReader = (*receivedCidsReader)(nil)
11 changes: 7 additions & 4 deletions channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,23 +179,23 @@ func TestChannels(t *testing.T) {
state = checkEvent(ctx, t, received, datatransfer.DataReceived)
require.Equal(t, uint64(100), state.Received())
require.Equal(t, uint64(100), state.Sent())
require.Equal(t, []cid.Cid{cids[0], cids[1]}, state.ReceivedCids())
require.ElementsMatch(t, []cid.Cid{cids[0], cids[1]}, state.ReceivedCids())

isNew, err = channelList.DataSent(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1], 25)
require.NoError(t, err)
require.False(t, isNew)
state = checkEvent(ctx, t, received, datatransfer.DataSent)
require.Equal(t, uint64(100), state.Received())
require.Equal(t, uint64(100), state.Sent())
require.Equal(t, []cid.Cid{cids[0], cids[1]}, state.ReceivedCids())
require.ElementsMatch(t, []cid.Cid{cids[0], cids[1]}, state.ReceivedCids())

isNew, err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0], 50)
require.NoError(t, err)
require.False(t, isNew)
state = checkEvent(ctx, t, received, datatransfer.DataReceived)
require.Equal(t, uint64(100), state.Received())
require.Equal(t, uint64(100), state.Sent())
require.Equal(t, []cid.Cid{cids[0], cids[1], cids[0]}, state.ReceivedCids())
require.ElementsMatch(t, []cid.Cid{cids[0], cids[1]}, state.ReceivedCids())
})

t.Run("pause/resume", func(t *testing.T) {
Expand Down Expand Up @@ -613,7 +613,10 @@ func TestMigrationsV1(t *testing.T) {
require.Equal(t, messages[i], channel.Message())
require.Equal(t, vouchers[i], channel.LastVoucher())
require.Equal(t, voucherResults[i], channel.LastVoucherResult())
require.Equal(t, receivedCids[i], channel.ReceivedCids())
// No longer relying on this migration to migrate CID lists as they
// have been deprecated since we moved to CID sets:
// https://github.com/filecoin-project/go-data-transfer/pull/217
//require.Equal(t, receivedCids[i], channel.ReceivedCids())
}
}

Expand Down
1 change: 1 addition & 0 deletions cidlists/cidlists.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer"
)

// Deprecated: CIDLists have now been replaced by CID sets (see cidsets directory).
// CIDLists maintains files that contain a list of CIDs received for different data transfers
type CIDLists interface {
CreateList(chid datatransfer.ChannelID, initalCids []cid.Cid) error
Expand Down
Loading

0 comments on commit 7e93538

Please sign in to comment.