Skip to content

Commit

Permalink
Audit Enabling / Disabling Workflows (#2715)
Browse files Browse the repository at this point in the history
* Add maybe_audit_workflow_state_changes/2 func

* Tests

* Update CL

* Tweaks

---------

Co-authored-by: Stuart Corbishley <[email protected]>
  • Loading branch information
elias-ba and stuartc authored Nov 28, 2024
1 parent 19bd2bf commit d4d52bf
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 3 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ and this project adheres to

### Added

- Enable / Disable Workflows UI
- Auditing when enabling/disabling a workflow
[#2697](https://github.com/OpenFn/lightning/issues/2697)
- Ability to enable/disable a workflow from the workflow editor
[#2698](https://github.com/OpenFn/lightning/issues/2698)

### Changed
Expand Down
32 changes: 31 additions & 1 deletion lib/lightning/workflows.ex
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ defmodule Lightning.Workflows do
multi |> capture_snapshot()
end
end)
|> maybe_audit_workflow_state_changes(changeset)
|> Repo.transaction()
|> case do
{:ok, %{workflow: workflow}} ->
Expand Down Expand Up @@ -209,13 +210,42 @@ defmodule Lightning.Workflows do
&(Map.get(&1, :workflow) |> Snapshot.build()),
returning: false
)
|> Multi.insert(:audit, fn changes ->
|> Multi.insert(:audit_snapshot_creation, fn changes ->
%{snapshot: %{id: snapshot_id}, workflow: %{id: workflow_id}} = changes

Audit.snapshot_created(workflow_id, snapshot_id, changes.actor)
end)
end

defp maybe_audit_workflow_state_changes(multi, changeset) do
changeset
|> Ecto.Changeset.get_change(:triggers, [])
|> Enum.reduce_while(nil, fn trigger_changeset, _previous ->
case Ecto.Changeset.get_change(trigger_changeset, :enabled) do
nil -> {:cont, nil}
changed -> {:halt, {trigger_changeset.data.enabled, changed}}
end
end)
|> case do
nil ->
multi

{from, to} ->
Ecto.Multi.insert(
multi,
:audit_workflow_state_change,
fn %{workflow: %{id: workflow_id}, actor: actor} ->
Audit.workflow_state_changed(
if(to, do: "enabled", else: "disabled"),
workflow_id,
actor,
%{before: %{enabled: from}, after: %{enabled: to}}
)
end
)
end
end

# Helper to preload associations only if they are present in the attributes
defp maybe_preload(workflow, assoc, attrs) do
List.wrap(assoc)
Expand Down
13 changes: 12 additions & 1 deletion lib/lightning/workflows/audit.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ defmodule Lightning.Workflows.Audit do
repo: Lightning.Repo,
item: "workflow",
events: [
"snapshot_created"
"snapshot_created",
"enabled",
"disabled"
]

def snapshot_created(workflow_id, snapshot_id, actor) do
Expand All @@ -19,4 +21,13 @@ defmodule Lightning.Workflows.Audit do
}
)
end

def workflow_state_changed(event, workflow_id, actor, changes) do
event(
event,
workflow_id,
actor,
changes
)
end
end
103 changes: 103 additions & 0 deletions test/lightning/workflows_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,75 @@ defmodule Lightning.WorkflowsTest do
} = Repo.one(Audit)
end

test "save_workflow/1 audits when a trigger is enabled" do
%{id: user_id} = user = insert(:user)
workflow = create_workflow()

{:ok, _workflow} =
workflow
|> Workflows.update_triggers_enabled_state(true)
|> Workflows.save_workflow(user)

assert_trigger_state_audit(workflow.id, user_id, false, true, "enabled")
end

test "save_workflow/1 audits when a trigger is disabled" do
%{id: user_id} = user = insert(:user)
workflow = create_workflow(enabled: true)

{:ok, _workflow} =
workflow
|> Workflows.update_triggers_enabled_state(false)
|> Workflows.save_workflow(user)

assert_trigger_state_audit(workflow.id, user_id, true, false, "disabled")
end

test "save_workflow/1 does not audit when trigger enabled state doesn't change" do
user = insert(:user)
workflow = create_workflow(enabled: true)

{:ok, _workflow} =
workflow
|> Workflows.update_triggers_enabled_state(true)
|> Workflows.save_workflow(user)

assert Repo.aggregate(Audit, :count) == 0
end

test "save_workflow/1 does not audit when updating other workflow attributes" do
user = insert(:user)
workflow = create_workflow(enabled: true)

{:ok, _workflow} =
workflow
|> Workflows.change_workflow(%{name: "updated name"})
|> Workflows.save_workflow(user)

assert Repo.aggregate(
from(a in Audit, where: a.event in ["enabled", "disabled"]),
:count
) == 0
end

test "save_workflow/1 with simultaneous trigger and name changes only audits trigger" do
%{id: user_id} = user = insert(:user)
workflow = create_workflow(enabled: true)

{:ok, _workflow} =
workflow
|> Workflows.change_workflow(%{name: "new name"})
|> Workflows.update_triggers_enabled_state(false)
|> Workflows.save_workflow(user)

assert_trigger_state_audit(workflow.id, user_id, true, false, "disabled")

assert Repo.aggregate(
from(a in Audit, where: a.event in ["enabled", "disabled"]),
:count
) == 1
end

test "save_workflow/1 publishes event for updated Kafka triggers" do
kafka_configuration = build(:triggers_kafka_configuration)

Expand Down Expand Up @@ -676,4 +745,38 @@ defmodule Lightning.WorkflowsTest do
assert_received %KafkaTriggerUpdated{trigger_id: ^kafka_trigger_1_id}
end
end

defp assert_trigger_state_audit(
workflow_id,
user_id,
before_state,
after_state,
event
) do
audit =
from(a in Audit, where: a.event in ["enabled", "disabled"]) |> Repo.one!()

assert %{
event: ^event,
item_type: "workflow",
item_id: ^workflow_id,
actor_id: ^user_id,
changes: %{
before: %{"enabled" => ^before_state},
after: %{"enabled" => ^after_state}
}
} = audit
end

defp create_workflow(opts \\ []) do
enabled = Keyword.get(opts, :enabled, false)
trigger = build(:trigger, type: :cron, enabled: enabled)
job = build(:job)

build(:workflow)
|> with_job(job)
|> with_trigger(trigger)
|> with_edge({trigger, job})
|> insert()
end
end

0 comments on commit d4d52bf

Please sign in to comment.