Skip to content

Commit

Permalink
Introduce transfer status restarted
Browse files Browse the repository at this point in the history
  • Loading branch information
nakulkar-msft committed Mar 16, 2023
1 parent e1120ac commit 05d9281
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 23 deletions.
6 changes: 2 additions & 4 deletions common/fe-ste-models.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,8 @@ func (TransferStatus) Success() TransferStatus { return TransferStatus(2) }
// FolderCreated marks a folder that was created at the destination but whose
// properties have not yet been persisted. It is equivalent to Started, and is
// never intended to be set on anything but folders.
func (TransferStatus) FolderCreated() TransferStatus {
	return TransferStatus(3)
}

// Transfer is being restarted: set in place of Started when a previously
// failed transfer is rescheduled (e.g. on job resume), so that senders can
// distinguish a retried transfer from a first attempt.
func (TransferStatus) Restarted() TransferStatus { return TransferStatus(4) }

// Failed indicates the transfer stopped due to some error.
func (TransferStatus) Failed() TransferStatus {
	return TransferStatus(-1)
}

Expand All @@ -711,10 +713,6 @@ func (TransferStatus) TierAvailabilityCheckFailure() TransferStatus { return Tra

// Transfer was cancelled.
func (TransferStatus) Cancelled() TransferStatus { return TransferStatus(-6) }

// ShouldTransfer reports whether the transfer is still pending work, i.e. it
// is in the NotStarted, Started, or FolderCreated state.
func (ts TransferStatus) ShouldTransfer() bool {
	switch ts {
	case ETransferStatus.NotStarted(), ETransferStatus.Started(), ETransferStatus.FolderCreated():
		return true
	}
	return false
}

// Transfer is any of the three possible states (InProgress, Completed or Failed)
func (TransferStatus) All() TransferStatus { return TransferStatus(math.MaxInt8) }
func (ts TransferStatus) String() string {
Expand Down
3 changes: 2 additions & 1 deletion jobsAdmin/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func ResumeJobOrder(req common.ResumeJobRequest) common.CancelPauseResumeRespons
// If the transfer status is less than -1, it means the transfer failed for some reason,
// so the transfer status needs to be reset.
if jppt.TransferStatus() <= common.ETransferStatus.Failed() {
jppt.SetTransferStatus(common.ETransferStatus.Started(), true)
jppt.SetTransferStatus(common.ETransferStatus.Restarted(), true)
jppt.SetErrorCode(0, true)
}
}
Expand Down Expand Up @@ -579,6 +579,7 @@ func resurrectJobSummary(jm ste.IJobMgr) common.ListJobSummaryResponse {
case common.ETransferStatus.NotStarted(),
common.ETransferStatus.FolderCreated(),
common.ETransferStatus.Started(),
common.ETransferStatus.Restarted(),
common.ETransferStatus.Cancelled():
js.TotalBytesExpected += uint64(jppt.SourceSize)
case common.ETransferStatus.Success():
Expand Down
2 changes: 1 addition & 1 deletion ste/mgr-JobPartMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ func (jpm *jobPartMgr) ScheduleTransfers(jobCtx context.Context, sourceBlobToken

// If the transfer previously failed, mark it Restarted while rescheduling it.
if ts == common.ETransferStatus.Failed() {
jppt.SetTransferStatus(common.ETransferStatus.Started(), true)
jppt.SetTransferStatus(common.ETransferStatus.Restarted(), true)
}

if _, dst, isFolder := plan.TransferSrcDstStrings(t); isFolder {
Expand Down
12 changes: 9 additions & 3 deletions ste/mgr-JobPartTransferMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type IJobPartTransferMgr interface {
ResetSourceSize() // sets source size to 0 (made to be used by setProperties command to make number of bytes transferred = 0)
SuccessfulBytesTransferred() int64
TransferIndex() (partNum, transferIndex uint32)
RestartedTransfer() bool
}

type TransferInfo struct {
Expand Down Expand Up @@ -339,7 +340,7 @@ func (jptm *jobPartTransferMgr) Info() TransferInfo {
// does not exceed 50000 (max number of blocks per blob)
if blockSize == 0 {
blockSize = common.DefaultBlockBlobBlockSize
for ; sourceSize >= common.MaxNumberOfBlocksPerBlob * blockSize; blockSize = 2 * blockSize {
for ; sourceSize >= common.MaxNumberOfBlocksPerBlob*blockSize; blockSize = 2 * blockSize {
if blockSize > common.BlockSizeThreshold {
/*
* For a RAM usage of 0.5G/core, we would have 4G memory on typical 8 core device, meaning at a blockSize of 256M,
Expand Down Expand Up @@ -431,9 +432,9 @@ func (jptm *jobPartTransferMgr) FileCountLimiter() common.CacheLimiter {
// As at Oct 2019, cases where we mutate destination names are
// (i) when destination is Windows or Azure Files, and source contains characters unsupported at the destination
// (ii) when downloading with --decompress and there are two files that differ only in an extension that we will strip
//e.g. foo.txt and foo.txt.gz (if we decompress the latter, we'll strip the extension and the names will collide)
// e.g. foo.txt and foo.txt.gz (if we decompress the latter, we'll strip the extension and the names will collide)
// (iii) For completeness, there's also bucket->container name resolution when copying from S3, but that is not expected to ever
//create collisions, since it already takes steps to prevent them.
// create collisions, since it already takes steps to prevent them.
func (jptm *jobPartTransferMgr) WaitUntilLockDestination(ctx context.Context) error {
if strings.EqualFold(jptm.Info().Destination, common.Dev_Null) {
return nil // nothing to lock
Expand Down Expand Up @@ -554,6 +555,11 @@ func (jptm *jobPartTransferMgr) TransferIndex() (partNum, transferIndex uint32)
return uint32(jptm.jobPartMgr.Plan().PartNum), jptm.transferIndex
}

// RestartedTransfer reports whether this transfer was marked Restarted (i.e.
// it is being retried after a previous failed attempt) and its destination is
// a blob, the only destination type for which restart handling applies.
func (jptm *jobPartTransferMgr) RestartedTransfer() bool {
	destIsBlob := jptm.jobPartMgr.Plan().FromTo.To() == common.ELocation.Blob()
	return destIsBlob && jptm.TransferStatusIgnoringCancellation() == common.ETransferStatus.Restarted()
}

// JobHasLowFileCount returns an estimate of whether we only have a very small number of files in the overall job
// (An "estimate" because it actually only looks at the current job part)
func (jptm *jobPartTransferMgr) JobHasLowFileCount() bool {
Expand Down
31 changes: 17 additions & 14 deletions ste/sender-blockBlob.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ type blockBlobSenderBase struct {
atomicChunksWritten int32
atomicPutListIndicator int32
muBlockIDs *sync.Mutex
blockNamePrefix string
completedBlockList map[int]string
blockNamePrefix string
completedBlockList map[int]string
}

func getVerifiedChunkParams(transferInfo TransferInfo, memLimit int64) (chunkSize int64, numChunks uint32, err error) {
Expand Down Expand Up @@ -170,7 +170,7 @@ func newBlockBlobSenderBase(jptm IJobPartTransferMgr, destination string, p pipe
cpkToApply: cpkToApply,
muBlockIDs: &sync.Mutex{},
blockNamePrefix: getBlockNamePrefix(jptm.Info().JobID, partNum, transferIndex),
}, nil
}, nil
}

func (s *blockBlobSenderBase) SendableEntityType() common.EntityType {
Expand All @@ -190,6 +190,9 @@ func (s *blockBlobSenderBase) RemoteFileExists() (bool, time.Time, error) {
}

func (s *blockBlobSenderBase) Prologue(ps common.PrologueState) (destinationModified bool) {
if s.jptm.RestartedTransfer() {
s.buildCommittedBlockMap()
}
if s.jptm.ShouldInferContentType() {
s.headersToApply.ContentType = ps.GetInferredContentType(s.jptm)
}
Expand Down Expand Up @@ -292,21 +295,21 @@ func (s *blockBlobSenderBase) Cleanup() {
}
}

//Currently we've common Metadata Copier across all senders for block blob.
// Currently we've common Metadata Copier across all senders for block blob.
func (s *blockBlobSenderBase) GenerateCopyMetadata(id common.ChunkID) chunkFunc {
return createChunkFunc(true, s.jptm, id, func() {
if unixSIP, ok := s.sip.(IUNIXPropertyBearingSourceInfoProvider); ok {
// Clone the metadata before we write to it, we shouldn't be writing to the same metadata as every other blob.
s.metadataToApply = common.Metadata(s.metadataToApply).Clone().ToAzBlobMetadata()

statAdapter, err := unixSIP.GetUNIXProperties()
if err != nil {
s.jptm.FailActiveSend("GetUNIXProperties", err)
}

common.AddStatToBlobMetadata(statAdapter, s.metadataToApply)
}
_, err := s.destBlockBlobURL.SetMetadata(s.jptm.Context(), s.metadataToApply, azblob.BlobAccessConditions{}, s.cpkToApply)
_, err := s.destBlockBlobURL.SetMetadata(s.jptm.Context(), s.metadataToApply, azblob.BlobAccessConditions{}, s.cpkToApply)
if err != nil {
s.jptm.FailActiveSend("Setting Metadata", err)
return
Expand All @@ -328,8 +331,8 @@ func (s *blockBlobSenderBase) generateEncodedBlockID(index int32) string {
}

func (s *blockBlobSenderBase) buildCommittedBlockMap() {
invalidAzCopyBlockNameMsg := "buildCommittedBlockMap: Found blocks which are not comitted by AzCopy. Restarting whole file"
changedChunkSize := "buildCommittedBlockMap: Chunksize mismatch on uncomitted blocks"
invalidAzCopyBlockNameMsg := "buildCommittedBlockMap: Found blocks which are not committed by AzCopy. Restarting whole file"
changedChunkSize := "buildCommittedBlockMap: Chunksize mismatch on uncommitted blocks"
list := make(map[int]string)

blockList, err := s.destBlockBlobURL.GetBlockList(s.jptm.Context(), azblob.BlockListUncommitted, azblob.LeaseAccessConditions{})
Expand All @@ -339,7 +342,7 @@ func (s *blockBlobSenderBase) buildCommittedBlockMap() {
}

if len(blockList.UncommittedBlocks) == 0 {
s.jptm.LogAtLevelForCurrentTransfer(pipeline.LogDebug, "No uncomitted chunks found.")
s.jptm.LogAtLevelForCurrentTransfer(pipeline.LogDebug, "No uncommitted chunks found.")
return
}

Expand All @@ -351,15 +354,15 @@ func (s *blockBlobSenderBase) buildCommittedBlockMap() {
s.jptm.LogAtLevelForCurrentTransfer(pipeline.LogDebug, invalidAzCopyBlockNameMsg)
return
}

tmp, err := base64.StdEncoding.DecodeString(block.Name)
decodedBlockName := string(tmp)
if err != nil || !strings.HasPrefix(decodedBlockName, s.blockNamePrefix) {
s.jptm.LogAtLevelForCurrentTransfer(pipeline.LogDebug, invalidAzCopyBlockNameMsg)
return
}

index, err := strconv.Atoi(decodedBlockName[len(decodedBlockName) - len(s.blockNamePrefix):])
index, err := strconv.Atoi(decodedBlockName[len(decodedBlockName)-len(s.blockNamePrefix):])
if err != nil || index < 0 || index > int(s.numChunks) {
s.jptm.LogAtLevelForCurrentTransfer(pipeline.LogDebug, invalidAzCopyBlockNameMsg)
return
Expand All @@ -373,12 +376,12 @@ func (s *blockBlobSenderBase) buildCommittedBlockMap() {

list[index] = decodedBlockName
}

// We are here only if all the uncommitted blocks are uploaded by this job with same blockSize
s.completedBlockList = list
}

// ChunkAlreadyUploaded reports whether the block at the given index already
// appears in the completed-block list built from the destination's
// uncommitted blocks, and therefore does not need to be re-uploaded.
func (s *blockBlobSenderBase) ChunkAlreadyUploaded(index int32) bool {
	if _, present := s.completedBlockList[int(index)]; present {
		return true
	}
	return false
}
}

0 comments on commit 05d9281

Please sign in to comment.