
Add multi-threaded segmented downloading when downloading videos #629

Merged
merged 12 commits into from
Mar 1, 2020
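This PR adds an optional multi-threaded, segmented download mode, enabled with the new `-m` flag. The target file is split into `threadNum` byte ranges; each range is downloaded concurrently with HTTP Range requests into its own `<name>.<ext>.part<index>` file, which begins with a small `FilePartMeta` header so an interrupted download can be resumed. Once every part is complete, the parts are merged into `<name>.<ext>.download` and renamed to the final file. Below is a minimal sketch of how the byte ranges are partitioned, mirroring the loop in `MultiThreadSave` (the file size and thread count are made-up values, not from the PR):

```go
// Sketch only: how MultiThreadSave partitions a download into byte ranges.
// The ~100 MB size and 4 threads below are illustrative, not from the PR.
package main

import "fmt"

func main() {
	size := int64(100*1024*1024 + 123) // total file size in bytes
	threadNum := int64(4)
	partSize := size / threadNum

	var start, end int64
	for i := int64(0); start < size; i++ {
		end = start + partSize - 1
		// The last thread takes whatever remains up to the final byte.
		if i+1 == threadNum || end >= size {
			end = size - 1
		}
		fmt.Printf("part%d: Range bytes=%d-%d\n", i, start, end)
		start = end + 1
	}
}
```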
2 changes: 2 additions & 0 deletions config/config.go
@@ -51,6 +51,8 @@ var (
YoukuPassword string
// RetryTimes how many times to retry when the download failed
RetryTimes int

// MultiThread whether to download one video with multiple threads
MultiThread bool
)

// FakeHeaders fake http headers
296 changes: 295 additions & 1 deletion downloader/downloader.go
@@ -2,11 +2,16 @@ package downloader

import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path"
"path/filepath"
"regexp"
"sort"
"sync"
"time"

@@ -175,6 +180,289 @@ func Save(
return nil
}

// MultiThreadSave downloads urlData to a local file using multiple threads, writing resumable part files that are merged when all of them finish
func MultiThreadSave(
urlData URL, refer, fileName string, bar *pb.ProgressBar, chunkSizeMB, threadNum int,
) error {
filePath, err := utils.FilePath(fileName, urlData.Ext, false)
if err != nil {
return err
}
fileSize, exists, err := utils.FileSize(filePath)
if err != nil {
return err
}
if bar == nil {
bar = progressBar(urlData.Size)
bar.Start()
}
// Skip segment file
// TODO: Live video URLs will not return the size
if exists && fileSize == urlData.Size {
bar.Add64(fileSize)
return nil
}
tmpFilePath := filePath + ".download"
tmpFileSize, tmpExists, err := utils.FileSize(tmpFilePath)
if err != nil {
return err
}
if tmpExists {
if tmpFileSize == urlData.Size {
bar.Add64(urlData.Size)
return os.Rename(tmpFilePath, filePath)
} else {
err = os.Remove(tmpFilePath)
if err != nil {
return err
}
}
}

// Scan all parts
parts, err := readDirAllFilePart(filePath, fileName, urlData.Ext)
if err != nil {
return err
}

var unfinishedPart []*FilePartMeta
savedSize := int64(0)
if len(parts) > 0 {
lastEnd := int64(-1)
for i, part := range parts {
// If there is a gap between the previous part and this one, insert a new part covering the missing range.
if part.Start-lastEnd != 1 {
newPart := &FilePartMeta{
Index: part.Index - 0.000001,
Start: lastEnd + 1,
End: part.Start - 1,
Cur: lastEnd + 1,
}
tmp := append([]*FilePartMeta{}, parts[:i]...)
tmp = append(tmp, newPart)
parts = append(tmp, parts[i:]...)
unfinishedPart = append(unfinishedPart, newPart)
}
// When the part has been fully downloaded, part.Cur equals part.End + 1
if part.Cur <= part.End+1 {
savedSize += part.Cur - part.Start
if part.Cur < part.End+1 {
unfinishedPart = append(unfinishedPart, part)
}
} else {
// More data has been saved than this part should contain; delete the part file and re-download it.
err = os.Remove(filePartPath(filePath, part))
if err != nil {
return err
}
part.Cur = part.Start
unfinishedPart = append(unfinishedPart, part)
}
lastEnd = part.End
}
if lastEnd != urlData.Size-1 {
newPart := &FilePartMeta{
Index: parts[len(parts)-1].Index + 1,
Start: lastEnd + 1,
End: urlData.Size - 1,
Cur: lastEnd + 1,
}
parts = append(parts, newPart)
unfinishedPart = append(unfinishedPart, newPart)
}
} else {
var start, end, partSize int64
var i float32
partSize = urlData.Size / int64(threadNum)
i = 0
for start < urlData.Size {
end = start + partSize - 1
if end > urlData.Size {
end = urlData.Size - 1
} else if int(i+1) == threadNum && end < urlData.Size {
end = urlData.Size - 1
}
part := &FilePartMeta{
Index: i,
Start: start,
End: end,
Cur: start,
}
parts = append(parts, part)
unfinishedPart = append(unfinishedPart, part)
start = end + 1
i++
}
}
if savedSize > 0 {
bar.Add64(savedSize)
if savedSize == urlData.Size {
return mergeMultiPart(filePath, parts)
}
}

wgp := utils.NewWaitGroupPool(threadNum)
var errs []error
var errsMu sync.Mutex
for _, part := range unfinishedPart {
wgp.Add()
go func(part *FilePartMeta) {
file, err := os.OpenFile(filePartPath(filePath, part), os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
errsMu.Lock()
errs = append(errs, err)
errsMu.Unlock()
return
}
defer func() {
file.Close()
wgp.Done()
}()
var end, chunkSize int64
headers := map[string]string{
"Referer": refer,
}
if chunkSizeMB <= 0 {
chunkSize = part.End - part.Start + 1
} else {
chunkSize = int64(chunkSizeMB) * 1024 * 1024
}
end = computeEnd(part.Cur, chunkSize, part.End)
remainingSize := part.End - part.Cur + 1
if part.Cur == part.Start {
// Write the meta header only when the part file is newly created.
err = writeFilePartMeta(file, part)
if err != nil {
errsMu.Lock()
errs = append(errs, err)
errsMu.Unlock()
return
}
}
for remainingSize > 0 {
end = computeEnd(part.Cur, chunkSize, part.End)
headers["Range"] = fmt.Sprintf("bytes=%d-%d", part.Cur, end)
temp := part.Cur
for i := 0; ; i++ {
written, err := writeFile(urlData.URL, file, headers, bar)
if err == nil {
remainingSize -= chunkSize
break
} else if i+1 >= config.RetryTimes {
errsMu.Lock()
errs = append(errs, err)
errsMu.Unlock()
return
}
temp += written
headers["Range"] = fmt.Sprintf("bytes=%d-%d", temp, end)
}
part.Cur = end + 1
}
}(part)
}
wgp.Wait()
if len(errs) > 0 {
return errs[0]
}
return mergeMultiPart(filePath, parts)
}

func filePartPath(filepath string, part *FilePartMeta) string {
return fmt.Sprintf("%s.part%f", filepath, part.Index)
}

func computeEnd(s, chunkSize, max int64) int64 {
var end int64
end = s + chunkSize - 1
if end > max {
end = max
}
return end
}

func readDirAllFilePart(filePath, filename, extname string) ([]*FilePartMeta, error) {
dirPath := filepath.Dir(filePath)
dir, err := os.Open(dirPath)
if err != nil {
return nil, err
}
defer dir.Close()
fns, err := dir.Readdir(0)
if err != nil {
return nil, err
}
var metas []*FilePartMeta
reg := regexp.MustCompile(fmt.Sprintf("%s\\.%s\\.part.+", regexp.QuoteMeta(filename), regexp.QuoteMeta(extname)))
for _, fn := range fns {
if reg.MatchString(fn.Name()) {
meta, err := parseFilePartMeta(path.Join(dirPath, fn.Name()), fn.Size())
if err != nil {
return nil, err
}
metas = append(metas, meta)
}
}
sort.Slice(metas, func(i, j int) bool {
return metas[i].Index < metas[j].Index
})
return metas, nil
}

func parseFilePartMeta(filepath string, fileSize int64) (*FilePartMeta, error) {
meta := new(FilePartMeta)
size := binary.Size(*meta)
file, err := os.OpenFile(filepath, os.O_RDWR, 0666)
if err != nil {
return nil, err
}
defer file.Close()
var buf [512]byte
readSize, err := file.ReadAt(buf[0:size], 0)
if err != nil && err != io.EOF {
return nil, err
}
if readSize < size {
return nil, fmt.Errorf("the file part is broken, please delete all part files and re-download")
}
err = binary.Read(bytes.NewBuffer(buf[:size]), binary.LittleEndian, meta)
if err != nil {
return nil, err
}
savedSize := fileSize - int64(binary.Size(meta))
meta.Cur = meta.Start + savedSize
return meta, nil
}

func writeFilePartMeta(file *os.File, meta *FilePartMeta) error {
return binary.Write(file, binary.LittleEndian, meta)
}

func mergeMultiPart(filepath string, parts []*FilePartMeta) error {
tempFilePath := filepath + ".download"
tempFile, err := os.OpenFile(tempFilePath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
return err
}
var partFiles []*os.File
defer func() {
for _, f := range partFiles {
f.Close()
os.Remove(f.Name())
}
}()
for _, part := range parts {
file, err := os.Open(filePartPath(filepath, part))
if err != nil {
return err
}
partFiles = append(partFiles, file)
_, err = file.Seek(int64(binary.Size(part)), 0)
if err != nil {
return err
}
_, err = io.Copy(tempFile, file)
if err != nil {
return err
}
}
tempFile.Close()
err = os.Rename(tempFilePath, filepath)
return err
}

// Download download urls
func Download(v Data, refer string, chunkSizeMB int) error {
v.genSortedStreams()
@@ -273,7 +561,13 @@ func Download(v Data, refer string, chunkSizeMB int) error {
bar.Start()
if len(data.URLs) == 1 {
// only one fragment
var err error
if config.MultiThread {
err = MultiThreadSave(data.URLs[0], refer, title, bar, chunkSizeMB, config.ThreadNumber)
} else {
err = Save(data.URLs[0], refer, title, bar, chunkSizeMB)
}

if err != nil {
return err
}
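Each download goroutine above fetches its range in chunks of `chunkSizeMB` (or the whole range at once when no chunk size is set), sending `Range: bytes=<cur>-<end>` headers, retrying up to `config.RetryTimes`, and advancing the offset by the bytes already written when a chunk fails partway. The helper below is a hypothetical, simplified stand-in for that per-chunk request; the real code goes through the existing `writeFile` helper, whose signature differs:

```go
// Hypothetical stand-in (not from this diff) for the per-chunk request that the
// existing writeFile helper is assumed to perform: fetch one byte range with a
// Range header and append the body to an open part file.
package main

import (
	"fmt"
	"io"
	"net/http"
	"os"
)

// fetchRange downloads bytes [start, end] of url and appends them to file,
// returning how many bytes were written so a retry can resume mid-chunk.
func fetchRange(url string, file *os.File, start, end int64, referer string) (int64, error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return 0, err
	}
	req.Header.Set("Referer", referer)
	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end))

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()

	return io.Copy(file, resp.Body)
}

func main() {
	if len(os.Args) < 2 {
		fmt.Println("usage: rangefetch <url>")
		return
	}
	out, err := os.OpenFile("chunk.part", os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0666)
	if err != nil {
		panic(err)
	}
	defer out.Close()

	// Fetch only the first 1 MiB as a demonstration.
	n, err := fetchRange(os.Args[1], out, 0, 1<<20-1, "")
	fmt.Println("written:", n, "err:", err)
}
```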
7 changes: 7 additions & 0 deletions downloader/types.go
@@ -138,3 +138,10 @@ type Aria2Input struct {
// For a simple download, only add headers
Header []string `json:"header"`
}

// FilePartMeta describes one download part: its index, byte range, and current resume offset
type FilePartMeta struct {
Index float32
Start int64
End int64
Cur int64
}
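For reference, this struct is what `writeFilePartMeta` and `parseFilePartMeta` persist at the head of every part file via `encoding/binary` in little-endian order; its `binary.Size` (28 bytes) is skipped when the parts are merged. A self-contained round-trip sketch with made-up values:

```go
// Sketch only: round-trip a FilePartMeta header the way writeFilePartMeta and
// parseFilePartMeta do (little-endian via encoding/binary). Values are made up.
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

type FilePartMeta struct {
	Index float32
	Start int64
	End   int64
	Cur   int64
}

func main() {
	meta := FilePartMeta{Index: 0, Start: 0, End: 1<<20 - 1, Cur: 4096}

	buf := new(bytes.Buffer)
	if err := binary.Write(buf, binary.LittleEndian, meta); err != nil {
		panic(err)
	}
	// 4 bytes (float32) + 3*8 bytes (int64) = 28 bytes of header per part file.
	fmt.Println("header size:", binary.Size(meta))

	var decoded FilePartMeta
	if err := binary.Read(buf, binary.LittleEndian, &decoded); err != nil {
		panic(err)
	}
	fmt.Printf("decoded: %+v\n", decoded)
}
```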
1 change: 1 addition & 0 deletions main.go
@@ -43,6 +43,7 @@ import (
)

func init() {
flag.BoolVar(&config.MultiThread, "m", false, "Use multiple threads to download a single video")
flag.BoolVar(&config.Debug, "d", false, "Debug mode")
flag.BoolVar(&config.Version, "v", false, "Show version")
flag.BoolVar(&config.InfoOnly, "i", false, "Information only")