Skip to content

Commit

Permalink
Add Streams support
Browse files Browse the repository at this point in the history
  • Loading branch information
supercaracal committed Dec 10, 2018
1 parent 206f50c commit 3a16a0e
Show file tree
Hide file tree
Showing 6 changed files with 1,122 additions and 186 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ matrix:
env: DRIVER=ruby REDIS_BRANCH=3.2 LOW_TIMEOUT=0.3
- rvm: jruby-9.1.17.0
env: DRIVER=ruby REDIS_BRANCH=4.0 LOW_TIMEOUT=0.3
- rvm: 2.5.3
env: DRIVER=ruby REDIS_BRANCH=5.0

notifications:
irc:
Expand Down
368 changes: 368 additions & 0 deletions lib/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2808,6 +2808,322 @@ def geodist(key, member1, member2, unit = 'm')
end
end

# Returns the stream information each subcommand.
#
# @example stream
# redis.xinfo(:stream, 'mystream')
# @example groups
# redis.xinfo(:groups, 'mystream')
# @example consumers
# redis.xinfo(:consumers, 'mystream', 'mygroup')
#
# @param subcommand [String] e.g. `stream` `groups` `consumers`
# @param key [String] the stream key
# @param group [String] the consumer group name, required if subcommand is `consumers`
#
# @return [Hash] information of the stream if subcommand is `stream`
# @return [Array<Hash>] information of the consumer groups if subcommand is `groups`
# @return [Array<Hash>] information of the consumers if subcommand is `consumers`
def xinfo(subcommand, key, group = nil)
args = [:xinfo, subcommand, key, group].compact
synchronize do |client|
client.call(args) do |reply|
case subcommand.to_s.downcase
when 'stream' then Hashify.call(reply)
when 'groups', 'consumers' then reply.map { |arr| Hashify.call(arr) }
else reply
end
end
end
end

# Add new entry to the stream.
#
# @example Without options
# redis.xadd('mystream', f1: 'v1', f2: 'v2')
# @example With options
# redis.xadd('mystream', { f1: 'v1', f2: 'v2' }, id: '0-0', maxlen: 1000, approximate: true)
#
# @param key [String] the stream key
# @param entry [Hash] one or multiple field-value pairs
# @param opts [Hash] several options for `XADD` command
#
# @option opts [String] :id the entry id, default value is `*`, it means auto generation
# @option opts [Integer] :maxlen max length of entries
# @option opts [Boolean] :approximate whether to add `~` modifier of maxlen or not
#
# @return [String] the entry id
def xadd(key, entry, opts = {})
args = [:xadd, key]
args.concat(['MAXLEN', (opts[:approximate] ? '~' : nil), opts[:maxlen]].compact) if opts[:maxlen]
args << (opts[:id] || '*')
args.concat(entry.to_a.flatten)
synchronize { |client| client.call(args) }
end

# Trims older entries of the stream if needed.
#
# @example Without options
# redis.xtrim('mystream', 1000)
# @example With options
# redis.xtrim('mystream', 1000, approximate: true)
#
# @param key [String] the stream key
# @param mexlen [Integer] max length of entries
# @param approximate [Boolean] whether to add `~` modifier of maxlen or not
#
# @return [Integer] the number of entries actually deleted
def xtrim(key, maxlen, approximate: false)
args = [:xtrim, key, 'MAXLEN', (approximate ? '~' : nil), maxlen].compact
synchronize { |client| client.call(args) }
end

# Delete entries by entry ids.
#
# @example With splatted entry ids
# redis.xdel('mystream', '0-1', '0-2')
# @example With arrayed entry ids
# redis.xdel('mystream', ['0-1', '0-2'])
#
# @param key [String] the stream key
# @param ids [Array<String>] one or multiple entry ids
#
# @return [Integer] the number of entries actually deleted
def xdel(key, *ids)
args = [:xdel, key].concat(ids.flatten)
synchronize { |client| client.call(args) }
end

# Fetches entries of the stream.
#
# @example Without options
# redis.xrange('mystream')
# @example With first entry id option
# redis.xrange('mystream', first: '0-1')
# @example With first and last entry id options
# redis.xrange('mystream', first: '0-1', last: '0-3')
# @example With count options
# redis.xrange('mystream', count: 10)
#
# @param key [String] the stream key
# @param first [String] first entry id of range, default value is `-`
# @param last [String] last entry id of range, default value is `+`
# @param count [Integer] the number of entries as limit
#
# @return [Hash{String => Hash}] the entries
def xrange(key, first: '-', last: '+', count: nil)
args = [:xrange, key, first, last]
args.concat(['COUNT', count]) if count
synchronize { |client| client.call(args, &HashifyStreamEntries) }
end

