Skip to content

Commit

Permalink
Add Streams support
Browse files Browse the repository at this point in the history
  • Loading branch information
supercaracal committed Nov 23, 2018
1 parent 476a34a commit 2f9bdff
Show file tree
Hide file tree
Showing 6 changed files with 1,110 additions and 186 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ matrix:
env: DRIVER=ruby REDIS_BRANCH=3.2 LOW_TIMEOUT=0.3
- rvm: jruby-9.1.17.0
env: DRIVER=ruby REDIS_BRANCH=4.0 LOW_TIMEOUT=0.3
- rvm: 2.5.3
env: DRIVER=ruby REDIS_BRANCH=5.0

notifications:
irc:
Expand Down
363 changes: 363 additions & 0 deletions lib/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2803,6 +2803,321 @@ def geodist(key, member1, member2, unit = 'm')
end
end

# Returns the stream information each subcommand.
#
# @example consumers
# redis.xinfo(:consumers, 'mystream', 'mygroup')
# @example groups
# redis.xinfo(:groups, 'mystream')
# @example stream
# redis.xinfo(:stream, 'mystream')
#
# @param subcommand [String] e.g. `consumers` `groups` `stream`
# @param key [String] the stream key
# @param group [String] the consumer group name, required if subcommand is `consumers`
#
# @return [Array<String>] the consumer names if subcommand is `consumers`
# @return [Array<Hash>] information of the consumer groups if subcommand is `groups`
# @return [Hash] information of the stream if subcommand is `stream`
def xinfo(subcommand, key, group = nil)
args = [:xinfo, subcommand, key, group].compact
synchronize do |client|
client.call(args) do |reply|
case subcommand.to_s.downcase
when 'stream' then Hashify.call(reply)
when 'groups', 'consumers' then reply.map { |arr| Hashify.call(arr) }
else reply
end
end
end
end

# Add new entry to the stream.
#
# @example Without options
# redis.xadd('mystream', f1: 'v1', f2: 'v2')
# @example With options
# redis.xadd('mystream', { f1: 'v1', f2: 'v2' }, id: '0-0', maxlen: 1000, modifier: true)
#
# @param key [String] the stream key
# @param entry [Hash] one or multiple field-value pairs
# @param opts [Hash] several options for `XADD` command
#
# @option opts [String] :id the entry id, default value is `*`, it means auto generation
# @option opts [Integer] :maxlen max length of entries
# @option opts [Boolean] :modifier true is `~`
#
# @return [String] the entry id
def xadd(key, entry, opts = {})
args = [:xadd, key]
args.concat(['MAXLEN', (opts[:modifier] ? '~' : nil), opts[:maxlen]]) if opts[:maxlen]
args.concat([(opts[:id] || '*')], entry.to_a.flatten)
synchronize { |client| client.call(args) }
end

# Trims older entries of the stream if needed.
#
# @example Without options
# redis.xtrim('mystream', 1000)
# @example With options
# redis.xtrim('mystream', 1000, modifier: true)
#
# @param key [String] the stream key
# @param mexlen [Integer] max length of entries
# @param modifier [Boolean] `true` is `~`, default value is `false`
#
# @return [Integer] the number of entries actually deleted
def xtrim(key, maxlen, modifier: false)
args = [:xtrim, key, 'MAXLEN', (modifier ? '~' : nil), maxlen].compact
synchronize { |client| client.call(args) }
end

# Delete entries by entry ids.
#
# @example With splatted entry ids
# redis.xdel('mystream', '0-1', '0-2')
# @example With arrayed entry ids
# redis.xdel('mystream', ['0-1', '0-2'])
#
# @param key [String] the stream key
# @param ids [Array<String>] one or multiple entry ids
#
# @return [Integer] the number of entries actually deleted
def xdel(key, *ids)
args = [:xdel, key].concat(ids.flatten)
synchronize { |client| client.call(args) }
end

# Fetches entries of the stream.
#
# @example Without options
# redis.xrange('mystream')
# @example With first entry id option
# redis.xrange('mystream', first: '0-1')
# @example With first and last entry id options
# redis.xrange('mystream', first: '0-1', last: '0-3')
# @example With count options
# redis.xrange('mystream', count: 10)
#
# @param key [String] the stream key
# @param first [String] first entry id of range, default value is `-`
# @param last [String] last entry id of range, default value is `+`
# @param count [Integer] the number of entries as limit
#
# @return [Hash{String => Hash}] the entries
def xrange(key, first: '-', last: '+', count: nil)
args = [:xrange, key, first, last]
args.concat(['COUNT', count]) unless count.nil?
synchronize { |client| client.call(args, &HashifyStreamEntries) }
end

