diff --git a/common/fe-ste-models.go b/common/fe-ste-models.go
index b29552ed5..73c558b2d 100644
--- a/common/fe-ste-models.go
+++ b/common/fe-ste-models.go
@@ -697,6 +697,8 @@ func (TransferStatus) Success() TransferStatus { return TransferStatus(2) }
 // Folder was created, but properties have not been persisted yet. Equivalent to Started, but never intended to be set on anything BUT folders.
 func (TransferStatus) FolderCreated() TransferStatus { return TransferStatus(3) }
 
+func (TransferStatus) Restarted() TransferStatus { return TransferStatus(4) }
+
 // Transfer failed due to some error.
 func (TransferStatus) Failed() TransferStatus { return TransferStatus(-1) }
 
@@ -711,10 +713,6 @@ func (TransferStatus) TierAvailabilityCheckFailure() TransferStatus { return Tra
 
 func (TransferStatus) Cancelled() TransferStatus { return TransferStatus(-6) }
 
-func (ts TransferStatus) ShouldTransfer() bool {
-	return ts == ETransferStatus.NotStarted() || ts == ETransferStatus.Started() || ts == ETransferStatus.FolderCreated()
-}
-
 // Transfer is any of the three possible state (InProgress, Completer or Failed)
 func (TransferStatus) All() TransferStatus { return TransferStatus(math.MaxInt8) }
 func (ts TransferStatus) String() string {
diff --git a/jobsAdmin/init.go b/jobsAdmin/init.go
index c57478b42..11a0a5da9 100755
--- a/jobsAdmin/init.go
+++ b/jobsAdmin/init.go
@@ -424,7 +424,7 @@ func ResumeJobOrder(req common.ResumeJobRequest) common.CancelPauseResumeRespons
 			// If the transfer status is less than -1, it means the transfer failed because of some reason.
 			// Transfer Status needs to reset.
 			if jppt.TransferStatus() <= common.ETransferStatus.Failed() {
-				jppt.SetTransferStatus(common.ETransferStatus.Started(), true)
+				jppt.SetTransferStatus(common.ETransferStatus.Restarted(), true)
 				jppt.SetErrorCode(0, true)
 			}
 		}
@@ -579,6 +579,7 @@ func resurrectJobSummary(jm ste.IJobMgr) common.ListJobSummaryResponse {
 			case common.ETransferStatus.NotStarted(),
 				common.ETransferStatus.FolderCreated(),
 				common.ETransferStatus.Started(),
+				common.ETransferStatus.Restarted(),
 				common.ETransferStatus.Cancelled():
 				js.TotalBytesExpected += uint64(jppt.SourceSize)
 			case common.ETransferStatus.Success():
diff --git a/ste/mgr-JobPartMgr.go b/ste/mgr-JobPartMgr.go
index 9cb79a967..fecf5710d 100644
--- a/ste/mgr-JobPartMgr.go
+++ b/ste/mgr-JobPartMgr.go
@@ -423,7 +423,7 @@ func (jpm *jobPartMgr) ScheduleTransfers(jobCtx context.Context, sourceBlobToken
 
 		// If the transfer was failed, then while rescheduling the transfer marking it Started.
 		if ts == common.ETransferStatus.Failed() {
-			jppt.SetTransferStatus(common.ETransferStatus.Started(), true)
+			jppt.SetTransferStatus(common.ETransferStatus.Restarted(), true)
 		}
 
 		if _, dst, isFolder := plan.TransferSrcDstStrings(t); isFolder {
diff --git a/ste/mgr-JobPartTransferMgr.go b/ste/mgr-JobPartTransferMgr.go
index f71a19a8d..50269c743 100644
--- a/ste/mgr-JobPartTransferMgr.go
+++ b/ste/mgr-JobPartTransferMgr.go
@@ -97,6 +97,7 @@ type IJobPartTransferMgr interface {
 	ResetSourceSize() // sets source size to 0 (made to be used by setProperties command to make number of bytes transferred = 0)
 	SuccessfulBytesTransferred() int64
 	TransferIndex() (partNum, transferIndex uint32)
+	RestartedTransfer() bool
 }
 
 type TransferInfo struct {
@@ -339,7 +340,7 @@ func (jptm *jobPartTransferMgr) Info() TransferInfo {
 	// does not exceeds 50000 (max number of block per blob)
 	if blockSize == 0 {
 		blockSize = common.DefaultBlockBlobBlockSize
-		for ; sourceSize >= common.MaxNumberOfBlocksPerBlob * blockSize; blockSize = 2 * blockSize {
+		for ; sourceSize >= common.MaxNumberOfBlocksPerBlob*blockSize; blockSize = 2 * blockSize {
 			if blockSize > common.BlockSizeThreshold {
 				/*
 				 * For a RAM usage of 0.5G/core, we would have 4G memory on typical 8 core device, meaning at a blockSize of 256M,
@@ -431,9 +432,9 @@ func (jptm *jobPartTransferMgr) FileCountLimiter() common.CacheLimiter {
 // As at Oct 2019, cases where we mutate destination names are
 // (i) when destination is Windows or Azure Files, and source contains characters unsupported at the destination
 // (ii) when downloading with --decompress and there are two files that differ only in an extension that will will strip
-//e.g. foo.txt and foo.txt.gz (if we decompress the latter, we'll strip the extension and the names will collide)
+// e.g. foo.txt and foo.txt.gz (if we decompress the latter, we'll strip the extension and the names will collide)
 // (iii) For completeness, there's also bucket->container name resolution when copying from S3, but that is not expected to ever
-//create collisions, since it already takes steps to prevent them.
+// create collisions, since it already takes steps to prevent them.
 func (jptm *jobPartTransferMgr) WaitUntilLockDestination(ctx context.Context) error {
 	if strings.EqualFold(jptm.Info().Destination, common.Dev_Null) {
 		return nil // nothing to lock
@@ -554,6 +555,11 @@ func (jptm *jobPartTransferMgr) TransferIndex() (partNum, transferIndex uint32)
 	return uint32(jptm.jobPartMgr.Plan().PartNum), jptm.transferIndex
 }
 
+func (jptm *jobPartTransferMgr) RestartedTransfer() bool {
+	return (jptm.jobPartMgr.Plan().FromTo.To() == common.ELocation.Blob() &&
+		jptm.TransferStatusIgnoringCancellation() == common.ETransferStatus.Restarted())
+}
+
 // JobHasLowFileCount returns an estimate of whether we only have a very small number of files in the overall job
 // (An "estimate" because it actually only looks at the current job part)
 func (jptm *jobPartTransferMgr) JobHasLowFileCount() bool {
diff --git a/ste/sender-blockBlob.go b/ste/sender-blockBlob.go
index c916c24b3..b2c8f4e13 100644
--- a/ste/sender-blockBlob.go
+++ b/ste/sender-blockBlob.go
@@ -63,8 +63,8 @@ type blockBlobSenderBase struct {
 	atomicChunksWritten    int32
 	atomicPutListIndicator int32
 	muBlockIDs             *sync.Mutex
-	blockNamePrefix string
-	completedBlockList map[int]string
+	blockNamePrefix        string
+	completedBlockList     map[int]string
 }
 
 func getVerifiedChunkParams(transferInfo TransferInfo, memLimit int64) (chunkSize int64, numChunks uint32, err error) {
@@ -170,7 +170,7 @@ func newBlockBlobSenderBase(jptm IJobPartTransferMgr, destination string, p pipe
 		cpkToApply:      cpkToApply,
 		muBlockIDs:      &sync.Mutex{},
 		blockNamePrefix: getBlockNamePrefix(jptm.Info().JobID, partNum, transferIndex),
-		}, nil
+	}, nil
 }
 
 func (s *blockBlobSenderBase) SendableEntityType() common.EntityType {
@@ -190,6 +190,9 @@ func (s *blockBlobSenderBase) RemoteFileExists() (bool, time.Time, error) {
 }
 
 func (s *blockBlobSenderBase) Prologue(ps common.PrologueState) (destinationModified bool) {
+	if s.jptm.RestartedTransfer() {
+		s.buildCommittedBlockMap()
+	}
 	if s.jptm.ShouldInferContentType() {
 		s.headersToApply.ContentType = ps.GetInferredContentType(s.jptm)
 	}
@@ -292,21 +295,21 @@ func (s *blockBlobSenderBase) Cleanup() {
 	}
 }
 
-//Currently we've common Metadata Copier across all senders for block blob.
+// Currently we've common Metadata Copier across all senders for block blob.
 func (s *blockBlobSenderBase) GenerateCopyMetadata(id common.ChunkID) chunkFunc {
 	return createChunkFunc(true, s.jptm, id, func() {
 		if unixSIP, ok := s.sip.(IUNIXPropertyBearingSourceInfoProvider); ok {
 			// Clone the metadata before we write to it, we shouldn't be writing to the same metadata as every other blob.
 			s.metadataToApply = common.Metadata(s.metadataToApply).Clone().ToAzBlobMetadata()
-			
+
 			statAdapter, err := unixSIP.GetUNIXProperties()
 			if err != nil {
 				s.jptm.FailActiveSend("GetUNIXProperties", err)
 			}
-			
+
 			common.AddStatToBlobMetadata(statAdapter, s.metadataToApply)
 		}
-		_, err := s.destBlockBlobURL.SetMetadata(s.jptm.Context(), s.metadataToApply, azblob.BlobAccessConditions{}, s.cpkToApply) 
+		_, err := s.destBlockBlobURL.SetMetadata(s.jptm.Context(), s.metadataToApply, azblob.BlobAccessConditions{}, s.cpkToApply)
 		if err != nil {
 			s.jptm.FailActiveSend("Setting Metadata", err)
 			return
@@ -328,8 +331,8 @@ func (s *blockBlobSenderBase) generateEncodedBlockID(index int32) string {
 }
 
 func (s *blockBlobSenderBase) buildCommittedBlockMap() {
-	invalidAzCopyBlockNameMsg := "buildCommittedBlockMap: Found blocks which are not comitted by AzCopy. Restarting whole file"
Restarting whole file" - changedChunkSize := "buildCommittedBlockMap: Chunksize mismatch on uncomitted blocks" + invalidAzCopyBlockNameMsg := "buildCommittedBlockMap: Found blocks which are not committed by AzCopy. Restarting whole file" + changedChunkSize := "buildCommittedBlockMap: Chunksize mismatch on uncommitted blocks" list := make(map[int]string) blockList, err := s.destBlockBlobURL.GetBlockList(s.jptm.Context(), azblob.BlockListUncommitted, azblob.LeaseAccessConditions{}) @@ -339,7 +342,7 @@ func (s *blockBlobSenderBase) buildCommittedBlockMap() { } if len(blockList.UncommittedBlocks) == 0 { - s.jptm.LogAtLevelForCurrentTransfer(pipeline.LogDebug, "No uncomitted chunks found.") + s.jptm.LogAtLevelForCurrentTransfer(pipeline.LogDebug, "No uncommitted chunks found.") return } @@ -351,7 +354,7 @@ func (s *blockBlobSenderBase) buildCommittedBlockMap() { s.jptm.LogAtLevelForCurrentTransfer(pipeline.LogDebug, invalidAzCopyBlockNameMsg) return } - + tmp, err := base64.StdEncoding.DecodeString(block.Name) decodedBlockName := string(tmp) if err != nil || !strings.HasPrefix(decodedBlockName, s.blockNamePrefix) { @@ -359,7 +362,7 @@ func (s *blockBlobSenderBase) buildCommittedBlockMap() { return } - index, err := strconv.Atoi(decodedBlockName[len(decodedBlockName) - len(s.blockNamePrefix):]) + index, err := strconv.Atoi(decodedBlockName[len(decodedBlockName)-len(s.blockNamePrefix):]) if err != nil || index < 0 || index > int(s.numChunks) { s.jptm.LogAtLevelForCurrentTransfer(pipeline.LogDebug, invalidAzCopyBlockNameMsg) return @@ -373,7 +376,7 @@ func (s *blockBlobSenderBase) buildCommittedBlockMap() { list[index] = decodedBlockName } - + // We are here only if all the uncommitted blocks are uploaded by this job with same blockSize s.completedBlockList = list } @@ -381,4 +384,4 @@ func (s *blockBlobSenderBase) buildCommittedBlockMap() { func (s *blockBlobSenderBase) ChunkAlreadyUploaded(index int32) bool { _, ok := s.completedBlockList[int(index)] return ok -} \ No newline at end of file +}