Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Init registry #2

Merged
merged 27 commits into from
Mar 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/push-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ jobs:
go install mvdan.cc/[email protected]
test -z "$(gofumpt -l -extra .)"

- name: Prepare Consul
run: |
make prepare
sleep 5

- name: Unit Test
run: go test -v -race -covermode=atomic -coverprofile=coverage.out ./...

Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
prepare:
docker pull consul:latest
docker run -d --name=dev-consul -e CONSUL_BIND_INTERFACE=eth0 -p 8500:8500 consul:latest
3 changes: 2 additions & 1 deletion OWNERS
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
approvers:
- YangruiEmma
- hanson
- Hanson
- baiyutang
95 changes: 94 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,94 @@
# registry-consul
# registry-consul (This is a community driven project)
GuangmingLuo marked this conversation as resolved.
Show resolved Hide resolved

## Docs

### Server

#### Basic Usage
```
import (
...
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/server"
consul "github.com/kitex-contrib/registry-consul"
consulapi "github.com/hashicorp/consul/api"
)

func main() {

r, err := consul.NewConsulRegister("127.0.0.1:8500")
if err != nil {
log.Fatal(err)
}

server := hello.NewServer(new(HelloImpl), server.WithRegistry(r), server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{
ServiceName: "greet.server",
}))
err = server.Run()
if err != nil {
log.Fatal(err)
}
}
```

#### Customize Service Check

registry has a default config for service check like

```
check.Timeout = "5s"
check.Interval = "5s"
check.DeregisterCriticalServiceAfter = "1m"
```

you can also use `WithCheck` to modify your config

```
import (
...
consul "github.com/kitex-contrib/registry-consul"
consulapi "github.com/hashicorp/consul/api"
)
func main() {
...
r, err := consul.NewConsulRegister("127.0.0.1:8500", consul.WithCheck(&consulapi.AgentServiceCheck{
Interval: "7s",
Timeout: "5s",
DeregisterCriticalServiceAfter: "1m",
}))
}
```

### Client

```
import (
...
"github.com/cloudwego/kitex/client"
consul "github.com/kitex-contrib/registry-consul"
...
)

func main() {
...
r, err := consul.NewConsulResolver("127.0.0.1:8500")
if err != nil {
log.Fatal(err)
}
client, err := echo.NewClient("greet.server", client.WithResolver(r))
if err != nil {
log.Fatal(err)
}
...
}
```

## Example

See Server and Client in example.

## Compatibility

Compatible with consul.

maintained by: [Hanson](https://github.com/hanson)
135 changes: 135 additions & 0 deletions consul_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2021 CloudWeGo Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
GuangmingLuo marked this conversation as resolved.
Show resolved Hide resolved
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consul

import (
"errors"
"fmt"

"github.com/cloudwego/kitex/pkg/registry"
"github.com/hashicorp/consul/api"
)

type consulRegistry struct {
consulClient *api.Client
opts options
}

var _ registry.Registry = (*consulRegistry)(nil)

type options struct {
check *api.AgentServiceCheck
}

// Option is consul option.
type Option func(o *options)

// WithCheck is consul registry option to set AgentServiceCheck.
func WithCheck(check *api.AgentServiceCheck) Option {
return func(o *options) { o.check = check }
}

// NewConsulRegister create a new registry using consul.
func NewConsulRegister(address string, opts ...Option) (registry.Registry, error) {
config := api.DefaultConfig()
config.Address = address
client, err := api.NewClient(config)
if err != nil {
return nil, err
}

op := options{
check: defaultCheck(),
}

for _, option := range opts {
option(&op)
}

return &consulRegistry{consulClient: client, opts: op}, nil
}

// NewConsulRegisterWithConfig create a new registry using consul, with a custom config.
func NewConsulRegisterWithConfig(config *api.Config) (*consulRegistry, error) {
client, err := api.NewClient(config)
if err != nil {
return nil, err
}

return &consulRegistry{consulClient: client}, nil
}

// Register register a service to consul.
func (c *consulRegistry) Register(info *registry.Info) error {
if err := validateRegistryInfo(info); err != nil {
return err
}

host, port, err := parseAddr(info.Addr)
if err != nil {
return err
}
svcID, err := getServiceId(info)
if err != nil {
return err
}
svcInfo := &api.AgentServiceRegistration{
ID: svcID,
Address: host,
Port: port,
Name: info.ServiceName,
Meta: info.Tags,
Weights: &api.AgentWeights{
Passing: info.Weight,
Warning: info.Weight,
},
Check: c.opts.check,
}

if c.opts.check != nil {
c.opts.check.TCP = fmt.Sprintf("%s:%d", host, port)
svcInfo.Check = c.opts.check
}

return c.consulClient.Agent().ServiceRegister(svcInfo)
}

// Deregister deregister a service from consul.
func (c *consulRegistry) Deregister(info *registry.Info) error {
svcID, err := getServiceId(info)
if err != nil {
return err
}
return c.consulClient.Agent().ServiceDeregister(svcID)
}

func validateRegistryInfo(info *registry.Info) error {
if info.ServiceName == "" {
return errors.New("missing service name in consul register")
}
if info.Addr == nil {
return errors.New("missing addr in consul register")
}
return nil
}

func defaultCheck() *api.AgentServiceCheck {
check := new(api.AgentServiceCheck)
check.Timeout = "5s"
check.Interval = "5s"
check.DeregisterCriticalServiceAfter = "1m"

return check
}
94 changes: 94 additions & 0 deletions consul_resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2021 CloudWeGo Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consul
Hanson marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"errors"
"fmt"
"log"

"github.com/cloudwego/kitex/pkg/discovery"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/hashicorp/consul/api"
)

const (
defaultNetwork = "tcp"
)

type consulResolver struct {
consulClient *api.Client
}

var _ discovery.Resolver = (*consulResolver)(nil)

// NewConsulResolver create a service resolver using consul.
func NewConsulResolver(address string) (discovery.Resolver, error) {
config := api.DefaultConfig()
config.Address = address
client, err := api.NewClient(config)
if err != nil {
return nil, err
}

return &consulResolver{consulClient: client}, nil
}

// Target return a description for the given target that is suitable for being a key for cache.
func (c *consulResolver) Target(_ context.Context, target rpcinfo.EndpointInfo) (description string) {
return target.ServiceName()
}

// Resolve a service info by desc.
func (c *consulResolver) Resolve(_ context.Context, desc string) (discovery.Result, error) {
var eps []discovery.Instance
agentServiceList, _, err := c.consulClient.Health().Service(desc, "", true, nil)
if err != nil {
log.Printf("err:%v", err)
return discovery.Result{}, err
}
if len(agentServiceList) == 0 {
return discovery.Result{}, errors.New("no service found")
}
for _, i := range agentServiceList {
svc := i.Service
if svc == nil || svc.Address == "" {
continue
}
eps = append(eps, discovery.NewInstance(
defaultNetwork,
fmt.Sprint(svc.Address, ":", svc.Port),
svc.Weights.Passing,
svc.Meta,
))
}

return discovery.Result{
Cacheable: true,
CacheKey: desc,
Instances: eps,
}, nil
}

// Diff computes the difference between two results.
func (c *consulResolver) Diff(cacheKey string, prev, next discovery.Result) (discovery.Change, bool) {
return discovery.DefaultDiff(cacheKey, prev, next)
}

// Name return the name of this resolver.
func (c *consulResolver) Name() string {
return "consul"
}
Loading