From 218e4665cb1380506398c9a7e7bcd60ef6a4762f Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 9 Oct 2024 13:48:08 -0600 Subject: [PATCH 01/12] fix: during live requests, the server returns a cursor for the client to use for cache-busting --- .../lib/electric/plug/serve_shape_plug.ex | 23 ++++++++++++++++--- packages/typescript-client/src/client.ts | 10 ++++++++ packages/typescript-client/src/constants.ts | 1 + .../test/integration.test.ts | 1 + 4 files changed, 32 insertions(+), 3 deletions(-) diff --git a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex index 453eb1e84a..e5c5b2b4f8 100644 --- a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex +++ b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex @@ -23,6 +23,18 @@ defmodule Electric.Plug.ServeShapePlug do @up_to_date [Jason.encode!(%{headers: %{control: "up-to-date"}})] @must_refetch Jason.encode!([%{headers: %{control: "must-refetch"}}]) +defmodule TimeUtils do + def seconds_since_oct9th_2024_next_interval do + oct9th2024 = DateTime.from_naive!(~N[2024-10-09 00:00:00], "Etc/UTC") + now = DateTime.utc_now() + + diff_in_seconds = DateTime.diff(now, oct9th2024, :second) + next_interval = ceil(diff_in_seconds / 20) * 20 + + next_interval + end +end + defmodule Params do use Ecto.Schema import Ecto.Changeset @@ -329,12 +341,17 @@ defmodule Electric.Plug.ServeShapePlug do end end + defp put_resp_cache_headers(%Conn{assigns: %{config: config, live: live}} = conn, _) do if live do - put_resp_header( - conn, + conn + |> put_resp_header( "cache-control", - "max-age=5, stale-while-revalidate=5" + "public, max-age=5, stale-while-revalidate=5" + ) + |> put_resp_header( + "electric-next-cursor", + TimeUtils.seconds_since_oct9th_2024_next_interval() |> Integer.to_string() ) else put_resp_header( diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 32a62daa06..fddf8c6e00 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -17,6 +17,7 @@ import { } from './fetch' import { CHUNK_LAST_OFFSET_HEADER, + LIVE_NEXT_CURSOR, LIVE_QUERY_PARAM, OFFSET_QUERY_PARAM, SHAPE_ID_HEADER, @@ -143,6 +144,7 @@ export class ShapeStream = Row> >() #lastOffset: Offset + #nextLiveCursor: string // Seconds since our Electric Epoch 😎 #lastSyncedAt?: number // unix time #isUpToDate: boolean = false #connected: boolean = false @@ -153,6 +155,7 @@ export class ShapeStream = Row> validateOptions(options) this.options = { subscribe: true, ...options } this.#lastOffset = this.options.offset ?? `-1` + this.#nextLiveCursor = `` this.#shapeId = this.options.shapeId this.#messageParser = new MessageParser(options.parser) @@ -197,6 +200,7 @@ export class ShapeStream = Row> if (this.#isUpToDate) { fetchUrl.searchParams.set(LIVE_QUERY_PARAM, `true`) + fetchUrl.searchParams.set(`cursor`, this.#nextLiveCursor) } if (this.#shapeId) { @@ -248,6 +252,11 @@ export class ShapeStream = Row> this.#lastOffset = lastOffset as Offset } + const nextLiveCursor = headers.get(LIVE_NEXT_CURSOR) + if (nextLiveCursor) { + this.#nextLiveCursor = nextLiveCursor + } + const getSchema = (): Schema => { const schemaHeader = headers.get(SHAPE_SCHEMA_HEADER) return schemaHeader ? JSON.parse(schemaHeader) : {} @@ -376,6 +385,7 @@ export class ShapeStream = Row> */ #reset(shapeId?: string) { this.#lastOffset = `-1` + this.#nextLiveCursor = `` this.#shapeId = shapeId this.#isUpToDate = false this.#connected = false diff --git a/packages/typescript-client/src/constants.ts b/packages/typescript-client/src/constants.ts index d5b1a53121..f3b2f17abf 100644 --- a/packages/typescript-client/src/constants.ts +++ b/packages/typescript-client/src/constants.ts @@ -1,4 +1,5 @@ export const SHAPE_ID_HEADER = `electric-shape-id` +export const LIVE_NEXT_CURSOR = `electric-next-cursor` export const CHUNK_LAST_OFFSET_HEADER = `electric-chunk-last-offset` export const CHUNK_UP_TO_DATE_HEADER = `electric-chunk-up-to-date` export const SHAPE_SCHEMA_HEADER = `electric-schema` diff --git a/packages/typescript-client/test/integration.test.ts b/packages/typescript-client/test/integration.test.ts index 288ebcf531..e4fa8e006f 100644 --- a/packages/typescript-client/test/integration.test.ts +++ b/packages/typescript-client/test/integration.test.ts @@ -98,6 +98,7 @@ describe(`HTTP Sync`, () => { expect(urlsRequested[0].searchParams.has(`live`)).false expect(urlsRequested[1].searchParams.get(`offset`)).not.toBe(`-1`) expect(urlsRequested[1].searchParams.has(`live`)).true + expect(urlsRequested[1].searchParams.has(`cursor`)).true // first request comes back immediately and is up to date, second one // should hang while waiting for updates From 12c2b6777e7c92d29521ad9d7ff24d42f9cae45e Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 9 Oct 2024 14:06:01 -0600 Subject: [PATCH 02/12] Add changeset --- .changeset/khaki-dolphins-promise.md | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 .changeset/khaki-dolphins-promise.md diff --git a/.changeset/khaki-dolphins-promise.md b/.changeset/khaki-dolphins-promise.md new file mode 100644 index 0000000000..644476a491 --- /dev/null +++ b/.changeset/khaki-dolphins-promise.md @@ -0,0 +1,8 @@ +--- +"@electric-sql/client": patch +"@core/sync-service": patch +--- + +Fix inconsistencies in http proxies for caching live long-polling requests. + +The server now returns a cursor for the client to use in requests to cache-bust any stale caches. From fc04d63287cb6d9f8ce869ee46e9eb58586181a7 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 9 Oct 2024 14:06:56 -0600 Subject: [PATCH 03/12] Fix formatting --- .../lib/electric/plug/serve_shape_plug.ex | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex index e5c5b2b4f8..967346d17f 100644 --- a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex +++ b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex @@ -23,17 +23,17 @@ defmodule Electric.Plug.ServeShapePlug do @up_to_date [Jason.encode!(%{headers: %{control: "up-to-date"}})] @must_refetch Jason.encode!([%{headers: %{control: "must-refetch"}}]) -defmodule TimeUtils do - def seconds_since_oct9th_2024_next_interval do - oct9th2024 = DateTime.from_naive!(~N[2024-10-09 00:00:00], "Etc/UTC") - now = DateTime.utc_now() + defmodule TimeUtils do + def seconds_since_oct9th_2024_next_interval do + oct9th2024 = DateTime.from_naive!(~N[2024-10-09 00:00:00], "Etc/UTC") + now = DateTime.utc_now() - diff_in_seconds = DateTime.diff(now, oct9th2024, :second) - next_interval = ceil(diff_in_seconds / 20) * 20 + diff_in_seconds = DateTime.diff(now, oct9th2024, :second) + next_interval = ceil(diff_in_seconds / 20) * 20 - next_interval + next_interval + end end -end defmodule Params do use Ecto.Schema @@ -341,7 +341,6 @@ end end end - defp put_resp_cache_headers(%Conn{assigns: %{config: config, live: live}} = conn, _) do if live do conn From a1b2674c0b43aa8ba45d96da6271741babe2f813 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 9 Oct 2024 14:09:58 -0600 Subject: [PATCH 04/12] fix tests --- .../test/electric/plug/serve_shape_plug_test.exs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs b/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs index cb6f172c7f..13403918a2 100644 --- a/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs +++ b/packages/sync-service/test/electric/plug/serve_shape_plug_test.exs @@ -203,7 +203,7 @@ defmodule Electric.Plug.ServeShapePlugTest do assert conn.status == 200 assert Plug.Conn.get_resp_header(conn, "cache-control") == [ - "max-age=#{max_age}, stale-while-revalidate=#{stale_age}" + "public, max-age=#{max_age}, stale-while-revalidate=#{stale_age}" ] end @@ -381,7 +381,7 @@ defmodule Electric.Plug.ServeShapePlugTest do ] assert Plug.Conn.get_resp_header(conn, "cache-control") == [ - "max-age=5, stale-while-revalidate=5" + "public, max-age=5, stale-while-revalidate=5" ] assert Plug.Conn.get_resp_header(conn, "electric-chunk-last-offset") == [next_offset_str] @@ -467,7 +467,7 @@ defmodule Electric.Plug.ServeShapePlugTest do assert Jason.decode!(conn.resp_body) == [%{"headers" => %{"control" => "up-to-date"}}] assert Plug.Conn.get_resp_header(conn, "cache-control") == [ - "max-age=5, stale-while-revalidate=5" + "public, max-age=5, stale-while-revalidate=5" ] assert Plug.Conn.get_resp_header(conn, "electric-chunk-up-to-date") == [""] From 026fb5aa912c4dc0cfca9173ba19dd610150a126 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 9 Oct 2024 14:17:26 -0600 Subject: [PATCH 05/12] Add public a few more places --- packages/sync-service/lib/electric/plug/serve_shape_plug.ex | 2 +- website/docs/quickstart.md | 2 +- website/electric-api.yaml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex index 967346d17f..3d2ad53dc4 100644 --- a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex +++ b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex @@ -356,7 +356,7 @@ defmodule Electric.Plug.ServeShapePlug do put_resp_header( conn, "cache-control", - "max-age=#{config[:max_age]}, stale-while-revalidate=#{config[:stale_age]}" + "public, max-age=#{config[:max_age]}, stale-while-revalidate=#{config[:stale_age]}" ) end end diff --git a/website/docs/quickstart.md b/website/docs/quickstart.md index c43eb0629a..a4b713a1dc 100644 --- a/website/docs/quickstart.md +++ b/website/docs/quickstart.md @@ -109,7 +109,7 @@ HTTP/1.1 200 OK date: Thu, 18 Jul 2024 10:49:12 GMT content-length: 643 vary: accept-encoding -cache-control: max-age=60, stale-while-revalidate=300 +cache-control: public, max-age=60, stale-while-revalidate=300 x-request-id: F-NJAXyulHAQP2MAAABN access-control-allow-origin: * access-control-expose-headers: * diff --git a/website/electric-api.yaml b/website/electric-api.yaml index 4c5d924f87..de194e97fc 100644 --- a/website/electric-api.yaml +++ b/website/electric-api.yaml @@ -127,7 +127,7 @@ paths: cache-control: schema: type: string - example: "max-age=60, stale-while-revalidate=300" + example: "public, max-age=60, stale-while-revalidate=300" description: |- Cache control header as a string of comma separated directives. From e774600ab935711d58ec76ca06ae64a64b7419b1 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 9 Oct 2024 14:21:51 -0600 Subject: [PATCH 06/12] missing public --- packages/typescript-client/test/integration.test.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/typescript-client/test/integration.test.ts b/packages/typescript-client/test/integration.test.ts index e4fa8e006f..c61e242d11 100644 --- a/packages/typescript-client/test/integration.test.ts +++ b/packages/typescript-client/test/integration.test.ts @@ -540,7 +540,11 @@ describe(`HTTP Sync`, () => { const cacheHeaders = res.headers.get(`cache-control`) assert(cacheHeaders !== null, `Response should have cache-control header`) const directives = parse(cacheHeaders) - expect(directives).toEqual({ 'max-age': 1, 'stale-while-revalidate': 3 }) + expect(directives).toEqual({ + public: true, + 'max-age': 1, + 'stale-while-revalidate': 3, + }) const etagHeader = res.headers.get(`etag`) assert(etagHeader !== null, `Response should have etag header`) From f22b278f3725e9441ca85c4390ab6f826bf38dc1 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Wed, 9 Oct 2024 14:42:49 -0600 Subject: [PATCH 07/12] Update openapi spec --- website/electric-api.yaml | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/website/electric-api.yaml b/website/electric-api.yaml index de194e97fc..10881749fe 100644 --- a/website/electric-api.yaml +++ b/website/electric-api.yaml @@ -88,6 +88,13 @@ paths: This allows you to implement a long-polling strategy to consume real-time updates. + - name: cursor + in: query + schema: + type: string + description: |- + This is a cursor generated by the server during live requests. It helps bust caches for + responses from previous long-polls. - name: shape_id in: query schema: @@ -140,7 +147,7 @@ paths: Etag header specifying the shape ID and offset for efficient caching. In the format `{shape_id}:{start_offset}:{end_offset}`. - x-electric-chunk-last-offset: + electric-chunk-last-offset: schema: type: string example: "26800584_4" @@ -151,7 +158,7 @@ paths: you have provided. This header simplifies client development by avoiding the need to parse the last offset out of the stream of log entries. - x-electric-shape-id: + electric-shape-id: schema: type: string example: "3833821-1721812114261" @@ -160,7 +167,7 @@ paths: Must be provided as the `shape_id` parameter when making subsequent requests where `offset` is not `-1`. - x-electric-schema: + electric-schema: schema: type: string example: "{\"id\":{\"type\":\"int4\",\"dimensions\":0},\"title\":{\"type\":\"text\",\"dimensions\":0},\"status\":{\"type\":\"text\",\"dimensions\":0,\"max_length\":8}}" From 66b3182695fe8cb19234b9302ae6273a5ee410c0 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Thu, 10 Oct 2024 06:11:57 -0700 Subject: [PATCH 08/12] Update packages/typescript-client/src/constants.ts Co-authored-by: Stefanos Mousafeiris --- packages/typescript-client/src/constants.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/typescript-client/src/constants.ts b/packages/typescript-client/src/constants.ts index f3b2f17abf..cdd2ee936e 100644 --- a/packages/typescript-client/src/constants.ts +++ b/packages/typescript-client/src/constants.ts @@ -1,5 +1,6 @@ export const SHAPE_ID_HEADER = `electric-shape-id` -export const LIVE_NEXT_CURSOR = `electric-next-cursor` +export const LIVE_NEXT_CURSOR_HEADER = `electric-next-cursor` +export const LIVE_NEXT_CURSOR_QUERY_PARAM = `cursor` export const CHUNK_LAST_OFFSET_HEADER = `electric-chunk-last-offset` export const CHUNK_UP_TO_DATE_HEADER = `electric-chunk-up-to-date` export const SHAPE_SCHEMA_HEADER = `electric-schema` From 1585b720afd98850b76788850b6533f8c9b31a7f Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Thu, 10 Oct 2024 06:12:17 -0700 Subject: [PATCH 09/12] Update packages/typescript-client/src/client.ts Co-authored-by: Stefanos Mousafeiris --- packages/typescript-client/src/client.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index fddf8c6e00..276f5df773 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -200,7 +200,7 @@ export class ShapeStream = Row> if (this.#isUpToDate) { fetchUrl.searchParams.set(LIVE_QUERY_PARAM, `true`) - fetchUrl.searchParams.set(`cursor`, this.#nextLiveCursor) + fetchUrl.searchParams.set(LIVE_NEXT_CURSOR_QUERY_PARAM, this.#nextLiveCursor) } if (this.#shapeId) { From 84dcc3c62314d82cf9b334b2c1cc159563795313 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Thu, 10 Oct 2024 09:20:31 -0600 Subject: [PATCH 10/12] use config value for long poll timeout & cache creation of epoch date --- .../lib/electric/plug/serve_shape_plug.ex | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex index 3d2ad53dc4..1a4e5df27f 100644 --- a/packages/sync-service/lib/electric/plug/serve_shape_plug.ex +++ b/packages/sync-service/lib/electric/plug/serve_shape_plug.ex @@ -24,12 +24,13 @@ defmodule Electric.Plug.ServeShapePlug do @must_refetch Jason.encode!([%{headers: %{control: "must-refetch"}}]) defmodule TimeUtils do - def seconds_since_oct9th_2024_next_interval do - oct9th2024 = DateTime.from_naive!(~N[2024-10-09 00:00:00], "Etc/UTC") + @oct9th2024 DateTime.from_naive!(~N[2024-10-09 00:00:00], "Etc/UTC") + def seconds_since_oct9th_2024_next_interval(conn) do + long_poll_timeout = conn.assigns.config[:long_poll_timeout] now = DateTime.utc_now() - diff_in_seconds = DateTime.diff(now, oct9th2024, :second) - next_interval = ceil(diff_in_seconds / 20) * 20 + diff_in_seconds = DateTime.diff(now, @oct9th2024, :second) + next_interval = ceil(diff_in_seconds / long_poll_timeout) * long_poll_timeout next_interval end @@ -350,7 +351,7 @@ defmodule Electric.Plug.ServeShapePlug do ) |> put_resp_header( "electric-next-cursor", - TimeUtils.seconds_since_oct9th_2024_next_interval() |> Integer.to_string() + TimeUtils.seconds_since_oct9th_2024_next_interval(conn) |> Integer.to_string() ) else put_resp_header( From f030e5b979abacc86334bf275509754a28900643 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Thu, 10 Oct 2024 09:24:24 -0600 Subject: [PATCH 11/12] Improve name in client for cache buster --- packages/typescript-client/src/client.ts | 19 +++++++++++-------- packages/typescript-client/src/constants.ts | 4 ++-- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 276f5df773..9ee246ec55 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -17,7 +17,7 @@ import { } from './fetch' import { CHUNK_LAST_OFFSET_HEADER, - LIVE_NEXT_CURSOR, + LIVE_CACHE_BUSTER, LIVE_QUERY_PARAM, OFFSET_QUERY_PARAM, SHAPE_ID_HEADER, @@ -144,7 +144,7 @@ export class ShapeStream = Row> >() #lastOffset: Offset - #nextLiveCursor: string // Seconds since our Electric Epoch 😎 + #liveCacheBuster: string // Seconds since our Electric Epoch 😎 #lastSyncedAt?: number // unix time #isUpToDate: boolean = false #connected: boolean = false @@ -155,7 +155,7 @@ export class ShapeStream = Row> validateOptions(options) this.options = { subscribe: true, ...options } this.#lastOffset = this.options.offset ?? `-1` - this.#nextLiveCursor = `` + this.#liveCacheBuster = `` this.#shapeId = this.options.shapeId this.#messageParser = new MessageParser(options.parser) @@ -200,7 +200,10 @@ export class ShapeStream = Row> if (this.#isUpToDate) { fetchUrl.searchParams.set(LIVE_QUERY_PARAM, `true`) - fetchUrl.searchParams.set(LIVE_NEXT_CURSOR_QUERY_PARAM, this.#nextLiveCursor) + fetchUrl.searchParams.set( + LIVE_CACHE_BUSTER_QUERY_PARAM, + this.#liveCacheBuster + ) } if (this.#shapeId) { @@ -252,9 +255,9 @@ export class ShapeStream = Row> this.#lastOffset = lastOffset as Offset } - const nextLiveCursor = headers.get(LIVE_NEXT_CURSOR) - if (nextLiveCursor) { - this.#nextLiveCursor = nextLiveCursor + const liveCacheBuster = headers.get(LIVE_CACHE_BUSTER) + if (liveCacheBuster) { + this.#liveCacheBuster = liveCacheBuster } const getSchema = (): Schema => { @@ -385,7 +388,7 @@ export class ShapeStream = Row> */ #reset(shapeId?: string) { this.#lastOffset = `-1` - this.#nextLiveCursor = `` + this.#liveCacheBuster = `` this.#shapeId = shapeId this.#isUpToDate = false this.#connected = false diff --git a/packages/typescript-client/src/constants.ts b/packages/typescript-client/src/constants.ts index cdd2ee936e..bfa2676ee0 100644 --- a/packages/typescript-client/src/constants.ts +++ b/packages/typescript-client/src/constants.ts @@ -1,6 +1,6 @@ export const SHAPE_ID_HEADER = `electric-shape-id` -export const LIVE_NEXT_CURSOR_HEADER = `electric-next-cursor` -export const LIVE_NEXT_CURSOR_QUERY_PARAM = `cursor` +export const LIVE_CACHE_BUSTER_HEADER = `electric-next-cursor` +export const LIVE_CACHE_BUSTER_QUERY_PARAM = `cursor` export const CHUNK_LAST_OFFSET_HEADER = `electric-chunk-last-offset` export const CHUNK_UP_TO_DATE_HEADER = `electric-chunk-up-to-date` export const SHAPE_SCHEMA_HEADER = `electric-schema` From 1d35059f55a15bfb1913bfb0ca51cd0c7a88d12b Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Thu, 10 Oct 2024 09:33:09 -0600 Subject: [PATCH 12/12] Fix --- packages/typescript-client/src/client.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 9ee246ec55..7a1a4e50da 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -17,7 +17,8 @@ import { } from './fetch' import { CHUNK_LAST_OFFSET_HEADER, - LIVE_CACHE_BUSTER, + LIVE_CACHE_BUSTER_HEADER, + LIVE_CACHE_BUSTER_QUERY_PARAM, LIVE_QUERY_PARAM, OFFSET_QUERY_PARAM, SHAPE_ID_HEADER, @@ -255,7 +256,7 @@ export class ShapeStream = Row> this.#lastOffset = lastOffset as Offset } - const liveCacheBuster = headers.get(LIVE_CACHE_BUSTER) + const liveCacheBuster = headers.get(LIVE_CACHE_BUSTER_HEADER) if (liveCacheBuster) { this.#liveCacheBuster = liveCacheBuster }