diff --git a/client.go b/client.go index 34b2001..edb1edd 100644 --- a/client.go +++ b/client.go @@ -5,7 +5,6 @@ import ( "encoding/base64" "encoding/xml" "fmt" - "io/ioutil" "net/http" "net/url" "strings" @@ -35,14 +34,14 @@ func init() { } const ( - _GET Method = "GET" - _PUT = "PUT" - _POST = "POST" - _DELETE = "DELETE" + GET Method = "GET" + PUT = "PUT" + POST = "POST" + DELETE = "DELETE" ) type MNSClient interface { - Send(method Method, headers map[string]string, message interface{}, resource string, v interface{}) (statusCode int, err error) + Send(method Method, headers map[string]string, message interface{}, resource string) (resp *http.Response, err error) SetProxy(url string) } @@ -89,12 +88,12 @@ func NewAliMNSClient(url, accessKeyId, accessKeySecret string) MNSClient { } func (p *AliMNSClient) SetProxy(url string) { - p.url = url + p.proxyURL = url } func (p *AliMNSClient) proxy(req *http.Request) (*url.URL, error) { - if p.url != "" { - return url.Parse(p.url) + if p.proxyURL != "" { + return url.Parse(p.proxyURL) } return nil, nil } @@ -109,7 +108,7 @@ func (p *AliMNSClient) authorization(method Method, headers map[string]string, r return } -func (p *AliMNSClient) Send(method Method, headers map[string]string, message interface{}, resource string, v interface{}) (statusCode int, err error) { +func (p *AliMNSClient) Send(method Method, headers map[string]string, message interface{}, resource string) (resp *http.Response, err error) { var xmlContent []byte if message == nil { @@ -159,35 +158,11 @@ func (p *AliMNSClient) Send(method Method, headers map[string]string, message in req.Header.Add(header, value) } - var resp *http.Response if resp, err = p.client.Do(req); err != nil { err = ERR_SEND_REQUEST_FAILED.New(errors.Params{"err": err}) return } - if resp != nil { - defer resp.Body.Close() - statusCode = resp.StatusCode - if bBody, e := ioutil.ReadAll(resp.Body); e != nil { - err = ERR_READ_RESPONSE_BODY_FAILED.New(errors.Params{"err": e}) - return - } else if resp.StatusCode != http.StatusCreated && - resp.StatusCode != http.StatusOK && - resp.StatusCode != http.StatusNoContent { - errResp := ErrorMessageResponse{} - if e := xml.Unmarshal(bBody, &errResp); e != nil { - err = ERR_UNMARSHAL_ERROR_RESPONSE_FAILED.New(errors.Params{"err": e}) - return - } - err = to_error(errResp, resource) - return - } else if v != nil { - if e := xml.Unmarshal(bBody, v); e != nil { - err = ERR_UNMARSHAL_RESPONSE_FAILED.New(errors.Params{"err": e}) - return - } - } - } return } diff --git a/decoder.go b/decoder.go new file mode 100644 index 0000000..5a00311 --- /dev/null +++ b/decoder.go @@ -0,0 +1,20 @@ +package ali_mns + +import ( + "encoding/xml" + "io" +) + +type MNSDecoder interface { + Decode(reader io.Reader, v interface{}) (err error) +} + +type AliMNSDecoder struct { +} + +func (p *AliMNSDecoder) Decode(reader io.Reader, v interface{}) (err error) { + decoder := xml.NewDecoder(reader) + err = decoder.Decode(&v) + + return +} diff --git a/queue.go b/queue.go index 36de685..290014b 100644 --- a/queue.go +++ b/queue.go @@ -37,6 +37,7 @@ type MNSQueue struct { stopChan chan bool qpsLimit int32 qpsMonitor *QPSMonitor + decoder MNSDecoder } func NewMNSQueue(name string, client MNSClient, qps ...int32) AliMNSQueue { @@ -49,6 +50,7 @@ func NewMNSQueue(name string, client MNSClient, qps ...int32) AliMNSQueue { queue.name = name queue.stopChan = make(chan bool) queue.qpsLimit = DefaultQPSLimit + queue.decoder = new(AliMNSDecoder) if qps != nil && len(qps) == 1 && qps[0] > 0 { queue.qpsLimit = qps[0] @@ -77,7 +79,7 @@ func (p *MNSQueue) Name() string { func (p *MNSQueue) SendMessage(message MessageSendRequest) (resp MessageSendResponse, err error) { p.checkQPS() - _, err = p.client.Send(_POST, nil, message, fmt.Sprintf("queues/%s/%s", p.name, "messages"), &resp) + _, err = send(p.client, p.decoder, POST, nil, message, fmt.Sprintf("queues/%s/%s", p.name, "messages"), &resp) return } @@ -92,7 +94,7 @@ func (p *MNSQueue) BatchSendMessage(messages ...MessageSendRequest) (resp BatchM } p.checkQPS() - _, err = p.client.Send(_POST, nil, batchRequest, fmt.Sprintf("queues/%s/%s", p.name, "messages"), &resp) + _, err = send(p.client, p.decoder, POST, nil, batchRequest, fmt.Sprintf("queues/%s/%s", p.name, "messages"), &resp) return } @@ -108,7 +110,7 @@ func (p *MNSQueue) ReceiveMessage(respChan chan MessageReceiveResponse, errChan for { resp := MessageReceiveResponse{} - _, err := p.client.Send(_GET, nil, nil, resource, &resp) + _, err := send(p.client, p.decoder, GET, nil, nil, resource, &resp) if err != nil { errChan <- err } else { @@ -141,7 +143,7 @@ func (p *MNSQueue) BatchReceiveMessage(respChan chan BatchMessageReceiveResponse for { resp := BatchMessageReceiveResponse{} - _, err := p.client.Send(_GET, nil, nil, resource, &resp) + _, err := send(p.client, p.decoder, GET, nil, nil, resource, &resp) if err != nil { errChan <- err } else { @@ -165,7 +167,7 @@ func (p *MNSQueue) BatchReceiveMessage(respChan chan BatchMessageReceiveResponse func (p *MNSQueue) PeekMessage(respChan chan MessageReceiveResponse, errChan chan error) { for { resp := MessageReceiveResponse{} - _, err := p.client.Send(_GET, nil, nil, fmt.Sprintf("queues/%s/%s?peekonly=true", p.name, "messages"), &resp) + _, err := send(p.client, p.decoder, GET, nil, nil, fmt.Sprintf("queues/%s/%s?peekonly=true", p.name, "messages"), &resp) if err != nil { errChan <- err } else { @@ -184,7 +186,7 @@ func (p *MNSQueue) BatchPeekMessage(respChan chan BatchMessageReceiveResponse, e for { resp := BatchMessageReceiveResponse{} - _, err := p.client.Send(_GET, nil, nil, fmt.Sprintf("queues/%s/%s?numOfMessages=%d&peekonly=true", p.name, "messages", numOfMessages), &resp) + _, err := send(p.client, p.decoder, GET, nil, nil, fmt.Sprintf("queues/%s/%s?numOfMessages=%d&peekonly=true", p.name, "messages", numOfMessages), &resp) if err != nil { errChan <- err } else { @@ -198,7 +200,7 @@ func (p *MNSQueue) BatchPeekMessage(respChan chan BatchMessageReceiveResponse, e func (p *MNSQueue) DeleteMessage(receiptHandle string) (err error) { p.checkQPS() - _, err = p.client.Send(_DELETE, nil, nil, fmt.Sprintf("queues/%s/%s?ReceiptHandle=%s", p.name, "messages", receiptHandle), nil) + _, err = send(p.client, p.decoder, DELETE, nil, nil, fmt.Sprintf("queues/%s/%s?ReceiptHandle=%s", p.name, "messages", receiptHandle), nil) return } @@ -214,13 +216,13 @@ func (p *MNSQueue) BatchDeleteMessage(receiptHandles ...string) (err error) { } p.checkQPS() - _, err = p.client.Send(_DELETE, nil, handlers, fmt.Sprintf("queues/%s/%s", p.name, "messages"), nil) + _, err = send(p.client, p.decoder, DELETE, nil, handlers, fmt.Sprintf("queues/%s/%s", p.name, "messages"), nil) return } func (p *MNSQueue) ChangeMessageVisibility(receiptHandle string, visibilityTimeout int64) (resp MessageVisibilityChangeResponse, err error) { p.checkQPS() - _, err = p.client.Send(_PUT, nil, nil, fmt.Sprintf("queues/%s/%s?ReceiptHandle=%s&VisibilityTimeout=%d", p.name, "messages", receiptHandle, visibilityTimeout), &resp) + _, err = send(p.client, p.decoder, PUT, nil, nil, fmt.Sprintf("queues/%s/%s?ReceiptHandle=%s&VisibilityTimeout=%d", p.name, "messages", receiptHandle, visibilityTimeout), &resp) return } diff --git a/queue_manager.go b/queue_manager.go index f982ab0..c427dc1 100644 --- a/queue_manager.go +++ b/queue_manager.go @@ -31,6 +31,8 @@ type MNSQueueManager struct { credential Credential accessKeyId string accessKeySecret string + + decoder MNSDecoder } func checkQueueName(queueName string) (err error) { @@ -86,6 +88,7 @@ func NewMNSQueueManager(ownerId, accessKeyId, accessKeySecret string) AliQueueMa ownerId: ownerId, accessKeyId: accessKeyId, accessKeySecret: accessKeySecret, + decoder: new(AliMNSDecoder), } } @@ -136,7 +139,7 @@ func (p *MNSQueueManager) CreateQueue(location MNSLocation, queueName string, de cli := NewAliMNSClient(url, p.accessKeyId, p.accessKeySecret) var code int - code, err = cli.Send(_PUT, nil, &message, "queues/"+queueName, nil) + code, err = send(cli, p.decoder, PUT, nil, &message, "queues/"+queueName, nil) if code == http.StatusNoContent { err = ERR_MNS_QUEUE_ALREADY_EXIST_AND_HAVE_SAME_ATTR.New(errors.Params{"name": queueName}) @@ -173,7 +176,7 @@ func (p *MNSQueueManager) SetQueueAttributes(location MNSLocation, queueName str cli := NewAliMNSClient(url, p.accessKeyId, p.accessKeySecret) - _, err = cli.Send(_PUT, nil, &message, fmt.Sprintf("queues/%s?metaoverride=true", queueName), nil) + _, err = send(cli, p.decoder, PUT, nil, &message, fmt.Sprintf("queues/%s?metaoverride=true", queueName), nil) return } @@ -188,7 +191,7 @@ func (p *MNSQueueManager) GetQueueAttributes(location MNSLocation, queueName str cli := NewAliMNSClient(url, p.accessKeyId, p.accessKeySecret) - _, err = cli.Send(_GET, nil, nil, "queues/"+queueName, &attr) + _, err = send(cli, p.decoder, GET, nil, nil, "queues/"+queueName, &attr) return } @@ -204,7 +207,7 @@ func (p *MNSQueueManager) DeleteQueue(location MNSLocation, queueName string) (e cli := NewAliMNSClient(url, p.accessKeyId, p.accessKeySecret) - _, err = cli.Send(_DELETE, nil, nil, "queues/"+queueName, nil) + _, err = send(cli, p.decoder, DELETE, nil, nil, "queues/"+queueName, nil) return } @@ -239,7 +242,7 @@ func (p *MNSQueueManager) ListQueue(location MNSLocation, nextMarker Base64Bytes header["x-mns-prefix"] = prefix } - _, err = cli.Send(_GET, header, nil, "queues", &queues) + _, err = send(cli, p.decoder, GET, header, nil, "queues", &queues) return } diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..44cf047 --- /dev/null +++ b/utils.go @@ -0,0 +1,41 @@ +package ali_mns + +import ( + "net/http" + + "github.com/gogap/errors" +) + +func send(client MNSClient, decoder MNSDecoder, method Method, headers map[string]string, message interface{}, resource string, v interface{}) (statusCode int, err error) { + var resp *http.Response + if resp, err = client.Send(method, headers, message, resource); err != nil { + return + } + + if resp != nil { + defer resp.Body.Close() + statusCode = resp.StatusCode + + if resp.StatusCode != http.StatusCreated && + resp.StatusCode != http.StatusOK && + resp.StatusCode != http.StatusNoContent { + + errResp := ErrorMessageResponse{} + if e := decoder.Decode(resp.Body, &errResp); e != nil { + err = ERR_UNMARSHAL_ERROR_RESPONSE_FAILED.New(errors.Params{"err": e}) + return + } + err = to_error(errResp, resource) + return + } + + if v != nil { + if e := decoder.Decode(resp.Body, v); e != nil { + err = ERR_UNMARSHAL_RESPONSE_FAILED.New(errors.Params{"err": e}) + return + } + } + } + + return +}