Skip to content

Commit

Permalink
Merge remote-tracking branch 'remotes/origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
GuoYL123 committed Mar 2, 2020
2 parents dee28ca + 2720808 commit 2b0c1a1
Show file tree
Hide file tree
Showing 17 changed files with 329 additions and 155 deletions.
64 changes: 42 additions & 22 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"fmt"
"net/http"
"net/url"
"strconv"
"strings"

"github.com/apache/servicecomb-kie/pkg/common"
Expand All @@ -39,6 +40,10 @@ import (
const (
version = "v1"
APIPathKV = "kie/kv"

HeaderContentType = "Content-Type"
MsgGetFailed = "get failed"
FmtGetFailed = "get %s failed,http status [%s], body [%s]"
)

//client errors
Expand All @@ -53,7 +58,7 @@ type Client struct {
opts Config
cipher security.Cipher
c *httpclient.Requests
CurrentRevision string
currentRevision int
}

//Config is the config of client
Expand Down Expand Up @@ -97,20 +102,20 @@ func (c *Client) Put(ctx context.Context, kv model.KVRequest, opts ...OpOption)
}
url := fmt.Sprintf("%s/%s/%s/%s/%s", c.opts.Endpoint, version, options.Project, APIPathKV, kv.Key)
h := http.Header{}
h.Set("Content-Type", "application/json")
h.Set(HeaderContentType, common.ContentTypeJSON)
body, _ := json.Marshal(kv)
resp, err := c.c.Do(ctx, "PUT", url, h, body)
resp, err := c.c.Do(ctx, http.MethodPut, url, h, body)
if err != nil {
return nil, err
}
b := httputil.ReadBody(resp)
if resp.StatusCode != http.StatusOK {
openlogging.Error("get failed", openlogging.WithTags(openlogging.Tags{
openlogging.Error(MsgGetFailed, openlogging.WithTags(openlogging.Tags{
"k": kv.Key,
"status": resp.Status,
"body": b,
}))
return nil, fmt.Errorf("get %s failed,http status [%s], body [%s]", kv.Key, resp.Status, b)
return nil, fmt.Errorf(FmtGetFailed, kv.Key, resp.Status, b)
}

kvs := &model.KVDoc{}
Expand All @@ -123,20 +128,22 @@ func (c *Client) Put(ctx context.Context, kv model.KVRequest, opts ...OpOption)
}

//Get get value of a key
func (c *Client) Get(ctx context.Context, opts ...GetOption) (*model.KVResponse, error) {
func (c *Client) Get(ctx context.Context, opts ...GetOption) (*model.KVResponse, int, error) {
options := GetOptions{}
for _, o := range opts {
o(&options)
}
if options.Project == "" {
options.Project = defaultProject
}

if options.Revision == "" {
options.Revision = strconv.Itoa(c.currentRevision)
}
var url string
if options.Key != "" {
url = fmt.Sprintf("%s/%s/%s/%s/%s?revision=%s", c.opts.Endpoint, version, options.Project, APIPathKV, options.Key, c.CurrentRevision)
url = fmt.Sprintf("%s/%s/%s/%s/%s?revision=%s", c.opts.Endpoint, version, options.Project, APIPathKV, options.Key, options.Revision)
} else {
url = fmt.Sprintf("%s/%s/%s/%s?revision=%s", c.opts.Endpoint, version, options.Project, APIPathKV, c.CurrentRevision)
url = fmt.Sprintf("%s/%s/%s/%s?revision=%s", c.opts.Endpoint, version, options.Project, APIPathKV, options.Revision)
}
if options.Wait != "" {
url = url + "&wait=" + options.Wait
Expand All @@ -152,33 +159,41 @@ func (c *Client) Get(ctx context.Context, opts ...GetOption) (*model.KVResponse,
url = url + labels
}
h := http.Header{}
resp, err := c.c.Do(ctx, "GET", url, h, nil)
resp, err := c.c.Do(ctx, http.MethodGet, url, h, nil)
if err != nil {
return nil, err
return nil, -1, err
}
responseRevision, err := strconv.Atoi(resp.Header.Get(common.HeaderRevision))
if err != nil {
responseRevision = -1
}
b := httputil.ReadBody(resp)
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
return nil, ErrKeyNotExist
return nil, responseRevision, ErrKeyNotExist
}
if resp.StatusCode == http.StatusNotModified {
return nil, ErrNoChanges
return nil, responseRevision, ErrNoChanges
}
openlogging.Error("get failed", openlogging.WithTags(openlogging.Tags{
openlogging.Error(MsgGetFailed, openlogging.WithTags(openlogging.Tags{
"k": options.Key,
"status": resp.Status,
"body": b,
}))
return nil, fmt.Errorf("get %s failed,http status [%s], body [%s]", options.Key, resp.Status, b)
return nil, responseRevision, fmt.Errorf(FmtGetFailed, options.Key, resp.Status, b)
} else if err != nil {
msg := fmt.Sprintf("get revision from response header failed when the request status is OK: %v", err)
openlogging.Error(msg)
return nil, responseRevision, fmt.Errorf(msg)
}
var kvs *model.KVResponse
err = json.Unmarshal(b, &kvs)
if err != nil {
openlogging.Error("unmarshal kv failed:" + err.Error())
return nil, err
return nil, responseRevision, err
}
c.CurrentRevision = resp.Header.Get(common.HeaderRevision)
return kvs, nil
c.currentRevision = responseRevision
return kvs, responseRevision, nil
}

//Summary get value by labels
Expand Down Expand Up @@ -206,7 +221,7 @@ func (c *Client) Summary(ctx context.Context, opts ...GetOption) ([]*model.KVRes
}
url := fmt.Sprintf("%s/%s/%s/%s?%s", c.opts.Endpoint, version, options.Project, "kie/summary", labelParams)
h := http.Header{}
resp, err := c.c.Do(ctx, "GET", url, h, nil)
resp, err := c.c.Do(ctx, http.MethodGet, url, h, nil)
if err != nil {
return nil, err
}
Expand All @@ -215,7 +230,7 @@ func (c *Client) Summary(ctx context.Context, opts ...GetOption) ([]*model.KVRes
if resp.StatusCode == http.StatusNotFound {
return nil, ErrKeyNotExist
}
openlogging.Error("get failed", openlogging.WithTags(openlogging.Tags{
openlogging.Error(MsgGetFailed, openlogging.WithTags(openlogging.Tags{
"p": options.Project,
"status": resp.Status,
"body": b,
Expand Down Expand Up @@ -246,8 +261,8 @@ func (c *Client) Delete(ctx context.Context, kvID, labelID string, opts ...OpOpt
url = fmt.Sprintf("%s?labelID=%s", url, labelID)
}
h := http.Header{}
h.Set("Content-Type", "application/json")
resp, err := c.c.Do(ctx, "DELETE", url, h, nil)
h.Set(HeaderContentType, common.ContentTypeJSON)
resp, err := c.c.Do(ctx, http.MethodDelete, url, h, nil)
if err != nil {
return err
}
Expand All @@ -257,3 +272,8 @@ func (c *Client) Delete(ctx context.Context, kvID, labelID string, opts ...OpOpt
}
return nil
}

//CurrentRevision return the current revision of kie, which is updated on the last get request
func (c *Client) CurrentRevision() int {
return c.currentRevision
}
24 changes: 16 additions & 8 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,32 @@ func TestClient_Put(t *testing.T) {
_, err := c.Put(context.TODO(), kv, WithProject("client_test"))
assert.NoError(t, err)

kvs, _ := c.Get(context.TODO(),
kvs, responseRevision, _ := c.Get(context.TODO(),
WithKey("app.properties"),
WithGetProject("client_test"),
WithLabels(map[string]string{"service": "client"}))
assert.GreaterOrEqual(t, len(kvs.Data), 1)

_, err = c.Get(context.TODO(),
_, _, err = c.Get(context.TODO(),
WithGetProject("client_test"),
WithLabels(map[string]string{"service": "client"}))
WithLabels(map[string]string{"service": "client"}),
WithRevision(responseRevision))
assert.Equal(t, ErrNoChanges, err)

_, err = c.Get(context.TODO(),
_, _, err = c.Get(context.TODO(),
WithGetProject("client_test"),
WithLabels(map[string]string{"service": "client"}))
assert.Error(t, err)

_, _, err = c.Get(context.TODO(),
WithGetProject("client_test"),
WithLabels(map[string]string{"service": "client"}),
WithRevision(c.CurrentRevision()-1))
assert.NoError(t, err)

t.Run("long polling,wait 10s,change value,should return result", func(t *testing.T) {
go func() {
kvs, err = c.Get(context.TODO(),
kvs, _, err = c.Get(context.TODO(),
WithLabels(map[string]string{"service": "client"}),
WithGetProject("client_test"),
WithWait("10s"))
Expand All @@ -80,8 +88,8 @@ func TestClient_Put(t *testing.T) {
}
_, err := c.Put(context.TODO(), kv, WithProject("client_test"))
assert.NoError(t, err)
t.Log(c.CurrentRevision)
kvs, err = c.Get(context.TODO(),
t.Log(c.CurrentRevision())
kvs, _, err = c.Get(context.TODO(),
WithGetProject("client_test"),
WithLabels(map[string]string{"service": "client"}),
WithExact())
Expand All @@ -103,7 +111,7 @@ func TestClient_Delete(t *testing.T) {
kvBody.Labels["env"] = "client_test"
kv, err := c.Put(context.TODO(), kvBody, WithProject("client_test"))
assert.NoError(t, err)
kvs, err := c.Get(context.TODO(),
kvs, _, err := c.Get(context.TODO(),
WithKey("time"),
WithGetProject("client_test"),
WithLabels(map[string]string{"env": "client_test"}))
Expand Down
20 changes: 15 additions & 5 deletions client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package client

import "strconv"

const (
defaultProject = "default"
)
Expand All @@ -29,11 +31,12 @@ type OpOption func(*OpOptions)

//GetOptions is the options of client func
type GetOptions struct {
Labels []map[string]string
Project string
Key string
Wait string
Exact bool
Labels []map[string]string
Project string
Key string
Wait string
Exact bool
Revision string
}

//OpOptions is the options of client func
Expand Down Expand Up @@ -78,6 +81,13 @@ func WithKey(k string) GetOption {
}
}

//WithRevision query keys with certain revision
func WithRevision(revision int) GetOption {
return func(options *GetOptions) {
options.Revision = strconv.Itoa(revision)
}
}

//WithProject set project to param
func WithProject(project string) OpOption {
return func(options *OpOptions) {
Expand Down
9 changes: 5 additions & 4 deletions cmd/kieserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@
package main

import (
"github.com/apache/servicecomb-kie/server/pubsub"
"github.com/apache/servicecomb-kie/server/service"
"os"

"github.com/apache/servicecomb-kie/server/config"
_ "github.com/apache/servicecomb-kie/server/handler"
"github.com/apache/servicecomb-kie/server/pubsub"
v1 "github.com/apache/servicecomb-kie/server/resource/v1"
"github.com/apache/servicecomb-kie/server/service"
_ "github.com/apache/servicecomb-kie/server/service/mongo"
"github.com/go-chassis/go-chassis"
"github.com/go-chassis/go-chassis/core/common"
"github.com/go-mesh/openlogging"
"github.com/urfave/cli"
)
Expand Down Expand Up @@ -77,8 +78,8 @@ func main() {
if err := parseConfigFromCmd(os.Args); err != nil {
openlogging.Fatal(err.Error())
}
chassis.RegisterSchema("rest", &v1.KVResource{})
chassis.RegisterSchema("rest", &v1.HistoryResource{})
chassis.RegisterSchema(common.ProtocolRest, &v1.KVResource{})
chassis.RegisterSchema(common.ProtocolRest, &v1.HistoryResource{})
if err := chassis.Init(); err != nil {
openlogging.Fatal(err.Error())
}
Expand Down
3 changes: 2 additions & 1 deletion scripts/travis/start_deps.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@

cd build
bash build_docker.sh
sudo docker-compose -f $GOPATH/src/github.com/apache/servicecomb-kie/deployments/docker/docker-compose.yaml up -d
sudo docker-compose -f $GOPATH/src/github.com/apache/servicecomb-kie/deployments/docker/docker-compose.yaml down
sudo docker-compose -f $GOPATH/src/github.com/apache/servicecomb-kie/deployments/docker/docker-compose.yaml up -d
1 change: 1 addition & 0 deletions scripts/travis/unit_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ set -e
# Make the Coverage File
echo "mode: atomic" > coverage.txt

go clean -testcache
#Start the Test
for d in $(go list ./... | grep -v vendor | grep -v third_party | grep -v examples); do
echo $d
Expand Down
13 changes: 10 additions & 3 deletions server/handler/noop_auth_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,25 @@
package handler

import (
"github.com/apache/servicecomb-kie/server/resource/v1"
"github.com/go-chassis/go-chassis/core/handler"
"github.com/go-chassis/go-chassis/core/invocation"
"github.com/go-mesh/openlogging"
)

//const of noop auth handler
const (
NoopAuthHandlerName = "auth-handler"
DefaultDomain = "default"
)

//NoopAuthHandler not need implement any logic
//developer can extend authenticate and authorization by set new handler in chassis.yaml
type NoopAuthHandler struct{}

//Handle set local attribute to http request
func (bk *NoopAuthHandler) Handle(chain *handler.Chain, inv *invocation.Invocation, cb invocation.ResponseCallBack) {
inv.SetMetadata("domain", "default")
inv.SetMetadata(v1.AttributeDomainKey, DefaultDomain)
chain.Next(inv, cb)
}

Expand All @@ -39,10 +46,10 @@ func newDomainResolver() handler.Handler {

//Name is handler name
func (bk *NoopAuthHandler) Name() string {
return "auth-handler"
return NoopAuthHandlerName
}
func init() {
if err := handler.RegisterHandler("auth-handler", newDomainResolver); err != nil {
if err := handler.RegisterHandler(NoopAuthHandlerName, newDomainResolver); err != nil {
openlogging.Fatal("register auth-handler failed: " + err.Error())
}
}
9 changes: 8 additions & 1 deletion server/pubsub/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,16 @@ package pubsub
import (
"encoding/json"
"errors"
"github.com/apache/servicecomb-kie/pkg/common"
"reflect"
"strings"

"github.com/apache/servicecomb-kie/pkg/common"
)

// const
const (
ActionPut = "put"
ActionDelete = "delete"
)

//KVChangeEvent is event between kie nodes, and broadcast by serf
Expand Down
Loading

0 comments on commit 2b0c1a1

Please sign in to comment.