-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Simple persistence #9
Changes from all commits
1775e4a
7892810
00aa850
4482520
cd20a93
bce0ed2
3436138
0b928b7
e3da92d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
{ | ||
"subject": "4a0add80-4dac-4f87-9ba7-d062f6f0ca09", | ||
"extra": {}, | ||
"header": {}, | ||
"match_context": { | ||
"regex_capture_groups": [ | ||
"name-of-the-best-service-in-the-world", | ||
"service-args" | ||
] | ||
}, | ||
"url": { | ||
"Scheme": "https", | ||
"Host": "bs.api.example.com", | ||
"Path": "/bs" | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
package logstore | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
) | ||
|
||
var ( | ||
// helper variables used to implement the package's .IsXYError() functions | ||
notFoundErr *NotFoundError | ||
badRequestErr *BadRequestError | ||
) | ||
|
||
// NotFoundError indicates that no value for a key is stored in the db.
type NotFoundError struct {
	key string
}

// NewNotFoundError returns a *NotFoundError, typed as error, for the given key.
func NewNotFoundError(key string) error {
	return &NotFoundError{key}
}

// Error implements the error interface and names the missing key.
func (e *NotFoundError) Error() string {
	return fmt.Sprintf("no value for key: %v", e.key)
}

// IsNotFoundError reports whether err, or any error in its Unwrap chain, is
// a *NotFoundError. It returns false for a nil err.
func IsNotFoundError(err error) bool {
	// errors.As stores the matched error in its target. A local target keeps
	// concurrent callers from racing on shared package-level state.
	var target *NotFoundError
	return errors.As(err, &target)
}
|
||
// BadRequestError indicates that the value provided was not writable to the database.
type BadRequestError struct {
	reason string
}

// NewBadRequestError returns a *BadRequestError, typed as error, carrying reason.
func NewBadRequestError(reason string) error {
	return &BadRequestError{reason}
}

// Error implements the error interface and includes the rejection reason.
func (b *BadRequestError) Error() string {
	return fmt.Sprintf("invalid write request (reason: %v)", b.reason)
}

// IsBadRequestError reports whether err, or any error in its Unwrap chain, is
// a *BadRequestError. It returns false for a nil err.
func IsBadRequestError(err error) bool {
	// errors.As stores the matched error in its target. A local target keeps
	// concurrent callers from racing on shared package-level state.
	var target *BadRequestError
	return errors.As(err, &target)
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,179 @@ | ||
package logstore | ||
|
||
import ( | ||
"bytes" | ||
"encoding/binary" | ||
"errors" | ||
"hash/crc32" | ||
"io" | ||
"math" | ||
) | ||
|
||
// Record kinds, stored in the one-byte [type] field of a serialized record.
const (
	// kindValue is the kind set by NewRecord. It is the iota zero value.
	// NOTE(review): a reviewer suggested making the default (zero) value a
	// special case; the rest of that suggestion is cut off in this view.
	kindValue = iota

	// kindTombstone is the kind set by NewTombstone.
	kindTombstone
)
|
||
// Upper bounds for key and value lengths. Both equal math.MaxUint32, the
// largest number the 4-byte keyLen/valLen header fields can hold.
const (
	MaxKeyLen = math.MaxUint32
	MaxValLen = math.MaxUint32
)
|
||
// ErrInsufficientData is returned when the given data is not enough to be
// parsed into a Record.
// NOTE(review): a reviewer asked whether this error would ever be exposed to
// the caller.
var ErrInsufficientData = errors.New("insufficient bytes to parse a record")
||
|
||
// ErrCorruptData is returned when the CRC checksum stored in the serialized
// data does not match the checksum computed over the record bytes.
var ErrCorruptData = errors.New("crc checksum doesnt match the provided record data")
|
||
func NewRecord(key string, value []byte) *Record { | ||
return &Record{ | ||
kind: kindValue, | ||
key: key, | ||
value: value, | ||
} | ||
} | ||
|
||
func NewTombstone(key string) *Record { | ||
return &Record{ | ||
kind: kindTombstone, | ||
key: key, | ||
value: NoValue, | ||
} | ||
} | ||
|
||
// Record represents a database record.
//
// A Record will be serialized to a sequence of bytes in the following format:
//
//	[checksum]: 4 bytes
//	[type]:     1 byte
//	[keyLen]:   4 bytes
//	[valLen]:   4 bytes
//	[key]:      >= 1 bytes
//	[value]:    valLen
type Record struct {
	kind  byte   // kindValue or kindTombstone; written to the [type] field
	key   string // record key
	value []byte // record payload; NoValue for tombstones created by NewTombstone
}
|
||
// Sizes and byte offsets of the fixed-length header fields of a serialized
// record.
const (
	checksumSize = 4
	kindSize     = 1
	keyLenSize   = 4
	valLenSize   = 4
	// headerLength is the total size of the fixed header (13 bytes).
	headerLength = checksumSize + kindSize + keyLenSize + valLenSize

	checksumOffset = 0
	kindOffset     = checksumSize
	keyLenOffset   = kindOffset + kindSize
	// keyValOffset is where the valLen field starts (directly after keyLen).
	keyValOffset = keyLenOffset + keyLenSize
	// keyOffset is where the key bytes start, i.e. the end of the header.
	keyOffset = keyValOffset + valLenSize
)
|
||
// NoValue is an empty, non-nil byte slice. NewTombstone uses it as the value
// of tombstone records.
var NoValue = noValue{}

// noValue is an alias for []byte.
type noValue = []byte
|
||
// Key returns the record's key.
func (r *Record) Key() string {
	return r.key
}
|
||
// Value returns the record's value slice. The slice is returned as stored,
// not copied.
func (r *Record) Value() []byte {
	return r.value
}
|
||
// IsTombstone reports whether the record's kind is kindTombstone.
func (r *Record) IsTombstone() bool {
	return r.kind == kindTombstone
}
|
||
func (r *Record) Size() int { | ||
return headerLength + len(r.key) + len(r.value) | ||
} | ||
|
||
// Serialize serializes a record into the specified binary format | ||
func (r *Record) Serialize() []byte { | ||
keyBytes := []byte(r.key) | ||
keyLength := uint32(len(r.key)) | ||
valLength := uint32(len(r.value)) | ||
|
||
recordLength := headerLength + keyLength + valLength | ||
recordBuffer := allocateBufferOf(recordLength) | ||
|
||
buf := make([]byte, 4) | ||
|
||
// Write 4 empty bytes as placeholder for CRC to buffer | ||
recordBuffer.Write(buf) | ||
|
||
// Write header | ||
recordBuffer.WriteByte(r.kind) | ||
|
||
binary.BigEndian.PutUint32(buf, keyLength) | ||
recordBuffer.Write(buf) | ||
|
||
binary.BigEndian.PutUint32(buf, valLength) | ||
recordBuffer.Write(buf) | ||
|
||
// Append Key and Value data | ||
recordBuffer.Write(keyBytes) | ||
recordBuffer.Write(r.value) | ||
|
||
serializedRecord := recordBuffer.Bytes() | ||
|
||
// Calculate checksum | ||
crc := crc32.NewIEEE() | ||
crc.Write(serializedRecord[4:]) | ||
binary.BigEndian.PutUint32(serializedRecord, crc.Sum32()) | ||
|
||
return serializedRecord | ||
} | ||
|
||
func Deserialize(data []byte) (*Record, error) { | ||
if len(data) < headerLength { | ||
return nil, ErrInsufficientData | ||
} | ||
|
||
readBuf := bytes.NewBuffer(data) | ||
|
||
checksum := uint32(binary.BigEndian.Uint32(readBuf.Next(checksumSize))) | ||
kind, _ := readBuf.ReadByte() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unchecked error |
||
keyLength := uint32(binary.BigEndian.Uint32(readBuf.Next(keyLenSize))) | ||
valLength := uint32(binary.BigEndian.Uint32(readBuf.Next(valLenSize))) | ||
|
||
if uint32(len(data)) < headerLength+keyLength+valLength { | ||
return nil, ErrInsufficientData | ||
} | ||
key := make([]byte, keyLength) | ||
val := make([]byte, valLength) | ||
|
||
copy(key, readBuf.Next(int(keyLength))) | ||
copy(val, readBuf.Next(int(valLength))) | ||
|
||
check := crc32.NewIEEE() | ||
check.Write(data[kindOffset : headerLength+keyLength+valLength]) | ||
if check.Sum32() != checksum { | ||
return nil, ErrCorruptData | ||
} | ||
|
||
return &Record{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why return a pointer here? |
||
kind: kind, | ||
key: string(key), | ||
value: val, | ||
}, nil | ||
} | ||
|
||
// allocateBufferOf returns an empty bytes.Buffer with capacity reserved for
// at least size bytes. The request is clamped to the platform's maximum int.
func allocateBufferOf(size uint32) *bytes.Buffer {
	// Grow(n) guarantees room for n more bytes beyond the current length, so
	// two successive Grow calls on an empty buffer do not add up. Reserve the
	// whole amount in one call. Clamping avoids a negative argument, which
	// makes Grow panic, where int is 32 bits wide.
	const maxInt = uint64(^uint(0) >> 1)

	n := uint64(size)
	if n > maxInt {
		n = maxInt
	}

	recordBuffer := new(bytes.Buffer)
	recordBuffer.Grow(int(n))

	return recordBuffer
}
|
||
func (r *Record) Write(w io.Writer) (int, error) { | ||
return w.Write(r.Serialize()) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
package logstore | ||
|
||
import ( | ||
"encoding/base64" | ||
"log" | ||
"testing" | ||
) | ||
|
||
// validRecordBase64 is a valid serialized record, base64 encoded (unpadded
// standard alphabet), with key='valid_key' and value='valid_value'.
const validRecordBase64 = "mFgwagAAAAAJAAAAC3ZhbGlkX2tleXZhbGlkX3ZhbHVl"
|
||
func TestDeserializeRecord(t *testing.T) { | ||
recordBytes, err := base64.RawStdEncoding.DecodeString(validRecordBase64) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add a testing table with a few more tests? This storage is somewhat critical and given its complexity the testing should be more rigorous. |
||
if err != nil { | ||
t.Fatalf("test setup invalid. Failed base64 decoding record string: %v", err) | ||
} | ||
|
||
record, err := Deserialize(recordBytes) | ||
if err != nil { | ||
t.Fatal("failed deserializing valid record: ", err) | ||
} | ||
|
||
t.Logf("Decoded record: %v", record) | ||
|
||
if record.kind != kindValue { | ||
t.Errorf("Unexpected record kind: %v (expected: %v)", record.kind, kindValue) | ||
} | ||
|
||
if record.key != "valid_key" { | ||
t.Errorf("Unexpected record key: %v (expected: %v)", record.key, "valid_key") | ||
} | ||
|
||
if string(record.value) != "valid_value" { | ||
t.Errorf("Unexpected record value: %v (expected: %v)", string(record.value), "valid_value") | ||
} | ||
} | ||
|
||
// TestSerializeRecord serializes a record with key 'valid_key' and value
// 'valid_value' and compares its base64 form against validRecordBase64.
//
// NOTE(review): a reviewer asked for more test cases here as well ("Same here").
func TestSerializeRecord(t *testing.T) {
	record := &Record{
		kind:  kindValue,
		key:   "valid_key",
		value: []byte("valid_value"),
	}

	serialized := record.Serialize()
	b64 := base64.RawStdEncoding.EncodeToString(serialized)

	if b64 != validRecordBase64 {
		// NOTE(review): log.Fatalf exits the whole test process instead of
		// failing this test through t. Switching to t.Fatalf would also
		// require dropping the "log" import from this file.
		log.Fatalf("Serialized record does not match expectation: %v (expected: %v)", b64, validRecordBase64)
	}
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
package logstore | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No tests for the scanner? :( |
||
|
||
import ( | ||
"bufio" | ||
"errors" | ||
"io" | ||
) | ||
|
||
// Scanner is a custom bufio.Scanner to read the database file and create
// records from it. It embeds *bufio.Scanner, so Scan, Bytes and Err are
// available directly on the Scanner.
type Scanner struct {
	*bufio.Scanner
}
|
||
func NewScanner(r io.Reader, maxTokenSize int) (*Scanner, error) { | ||
scanner := bufio.NewScanner(r) | ||
buf := make([]byte, 4096) | ||
scanner.Buffer(buf, maxTokenSize+headerLength) | ||
scanner.Split(split) | ||
return &Scanner{scanner}, nil | ||
} | ||
|
||
// split implements the SplitFunc interface for the custom scanner | ||
func split(data []byte, atEOF bool) (advance int, token []byte, err error) { | ||
if atEOF && len(data) == 0 { | ||
return 0, nil, nil | ||
} | ||
|
||
record, err := Deserialize(data) | ||
if err != nil { | ||
if errors.Is(err, ErrInsufficientData) { | ||
// The scanner read not enough bytes, to parse a whole record. Let's continue | ||
return 0, nil, nil | ||
} | ||
|
||
return 0, nil, err | ||
} | ||
|
||
advance = record.Size() | ||
token = data[:advance] | ||
return | ||
} | ||
|
||
func (s *Scanner) Record() *Record { | ||
r, _ := Deserialize(s.Bytes()) | ||
return r | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why make it this complex? Wouldn't it be enough to only expose something like
and let the caller identify the error using
errors.As
and errors.Is
.