-
Notifications
You must be signed in to change notification settings - Fork 3
/
package_test.go
142 lines (119 loc) · 2.43 KB
/
package_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package store
import (
"io"
"os"
"sync"
"time"
ring "github.com/gholt/devicering"
"github.com/gholt/msgring"
)
// memBuf is the backing storage for an in-memory file. memFile refers to it
// by pointer, so more than one memFile handle can share the same bytes.
type memBuf struct {
	buf []byte
}

// memFile is an in-memory stand-in for a file: a cursor (pos) over a memBuf
// with Read, Seek, Write and Close methods. It performs no locking.
type memFile struct {
	buf *memBuf
	pos int64
}

// Read copies bytes starting at the current position into p and advances the
// position by the number of bytes copied.
//
// It returns io.EOF when the position is at or beyond the end of the buffer.
// The position can legitimately be beyond the end after a Seek, so that case
// is checked before slicing rather than being allowed to panic. A zero-length
// p with data still remaining yields (0, nil), as io.Reader permits.
func (f *memFile) Read(p []byte) (int, error) {
	if f.pos >= int64(len(f.buf.buf)) {
		return 0, io.EOF
	}
	n := copy(p, f.buf.buf[f.pos:])
	f.pos += int64(n)
	return n, nil
}

// Seek sets the position for the next Read or Write and returns the new
// position. whence is io.SeekStart (0), io.SeekCurrent (1) or io.SeekEnd (2).
//
// Seeking beyond the end of the buffer is allowed; a later Write there
// zero-fills the gap. An unknown whence or a negative resulting position
// returns os.ErrInvalid and leaves the current position unchanged.
func (f *memFile) Seek(offset int64, whence int) (int64, error) {
	var base int64
	switch whence {
	case io.SeekStart:
		base = 0
	case io.SeekCurrent:
		base = f.pos
	case io.SeekEnd:
		base = int64(len(f.buf.buf))
	default:
		return 0, os.ErrInvalid
	}
	pos := base + offset
	if pos < 0 {
		// A negative position would make the next Read/Write slice out of range.
		return 0, os.ErrInvalid
	}
	f.pos = pos
	return pos, nil
}

// Write stores p at the current position, overwriting existing bytes and
// growing the buffer when the write extends past its end. Any gap between the
// old end and the write position is left as zero bytes. The position advances
// by len(p). It always reports len(p) bytes written and a nil error.
func (f *memFile) Write(p []byte) (int, error) {
	end := f.pos + int64(len(p))
	if end > int64(len(f.buf.buf)) {
		// Grow to exactly the size needed, preserving the existing contents.
		grown := make([]byte, end)
		copy(grown, f.buf.buf)
		f.buf.buf = grown
	}
	copy(f.buf.buf[f.pos:], p)
	f.pos = end
	return len(p), nil
}

// Close is a no-op; the backing memBuf is left untouched. It always returns
// nil.
func (f *memFile) Close() error {
	return nil
}
// msgRingPlaceholder is a test double that records which node IDs and
// partitions messages were addressed to instead of sending anything.
// NOTE(review): presumably it stands in for a msgring.MsgRing — confirm
// against the callers.
type msgRingPlaceholder struct {
	ring ring.Ring
	// lock guards msgToNodeIDs and msgToPartitions.
	lock            sync.Mutex
	msgToNodeIDs    []uint64
	msgToPartitions []uint32
}

// Ring returns the ring stored in the placeholder (nil unless one was set).
func (m *msgRingPlaceholder) Ring() ring.Ring {
	return m.ring
}

// MaxMsgLength reports a fixed limit of 65536.
func (m *msgRingPlaceholder) MaxMsgLength() uint64 {
	return 65536
}

// SetMsgHandler does nothing; the handler is discarded.
func (m *msgRingPlaceholder) SetMsgHandler(msgType uint64, handler msgring.MsgUnmarshaller) {
}

// MsgToNode appends nodeID to msgToNodeIDs under the lock and then calls
// msg.Free(0, 0) after the lock has been released. timeout is ignored.
func (m *msgRingPlaceholder) MsgToNode(msg msgring.Msg, nodeID uint64, timeout time.Duration) {
	func() {
		m.lock.Lock()
		defer m.lock.Unlock()
		m.msgToNodeIDs = append(m.msgToNodeIDs, nodeID)
	}()
	msg.Free(0, 0)
}

// MsgToOtherReplicas appends partition to msgToPartitions under the lock and
// then calls msg.Free(0, 0) after the lock has been released. timeout is
// ignored.
func (m *msgRingPlaceholder) MsgToOtherReplicas(msg msgring.Msg, partition uint32, timeout time.Duration) {
	func() {
		m.lock.Lock()
		defer m.lock.Unlock()
		m.msgToPartitions = append(m.msgToPartitions, partition)
	}()
	msg.Free(0, 0)
}
// testErrorWriter is a writer that accepts a limited number of bytes and then
// fails. goodBytes is the number of bytes it will still accept.
type testErrorWriter struct {
	goodBytes int
}

// Write accepts all of p while the remaining budget covers it, deducting
// len(p) from the budget. Otherwise it accepts whatever budget is left (if
// any), zeroes the budget, and returns that count together with io.EOF.
func (w *testErrorWriter) Write(p []byte) (int, error) {
	switch budget := w.goodBytes; {
	case budget >= len(p):
		// Whole write fits in the remaining budget.
		w.goodBytes = budget - len(p)
		return len(p), nil
	case budget > 0:
		// Partial write: use up the rest of the budget, then fail.
		w.goodBytes = 0
		return budget, io.EOF
	default:
		return 0, io.EOF
	}
}
// memFileInfo is a plain value holder whose accessors return the stored
// fields unchanged; its method set matches os.FileInfo.
type memFileInfo struct {
	name    string
	size    int64
	mode    os.FileMode
	modTime time.Time
	isDir   bool
	sys     interface{}
}

// Compile-time check that *memFileInfo satisfies os.FileInfo.
var _ os.FileInfo = (*memFileInfo)(nil)

// Name returns the stored name.
func (fi *memFileInfo) Name() string { return fi.name }

// Size returns the stored size.
func (fi *memFileInfo) Size() int64 { return fi.size }

// Mode returns the stored file mode.
func (fi *memFileInfo) Mode() os.FileMode { return fi.mode }

// ModTime returns the stored modification time.
func (fi *memFileInfo) ModTime() time.Time { return fi.modTime }

// IsDir returns the stored directory flag.
func (fi *memFileInfo) IsDir() bool { return fi.isDir }

// Sys returns the stored underlying-source value, which may be nil.
func (fi *memFileInfo) Sys() interface{} { return fi.sys }