Skip to content

Commit

Permalink
Large file upload task
Browse files Browse the repository at this point in the history
  • Loading branch information
rkodev committed Jan 22, 2024
1 parent f2d9713 commit c3f6da5
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 126 deletions.
15 changes: 14 additions & 1 deletion batch_request_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (b *BatchRequestCollection) AddBatchRequestStep(reqInfo abstractions.Reques
// Send serializes and sends the batch request to the server
func (b *BatchRequestCollection) Send(ctx context.Context, adapter abstractions.RequestAdapter) (BatchResponse, error) {
// spit request with a max of 19
requestItems := ChunkSlice(b.batchRequest.requests, 19)
requestItems := chunkSlice(b.batchRequest.requests, 19)

if len(requestItems) > b.batchLimit {
return nil, errors.New("exceeded max number of batch requests")
Expand All @@ -56,3 +56,16 @@ func (b *BatchRequestCollection) Send(ctx context.Context, adapter abstractions.

return response, nil
}

func chunkSlice[T interface{}](slice []T, chunkSize int) [][]T {
var chunks [][]T
for i := 0; i < len(slice); i += chunkSize {
end := i + chunkSize
if end > len(slice) {
end = len(slice)
}

chunks = append(chunks, slice[i:end])
}
return chunks
}
57 changes: 0 additions & 57 deletions fileupload/large_file_upload_task.go

This file was deleted.

54 changes: 0 additions & 54 deletions fileupload/upload_request.go

This file was deleted.

82 changes: 82 additions & 0 deletions fileuploader/file_uploader_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package fileuploader

import (
"strings"
"time"
)

type rangePair struct {
Start float64
End float64
}

func stringIsNullOrEmpty(s string) bool {
s = strings.TrimSpace(s)
if s == "" || len(strings.TrimSpace(s)) == 0 {
return false
}
return true
}

type UploadSession interface {
GetExpirationDateTime() *time.Time
GetNextExpectedRanges() []string
GetOdataType() *string
GetUploadUrl() *string
}

type ProgressCallBack func(current float64, total float64)

type UploadResult[T interface{}] interface {
SetItemResponse(response T)
GetItemResponse() T
SetUploadSession(uploadSession UploadSession)
GetUploadSession() UploadSession
SetURI(uri string)
GetURI() string
SetUploadSucceeded(isSuccessful bool)
GetUploadSucceeded() bool
}

func NewUploadResult[T interface{}]() UploadResult[T] {
return &uploadResult[T]{}
}

type uploadResult[T interface{}] struct {
itemResponse T
uploadSession UploadSession
uri string
uploadSucceeded bool
}

func (u *uploadResult[T]) SetItemResponse(response T) {
u.itemResponse = response
}

func (u *uploadResult[T]) GetItemResponse() T {
return u.itemResponse
}

func (u *uploadResult[T]) SetUploadSession(uploadSession UploadSession) {
u.uploadSession = uploadSession
}

func (u *uploadResult[T]) GetUploadSession() UploadSession {
return u.uploadSession
}

func (u *uploadResult[T]) SetURI(uri string) {
u.uri = uri
}

func (u *uploadResult[T]) GetURI() string {
return u.uri
}

func (u *uploadResult[T]) SetUploadSucceeded(isSuccessful bool) {
u.uploadSucceeded = isSuccessful
}

func (u *uploadResult[T]) GetUploadSucceeded() bool {
return u.uploadSucceeded
}
84 changes: 84 additions & 0 deletions fileuploader/large_file_upload_task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package fileuploader

import (
abstractions "github.com/microsoft/kiota-abstractions-go"
"github.com/microsoft/kiota-abstractions-go/serialization"
"math"
"strconv"
"strings"
)

type LargeFileUploadTask[T interface{}] interface {
UploadAsync(progress ProgressCallBack) UploadResult[T]
}

type largeFileUploadTask[T interface{}] struct {
uploadSession UploadSession
adapter abstractions.RequestAdapter
fileContent []byte
maxSlice float64
parsableFactory serialization.ParsableFactory
}

func NewLargeFileUploadTask[T interface{}](adapter abstractions.RequestAdapter, uploadSession UploadSession, fileContent []byte, maxSlice float64, parsableFactory serialization.ParsableFactory) LargeFileUploadTask[T] {
return &largeFileUploadTask[T]{
adapter: adapter,
uploadSession: uploadSession,
fileContent: fileContent,
maxSlice: maxSlice,
parsableFactory: parsableFactory,
}
}

// UploadAsync TODO Update function to use go routines
// TODO allow re-uploading slices to a maximum of 10
func (l *largeFileUploadTask[T]) UploadAsync(progress ProgressCallBack) UploadResult[T] {
/*maxTries := 3
uploadTries := 0
for uploadTries <= maxTries {
fmt.Println(uploadTries)
uploadTries++
}*/

slices := l.createUploadSlices()
for _, slice := range slices {
_, _ = slice.UploadAsync() // check if successful
progress(slice.RangeEnd, slice.TotalSessionLength)
}
panic("implement me")
}

func (l *largeFileUploadTask[T]) getRangesRemaining() []rangePair {
rangePairs := make([]rangePair, len(l.uploadSession.GetNextExpectedRanges()))

for i, ranges := range l.uploadSession.GetNextExpectedRanges() {
rangeValues := strings.Split(ranges, "-")

var startRange float64
if s, err := strconv.ParseFloat(rangeValues[0], 64); err == nil {
startRange = s
}

var endRange float64
if !stringIsNullOrEmpty(rangeValues[1]) {
if s, err := strconv.ParseFloat(rangeValues[1], 64); err == nil {
endRange = s
}
} else {
endRange = float64(len(l.fileContent))
}

rangePairs[i] = rangePair{
Start: startRange,
End: endRange,
}
}

return rangePairs
}

func (l largeFileUploadTask[T]) nextSliceLength(rangeBegin float64, rangeEnd float64) float64 {
sizeBasedOnRange := rangeEnd - rangeBegin + 1
return math.Min(sizeBasedOnRange, l.maxSlice)
}
66 changes: 66 additions & 0 deletions fileuploader/upload_slice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package fileuploader

import (
"context"
"fmt"
abstractions "github.com/microsoft/kiota-abstractions-go"
)

type uploadSlice[T interface{}] struct {
RequestAdapter abstractions.RequestAdapter
UrlTemplate string
RangeBegin float64
RangeEnd float64
TotalSessionLength float64
RangeLength float64
data []byte
errorMappings abstractions.ErrorMappings
}

func (l *largeFileUploadTask[T]) createUploadSlices() []uploadSlice[T] {
rangesRemaining := l.getRangesRemaining()

uploadSlices := make([]uploadSlice[T], len(rangesRemaining))

for i, v := range rangesRemaining {
uploadSlices[i] = uploadSlice[T]{
RequestAdapter: l.adapter,
UrlTemplate: *l.uploadSession.GetUploadUrl(),
RangeBegin: v.Start,
RangeEnd: v.End,
}
}

return uploadSlices
}

func (u *uploadSlice[T]) UploadAsync() (UploadResult[T], error) {
res := NewUploadResult[T]()
requestInfo := u.createRequestInformation(u.data)

var uploadResponseHandler abstractions.ResponseHandler = func(response interface{}, errorMappings abstractions.ErrorMappings) (interface{}, error) {
panic("To do")
}

ctx := context.Background()
ctx = context.WithValue(ctx, abstractions.ResponseHandlerOptionKey, uploadResponseHandler)

err := u.RequestAdapter.SendNoContent(ctx, requestInfo, u.errorMappings)
if err != nil {
return nil, err
}
return res, nil
}

func (u *uploadSlice[T]) createRequestInformation(content []byte) *abstractions.RequestInformation {
headers := abstractions.NewRequestHeaders()
headers.Add("Content-Range", fmt.Sprintf("bytes %f-%f/%f", u.RangeLength, u.RangeEnd, u.TotalSessionLength))
headers.Add("Content-Length", fmt.Sprintf("%f", u.RangeLength))

requestInfo := abstractions.NewRequestInformation()
requestInfo.Headers = headers
requestInfo.UrlTemplate = u.UrlTemplate
requestInfo.Method = abstractions.PUT
requestInfo.SetStreamContent(content)
return requestInfo
}
14 changes: 0 additions & 14 deletions utils.go

This file was deleted.

0 comments on commit c3f6da5

Please sign in to comment.