From 42512d9af3e46d5b55d29d5d96994bbddf8e0d44 Mon Sep 17 00:00:00 2001 From: mfbonfigli Date: Thu, 21 Nov 2024 22:22:35 +0000 Subject: [PATCH] Fix race condition in combined LAS reader Combined LAS reader used to be called from a single goroutine. From V2.0.0 final this is no longer the case, however the GetNext() function was not made thread safe. This change fixes the race condition via CAS operations and also solves an infinite deadlock due to a lack of lock release on error --- README.md | 3 +++ cmd/main.go | 2 +- internal/las/golas/reader.go | 1 + internal/las/reader.go | 28 +++++++++---------- internal/las/reader_test.go | 52 ++++++++++++++++++++++++++++++++++++ 5 files changed, 71 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 8aa445b..0b7db4c 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,9 @@ You can preview a couple of tilesets generated from this tool at this [website]( ## Changelog +##### Version 2.0.1 +* Resolved a bug that prevented the correct functioning of the join flag when processing multiple LAS files from a folder. + ##### Version 2.0.0 * Most of the code has been rewritten from the ground up, achieving much faster tiling with lower memory usage * Uses Proj v9.5.0: all projections supported by the Proj library are automatically supported by gocesiumtiler. diff --git a/cmd/main.go b/cmd/main.go index 47ffd84..f30edb5 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -23,7 +23,7 @@ var tilerProvider func() (tiler.Tiler, error) = func() (tiler.Tiler, error) { return tiler.NewGoCesiumTiler() } -var cmdVersion = "2.0.0" +var cmdVersion = "2.0.1" // GitCommit is injected dynamically at build time via `go build -ldflags "-X main.GitCommit=XYZ"` var GitCommit string = "(na)" diff --git a/internal/las/golas/reader.go b/internal/las/golas/reader.go index eaab80f..111b88a 100644 --- a/internal/las/golas/reader.go +++ b/internal/las/golas/reader.go @@ -219,6 +219,7 @@ func (g *Las) Next() (Point, error) { // a Seek operation right before returning the struct g.Lock() if g.current >= g.NumberOfPoints() { + g.Unlock() return p, io.EOF } data := make([]byte, g.Header.PointDataRecordLength) diff --git a/internal/las/reader.go b/internal/las/reader.go index cba8ea4..0640233 100644 --- a/internal/las/reader.go +++ b/internal/las/reader.go @@ -2,7 +2,9 @@ package las import ( "fmt" + "io" "os" + "sync/atomic" "github.com/mfbonfigli/gocesiumtiler/v2/internal/geom" "github.com/mfbonfigli/gocesiumtiler/v2/internal/las/golas" @@ -24,8 +26,7 @@ type LasReader interface { // CombinedFileLasReader enables reading a a list of LAS files as if they were a single one // the files MUST have the same properties (SRID, etc) type CombinedFileLasReader struct { - currentReader int - currentCount int + currentReader atomic.Int32 readers []LasReader numPts int crs string @@ -64,20 +65,19 @@ func (m *CombinedFileLasReader) GetCRS() string { } func (m *CombinedFileLasReader) GetNext() (geom.Point64, error) { - if m.currentReader >= len(m.readers) { - return geom.Point64{}, fmt.Errorf("no points to read") - } - r := m.readers[m.currentReader] - if m.currentCount == r.NumberOfPoints() { - m.currentReader++ - m.currentCount = 0 - if m.currentReader >= len(m.readers) { - return geom.Point64{}, fmt.Errorf("no points to read") + for { + currReader := int(m.currentReader.Load()) + if currReader >= len(m.readers) { + return geom.Point64{}, io.EOF + } + pt, err := m.readers[currReader].GetNext() + if err != nil { + // try to move on to the next reader + m.currentReader.CompareAndSwap(int32(currReader), int32(currReader)+1) + continue } - r = m.readers[m.currentReader] + return pt, nil } - m.currentCount++ - return r.GetNext() } func (m *CombinedFileLasReader) Close() { diff --git a/internal/las/reader_test.go b/internal/las/reader_test.go index ce5a58d..7b03e3f 100644 --- a/internal/las/reader_test.go +++ b/internal/las/reader_test.go @@ -3,6 +3,7 @@ package las import ( "fmt" "os" + "sync" "testing" ) @@ -42,3 +43,54 @@ func TestCombinedReader(t *testing.T) { t.Errorf("expected error, got none") } } + +func TestCombinedReaderConcurrency(t *testing.T) { + entries, err := os.ReadDir("./testdata") + if err != nil { + t.Fatal(err) + } + + files := []string{} + for _, e := range entries { + filename := e.Name() + files = append(files, fmt.Sprintf("./testdata/%s", filename)) + } + + r, err := NewCombinedFileLasReader(files, "EPSG:32633", false) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if actual := r.NumberOfPoints(); actual != 10*len(files) { + t.Errorf("expected %d points got %d", 10*len(files), actual) + } + + if actual := r.GetCRS(); actual != "EPSG:32633" { + t.Errorf("expected epsg %d got epsg %s", 32633, actual) + } + + e := make(chan error, 10) + readFun := func(wg *sync.WaitGroup) { + defer wg.Done() + read := 0 + for i := 0; i < r.NumberOfPoints()/5; i++ { + _, err := r.GetNext() + if err != nil { + e <- err + t.Errorf("unexpected error %v", err) + continue + } + read++ + } + fmt.Println(read) + } + wg := &sync.WaitGroup{} + for i := 0; i < 5; i++ { + wg.Add(1) + go readFun(wg) + } + wg.Wait() + if len(e) > 0 { + t.Errorf("errors detected in the error channel but none expected") + } +}