Skip to content

Commit

Permalink
fix client proxy and improve client Send func interface
Browse files Browse the repository at this point in the history
  • Loading branch information
xujinzheng committed Nov 6, 2015
1 parent 6bdf5d6 commit 8ff1b55
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 48 deletions.
43 changes: 9 additions & 34 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/base64"
"encoding/xml"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strings"
Expand Down Expand Up @@ -35,14 +34,14 @@ func init() {
}

const (
_GET Method = "GET"
_PUT = "PUT"
_POST = "POST"
_DELETE = "DELETE"
GET Method = "GET"
PUT = "PUT"
POST = "POST"
DELETE = "DELETE"
)

type MNSClient interface {
Send(method Method, headers map[string]string, message interface{}, resource string, v interface{}) (statusCode int, err error)
Send(method Method, headers map[string]string, message interface{}, resource string) (resp *http.Response, err error)
SetProxy(url string)
}

Expand Down Expand Up @@ -89,12 +88,12 @@ func NewAliMNSClient(url, accessKeyId, accessKeySecret string) MNSClient {
}

func (p *AliMNSClient) SetProxy(url string) {
p.url = url
p.proxyURL = url
}

func (p *AliMNSClient) proxy(req *http.Request) (*url.URL, error) {
if p.url != "" {
return url.Parse(p.url)
if p.proxyURL != "" {
return url.Parse(p.proxyURL)
}
return nil, nil
}
Expand All @@ -109,7 +108,7 @@ func (p *AliMNSClient) authorization(method Method, headers map[string]string, r
return
}

func (p *AliMNSClient) Send(method Method, headers map[string]string, message interface{}, resource string, v interface{}) (statusCode int, err error) {
func (p *AliMNSClient) Send(method Method, headers map[string]string, message interface{}, resource string) (resp *http.Response, err error) {
var xmlContent []byte

if message == nil {
Expand Down Expand Up @@ -159,35 +158,11 @@ func (p *AliMNSClient) Send(method Method, headers map[string]string, message in
req.Header.Add(header, value)
}

var resp *http.Response
if resp, err = p.client.Do(req); err != nil {
err = ERR_SEND_REQUEST_FAILED.New(errors.Params{"err": err})
return
}

if resp != nil {
defer resp.Body.Close()
statusCode = resp.StatusCode
if bBody, e := ioutil.ReadAll(resp.Body); e != nil {
err = ERR_READ_RESPONSE_BODY_FAILED.New(errors.Params{"err": e})
return
} else if resp.StatusCode != http.StatusCreated &&
resp.StatusCode != http.StatusOK &&
resp.StatusCode != http.StatusNoContent {
errResp := ErrorMessageResponse{}
if e := xml.Unmarshal(bBody, &errResp); e != nil {
err = ERR_UNMARSHAL_ERROR_RESPONSE_FAILED.New(errors.Params{"err": e})
return
}
err = to_error(errResp, resource)
return
} else if v != nil {
if e := xml.Unmarshal(bBody, v); e != nil {
err = ERR_UNMARSHAL_RESPONSE_FAILED.New(errors.Params{"err": e})
return
}
}
}
return
}

Expand Down
20 changes: 20 additions & 0 deletions decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package ali_mns

import (
"encoding/xml"
"io"
)

type MNSDecoder interface {
Decode(reader io.Reader, v interface{}) (err error)
}

type AliMNSDecoder struct {
}

func (p *AliMNSDecoder) Decode(reader io.Reader, v interface{}) (err error) {
decoder := xml.NewDecoder(reader)
err = decoder.Decode(&v)

return
}
20 changes: 11 additions & 9 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type MNSQueue struct {
stopChan chan bool
qpsLimit int32
qpsMonitor *QPSMonitor
decoder MNSDecoder
}

func NewMNSQueue(name string, client MNSClient, qps ...int32) AliMNSQueue {
Expand All @@ -49,6 +50,7 @@ func NewMNSQueue(name string, client MNSClient, qps ...int32) AliMNSQueue {
queue.name = name
queue.stopChan = make(chan bool)
queue.qpsLimit = DefaultQPSLimit
queue.decoder = new(AliMNSDecoder)

if qps != nil && len(qps) == 1 && qps[0] > 0 {
queue.qpsLimit = qps[0]
Expand Down Expand Up @@ -77,7 +79,7 @@ func (p *MNSQueue) Name() string {

func (p *MNSQueue) SendMessage(message MessageSendRequest) (resp MessageSendResponse, err error) {
p.checkQPS()
_, err = p.client.Send(_POST, nil, message, fmt.Sprintf("queues/%s/%s", p.name, "messages"), &resp)
_, err = send(p.client, p.decoder, POST, nil, message, fmt.Sprintf("queues/%s/%s", p.name, "messages"), &resp)
return
}

Expand All @@ -92,7 +94,7 @@ func (p *MNSQueue) BatchSendMessage(messages ...MessageSendRequest) (resp BatchM
}

p.checkQPS()
_, err = p.client.Send(_POST, nil, batchRequest, fmt.Sprintf("queues/%s/%s", p.name, "messages"), &resp)
_, err = send(p.client, p.decoder, POST, nil, batchRequest, fmt.Sprintf("queues/%s/%s", p.name, "messages"), &resp)
return
}

Expand All @@ -108,7 +110,7 @@ func (p *MNSQueue) ReceiveMessage(respChan chan MessageReceiveResponse, errChan

for {
resp := MessageReceiveResponse{}
_, err := p.client.Send(_GET, nil, nil, resource, &resp)
_, err := send(p.client, p.decoder, GET, nil, nil, resource, &resp)
if err != nil {
errChan <- err
} else {
Expand Down Expand Up @@ -141,7 +143,7 @@ func (p *MNSQueue) BatchReceiveMessage(respChan chan BatchMessageReceiveResponse

for {
resp := BatchMessageReceiveResponse{}
_, err := p.client.Send(_GET, nil, nil, resource, &resp)
_, err := send(p.client, p.decoder, GET, nil, nil, resource, &resp)
if err != nil {
errChan <- err
} else {
Expand All @@ -165,7 +167,7 @@ func (p *MNSQueue) BatchReceiveMessage(respChan chan BatchMessageReceiveResponse
func (p *MNSQueue) PeekMessage(respChan chan MessageReceiveResponse, errChan chan error) {
for {
resp := MessageReceiveResponse{}
_, err := p.client.Send(_GET, nil, nil, fmt.Sprintf("queues/%s/%s?peekonly=true", p.name, "messages"), &resp)
_, err := send(p.client, p.decoder, GET, nil, nil, fmt.Sprintf("queues/%s/%s?peekonly=true", p.name, "messages"), &resp)
if err != nil {
errChan <- err
} else {
Expand All @@ -184,7 +186,7 @@ func (p *MNSQueue) BatchPeekMessage(respChan chan BatchMessageReceiveResponse, e

for {
resp := BatchMessageReceiveResponse{}
_, err := p.client.Send(_GET, nil, nil, fmt.Sprintf("queues/%s/%s?numOfMessages=%d&peekonly=true", p.name, "messages", numOfMessages), &resp)
_, err := send(p.client, p.decoder, GET, nil, nil, fmt.Sprintf("queues/%s/%s?numOfMessages=%d&peekonly=true", p.name, "messages", numOfMessages), &resp)
if err != nil {
errChan <- err
} else {
Expand All @@ -198,7 +200,7 @@ func (p *MNSQueue) BatchPeekMessage(respChan chan BatchMessageReceiveResponse, e

func (p *MNSQueue) DeleteMessage(receiptHandle string) (err error) {
p.checkQPS()
_, err = p.client.Send(_DELETE, nil, nil, fmt.Sprintf("queues/%s/%s?ReceiptHandle=%s", p.name, "messages", receiptHandle), nil)
_, err = send(p.client, p.decoder, DELETE, nil, nil, fmt.Sprintf("queues/%s/%s?ReceiptHandle=%s", p.name, "messages", receiptHandle), nil)
return
}

Expand All @@ -214,13 +216,13 @@ func (p *MNSQueue) BatchDeleteMessage(receiptHandles ...string) (err error) {
}

p.checkQPS()
_, err = p.client.Send(_DELETE, nil, handlers, fmt.Sprintf("queues/%s/%s", p.name, "messages"), nil)
_, err = send(p.client, p.decoder, DELETE, nil, handlers, fmt.Sprintf("queues/%s/%s", p.name, "messages"), nil)
return
}

func (p *MNSQueue) ChangeMessageVisibility(receiptHandle string, visibilityTimeout int64) (resp MessageVisibilityChangeResponse, err error) {
p.checkQPS()
_, err = p.client.Send(_PUT, nil, nil, fmt.Sprintf("queues/%s/%s?ReceiptHandle=%s&VisibilityTimeout=%d", p.name, "messages", receiptHandle, visibilityTimeout), &resp)
_, err = send(p.client, p.decoder, PUT, nil, nil, fmt.Sprintf("queues/%s/%s?ReceiptHandle=%s&VisibilityTimeout=%d", p.name, "messages", receiptHandle, visibilityTimeout), &resp)
return
}

Expand Down
13 changes: 8 additions & 5 deletions queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type MNSQueueManager struct {
credential Credential
accessKeyId string
accessKeySecret string

decoder MNSDecoder
}

func checkQueueName(queueName string) (err error) {
Expand Down Expand Up @@ -86,6 +88,7 @@ func NewMNSQueueManager(ownerId, accessKeyId, accessKeySecret string) AliQueueMa
ownerId: ownerId,
accessKeyId: accessKeyId,
accessKeySecret: accessKeySecret,
decoder: new(AliMNSDecoder),
}
}

Expand Down Expand Up @@ -136,7 +139,7 @@ func (p *MNSQueueManager) CreateQueue(location MNSLocation, queueName string, de
cli := NewAliMNSClient(url, p.accessKeyId, p.accessKeySecret)

var code int
code, err = cli.Send(_PUT, nil, &message, "queues/"+queueName, nil)
code, err = send(cli, p.decoder, PUT, nil, &message, "queues/"+queueName, nil)

if code == http.StatusNoContent {
err = ERR_MNS_QUEUE_ALREADY_EXIST_AND_HAVE_SAME_ATTR.New(errors.Params{"name": queueName})
Expand Down Expand Up @@ -173,7 +176,7 @@ func (p *MNSQueueManager) SetQueueAttributes(location MNSLocation, queueName str

cli := NewAliMNSClient(url, p.accessKeyId, p.accessKeySecret)

_, err = cli.Send(_PUT, nil, &message, fmt.Sprintf("queues/%s?metaoverride=true", queueName), nil)
_, err = send(cli, p.decoder, PUT, nil, &message, fmt.Sprintf("queues/%s?metaoverride=true", queueName), nil)
return
}

Expand All @@ -188,7 +191,7 @@ func (p *MNSQueueManager) GetQueueAttributes(location MNSLocation, queueName str

cli := NewAliMNSClient(url, p.accessKeyId, p.accessKeySecret)

_, err = cli.Send(_GET, nil, nil, "queues/"+queueName, &attr)
_, err = send(cli, p.decoder, GET, nil, nil, "queues/"+queueName, &attr)

return
}
Expand All @@ -204,7 +207,7 @@ func (p *MNSQueueManager) DeleteQueue(location MNSLocation, queueName string) (e

cli := NewAliMNSClient(url, p.accessKeyId, p.accessKeySecret)

_, err = cli.Send(_DELETE, nil, nil, "queues/"+queueName, nil)
_, err = send(cli, p.decoder, DELETE, nil, nil, "queues/"+queueName, nil)

return
}
Expand Down Expand Up @@ -239,7 +242,7 @@ func (p *MNSQueueManager) ListQueue(location MNSLocation, nextMarker Base64Bytes
header["x-mns-prefix"] = prefix
}

_, err = cli.Send(_GET, header, nil, "queues", &queues)
_, err = send(cli, p.decoder, GET, header, nil, "queues", &queues)

return
}
41 changes: 41 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package ali_mns

import (
"net/http"

"github.com/gogap/errors"
)

func send(client MNSClient, decoder MNSDecoder, method Method, headers map[string]string, message interface{}, resource string, v interface{}) (statusCode int, err error) {
var resp *http.Response
if resp, err = client.Send(method, headers, message, resource); err != nil {
return
}

if resp != nil {
defer resp.Body.Close()
statusCode = resp.StatusCode

if resp.StatusCode != http.StatusCreated &&
resp.StatusCode != http.StatusOK &&
resp.StatusCode != http.StatusNoContent {

errResp := ErrorMessageResponse{}
if e := decoder.Decode(resp.Body, &errResp); e != nil {
err = ERR_UNMARSHAL_ERROR_RESPONSE_FAILED.New(errors.Params{"err": e})
return
}
err = to_error(errResp, resource)
return
}

if v != nil {
if e := decoder.Decode(resp.Body, v); e != nil {
err = ERR_UNMARSHAL_RESPONSE_FAILED.New(errors.Params{"err": e})
return
}
}
}

return
}

0 comments on commit 8ff1b55

Please sign in to comment.