Skip to content

Commit

Permalink
Fix race condition in combined LAS reader
Browse files Browse the repository at this point in the history
Combined LAS reader used to be called from a single goroutine. From
V2.0.0 final this is no longer the case, however the GetNext() function
was not made thread safe. This change fixes the race condition via CAS
operations and also solves an infinite deadlock due to a lack of lock
release on error
  • Loading branch information
mfbonfigli committed Nov 21, 2024
1 parent 349f31c commit 42512d9
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 15 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ You can preview a couple of tilesets generated from this tool at this [website](


## Changelog
##### Version 2.0.1
* Resolved a bug that prevented the correct functioning of the join flag when processing multiple LAS files from a folder.

##### Version 2.0.0
* Most of the code has been rewritten from the ground up, achieving much faster tiling with lower memory usage
* Uses Proj v9.5.0: all projections supported by the Proj library are automatically supported by gocesiumtiler.
Expand Down
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var tilerProvider func() (tiler.Tiler, error) = func() (tiler.Tiler, error) {
return tiler.NewGoCesiumTiler()
}

var cmdVersion = "2.0.0"
var cmdVersion = "2.0.1"

// GitCommit is injected dynamically at build time via `go build -ldflags "-X main.GitCommit=XYZ"`
var GitCommit string = "(na)"
Expand Down
1 change: 1 addition & 0 deletions internal/las/golas/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ func (g *Las) Next() (Point, error) {
// a Seek operation right before returning the struct
g.Lock()
if g.current >= g.NumberOfPoints() {
g.Unlock()
return p, io.EOF
}
data := make([]byte, g.Header.PointDataRecordLength)
Expand Down
28 changes: 14 additions & 14 deletions internal/las/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package las

import (
"fmt"
"io"
"os"
"sync/atomic"

"github.com/mfbonfigli/gocesiumtiler/v2/internal/geom"
"github.com/mfbonfigli/gocesiumtiler/v2/internal/las/golas"
Expand All @@ -24,8 +26,7 @@ type LasReader interface {
// CombinedFileLasReader enables reading a a list of LAS files as if they were a single one
// the files MUST have the same properties (SRID, etc)
type CombinedFileLasReader struct {
currentReader int
currentCount int
currentReader atomic.Int32
readers []LasReader
numPts int
crs string
Expand Down Expand Up @@ -64,20 +65,19 @@ func (m *CombinedFileLasReader) GetCRS() string {
}

func (m *CombinedFileLasReader) GetNext() (geom.Point64, error) {
if m.currentReader >= len(m.readers) {
return geom.Point64{}, fmt.Errorf("no points to read")
}
r := m.readers[m.currentReader]
if m.currentCount == r.NumberOfPoints() {
m.currentReader++
m.currentCount = 0
if m.currentReader >= len(m.readers) {
return geom.Point64{}, fmt.Errorf("no points to read")
for {
currReader := int(m.currentReader.Load())
if currReader >= len(m.readers) {
return geom.Point64{}, io.EOF
}
pt, err := m.readers[currReader].GetNext()
if err != nil {
// try to move on to the next reader
m.currentReader.CompareAndSwap(int32(currReader), int32(currReader)+1)
continue
}
r = m.readers[m.currentReader]
return pt, nil
}
m.currentCount++
return r.GetNext()
}

func (m *CombinedFileLasReader) Close() {
Expand Down
52 changes: 52 additions & 0 deletions internal/las/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package las
import (
"fmt"
"os"
"sync"
"testing"
)

Expand Down Expand Up @@ -42,3 +43,54 @@ func TestCombinedReader(t *testing.T) {
t.Errorf("expected error, got none")
}
}

func TestCombinedReaderConcurrency(t *testing.T) {
entries, err := os.ReadDir("./testdata")
if err != nil {
t.Fatal(err)
}

files := []string{}
for _, e := range entries {
filename := e.Name()
files = append(files, fmt.Sprintf("./testdata/%s", filename))
}

r, err := NewCombinedFileLasReader(files, "EPSG:32633", false)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if actual := r.NumberOfPoints(); actual != 10*len(files) {
t.Errorf("expected %d points got %d", 10*len(files), actual)
}

if actual := r.GetCRS(); actual != "EPSG:32633" {
t.Errorf("expected epsg %d got epsg %s", 32633, actual)
}

e := make(chan error, 10)
readFun := func(wg *sync.WaitGroup) {
defer wg.Done()
read := 0
for i := 0; i < r.NumberOfPoints()/5; i++ {
_, err := r.GetNext()
if err != nil {
e <- err
t.Errorf("unexpected error %v", err)
continue
}
read++
}
fmt.Println(read)
}
wg := &sync.WaitGroup{}
for i := 0; i < 5; i++ {
wg.Add(1)
go readFun(wg)
}
wg.Wait()
if len(e) > 0 {
t.Errorf("errors detected in the error channel but none expected")
}
}

0 comments on commit 42512d9

Please sign in to comment.