-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
unqueue_message_job.rb
462 lines (417 loc) · 23.4 KB
/
unqueue_message_job.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
class UnqueueMessageJob < Postal::Job
def perform
if original_message = QueuedMessage.find_by_id(params['id'])
if original_message.acquire_lock
log "Lock acquired for queued message #{original_message.id}"
begin
original_message.message
rescue Postal::MessageDB::Message::NotFound
log "Unqueue #{original_message.id} because backend message has been removed."
original_message.destroy
return
end
unless original_message.retriable?
log "Skipping because retry after isn't reached"
original_message.unlock
return
end
begin
other_messages = original_message.batchable_messages(100)
log "Found #{other_messages.size} associated messages to process at the same time (batch key: #{original_message.batch_key})"
rescue
original_message.unlock
raise
end
([original_message] + other_messages).each do |queued_message|
log_prefix = "[#{queued_message.server_id}::#{queued_message.message_id} #{queued_message.id}]"
begin
log "#{log_prefix} Got queued message with exclusive lock"
begin
queued_message.message
rescue Postal::MessageDB::Message::NotFound
log "#{log_prefix} Unqueueing #{queued_message.id} because backend message has been removed"
queued_message.destroy
next
end
#
# If the server is suspended, hold all messages
#
if queued_message.server.suspended?
log "#{log_prefix} Server is suspended. Holding message."
queued_message.message.create_delivery('Held', :details => "Mail server has been suspended. No e-mails can be processed at present. Contact support for assistance.")
queued_message.destroy
next
end
# We might not be able to send this any more, check the attempts
if queued_message.attempts >= Postal.config.general.maximum_delivery_attempts
details = "Maximum number of delivery attempts (#{queued_message.attempts}) has been reached."
if queued_message.message.scope == 'incoming'
# Send bounces to incoming e-mails when they are hard failed
if bounce_id = queued_message.send_bounce
details += " Bounce sent to sender (see message <msg:#{bounce_id}>)"
end
elsif queued_message.message.scope == 'outgoing'
# Add the recipient to the suppression list
if queued_message.server.message_db.suppression_list.add(:recipient, queued_message.message.rcpt_to, :reason => "too many soft fails")
log "Added #{queued_message.message.rcpt_to} to suppression list because maximum attempts has been reached"
details += " Added #{queued_message.message.rcpt_to} to suppression list because delivery has failed #{queued_message.attempts} times."
end
end
queued_message.message.create_delivery('HardFail', :details => details)
queued_message.destroy
log "#{log_prefix} Message has reached maximum number of attempts. Hard failing."
next
end
# If the raw message has been removed (removed by retention)
unless queued_message.message.raw_message?
log "#{log_prefix} Raw message has been removed. Not sending."
queued_message.message.create_delivery('HardFail', :details => "Raw message has been removed. Cannot send message.")
queued_message.destroy
next
end
#
# Handle Incoming Messages
#
if queued_message.message.scope == 'incoming'
#
# If this is a bounce, we need to handle it as such
#
if queued_message.message.bounce == 1
log "#{log_prefix} Message is a bounce"
original_messages = queued_message.message.original_messages
unless original_messages.empty?
for original_message in queued_message.message.original_messages
queued_message.message.update(:bounce_for_id => original_message.id, :domain_id => original_message.domain_id)
queued_message.message.create_delivery('Processed', :details => "This has been detected as a bounce message for <msg:#{original_message.id}>.")
original_message.bounce!(queued_message.message)
log "#{log_prefix} Bounce linked with message #{original_message.id}"
end
queued_message.destroy
next
end
# This message was sent to the return path but hasn't been matched
# to an original message. If we have a route for this, route it
# otherwise we'll drop at this point.
if queued_message.message.route_id.nil?
log "#{log_prefix} No source messages found. Hard failing."
queued_message.message.create_delivery('HardFail', :details => "This message was a bounce but we couldn't link it with any outgoing message and there was no route for it.")
queued_message.destroy
next
end
end
#
# Update live stats
#
queued_message.message.database.live_stats.increment(queued_message.message.scope)
#
# Inspect incoming messages
#
if queued_message.message.inspected == 0
log "#{log_prefix} Inspecting message"
queued_message.message.inspect_message
if queued_message.message.inspected == 1
is_spam = queued_message.message.spam_score > queued_message.server.spam_threshold
queued_message.message.update(:spam => 1) if is_spam
queued_message.message.append_headers(
"X-Postal-Spam: #{queued_message.message.spam == 1 ? 'yes' : 'no'}",
"X-Postal-Spam-Threshold: #{queued_message.server.spam_threshold}",
"X-Postal-Spam-Score: #{queued_message.message.spam_score}",
"X-Postal-Threat: #{queued_message.message.threat == 1 ? 'yes' : 'no'}"
)
log "#{log_prefix} Message inspected successfully. Headers added."
end
end
#
# If this message has a SPAM score higher than is permitted
#
if queued_message.message.spam_score >= queued_message.server.spam_failure_threshold
log "#{log_prefix} Message has a spam score higher than the server's maxmimum. Hard failing."
queued_message.message.create_delivery('HardFail', :details => "Message's spam score is higher than the failure threshold for this server. Threshold is currently #{queued_message.server.spam_failure_threshold}.")
queued_message.destroy
next
end
# If the server is in development mode, hold it
if queued_message.server.mode == 'Development' && !queued_message.manual?
log "Server is in development mode so holding."
queued_message.message.create_delivery('Held', :details => "Server is in development mode.")
queued_message.destroy
log "#{log_prefix} Server is in development mode. Holding."
next
end
#
# Find out what sort of message we're supposed to be sending and dispatch this request over to
# the sender.
#
if route = queued_message.message.route
# If the route says we're holding quananteed mail and this is spam, we'll hold this
if route.spam_mode == 'Quarantine' && queued_message.message.spam == 1 && !queued_message.manual?
queued_message.message.create_delivery('Held', :details => "Message placed into quarantine.")
queued_message.destroy
log "#{log_prefix} Route says to quarantine spam message. Holding."
next
end
# If the route says we're holding quananteed mail and this is spam, we'll hold this
if route.spam_mode == 'Fail' && queued_message.message.spam == 1 && !queued_message.manual?
queued_message.message.create_delivery('HardFail', :details => "Message is spam and the route specifies it should be failed.")
queued_message.destroy
log "#{log_prefix} Route says to fail spam message. Hard failing."
next
end
#
# Messages that should be blindly accepted are blindly accepted
#
if route.mode == 'Accept'
queued_message.message.create_delivery('Processed', :details => "Message has been accepted but not sent to any endpoints.")
queued_message.destroy
log "#{log_prefix} Route says to accept without endpoint. Marking as processed."
next
end
#
# Messages that should be accepted and held should be held
#
if route.mode == 'Hold'
log "#{log_prefix} Route says to hold message."
if queued_message.manual?
log "#{log_prefix} Message was queued manually. Marking as processed."
queued_message.message.create_delivery('Processed', :details => "Message has been processed.")
else
log "#{log_prefix} Message was not queued manually. Holding."
queued_message.message.create_delivery('Held', :details => "Message has been accepted but not sent to any endpoints.")
end
queued_message.destroy
next
end
#
# Messages that should be bounced should be bounced (or rejected if they got this far)
#
if route.mode == 'Bounce' || route.mode == 'Reject'
if id = queued_message.send_bounce
queued_message.message.create_delivery('HardFail', :details => "Message has been bounced because the route asks for this. See message <msg:#{id}>")
log "#{log_prefix} Route says to bounce. Hard failing and sent bounce (#{id})."
end
queued_message.destroy
next
end
begin
if @fixed_result
result = @fixed_result
else
case queued_message.message.endpoint
when SMTPEndpoint
sender = cached_sender(Postal::SMTPSender, queued_message.message.recipient_domain, nil, :servers => [queued_message.message.endpoint])
when HTTPEndpoint
sender = cached_sender(Postal::HTTPSender, queued_message.message.endpoint)
when AddressEndpoint
sender = cached_sender(Postal::SMTPSender, queued_message.message.endpoint.domain, nil, :force_rcpt_to => queued_message.message.endpoint.address)
else
log "#{log_prefix} Invalid endpoint for route (#{queued_message.message.endpoint_type})"
queued_message.message.create_delivery('HardFail', :details => "Invalid endpoint for route.")
queued_message.destroy
next
end
result = sender.send_message(queued_message.message)
if result.connect_error
@fixed_result = result
end
end
end
# Log the result
log_details = result.details
if result.type =='HardFail' && result.suppress_bounce
# The delivery hard failed, but requested that no bounce be sent
log "#{log_prefix} Suppressing bounce message after hard fail"
elsif result.type =='HardFail' && queued_message.message.send_bounces?
# If the message is a hard fail, send a bounce message for this message.
log "#{log_prefix} Sending a bounce because message hard failed"
if bounce_id = queued_message.send_bounce
log_details += ". " unless log_details =~ /\.\z/
log_details += " Sent bounce message to sender (see message <msg:#{bounce_id}>)"
end
end
queued_message.message.create_delivery(result.type, :details => log_details, :output => result.output&.strip, :sent_with_ssl => result.secure, :log_id => result.log_id, :time => result.time)
if result.retry
log "#{log_prefix} Message requeued for trying later."
queued_message.retry_later(result.retry.is_a?(Fixnum) ? result.retry : nil)
queued_message.allocate_ip_address
queued_message.update_column(:ip_address_id, queued_message.ip_address&.id)
else
log "#{log_prefix} Message processing completed."
queued_message.message.endpoint.mark_as_used
queued_message.destroy
end
else
log "#{log_prefix} No route and/or endpoint available for processing. Hard failing."
queued_message.message.create_delivery('HardFail', :details => "Message does not have a route and/or endpoint available for delivery.")
queued_message.destroy
next
end
end
#
# Handle Outgoing Messages
#
if queued_message.message.scope == 'outgoing'
if queued_message.message.domain.nil?
log "#{log_prefix} Message has no domain. Hard failing."
queued_message.message.create_delivery('HardFail', :details => "Message's domain no longer exist")
queued_message.destroy
next
end
#
# If there's no to address, we can't do much. Fail it.
#
if queued_message.message.rcpt_to.blank?
log "#{log_prefix} Message has no to address. Hard failing."
queued_message.message.create_delivery('HardFail', :details => "Message doesn't have an RCPT to")
queued_message.destroy
next
end
#
# If the credentials for this message is marked as holding and this isn't manual, hold it
#
if !queued_message.manual? && queued_message.message.credential && queued_message.message.credential.hold?
log "#{log_prefix} Credential wants us to hold messages. Holding."
queued_message.message.create_delivery('Held', :details => "Credential is configured to hold all messages authenticated by it.")
queued_message.destroy
next
end
#
# If the recipient is on the suppression list and this isn't a manual queueing block sending
#
if !queued_message.manual? && sl = queued_message.server.message_db.suppression_list.get(:recipient, queued_message.message.rcpt_to)
log "#{log_prefix} Recipient is on the suppression list. Holding."
queued_message.message.create_delivery('Held', :details => "Recipient (#{queued_message.message.rcpt_to}) is on the suppression list (reason: #{sl['reason']})")
queued_message.destroy
next
end
# Extract a tag and add it to the message if one doesn't exist
if queued_message.message.tag.nil? && tag = queued_message.message.headers['x-postal-tag']
log "#{log_prefix} Added tag #{tag.last}"
queued_message.message.update(:tag => tag.last)
end
# Parse the content of the message as appropriate
if queued_message.message.should_parse?
log "#{log_prefix} Parsing message content as it hasn't been parsed before"
queued_message.message.parse_content
end
# Inspect outgoing messages when there's a threshold set for the server
if queued_message.message.inspected == 0 && queued_message.server.outbound_spam_threshold
log "#{log_prefix} Inspecting message"
queued_message.message.inspect_message
if queued_message.message.inspected == 1
if queued_message.message.spam_score >= queued_message.server.outbound_spam_threshold
queued_message.message.update(:spam => 1)
end
log "#{log_prefix} Message inspected successfully"
end
end
if queued_message.message.spam == 1
queued_message.message.create_delivery("HardFail", :details => "Message is likely spam. Threshold is #{queued_message.server.outbound_spam_threshold} and the message scored #{queued_message.message.spam_score}.")
queued_message.destroy
log "#{log_prefix} Message is spam (#{queued_message.message.spam_score}). Hard failing."
next
end
# Add outgoing headers
if !queued_message.message.has_outgoing_headers?
queued_message.message.add_outgoing_headers
end
# Check send limits
if queued_message.server.send_limit_exceeded?
# If we're over the limit, we're going to be holding this message
queued_message.server.update_columns(:send_limit_exceeded_at => Time.now, :send_limit_approaching_at => nil)
queued_message.message.create_delivery('Held', :details => "Message held because send limit (#{queued_message.server.send_limit}) has been reached.")
queued_message.destroy
log "#{log_prefix} Server send limit has been exceeded. Holding."
next
elsif queued_message.server.send_limit_approaching?
# If we're approaching the limit, just say we are but continue to process the message
queued_message.server.update_columns(:send_limit_approaching_at => Time.now, :send_limit_exceeded_at => nil)
else
queued_message.server.update_columns(:send_limit_approaching_at => nil, :send_limit_exceeded_at => nil)
end
# Update the live stats for this message.
queued_message.message.database.live_stats.increment(queued_message.message.scope)
# If the server is in development mode, hold it
if queued_message.server.mode == 'Development' && !queued_message.manual?
log "Server is in development mode so holding."
queued_message.message.create_delivery('Held', :details => "Server is in development mode.")
queued_message.destroy
log "#{log_prefix} Server is in development mode. Holding."
next
end
# Send the outgoing message to the SMTP sender
begin
if @fixed_result
result = @fixed_result
else
sender = cached_sender(Postal::SMTPSender, queued_message.message.recipient_domain, queued_message.ip_address)
result = sender.send_message(queued_message.message)
if result.connect_error
@fixed_result = result
end
end
end
#
# If the message has been hard failed, check to see how many other recent hard fails we've had for the address
# and if there are more than 2, suppress the address for 30 days.
#
if result.type == 'HardFail'
recent_hard_fails = queued_message.server.message_db.select(:messages, :where => {:rcpt_to => queued_message.message.rcpt_to, :status => 'HardFail', :timestamp => {:greater_than => 24.hours.ago.to_f}}, :count => true)
if recent_hard_fails >= 1
if queued_message.server.message_db.suppression_list.add(:recipient, queued_message.message.rcpt_to, :reason => "too many hard fails")
log "#{log_prefix} Added #{queued_message.message.rcpt_to} to suppression list because #{recent_hard_fails} hard fails in 24 hours"
result.details += "." if result.details =~ /\.\z/
result.details += " Recipient added to suppression list (too many hard fails)."
end
end
end
#
# If a message is sent successfully, remove the users from the suppression list
#
if result.type == 'Sent'
if queued_message.server.message_db.suppression_list.remove(:recipient, queued_message.message.rcpt_to)
log "#{log_prefix} Removed #{queued_message.message.rcpt_to} from suppression list because success"
result.details += "." if result.details =~ /\.\z/
result.details += " Recipient removed from suppression list."
end
end
# Log the result
queued_message.message.create_delivery(result.type, :details => result.details, :output => result.output, :sent_with_ssl => result.secure, :log_id => result.log_id, :time => result.time)
if result.retry
log "#{log_prefix} Message requeued for trying later."
queued_message.retry_later(result.retry.is_a?(Fixnum) ? result.retry : nil)
else
log "#{log_prefix} Processing complete"
queued_message.destroy
end
end
rescue => e
log "#{log_prefix} Internal error: #{e.class}: #{e.message}"
e.backtrace.each { |e| log("#{log_prefix} #{e}") }
queued_message.retry_later
log "#{log_prefix} Queued message was unlocked"
if defined?(Raven)
Raven.capture_exception(e, :extra => {:job_id => self.id, :server_id => queued_message.server_id, :message_id => queued_message.message_id})
end
if queued_message.message
queued_message.message.create_delivery("Error", :details => "An internal error occurred while sending this message. This message will be retried automatically. If this persists, contact support for assistance.", :output => "#{e.class}: #{e.message}", :log_id => "J-#{self.id}")
end
end
end
else
log "Couldn't get lock for message #{params['id']}. I won't do this."
end
else
log "No queued message with ID #{params['id']} was available for processing."
end
ensure
@sender&.finish rescue nil
end
private
def cached_sender(klass, *args)
@sender ||= begin
sender = klass.new(*args)
sender.start
sender
end
end
end