
Commit

FIX: DataNode: when a data partition is loaded and applyID == 0, start Raft directly
Signed-off-by: awzhgw <[email protected]>
awzhgw committed Aug 16, 2019
1 parent 669df1f commit 042f939
Showing 9 changed files with 56 additions and 44 deletions.
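The behavioral change sits in LoadDataPartition (datanode/partition.go below); the rest of the commit is gofmt cleanup plus a new Hosts field in the partition metadata. For reference, a small self-contained sketch of the decision the new load path makes (not part of the diff; only the condition is taken from the change, the helper and example values are illustrative):

    package main

    import "fmt"

    // shouldStartRaftNow mirrors the condition added to LoadDataPartition: start
    // Raft right away for a brand-new partition (local and leader applied index
    // both 0) or for a partition that already has local state (applied index != 0);
    // otherwise repair from the leader first and let StartRaftAfterRepair start Raft.
    func shouldStartRaftNow(localApplied, leaderApplied uint64, leaderErr error) bool {
        fresh := localApplied == 0 && leaderErr == nil && leaderApplied == 0
        return fresh || localApplied != 0
    }

    func main() {
        fmt.Println(shouldStartRaftNow(0, 0, nil))  // true: empty partition, nothing to catch up on
        fmt.Println(shouldStartRaftNow(0, 42, nil)) // false: repair from the leader, start Raft later
        fmt.Println(shouldStartRaftNow(7, 42, nil)) // true: local applied index exists
    }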
2 changes: 1 addition & 1 deletion datanode/const.go
@@ -26,7 +26,7 @@ const (
const (
IntervalToUpdateReplica = 600 // interval to update the replica
IntervalToUpdatePartitionSize = 60 // interval to update the partition size
- NumOfFilesToRecoverInParallel = 17 // number of files to be recovered simultaneously
+ NumOfFilesToRecoverInParallel = 17 // number of files to be recovered simultaneously
)

// Network protocol
10 changes: 5 additions & 5 deletions datanode/data_partition_repair.go
@@ -149,7 +149,7 @@ func (dp *DataPartition) buildDataPartitionRepairTask(repairTasks []*DataPartiti
for index := 1; index < len(dp.replicas); index++ {
extents, err := dp.getRemoteExtentInfo(extentType, tinyExtents, dp.replicas[index])
if err != nil {
- log.LogErrorf("buildDataPartitionRepairTask partitionID(%v) on (%v) err(%v)",dp.partitionID,dp.replicas[index],err)
+ log.LogErrorf("buildDataPartitionRepairTask partitionID(%v) on (%v) err(%v)", dp.partitionID, dp.replicas[index], err)
continue
}
repairTasks[index] = NewDataPartitionRepairTask(extents, leaderTinyExtentRealSize, leaderTinyDeleteRecordFileSize, dp.replicas[index], dp.replicas[0])
@@ -290,7 +290,7 @@ func (dp *DataPartition) prepareRepairTasks(repairTasks []*DataPartitionRepairTa
extentInfoMap := make(map[uint64]*storage.ExtentInfo)
for index := 0; index < len(repairTasks); index++ {
repairTask := repairTasks[index]
- if repairTask==nil {
+ if repairTask == nil {
continue
}
for extentID, extentInfo := range repairTask.extents {
@@ -321,7 +321,7 @@ func (dp *DataPartition) buildExtentCreationTasks(repairTasks []*DataPartitionRe
}
for index := 0; index < len(repairTasks); index++ {
repairTask := repairTasks[index]
- if repairTask==nil {
+ if repairTask == nil {
continue
}
if _, ok := repairTask.extents[extentID]; !ok && extentInfo.IsDeleted == false {
@@ -348,7 +348,7 @@ func (dp *DataPartition) buildExtentRepairTasks(repairTasks []*DataPartitionRepa

hasBeenRepaired := true
for index := 0; index < len(repairTasks); index++ {
- if repairTasks[index]==nil {
+ if repairTasks[index] == nil {
continue
}
extentInfo, ok := repairTasks[index].extents[extentID]
@@ -406,7 +406,7 @@ func (dp *DataPartition) notifyFollower(wg *sync.WaitGroup, index int, members [
func (dp *DataPartition) NotifyExtentRepair(members []*DataPartitionRepairTask) (err error) {
wg := new(sync.WaitGroup)
for i := 1; i < len(members); i++ {
- if members[i]==nil {
+ if members[i] == nil {
continue
}
wg.Add(1)
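The nil checks above pair with the change in buildDataPartitionRepairTask: a replica whose extent listing cannot be fetched now leaves its slot in repairTasks nil instead of failing the whole pass. A self-contained sketch of that pattern (types and names here are illustrative, not the project's):

    package main

    import "fmt"

    type repairTask struct{ replica string }

    // buildTasks models buildDataPartitionRepairTask: if a replica cannot be
    // queried, log-and-continue leaves a nil slot rather than aborting the repair.
    func buildTasks(replicas []string, query func(string) error) []*repairTask {
        tasks := make([]*repairTask, len(replicas))
        for i, r := range replicas {
            if err := query(r); err != nil {
                continue // tasks[i] stays nil; every consumer must check for nil
            }
            tasks[i] = &repairTask{replica: r}
        }
        return tasks
    }

    func main() {
        unreachable := func(r string) error {
            if r == "replicaB" {
                return fmt.Errorf("%s unreachable", r)
            }
            return nil
        }
        for i, t := range buildTasks([]string{"replicaA", "replicaB", "replicaC"}, unreachable) {
            if t == nil { // the nil guard added throughout data_partition_repair.go
                fmt.Printf("slot %d skipped\n", i)
                continue
            }
            fmt.Printf("slot %d -> %s\n", i, t.replica)
        }
    }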
13 changes: 6 additions & 7 deletions datanode/disk.go
@@ -158,21 +158,20 @@ func (d *Disk) startScheduleToUpdateSpaceInfo() {
}()
}


- func (d *Disk) autoComputeExtentCrc(){
+ func (d *Disk) autoComputeExtentCrc() {
defer func() {
if r := recover(); r != nil {
d.autoComputeExtentCrc()
}
- }()
+ }()
for {
- partitions:=make([]*DataPartition,0)
+ partitions := make([]*DataPartition, 0)
d.RLock()
- for _,dp:=range d.partitionMap {
- partitions=append(partitions, dp)
+ for _, dp := range d.partitionMap {
+ partitions = append(partitions, dp)
}
d.RUnlock()
- for _,dp:=range d.partitionMap{
+ for _, dp := range d.partitionMap {
dp.extentStore.AutoComputeExtentCrc()
}
}
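autoComputeExtentCrc copies the partition map into a slice while holding the disk's read lock before the CRC pass runs. A minimal sketch of that snapshot-under-RLock step (simplified types; the real per-partition work is dp.extentStore.AutoComputeExtentCrc()):

    package main

    import (
        "fmt"
        "sync"
    )

    // snapshot copies the partition set under the read lock so the long-running
    // CRC pass that follows does not keep the disk locked.
    func snapshot(mu *sync.RWMutex, partitions map[uint64]string) []string {
        mu.RLock()
        defer mu.RUnlock()
        out := make([]string, 0, len(partitions))
        for _, dp := range partitions {
            out = append(out, dp)
        }
        return out
    }

    func main() {
        var mu sync.RWMutex
        parts := map[uint64]string{1: "dp_1", 2: "dp_2"}
        for _, dp := range snapshot(&mu, parts) {
            fmt.Println("auto compute extent CRC for", dp) // stand-in for the real call
        }
    }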
19 changes: 18 additions & 1 deletion datanode/partition.go
@@ -56,6 +56,7 @@ type DataPartitionMetadata struct {
PartitionSize int
CreateTime string
Peers []proto.Peer
+ Hosts []string
}

type sortedPeers []proto.Peer
@@ -152,6 +153,7 @@ func LoadDataPartition(partitionDir string, disk *Disk) (dp *DataPartition, err
PartitionSize: meta.PartitionSize,
PartitionID: meta.PartitionID,
Peers: meta.Peers,
+ Hosts: meta.Hosts,
RaftStore: disk.space.GetRaftStore(),
NodeID: disk.space.GetNodeID(),
ClusterID: disk.space.GetClusterID(),
@@ -163,8 +165,22 @@
if err = dp.LoadAppliedID(); err != nil {
log.LogErrorf("action[loadApplyIndex] %v", err)
}

+ p := NewPacketToGetAppliedID(dp.partitionID)
+ target := dp.replicas[0]
+ leaderApplyID, err := dp.getRemoteAppliedID(target, p)
go dp.StartRaftLoggingSchedule()
- go dp.StartRaftAfterRepair()

+ if (dp.appliedID==0 && err==nil && leaderApplyID==0) || dp.appliedID != 0 {
+ err = dp.StartRaft()
+ if err != nil {
+ log.LogErrorf("partitionID(%v) start raft err(%v)..", dp.partitionID, err)
+ disk.space.DetachDataPartition(dp.partitionID)
+ }
+ } else {
+ go dp.StartRaftAfterRepair()
+ }

dp.ForceLoadHeader()
return
}
@@ -329,6 +345,7 @@ func (dp *DataPartition) PersistMetadata() (err error) {
PartitionID: dp.config.PartitionID,
PartitionSize: dp.config.PartitionSize,
Peers: dp.config.Peers,
+ Hosts: dp.config.Hosts,
CreateTime: time.Now().Format(TimeLayout),
}
if metaData, err = json.Marshal(md); err != nil {
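Besides the Raft start path, the commit persists Hosts in the partition metadata so a reloaded partition knows its replica addresses. A trimmed-down sketch of the round trip (field set and values are illustrative, not the full DataPartitionMetadata):

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // Partial stand-in for DataPartitionMetadata with the newly persisted Hosts.
    type partitionMeta struct {
        PartitionID   uint64
        PartitionSize int
        Hosts         []string // written by PersistMetadata, read back by LoadDataPartition
        CreateTime    string
    }

    func main() {
        md := partitionMeta{
            PartitionID:   10,
            PartitionSize: 128 * 1024 * 1024,
            Hosts:         []string{"hostA:17310", "hostB:17310", "hostC:17310"},
            CreateTime:    "2019-08-16 00:00:00",
        }
        data, _ := json.Marshal(md)
        fmt.Println(string(data))

        var back partitionMeta
        _ = json.Unmarshal(data, &back)
        fmt.Println(back.Hosts) // [hostA:17310 hostB:17310 hostC:17310]
    }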
30 changes: 15 additions & 15 deletions datanode/partition_raft.go
@@ -16,6 +16,7 @@ package datanode

import (
"encoding/binary"
+ "encoding/json"
"fmt"
"github.com/chubaofs/chubaofs/proto"
"github.com/chubaofs/chubaofs/raftstore"
@@ -31,7 +32,6 @@ import (
"strconv"
"strings"
"time"
- "encoding/json"
)

type dataPartitionCfg struct {
@@ -196,7 +196,7 @@ func (dp *DataPartition) StartRaftAfterRepair() {
timer.Reset(5 * time.Second)
continue
}
- if initMaxExtentID == 0 || initPartitionSize==0 {
+ if initMaxExtentID == 0 || initPartitionSize == 0 {
initMaxExtentID, initPartitionSize, err = dp.getLeaderMaxExtentIDAndPartitionSize()
}

@@ -214,13 +214,13 @@
continue
}

- if currLeaderPartitionSize<initPartitionSize{
- initPartitionSize=currLeaderPartitionSize
+ if currLeaderPartitionSize < initPartitionSize {
+ initPartitionSize = currLeaderPartitionSize
}
localSize := dp.extentStore.StoreSizeExtentID(initMaxExtentID)

log.LogInfof("StartRaftAfterRepair partitionID(%v) initMaxExtentID(%v) initPartitionSize(%v) currLeaderPartitionSize(%v)"+
- "localSize(%v)",dp.partitionID,initMaxExtentID,initPartitionSize,currLeaderPartitionSize,localSize)
+ "localSize(%v)", dp.partitionID, initMaxExtentID, initPartitionSize, currLeaderPartitionSize, localSize)

if initPartitionSize > localSize {
log.LogErrorf("partitionID(%v) leader size(%v) local size(%v)", dp.partitionID, initPartitionSize, localSize)
@@ -264,9 +264,9 @@ func (dp *DataPartition) addRaftNode(req *proto.DataPartitionDecommissionRequest
if !isUpdated {
return
}
- data,_:=json.Marshal(req)
+ data, _ := json.Marshal(req)
log.LogInfof("AddRaftNode PartitionID(%v) nodeID(%v) index(%v) do RaftLog (%v) Start Remove Self ",
- req.PartitionId,dp.config.NodeID,string(data))
+ req.PartitionId, dp.config.NodeID, string(data))
dp.config.Peers = append(dp.config.Peers, req.AddPeer)
addr := strings.Split(req.AddPeer.Addr, ":")[0]
dp.config.RaftStore.AddNodeWithPort(req.AddPeer.ID, addr, heartbeatPort, replicaPort)
@@ -276,8 +276,8 @@
// Delete a raft node.
func (dp *DataPartition) removeRaftNode(req *proto.DataPartitionDecommissionRequest, index uint64) (isUpdated bool, err error) {
peerIndex := -1
- data,_:=json.Marshal(req)
- log.LogInfof("RemoveRaftNode PartitionID(%v) nodeID(%v) index(%v) do RaftLog (%v) ",req.PartitionId,dp.config.NodeID,string(data))
+ data, _ := json.Marshal(req)
+ log.LogInfof("RemoveRaftNode PartitionID(%v) nodeID(%v) index(%v) do RaftLog (%v) ", req.PartitionId, dp.config.NodeID, string(data))
for i, peer := range dp.config.Peers {
if peer.ID == req.RemovePeer.ID {
isUpdated = true
@@ -300,18 +300,18 @@ func (dp *DataPartition) removeRaftNode(req *proto.DataPartitionDecommissionRequ
}
dp.Disk().space.DeletePartition(dp.partitionID)
log.LogInfof("RemoveRaftNode PartitionID(%v) nodeID(%v) index(%v) do RaftLog (%v) Fininsh Remove Self ",
- req.PartitionId,dp.config.NodeID,string(data))
+ req.PartitionId, dp.config.NodeID, string(data))
return
}
}(index)
isUpdated = false
log.LogInfof("RemoveRaftNode PartitionID(%v) nodeID(%v) index(%v) do RaftLog (%v) Start Remove Self ",
- req.PartitionId,dp.config.NodeID,string(data))
+ req.PartitionId, dp.config.NodeID, string(data))
return
}
dp.config.Peers = append(dp.config.Peers[:peerIndex], dp.config.Peers[peerIndex+1:]...)
log.LogInfof("RemoveRaftNode PartitionID(%v) nodeID(%v) index(%v) do RaftLog (%v) Start Remove Self ",
- req.PartitionId,dp.config.NodeID,string(data))
+ req.PartitionId, dp.config.NodeID, string(data))
return
}

@@ -530,13 +530,13 @@ func (dp *DataPartition) getLeaderPartitionSize(maxExtentID uint64) (size uint64
return
}
size = binary.BigEndian.Uint64(p.Data)
- log.LogInfof("partition(%v) MaxExtentID(%v) size(%v)", dp.partitionID, maxExtentID,size)
+ log.LogInfof("partition(%v) MaxExtentID(%v) size(%v)", dp.partitionID, maxExtentID, size)

return
}

// Get the MaxExtentID partition from the leader.
- func (dp *DataPartition) getLeaderMaxExtentIDAndPartitionSize() (maxExtentID,PartitionSize uint64, err error) {
+ func (dp *DataPartition) getLeaderMaxExtentIDAndPartitionSize() (maxExtentID, PartitionSize uint64, err error) {
var (
conn *net.TCPConn
)
@@ -568,7 +568,7 @@ func (dp *DataPartition) getLeaderMaxExtentIDAndPartitionSize() (maxExtentID,Par
maxExtentID = binary.BigEndian.Uint64(p.Data[0:8])
PartitionSize = binary.BigEndian.Uint64(p.Data[8:16])

- log.LogInfo("partition(%v) maxExtentID(%v) PartitionSize(%v) on leader", dp.partitionID, maxExtentID,PartitionSize)
+ log.LogInfo("partition(%v) maxExtentID(%v) PartitionSize(%v) on leader", dp.partitionID, maxExtentID, PartitionSize)

return
}
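StartRaftAfterRepair keeps retrying until the local extent data covers the size the leader reported, and only then starts Raft. The gate reduces to the comparison below, extracted into a standalone helper for illustration (names are not the project's):

    package main

    import "fmt"

    // caughtUp models the size check in StartRaftAfterRepair: the leader size
    // recorded at startup is lowered if the leader later reports less, and Raft
    // is started only once the local store covers that size.
    func caughtUp(initPartitionSize, currLeaderPartitionSize, localSize uint64) bool {
        if currLeaderPartitionSize < initPartitionSize {
            initPartitionSize = currLeaderPartitionSize
        }
        return initPartitionSize <= localSize // otherwise log an error and retry in 5s
    }

    func main() {
        fmt.Println(caughtUp(1000, 900, 800)) // false: keep repairing
        fmt.Println(caughtUp(1000, 900, 950)) // true: start Raft
    }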
4 changes: 2 additions & 2 deletions datanode/wrap_operator.go
@@ -733,11 +733,11 @@ func (s *DataNode) handlePacketToGetPartitionSize(p *repl.Packet) {

func (s *DataNode) handlePacketToGetMaxExtentIDAndPartitionSize(p *repl.Packet) {
partition := p.Object.(*DataPartition)
- maxExtentID,totalPartitionSize := partition.extentStore.GetMaxExtentIDAndPartitionSize()
+ maxExtentID, totalPartitionSize := partition.extentStore.GetMaxExtentIDAndPartitionSize()

buf := make([]byte, 16)
binary.BigEndian.PutUint64(buf[0:8], uint64(maxExtentID))
- binary.BigEndian.PutUint64(buf[8:16],totalPartitionSize)
+ binary.BigEndian.PutUint64(buf[8:16], totalPartitionSize)
p.PacketOkWithBody(buf)

return
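handlePacketToGetMaxExtentIDAndPartitionSize replies with two big-endian uint64 values in a single 16-byte buffer, and getLeaderMaxExtentIDAndPartitionSize in partition_raft.go decodes them in the same order. A round-trip sketch (standalone, not the project's packet plumbing):

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    // encode packs maxExtentID then totalPartitionSize, matching the handler.
    func encode(maxExtentID, totalPartitionSize uint64) []byte {
        buf := make([]byte, 16)
        binary.BigEndian.PutUint64(buf[0:8], maxExtentID)
        binary.BigEndian.PutUint64(buf[8:16], totalPartitionSize)
        return buf
    }

    // decode mirrors the follower side reading p.Data[0:8] and p.Data[8:16].
    func decode(data []byte) (maxExtentID, totalPartitionSize uint64) {
        return binary.BigEndian.Uint64(data[0:8]), binary.BigEndian.Uint64(data[8:16])
    }

    func main() {
        id, size := decode(encode(1027, 4*1024*1024*1024))
        fmt.Println(id, size) // 1027 4294967296
    }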
4 changes: 2 additions & 2 deletions master/admin_task_manager.go
@@ -41,8 +41,8 @@ type AdminTaskManager struct {
targetAddr string
TaskMap map[string]*proto.AdminTask
sync.RWMutex
- exitCh chan struct{}
- connPool *util.ConnectPool
+ exitCh chan struct{}
+ connPool *util.ConnectPool
}

func newAdminTaskManager(targetAddr, clusterID string) (sender *AdminTaskManager) {
4 changes: 2 additions & 2 deletions master/master_manager_test.go
@@ -1,13 +1,13 @@
package master

import (
- "testing"
"fmt"
"github.com/chubaofs/chubaofs/proto"
"github.com/chubaofs/chubaofs/raftstore"
rproto "github.com/tiglabs/raft/proto"
- "net/http"
"io/ioutil"
+ "net/http"
+ "testing"
)

func TestHandleLeaderChange(t *testing.T) {
14 changes: 5 additions & 9 deletions storage/extent_store.go
@@ -56,8 +56,6 @@ var (
RegexpExtentFile, _ = regexp.Compile("^(\\d)+$")
)



type ExtentFilter func(info *ExtentInfo) bool

// Filters
@@ -184,7 +182,7 @@ func (s *ExtentStore) SnapShot() (files []*proto.File, err error) {

files = make([]*proto.File, 0, len(normalExtentSnapshot))
for _, ei := range normalExtentSnapshot {
- file := new (proto.File)
+ file := new(proto.File)
file.Name = strconv.FormatUint(ei.FileID, 10)
file.Size = uint32(ei.Size)
file.Modified = ei.ModifyTime
@@ -193,7 +191,7 @@ }
}
tinyExtentSnapshot = s.getTinyExtentInfo()
for _, ei := range tinyExtentSnapshot {
- file := new (proto.File)
+ file := new(proto.File)
file.Name = strconv.FormatUint(ei.FileID, 10)
file.Size = uint32(ei.Size)
file.Modified = ei.ModifyTime
@@ -614,7 +612,7 @@ func (s *ExtentStore) StoreSizeExtentID(maxExtentID uint64) (totalSize uint64) {
}

// StoreSizeExtentID returns the size of the extent store
- func (s *ExtentStore) GetMaxExtentIDAndPartitionSize() (maxExtentID,totalSize uint64) {
+ func (s *ExtentStore) GetMaxExtentIDAndPartitionSize() (maxExtentID, totalSize uint64) {
extentInfos := make([]*ExtentInfo, 0)
s.eiMutex.RLock()
for _, extentInfo := range s.extentInfoMap {
@@ -625,10 +623,10 @@ func (s *ExtentStore) GetMaxExtentIDAndPartitionSize() (maxExtentID,totalSize ui
if extentInfo.FileID > maxExtentID {
maxExtentID = extentInfo.FileID
}
- totalSize+=extentInfo.Size
+ totalSize += extentInfo.Size
}

- return maxExtentID,totalSize
+ return maxExtentID, totalSize
}

func MarshalTinyExtent(extentID uint64, offset, size int64) (data []byte) {
@@ -775,8 +773,6 @@ func (s *ExtentStore) loadExtentFromDisk(extentID uint64, putCache bool) (e *Ext
return
}



func (s *ExtentStore) ScanBlocks(extentID uint64) (bcs []*BlockCrc, err error) {
var blockCnt int
bcs = make([]*BlockCrc, 0)
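GetMaxExtentIDAndPartitionSize walks the in-memory extent info once and returns both the largest extent ID and the summed size, which StartRaftAfterRepair uses as its repair target. A simplified sketch (the real method takes its snapshot under s.eiMutex.RLock() first):

    package main

    import "fmt"

    type extentInfo struct {
        FileID uint64
        Size   uint64
    }

    // maxExtentIDAndSize mirrors the single pass over the extent infos: track the
    // highest extent ID seen and accumulate the total stored size.
    func maxExtentIDAndSize(infos []*extentInfo) (maxExtentID, totalSize uint64) {
        for _, ei := range infos {
            if ei.FileID > maxExtentID {
                maxExtentID = ei.FileID
            }
            totalSize += ei.Size
        }
        return maxExtentID, totalSize
    }

    func main() {
        infos := []*extentInfo{{FileID: 1, Size: 128}, {FileID: 7, Size: 64}}
        fmt.Println(maxExtentIDAndSize(infos)) // 7 192
    }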
