Skip to content

Commit

Permalink
增加了当下载视频时,多线程分段下载的功能 (#629)
Browse files Browse the repository at this point in the history
* 增加了当下载视频时,多线程分段下载的功能
1. 使用-m 参数启用多线程下载,线程数量由-n决定
2. 代码上是增加Save的可选版本MultiThreadSave版本。与进度条兼容。
3. MultiThreadSave的实现:
   将文件分成指定的thread数量的part文件,每个part
文件记录着在整个文件中的偏移量,以及本part当前已下载的数据大小。恢复下载时,会汇总所有part的信息,以支持继续下载。
当重新启动后,part数量与线程数量不同时,有以下两种情况:
   1. 线程数量小于现存part数,则按线程池调度去完成所有part(使用信号量控制)
   2. 线程数大于现存part数,则多于的线程不启用。
 下载完成后,合并所有part,然后删除part文件(因此可能需要两倍大小的磁盘容量,后续可能优化)
 线程调度方案也可能会优化,以尽量利用可能的线程

* [漏传了一个文件] 增加了当下载视频时,多线程分段下载的功能
1. 使用-m 参数启用多线程下载,线程数量由-n决定
2. 代码上是增加Save的可选版本MultiThreadSave版本。与进度条兼容。
3. MultiThreadSave的实现:
   将文件分成指定的thread数量的part文件,每个part
文件记录着在整个文件中的偏移量,以及本part当前已下载的数据大小。恢复下载时,会汇总所有part的信息,以支持继续下载。
当重新启动后,part数量与线程数量不同时,有以下两种情况:
   1. 线程数量小于现存part数,则按线程池调度去完成所有part(使用信号量控制)
   2. 线程数大于现存part数,则多于的线程不启用。
 下载完成后,合并所有part,然后删除part文件(因此可能需要两倍大小的磁盘容量,后续可能优化)
 线程调度方案也可能会优化,以尽量利用可能的线程

* 优化代码结构

* 1. 复原了Import包的位置
2. 规范了函数命名
3. 规范了注释
3. 优化了review中提到的代码,删掉了一些无用的变量

* 1. 修复合并后文件大小增加的bug

* '合并后删掉part文件'

* 1. 删除Signal

* 1. 写文件时增加O_WRONLY flag
2. errors.new() 改为 fmt.Errorf()

* 1. 修复恢复下载后文件数据不正常的bug(搞错了part下载完成时的Cur值)
2. 删掉了mustReadFile,合并到parseFilePartMeta
3. 仅在part文件为新创建时向文件写part meta。(因为文件打开模式为append,所以后续的writeFilePart
Meta都会变成追加,导致文件数据出错)。因此,part.Cur字段只在运行时有效。从part文件中读出的Cur并不代表实际的Cur。实际的Cur由part的文件大小 - part.Start来确定。
4. 增加对缺失part文件的检查。
当恢复下载检测到有part丢失时(part.End - lastEnd != 1),则认定part丢失,在中间插入新part。
5. 增加对合并时中断下载的检查。恢复下载时如果存在.download但是.download文件大小不等于实际文件大小,则删掉.download文件重新合并。

* 1. 补充对删除末尾的part的检查与恢复

* 1. 修复错误的引用dir方式

* 1. 修复恢复下载时对end的错误计算
  • Loading branch information
M1178475702 authored Mar 1, 2020
1 parent b0a84cf commit 27a9baa
Show file tree
Hide file tree
Showing 4 changed files with 305 additions and 1 deletion.
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ var (
YoukuPassword string
// RetryTimes how many times to retry when the download failed
RetryTimes int

MultiThread bool
)

// FakeHeaders fake http headers
Expand Down
296 changes: 295 additions & 1 deletion downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ package downloader

import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path"
"path/filepath"
"regexp"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -175,6 +180,289 @@ func Save(
return nil
}

// MultiThreadSave downloads urlData to fileName using threadNum concurrent
// HTTP range requests. The target is split into ".part<Index>" files, each
// prefixed with a FilePartMeta header, so an interrupted download can be
// resumed; finished parts are merged into the final file by mergeMultiPart.
// When bar is nil a fresh progress bar is created and started.
func MultiThreadSave(
	urlData URL, refer, fileName string, bar *pb.ProgressBar, chunkSizeMB, threadNum int,
) error {
	filePath, err := utils.FilePath(fileName, urlData.Ext, false)
	if err != nil {
		return err
	}
	fileSize, exists, err := utils.FileSize(filePath)
	if err != nil {
		return err
	}
	if bar == nil {
		bar = progressBar(urlData.Size)
		bar.Start()
	}
	// Skip segment file
	// TODO: Live video URLs will not return the size
	if exists && fileSize == urlData.Size {
		bar.Add64(fileSize)
		return nil
	}
	tmpFilePath := filePath + ".download"
	tmpFileSize, tmpExists, err := utils.FileSize(tmpFilePath)
	if err != nil {
		return err
	}
	if tmpExists {
		if tmpFileSize == urlData.Size {
			// A previous run finished merging but was killed before the
			// rename; just finish the rename.
			bar.Add64(urlData.Size)
			return os.Rename(tmpFilePath, filePath)
		}
		// Incomplete merge artifact: remove it and re-merge from the parts.
		if err = os.Remove(tmpFilePath); err != nil {
			return err
		}
	}

	// Scan all parts left behind by an earlier run.
	parts, err := readDirAllFilePart(filePath, fileName, urlData.Ext)
	if err != nil {
		return err
	}

	var unfinishedPart []*FilePartMeta
	savedSize := int64(0)
	if len(parts) > 0 {
		lastEnd := int64(-1)
		for i, part := range parts {
			// If some parts are lost, re-insert one part covering the gap.
			if part.Start-lastEnd != 1 {
				newPart := &FilePartMeta{
					Index: part.Index - 0.000001,
					Start: lastEnd + 1,
					End:   part.Start - 1,
					Cur:   lastEnd + 1,
				}
				tmp := append([]*FilePartMeta{}, parts[:i]...)
				tmp = append(tmp, newPart)
				parts = append(tmp, parts[i:]...)
				unfinishedPart = append(unfinishedPart, newPart)
			}
			// When the part has been downloaded in whole, part.Cur is equal to part.End + 1
			if part.Cur <= part.End+1 {
				savedSize += part.Cur - part.Start
				if part.Cur < part.End+1 {
					unfinishedPart = append(unfinishedPart, part)
				}
			} else {
				// More bytes saved than the part can hold: the file is
				// corrupt, so delete it transparently and re-download.
				if err = os.Remove(filePartPath(filePath, part)); err != nil {
					return err
				}
				part.Cur = part.Start
				unfinishedPart = append(unfinishedPart, part)
			}
			lastEnd = part.End
		}
		// The tail of the file is not covered by any part: append one.
		if lastEnd != urlData.Size-1 {
			newPart := &FilePartMeta{
				Index: parts[len(parts)-1].Index + 1,
				Start: lastEnd + 1,
				End:   urlData.Size - 1,
				Cur:   lastEnd + 1,
			}
			parts = append(parts, newPart)
			unfinishedPart = append(unfinishedPart, newPart)
		}
	} else {
		var start, end, partSize int64
		var i float32
		partSize = urlData.Size / int64(threadNum)
		// Guard against threadNum > Size: integer division would yield a
		// zero partSize and the loop below would never advance.
		if partSize < 1 {
			partSize = urlData.Size
		}
		for start < urlData.Size {
			end = start + partSize - 1
			if end >= urlData.Size {
				end = urlData.Size - 1
			} else if int(i+1) == threadNum && end < urlData.Size {
				// The last part absorbs the remainder of the division.
				end = urlData.Size - 1
			}
			part := &FilePartMeta{
				Index: i,
				Start: start,
				End:   end,
				Cur:   start,
			}
			parts = append(parts, part)
			unfinishedPart = append(unfinishedPart, part)
			start = end + 1
			i++
		}
	}
	if savedSize > 0 {
		bar.Add64(savedSize)
		if savedSize == urlData.Size {
			return mergeMultiPart(filePath, parts)
		}
	}

	wgp := utils.NewWaitGroupPool(threadNum)
	var (
		errs    []error
		errLock sync.Mutex // guards errs: it is appended to from many goroutines
	)
	addErr := func(e error) {
		errLock.Lock()
		errs = append(errs, e)
		errLock.Unlock()
	}
	for _, part := range unfinishedPart {
		wgp.Add()
		go func(part *FilePartMeta) {
			// Done must run on EVERY exit path, including the OpenFile
			// failure below; otherwise wgp.Wait would block forever.
			defer wgp.Done()
			file, err := os.OpenFile(filePartPath(filePath, part), os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0666)
			if err != nil {
				addErr(err)
				return
			}
			defer file.Close()
			var end, chunkSize int64
			headers := map[string]string{
				"Referer": refer,
			}
			if chunkSizeMB <= 0 {
				chunkSize = part.End - part.Start + 1
			} else {
				chunkSize = int64(chunkSizeMB) * 1024 * 1024
			}
			if part.Cur == part.Start {
				// Only write the meta header to a newly created part file.
				// The file is opened in append mode, so rewriting it on
				// resume would corrupt the payload.
				if err = writeFilePartMeta(file, part); err != nil {
					addErr(err)
					return
				}
			}
			for part.Cur <= part.End {
				end = computeEnd(part.Cur, chunkSize, part.End)
				headers["Range"] = fmt.Sprintf("bytes=%d-%d", part.Cur, end)
				temp := part.Cur
				for i := 0; ; i++ {
					written, err := writeFile(urlData.URL, file, headers, bar)
					if err == nil {
						// Advance the cursor so the next chunk requests
						// fresh bytes instead of re-fetching this range.
						part.Cur = end + 1
						break
					} else if i+1 >= config.RetryTimes {
						addErr(err)
						return
					}
					// Part of the chunk may already be on disk; resume the
					// retry from the first missing byte.
					temp += written
					headers["Range"] = fmt.Sprintf("bytes=%d-%d", temp, end)
				}
			}
		}(part)
	}
	wgp.Wait()
	if len(errs) > 0 {
		return errs[0]
	}
	return mergeMultiPart(filePath, parts)
}

// filePartPath builds the on-disk name of one part file, e.g.
// "video.mp4.part0.000000" for Index 0.
func filePartPath(filepath string, part *FilePartMeta) string {
	return filepath + fmt.Sprintf(".part%f", part.Index)
}

// computeEnd returns the inclusive end offset of a chunk that starts at s
// and spans chunkSize bytes, clamped so it never exceeds max.
func computeEnd(s, chunkSize, max int64) int64 {
	if candidate := s + chunkSize - 1; candidate <= max {
		return candidate
	}
	return max
}

// readDirAllFilePart scans the directory containing filePath for the
// "<filename>.<extname>.part*" files of an earlier run, parses each part's
// meta header, and returns the parts sorted by Index.
func readDirAllFilePart(filePath, filename, extname string) ([]*FilePartMeta, error) {
	dirPath := filepath.Dir(filePath)
	dir, err := os.Open(dirPath)
	if err != nil {
		return nil, err
	}
	defer dir.Close()
	fns, err := dir.Readdir(0)
	if err != nil {
		return nil, err
	}
	var metas []*FilePartMeta
	// Quote the name so file names containing regexp metacharacters
	// (".", "+", "(", ...) are matched literally rather than as patterns.
	reg := regexp.MustCompile(regexp.QuoteMeta(fmt.Sprintf("%s.%s.part", filename, extname)) + ".+")
	for _, fn := range fns {
		if reg.MatchString(fn.Name()) {
			meta, err := parseFilePartMeta(path.Join(dirPath, fn.Name()), fn.Size())
			if err != nil {
				return nil, err
			}
			metas = append(metas, meta)
		}
	}
	sort.Slice(metas, func(i, j int) bool {
		return metas[i].Index < metas[j].Index
	})
	return metas, nil
}

// parseFilePartMeta reads the FilePartMeta header at the start of a part
// file. The Cur value stored on disk is stale (it is written only when the
// file is created), so the real progress is recomputed from the part file's
// size: Cur = Start + payload bytes already on disk.
func parseFilePartMeta(filepath string, fileSize int64) (*FilePartMeta, error) {
	meta := new(FilePartMeta)
	size := binary.Size(*meta)
	// Read-only access is sufficient here.
	file, err := os.Open(filepath)
	if err != nil {
		return nil, err
	}
	defer file.Close()
	buf := make([]byte, size)
	readSize, err := file.ReadAt(buf, 0)
	if err != nil && err != io.EOF {
		return nil, err
	}
	if readSize < size {
		return nil, fmt.Errorf("the part file %s is broken, please delete all part files and re-download", filepath)
	}
	if err = binary.Read(bytes.NewBuffer(buf), binary.LittleEndian, meta); err != nil {
		return nil, err
	}
	// Everything past the fixed-size header is downloaded payload.
	meta.Cur = meta.Start + (fileSize - int64(size))
	return meta, nil
}

func writeFilePartMeta(file *os.File, meta *FilePartMeta) error {
return binary.Write(file, binary.LittleEndian, meta)
}

// mergeMultiPart concatenates the payload of every part file (skipping each
// part's meta header) into "<filepath>.download" and renames the result to
// filepath. Part files are deleted only after the whole merge succeeds, so a
// failed merge can be retried without re-downloading anything.
func mergeMultiPart(filepath string, parts []*FilePartMeta) error {
	tempFilePath := filepath + ".download"
	tempFile, err := os.OpenFile(tempFilePath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0666)
	if err != nil {
		return err
	}
	var partFiles []*os.File
	// closeAll releases handles without deleting the part files, so an
	// interrupted merge keeps its inputs intact.
	closeAll := func() {
		tempFile.Close()
		for _, f := range partFiles {
			f.Close()
		}
	}
	for _, part := range parts {
		file, err := os.Open(filePartPath(filepath, part))
		if err != nil {
			closeAll()
			return err
		}
		partFiles = append(partFiles, file)
		// Skip the meta header; only the payload goes into the final file.
		if _, err = file.Seek(int64(binary.Size(part)), 0); err != nil {
			closeAll()
			return err
		}
		if _, err = io.Copy(tempFile, file); err != nil {
			closeAll()
			return err
		}
	}
	// Surface any buffered-write error before renaming the result.
	if err = tempFile.Close(); err != nil {
		for _, f := range partFiles {
			f.Close()
		}
		return err
	}
	if err = os.Rename(tempFilePath, filepath); err != nil {
		for _, f := range partFiles {
			f.Close()
		}
		return err
	}
	// The merge is complete; the part files are no longer needed.
	for _, f := range partFiles {
		f.Close()
		os.Remove(f.Name())
	}
	return nil
}

// Download download urls
func Download(v Data, refer string, chunkSizeMB int) error {
v.genSortedStreams()
Expand Down Expand Up @@ -273,7 +561,13 @@ func Download(v Data, refer string, chunkSizeMB int) error {
bar.Start()
if len(data.URLs) == 1 {
// only one fragment
err := Save(data.URLs[0], refer, title, bar, chunkSizeMB)
var err error
if config.MultiThread {
err = MultiThreadSave(data.URLs[0], refer, title, bar, chunkSizeMB, config.ThreadNumber)
} else {
err = Save(data.URLs[0], refer, title, bar, chunkSizeMB)
}

if err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions downloader/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,10 @@ type Aria2Input struct {
// For a simple download, only add headers
Header []string `json:"header"`
}

// FilePartMeta is the fixed-size header written at the start of every
// ".part" file. It is serialized with encoding/binary in little-endian
// order, so the field order and field sizes must not change.
type FilePartMeta struct {
	Index float32 // ordering key of this part within the whole file
	Start int64 // offset (inclusive) of the part's first byte in the final file
	End int64 // offset (inclusive) of the part's last byte in the final file
	Cur int64 // next byte to download; only meaningful at runtime — the on-disk value is stale
}
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
)

func init() {
flag.BoolVar(&config.MultiThread, "m", false, "Multiple threads to download single video")
flag.BoolVar(&config.Debug, "d", false, "Debug mode")
flag.BoolVar(&config.Version, "v", false, "Show version")
flag.BoolVar(&config.InfoOnly, "i", false, "Information only")
Expand Down

2 comments on commit 27a9baa

@john-black-3k
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The README.md file should be updated to document the use of the -m parameter.

@iawia002
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm considering applying this feature to all types of videos, and I'll update the documentation at that time

Please sign in to comment.