Skip to content

Commit

Permalink
Merge pull request #257 from fengziren/master
Browse files Browse the repository at this point in the history
✨ feat(client): add UploadLargeFile
  • Loading branch information
mleone87 authored May 14, 2023
2 parents 8da8251 + 0217b72 commit 2a02bab
Showing 1 changed file with 52 additions and 14 deletions.
66 changes: 52 additions & 14 deletions proxmox/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (c *Client) GetJsonRetryable(url string, data *map[string]interface{}, trie
if strings.Contains(statErr.Error(), "500 no such resource") {
return statErr
}
//fmt.Printf("[DEBUG][GetJsonRetryable] Sleeping for %d seconds before asking url %s", ii+1, url)
// fmt.Printf("[DEBUG][GetJsonRetryable] Sleeping for %d seconds before asking url %s", ii+1, url)
time.Sleep(time.Duration(ii+1) * time.Second)
}
return statErr
Expand All @@ -160,7 +160,7 @@ func (c *Client) GetNodeList() (list map[string]interface{}, err error) {
// For resource types that can be in a disabled state, disabled resources
// will not be returned
func (c *Client) GetResourceList(resourceType string) (list map[string]interface{}, err error) {
var endpoint = "/cluster/resources"
endpoint := "/cluster/resources"
if resourceType != "" {
endpoint = fmt.Sprintf("%s?type=%s", endpoint, resourceType)
}
Expand Down Expand Up @@ -450,8 +450,10 @@ func (c *Client) WaitForCompletion(taskResponse map[string]interface{}) (waitExi
return "", fmt.Errorf("Wait timeout for:" + taskUpid)
}

var rxTaskNode = regexp.MustCompile("UPID:(.*?):")
var rxExitStatusSuccess = regexp.MustCompile(`^(OK|WARNINGS)`)
var (
rxTaskNode = regexp.MustCompile("UPID:(.*?):")
rxExitStatusSuccess = regexp.MustCompile(`^(OK|WARNINGS)`)
)

func (c *Client) GetTaskExitstatus(taskUpid string) (exitStatus interface{}, err error) {
node := rxTaskNode.FindStringSubmatch(taskUpid)[1]
Expand Down Expand Up @@ -525,7 +527,7 @@ func (c *Client) DeleteVmParams(vmr *VmRef, params map[string]interface{}) (exit
return "", err
}

//Remove HA if required
// Remove HA if required
if vmr.haState != "" {
url := fmt.Sprintf("/cluster/ha/resources/%d", vmr.vmId)
resp, err := c.session.Delete(url, nil, nil)
Expand Down Expand Up @@ -623,7 +625,6 @@ func (c *Client) CreateLxcContainer(node string, vmParams map[string]interface{}
}

func (c *Client) CloneLxcContainer(vmr *VmRef, vmParams map[string]interface{}) (exitStatus string, err error) {

reqbody := ParamsToBody(vmParams)
url := fmt.Sprintf("/nodes/%s/lxc/%s/clone", vmr.node, vmParams["vmid"])
resp, err := c.session.Post(url, nil, nil, &reqbody)
Expand Down Expand Up @@ -890,7 +891,6 @@ func (c *Client) CreateVMDisk(
fullDiskName string,
diskParams map[string]interface{},
) error {

reqbody := ParamsToBody(diskParams)
url := fmt.Sprintf("/nodes/%s/storage/%s/content", nodeName, storageName)
resp, err := c.session.Post(url, nil, nil, &reqbody)
Expand Down Expand Up @@ -946,7 +946,6 @@ func (c *Client) createVMDisks(
// CreateNewDisk - This method allows simpler disk creation for direct client users
// It should work for any existing container and virtual machine
func (c *Client) CreateNewDisk(vmr *VmRef, disk string, volume string) (exitStatus interface{}, err error) {

reqbody := ParamsToBody(map[string]interface{}{disk: volume})
url := fmt.Sprintf("/nodes/%s/%s/%d/config", vmr.node, vmr.vmType, vmr.vmId)
resp, err := c.session.Put(url, nil, nil, &reqbody)
Expand Down Expand Up @@ -1342,6 +1341,47 @@ func (c *Client) Upload(node string, storage string, contentType string, filenam
return nil
}

func (c *Client) UploadLargeFile(node string, storage string, contentType string, filename string, filesize int64, file io.Reader) error {
var contentLength int64

var body io.Reader
var mimetype string
var err error
body, mimetype, contentLength, err = createStreamedUploadBody(contentType, filename, filesize, file)
if err != nil {
return err
}

url := fmt.Sprintf("%s/nodes/%s/storage/%s/upload", c.session.ApiUrl, node, storage)
headers := c.session.Headers.Clone()
headers.Add("Content-Type", mimetype)
headers.Add("Accept", "application/json")
req, err := c.session.NewRequest(http.MethodPost, url, &headers, body)
if err != nil {
return err
}

req.ContentLength = contentLength

resp, err := c.session.Do(req)
if err != nil {
return err
}

taskResponse, err := ResponseJSON(resp)
if err != nil {
return err
}
exitStatus, err := c.WaitForCompletion(taskResponse)
if err != nil {
return err
}
if exitStatus != exitStatusSuccess {
return fmt.Errorf("moving file to destination failed: %v", exitStatus)
}
return nil
}

func createUploadBody(contentType string, filename string, r io.Reader) (io.Reader, string, error) {
var buf bytes.Buffer
w := multipart.NewWriter(&buf)
Expand Down Expand Up @@ -1486,7 +1526,6 @@ func (c *Client) ReadVMHA(vmr *VmRef) (err error) {
}
}
return

}

func (c *Client) UpdateVMHA(vmr *VmRef, haState string, haGroup string) (exitStatus interface{}, err error) {
Expand All @@ -1495,7 +1534,7 @@ func (c *Client) UpdateVMHA(vmr *VmRef, haState string, haGroup string) (exitSta
return
}

//Remove HA
// Remove HA
if haState == "" {
url := fmt.Sprintf("/cluster/ha/resources/%d", vmr.vmId)
resp, err := c.session.Delete(url, nil, nil)
Expand All @@ -1512,7 +1551,7 @@ func (c *Client) UpdateVMHA(vmr *VmRef, haState string, haGroup string) (exitSta
return nil, err
}

//Activate HA
// Activate HA
if vmr.haState == "" {
paramMap := map[string]interface{}{
"sid": vmr.vmId,
Expand All @@ -1535,7 +1574,7 @@ func (c *Client) UpdateVMHA(vmr *VmRef, haState string, haGroup string) (exitSta
}
}

//Set wanted state
// Set wanted state
paramMap := map[string]interface{}{
"state": haState,
"group": haGroup,
Expand Down Expand Up @@ -1583,8 +1622,7 @@ func (c *Client) DeletePool(poolid string) error {
return c.Delete("/pools/" + poolid)
}

//permissions check

// permissions check
func (c *Client) GetUserPermissions(id UserID, path string) (permissions []string, err error) {
existence, err := CheckUserExistence(id, c)
if err != nil {
Expand Down

0 comments on commit 2a02bab

Please sign in to comment.