Skip to content

Commit

Permalink
Insert all on a collection with the same timestamp (#2712)
Browse files Browse the repository at this point in the history
* Add all records with same timestamp

* Fix parallel collections put all benchmarking

* Replace Ecto update with CTE based SQL query

* Fix formatting

---------

Co-authored-by: Stuart Corbishley <[email protected]>
  • Loading branch information
jyeshe and stuartc authored Dec 4, 2024
1 parent 4cb69b8 commit bcd4fdb
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 39 deletions.
7 changes: 4 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ and this project adheres to

### Changed

- Insert all on a collection with the same timestamp
[#2711](https://github.com/OpenFn/lightning/issues/2711)
- AI Assistant: Show disclaimer once every day per user
[#2481](https://github.com/OpenFn/lightning/issues/2481)
- AI Assistant: Scroll to new message when it arrives
Expand All @@ -56,9 +58,8 @@ and this project adheres to
- Allow filtering collection items by updated_before and updated_after.
[#2693](https://github.com/OpenFn/lightning/issues/2693)
- Add support for SMTP email configuration
[#2699](https://github.com/OpenFn/lightning/issues/2699)
⚠️️ Please note that `EMAIL_ADMIN` defaults to `[email protected]` in
production environments
[#2699](https://github.com/OpenFn/lightning/issues/2699) ⚠️️ Please note that
`EMAIL_ADMIN` defaults to `[email protected]` in production environments

### Fixed

Expand Down
11 changes: 5 additions & 6 deletions lib/lightning/collections.ex
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,10 @@ defmodule Lightning.Collections do
@spec put_all(Collection.t(), [{String.t(), String.t()}]) ::
{:ok, non_neg_integer()}
def put_all(%{id: collection_id}, kv_list) do
item_list =
Enum.with_index(kv_list, fn %{"key" => key, "value" => value},
unique_index ->
now = DateTime.add(DateTime.utc_now(), unique_index, :microsecond)
now = DateTime.utc_now()

item_list =
Enum.map(kv_list, fn %{"key" => key, "value" => value} ->
%{
collection_id: collection_id,
key: key,
Expand Down Expand Up @@ -214,12 +213,12 @@ defmodule Lightning.Collections do
defp all_query(collection_id, cursor, limit) do
Item
|> where([i], i.collection_id == ^collection_id)
|> order_by([i], asc: i.inserted_at)
|> order_by([i], asc: i.id)
|> limit(^limit)
|> then(fn query ->
case cursor do
nil -> query
ts_cursor -> where(query, [i], i.inserted_at > ^ts_cursor)
ts_cursor -> where(query, [i], i.id > ^ts_cursor)
end
end)
end
Expand Down
4 changes: 3 additions & 1 deletion lib/lightning/collections/item.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ defmodule Lightning.Collections.Item do

@primary_key false
schema "collection_items" do
field :id, :integer, primary_key: true
belongs_to :collection, Lightning.Collections.Collection, primary_key: true
field :key, :string, primary_key: true

field :key, :string
field :value, :string

timestamps(type: :utc_datetime_usec)
Expand Down
12 changes: 6 additions & 6 deletions lib/lightning_web/controllers/collections_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ defmodule LightningWeb.CollectionsController do

case Collections.get_all(collection, filters, key_pattern) do
[] -> nil
list -> {list, List.last(list).inserted_at}
list -> {list, List.last(list).id}
end
end)
|> Stream.flat_map(& &1)
Expand Down Expand Up @@ -191,8 +191,8 @@ defmodule LightningWeb.CollectionsController do

defp validate_cursor(cursor) do
with {:ok, decoded} <- Base.decode64(cursor),
{:ok, datetime, _off} <- DateTime.from_iso8601(decoded) do
{:ok, datetime}
{id, ""} <- Integer.parse(decoded) do
{:ok, id}
end
end

Expand Down Expand Up @@ -226,7 +226,7 @@ defmodule LightningWeb.CollectionsController do
defp finish_chunking(%ChunkAcc{conn: conn, cursor_data: cursor_data}) do
cursor =
if cursor_data do
cursor_data |> DateTime.to_iso8601() |> Base.encode64()
Base.encode64(to_string(cursor_data))
end

Plug.Conn.chunk(conn, ~S(], "cursor":) <> Jason.encode!(cursor) <> "}")
Expand All @@ -247,7 +247,7 @@ defmodule LightningWeb.CollectionsController do
%ChunkAcc{count: sent_count, last: last, limit: limit} = acc
)
when sent_count == limit do
{:halt, %ChunkAcc{acc | cursor_data: last.inserted_at}}
{:halt, %ChunkAcc{acc | cursor_data: last.id}}
end

defp send_chunk({chunk_items, _i}, acc) do
Expand All @@ -271,7 +271,7 @@ defmodule LightningWeb.CollectionsController do

cursor_data =
if taken_count > 0 and length(chunk_items) > taken_count do
last.inserted_at
last.id
end

acc =
Expand Down
27 changes: 15 additions & 12 deletions priv/bench/collections_put_all.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ IO.puts "### Indexes on collection_items table"
Enum.each(rows, &IO.puts(Enum.join(&1, ":\n")))

keys_count = 5_000
rounds = 300
# keys_count = 10_000
# rounds = 1

Repo.delete_all(Collections.Collection)

project =
with nil <- Repo.get_by(Projects.Project, name: "bench") do
user = Repo.get_by(Lightning.Accounts.User, email: "[email protected]")
user = Repo.get_by(Lightning.Accounts.User, email: "[email protected]") || raise "This benchmark requires demo/known user"
{:ok, project} = Projects.create_project(%{name: "bench", project_users: [%{user_id: user.id, role: :owner}]})
project
end
Expand Down Expand Up @@ -49,7 +52,6 @@ record2 = fn prefix, i ->
}
end

rounds = 300
samples1 =
1..keys_count * rounds
|> Enum.map(fn i -> record1.("keyA", i) end)
Expand All @@ -60,21 +62,22 @@ samples2 =
|> Enum.map(fn i -> record2.("keyB", i) end)
|> Enum.chunk_every(keys_count)

IO.puts("\n### Inserting #{rounds} rounds of 2x5000 with put_all...")
IO.puts("\n### Inserting #{rounds} rounds of 2x#{keys_count} with put_all...")

durations =
Enum.zip(samples1, samples2)
|> Enum.with_index(fn {sample1, sample2}, i ->
:timer.tc(fn ->
{:ok, _n} = Collections.put_all(collection1, sample1)
{:ok, _n} = Collections.put_all(collection2, sample2)
end)
|> then(fn {duration, _res} ->
duration_ms = div(duration, 1_000)
IO.puts("[#{i}] elapsed time: #{duration_ms}ms")
|> Enum.flat_map(fn {sample1, sample2} -> [{sample1, collection1}, {sample2, collection2}] end)
|> Task.async_stream(fn {sample, collection} ->
{duration, _res} = :timer.tc(fn ->
{:ok, _n} = Collections.put_all(collection, sample)
end)

div(duration, 1_000)
end, max_concurrency: 2, timeout: :infinity)
|> Enum.map(fn {:ok, duration_ms} ->
IO.puts("elapsed time: #{duration_ms}ms")
duration_ms
end)
end)

IO.puts "Average: #{Statistics.mean(durations)}ms"
IO.puts "Std Deviation: #{Statistics.stdev(durations)}ms"
11 changes: 11 additions & 0 deletions priv/repo/migrations/20241121160001_create_collection_items_id.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
defmodule Lightning.Repo.Migrations.CreateCollectionItemsSerialId do
  use Ecto.Migration

  # Adds a nullable bigint `id` column to collection_items (backfilled and
  # made NOT NULL by the follow-up migrations), together with a per-collection
  # unique index so the id can serve as a stable pagination cursor.
  def change do
    alter table(:collection_items) do
      add(:id, :bigint)
    end

    create(unique_index(:collection_items, [:collection_id, :id]))
  end
end
22 changes: 22 additions & 0 deletions priv/repo/migrations/20241121160002_set_collection_items_id.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
defmodule Lightning.Repo.Migrations.SetCollectionItemsSerialId do
  use Ecto.Migration

  # Backfills the bigint `id` column (added by CreateCollectionItemsSerialId)
  # with a global row number ordered by insertion time, so the new id is a
  # pagination cursor that preserves the previous chronological ordering.
  def up do
    # The ORDER BY must live inside the window frame: a bare
    # `row_number() OVER ()` combined with a query-level ORDER BY does not
    # guarantee the numbering follows inserted_at, because PostgreSQL
    # evaluates window functions before the query-level sort.
    execute """
    WITH ordered_rows AS (
      SELECT collection_id, key,
             row_number() OVER (ORDER BY inserted_at ASC) AS rn
      FROM collection_items
    )
    UPDATE collection_items
    SET id = ordered_rows.rn
    FROM ordered_rows
    WHERE collection_items.collection_id = ordered_rows.collection_id
    AND collection_items.key = ordered_rows.key;
    """
  end

  # Data-only backfill; nothing to undo here — the column itself is removed
  # by rolling back the preceding migration.
  def down do
    :ok
  end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
defmodule Lightning.Repo.Migrations.CollectionItemsSequence do
  use Ecto.Migration

  # NOTE(review): migration lock and DDL transaction are disabled — presumably
  # to avoid holding a long table lock while the NOT NULL change applies;
  # confirm this is intentional, as a failure mid-migration will leave partial
  # state (sequence created but column not yet altered).
  @disable_migration_lock true
  @disable_ddl_transaction true

  # Attaches a dedicated sequence as the default for collection_items.id and
  # advances it past the highest backfilled id so new inserts don't collide.
  def up do
    execute("CREATE SEQUENCE collection_items_id_seq")

    alter table(:collection_items) do
      modify :id, :bigint,
        null: false,
        default: fragment("nextval('collection_items_id_seq'::regclass)")
    end

    # COALESCE guards the empty-table case: `setval(seq, NULL)` raises, which
    # would make this migration fail on a fresh database. With `false` as the
    # third argument, the next nextval() returns exactly the given value —
    # MAX(id) + 1 when rows exist (same as the previous behavior), 1 otherwise.
    execute("""
    SELECT setval(
      'collection_items_id_seq'::regclass,
      COALESCE(MAX(id) + 1, 1),
      false
    )
    FROM collection_items
    """)
  end

  # Reverts the column to nullable with no default, then drops the sequence
  # (order matters: the default must be removed before the sequence can go).
  def down do
    alter table(:collection_items) do
      modify :id, :bigint, null: true, default: nil
    end

    execute("DROP SEQUENCE collection_items_id_seq")
  end
end
4 changes: 2 additions & 2 deletions test/lightning/collections_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ defmodule Lightning.CollectionsTest do
)
end)

%{inserted_at: cursor} = Enum.at(items, 4)
%{id: cursor} = Enum.at(items, 4)

assert Collections.get_all(collection, cursor: cursor, limit: 50)
|> Enum.count() == 30 - (4 + 1)
Expand Down Expand Up @@ -187,7 +187,7 @@ defmodule Lightning.CollectionsTest do
)
end)

%{inserted_at: cursor} = Enum.at(items, 9)
%{id: cursor} = Enum.at(items, 9)

insert(:collection_item, key: "rkeyB", collection: collection)

Expand Down
41 changes: 33 additions & 8 deletions test/lightning_web/collections_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,33 @@ defmodule LightningWeb.API.CollectionsControllerTest do
end

describe "POST /collections/:name" do
test "inserts multiple items with same timestamp", %{conn: conn} do
user = insert(:user)

project =
insert(:project, project_users: [%{user: user}])

collection = insert(:collection, project: project)

token = Lightning.Accounts.generate_api_token(user)

conn = assign_bearer(conn, token)

assert %{"upserted" => 1_000, "error" => nil} =
post(conn, ~p"/collections/#{collection.name}", %{
items:
Enum.map(1..1_000, &%{key: "foo#{&1}", value: "bar#{&1}"})
})
|> json_response(200)

assert %{"items" => items, "cursor" => nil} =
get(conn, ~p"/collections/#{collection.name}", limit: 1_000)
|> json_response(200)

assert MapSet.new(items, & &1["created"]) |> MapSet.size() == 1
assert Enum.count(items) == 1_000
end

test "upserted multiple items", %{conn: conn} do
user = insert(:user)

Expand Down Expand Up @@ -706,8 +733,7 @@ defmodule LightningWeb.API.CollectionsControllerTest do

assert json_response(conn, 200) == %{
"items" => expected_items,
"cursor" =>
Base.encode64(DateTime.to_iso8601(last_item.inserted_at))
"cursor" => Base.encode64(to_string(last_item.id))
}

# Test for the existence of a cursor when the limit is less than the
Expand All @@ -730,8 +756,7 @@ defmodule LightningWeb.API.CollectionsControllerTest do

assert json_response(conn, 200) == %{
"items" => expected_items,
"cursor" =>
Base.encode64(DateTime.to_iso8601(last_item.inserted_at))
"cursor" => Base.encode64(to_string(last_item.id))
}

# Request everything, shouldn't be getting a cursor
Expand Down Expand Up @@ -777,7 +802,7 @@ defmodule LightningWeb.API.CollectionsControllerTest do
expected_items
|> Enum.map(&encode_decode/1)

assert cursor == Base.encode64(DateTime.to_iso8601(last_item.inserted_at))
assert cursor == Base.encode64(to_string(last_item.id))
end

test "up to the limit from a cursor returning a cursor", %{conn: conn} do
Expand Down Expand Up @@ -821,14 +846,14 @@ defmodule LightningWeb.API.CollectionsControllerTest do
"cursor" => cursor
} = json_response(conn, 200)

%{inserted_at: last_inserted_at} =
%{id: id} =
Repo.get_by(Collections.Item,
collection_id: collection.id,
key: List.last(expected_items)["key"]
)

assert {:ok, ^last_inserted_at, 0} =
cursor |> Base.decode64!() |> DateTime.from_iso8601()
assert {^id, ""} =
cursor |> Base.decode64!() |> Integer.parse()
end

test "up exactly to the limit from a cursor", %{conn: conn} do
Expand Down
3 changes: 2 additions & 1 deletion test/support/factories.ex
Original file line number Diff line number Diff line change
Expand Up @@ -327,9 +327,10 @@ defmodule Lightning.Factories do

def collection_item_factory do
%Lightning.Collections.Item{
collection: build(:collection),
id: sequence(:id, & &1),
key: sequence(:key, &"key-#{&1}"),
value: sequence(:value, &"value-#{&1}"),
collection: build(:collection),
inserted_at:
sequence(
:inserted_at,
Expand Down

0 comments on commit bcd4fdb

Please sign in to comment.