Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

s2: Add example for indexing and existing stream #723

Merged
merged 1 commit into from
Jan 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion s2/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,11 @@ func (r *Reader) ReadSeeker(random bool, index []byte) (*ReadSeeker, error) {
// Seek allows seeking in compressed data.
func (r *ReadSeeker) Seek(offset int64, whence int) (int64, error) {
if r.err != nil {
return 0, r.err
if !errors.Is(r.err, io.EOF) {
return 0, r.err
}
// Reset on EOF
r.err = nil
}
if offset == 0 && whence == io.SeekCurrent {
return r.blockStart + int64(r.i), nil
Expand Down
174 changes: 174 additions & 0 deletions s2/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"math/rand"
"os"
"sync"
"testing"

Expand Down Expand Up @@ -234,3 +235,176 @@ func TestSeeking(t *testing.T) {
})
}
}

// ExampleIndexStream shows an example of indexing a stream
// and indexing it after it has been written.
// The index can either be appended.
func ExampleIndexStream() {
fatalErr := func(err error) {
if err != nil {
panic(err)
}
}

// Create a test stream without index
var streamName = ""
tmp := make([]byte, 5<<20)
{
rng := rand.New(rand.NewSource(0xbeefcafe))
rng.Read(tmp)
// Make it compressible...
for i, v := range tmp {
tmp[i] = '0' + v&3
}
// Compress it...
output, err := os.CreateTemp("", "IndexStream")
streamName = output.Name()
fatalErr(err)

// We use smaller blocks just for the example...
enc := s2.NewWriter(output, s2.WriterSnappyCompat())
err = enc.EncodeBuffer(tmp)
fatalErr(err)

// Close and get index...
err = enc.Close()
fatalErr(err)
err = output.Close()
fatalErr(err)
}

// Open our compressed stream without an index...
stream, err := os.Open(streamName)
fatalErr(err)
defer stream.Close()

var indexInput = io.Reader(stream)
var indexOutput io.Writer
var indexedName string

// Should index be combined with stream by appending?
// This could also be done by appending to an os.File
// If not it will be written to a separate file.
const combineOutput = false

// Function to easier use defer.
func() {
if combineOutput {
output, err := os.CreateTemp("", "IndexStream-Combined")
fatalErr(err)
defer func() {
fatalErr(output.Close())
if false {
fi, err := os.Stat(output.Name())
fatalErr(err)
fmt.Println("Combined:", fi.Size(), "bytes")
} else {
fmt.Println("Index saved")
}
}()

// Everything read from stream will also be written to output.
indexedName = output.Name()
indexInput = io.TeeReader(stream, output)
indexOutput = output
} else {
output, err := os.CreateTemp("", "IndexStream-Index")
fatalErr(err)
defer func() {
fatalErr(output.Close())
fi, err := os.Stat(output.Name())
fatalErr(err)
if false {
fmt.Println("Index:", fi.Size(), "bytes")
} else {
fmt.Println("Index saved")
}
}()
indexedName = output.Name()
indexOutput = output
}

// Index the input
idx, err := s2.IndexStream(indexInput)
fatalErr(err)

// Write the index
_, err = indexOutput.Write(idx)
fatalErr(err)
}()

if combineOutput {
// Read from combined stream only.
stream, err := os.Open(indexedName)
fatalErr(err)
defer stream.Close()
// Create a reader with the input.
// We assert that the stream is an io.ReadSeeker.
r := s2.NewReader(io.ReadSeeker(stream))

// Request a ReadSeeker with random access.
// This will load the index from the stream.
rs, err := r.ReadSeeker(true, nil)
fatalErr(err)

_, err = rs.Seek(-10, io.SeekEnd)
fatalErr(err)

b, err := io.ReadAll(rs)
fatalErr(err)
if want := tmp[len(tmp)-10:]; !bytes.Equal(b, want) {
fatalErr(fmt.Errorf("wanted %v, got %v", want, b))
}
fmt.Println("last 10 bytes read")

_, err = rs.Seek(10, io.SeekStart)
fatalErr(err)
_, err = io.ReadFull(rs, b)
fatalErr(err)
if want := tmp[10:20]; !bytes.Equal(b, want) {
fatalErr(fmt.Errorf("wanted %v, got %v", want, b))
}
fmt.Println("10 bytes at offset 10 read")
} else {
// Read from separate stream and index.
stream, err := os.Open(streamName)
fatalErr(err)
defer stream.Close()
// Create a reader with the input.
// We assert that the stream is an io.ReadSeeker.
r := s2.NewReader(io.ReadSeeker(stream))

// Read the separate index.
index, err := os.ReadFile(indexedName)
fatalErr(err)

// Request a ReadSeeker with random access.
// The provided index will be used.
rs, err := r.ReadSeeker(true, index)
fatalErr(err)

_, err = rs.Seek(-10, io.SeekEnd)
fatalErr(err)

b, err := io.ReadAll(rs)
fatalErr(err)
if want := tmp[len(tmp)-10:]; !bytes.Equal(b, want) {
fatalErr(fmt.Errorf("wanted %v, got %v", want, b))
}
fmt.Println("last 10 bytes read")

_, err = rs.Seek(10, io.SeekStart)
fatalErr(err)
_, err = io.ReadFull(rs, b)
fatalErr(err)
if want := tmp[10:20]; !bytes.Equal(b, want) {
fatalErr(fmt.Errorf("wanted %v, got %v", want, b))
}
fmt.Println("10 bytes at offset 10 read")
}

// OUTPUT:
// Index saved
// last 10 bytes read
// 10 bytes at offset 10 read
}