# Fetches entries of the stream in descending order.
#
# @example Without options
# redis.xrevrange('mystream')
# @example With first entry id option
# redis.xrevrange('mystream', first: '0-1')
# @example With first and last entry id options
# redis.xrevrange('mystream', first: '0-1', last: '0-3')
# @example With count options
# redis.xrevrange('mystream', count: 10)
#
# @param key [String] the stream key
# @param first [String] first entry id of range, default value is `-`
# @param last [String] last entry id of range, default value is `+`
# @param count [Integer] the number of entries as limit
#
# @return [Hash{String => Hash}] the entries
def xrevrange(key, first: '-', last: '+', count: nil)
args = [:xrevrange, key, last, first]
args.concat(['COUNT', count]) if count
synchronize { |client| client.call(args, &HashifyStreamEntries) }
end

# Returns the number of entries inside a stream.
#
# @example With key
# redis.xlen('mystream')
#
# @param key [String] the stream key
#
# @return [Integer] the number of entries
def xlen(key)
synchronize { |client| client.call([:xlen, key]) }
end

# Fetches entries from one or multiple streams. Optionally blocking.
#
# @example With a key
# redis.xread('mystream', '0-0')
# @example With multiple keys
# redis.xread(%w[mystream1 mystream2], %w[0-0 0-0])
# @example With count option
# redis.xread('mystream', '0-0', count: 2)
# @example With block option
# redis.xread('mystream', '$', block: 1000)
#
# @param keys [Array<String>] one or multiple stream keys
# @param ids [Array<String>] one or multiple entry ids
# @param count [Integer] the number of entries as limit per stream
# @param block [Integer] the number of milliseconds as blocking timeout
#
# @return [Hash{String => Hash{String => Hash}}] the entries
def xread(keys, ids, count: nil, block: nil)
args = [:xread]
args.concat(['COUNT', count]) if count
args.concat(['BLOCK', block.to_i]) if block
_xread(args, keys, ids, block)
end

# Manages the consumer group of the stream.
#
# @example With `create` subcommand
# redis.xgroup(:create, 'mystream', 'mygroup', '$')
# @example With `setid` subcommand
# redis.xgroup(:setid, 'mystream', 'mygroup', '$')
# @example With `destroy` subcommand
# redis.xgroup(:destroy, 'mystream', 'mygroup')
# @example With `delconsumer` subcommand
# redis.xgroup(:delconsumer, 'mystream', 'mygroup', 'consumer1')
#
# @param subcommand [String] `create` `setid` `destroy` `delconsumer`
# @param key [String] the stream key
# @param group [String] the consumer group name
# @param id_or_consumer [String]
# * the entry id or `$`, required if subcommand is `create` or `setid`
# * the consumer name, required if subcommand is `delconsumer`
# @param mkstream [Boolean] whether to create an empty stream automatically or not
#
# @return [String] `OK` if subcommand is `create` or `setid`
# @return [Integer] effected count if subcommand is `destroy` or `delconsumer`
def xgroup(subcommand, key, group, id_or_consumer = nil, mkstream: false)
args = [:xgroup, subcommand, key, group, id_or_consumer, (mkstream ? 'MKSTREAM' : nil)].compact
synchronize { |client| client.call(args) }
end

# Fetches a subset of the entries from one or multiple streams related with the consumer group.
# Optionally blocking.
#
# @example With a key
# redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>')
# @example With multiple keys
# redis.xreadgroup('mygroup', 'consumer1', %w[mystream1 mystream2], %w[> >])
# @example With count option
# redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>', count: 2)
# @example With block option
# redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>', block: 1000)
# @example With noack option
# redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>', noack: true)
#
# @param group [String] the consumer group name
# @param consumer [String] the consumer name
# @param keys [Array<String>] one or multiple stream keys
# @param ids [Array<String>] one or multiple entry ids
# @param opts [Hash] several options for `XREADGROUP` command
#
# @option opts [Integer] :count the number of entries as limit
# @option opts [Integer] :block the number of milliseconds as blocking timeout
# @option opts [Boolean] :noack whether message loss is acceptable or not
#
# @return [Hash{String => Hash{String => Hash}}] the entries
def xreadgroup(group, consumer, keys, ids, opts = {})
args = [:xreadgroup, 'GROUP', group, consumer]
args.concat(['COUNT', opts[:count]]) if opts[:count]
args.concat(['BLOCK', opts[:block].to_i]) if opts[:block]
args << 'NOACK' if opts[:noack]
_xread(args, keys, ids, opts[:block])
end

# Removes one or multiple entries from the pending entries list of a stream consumer group.
#
# @example With a entry id
# redis.xack('mystream', 'mygroup', '1526569495631-0')
# @example With splatted entry ids
# redis.xack('mystream', 'mygroup', '0-1', '0-2')
# @example With arrayed entry ids
# redis.xack('mystream', 'mygroup', %w[0-1 0-2])
#
# @param key [String] the stream key
# @param group [String] the consumer group name
# @param ids [Array<String>] one or multiple entry ids
#
# @return [Integer] the number of entries successfully acknowledged
def xack(key, group, *ids)
args = [:xack, key, group].concat(ids.flatten)
synchronize { |client| client.call(args) }
end

