From c58c50327c1d3c45ed1543066934d2fa1e7c0797 Mon Sep 17 00:00:00 2001 From: dudaodong Date: Tue, 5 Mar 2024 11:42:03 +0800 Subject: [PATCH] doc: add doc for ChunkRead and ParallelChunkRead --- README.md | 4 ++ README_zh-CN.md | 6 ++ docs/api/packages/fileutil.md | 116 +++++++++++++++++++++++++++++++ docs/en/api/packages/fileutil.md | 114 ++++++++++++++++++++++++++++++ fileutil/file.go | 52 ++++++++------ fileutil/file_example_test.go | 63 +++++++++++++++++ fileutil/file_test.go | 53 ++++++++++++++ fileutil/testdata/test1.csv | 3 - 8 files changed, 386 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index ff8e2cec..d67eab10 100644 --- a/README.md +++ b/README.md @@ -729,6 +729,10 @@ import "github.com/duke-git/lancet/v2/fileutil" [[play](https://go.dev/play/p/GhLS6d8lH_g)] - **ReadFile** : read file or url. [[doc](https://github.com/duke-git/lancet/blob/main/docs/en/api/packages/fileutil.md#ReadFile)] +- **ChunkRead** : reads a block from the file at the specified offset and returns all lines within the block. + [[doc](https://github.com/duke-git/lancet/blob/main/docs/en/api/packages/fileutil.md#ChunkRead)] +- **ParallelChunkRead** : reads the file in parallel and send each chunk of lines to the specified channel. + [[doc](https://github.com/duke-git/lancet/blob/main/docs/en/api/packages/fileutil.md#ParallelChunkRead)]

10. Formatter contains some functions for data formatting.        index

diff --git a/README_zh-CN.md b/README_zh-CN.md index 6e212912..e11241a7 100644 --- a/README_zh-CN.md +++ b/README_zh-CN.md @@ -728,6 +728,12 @@ import "github.com/duke-git/lancet/v2/fileutil" [[play](https://go.dev/play/p/GhLS6d8lH_g)] - **ReadFile** : 读取文件或者URL。 [[doc](https://github.com/duke-git/lancet/blob/main/docs/api/packages/fileutil.md#ReadFile)] +- **ChunkRead** : 从文件的指定偏移读取块并返回块内所有行。 + [[doc](https://github.com/duke-git/lancet/blob/main/docs/api/packages/fileutil.md#ChunkRead)] +- **ParallelChunkRead** : 并行读取文件并将每个块的行发送到指定通道。 + [[doc](https://github.com/duke-git/lancet/blob/main/docs/api/packages/fileutil.md#ParallelChunkRead)] + +

10. formatter 格式化器包含一些数据格式化处理方法。       回到目录

diff --git a/docs/api/packages/fileutil.md b/docs/api/packages/fileutil.md index dda1043d..830dfc01 100644 --- a/docs/api/packages/fileutil.md +++ b/docs/api/packages/fileutil.md @@ -50,6 +50,8 @@ import ( - [WriteStringToFile](#WriteStringToFile) - [WriteBytesToFile](#WriteBytesToFile) - [ReadFile](#ReadFile) +- [ChunkRead](#ChunkRead) +- [ParallelChunkRead](#ParallelChunkRead)
@@ -955,9 +957,123 @@ func main() { if err != nil { return } + fmt.Println(string(dat)) + // Output: // User-agent: * // Disallow: /deny } +``` + +### ChunkRead + +

从文件的指定偏移读取块并返回块内所有行。

+ +函数签名: + +```go +func ChunkRead(file *os.File, offset int64, size int, bufPool *sync.Pool) ([]string, error) +``` + +示例: + +```go +package main + +import ( + "fmt" + "github.com/duke-git/lancet/v2/fileutil" +) + +func main() { + const mb = 1024 * 1024 + const defaultChunkSizeMB = 100 + + // test1.csv file content: + // Lili,22,female + // Jim,21,male + filePath := "./testdata/test1.csv" // 替换为你的文件路径 + f, err := os.Open(filePath) + if err != nil { + return + } + + defer f.Close() + + var bufPool = sync.Pool{ + New: func() interface{} { + return make([]byte, 0, defaultChunkSizeMB*mb) + }, + } + + lines, err := fileutil.ChunkRead(f, 0, 100, &bufPool) + if err != nil { + return + } + + fmt.Println(lines[0]) + fmt.Println(lines[1]) + + // Output: + // Lili,22,female + // Jim,21,male +} +``` + +### ParallelChunkRead + +

并行读取文件并将每个块的行发送到指定通道。

+ +函数签名: + +```go +// filePath:文件路径 +// chunkSizeMB: 分块的大小(单位MB,设置为0时使用默认100MB),设置过大反而不利,视情调整 +// maxGoroutine: 并发读取分块的数量,设置为0时使用CPU核心数 +// linesCh: 用于接收返回结果的通道。 +func ParallelChunkRead(filePath string, linesCh chan<- []string, chunkSizeMB, maxGoroutine int) error +``` + +示例: + +```go +package main + +import ( + "fmt" + "github.com/duke-git/lancet/v2/fileutil" +) + +func main() { + const mb = 1024 * 1024 + const defaultChunkSizeMB = 100 // 默认值 + + numParsers := runtime.NumCPU() + + linesCh := make(chan []string, numParsers) + + // test1.csv file content: + // Lili,22,female + // Jim,21,male + filePath := "./testdata/test1.csv" + + go fileutil.ParallelChunkRead(filePath, linesCh, defaultChunkSizeMB, numParsers) + + var totalLines int + for lines := range linesCh { + totalLines += len(lines) + + for _, line := range lines { + fmt.Println(line) + } + } + + fmt.Println(totalLines) + + // Output: + // Lili,22,female + // Jim,21,male + // 2 +} ``` \ No newline at end of file diff --git a/docs/en/api/packages/fileutil.md b/docs/en/api/packages/fileutil.md index 5729aa26..b4a449ca 100644 --- a/docs/en/api/packages/fileutil.md +++ b/docs/en/api/packages/fileutil.md @@ -50,6 +50,8 @@ import ( - [WriteStringToFile](#WriteStringToFile) - [WriteBytesToFile](#WriteBytesToFile) - [ReadFile](#ReadFile) +- [ChunkRead](#ChunkRead) +- [ParallelChunkRead](#ParallelChunkRead)
@@ -961,3 +963,115 @@ func main() { // Disallow: /deny } ``` + +### ChunkRead + +

Reads a block from the file at the specified offset and returns all lines within the block.

+ +Signature : + +```go +func ChunkRead(file *os.File, offset int64, size int, bufPool *sync.Pool) ([]string, error) +``` + +Example: + +```go +package main + +import ( + "fmt" + "github.com/duke-git/lancet/v2/fileutil" +) + +func main() { + const mb = 1024 * 1024 + const defaultChunkSizeMB = 100 + + // test1.csv file content: + // Lili,22,female + // Jim,21,male + filePath := "./testdata/test1.csv" + f, err := os.Open(filePath) + if err != nil { + return + } + + defer f.Close() + + var bufPool = sync.Pool{ + New: func() interface{} { + return make([]byte, 0, defaultChunkSizeMB*mb) + }, + } + + lines, err := fileutil.ChunkRead(f, 0, 100, &bufPool) + if err != nil { + return + } + + fmt.Println(lines[0]) + fmt.Println(lines[1]) + + // Output: + // Lili,22,female + // Jim,21,male +} +``` + +### ParallelChunkRead + +

Reads the file in parallel and sends each chunk of lines to the specified channel.

+ +Signature : + +```go +// filePath: file path. +// chunkSizeMB: The size of the block (in MB, the default is 100MB when set to 0). Setting it too large will be detrimental. Adjust it as appropriate. +// maxGoroutine: The number of concurrent read chunks, the number of CPU cores used when set to 0. +// linesCh: The channel used to receive the returned results. +func ParallelChunkRead(filePath string, linesCh chan<- []string, chunkSizeMB, maxGoroutine int) error +``` + +Example: + +```go +package main + +import ( + "fmt" + "github.com/duke-git/lancet/v2/fileutil" +) + +func main() { + const mb = 1024 * 1024 + const defaultChunkSizeMB = 100 // 默认值 + + numParsers := runtime.NumCPU() + + linesCh := make(chan []string, numParsers) + + // test1.csv file content: + // Lili,22,female + // Jim,21,male + filePath := "./testdata/test1.csv" + + go fileutil.ParallelChunkRead(filePath, linesCh, defaultChunkSizeMB, numParsers) + + var totalLines int + for lines := range linesCh { + totalLines += len(lines) + + for _, line := range lines { + fmt.Println(line) + } + } + + fmt.Println(totalLines) + + // Output: + // Lili,22,female + // Jim,21,male + // 2 +} +``` \ No newline at end of file diff --git a/fileutil/file.go b/fileutil/file.go index a5631b02..4e08d2be 100644 --- a/fileutil/file.go +++ b/fileutil/file.go @@ -16,7 +16,6 @@ import ( "fmt" "io" "io/fs" - "log" "net/http" "os" "path/filepath" @@ -869,12 +868,13 @@ func isCsvSupportedType(v interface{}) bool { } } -// ChunkRead 从文件的指定偏移读取块并返回块内所有行 -func ChunkRead(f *os.File, offset int64, size int, bufPool *sync.Pool) []string { +// ChunkRead reads a block from the file at the specified offset and returns all lines within the block +// Play: todo +func ChunkRead(file *os.File, offset int64, size int, bufPool *sync.Pool) ([]string, error) { buf := bufPool.Get().([]byte)[:size] // 从Pool获取缓冲区并调整大小 - n, err := f.ReadAt(buf, offset) // 从指定偏移读取数据到缓冲区 + n, err := file.ReadAt(buf, offset) // 从指定偏移读取数据到缓冲区 if err != nil && err != 
io.EOF { - log.Fatal(err) + return nil, err } buf = buf[:n] // 调整切片以匹配实际读取的字节数 @@ -893,58 +893,64 @@ func ChunkRead(f *os.File, offset int64, size int, bufPool *sync.Pool) []string lines = append(lines, line) } bufPool.Put(buf) // 读取完成后,将缓冲区放回Pool - return lines + return lines, nil } -// 并行读取文件并将每个块的行发送到指定通道 +// ParallelChunkRead reads the file in parallel and send each chunk of lines to the specified channel. // filePath 文件路径 -// ChunkSizeMB 分块的大小(单位MB,设置为0时使用默认100MB),设置过大反而不利,视情调整 -// MaxGoroutine 并发读取分块的数量,设置为0时使用CPU核心数 +// chunkSizeMB 分块的大小(单位MB,设置为0时使用默认100MB),设置过大反而不利,视情调整 +// maxGoroutine 并发读取分块的数量,设置为0时使用CPU核心数 // linesCh用于接收返回结果的通道。 -func ParallelChunkRead(filePath string, linesCh chan<- []string, ChunkSizeMB, MaxGoroutine int) { - if ChunkSizeMB == 0 { - ChunkSizeMB = 100 +// Play: todo +func ParallelChunkRead(filePath string, linesCh chan<- []string, chunkSizeMB, maxGoroutine int) error { + if chunkSizeMB == 0 { + chunkSizeMB = 100 } - ChunkSize := ChunkSizeMB * 1024 * 1024 + chunkSize := chunkSizeMB * 1024 * 1024 // 内存复用 bufPool := sync.Pool{ New: func() interface{} { - return make([]byte, 0, ChunkSize) + return make([]byte, 0, chunkSize) }, } - if MaxGoroutine == 0 { - MaxGoroutine = runtime.NumCPU() // 设置为0时使用CPU核心数 + if maxGoroutine == 0 { + maxGoroutine = runtime.NumCPU() // 设置为0时使用CPU核心数 } f, err := os.Open(filePath) if err != nil { - log.Fatalf("failed to open file: %v", err) + return err } + defer f.Close() info, err := f.Stat() if err != nil { - log.Fatalf("failed to get file info: %v", err) + return err } wg := sync.WaitGroup{} - chunkOffsetCh := make(chan int64, MaxGoroutine) + chunkOffsetCh := make(chan int64, maxGoroutine) // 分配工作 go func() { - for i := int64(0); i < info.Size(); i += int64(ChunkSize) { + for i := int64(0); i < info.Size(); i += int64(chunkSize) { chunkOffsetCh <- i } close(chunkOffsetCh) }() // 启动工作协程 - for i := 0; i < MaxGoroutine; i++ { + for i := 0; i < maxGoroutine; i++ { wg.Add(1) go func() { for chunkOffset := range 
chunkOffsetCh { - linesCh <- ChunkRead(f, chunkOffset, ChunkSize, &bufPool) + chunk, err := ChunkRead(f, chunkOffset, chunkSize, &bufPool) + if err == nil { + linesCh <- chunk + } + } wg.Done() }() @@ -953,4 +959,6 @@ func ParallelChunkRead(filePath string, linesCh chan<- []string, ChunkSizeMB, Ma // 等待所有解析完成后关闭行通道 wg.Wait() close(linesCh) + + return nil } diff --git a/fileutil/file_example_test.go b/fileutil/file_example_test.go index 9fad0dc4..b0fc026a 100644 --- a/fileutil/file_example_test.go +++ b/fileutil/file_example_test.go @@ -5,6 +5,8 @@ import ( "io" "log" "os" + "runtime" + "sync" ) func ExampleIsExist() { @@ -421,8 +423,69 @@ func ExampleReadFile() { if err != nil { return } + fmt.Println(string(dat)) + // Output: // User-agent: * // Disallow: /deny } + +func ExampleChunkRead() { + const mb = 1024 * 1024 + const defaultChunkSizeMB = 100 + + filePath := "./testdata/test1.csv" + f, err := os.Open(filePath) + if err != nil { + return + } + + defer f.Close() + + var bufPool = sync.Pool{ + New: func() interface{} { + return make([]byte, 0, defaultChunkSizeMB*mb) + }, + } + + lines, err := ChunkRead(f, 0, 100, &bufPool) + if err != nil { + return + } + + fmt.Println(lines[0]) + fmt.Println(lines[1]) + + // Output: + // Lili,22,female + // Jim,21,male +} + +func ExampleParallelChunkRead() { + const mb = 1024 * 1024 + const defaultChunkSizeMB = 100 // 默认值 + + numParsers := runtime.NumCPU() + + linesCh := make(chan []string, numParsers) + filePath := "./testdata/test1.csv" + + go ParallelChunkRead(filePath, linesCh, defaultChunkSizeMB, numParsers) + + var totalLines int + for lines := range linesCh { + totalLines += len(lines) + + for _, line := range lines { + fmt.Println(line) + } + } + + fmt.Println(totalLines) + + // Output: + // Lili,22,female + // Jim,21,male + // 2 +} diff --git a/fileutil/file_test.go b/fileutil/file_test.go index f92396db..77259eab 100644 --- a/fileutil/file_test.go +++ b/fileutil/file_test.go @@ -4,7 +4,9 @@ import ( "io" "os" 
"path/filepath" + "runtime" "strings" + "sync" "testing" "github.com/duke-git/lancet/v2/internal" @@ -566,3 +568,54 @@ func TestCopyDir(t *testing.T) { os.RemoveAll(dest) } + +func TestParallelChunkRead(t *testing.T) { + assert := internal.NewAssert(t, "TestParallelChunkRead") + + const mb = 1024 * 1024 + const defaultChunkSizeMB = 100 // 默认值 + + numParsers := runtime.NumCPU() + + linesCh := make(chan []string, numParsers) + filePath := "./testdata/test1.csv" // 替换为你的文件路径 + + go ParallelChunkRead(filePath, linesCh, defaultChunkSizeMB, numParsers) + + var totalLines int + for lines := range linesCh { + totalLines += len(lines) + + assert.Equal("Lili,22,female", lines[0]) + assert.Equal("Jim,21,male", lines[1]) + } + + assert.Equal(2, totalLines) +} + +func TestChunkRead(t *testing.T) { + assert := internal.NewAssert(t, "TestChunkRead") + + const mb = 1024 * 1024 + const defaultChunkSizeMB = 100 // 默认值 + + filePath := "./testdata/test1.csv" // 替换为你的文件路径 + f, err := os.Open(filePath) + if err != nil { + return + } + + defer f.Close() + + var bufPool = sync.Pool{ + New: func() interface{} { + return make([]byte, 0, defaultChunkSizeMB*mb) + }, + } + + lines, err := ChunkRead(f, 0, 100, &bufPool) + + assert.Equal("Lili,22,female", lines[0]) + assert.Equal("Jim,21,male", lines[1]) + +} diff --git a/fileutil/testdata/test1.csv b/fileutil/testdata/test1.csv index 6bbd640e..c8961e72 100644 --- a/fileutil/testdata/test1.csv +++ b/fileutil/testdata/test1.csv @@ -1,5 +1,2 @@ Lili,22,female Jim,21,male - - -