Add replay support (5-days back history)
o Fix wrong retry reset when an error is returned by GNIP
o Fix retrier (and decrement default max number of retries to 2)
Laurent Farcy committed Jul 5, 2016
1 parent 277ec92 commit b688f04
Showing 7 changed files with 124 additions and 36 deletions.
17 changes: 15 additions & 2 deletions README.md
@@ -90,8 +90,8 @@ The ```:stop_timeout``` may be fine-tuned when passing options to the tracker.

As highly recommended by GNIP, the PowerTrack::Stream client manages an exponential
backoff retry mechanism when a disconnection happens. The reconnections can be
fine-tuned through the ```max_retries``` and ```backoff``` options passed to the
```track``` call.
fine-tuned through the ```:max_retries``` and ```:backoff``` options passed to
the ```track``` call.

## Backfill

@@ -102,6 +102,19 @@ last 5 minutes when reconnecting.
Provide a (numerical) client id as the last (but optional) argument of the
PowerTrack::Stream constructor to enable this feature.

## Replay

Replay is a feature provided by GNIP to recover lost activities over the last
5 days. The Replay stream lives alongside the realtime stream and is activated
by setting the ```:replay``` option to true when building a ```PowerTrack::Stream```
object.

Once Replay is activated, you use the stream as before, starting by
configuring some rules that define which activities you will recover. Once done,
you can track the stream by specifying a timeframe with the ```:from```
and ```:to``` options. By default, replay happens over 30 minutes, starting 1
hour ago.
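
As a rough illustration of the default timeframe described above, the standalone sketch below computes a window that starts 1 hour ago and stops 30 minutes ago, then formats both bounds as UTC timestamps with the `REPLAY_TIMESTAMP_FORMAT` pattern introduced by this commit. The `replay_window` helper is hypothetical and not part of the gem; it only mirrors the arithmetic, assuming `:from` and `:to` are `Time` objects.

```ruby
# Timestamp pattern copied from the commit (UTC, minute precision).
REPLAY_TIMESTAMP_FORMAT = '%Y%m%d%H%M'

# Hypothetical helper (not part of the powertrack gem): builds the
# fromDate/toDate pair for a Replay request.
def replay_window(now = Time.now, from: nil, to: nil)
  from ||= now - 60 * 60 # start 1 hour ago by default
  to   ||= now - 30 * 60 # stop 30 minutes ago by default

  {
    # getutc returns a new Time in UTC and leaves the receiver untouched
    'fromDate' => from.getutc.strftime(REPLAY_TIMESTAMP_FORMAT),
    'toDate'   => to.getutc.strftime(REPLAY_TIMESTAMP_FORMAT)
  }
end

window = replay_window(Time.utc(2016, 7, 5, 12, 0, 0))
puts window.inspect
```

With a fixed reference time of 2016-07-05 12:00 UTC, the window runs from `201607051100` to `201607051130`.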

## Errors

All the errors that come from PowerTrack are defined through an ad-hoc exception
1 change: 1 addition & 0 deletions TODO.md
@@ -78,5 +78,6 @@ See [Managing disconnections](http://support.gnip.com/articles/disconnections-ex
## Other features

* _[DONE]_ Support test and development streams
* _[DONE]_ Support Replay mode (5-days back history)
* Support status dashboard
* Support Historical Powertrack
2 changes: 1 addition & 1 deletion lib/powertrack/streaming/retrier.rb
@@ -56,7 +56,7 @@ def stop
# retrier.
def retry(&block)
# TODO: manage exceptions
while @continue && @retries < @max_retries
while @continue && @retries <= @max_retries
res = yield
if @continue
@retries += 1
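
The effect of switching the loop bound from `<` to `<=` can be checked with a simplified model of the loop. The sketch below is an assumption-laden stand-in, not the gem's `Retrier`: it ignores `@continue` and the backoff delays, and treats every attempt as ending in a disconnection. The `attempts` helper is hypothetical.

```ruby
# Hypothetical model of the retry loop: returns how many times the block
# would run when every attempt ends in a disconnection.
def attempts(max_retries, inclusive:)
  retries = 0
  runs = 0
  while inclusive ? retries <= max_retries : retries < max_retries
    runs += 1    # stands in for `yield` (one connection attempt)
    retries += 1 # the attempt failed, so a retry is counted
  end
  runs
end

puts attempts(3, inclusive: false) # old check, old default of 3
puts attempts(2, inclusive: true)  # new check, new default of 2
puts attempts(0, inclusive: false) # old check with max_retries: 0
puts attempts(0, inclusive: true)  # new check with max_retries: 0
```

Under this model the old and new defaults both give three attempts, but only the inclusive check runs the block at all when `max_retries` is 0, which is the value the replay test in this commit passes.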
71 changes: 53 additions & 18 deletions lib/powertrack/streaming/stream.rb
@@ -18,7 +18,7 @@ class Stream
include VoidLogger::LoggerMixin

# The format of the URLs to connect to the various stream services
FEATURE_URL_FORMAT = "https://%s:%s/accounts/%s/publishers/%s/streams/track/%s%s.json".freeze
FEATURE_URL_FORMAT = "https://%s:%s/accounts/%s/publishers/%s/%s/track/%s%s.json".freeze

# The default timeout on a connection to PowerTrack. Can be overridden per call.
DEFAULT_CONNECTION_TIMEOUT = 30
@@ -32,16 +32,22 @@ class Stream
connect_timeout: DEFAULT_CONNECTION_TIMEOUT,
inactivity_timeout: DEFAULT_INACTIVITY_TIMEOUT,
# use a client id if you want to leverage the Backfill feature
client_id: nil
client_id: nil,
# enable the replay mode to get activities over the last 5 days
# see http://support.gnip.com/apis/replay/api_reference.html
replay: false
}

DEFAULT_OK_RESPONSE_STATUS = 200

# the patterns used to identify the various types of message received from GNIP
# The patterns used to identify the various types of message received from GNIP
# everything else is an activity
HEARTBEAT_MESSAGE_PATTERN = /\A\s*\z/
SYSTEM_MESSAGE_PATTERN = /\A\s*\{\s*"(info|warn|error)":/mi

# The format used to send UTC timestamps in Replay mode
REPLAY_TIMESTAMP_FORMAT = '%Y%m%d%H%M'

attr_reader :username, :account_name, :data_source, :label

def initialize(username, password, account_name, data_source, label, options=nil)
@@ -52,6 +58,8 @@ def initialize(username, password, account_name, data_source, label, options=nil
@label = label
@options = DEFAULT_STREAM_OPTIONS.merge(options || {})
@client_id = @options[:client_id]
@replay = !!@options[:replay]
@stream_mode = @replay ? 'replay' : 'streams'
end

# Adds many rules to your PowerTrack stream’s ruleset.
@@ -105,14 +113,18 @@ def list_rules(options=nil)
# receive GZip-compressed payloads ?
compressed: true,
# max number of retries after a disconnection
max_retries: 3,
max_retries: 2,
# advanced options to configure exponential backoff used for retries
backoff: nil,
# max number of seconds to wait for last message handlers to complete
stop_timeout: 10,
# pass message in raw form (JSON formatted string) instead of JSON-decoded
# Ruby objects to message handlers
raw: false,
# the starting date from which the activities will be recovered (replay mode only)
from: nil,
# the ending date to which the activities will be recovered (replay mode only)
to: nil,
# called for each message received, except heartbeats
on_message: nil,
# called for each activity received
@@ -158,6 +170,7 @@ def feature_url(hostname, feature=nil)
gnip_server_port,
@account_name,
@data_source,
@stream_mode,
@label,
feature ]
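
To see what the extra `@stream_mode` argument changes, the sketch below fills the new `FEATURE_URL_FORMAT` template for both modes. The host, port, account, and data source values are placeholders chosen for illustration, and `sketch_feature_url` is a hypothetical helper; how the gem derives the real host and port is not shown in this diff.

```ruby
# URL template copied from the new side of the diff.
FEATURE_URL_FORMAT = "https://%s:%s/accounts/%s/publishers/%s/%s/track/%s%s.json".freeze

# Hypothetical helper: every literal below is a placeholder, not a real
# GNIP endpoint or account.
def sketch_feature_url(replay:, label:, feature: nil)
  stream_mode = replay ? 'replay' : 'streams'
  format(FEATURE_URL_FORMAT,
         'gnip.example.com', 443, 'my_account', 'twitter',
         stream_mode, label, feature) # a nil feature renders as ""
end

puts sketch_feature_url(replay: false, label: 'prod')
puts sketch_feature_url(replay: true, label: 'prod')
```

Only the path segment before `track` differs: `streams` for the realtime stream, `replay` when the `:replay` option is set.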

@@ -187,6 +200,7 @@ def connection_headers
# Opens a new connection to GNIP PowerTrack.
def connect(hostname, feature=nil)
url = feature_url(hostname, feature)
logger.debug("Connecting to '#{url}' with headers #{connection_headers}...")
EventMachine::HttpRequest.new(url, connection_headers)
end

@@ -283,14 +297,14 @@ def make_rules_request(verb, options=nil)
handle_api_response(resp_status, resp_error, resp_body, options[:ok])
end

# Returns the type of message received on the stream, nil when the type
# cannot be identified.
# Returns the type of message received on the stream, together with a
# level indicator in case of a system message, nil otherwise.
def message_type(message)
case message
when HEARTBEAT_MESSAGE_PATTERN then :heartbeat
when SYSTEM_MESSAGE_PATTERN then :system
when HEARTBEAT_MESSAGE_PATTERN then [ :heartbeat, nil ]
when SYSTEM_MESSAGE_PATTERN then [ :system, $1.downcase.to_sym ]
else
:activity
[ :activity, nil ]
end
end
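
The new return shape can be exercised outside the class. The sketch below copies the two patterns and the new `message_type` body from this diff into a standalone script and pairs them with a hypothetical `reset_retries?` helper that mirrors the condition `retrier.retrying? && m_level != :error` used later in `track_once`. The sample payloads are made up for illustration.

```ruby
# Patterns copied from the diff.
HEARTBEAT_MESSAGE_PATTERN = /\A\s*\z/
SYSTEM_MESSAGE_PATTERN = /\A\s*\{\s*"(info|warn|error)":/mi

# Same logic as the new message_type: a [type, level] pair, where level is
# only set for system messages.
def message_type(message)
  case message
  when HEARTBEAT_MESSAGE_PATTERN then [:heartbeat, nil]
  when SYSTEM_MESSAGE_PATTERN then [:system, $1.downcase.to_sym]
  else
    [:activity, nil]
  end
end

# Hypothetical helper: retries are reset only when the incoming message is
# not an error-level system message.
def reset_retries?(retrying, level)
  retrying && level != :error
end

# Made-up sample payloads.
samples = ["\r\n", '{"error":{"message":"sample"}}', '{"id":"1","body":"coke"}']
samples.each do |raw|
  type, level = message_type(raw)
  puts "#{type.inspect} #{level.inspect} reset=#{reset_retries?(true, level)}"
end
```

An error message therefore no longer clears the retry counter, while heartbeats, activities, and info or warn messages still do.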

@@ -322,7 +336,25 @@ def track_once(options, retrier)
EM.run do
logger.info "Starting the reactor..."
con = connect('stream')
http = con.get(head: track_req_headers(options[:compressed]))
get_opts = { head: track_req_headers(options[:compressed]) }

# add a timeframe in replay mode
if @replay
now = Time.now
# start 1 hour ago by default
from = options[:from] || (now - 60*60)
# stop 30 minutes ago by default
to = options[:to] || (now - 30*60)

get_opts[:query] = {
'fromDate' => from.utc.strftime(REPLAY_TIMESTAMP_FORMAT),
'toDate' => to.utc.strftime(REPLAY_TIMESTAMP_FORMAT)
}

logger.info "Replay mode enabled from '#{from}' to '#{to}'"
end

http = con.get(get_opts)

# polls to see if the connection should be closed
close_watcher = EM.add_periodic_timer(1) do
@@ -352,19 +384,21 @@ def track_once(options, retrier)
next
end

# reset retries when some (valid) data are received
if retrier.retrying?
logger.info "Resetting retries..."
retrier.reset!
end

# process the chunk
buffer.process(chunk) do |raw|
logger.debug "New message received"

# get the message type and its (optional) level
m_type, m_level = message_type(raw)

# reset retries when some (valid) data are received
if retrier.retrying? && m_level != :error
logger.info "Resetting retries..."
retrier.reset!
end

EM.defer do
# select the right message handler(s) according to the message type
m_type = message_type(raw)

if m_type == :heartbeat
on_heartbeat.call if on_heartbeat
else
@@ -401,6 +435,7 @@ def track_once(options, retrier)
resp_status = http_client.response_header.status || DEFAULT_OK_RESPONSE_STATUS
resp_error = http_client.error
resp_body = http_client.response

wait_til_defers_finish_and_stop(stop_timeout)
end
end
5 changes: 3 additions & 2 deletions test/minitest_helper.rb
@@ -30,12 +30,13 @@ def powertrack_config
end

# Returns a brand-new stream based on the config found in test/powertrack.yml.
def new_stream
def new_stream(replay=false)
PowerTrack::Stream.new(
powertrack_config[:username],
powertrack_config[:password],
powertrack_config[:account_name],
powertrack_config[:data_source],
powertrack_config[:stream_label])
replay ? 'prod' : powertrack_config[:stream_label],
replay: replay)
end
end
10 changes: 9 additions & 1 deletion test/test_manage_rules.rb
@@ -5,7 +5,15 @@
class TestManageRules < Minitest::Test

def test_add_then_delete_a_single_rule
stream = new_stream
add_then_delete_a_single_rule(false)
end

def test_add_then_delete_a_single_rule_in_replay_mode
add_then_delete_a_single_rule(true)
end

def add_then_delete_a_single_rule(replay)
stream = new_stream(replay)

rule = PowerTrack::Rule.new('coke')
assert rule.valid?
54 changes: 42 additions & 12 deletions test/test_track_stream.rb
@@ -4,8 +4,16 @@

class TestTrackStream < Minitest::Test

def test_track_simple_stream
stream = new_stream
def test_track_realtime_stream
track_simple_stream(false)
end

def test_track_replay_stream
track_simple_stream(true)
end

def track_simple_stream(replay)
stream = new_stream(replay)

# add a logger
stream.logger = Logger.new(STDERR)
@@ -23,6 +31,8 @@ def test_track_simple_stream
received = 0
tweeted = 0
closed = false
from = nil
to = nil

# ready to track
on_message = lambda do |message|
@@ -34,28 +44,48 @@ def test_track_simple_stream
on_activity = lambda do |tweet|
tweeted += 1
end
on_system = lambda do |message|
$stderr.puts message.inspect
end

close_now = lambda { closed }

delay = 60
Thread.new do
$stderr.puts "Time-bomb thread running for #{delay} seconds..."
sleep delay
$stderr.puts "Time to shut down !"
closed = true
if replay
now = Time.now
from = now - 31*60
to = now - 30*60
delay = to - from
else
delay = 60
Thread.new do
$stderr.puts "Time-bomb thread running for #{delay} seconds..."
sleep delay
$stderr.puts "Time to shut down !"
closed = true
end
end

started_at = Time.now
res = stream.track(on_message: on_message,
on_heartbeat: on_heartbeat,
on_activity: on_activity,
on_system: on_system,
close_now: close_now,
max_retries: 3,
fake_disconnections: 20)
max_retries: replay ? 0 : 2,
fake_disconnections: replay ? nil : 20,
from: from,
to: to)

ended_at = Time.now

assert_nil res
assert closed, 'Stream not closed'
assert Time.now - started_at >= delay
assert replay || closed, 'Stream not closed'

if replay
assert (ended_at - started_at) <= delay
else
assert (ended_at - started_at) >= delay
end

assert heartbeats > 0, 'No heartbeat received'
puts "#{heartbeats} heartbeats received"