Skip to content

Commit

Permalink
Merge pull request #2 from Hanson/main
Browse files Browse the repository at this point in the history
Init registry
  • Loading branch information
GuangmingLuo authored Mar 1, 2022
2 parents 166a013 + 5dcad5d commit 599a149
Show file tree
Hide file tree
Showing 14 changed files with 1,512 additions and 2 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/push-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ jobs:
go install mvdan.cc/[email protected]
test -z "$(gofumpt -l -extra .)"
- name: Prepare Consul
run: |
make prepare
sleep 5
- name: Unit Test
run: go test -v -race -covermode=atomic -coverprofile=coverage.out ./...

Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
prepare:
docker pull consul:latest
docker run -d --name=dev-consul -e CONSUL_BIND_INTERFACE=eth0 -p 8500:8500 consul:latest
3 changes: 2 additions & 1 deletion OWNERS
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
approvers:
- YangruiEmma
- hanson
- Hanson
- baiyutang
95 changes: 94 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,94 @@
# registry-consul
# registry-consul (This is a community driven project)

## Docs

### Server

#### Basic Usage
```
import (
...
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/server"
consul "github.com/kitex-contrib/registry-consul"
consulapi "github.com/hashicorp/consul/api"
)
func main() {
r, err := consul.NewConsulRegister("127.0.0.1:8500")
if err != nil {
log.Fatal(err)
}
server := hello.NewServer(new(HelloImpl), server.WithRegistry(r), server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{
ServiceName: "greet.server",
}))
err = server.Run()
if err != nil {
log.Fatal(err)
}
}
```

#### Customize Service Check

registry has a default config for service check like

```
check.Timeout = "5s"
check.Interval = "5s"
check.DeregisterCriticalServiceAfter = "1m"
```

you can also use `WithCheck` to modify your config

```
import (
...
consul "github.com/kitex-contrib/registry-consul"
consulapi "github.com/hashicorp/consul/api"
)
func main() {
...
r, err := consul.NewConsulRegister("127.0.0.1:8500", consul.WithCheck(&consulapi.AgentServiceCheck{
Interval: "7s",
Timeout: "5s",
DeregisterCriticalServiceAfter: "1m",
}))
}
```

### Client

```
import (
...
"github.com/cloudwego/kitex/client"
consul "github.com/kitex-contrib/registry-consul"
...
)
func main() {
...
r, err := consul.NewConsulResolver("127.0.0.1:8500")
if err != nil {
log.Fatal(err)
}
client, err := echo.NewClient("greet.server", client.WithResolver(r))
if err != nil {
log.Fatal(err)
}
...
}
```

## Example

See Server and Client in example.

## Compatibility

Compatible with consul.

maintained by: [Hanson](https://github.com/hanson)
135 changes: 135 additions & 0 deletions consul_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2021 CloudWeGo Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consul

import (
"errors"
"fmt"

"github.com/cloudwego/kitex/pkg/registry"
"github.com/hashicorp/consul/api"
)

type consulRegistry struct {
consulClient *api.Client
opts options
}

var _ registry.Registry = (*consulRegistry)(nil)

type options struct {
check *api.AgentServiceCheck
}

// Option is consul option.
type Option func(o *options)

// WithCheck is consul registry option to set AgentServiceCheck.
func WithCheck(check *api.AgentServiceCheck) Option {
return func(o *options) { o.check = check }
}

// NewConsulRegister create a new registry using consul.
func NewConsulRegister(address string, opts ...Option) (registry.Registry, error) {
config := api.DefaultConfig()
config.Address = address
client, err := api.NewClient(config)
if err != nil {
return nil, err
}

op := options{
check: defaultCheck(),
}

for _, option := range opts {
option(&op)
}

return &consulRegistry{consulClient: client, opts: op}, nil
}

// NewConsulRegisterWithConfig create a new registry using consul, with a custom config.
func NewConsulRegisterWithConfig(config *api.Config) (*consulRegistry, error) {
client, err := api.NewClient(config)
if err != nil {
return nil, err
}

return &consulRegistry{consulClient: client}, nil
}

// Register register a service to consul.
func (c *consulRegistry) Register(info *registry.Info) error {
if err := validateRegistryInfo(info); err != nil {
return err
}

host, port, err := parseAddr(info.Addr)
if err != nil {
return err
}
svcID, err := getServiceId(info)
if err != nil {
return err
}
svcInfo := &api.AgentServiceRegistration{
ID: svcID,
Address: host,
Port: port,
Name: info.ServiceName,
Meta: info.Tags,
Weights: &api.AgentWeights{
Passing: info.Weight,
Warning: info.Weight,
},
Check: c.opts.check,
}

if c.opts.check != nil {
c.opts.check.TCP = fmt.Sprintf("%s:%d", host, port)
svcInfo.Check = c.opts.check
}

return c.consulClient.Agent().ServiceRegister(svcInfo)
}

// Deregister deregister a service from consul.
func (c *consulRegistry) Deregister(info *registry.Info) error {
svcID, err := getServiceId(info)
if err != nil {
return err
}
return c.consulClient.Agent().ServiceDeregister(svcID)
}

func validateRegistryInfo(info *registry.Info) error {
if info.ServiceName == "" {
return errors.New("missing service name in consul register")
}
if info.Addr == nil {
return errors.New("missing addr in consul register")
}
return nil
}

func defaultCheck() *api.AgentServiceCheck {
check := new(api.AgentServiceCheck)
check.Timeout = "5s"
check.Interval = "5s"
check.DeregisterCriticalServiceAfter = "1m"

return check
}
94 changes: 94 additions & 0 deletions consul_resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2021 CloudWeGo Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consul

import (
"context"
"errors"
"fmt"
"log"

"github.com/cloudwego/kitex/pkg/discovery"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/hashicorp/consul/api"
)

const (
defaultNetwork = "tcp"
)

type consulResolver struct {
consulClient *api.Client
}

var _ discovery.Resolver = (*consulResolver)(nil)

// NewConsulResolver create a service resolver using consul.
func NewConsulResolver(address string) (discovery.Resolver, error) {
config := api.DefaultConfig()
config.Address = address
client, err := api.NewClient(config)
if err != nil {
return nil, err
}

return &consulResolver{consulClient: client}, nil
}

// Target return a description for the given target that is suitable for being a key for cache.
func (c *consulResolver) Target(_ context.Context, target rpcinfo.EndpointInfo) (description string) {
return target.ServiceName()
}

// Resolve a service info by desc.
func (c *consulResolver) Resolve(_ context.Context, desc string) (discovery.Result, error) {
var eps []discovery.Instance
agentServiceList, _, err := c.consulClient.Health().Service(desc, "", true, nil)
if err != nil {
log.Printf("err:%v", err)
return discovery.Result{}, err
}
if len(agentServiceList) == 0 {
return discovery.Result{}, errors.New("no service found")
}
for _, i := range agentServiceList {
svc := i.Service
if svc == nil || svc.Address == "" {
continue
}
eps = append(eps, discovery.NewInstance(
defaultNetwork,
fmt.Sprint(svc.Address, ":", svc.Port),
svc.Weights.Passing,
svc.Meta,
))
}

return discovery.Result{
Cacheable: true,
CacheKey: desc,
Instances: eps,
}, nil
}

// Diff computes the difference between two results.
func (c *consulResolver) Diff(cacheKey string, prev, next discovery.Result) (discovery.Change, bool) {
return discovery.DefaultDiff(cacheKey, prev, next)
}

// Name return the name of this resolver.
func (c *consulResolver) Name() string {
return "consul"
}
Loading

0 comments on commit 599a149

Please sign in to comment.