# Changes the ownership of a pending entry
#
# @example With splatted entry ids
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-1', '0-2')
# @example With arrayed entry ids
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2])
# @example With idle option
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], idle: 1000)
# @example With time option
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], time: 1542866959000)
# @example With retrycount option
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], retrycount: 10)
# @example With force option
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], force: true)
# @example With justid option
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], justid: true)
#
# @param key [String] the stream key
# @param group [String] the consumer group name
# @param consumer [String] the consumer name
# @param min_idle_time [Integer] the number of milliseconds
# @param ids [Array<String>] one or multiple entry ids
# @param opts [Hash] several options for `XCLAIM` command
#
# @option opts [Integer] :idle the number of milliseconds as last time it was delivered of the entry
# @option opts [Integer] :time the number of milliseconds as a specific Unix Epoch time
# @option opts [Integer] :retrycount the number of retry counter
# @option opts [Boolean] :force whether to create the pending entry to the pending entries list or not
# @option opts [Boolean] :justid whether to fetch just an array of entry ids or not
#
# @return [Hash{String => Hash}] the entries successfully claimed
# @return [Array<String>] the entry ids successfully claimed if justid option is `true`
def xclaim(key, group, consumer, min_idle_time, *ids, **opts)
args = [:xclaim, key, group, consumer, min_idle_time].concat(ids.flatten)
args.concat(['IDLE', opts[:idle].to_i]) if opts[:idle]
args.concat(['TIME', opts[:time].to_i]) if opts[:time]
args.concat(['RETRYCOUNT', opts[:retrycount]]) if opts[:retrycount]
args << 'FORCE' if opts[:force]
args << 'JUSTID' if opts[:justid]
blk = opts[:justid] ? Noop : HashifyStreamEntries
synchronize { |client| client.call(args, &blk) }
end

# Fetches not acknowledging pending entries
#
# @example With key and group
# redis.xpending('mystream', 'mygroup')
# @example With range options
# redis.xpending('mystream', 'mygroup', first: '-', last: '+', count: 10)
# @example With range and consumer options
# redis.xpending('mystream', 'mygroup', 'consumer1', first: '-', last: '+', count: 10)
#
# @param key [String] the stream key
# @param group [String] the consumer group name
# @param consumer [String] the consumer name
# @param opts [Hash] several options for `XPENDING` command
#
# @option opts [String] :first first entry id of range
# @option opts [String] :last last entry id of range
# @option opts [Integer] :count the number of entries as limit
#
# @return [Hash] the summary of pending entries
# @return [Array<Hash>] the pending entries details if options were specified
def xpending(key, group, consumer = nil, **opts)
args = [:xpending, key, group, opts[:first], opts[:last], opts[:count], consumer].compact
summary_needed = consumer.nil? && opts.empty?
blk = summary_needed ? HashifyStreamPendings : HashifyStreamPendingDetails
synchronize { |client| client.call(args, &blk) }
end

# Interact with the sentinel command (masters, master, slaves, failover)
#
# @param [String] subcommand e.g. `masters`, `master`, `slaves`
Expand Down Expand Up @@ -2953,6 +3269,43 @@ def method_missing(command, *args)
end.compact]
}

HashifyStreams =
lambda { |reply|
return {} if reply.nil?
reply.map do |stream_key, entries|
[stream_key, HashifyStreamEntries.call(entries)]
end.to_h
}

HashifyStreamEntries =
lambda { |reply|
reply.map do |entry_id, values|
[entry_id, values.each_slice(2).to_h]
end.to_h
}

HashifyStreamPendings =
lambda { |reply|
{
'size' => reply[0],
'min_entry_id' => reply[1],
'max_entry_id' => reply[2],
'consumers' => reply[3].nil? ? {} : Hash[reply[3]]
}
}

HashifyStreamPendingDetails =
lambda { |reply|
reply.map do |arr|
{
'entry_id' => arr[0],
'consumer' => arr[1],
'elapsed' => arr[2],
'count' => arr[3]
}
end
}

HashifyClusterNodeInfo =
lambda { |str|
arr = str.split(' ')
Expand Down Expand Up @@ -3018,6 +3371,21 @@ def _subscription(method, timeout, channels, block)
@client = original
end
end

def _xread(args, keys, ids, blocking_timeout_msec)
keys = keys.is_a?(Array) ? keys : [keys]
ids = ids.is_a?(Array) ? ids : [ids]
args.concat(['STREAMS'], keys, ids)

synchronize do |client|
if blocking_timeout_msec.nil?
client.call(args, &HashifyStreams)
else
timeout = client.timeout.to_f + blocking_timeout_msec.to_f / 1000.0
client.call_with_timeout(args, timeout, &HashifyStreams)
end
end
end
end

require_relative "redis/version"
Expand Down
Loading

0 comments on commit 3a16a0e

Please sign in to comment.