Resume transfer of incomplete file #2119

Merged · 5 commits · Mar 23, 2023
8 changes: 8 additions & 0 deletions common/environment.go
@@ -370,3 +370,11 @@ func (EnvironmentVariable) DownloadToTempPath() EnvironmentVariable {
Description: "Configures azcopy to download to a temp path before actual download. Allowed values are true/false",
}
}

func (EnvironmentVariable) DisableBlobTransferResume() EnvironmentVariable {
return EnvironmentVariable{
Name: "AZCOPY_DISABLE_INCOMPLETE_BLOB_TRANSFER",
DefaultValue: "false",
Description: "If set to true, an incomplete transfer to a blob endpoint is restarted from the beginning instead of being resumed",
}
}
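As a sketch of how this flag gates resume behavior (illustrative only: AzCopy reads environment variables through its lifecycle manager, as `buildCommittedBlockMap` below shows, not via `os.Getenv` directly):

```go
package main

import (
	"fmt"
	"os"
)

// resumeDisabled reports whether the user opted out of resuming
// incomplete blob transfers. Hypothetical stand-in for the lifecycle
// manager lookup used by the real code.
func resumeDisabled() bool {
	return os.Getenv("AZCOPY_DISABLE_INCOMPLETE_BLOB_TRANSFER") == "true"
}

func main() {
	fmt.Println("resume disabled:", resumeDisabled())
}
```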
15 changes: 8 additions & 7 deletions common/extensions.go
@@ -14,7 +14,7 @@ import (
"github.com/Azure/azure-storage-file-go/azfile"
)

/////////////////////////////////////////////////////////////////////////////////////////////////
// ///////////////////////////////////////////////////////////////////////////////////////////////
type URLStringExtension string

func (s URLStringExtension) RedactSecretQueryParamForLogging() string {
@@ -27,7 +27,7 @@ func (s URLStringExtension) RedactSecretQueryParamForLogging() string {
return URLExtension{*u}.RedactSecretQueryParamForLogging()
}

/////////////////////////////////////////////////////////////////////////////////////////////////
// ///////////////////////////////////////////////////////////////////////////////////////////////
type URLExtension struct {
url.URL
}
@@ -84,7 +84,7 @@ func RedactSecretQueryParam(rawQuery, queryKeyNeedRedact string) (bool, string)
return sigFound, values.Encode()
}

/////////////////////////////////////////////////////////////////////////////////////////////////
// ///////////////////////////////////////////////////////////////////////////////////////////////
type FileURLPartsExtension struct {
azfile.FileURLParts
}
@@ -100,7 +100,7 @@ func (parts FileURLPartsExtension) GetServiceURL() url.URL {
return parts.URL()
}

/////////////////////////////////////////////////////////////////////////////////////////////////
// ///////////////////////////////////////////////////////////////////////////////////////////////
type HTTPResponseExtension struct {
*http.Response
}
@@ -118,7 +118,7 @@ func (r HTTPResponseExtension) IsSuccessStatusCode(successStatusCodes ...int) bo
return false
}

/////////////////////////////////////////////////////////////////////////////////////////////////
// ///////////////////////////////////////////////////////////////////////////////////////////////
type ByteSlice []byte
type ByteSliceExtension struct {
ByteSlice
@@ -185,8 +185,9 @@ func GenerateFullPathWithQuery(rootPath, childPath, extraQuery string) string {
// we have to generate a 36B string and then base64-encode this to retain the
// same size.
// Block Names of blobs are of format noted below.
// <5B empty placeholder> <16B GUID of AzCopy re-interpreted as string><5B PartNum><5B Index in the jobPart><5B blockNum>
const AZCOPY_BLOCKNAME_LENGTH = 48
func GenerateBlockBlobBlockID(blockNamePrefix string, index int32) string {
blockID := []byte(fmt.Sprintf("%s%05d", blockNamePrefix, index))
return base64.StdEncoding.EncodeToString(blockID)
}
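The 48-character constant follows from base64 arithmetic: the prefix is 31 bytes (5 placeholder + 16 GUID + 5 part number + 5 transfer index), the block number adds 5 more digits for 36 bytes total, and standard base64 emits 4 characters per 3 input bytes. A self-contained check, with a made-up prefix standing in for the real one:

```go
package main

import (
	"encoding/base64"
	"fmt"
)

func main() {
	// Hypothetical prefix: 5B placeholder + 16B GUID + 5B part num + 5B index = 31B.
	prefix := "AAAAA" + "0123456789abcdef" + "00001" + "00002"
	raw := fmt.Sprintf("%s%05d", prefix, 7) // plus a 5-digit block number = 36B

	encoded := base64.StdEncoding.EncodeToString([]byte(raw))
	// base64: 36 bytes -> 48 characters, matching AZCOPY_BLOCKNAME_LENGTH.
	fmt.Println(len(raw), len(encoded)) // 36 48
}
```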
6 changes: 2 additions & 4 deletions common/fe-ste-models.go
@@ -697,6 +697,8 @@ func (TransferStatus) Success() TransferStatus { return TransferStatus(2) }
// Folder was created, but properties have not been persisted yet. Equivalent to Started, but never intended to be set on anything BUT folders.
func (TransferStatus) FolderCreated() TransferStatus { return TransferStatus(3) }

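// Transfer is being retried after a previous failure; failed transfers are reset to Restarted when a job is resumed or rescheduled.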
func (TransferStatus) Restarted() TransferStatus { return TransferStatus(4) }

// Transfer failed due to some error.
func (TransferStatus) Failed() TransferStatus { return TransferStatus(-1) }

@@ -711,10 +713,6 @@ func (TransferStatus) TierAvailabilityCheckFailure() TransferStatus { return Tra

func (TransferStatus) Cancelled() TransferStatus { return TransferStatus(-6) }

func (ts TransferStatus) ShouldTransfer() bool {
return ts == ETransferStatus.NotStarted() || ts == ETransferStatus.Started() || ts == ETransferStatus.FolderCreated()
}

// Transfer is any of the three possible states (InProgress, Completed, or Failed)
func (TransferStatus) All() TransferStatus { return TransferStatus(math.MaxInt8) }
func (ts TransferStatus) String() string {
3 changes: 2 additions & 1 deletion jobsAdmin/init.go
@@ -424,7 +424,7 @@ func ResumeJobOrder(req common.ResumeJobRequest) common.CancelPauseResumeRespons
// If the transfer status is -1 or less, the transfer failed for some reason,
// so its status needs to be reset before rescheduling.
if jppt.TransferStatus() <= common.ETransferStatus.Failed() {
jppt.SetTransferStatus(common.ETransferStatus.Started(), true)
jppt.SetTransferStatus(common.ETransferStatus.Restarted(), true)
jppt.SetErrorCode(0, true)
}
}
@@ -579,6 +579,7 @@ func resurrectJobSummary(jm ste.IJobMgr) common.ListJobSummaryResponse {
case common.ETransferStatus.NotStarted(),
common.ETransferStatus.FolderCreated(),
common.ETransferStatus.Started(),
common.ETransferStatus.Restarted(),
common.ETransferStatus.Cancelled():
js.TotalBytesExpected += uint64(jppt.SourceSize)
case common.ETransferStatus.Success():
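In miniature, the reset rule above: anything at or below Failed (-1) goes back to Restarted so it is rescheduled, while everything else keeps its status. The type and values below are simplified stand-ins for `common.TransferStatus`, assuming the numbering shown in fe-ste-models.go:

```go
package main

import "fmt"

// Simplified stand-ins for common.TransferStatus values
// (Failed = -1, Started = 1, Success = 2, Restarted = 4).
type status int8

const (
	failed    status = -1
	started   status = 1
	success   status = 2
	restarted status = 4
)

// onResume mirrors the reset in ResumeJobOrder: failures (and anything
// below, e.g. Cancelled) are retried as Restarted; the rest is kept.
func onResume(s status) status {
	if s <= failed {
		return restarted
	}
	return s
}

func main() {
	fmt.Println(onResume(failed), onResume(success)) // 4 2
}
```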
2 changes: 1 addition & 1 deletion ste/mgr-JobPartMgr.go
@@ -423,7 +423,7 @@ func (jpm *jobPartMgr) ScheduleTransfers(jobCtx context.Context, sourceBlobToken

// If the transfer had failed, mark it Restarted while rescheduling it.
if ts == common.ETransferStatus.Failed() {
jppt.SetTransferStatus(common.ETransferStatus.Started(), true)
jppt.SetTransferStatus(common.ETransferStatus.Restarted(), true)
}

if _, dst, isFolder := plan.TransferSrcDstStrings(t); isFolder {
12 changes: 9 additions & 3 deletions ste/mgr-JobPartTransferMgr.go
@@ -97,6 +97,7 @@ type IJobPartTransferMgr interface {
ResetSourceSize() // sets source size to 0 (made to be used by setProperties command to make number of bytes transferred = 0)
SuccessfulBytesTransferred() int64
TransferIndex() (partNum, transferIndex uint32)
RestartedTransfer() bool
}

type TransferInfo struct {
@@ -339,7 +340,7 @@ func (jptm *jobPartTransferMgr) Info() TransferInfo {
does not exceed 50,000 (the maximum number of blocks per blob)
if blockSize == 0 {
blockSize = common.DefaultBlockBlobBlockSize
for ; sourceSize >= common.MaxNumberOfBlocksPerBlob * blockSize; blockSize = 2 * blockSize {
for ; sourceSize >= common.MaxNumberOfBlocksPerBlob*blockSize; blockSize = 2 * blockSize {
if blockSize > common.BlockSizeThreshold {
/*
* For a RAM usage of 0.5G/core, we would have 4G memory on typical 8 core device, meaning at a blockSize of 256M,
@@ -431,9 +432,9 @@ func (jptm *jobPartTransferMgr) FileCountLimiter() common.CacheLimiter {
// As at Oct 2019, cases where we mutate destination names are
// (i) when destination is Windows or Azure Files, and source contains characters unsupported at the destination
// (ii) when downloading with --decompress and there are two files that differ only in an extension that we will strip
//e.g. foo.txt and foo.txt.gz (if we decompress the latter, we'll strip the extension and the names will collide)
// e.g. foo.txt and foo.txt.gz (if we decompress the latter, we'll strip the extension and the names will collide)
// (iii) For completeness, there's also bucket->container name resolution when copying from S3, but that is not expected to ever
//create collisions, since it already takes steps to prevent them.
// create collisions, since it already takes steps to prevent them.
func (jptm *jobPartTransferMgr) WaitUntilLockDestination(ctx context.Context) error {
if strings.EqualFold(jptm.Info().Destination, common.Dev_Null) {
return nil // nothing to lock
@@ -554,6 +555,11 @@ func (jptm *jobPartTransferMgr) TransferIndex() (partNum, transferIndex uint32)
return uint32(jptm.jobPartMgr.Plan().PartNum), jptm.transferIndex
}

func (jptm *jobPartTransferMgr) RestartedTransfer() bool {
return (jptm.jobPartMgr.Plan().FromTo.To() == common.ELocation.Blob() &&
jptm.TransferStatusIgnoringCancellation() == common.ETransferStatus.Restarted())
}

// JobHasLowFileCount returns an estimate of whether we only have a very small number of files in the overall job
// (An "estimate" because it actually only looks at the current job part)
func (jptm *jobPartTransferMgr) JobHasLowFileCount() bool {
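The doubling loop in Info() (first hunk of this file) can be checked in isolation. Assuming an 8 MiB value for `common.DefaultBlockBlobBlockSize` and the 50,000-block service limit behind `common.MaxNumberOfBlocksPerBlob`, a 1 TiB upload needs two doublings:

```go
package main

import "fmt"

const (
	defaultBlockSize = 8 * 1024 * 1024 // assumed value of common.DefaultBlockBlobBlockSize
	maxBlocksPerBlob = 50000           // service limit on blocks per block blob
)

// pickBlockSize doubles the block size until the file fits within the
// 50,000-block ceiling, the same loop shape as the Info() hunk above.
func pickBlockSize(sourceSize int64) int64 {
	blockSize := int64(defaultBlockSize)
	for sourceSize >= maxBlocksPerBlob*blockSize {
		blockSize *= 2
	}
	return blockSize
}

func main() {
	// 50,000 x 8 MiB is ~390 GiB, so a 1 TiB file forces two doublings.
	fmt.Println(pickBlockSize(1 << 40)) // 33554432 (32 MiB)
}
```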
81 changes: 75 additions & 6 deletions ste/sender-blockBlob.go
@@ -22,9 +22,11 @@ package ste

import (
"context"
"encoding/base64"
"errors"
"fmt"
"net/url"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -61,7 +63,8 @@ type blockBlobSenderBase struct {
atomicChunksWritten int32
atomicPutListIndicator int32
muBlockIDs *sync.Mutex
blockNamePrefix string
completedBlockList map[int]string
}

func getVerifiedChunkParams(transferInfo TransferInfo, memLimit int64) (chunkSize int64, numChunks uint32, err error) {
@@ -167,7 +170,7 @@ func newBlockBlobSenderBase(jptm IJobPartTransferMgr, destination string, p pipe
cpkToApply: cpkToApply,
muBlockIDs: &sync.Mutex{},
blockNamePrefix: getBlockNamePrefix(jptm.Info().JobID, partNum, transferIndex),
}, nil
}

func (s *blockBlobSenderBase) SendableEntityType() common.EntityType {
@@ -187,6 +190,9 @@ func (s *blockBlobSenderBase) RemoteFileExists() (bool, time.Time, error) {
}

func (s *blockBlobSenderBase) Prologue(ps common.PrologueState) (destinationModified bool) {
if s.jptm.RestartedTransfer() {
s.buildCommittedBlockMap()
}
if s.jptm.ShouldInferContentType() {
s.headersToApply.ContentType = ps.GetInferredContentType(s.jptm)
}
@@ -289,21 +295,21 @@ func (s *blockBlobSenderBase) Cleanup() {
}
}

//Currently we've common Metadata Copier across all senders for block blob.
// Currently we've common Metadata Copier across all senders for block blob.
func (s *blockBlobSenderBase) GenerateCopyMetadata(id common.ChunkID) chunkFunc {
return createChunkFunc(true, s.jptm, id, func() {
if unixSIP, ok := s.sip.(IUNIXPropertyBearingSourceInfoProvider); ok {
// Clone the metadata before we write to it, we shouldn't be writing to the same metadata as every other blob.
s.metadataToApply = common.Metadata(s.metadataToApply).Clone().ToAzBlobMetadata()

statAdapter, err := unixSIP.GetUNIXProperties()
if err != nil {
s.jptm.FailActiveSend("GetUNIXProperties", err)
}

common.AddStatToBlobMetadata(statAdapter, s.metadataToApply)
}
_, err := s.destBlockBlobURL.SetMetadata(s.jptm.Context(), s.metadataToApply, azblob.BlobAccessConditions{}, s.cpkToApply)
if err != nil {
s.jptm.FailActiveSend("Setting Metadata", err)
return
@@ -323,3 +329,66 @@ func (s *blockBlobSenderBase) setBlockID(index int32, value string) {
func (s *blockBlobSenderBase) generateEncodedBlockID(index int32) string {
return common.GenerateBlockBlobBlockID(s.blockNamePrefix, index)
}

func (s *blockBlobSenderBase) buildCommittedBlockMap() {
invalidAzCopyBlockNameMsg := "buildCommittedBlockMap: Found blocks which are not committed by AzCopy. Restarting whole file"
changedChunkSize := "buildCommittedBlockMap: Chunksize mismatch on uncommitted blocks"
list := make(map[int]string)

if common.GetLifecycleMgr().GetEnvironmentVariable(common.EEnvironmentVariable.DisableBlobTransferResume()) == "true" {
return
}

blockList, err := s.destBlockBlobURL.GetBlockList(s.jptm.Context(), azblob.BlockListUncommitted, azblob.LeaseAccessConditions{})
if err != nil {
s.jptm.LogAtLevelForCurrentTransfer(pipeline.LogError, "Failed to get blocklist. Restarting whole file.")
return
}

if len(blockList.UncommittedBlocks) == 0 {
s.jptm.LogAtLevelForCurrentTransfer(pipeline.LogDebug, "No uncommitted chunks found.")
return
}

// We return an empty list if:
// 1. We find chunks uploaded by a different actor
// 2. The chunk size differs
for _, block := range blockList.UncommittedBlocks {
if len(block.Name) != common.AZCOPY_BLOCKNAME_LENGTH {
s.jptm.LogAtLevelForCurrentTransfer(pipeline.LogDebug, invalidAzCopyBlockNameMsg)
return
}

tmp, err := base64.StdEncoding.DecodeString(block.Name)
decodedBlockName := string(tmp)
if err != nil || !strings.HasPrefix(decodedBlockName, s.blockNamePrefix) {
s.jptm.LogAtLevelForCurrentTransfer(pipeline.LogDebug, invalidAzCopyBlockNameMsg)
return
}

index, err := strconv.Atoi(decodedBlockName[len(s.blockNamePrefix):])
if err != nil || index < 0 || index > int(s.numChunks) {
s.jptm.LogAtLevelForCurrentTransfer(pipeline.LogDebug, invalidAzCopyBlockNameMsg)
return
}

// Last chunk may have different blockSize
if block.Size != s.ChunkSize() && index != int(s.numChunks) {
s.jptm.LogAtLevelForCurrentTransfer(pipeline.LogDebug, changedChunkSize)
return
}

list[index] = decodedBlockName
}

// We get here only if all the uncommitted blocks were uploaded by this job with the same blockSize
s.completedBlockList = list
}

func (s *blockBlobSenderBase) ChunkAlreadyTransferred(index int32) bool {
if s.completedBlockList == nil {
return false
}
_, ok := s.completedBlockList[int(index)]
return ok
}
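The three name checks in buildCommittedBlockMap can be read as one predicate: correct encoded length, correct prefix, and a parseable in-range index. A condensed, hypothetical version (the real method additionally rejects blocks whose size differs from the current chunk size):

```go
package main

import (
	"encoding/base64"
	"fmt"
	"strconv"
	"strings"
)

// blockIndex condenses the name checks above: AzCopy-shaped length,
// matching prefix, and an in-range numeric suffix. Sketch only; the
// real method also compares block sizes before trusting a block.
func blockIndex(encodedName, prefix string, numChunks int) (int, bool) {
	if len(encodedName) != 48 { // AZCOPY_BLOCKNAME_LENGTH
		return 0, false
	}
	raw, err := base64.StdEncoding.DecodeString(encodedName)
	if err != nil || !strings.HasPrefix(string(raw), prefix) {
		return 0, false
	}
	index, err := strconv.Atoi(string(raw)[len(prefix):])
	if err != nil || index < 0 || index > numChunks {
		return 0, false
	}
	return index, true
}

func main() {
	prefix := "AAAAA" + "0123456789abcdef" + "00001" + "00002" // hypothetical 31B prefix
	name := base64.StdEncoding.EncodeToString([]byte(prefix + "00007"))
	fmt.Println(blockIndex(name, prefix, 100)) // 7 true
}
```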
8 changes: 8 additions & 0 deletions ste/sender-blockBlobFromLocal.go
@@ -22,6 +22,7 @@ package ste

import (
"bytes"
"fmt"
"sync/atomic"

"github.com/Azure/azure-pipeline-go/pipeline"
@@ -88,6 +89,13 @@ func (u *blockBlobUploader) generatePutBlock(id common.ChunkID, blockIndex int32
// step 1: generate block ID
encodedBlockID := u.generateEncodedBlockID(blockIndex)

// step 2: save the block ID into the list of block IDs
u.setBlockID(blockIndex, encodedBlockID)

if u.ChunkAlreadyTransferred(blockIndex) {
u.jptm.LogAtLevelForCurrentTransfer(pipeline.LogDebug,
fmt.Sprintf("Skipping chunk %d as it was already transferred.", blockIndex))
atomic.AddInt32(&u.atomicChunksWritten, 1)
return
}

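Note that the block ID is recorded before the skip check, matching the URL copier below, so reused blocks still appear in the list committed by the epilogue; skipped chunks also increment atomicChunksWritten, keeping the commit condition (chunks written == numChunks) intact. A toy model of that invariant; the type and counters are hypothetical simplifications of blockBlobSenderBase:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// sender is a toy stand-in for blockBlobSenderBase: a committed-block
// map built at Prologue time, plus the written-chunk counter.
type sender struct {
	completed     map[int]string
	chunksWritten int32
	numChunks     int32
}

// putBlock either reuses a block found on the service or "uploads" it,
// but bumps the counter either way, keeping the commit condition intact.
func (s *sender) putBlock(index int32) {
	if _, done := s.completed[int(index)]; done {
		fmt.Printf("chunk %d already on service, skipping\n", index)
	} else {
		fmt.Printf("uploading chunk %d\n", index)
	}
	atomic.AddInt32(&s.chunksWritten, 1)
}

func main() {
	s := &sender{completed: map[int]string{0: "block-0"}, numChunks: 2}
	s.putBlock(0)
	s.putBlock(1)
	fmt.Println("ready to commit:", atomic.LoadInt32(&s.chunksWritten) == s.numChunks)
}
```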
7 changes: 7 additions & 0 deletions ste/sender-blockBlobFromURL.go
@@ -22,6 +22,7 @@ package ste

import (
"bytes"
"fmt"
"net/url"
"sync/atomic"

@@ -131,6 +132,12 @@ func (c *urlToBlockBlobCopier) generatePutBlockFromURL(id common.ChunkID, blockI
// step 2: save the block ID into the list of block IDs
c.setBlockID(blockIndex, encodedBlockID)

if c.ChunkAlreadyTransferred(blockIndex) {
c.jptm.LogAtLevelForCurrentTransfer(pipeline.LogDebug, fmt.Sprintf("Skipping chunk %d as it was already transferred.", blockIndex))
atomic.AddInt32(&c.atomicChunksWritten, 1)
return
}

// step 3: put block to remote
c.jptm.LogChunkStatus(id, common.EWaitReason.S2SCopyOnWire())
