Skip to content

Commit

Permalink
Change the store interface to remove variadic args (micro#1095)
Browse files Browse the repository at this point in the history
  • Loading branch information
asim authored Jan 8, 2020
1 parent 78aed5b commit a90a74c
Show file tree
Hide file tree
Showing 13 changed files with 352 additions and 374 deletions.
130 changes: 84 additions & 46 deletions store/cloudflare/cloudflare.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,19 @@ func (w *workersKV) Init(opts ...store.Option) error {
return nil
}

// In the cloudflare workers KV implemention, List() doesn't guarantee
// anything as the workers API is eventually consistent.
func (w *workersKV) List() ([]*store.Record, error) {
func (w *workersKV) list(prefix string) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/keys", w.account, w.namespace)

response, _, _, err := w.request(ctx, http.MethodGet, path, nil, make(http.Header))
body := make(map[string]string)

if len(prefix) > 0 {
body["prefix"] = prefix
}

response, _, _, err := w.request(ctx, http.MethodGet, path, body, make(http.Header))
if err != nil {
return nil, err
}
Expand All @@ -138,13 +142,51 @@ func (w *workersKV) List() ([]*store.Record, error) {
keys = append(keys, r.Name)
}

return w.Read(keys...)
return keys, nil
}

func (w *workersKV) Read(keys ...string) ([]*store.Record, error) {
// In the cloudflare workers KV implemention, List() doesn't guarantee
// anything as the workers API is eventually consistent.
func (w *workersKV) List() ([]*store.Record, error) {
keys, err := w.list("")
if err != nil {
return nil, err
}

var gerr error
var records []*store.Record

for _, key := range keys {
r, err := w.Read(key)
if err != nil {
gerr = err
continue
}
records = append(records, r...)
}

return records, gerr
}

func (w *workersKV) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

var options store.ReadOptions
for _, o := range opts {
o(&options)
}

keys := []string{key}

if options.Prefix {
k, err := w.list(key)
if err != nil {
return nil, err
}
keys = k
}

//nolint:prealloc
var records []*store.Record

Expand Down Expand Up @@ -174,65 +216,61 @@ func (w *workersKV) Read(keys ...string) ([]*store.Record, error) {
return records, nil
}

func (w *workersKV) Write(records ...*store.Record) error {
func (w *workersKV) Write(r *store.Record) error {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

for _, r := range records {
path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/values/%s", w.account, w.namespace, url.PathEscape(r.Key))
if r.Expiry != 0 {
// Minimum cloudflare TTL is 60 Seconds
exp := int(math.Max(60, math.Round(r.Expiry.Seconds())))
path = path + "?expiration_ttl=" + strconv.Itoa(exp)
}
path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/values/%s", w.account, w.namespace, url.PathEscape(r.Key))
if r.Expiry != 0 {
// Minimum cloudflare TTL is 60 Seconds
exp := int(math.Max(60, math.Round(r.Expiry.Seconds())))
path = path + "?expiration_ttl=" + strconv.Itoa(exp)
}

headers := make(http.Header)
headers := make(http.Header)

resp, _, _, err := w.request(ctx, http.MethodPut, path, r.Value, headers)
if err != nil {
return err
}
resp, _, _, err := w.request(ctx, http.MethodPut, path, r.Value, headers)
if err != nil {
return err
}

a := &apiResponse{}
if err := json.Unmarshal(resp, a); err != nil {
return err
}
a := &apiResponse{}
if err := json.Unmarshal(resp, a); err != nil {
return err
}

if !a.Success {
messages := ""
for _, m := range a.Errors {
messages += strconv.Itoa(m.Code) + " " + m.Message + "\n"
}
return errors.New(messages)
if !a.Success {
messages := ""
for _, m := range a.Errors {
messages += strconv.Itoa(m.Code) + " " + m.Message + "\n"
}
return errors.New(messages)
}

return nil
}

func (w *workersKV) Delete(keys ...string) error {
func (w *workersKV) Delete(key string) error {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

for _, k := range keys {
path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/values/%s", w.account, w.namespace, url.PathEscape(k))
resp, _, _, err := w.request(ctx, http.MethodDelete, path, nil, make(http.Header))
if err != nil {
return err
}
path := fmt.Sprintf("accounts/%s/storage/kv/namespaces/%s/values/%s", w.account, w.namespace, url.PathEscape(key))
resp, _, _, err := w.request(ctx, http.MethodDelete, path, nil, make(http.Header))
if err != nil {
return err
}

a := &apiResponse{}
if err := json.Unmarshal(resp, a); err != nil {
return err
}
a := &apiResponse{}
if err := json.Unmarshal(resp, a); err != nil {
return err
}

if !a.Success {
messages := ""
for _, m := range a.Errors {
messages += strconv.Itoa(m.Code) + " " + m.Message + "\n"
}
return errors.New(messages)
if !a.Success {
messages := ""
for _, m := range a.Errors {
messages += strconv.Itoa(m.Code) + " " + m.Message + "\n"
}
return errors.New(messages)
}

return nil
Expand Down
23 changes: 12 additions & 11 deletions store/cloudflare/cloudflare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,18 @@ func TestCloudflare(t *testing.T) {
t.Log("Listed " + strconv.Itoa(len(records)) + " records")
}

err = wkv.Write(
&store.Record{
Key: randomK,
Value: []byte(randomV),
},
&store.Record{
Key: "expirationtest",
Value: []byte("This message will self destruct"),
Expiry: 75 * time.Second,
},
)
err = wkv.Write(&store.Record{
Key: randomK,
Value: []byte(randomV),
})
if err != nil {
t.Errorf("Write: %s", err.Error())
}
err = wkv.Write(&store.Record{
Key: "expirationtest",
Value: []byte("This message will self destruct"),
Expiry: 75 * time.Second,
})
if err != nil {
t.Errorf("Write: %s", err.Error())
}
Expand Down
87 changes: 47 additions & 40 deletions store/cockroach/cockroach.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,39 +80,47 @@ func (s *sqlStore) List() ([]*store.Record, error) {
}

// Read all records with keys
func (s *sqlStore) Read(keys ...string) ([]*store.Record, error) {
func (s *sqlStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
var options store.ReadOptions
for _, o := range opts {
o(&options)
}

// TODO: make use of options.Prefix using WHERE key LIKE = ?

q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key = $1;", s.database, s.table))
if err != nil {
return nil, err
}

var records []*store.Record
var timehelper pq.NullTime
for _, key := range keys {
row := q.QueryRow(key)
record := &store.Record{}
if err := row.Scan(&record.Key, &record.Value, &timehelper); err != nil {
if err == sql.ErrNoRows {
return records, store.ErrNotFound
}
return records, err

row := q.QueryRow(key)
record := &store.Record{}
if err := row.Scan(&record.Key, &record.Value, &timehelper); err != nil {
if err == sql.ErrNoRows {
return records, store.ErrNotFound
}
if timehelper.Valid {
if timehelper.Time.Before(time.Now()) {
// record has expired
go s.Delete(key)
return records, store.ErrNotFound
}
record.Expiry = time.Until(timehelper.Time)
records = append(records, record)
} else {
records = append(records, record)
return records, err
}
if timehelper.Valid {
if timehelper.Time.Before(time.Now()) {
// record has expired
go s.Delete(key)
return records, store.ErrNotFound
}
record.Expiry = time.Until(timehelper.Time)
records = append(records, record)
} else {
records = append(records, record)
}

return records, nil
}

// Write records
func (s *sqlStore) Write(rec ...*store.Record) error {
func (s *sqlStore) Write(r *store.Record) error {
q, err := s.db.Prepare(fmt.Sprintf(`INSERT INTO %s.%s(key, value, expiry)
VALUES ($1, $2::bytea, $3)
ON CONFLICT (key)
Expand All @@ -121,37 +129,36 @@ func (s *sqlStore) Write(rec ...*store.Record) error {
if err != nil {
return err
}
for _, r := range rec {
var err error
if r.Expiry != 0 {
_, err = q.Exec(r.Key, r.Value, time.Now().Add(r.Expiry))
} else {
_, err = q.Exec(r.Key, r.Value, nil)
}
if err != nil {
return errors.Wrap(err, "Couldn't insert record "+r.Key)
}

if r.Expiry != 0 {
_, err = q.Exec(r.Key, r.Value, time.Now().Add(r.Expiry))
} else {
_, err = q.Exec(r.Key, r.Value, nil)
}

if err != nil {
return errors.Wrap(err, "Couldn't insert record "+r.Key)
}

return nil
}

// Delete records with keys
func (s *sqlStore) Delete(keys ...string) error {
func (s *sqlStore) Delete(key string) error {
q, err := s.db.Prepare(fmt.Sprintf("DELETE FROM %s.%s WHERE key = $1;", s.database, s.table))
if err != nil {
return err
}
for _, key := range keys {
result, err := q.Exec(key)
if err != nil {
return err
}
_, err = result.RowsAffected()
if err != nil {
return err
}

result, err := q.Exec(key)
if err != nil {
return err
}
_, err = result.RowsAffected()
if err != nil {
return err
}

return nil
}

Expand Down
10 changes: 10 additions & 0 deletions store/cockroach/cockroach_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,20 @@ func TestSQL(t *testing.T) {
Key: "test",
Value: []byte("foo"),
},
)
if err != nil {
t.Error(err)
}
err = sqlStore.Write(
&store.Record{
Key: "bar",
Value: []byte("baz"),
},
)
if err != nil {
t.Error(err)
}
err = sqlStore.Write(
&store.Record{
Key: "qux",
Value: []byte("aasad"),
Expand Down
Loading

0 comments on commit a90a74c

Please sign in to comment.