Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hash based sync #2020

Merged
merged 15 commits into from
Jan 20, 2023
3 changes: 0 additions & 3 deletions cmd/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -1004,9 +1004,6 @@ func validatePutMd5(putMd5 bool, fromTo common.FromTo) error {
if putMd5 && fromTo.IsS2S() {
glcm.Info(" --put-md5 flag to check data consistency between source and destination is not applicable for S2S Transfers (i.e. When both the source and the destination are remote). AzCopy cannot compute MD5 hash of data stored at remote location.")
}
if putMd5 && !fromTo.IsUpload() {
return fmt.Errorf("put-md5 is set but the job is not an upload")
}
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/copyEnumeratorInit.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (cca *CookedCopyCmdArgs) initEnumerator(jobPartOrder common.CopyJobPartOrde
traverser, err = InitResourceTraverser(cca.Source, cca.FromTo.From(), &ctx, &srcCredInfo,
&cca.FollowSymlinks, cca.ListOfFilesChannel, cca.Recursive, getRemoteProperties,
cca.IncludeDirectoryStubs, cca.permanentDeleteOption, func(common.EntityType) {}, cca.ListOfVersionIDs,
cca.S2sPreserveBlobTags, azcopyLogVerbosity.ToPipelineLogLevel(), cca.CpkOptions, nil /* errorChannel */)
cca.S2sPreserveBlobTags, common.ESyncHashType.None(), azcopyLogVerbosity.ToPipelineLogLevel(), cca.CpkOptions, nil /* errorChannel */)

if err != nil {
return nil, err
Expand Down Expand Up @@ -342,7 +342,7 @@ func (cca *CookedCopyCmdArgs) isDestDirectory(dst common.ResourceString, ctx *co

rt, err := InitResourceTraverser(dst, cca.FromTo.To(), ctx, &dstCredInfo, nil,
nil, false, false, false, common.EPermanentDeleteOption.None(),
func(common.EntityType) {}, cca.ListOfVersionIDs, false, pipeline.LogNone, cca.CpkOptions, nil /* errorChannel */)
func(common.EntityType) {}, cca.ListOfVersionIDs, false, common.ESyncHashType.None(), pipeline.LogNone, cca.CpkOptions, nil /* errorChannel */)

if err != nil {
return false
Expand Down
2 changes: 1 addition & 1 deletion cmd/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (cooked cookedListCmdArgs) HandleListContainerCommand() (err error) {

traverser, err := InitResourceTraverser(source, cooked.location, &ctx, &credentialInfo, nil, nil,
true, false, false, common.EPermanentDeleteOption.None(), func(common.EntityType) {},
nil, false, pipeline.LogNone, common.CpkOptions{}, nil /* errorChannel */)
nil, false, common.ESyncHashType.None(), pipeline.LogNone, common.CpkOptions{}, nil /* errorChannel */)

if err != nil {
return fmt.Errorf("failed to initialize traverser: %s", err.Error())
Expand Down
2 changes: 1 addition & 1 deletion cmd/removeEnumerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func newRemoveEnumerator(cca *CookedCopyCmdArgs) (enumerator *CopyEnumerator, er
sourceTraverser, err = InitResourceTraverser(cca.Source, cca.FromTo.From(), &ctx, &cca.credentialInfo,
nil, cca.ListOfFilesChannel, cca.Recursive, false, cca.IncludeDirectoryStubs,
cca.permanentDeleteOption, func(common.EntityType) {}, cca.ListOfVersionIDs, false,
azcopyLogVerbosity.ToPipelineLogLevel(), cca.CpkOptions, nil /* errorChannel */)
common.ESyncHashType.None(), azcopyLogVerbosity.ToPipelineLogLevel(), cca.CpkOptions, nil /* errorChannel */)

// report failure to create traverser
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/setPropertiesEnumerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func setPropertiesEnumerator(cca *CookedCopyCmdArgs) (enumerator *CopyEnumerator
sourceTraverser, err = InitResourceTraverser(cca.Source, cca.FromTo.From(), &ctx, &cca.credentialInfo,
nil, cca.ListOfFilesChannel, cca.Recursive, false, cca.IncludeDirectoryStubs,
cca.permanentDeleteOption, func(common.EntityType) {}, cca.ListOfVersionIDs, false,
azcopyLogVerbosity.ToPipelineLogLevel(), cca.CpkOptions, nil /* errorChannel */)
common.ESyncHashType.None(), azcopyLogVerbosity.ToPipelineLogLevel(), cca.CpkOptions, nil /* errorChannel */)

// report failure to create traverser
if err != nil {
Expand Down
25 changes: 20 additions & 5 deletions cmd/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type rawSyncCmdArgs struct {
legacyExclude string // for warning messages only
includeRegex string
excludeRegex string
compareHash string

preservePermissions bool
preserveSMBPermissions bool // deprecated and synonymous with preservePermissions
Expand Down Expand Up @@ -274,6 +275,17 @@ func (raw *rawSyncCmdArgs) cook() (cookedSyncCmdArgs, error) {
return cooked, fmt.Errorf("in order to use --preserve-posix-properties, both the source and destination must be POSIX-aware (valid pairings are Linux->Blob, Blob->Linux, Blob->Blob)")
}

if err = cooked.compareHash.Parse(raw.compareHash); err != nil {
return cooked, err
} else {
switch cooked.compareHash {
case common.ESyncHashType.MD5():
// Save any new MD5s on files we download.
raw.putMd5 = true
default: // no need to put a hash of any kind.
}
}

cooked.putMd5 = raw.putMd5
if err = validatePutMd5(cooked.putMd5, cooked.fromTo); err != nil {
return cooked, err
Expand Down Expand Up @@ -384,6 +396,7 @@ type cookedSyncCmdArgs struct {
excludeRegex []string

// options
compareHash common.SyncHashType
preservePermissions common.PreservePermissionsOption
preserveSMBInfo bool
preservePOSIXProperties bool
Expand Down Expand Up @@ -754,11 +767,11 @@ func init() {
// TODO: enable for copy with IfSourceNewer
// smb info/permissions can be persisted in the scenario of File -> File
syncCmd.PersistentFlags().BoolVar(&raw.preserveSMBPermissions, "preserve-smb-permissions", false, "False by default. Preserves SMB ACLs between aware resources (Azure Files). This flag applies to both files and folders, unless a file-only filter is specified (e.g. include-pattern).")
syncCmd.PersistentFlags().BoolVar(&raw.preserveSMBInfo, "preserve-smb-info", (runtime.GOOS=="windows"), "Preserves SMB property info (last write time, creation time, attribute bits)"+
" between SMB-aware resources (Windows and Azure Files). On windows, this flag will be set to true by default. If the source or destination is a "+
"volume mounted on Linux using SMB protocol, this flag will have to be explicitly set to true. Only the attribute bits supported by Azure Files "+
"will be transferred; any others will be ignored. This flag applies to both files and folders, unless a file-only filter is specified "+
"(e.g. include-pattern). The info transferred for folders is the same as that for files, except for Last Write Time which is never preserved for folders.")
syncCmd.PersistentFlags().BoolVar(&raw.preserveSMBInfo, "preserve-smb-info", (runtime.GOOS == "windows"), "Preserves SMB property info (last write time, creation time, attribute bits)"+
" between SMB-aware resources (Windows and Azure Files). On windows, this flag will be set to true by default. If the source or destination is a "+
"volume mounted on Linux using SMB protocol, this flag will have to be explicitly set to true. Only the attribute bits supported by Azure Files "+
"will be transferred; any others will be ignored. This flag applies to both files and folders, unless a file-only filter is specified "+
"(e.g. include-pattern). The info transferred for folders is the same as that for files, except for Last Write Time which is never preserved for folders.")
syncCmd.PersistentFlags().BoolVar(&raw.preservePOSIXProperties, "preserve-posix-properties", false, "'Preserves' property info gleaned from stat or statx into object metadata.")

// TODO: enable when we support local <-> File
Expand Down Expand Up @@ -792,6 +805,8 @@ func init() {
syncCmd.PersistentFlags().BoolVar(&raw.mirrorMode, "mirror-mode", false, "Disable last-modified-time based comparison and overwrites the conflicting files and blobs at the destination if this flag is set to true. Default is false")
syncCmd.PersistentFlags().BoolVar(&raw.dryrun, "dry-run", false, "Prints the path of files that would be copied or removed by the sync command. This flag does not copy or remove the actual files.")

syncCmd.PersistentFlags().StringVar(&raw.compareHash, "compare-hash", "None", "Inform sync to rely on hashes as an alternative to LMT. Missing hashes at a remote source will throw an error. (None, MD5) Default: None")

// temp, to assist users with change in param names, by providing a clearer message when these obsolete ones are accidentally used
syncCmd.PersistentFlags().StringVar(&raw.legacyInclude, "include", "", "Legacy include param. DO NOT USE")
syncCmd.PersistentFlags().StringVar(&raw.legacyExclude, "exclude", "", "Legacy exclude param. DO NOT USE")
Expand Down
111 changes: 97 additions & 14 deletions cmd/syncComparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,35 @@

package cmd

import "strings"
import (
"fmt"
"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-azcopy/v10/common"
"reflect"
"strings"
)

const (
syncSkipReasonTime = "the source has an older LMT than the destination"
syncSkipReasonMissingHash = "the source lacks an associated hash; please upload with --put-md5"
syncSkipReasonSameHash = "the source has the same hash"
syncOverwriteReasonNewerHash = "the source has a differing hash"
syncOverwriteResaonNewerLMT = "the source is more recent than the destination"
syncStatusSkipped = "skipped"
syncStatusOverwritten = "overwritten"
)

func syncComparatorLog(fileName, status, skipReason string, stdout bool) {
out := fmt.Sprintf("File %s was %s because %s", fileName, status, skipReason)

if azcopyScanningLogger != nil {
azcopyScanningLogger.Log(pipeline.LogInfo, out)
}

if stdout {
glcm.Info(out)
}
}

// with the help of an objectIndexer containing the source objects
// find out the destination objects that should be transferred
Expand All @@ -35,12 +63,14 @@ type syncDestinationComparator struct {
// storing the source objects
sourceIndex *objectIndexer

preferSMBTime bool
comparisonHashType common.SyncHashType

preferSMBTime bool
disableComparison bool
}

func newSyncDestinationComparator(i *objectIndexer, copyScheduler, cleaner objectProcessor, preferSMBTime, disableComparison bool) *syncDestinationComparator {
return &syncDestinationComparator{sourceIndex: i, copyTransferScheduler: copyScheduler, destinationCleaner: cleaner, preferSMBTime: preferSMBTime, disableComparison: disableComparison}
func newSyncDestinationComparator(i *objectIndexer, copyScheduler, cleaner objectProcessor, comparisonHashType common.SyncHashType, preferSMBTime, disableComparison bool) *syncDestinationComparator {
return &syncDestinationComparator{sourceIndex: i, copyTransferScheduler: copyScheduler, destinationCleaner: cleaner, preferSMBTime: preferSMBTime, disableComparison: disableComparison, comparisonHashType: comparisonHashType}
}

// it will only schedule transfers for destination objects that are present in the indexer but stale compared to the entry in the map
Expand All @@ -58,12 +88,36 @@ func (f *syncDestinationComparator) processIfNecessary(destinationObject StoredO
// if the destinationObject is present at source and stale, we transfer the up-to-date version from source
if present {
defer delete(f.sourceIndex.indexMap, destinationObject.relativePath)
if f.disableComparison || sourceObjectInMap.isMoreRecentThan(destinationObject, f.preferSMBTime) {
err := f.copyTransferScheduler(sourceObjectInMap)
if err != nil {
return err

if f.disableComparison {
return f.copyTransferScheduler(sourceObjectInMap)
}

if f.comparisonHashType != common.ESyncHashType.None() && sourceObjectInMap.entityType == common.EEntityType.File() {
switch f.comparisonHashType {
case common.ESyncHashType.MD5():
if sourceObjectInMap.md5 == nil {
syncComparatorLog(sourceObjectInMap.relativePath, syncStatusSkipped, syncSkipReasonMissingHash, true)
return nil
}

if !reflect.DeepEqual(sourceObjectInMap.md5, destinationObject.md5) {
syncComparatorLog(sourceObjectInMap.relativePath, syncStatusOverwritten, syncOverwriteReasonNewerHash, false)

// hash inequality = source "newer" in this model.
return f.copyTransferScheduler(sourceObjectInMap)
}
adreed-msft marked this conversation as resolved.
Show resolved Hide resolved
default:
panic("sanity check: unsupported hash type " + f.comparisonHashType.String())
adreed-msft marked this conversation as resolved.
Show resolved Hide resolved
}

syncComparatorLog(sourceObjectInMap.relativePath, syncStatusSkipped, syncSkipReasonSameHash, false)
return nil
} else if sourceObjectInMap.isMoreRecentThan(destinationObject, f.preferSMBTime) {
return f.copyTransferScheduler(sourceObjectInMap)
}

syncComparatorLog(sourceObjectInMap.relativePath, syncStatusOverwritten, syncOverwriteResaonNewerLMT, false)
} else {
// purposefully ignore the error from destinationCleaner
// it's a tolerable error, since it just means some extra destination object might hang around a bit longer
Expand All @@ -83,12 +137,14 @@ type syncSourceComparator struct {
// storing the destination objects
destinationIndex *objectIndexer

preferSMBTime bool
comparisonHashType common.SyncHashType

preferSMBTime bool
disableComparison bool
}

func newSyncSourceComparator(i *objectIndexer, copyScheduler objectProcessor, preferSMBTime, disableComparison bool) *syncSourceComparator {
return &syncSourceComparator{destinationIndex: i, copyTransferScheduler: copyScheduler, preferSMBTime: preferSMBTime, disableComparison: disableComparison}
func newSyncSourceComparator(i *objectIndexer, copyScheduler objectProcessor, comparisonHashType common.SyncHashType, preferSMBTime, disableComparison bool) *syncSourceComparator {
return &syncSourceComparator{destinationIndex: i, copyTransferScheduler: copyScheduler, preferSMBTime: preferSMBTime, disableComparison: disableComparison, comparisonHashType: comparisonHashType}
}

// it will only transfer source items that are:
Expand All @@ -109,11 +165,38 @@ func (f *syncSourceComparator) processIfNecessary(sourceObject StoredObject) err
if present {
defer delete(f.destinationIndex.indexMap, relPath)

// if destination is stale, schedule source for transfer
if f.disableComparison || sourceObject.isMoreRecentThan(destinationObjectInMap, f.preferSMBTime) {
// if destination is stale, schedule source for transfer
if f.disableComparison {
return f.copyTransferScheduler(sourceObject)
}

if f.comparisonHashType != common.ESyncHashType.None() && sourceObject.entityType == common.EEntityType.File() {
switch f.comparisonHashType {
case common.ESyncHashType.MD5():
if sourceObject.md5 == nil {
syncComparatorLog(sourceObject.relativePath, syncStatusSkipped, syncSkipReasonMissingHash, true)
return nil
}

if !reflect.DeepEqual(sourceObject.md5, destinationObjectInMap.md5) {
// hash inequality = source "newer" in this model.
syncComparatorLog(sourceObject.relativePath, syncStatusOverwritten, syncOverwriteReasonNewerHash, false)
return f.copyTransferScheduler(sourceObject)
}
default:
panic("sanity check: unsupported hash type " + f.comparisonHashType.String())
}

syncComparatorLog(sourceObject.relativePath, syncStatusSkipped, syncSkipReasonSameHash, false)
return nil
} else if sourceObject.isMoreRecentThan(destinationObjectInMap, f.preferSMBTime) {
// if destination is stale, schedule source
syncComparatorLog(sourceObject.relativePath, syncStatusOverwritten, syncOverwriteResaonNewerLMT, false)
return f.copyTransferScheduler(sourceObject)
}
// skip if source is more recent

syncComparatorLog(sourceObject.relativePath, syncStatusSkipped, syncSkipReasonTime, false)
// skip if dest is more recent
return nil
}

Expand Down
9 changes: 5 additions & 4 deletions cmd/syncEnumerator.go
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (cca *cookedSyncCmdArgs) initEnumerator(ctx context.Context) (enumerator *s
if entityType == common.EEntityType.File() {
atomic.AddUint64(&cca.atomicSourceFilesScanned, 1)
}
}, nil, cca.s2sPreserveBlobTags, azcopyLogVerbosity.ToPipelineLogLevel(), cca.cpkOptions, nil /* errorChannel */)
}, nil, cca.s2sPreserveBlobTags, cca.compareHash, azcopyLogVerbosity.ToPipelineLogLevel(), cca.cpkOptions, nil /* errorChannel */)

if err != nil {
return nil, err
Expand All @@ -86,7 +86,7 @@ func (cca *cookedSyncCmdArgs) initEnumerator(ctx context.Context) (enumerator *s
if entityType == common.EEntityType.File() {
atomic.AddUint64(&cca.atomicDestinationFilesScanned, 1)
}
}, nil, cca.s2sPreserveBlobTags, azcopyLogVerbosity.ToPipelineLogLevel(), cca.cpkOptions, nil /* errorChannel */)
}, nil, cca.s2sPreserveBlobTags, cca.compareHash, azcopyLogVerbosity.ToPipelineLogLevel(), cca.cpkOptions, nil /* errorChannel */)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -158,7 +158,8 @@ func (cca *cookedSyncCmdArgs) initEnumerator(ctx context.Context) (enumerator *s
// when uploading, we can delete remote objects immediately, because as we traverse the remote location
// we ALREADY have available a complete map of everything that exists locally
// so as soon as we see a remote destination object we can know whether it exists in the local source
comparator = newSyncDestinationComparator(indexer, transferScheduler.scheduleCopyTransfer, destCleanerFunc, cca.preserveSMBInfo, cca.mirrorMode).processIfNecessary

comparator = newSyncDestinationComparator(indexer, transferScheduler.scheduleCopyTransfer, destCleanerFunc, cca.compareHash, cca.preserveSMBInfo, cca.mirrorMode).processIfNecessary
finalize = func() error {
// schedule every local file that doesn't exist at the destination
err = indexer.traverse(transferScheduler.scheduleCopyTransfer, filters)
Expand All @@ -182,7 +183,7 @@ func (cca *cookedSyncCmdArgs) initEnumerator(ctx context.Context) (enumerator *s
indexer.isDestinationCaseInsensitive = IsDestinationCaseInsensitive(cca.fromTo)
// in all other cases (download and S2S), the destination is scanned/indexed first
// then the source is scanned and filtered based on what the destination contains
comparator = newSyncSourceComparator(indexer, transferScheduler.scheduleCopyTransfer, cca.preserveSMBInfo, cca.mirrorMode).processIfNecessary
comparator = newSyncSourceComparator(indexer, transferScheduler.scheduleCopyTransfer, cca.compareHash, cca.preserveSMBInfo, cca.mirrorMode).processIfNecessary

finalize = func() error {
// remove the extra files at the destination that were not present at the source
Expand Down
4 changes: 2 additions & 2 deletions cmd/syncIndexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (

// the objectIndexer is essential for the generic sync enumerator to work
// it can serve as a:
// 1. objectProcessor: accumulate a lookup map with given StoredObjects
// 2. resourceTraverser: go through the entities in the map like a traverser
// 1. objectProcessor: accumulate a lookup map with given StoredObjects
// 2. resourceTraverser: go through the entities in the map like a traverser
type objectIndexer struct {
indexMap map[string]StoredObject
counter int
Expand Down
Loading