# Fetches entries of the stream in descending order.
#
# @example Without options
# redis.xrevrange('mystream')
# @example With first entry id option
# redis.xrevrange('mystream', first: '0-1')
# @example With first and last entry id options
# redis.xrevrange('mystream', first: '0-1', last: '0-3')
# @example With count options
# redis.xrevrange('mystream', count: 10)
#
# @param key [String] the stream key
# @param first [String] first entry id of range, default value is `-`
# @param last [String] last entry id of range, default value is `+`
# @param count [Integer] the number of entries as limit
#
# @return [Hash{String => Hash}] the entries
def xrevrange(key, first: '-', last: '+', count: nil)
args = [:xrevrange, key, last, first]
args.concat(['COUNT', count]) unless count.nil?
synchronize { |client| client.call(args, &HashifyStreamEntries) }
end

# Returns the number of entries inside a stream.
#
# @example With key
# redis.xlen('mystream')
#
# @param key [String] the stream key
#
# @return [Integer] the number of entries
def xlen(key)
synchronize { |client| client.call([:xlen, key]) }
end

# Fetches entries from one or multiple streams. Optionally blocking.
#
# @example With a key
# redis.xread('mystream', '0-0')
# @example With multiple keys
# redis.xread(%w[mystream1 mystream2], %w[0-0 0-0])
# @example With count option
# redis.xread('mystream', '0-0', count: 2)
# @example With block option
# redis.xread('mystream', '$', block: 1000)
#
# @param keys [Array<String>, String] one or multiple stream keys
# @param ids [Array<String>, String] one or multiple entry ids
# @param count [Integer] the number of entries as limit per stream
# @param block [Integer] the number of milliseconds as blocking timeout
#
# @return [Hash{String => Hash{String => Hash}}] the entries
def xread(keys, ids, count: nil, block: nil)
args = [:xread]
args.concat(['COUNT', count]) unless count.nil?
args.concat(['BLOCK', block.to_i]) unless block.nil?
keys = keys.is_a?(Array) ? keys : [keys]
ids = ids.is_a?(Array) ? ids : [ids]
args.concat(['STREAMS'], keys, ids)
_xread(args, block)
end

# Manages the consumer group of the stream.
#
# @example With `create` subcommand
# redis.xgroup(:create, 'mystream', 'mygroup', '$')
# @example With `setid` subcommand
# redis.xgroup(:setid, 'mystream', 'mygroup', '$')
# @example With `destroy` subcommand
# redis.xgroup(:destroy, 'mystream', 'mygroup')
# @example With `delconsumer` subcommand
# redis.xgroup(:delconsumer, 'mystream', 'mygroup', 'consumer1')
#
# @param subcommand [String] `create` `setid` `destroy` `delconsumer`
# @param key [String] the stream key
# @param group [String] the consumer group name
# @param id_or_consumer [String]
# * the entry id or `$`, required if subcommand is `create` or `setid`
# * the consumer name, required if subcommand is `delconsumer`
# @param mkstream [Boolean] specify `true` if you'd like to create an empty stream automatically
#
# @return [String] `OK` if subcommand is `create` or `setid`
# @return [Integer] effected count if subcommand is `destroy` or `delconsumer`
def xgroup(subcommand, key, group, id_or_consumer = nil, mkstream: false)
args = [:xgroup, subcommand, key, group, id_or_consumer, (mkstream ? 'MKSTREAM' : nil)].compact
synchronize { |client| client.call(args) }
end

# Fetches a subset of the entries from one or multiple streams related with the consumer group.
# Optionally blocking.
#
# @example With a key
# redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>')
# @example With multiple keys
# redis.xreadgroup('mygroup', 'consumer1', %w[mystream1 mystream2], %w[> >])
# @example With count option
# redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>', count: 2)
# @example With block option
# redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>', block: 1000)
# @example With noack option
# redis.xreadgroup('mygroup', 'consumer1', 'mystream', '>', noack: true)
#
# @param group [String] the consumer group name
# @param consumer [String] the consumer name
# @param keys [Array<String>, String] one or multiple stream keys
# @param ids [Array<String>, String] one or multiple entry ids
# @param count [Integer] the number of entries as limit
# @param block [Integer] the number of milliseconds as blocking timeout
# @param noack [Boolean] specify `true` if message loss is acceptable
#
# @return [Hash{String => Hash{String => Hash}}] the entries
def xreadgroup(group, consumer, keys, ids, count: nil, block: nil, noack: false)
args = [:xreadgroup, 'GROUP', group, consumer]
args.concat(['COUNT', count]) unless count.nil?
args.concat(['BLOCK', block.to_i]) unless block.nil?
args.concat(['NOACK']) if noack
keys = keys.is_a?(Array) ? keys : [keys]
ids = ids.is_a?(Array) ? ids : [ids]
args.concat(['STREAMS'], keys, ids)
_xread(args, block)
end

