forked from alphagov/publishing-api
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdownstream_live_worker.rb
105 lines (89 loc) · 3.21 KB
/
downstream_live_worker.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
require "sidekiq-unique-jobs"
# Sidekiq worker that sends the live (non-draft) representation of an edition
# downstream.
#
# For the edition identified by a content_id and locale, #perform:
#   1. stores the expanded links computed for the payload,
#   2. sends the payload to the live content store (only when the edition
#      has a base_path),
#   3. broadcasts the payload on the message queue when the edition state is
#      "published" or "unpublished",
#   4. enqueues a DependencyResolutionWorker job unless update_dependencies
#      was passed as a falsy value.
class DownstreamLiveWorker
  include DownstreamQueue
  include Sidekiq::Worker
  include PerformAsyncInQueue

  # NOTE(review): `unique` / `unique_args` are presumably consumed by the
  # sidekiq-unique-jobs gem required at the top of this file - confirm the
  # option names against the gem version in use.
  sidekiq_options queue: HIGH_QUEUE,
                  unique: :until_executing,
                  unique_args: :uniq_args

  # Builds the list of values referenced by the `unique_args` option above.
  #
  # @param args [Array<Hash>] job arguments; only the first element is read,
  #   using string keys
  # @return [Array] content_id, locale, message_queue_event_type,
  #   update_dependencies (default true), orphaned_content_ids (default [])
  #   and this class's name
  def self.uniq_args(args)
    job_args = args.first

    [
      job_args["content_id"],
      job_args["locale"],
      job_args["message_queue_event_type"],
      job_args.fetch("update_dependencies", true),
      job_args.fetch("orphaned_content_ids", []),
      name,
    ]
  end

  # Sends the live edition downstream.
  #
  # An AbortWorkerError raised anywhere in the body is rescued and reported
  # through GovukError as a warning instead of propagating to the caller.
  #
  # @param args [Hash] job arguments; keys are symbolized before use.
  #   :content_id and :locale are required, the remaining keys are optional
  #   (see #assign_attributes for their defaults)
  def perform(args = {})
    assign_attributes(args.symbolize_keys)

    unless edition
      message = "A downstreamable edition was not found for content_id: #{content_id} and locale: #{locale}"
      raise AbortWorkerError, message
    end

    source_id = dependency_resolution_source_content_id
    DownstreamService.set_govuk_dependency_resolution_source_content_id_header(source_id) unless source_id.nil?

    payload = DownstreamPayload.new(edition, payload_version, draft: false)
    update_expanded_links(payload)

    DownstreamService.update_live_content_store(payload) if edition.base_path

    if %w[published unpublished].include?(edition.state)
      # An explicitly supplied event type wins over the edition's update_type.
      DownstreamService.broadcast_to_message_queue(
        payload,
        message_queue_event_type || edition.update_type,
      )
    end

    enqueue_dependencies if update_dependencies
  rescue AbortWorkerError => e
    notify_airbrake(e, args)
  end

  private

  attr_reader :content_id,
              :locale,
              :edition,
              :payload_version,
              :message_queue_event_type,
              :update_dependencies,
              :dependency_resolution_source_content_id,
              :orphaned_content_ids,
              :source_command,
              :source_fields

  # Copies the job arguments into instance state and looks up the edition.
  #
  # @param attributes [Hash] symbol-keyed job arguments
  # @raise [KeyError] when :content_id or :locale is missing
  def assign_attributes(attributes)
    @content_id = attributes.fetch(:content_id)
    @locale = attributes.fetch(:locale)

    # The payload version is read before the edition is looked up; keep
    # these two statements in this order.
    @payload_version = Event.maximum_id
    @edition = Queries::GetEditionForContentStore.call(content_id, locale, include_draft: false)

    @orphaned_content_ids = attributes.fetch(:orphaned_content_ids, [])
    @message_queue_event_type = attributes.fetch(:message_queue_event_type, nil)
    @update_dependencies = attributes.fetch(:update_dependencies, true)
    @dependency_resolution_source_content_id = attributes.fetch(
      :dependency_resolution_source_content_id,
      nil,
    )
    @source_command = attributes[:source_command]
    @source_fields = attributes.fetch(:source_fields, [])
  end

  # Enqueues a DependencyResolutionWorker job for this content, passing
  # along the orphaned content ids and the source_* details of this job.
  def enqueue_dependencies
    DependencyResolutionWorker.perform_async(
      content_store: Adapters::ContentStore,
      content_id: content_id,
      locale: locale,
      orphaned_content_ids: orphaned_content_ids,
      source_command: source_command,
      source_document_type: edition.document_type,
      source_fields: source_fields,
    )
  end

  # Reports +error+ through GovukError at "warning" level, attaching the
  # job arguments as extra context.
  def notify_airbrake(error, parameters)
    GovukError.notify(error, level: "warning", extra: parameters)
  end

  # Stores the payload's expanded links for this content_id and locale,
  # with drafts excluded, at the current payload version.
  def update_expanded_links(downstream_payload)
    ExpandedLinks.locked_update(
      content_id: content_id,
      locale: locale,
      with_drafts: false,
      payload_version: payload_version,
      expanded_links: downstream_payload.expanded_links,
    )
  end
end