-
Notifications
You must be signed in to change notification settings - Fork 0
/
encoding.go
128 lines (102 loc) · 2.79 KB
/
encoding.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package dshards
import (
"bytes"
"fmt"
"github.com/cjslep/syrup"
)
// Tags placed first in the list that is encoded at the front of every chunk.
const (
	// kManifest is the tag manifest.Chunk uses for its chunks.
	kManifest = "manifest"
	// kRaw is the tag raw.Chunk uses for its chunks.
	kRaw = "raw"
)
// chunker is implemented by the manifest and raw file types: anything that
// knows how to split its own contents into one or more datashards-compatible
// byte streams.
type chunker interface {
	// Chunk returns the byte streams for the contents, or an error.
	Chunk() ([][]byte, error)
}
// manifest is a file whose content is built from a list of URNs.
type manifest struct {
	// urns are the entries, in order, whose string forms make up the content.
	urns []URN
}
// Chunk concatenates the string form of every URN, in order and without any
// separator, and passes the result to chunk for encoding.
func (m manifest) Chunk() ([][]byte, error) {
	var payload []byte
	for i := range m.urns {
		payload = append(payload, m.urns[i].String()...)
	}
	// Each chunk is led by: "manifest", <chunk-size>, <file-size>
	header := []interface{}{kManifest, constChunkSize, len(payload)}
	return chunk(header, payload)
}
// raw is a file whose content is an opaque byte slice.
type raw struct {
	// content is passed to chunk unchanged by Chunk.
	content []byte
}
// Chunk passes the raw content to chunk, with "raw" as the only leading
// element of each chunk.
func (r raw) Chunk() ([][]byte, error) {
	header := []interface{}{kRaw}
	return chunk(header, r.content)
}
// Constant Chunking -- in use

// constChunkSize is the length in bytes of every chunk returned by chunk.
const constChunkSize = 32 << 10 // 32 Kibibytes
// chunk encodes content as a sequence of fixed-size chunks.
//
// Every chunk is the syrup encoding of a list holding the eachChunk elements
// followed by one piece of content, copied into a slice of exactly
// constChunkSize bytes. Content is cut into pieces of constChunkSize minus the
// encoded size of eachChunk; the last piece may be shorter, in which case the
// remainder of its chunk is left as zero bytes. One chunk is produced even
// when content is empty.
//
// A nil result and an error are returned when encoding fails, when eachChunk
// alone leaves no room for content, when a full piece does not encode to
// exactly constChunkSize bytes, or when the final piece encodes to more than
// constChunkSize bytes.
func chunk(eachChunk []interface{}, content []byte) (o [][]byte, err error) {
	var buf bytes.Buffer

	// 1. Determine byte overhead: the encoded size of eachChunk on its own.
	if err = syrup.NewEncoder(syrup.NewPrototypeEncoding(), &buf).Encode(eachChunk); err != nil {
		return nil, err
	}
	overhead := buf.Len()
	pieceSize := constChunkSize - overhead
	if pieceSize <= 0 {
		// Without this guard a negative pieceSize would panic when slicing.
		return nil, fmt.Errorf("dshards chunking overhead of %d bytes leaves no room in %d bytes", overhead, constChunkSize)
	}

	// encodePiece replaces the contents of buf with the encoding of eachChunk
	// followed by piece.
	encodePiece := func(piece []byte) error {
		buf.Reset()
		v := make([]interface{}, len(eachChunk)+1)
		copy(v, eachChunk)
		v[len(v)-1] = piece
		return syrup.NewEncoder(syrup.NewPrototypeEncoding(), &buf).Encode(v)
	}

	o = make([][]byte, 0, len(content)/pieceSize+1)

	// 2. Chunk (if needed)
	// NOTE(review): overhead is measured without the trailing piece. If the
	// encoding adds bytes of its own for the piece (e.g. a length prefix), a
	// full chunk can never be exactly constChunkSize and the check below will
	// always fail -- confirm against the syrup encoding.
	for len(content) > pieceSize {
		if err = encodePiece(content[:pieceSize]); err != nil {
			return nil, err
		}
		content = content[pieceSize:]
		if buf.Len() != constChunkSize {
			return nil, fmt.Errorf("dshards chunking encoded %d of %d bytes", buf.Len(), constChunkSize)
		}
		res := make([]byte, constChunkSize)
		copy(res, buf.Bytes())
		o = append(o, res)
	}

	// 3. Final chunk (<= pieceSize bytes of content), zero-padded if short.
	if err = encodePiece(content); err != nil {
		return nil, err
	}
	if buf.Len() > constChunkSize {
		// copy would otherwise drop the excess and emit a corrupt chunk.
		return nil, fmt.Errorf("dshards chunking final chunk encoded %d bytes, more than %d", buf.Len(), constChunkSize)
	}
	res := make([]byte, constChunkSize)
	copy(res, buf.Bytes())
	return append(o, res), nil
}
// Variadic Chunking -- not yet used

// allowedChunkSizesIncreasingOrder lists the permitted chunk sizes in bytes,
// smallest first.
var allowedChunkSizesIncreasingOrder = []int{
	1 << 10,  // 1 Kibibyte
	2 << 10,  // 2 Kibibytes
	4 << 10,  // 4 Kibibytes
	8 << 10,  // 8 Kibibytes
	16 << 10, // 16 Kibibytes
	32 << 10, // 32 Kibibytes
}
// exceedsLargestChunkSize reports whether lenc is greater than the last (and,
// given the slice's ordering, largest) entry of
// allowedChunkSizesIncreasingOrder.
func exceedsLargestChunkSize(lenc int) bool {
	sizes := allowedChunkSizesIncreasingOrder
	largest := sizes[len(sizes)-1]
	return largest < lenc
}
// chunkerFn takes a content length and returns the chunk size to use for it:
// the first entry of allowedChunkSizesIncreasingOrder that is at least lenc,
// or the last entry when lenc is larger than all of them.
//
// Intended to be called iteratively, as the remaining content length shrinks.
func chunkerFn(lenc int) int {
	sizes := allowedChunkSizesIncreasingOrder
	last := len(sizes) - 1
	largest := sizes[last]
	// The last entry is the answer whether or not lenc fits in it, so only the
	// smaller entries need to be compared.
	for i := 0; i < last; i++ {
		if sizes[i] >= lenc {
			return sizes[i]
		}
	}
	return largest
}