# Removes one or multiple entries from the pending entries list of a stream consumer group.
#
# @example With a entry id
# redis.xack('mystream', 'mygroup', '1526569495631-0')
# @example With splatted entry ids
# redis.xack('mystream', 'mygroup', '0-1', '0-2')
# @example With arrayed entry ids
# redis.xack('mystream', 'mygroup', %w[0-1 0-2])
#
# @param key [String] the stream key
# @param group [String] the consumer group name
# @param ids [Array<String>] one or multiple entry ids
#
# @return [Integer] the number of entries successfully acknowledged
def xack(key, group, *ids)
args = [:xack, key, group].concat(ids.flatten)
synchronize { |client| client.call(args) }
end

# Changes the ownership of a pending entry
#
# @example With splatted entry ids
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-1', '0-2')
# @example With arrayed entry ids
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2])
# @example With idle option
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], idle: 1000)
# @example With time option
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], time: 1542866959000)
# @example With retrycount option
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], retrycount: 10)
# @example With force option
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], force: true)
# @example With justid option
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, %w[0-1 0-2], justid: true)
#
# @param key [String] the stream key
# @param group [String] the consumer group name
# @param consumer [String] the consumer name
# @param min_idle_time [Integer] the number of milliseconds
# @param ids [Array<String>] one or multiple entry ids
# @param idle [Integer] the number of milliseconds as last time it was delivered of the entry
# @param time [Integer] the number of milliseconds as a specific Unix time
# @param retrycount [Integer] the number of retry counter
# @param force [Boolean] `true` if you'd like to create the pending entry in the peiding entries list
# @param justid [Boolean] `true` if you'd like to get just an array of entry ids
#
# @return [Hash{String => Hash}] the entries successfully claimed
# @return [Array<String>] the entry ids successfully claimed if justid option is `true`
def xclaim(key, group, consumer, min_idle_time, *ids, idle: nil, time: nil, retrycount: nil, force: false, justid: false)
args = [:xclaim, key, group, consumer, min_idle_time].concat(ids.flatten)
args.concat(['IDLE', idle.to_i]) unless idle.nil?
args.concat(['TIME', time.to_i]) unless time.nil?
args.concat(['RETRYCOUNT', retrycount.to_i]) unless time.nil?
args.concat(['FORCE']) if force
args.concat(['JUSTID']) if justid
blk = justid ? Noop : HashifyStreamEntries
synchronize { |client| client.call(args, &blk) }
end

# Fetches not acknowledging pending entries
#
# @example With key and group
# redis.xpending('mystream', 'mygroup')
# @example With range options
# redis.xpending('mystream', 'mygroup', first: '-', last: '+', count: 10)
# @example With range and consumer options
# redis.xpending('mystream', 'mygroup', 'consumer1', first: '-', last: '+', count: 10)
#
# @param key [String] the stream key
# @param group [String] the consumer group name
# @param consumer [String] the consumer name
# @param first [String] first entry id of range
# @param last [String] last entry id of range
# @param count [Integer] the number of entries as limit
#
# @return [Hash] the summary of pending entries
# @return [Array<Hash>] the pending entries details if options were specified
def xpending(key, group, consumer = nil, first: nil, last: nil, count: nil)
args = [:xpending, key, group, first, last, count, consumer].compact
summary_needed = consumer.nil? && first.nil? && last.nil? && count.nil?
blk = summary_needed ? HashifyStreamPendings : HashifyStreamPendingDetails
synchronize { |client| client.call(args, &blk) }
end

# Interact with the sentinel command (masters, master, slaves, failover)
#
# @param [String] subcommand e.g. `masters`, `master`, `slaves`
Expand Down Expand Up @@ -2948,6 +3263,43 @@ def method_missing(command, *args)
end.compact]
}

HashifyStreams =
lambda { |reply|
return {} if reply.nil?
reply.map do |stream_key, entries|
[stream_key, HashifyStreamEntries.call(entries)]
end.to_h
}

HashifyStreamEntries =
lambda { |reply|
reply.map do |entry_id, values|
[entry_id, values.each_slice(2).to_h]
end.to_h
}

HashifyStreamPendings =
lambda { |reply|
{
'size' => reply[0],
'min_entry_id' => reply[1],
'max_entry_id' => reply[2],
'consumers' => reply[3].nil? ? {} : Hash[reply[3]]
}
}

HashifyStreamPendingDetails =
lambda { |reply|
reply.map do |arr|
{
'entry_id' => arr[0],
'consumer' => arr[1],
'elapsed' => arr[2],
'count' => arr[3]
}
end
}

HashifyClusterNodeInfo =
lambda { |str|
arr = str.split(' ')
Expand Down Expand Up @@ -3013,6 +3365,17 @@ def _subscription(method, timeout, channels, block)
@client = original
end
end

def _xread(args, blocking_timeout_msec)
synchronize do |client|
if blocking_timeout_msec.nil?
client.call(args, &HashifyStreams)
else
timeout = client.timeout.to_f + blocking_timeout_msec.to_f / 1000.0
client.call_with_timeout(args, timeout, &HashifyStreams)
end
end
end
end

require_relative "redis/version"
Expand Down
Loading

0 comments on commit 2f9bdff

Please sign in to comment.