diff --git a/.changeset/khaki-dolphins-promise.md b/.changeset/khaki-dolphins-promise.md new file mode 100644 index 0000000000..644476a491 --- /dev/null +++ b/.changeset/khaki-dolphins-promise.md @@ -0,0 +1,8 @@ +--- +"@electric-sql/client": patch +"@core/sync-service": patch +--- + +Fix inconsistencies in http proxies for caching live long-polling requests. + +The server now returns a cursor for the client to use in requests to cache-bust any stale caches. diff --git a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex index 453eb1e84a..1a4e5df27f 100644 --- a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex +++ b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex @@ -23,6 +23,19 @@ defmodule Electric.Plug.ServeShapePlug do @up_to_date [Jason.encode!(%{headers: %{control: "up-to-date"}})] @must_refetch Jason.encode!([%{headers: %{control: "must-refetch"}}]) + defmodule TimeUtils do + @oct9th2024 DateTime.from_naive!(~N[2024-10-09 00:00:00], "Etc/UTC") + def seconds_since_oct9th_2024_next_interval(conn) do + long_poll_timeout = conn.assigns.config[:long_poll_timeout] + now = DateTime.utc_now() + + diff_in_seconds = DateTime.diff(now, @oct9th2024, :second) + next_interval = ceil(diff_in_seconds / long_poll_timeout) * long_poll_timeout + + next_interval + end + end + defmodule Params do use Ecto.Schema import Ecto.Changeset @@ -331,16 +344,20 @@ defmodule Electric.Plug.ServeShapePlug do defp put_resp_cache_headers(%Conn{assigns: %{config: config, live: live}} = conn, _) do if live do - put_resp_header( - conn, + conn + |> put_resp_header( "cache-control", - "max-age=5, stale-while-revalidate=5" + "public, max-age=5, stale-while-revalidate=5" + ) + |> put_resp_header( + "electric-next-cursor", + TimeUtils.seconds_since_oct9th_2024_next_interval(conn) |> Integer.to_string() ) else put_resp_header( conn, "cache-control", - "max-age=#{config[:max_age]}, stale-while-revalidate=#{config[:stale_age]}" + "public, max-age=#{config[:max_age]}, stale-while-revalidate=#{config[:stale_age]}" ) end end diff --git a/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs b/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs index cb6f172c7f..13403918a2 100644 --- a/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs +++ b/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs @@ -203,7 +203,7 @@ defmodule Electric.Plug.ServeShapePlugTest do assert conn.status == 200 assert Plug.Conn.get_resp_header(conn, "cache-control") == [ - "max-age=#{max_age}, stale-while-revalidate=#{stale_age}" + "public, max-age=#{max_age}, stale-while-revalidate=#{stale_age}" ] end @@ -381,7 +381,7 @@ defmodule Electric.Plug.ServeShapePlugTest do ] assert Plug.Conn.get_resp_header(conn, "cache-control") == [ - "max-age=5, stale-while-revalidate=5" + "public, max-age=5, stale-while-revalidate=5" ] assert Plug.Conn.get_resp_header(conn, "electric-chunk-last-offset") == [next_offset_str] @@ -467,7 +467,7 @@ defmodule Electric.Plug.ServeShapePlugTest do assert Jason.decode!(conn.resp_body) == [%{"headers" => %{"control" => "up-to-date"}}] assert Plug.Conn.get_resp_header(conn, "cache-control") == [ - "max-age=5, stale-while-revalidate=5" + "public, max-age=5, stale-while-revalidate=5" ] assert Plug.Conn.get_resp_header(conn, "electric-chunk-up-to-date") == [""] diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 32a62daa06..7a1a4e50da 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -17,6 +17,8 @@ import { } from './fetch' import { CHUNK_LAST_OFFSET_HEADER, + LIVE_CACHE_BUSTER_HEADER, + LIVE_CACHE_BUSTER_QUERY_PARAM, LIVE_QUERY_PARAM, OFFSET_QUERY_PARAM, SHAPE_ID_HEADER, @@ -143,6 +145,7 @@ export class ShapeStream = Row> >() #lastOffset: Offset + #liveCacheBuster: string // Seconds since our Electric Epoch 😎 #lastSyncedAt?: number // unix time #isUpToDate: boolean = false #connected: boolean = false @@ -153,6 +156,7 @@ export class ShapeStream = Row> validateOptions(options) this.options = { subscribe: true, ...options } this.#lastOffset = this.options.offset ?? `-1` + this.#liveCacheBuster = `` this.#shapeId = this.options.shapeId this.#messageParser = new MessageParser(options.parser) @@ -197,6 +201,10 @@ export class ShapeStream = Row> if (this.#isUpToDate) { fetchUrl.searchParams.set(LIVE_QUERY_PARAM, `true`) + fetchUrl.searchParams.set( + LIVE_CACHE_BUSTER_QUERY_PARAM, + this.#liveCacheBuster + ) } if (this.#shapeId) { @@ -248,6 +256,11 @@ export class ShapeStream = Row> this.#lastOffset = lastOffset as Offset } + const liveCacheBuster = headers.get(LIVE_CACHE_BUSTER_HEADER) + if (liveCacheBuster) { + this.#liveCacheBuster = liveCacheBuster + } + const getSchema = (): Schema => { const schemaHeader = headers.get(SHAPE_SCHEMA_HEADER) return schemaHeader ? JSON.parse(schemaHeader) : {} @@ -376,6 +389,7 @@ export class ShapeStream = Row> */ #reset(shapeId?: string) { this.#lastOffset = `-1` + this.#liveCacheBuster = `` this.#shapeId = shapeId this.#isUpToDate = false this.#connected = false diff --git a/packages/typescript-client/src/constants.ts b/packages/typescript-client/src/constants.ts index d5b1a53121..bfa2676ee0 100644 --- a/packages/typescript-client/src/constants.ts +++ b/packages/typescript-client/src/constants.ts @@ -1,4 +1,6 @@ export const SHAPE_ID_HEADER = `electric-shape-id` +export const LIVE_CACHE_BUSTER_HEADER = `electric-next-cursor` +export const LIVE_CACHE_BUSTER_QUERY_PARAM = `cursor` export const CHUNK_LAST_OFFSET_HEADER = `electric-chunk-last-offset` export const CHUNK_UP_TO_DATE_HEADER = `electric-chunk-up-to-date` export const SHAPE_SCHEMA_HEADER = `electric-schema` diff --git a/packages/typescript-client/test/integration.test.ts b/packages/typescript-client/test/integration.test.ts index 288ebcf531..c61e242d11 100644 --- a/packages/typescript-client/test/integration.test.ts +++ b/packages/typescript-client/test/integration.test.ts @@ -98,6 +98,7 @@ describe(`HTTP Sync`, () => { expect(urlsRequested[0].searchParams.has(`live`)).false expect(urlsRequested[1].searchParams.get(`offset`)).not.toBe(`-1`) expect(urlsRequested[1].searchParams.has(`live`)).true + expect(urlsRequested[1].searchParams.has(`cursor`)).true // first request comes back immediately and is up to date, second one // should hang while waiting for updates @@ -539,7 +540,11 @@ describe(`HTTP Sync`, () => { const cacheHeaders = res.headers.get(`cache-control`) assert(cacheHeaders !== null, `Response should have cache-control header`) const directives = parse(cacheHeaders) - expect(directives).toEqual({ 'max-age': 1, 'stale-while-revalidate': 3 }) + expect(directives).toEqual({ + public: true, + 'max-age': 1, + 'stale-while-revalidate': 3, + }) const etagHeader = res.headers.get(`etag`) assert(etagHeader !== null, `Response should have etag header`) diff --git a/website/docs/quickstart.md b/website/docs/quickstart.md index c43eb0629a..a4b713a1dc 100644 --- a/website/docs/quickstart.md +++ b/website/docs/quickstart.md @@ -109,7 +109,7 @@ HTTP/1.1 200 OK date: Thu, 18 Jul 2024 10:49:12 GMT content-length: 643 vary: accept-encoding -cache-control: max-age=60, stale-while-revalidate=300 +cache-control: public, max-age=60, stale-while-revalidate=300 x-request-id: F-NJAXyulHAQP2MAAABN access-control-allow-origin: * access-control-expose-headers: * diff --git a/website/electric-api.yaml b/website/electric-api.yaml index 4c5d924f87..10881749fe 100644 --- a/website/electric-api.yaml +++ b/website/electric-api.yaml @@ -88,6 +88,13 @@ paths: This allows you to implement a long-polling strategy to consume real-time updates. + - name: cursor + in: query + schema: + type: string + description: |- + This is a cursor generated by the server during live requests. It helps bust caches for + responses from previous long-polls. - name: shape_id in: query schema: @@ -127,7 +134,7 @@ paths: cache-control: schema: type: string - example: "max-age=60, stale-while-revalidate=300" + example: "public, max-age=60, stale-while-revalidate=300" description: |- Cache control header as a string of comma separated directives. @@ -140,7 +147,7 @@ paths: Etag header specifying the shape ID and offset for efficient caching. In the format `{shape_id}:{start_offset}:{end_offset}`. - x-electric-chunk-last-offset: + electric-chunk-last-offset: schema: type: string example: "26800584_4" @@ -151,7 +158,7 @@ paths: you have provided. This header simplifies client development by avoiding the need to parse the last offset out of the stream of log entries. - x-electric-shape-id: + electric-shape-id: schema: type: string example: "3833821-1721812114261" @@ -160,7 +167,7 @@ paths: Must be provided as the `shape_id` parameter when making subsequent requests where `offset` is not `-1`. - x-electric-schema: + electric-schema: schema: type: string example: "{\"id\":{\"type\":\"int4\",\"dimensions\":0},\"title\":{\"type\":\"text\",\"dimensions\":0},\"status\":{\"type\":\"text\",\"dimensions\":0,\"max_length\":8}}"