Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: during live requests, the server returns a cursor for the client to use for cache-busting #1826

Merged
merged 13 commits into from
Oct 10, 2024
Merged
8 changes: 8 additions & 0 deletions .changeset/khaki-dolphins-promise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@electric-sql/client": patch
"@core/sync-service": patch
---

Fix inconsistencies in http proxies for caching live long-polling requests.

The server now returns a cursor for the client to use in requests to cache-bust any stale caches.
25 changes: 21 additions & 4 deletions packages/sync-service/lib/electric/plug/serve_shape_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ defmodule Electric.Plug.ServeShapePlug do
@up_to_date [Jason.encode!(%{headers: %{control: "up-to-date"}})]
@must_refetch Jason.encode!([%{headers: %{control: "must-refetch"}}])

defmodule TimeUtils do
@oct9th2024 DateTime.from_naive!(~N[2024-10-09 00:00:00], "Etc/UTC")
def seconds_since_oct9th_2024_next_interval(conn) do
long_poll_timeout = conn.assigns.config[:long_poll_timeout]
now = DateTime.utc_now()

diff_in_seconds = DateTime.diff(now, @oct9th2024, :second)
next_interval = ceil(diff_in_seconds / long_poll_timeout) * long_poll_timeout

next_interval
end
end

defmodule Params do
use Ecto.Schema
import Ecto.Changeset
Expand Down Expand Up @@ -331,16 +344,20 @@ defmodule Electric.Plug.ServeShapePlug do

defp put_resp_cache_headers(%Conn{assigns: %{config: config, live: live}} = conn, _) do
msfstef marked this conversation as resolved.
Show resolved Hide resolved
if live do
put_resp_header(
conn,
conn
|> put_resp_header(
"cache-control",
"max-age=5, stale-while-revalidate=5"
"public, max-age=5, stale-while-revalidate=5"
)
|> put_resp_header(
"electric-next-cursor",
TimeUtils.seconds_since_oct9th_2024_next_interval(conn) |> Integer.to_string()
)
else
put_resp_header(
conn,
"cache-control",
"max-age=#{config[:max_age]}, stale-while-revalidate=#{config[:stale_age]}"
"public, max-age=#{config[:max_age]}, stale-while-revalidate=#{config[:stale_age]}"
msfstef marked this conversation as resolved.
Show resolved Hide resolved
)
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ defmodule Electric.Plug.ServeShapePlugTest do
assert conn.status == 200

assert Plug.Conn.get_resp_header(conn, "cache-control") == [
"max-age=#{max_age}, stale-while-revalidate=#{stale_age}"
"public, max-age=#{max_age}, stale-while-revalidate=#{stale_age}"
]
end

Expand Down Expand Up @@ -381,7 +381,7 @@ defmodule Electric.Plug.ServeShapePlugTest do
]

assert Plug.Conn.get_resp_header(conn, "cache-control") == [
"max-age=5, stale-while-revalidate=5"
"public, max-age=5, stale-while-revalidate=5"
]

assert Plug.Conn.get_resp_header(conn, "electric-chunk-last-offset") == [next_offset_str]
Expand Down Expand Up @@ -467,7 +467,7 @@ defmodule Electric.Plug.ServeShapePlugTest do
assert Jason.decode!(conn.resp_body) == [%{"headers" => %{"control" => "up-to-date"}}]

assert Plug.Conn.get_resp_header(conn, "cache-control") == [
"max-age=5, stale-while-revalidate=5"
"public, max-age=5, stale-while-revalidate=5"
]

