From c3f6da5d8d555e21a01334231d86bd85e7eb7d99 Mon Sep 17 00:00:00 2001 From: rkodev <43806892+rkodev@users.noreply.github.com> Date: Wed, 21 Jun 2023 16:19:38 +0300 Subject: [PATCH] Large file upload task --- batch_request_collection.go | 15 ++++- fileupload/large_file_upload_task.go | 57 ----------------- fileupload/upload_request.go | 54 ----------------- fileuploader/file_uploader_util.go | 82 +++++++++++++++++++++++++ fileuploader/large_file_upload_task.go | 84 ++++++++++++++++++++++++++ fileuploader/upload_slice.go | 66 ++++++++++++++++++++ utils.go | 14 ----- 7 files changed, 246 insertions(+), 126 deletions(-) delete mode 100644 fileupload/large_file_upload_task.go delete mode 100644 fileupload/upload_request.go create mode 100644 fileuploader/file_uploader_util.go create mode 100644 fileuploader/large_file_upload_task.go create mode 100644 fileuploader/upload_slice.go delete mode 100644 utils.go diff --git a/batch_request_collection.go b/batch_request_collection.go index 9f4ae56..338e846 100644 --- a/batch_request_collection.go +++ b/batch_request_collection.go @@ -36,7 +36,7 @@ func (b *BatchRequestCollection) AddBatchRequestStep(reqInfo abstractions.Reques // Send serializes and sends the batch request to the server func (b *BatchRequestCollection) Send(ctx context.Context, adapter abstractions.RequestAdapter) (BatchResponse, error) { // spit request with a max of 19 - requestItems := ChunkSlice(b.batchRequest.requests, 19) + requestItems := chunkSlice(b.batchRequest.requests, 19) if len(requestItems) > b.batchLimit { return nil, errors.New("exceeded max number of batch requests") @@ -56,3 +56,16 @@ func (b *BatchRequestCollection) Send(ctx context.Context, adapter abstractions. return response, nil } + +func chunkSlice[T interface{}](slice []T, chunkSize int) [][]T { + var chunks [][]T + for i := 0; i < len(slice); i += chunkSize { + end := i + chunkSize + if end > len(slice) { + end = len(slice) + } + + chunks = append(chunks, slice[i:end]) + } + return chunks +} diff --git a/fileupload/large_file_upload_task.go b/fileupload/large_file_upload_task.go deleted file mode 100644 index 131205c..0000000 --- a/fileupload/large_file_upload_task.go +++ /dev/null @@ -1,57 +0,0 @@ -package fileupload - -import ( - abstractions "github.com/microsoft/kiota-abstractions-go" - "github.com/microsoftgraph/msgraph-sdk-go-core" - "time" -) - -type LargeFileUploadTask[T interface{}] struct { - uploadSession UploadSession - requestAdapter *msgraphgocore.GraphRequestAdapterBase - data []byte - sliceSize int -} - -type UploadSession interface { - GetNextExpectedRanges() []string - GetUploadUrl() *string - GetExpirationDateTime() *time.Time -} - -type ProgressCallback func(current float64, total float64) - -type UploadResult[T interface{}] struct { - ItemResponse T - UploadSession UploadSession - URI string - UploadSucceeded bool - ErrorMappings abstractions.ErrorMappings -} - -const DefaultSliceSize = 1024 - -func NewLargeFileUploadTask[T interface{}](requestAdapter *msgraphgocore.GraphRequestAdapterBase, uploadSession UploadSession, data []byte, maxSliceSize int) *LargeFileUploadTask[T] { - return &LargeFileUploadTask[T]{ - uploadSession: uploadSession, - requestAdapter: requestAdapter, - data: data, - sliceSize: maxSliceSize, - } -} - -func (l *LargeFileUploadTask[T]) Upload(progress ProgressCallback) UploadResult[T] { - chunkSize := l.sliceSize - if chunkSize == -1 { - chunkSize = DefaultSliceSize - } - - uploadResult := UploadResult[T]{} - - chunks := msgraphgocore.ChunkSlice(l.data, chunkSize) - for _, dataSection := range chunks { - uploadRequest := NewUploadRequest[T](l.requestAdapter, dataSection, l.uploadSession.GetUploadUrl()) - uploadRequest.UploadAsync() - } - return uploadResult -} diff --git a/fileupload/upload_request.go b/fileupload/upload_request.go deleted file mode 100644 index 02493aa..0000000 --- a/fileupload/upload_request.go +++ /dev/null @@ -1,54 +0,0 @@ -package fileupload - -import ( - "context" - "fmt" - abstractions "github.com/microsoft/kiota-abstractions-go" - msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core" -) - -type UploadRequest[T interface{}] struct { - requestAdapter *msgraphgocore.GraphRequestAdapterBase - data []byte - sessionUrlTemplate string - rangeBegin uint64 - rangeEnd uint64 - rangeLength uint64 - totalSessionLength uint64 - errorMappings abstractions.ErrorMappings -} - -func NewUploadRequest[T interface{}](requestAdapter *msgraphgocore.GraphRequestAdapterBase, data []byte, sessionUrlTemplate string, rangeBegin uint64, rangeEnd uint64, totalSessionLength uint64) *UploadRequest[T] { - return &UploadRequest[T]{ - requestAdapter: requestAdapter, - data: data, - sessionUrlTemplate: sessionUrlTemplate, - rangeBegin: rangeBegin, - rangeEnd: rangeEnd, - totalSessionLength: totalSessionLength, - } -} - -func (u *UploadRequest[T]) RangeLength() uint64 { - return u.rangeEnd - u.rangeBegin + 1 -} - -func (u *UploadRequest[T]) UploadAsync() UploadResult[T] { - uploadResult := UploadResult[T]{} - requestInfo := u.createRequestInformation() - err := u.requestAdapter.SendNoContent(context.Background(), requestInfo, u.errorMappings) - - return uploadResult -} - -func (u *UploadRequest[T]) createRequestInformation() *abstractions.RequestInformation { - requestInfo := abstractions.NewRequestInformation() - requestInfo.UrlTemplate = u.sessionUrlTemplate - requestInfo.Method = abstractions.POST - requestInfo.SetStreamContent(u.data) - - requestInfo.Headers.Add("Content-Range", fmt.Sprintf("bytes %d-%d/%d", u.rangeLength, u.rangeEnd, u.totalSessionLength)) - requestInfo.Headers.Add("Content-Length", fmt.Sprintf("%d", u.RangeLength())) - - return requestInfo -} diff --git a/fileuploader/file_uploader_util.go b/fileuploader/file_uploader_util.go new file mode 100644 index 0000000..1ea3bf1 --- /dev/null +++ b/fileuploader/file_uploader_util.go @@ -0,0 +1,82 @@ +package fileuploader + +import ( + "strings" + "time" +) + +type rangePair struct { + Start float64 + End float64 +} + +func stringIsNullOrEmpty(s string) bool { + s = strings.TrimSpace(s) + if s == "" || len(strings.TrimSpace(s)) == 0 { + return false + } + return true +} + +type UploadSession interface { + GetExpirationDateTime() *time.Time + GetNextExpectedRanges() []string + GetOdataType() *string + GetUploadUrl() *string +} + +type ProgressCallBack func(current float64, total float64) + +type UploadResult[T interface{}] interface { + SetItemResponse(response T) + GetItemResponse() T + SetUploadSession(uploadSession UploadSession) + GetUploadSession() UploadSession + SetURI(uri string) + GetURI() string + SetUploadSucceeded(isSuccessful bool) + GetUploadSucceeded() bool +} + +func NewUploadResult[T interface{}]() UploadResult[T] { + return &uploadResult[T]{} +} + +type uploadResult[T interface{}] struct { + itemResponse T + uploadSession UploadSession + uri string + uploadSucceeded bool +} + +func (u *uploadResult[T]) SetItemResponse(response T) { + u.itemResponse = response +} + +func (u *uploadResult[T]) GetItemResponse() T { + return u.itemResponse +} + +func (u *uploadResult[T]) SetUploadSession(uploadSession UploadSession) { + u.uploadSession = uploadSession +} + +func (u *uploadResult[T]) GetUploadSession() UploadSession { + return u.uploadSession +} + +func (u *uploadResult[T]) SetURI(uri string) { + u.uri = uri +} + +func (u *uploadResult[T]) GetURI() string { + return u.uri +} + +func (u *uploadResult[T]) SetUploadSucceeded(isSuccessful bool) { + u.uploadSucceeded = isSuccessful +} + +func (u *uploadResult[T]) GetUploadSucceeded() bool { + return u.uploadSucceeded +} diff --git a/fileuploader/large_file_upload_task.go b/fileuploader/large_file_upload_task.go new file mode 100644 index 0000000..901ecc7 --- /dev/null +++ b/fileuploader/large_file_upload_task.go @@ -0,0 +1,84 @@ +package fileuploader + +import ( + abstractions "github.com/microsoft/kiota-abstractions-go" + "github.com/microsoft/kiota-abstractions-go/serialization" + "math" + "strconv" + "strings" +) + +type LargeFileUploadTask[T interface{}] interface { + UploadAsync(progress ProgressCallBack) UploadResult[T] +} + +type largeFileUploadTask[T interface{}] struct { + uploadSession UploadSession + adapter abstractions.RequestAdapter + fileContent []byte + maxSlice float64 + parsableFactory serialization.ParsableFactory +} + +func NewLargeFileUploadTask[T interface{}](adapter abstractions.RequestAdapter, uploadSession UploadSession, fileContent []byte, maxSlice float64, parsableFactory serialization.ParsableFactory) LargeFileUploadTask[T] { + return &largeFileUploadTask[T]{ + adapter: adapter, + uploadSession: uploadSession, + fileContent: fileContent, + maxSlice: maxSlice, + parsableFactory: parsableFactory, + } +} + +// UploadAsync TODO Update function to use go routines +// TODO allow re-uploading slices to a maximum of 10 +func (l *largeFileUploadTask[T]) UploadAsync(progress ProgressCallBack) UploadResult[T] { + /*maxTries := 3 + uploadTries := 0 + + for uploadTries <= maxTries { + fmt.Println(uploadTries) + uploadTries++ + }*/ + + slices := l.createUploadSlices() + for _, slice := range slices { + _, _ = slice.UploadAsync() // check if successful + progress(slice.RangeEnd, slice.TotalSessionLength) + } + panic("implement me") +} + +func (l *largeFileUploadTask[T]) getRangesRemaining() []rangePair { + rangePairs := make([]rangePair, len(l.uploadSession.GetNextExpectedRanges())) + + for i, ranges := range l.uploadSession.GetNextExpectedRanges() { + rangeValues := strings.Split(ranges, "-") + + var startRange float64 + if s, err := strconv.ParseFloat(rangeValues[0], 64); err == nil { + startRange = s + } + + var endRange float64 + if !stringIsNullOrEmpty(rangeValues[1]) { + if s, err := strconv.ParseFloat(rangeValues[1], 64); err == nil { + endRange = s + } + } else { + endRange = float64(len(l.fileContent)) + } + + rangePairs[i] = rangePair{ + Start: startRange, + End: endRange, + } + } + + return rangePairs +} + +func (l largeFileUploadTask[T]) nextSliceLength(rangeBegin float64, rangeEnd float64) float64 { + sizeBasedOnRange := rangeEnd - rangeBegin + 1 + return math.Min(sizeBasedOnRange, l.maxSlice) +} diff --git a/fileuploader/upload_slice.go b/fileuploader/upload_slice.go new file mode 100644 index 0000000..590b9dc --- /dev/null +++ b/fileuploader/upload_slice.go @@ -0,0 +1,66 @@ +package fileuploader + +import ( + "context" + "fmt" + abstractions "github.com/microsoft/kiota-abstractions-go" +) + +type uploadSlice[T interface{}] struct { + RequestAdapter abstractions.RequestAdapter + UrlTemplate string + RangeBegin float64 + RangeEnd float64 + TotalSessionLength float64 + RangeLength float64 + data []byte + errorMappings abstractions.ErrorMappings +} + +func (l *largeFileUploadTask[T]) createUploadSlices() []uploadSlice[T] { + rangesRemaining := l.getRangesRemaining() + + uploadSlices := make([]uploadSlice[T], len(rangesRemaining)) + + for i, v := range rangesRemaining { + uploadSlices[i] = uploadSlice[T]{ + RequestAdapter: l.adapter, + UrlTemplate: *l.uploadSession.GetUploadUrl(), + RangeBegin: v.Start, + RangeEnd: v.End, + } + } + + return uploadSlices +} + +func (u *uploadSlice[T]) UploadAsync() (UploadResult[T], error) { + res := NewUploadResult[T]() + requestInfo := u.createRequestInformation(u.data) + + var uploadResponseHandler abstractions.ResponseHandler = func(response interface{}, errorMappings abstractions.ErrorMappings) (interface{}, error) { + panic("To do") + } + + ctx := context.Background() + ctx = context.WithValue(ctx, abstractions.ResponseHandlerOptionKey, uploadResponseHandler) + + err := u.RequestAdapter.SendNoContent(ctx, requestInfo, u.errorMappings) + if err != nil { + return nil, err + } + return res, nil +} + +func (u *uploadSlice[T]) createRequestInformation(content []byte) *abstractions.RequestInformation { + headers := abstractions.NewRequestHeaders() + headers.Add("Content-Range", fmt.Sprintf("bytes %f-%f/%f", u.RangeLength, u.RangeEnd, u.TotalSessionLength)) + headers.Add("Content-Length", fmt.Sprintf("%f", u.RangeLength)) + + requestInfo := abstractions.NewRequestInformation() + requestInfo.Headers = headers + requestInfo.UrlTemplate = u.UrlTemplate + requestInfo.Method = abstractions.PUT + requestInfo.SetStreamContent(content) + return requestInfo +} diff --git a/utils.go b/utils.go deleted file mode 100644 index fbf489c..0000000 --- a/utils.go +++ /dev/null @@ -1,14 +0,0 @@ -package msgraphgocore - -func ChunkSlice[T interface{}](slice []T, chunkSize int) [][]T { - var chunks [][]T - for i := 0; i < len(slice); i += chunkSize { - end := i + chunkSize - if end > len(slice) { - end = len(slice) - } - - chunks = append(chunks, slice[i:end]) - } - return chunks -}