Skip to content

Commit

Permalink
api: support queue 1.2.0
Browse files Browse the repository at this point in the history
* bump queue package version to 1.2.0 [1]
* add Task.Touch(): increases TTR and/or TTL for tasks [2]
* add Queue.Cfg(): set queue settings [3]
* add Queue.ReleaseAll(): releases all taken tasks [4]
* add Queue.State(): returns a current queue state [5]
* add Queue.Identify(): identifies a shared session [6]

1. https://github.com/tarantool/queue/releases/tag/1.2.0
2. https://github.com/tarantool/queue/blob/1.2.0/README.md?plain=1#L562-L576
3. https://github.com/tarantool/queue/blob/1.2.0/README.md?plain=1#L450-L463
4. https://github.com/tarantool/queue/blob/1.2.0/README.md?plain=1#L698-L704
5. https://github.com/tarantool/queue/blob/1.2.0/README.md?plain=1#L377-L391
6. https://github.com/tarantool/queue/blob/1.2.0/README.md?plain=1#L465-L494

Closes #110
Closes #177
  • Loading branch information
oleg-jukovec committed Aug 24, 2022
1 parent 8f16c97 commit bda4442
Show file tree
Hide file tree
Showing 7 changed files with 423 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.

### Added

- Support queue 1.2.0 (#177)

### Changed

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ clean:

.PHONY: deps
deps: clean
( cd ./queue; tarantoolctl rocks install queue 1.1.0 )
( cd ./queue; tarantoolctl rocks install queue 1.2.0 )

.PHONY: datetime-timezones
datetime-timezones:
Expand Down
5 changes: 5 additions & 0 deletions queue/config.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ box.cfg{

box.once("init", function()
box.schema.user.create('test', {password = 'test'})
box.schema.func.create('queue.tube.test_queue:touch')
box.schema.func.create('queue.tube.test_queue:ack')
box.schema.func.create('queue.tube.test_queue:put')
box.schema.func.create('queue.tube.test_queue:drop')
Expand All @@ -17,7 +18,10 @@ box.cfg{
box.schema.func.create('queue.tube.test_queue:take')
box.schema.func.create('queue.tube.test_queue:delete')
box.schema.func.create('queue.tube.test_queue:release')
box.schema.func.create('queue.tube.test_queue:release_all')
box.schema.func.create('queue.tube.test_queue:bury')
box.schema.func.create('queue.identify')
box.schema.func.create('queue.state')
box.schema.func.create('queue.statistics')
box.schema.user.grant('test', 'create', 'space')
box.schema.user.grant('test', 'write', 'space', '_schema')
Expand All @@ -33,6 +37,7 @@ box.cfg{
box.schema.user.grant('test', 'read,write', 'space', '_queue_consumers')
box.schema.user.grant('test', 'read,write', 'space', '_priv')
box.schema.user.grant('test', 'read,write', 'space', '_queue_taken_2')
box.schema.user.grant('test', 'read,write', 'space', '_queue_shared_sessions')
if box.space._trigger ~= nil then
box.schema.user.grant('test', 'read', 'space', '_trigger')
end
Expand Down
19 changes: 19 additions & 0 deletions queue/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,22 @@ const (
UTUBE queueType = "utube"
UTUBE_TTL queueType = "utubettl"
)

type State int

const (
UnknownState State = iota
InitState
StartupState
RunningState
EndingState
WaitingState
)

var strToState = map[string]State{
"INIT": InitState,
"STARTUP": StartupState,
"RUNNING": RunningState,
"ENDING": EndingState,
"WAITING": WaitingState,
}
105 changes: 105 additions & 0 deletions queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,23 @@ import (
"fmt"
"time"

"github.com/google/uuid"
"github.com/tarantool/go-tarantool"
)

// Queue is a handle to Tarantool queue's tube.
type Queue interface {
// Set queue settings.
Cfg(opts CfgOpts) error
// Exists checks tube for existence.
// Note: it uses Eval, so user needs 'execute universe' privilege.
Exists() (bool, error)
// Identify to a shared session.
// In the queue the session has a unique UUID and many connections may
// share one logical session. Also, the consumer can reconnect to the
// existing session during the ttr time.
// To get the UUID of the current session, call the Queue.Identify(nil).
Identify(u *uuid.UUID) (uuid.UUID, error)
// Create creates new tube with configuration.
// Note: it uses Eval, so user needs 'execute universe' privilege
// Note: you'd better not use this function in your application, cause it is
Expand All @@ -29,6 +38,8 @@ type Queue interface {
// Note: you'd better not use this function in your application, cause it is
// administrative task to create or delete queue.
Drop() error
// ReleaseAll forcibly returns all taken tasks to a ready state.
ReleaseAll() error
// Put creates new task in a tube.
Put(data interface{}) (*Task, error)
// PutWithOpts creates new task with options different from tube's defaults.
Expand Down Expand Up @@ -64,6 +75,8 @@ type Queue interface {
Kick(count uint64) (uint64, error)
// Delete the task identified by its id.
Delete(taskId uint64) error
// State returns a current queue state.
State() (State, error)
// Statistic returns some statistic about queue.
Statistic() (interface{}, error)
}
Expand All @@ -79,11 +92,16 @@ type cmd struct {
take string
drop string
peek string
touch string
ack string
delete string
bury string
kick string
release string
releaseAll string
cfg string
identify string
state string
statistics string
}

Expand All @@ -110,6 +128,26 @@ func (cfg Cfg) getType() string {
return kind
}

// CfgOpts is argument type for the Queue.Cfg() call.
type CfgOpts struct {
// Enable replication mode. Must be true if the queue is used in master and
// replica mode. With replication mode enabled, the potential loss of
// performance can be ~20% compared to single mode. Default value is false.
InReplicaset bool
// Time to release in seconds. The time after which, if there is no active
// connection in the session, it will be released with all its tasks.
Ttr time.Duration
}

func (opts CfgOpts) toMap() map[string]interface{} {
ret := make(map[string]interface{})
ret["in_replicaset"] = opts.InReplicaset
if opts.Ttr != 0 {
ret["ttr"] = opts.Ttr
}
return ret
}

type Opts struct {
Pri int // Task priorities.
Ttl time.Duration // Task time to live.
Expand Down Expand Up @@ -161,6 +199,12 @@ func (q *queue) Create(cfg Cfg) error {
return err
}

// Set queue settings.
func (q *queue) Cfg(opts CfgOpts) error {
_, err := q.conn.Call17(q.cmds.cfg, []interface{}{opts.toMap()})
return err
}

// Exists checks existance of a tube.
func (q *queue) Exists() (bool, error) {
cmd := "local name = ... ; return queue.tube[name] ~= nil"
Expand All @@ -173,6 +217,36 @@ func (q *queue) Exists() (bool, error) {
return exist, nil
}

// Identify to a shared session.
// In the queue the session has a unique UUID and many connections may share
// one logical session. Also, the consumer can reconnect to the existing
// session during the ttr time.
// To get the UUID of the current session, call the Queue.Identify(nil).
func (q *queue) Identify(u *uuid.UUID) (uuid.UUID, error) {
// Unfortunately we can't use go-tarantool/uuid here:
// https://github.com/tarantool/queue/issues/182
var args []interface{}
if u == nil {
args = []interface{}{}
} else {
if bytes, err := u.MarshalBinary(); err != nil {
return uuid.UUID{}, err
} else {
args = []interface{}{bytes}
}
}

if resp, err := q.conn.Call17(q.cmds.identify, args); err == nil {
if us, ok := resp.Data[0].(string); ok {
return uuid.FromBytes([]byte(us))
} else {
return uuid.UUID{}, fmt.Errorf("unexpected response: %v", resp.Data)
}
} else {
return uuid.UUID{}, err
}
}

// Put data to queue. Returns task.
func (q *queue) Put(data interface{}) (*Task, error) {
return q.put(data)
Expand Down Expand Up @@ -251,6 +325,12 @@ func (q *queue) Drop() error {
return err
}

// ReleaseAll forcibly returns all taken tasks to a ready state.
func (q *queue) ReleaseAll() error {
_, err := q.conn.Call17(q.cmds.releaseAll, []interface{}{})
return err
}

// Look at a task without changing its state.
func (q *queue) Peek(taskId uint64) (*Task, error) {
qd := queueData{q: q}
Expand All @@ -260,6 +340,10 @@ func (q *queue) Peek(taskId uint64) (*Task, error) {
return qd.task, nil
}

func (q *queue) _touch(taskId uint64, increment time.Duration) (string, error) {
return q.produce(q.cmds.touch, taskId, increment.Seconds())
}

func (q *queue) _ack(taskId uint64) (string, error) {
return q.produce(q.cmds.ack, taskId)
}
Expand Down Expand Up @@ -312,6 +396,22 @@ func (q *queue) Delete(taskId uint64) error {
return err
}

// State returns a current queue state.
func (q *queue) State() (State, error) {
resp, err := q.conn.Call17(q.cmds.state, []interface{}{})
if err != nil {
return UnknownState, err
}

if respState, ok := resp.Data[0].(string); ok {
if state, ok := strToState[respState]; ok {
return state, nil
}
return UnknownState, fmt.Errorf("unknown state: %v", resp.Data[0])
}
return UnknownState, fmt.Errorf("unexpected response: %v", resp.Data)
}

// Return the number of tasks in a queue broken down by task_state, and the
// number of requests broken down by the type of request.
func (q *queue) Statistic() (interface{}, error) {
Expand All @@ -333,11 +433,16 @@ func makeCmd(q *queue) {
take: "queue.tube." + q.name + ":take",
drop: "queue.tube." + q.name + ":drop",
peek: "queue.tube." + q.name + ":peek",
touch: "queue.tube." + q.name + ":touch",
ack: "queue.tube." + q.name + ":ack",
delete: "queue.tube." + q.name + ":delete",
bury: "queue.tube." + q.name + ":bury",
kick: "queue.tube." + q.name + ":kick",
release: "queue.tube." + q.name + ":release",
releaseAll: "queue.tube." + q.name + ":release_all",
cfg: "queue.cfg",
identify: "queue.identify",
state: "queue.state",
statistics: "queue.statistics",
}
}
Expand Down
Loading

0 comments on commit bda4442

Please sign in to comment.