assert Plug.Conn.get_resp_header(conn, "electric-chunk-up-to-date") == [""]
Expand Down
14 changes: 14 additions & 0 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import {
} from './fetch'
import {
CHUNK_LAST_OFFSET_HEADER,
LIVE_CACHE_BUSTER_HEADER,
LIVE_CACHE_BUSTER_QUERY_PARAM,
LIVE_QUERY_PARAM,
OFFSET_QUERY_PARAM,
SHAPE_ID_HEADER,
Expand Down Expand Up @@ -143,6 +145,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
>()

#lastOffset: Offset
#liveCacheBuster: string // Seconds since our Electric Epoch 😎
#lastSyncedAt?: number // unix time
#isUpToDate: boolean = false
#connected: boolean = false
Expand All @@ -153,6 +156,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
validateOptions(options)
this.options = { subscribe: true, ...options }
this.#lastOffset = this.options.offset ?? `-1`
this.#liveCacheBuster = ``
this.#shapeId = this.options.shapeId
this.#messageParser = new MessageParser<T>(options.parser)

Expand Down Expand Up @@ -197,6 +201,10 @@ export class ShapeStream<T extends Row<unknown> = Row>

if (this.#isUpToDate) {
fetchUrl.searchParams.set(LIVE_QUERY_PARAM, `true`)
fetchUrl.searchParams.set(
LIVE_CACHE_BUSTER_QUERY_PARAM,
this.#liveCacheBuster
)
}

if (this.#shapeId) {
Expand Down Expand Up @@ -248,6 +256,11 @@ export class ShapeStream<T extends Row<unknown> = Row>
this.#lastOffset = lastOffset as Offset
}

const liveCacheBuster = headers.get(LIVE_CACHE_BUSTER_HEADER)
if (liveCacheBuster) {
this.#liveCacheBuster = liveCacheBuster
}

const getSchema = (): Schema => {
const schemaHeader = headers.get(SHAPE_SCHEMA_HEADER)
return schemaHeader ? JSON.parse(schemaHeader) : {}
Expand Down Expand Up @@ -376,6 +389,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
*/
#reset(shapeId?: string) {
this.#lastOffset = `-1`
this.#liveCacheBuster = ``
this.#shapeId = shapeId
this.#isUpToDate = false
this.#connected = false
Expand Down
2 changes: 2 additions & 0 deletions packages/typescript-client/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
export const SHAPE_ID_HEADER = `electric-shape-id`
export const LIVE_CACHE_BUSTER_HEADER = `electric-next-cursor`
export const LIVE_CACHE_BUSTER_QUERY_PARAM = `cursor`
export const CHUNK_LAST_OFFSET_HEADER = `electric-chunk-last-offset`
export const CHUNK_UP_TO_DATE_HEADER = `electric-chunk-up-to-date`
export const SHAPE_SCHEMA_HEADER = `electric-schema`
Expand Down
7 changes: 6 additions & 1 deletion packages/typescript-client/test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ describe(`HTTP Sync`, () => {
expect(urlsRequested[0].searchParams.has(`live`)).false
expect(urlsRequested[1].searchParams.get(`offset`)).not.toBe(`-1`)
expect(urlsRequested[1].searchParams.has(`live`)).true
expect(urlsRequested[1].searchParams.has(`cursor`)).true

// first request comes back immediately and is up to date, second one
// should hang while waiting for updates
Expand Down Expand Up @@ -539,7 +540,11 @@ describe(`HTTP Sync`, () => {
const cacheHeaders = res.headers.get(`cache-control`)
assert(cacheHeaders !== null, `Response should have cache-control header`)
const directives = parse(cacheHeaders)
expect(directives).toEqual({ 'max-age': 1, 'stale-while-revalidate': 3 })
expect(directives).toEqual({
public: true,
'max-age': 1,
'stale-while-revalidate': 3,
})
const etagHeader = res.headers.get(`etag`)
assert(etagHeader !== null, `Response should have etag header`)

Expand Down
2 changes: 1 addition & 1 deletion website/docs/quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ HTTP/1.1 200 OK
date: Thu, 18 Jul 2024 10:49:12 GMT
content-length: 643
vary: accept-encoding
cache-control: max-age=60, stale-while-revalidate=300
cache-control: public, max-age=60, stale-while-revalidate=300
x-request-id: F-NJAXyulHAQP2MAAABN
access-control-allow-origin: *
access-control-expose-headers: *
Expand Down
15 changes: 11 additions & 4 deletions website/electric-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ paths:

This allows you to implement a long-polling strategy to consume
real-time updates.
- name: cursor
in: query
schema:
type: string
description: |-
This is a cursor generated by the server during live requests. It helps bust caches for
responses from previous long-polls.
- name: shape_id
in: query
schema:
Expand Down Expand Up @@ -127,7 +134,7 @@ paths:
cache-control:
schema:
type: string
example: "max-age=60, stale-while-revalidate=300"
example: "public, max-age=60, stale-while-revalidate=300"
KyleAMathews marked this conversation as resolved.
Show resolved Hide resolved
msfstef marked this conversation as resolved.
Show resolved Hide resolved
description: |-
Cache control header as a string of comma separated directives.

Expand All @@ -140,7 +147,7 @@ paths:
Etag header specifying the shape ID and offset for efficient caching.

In the format `{shape_id}:{start_offset}:{end_offset}`.
x-electric-chunk-last-offset:
electric-chunk-last-offset:
schema:
type: string
example: "26800584_4"
Expand All @@ -151,7 +158,7 @@ paths:
you have provided. This header simplifies client development by
avoiding the need to parse the last offset out of the stream of
log entries.
x-electric-shape-id:
electric-shape-id:
schema:
type: string
example: "3833821-1721812114261"
Expand All @@ -160,7 +167,7 @@ paths:

Must be provided as the `shape_id` parameter when making
subsequent requests where `offset` is not `-1`.
x-electric-schema:
electric-schema:
schema:
type: string
example: "{\"id\":{\"type\":\"int4\",\"dimensions\":0},\"title\":{\"type\":\"text\",\"dimensions\":0},\"status\":{\"type\":\"text\",\"dimensions\":0,\"max_length\":8}}"
Expand Down
Loading