From 3e16cb41192357343ca10d44aac2897203d2d17b Mon Sep 17 00:00:00 2001 From: flxxyz Date: Thu, 18 May 2023 19:44:33 +0800 Subject: [PATCH] refactor: fix sent draft event & optimized filters & optimized package imports --- deno.jsonc | 26 ++- deno.lock | 122 ++++++------- src/@types/adapters.ts | 2 +- src/@types/base.ts | 4 +- src/@types/event.ts | 3 +- src/@types/repositories.ts | 9 +- src/adapters/inmemory-cache-adapter.ts | 2 +- src/adapters/web-server-adapter.ts | 28 +-- src/adapters/web-socket-adapter.ts | 2 +- src/adapters/web-socket-server-adapter.ts | 71 +++----- src/app/worker.ts | 2 +- src/constants/base.ts | 5 + .../invoices/post-invoice-controller.ts | 2 +- src/core-services/InstanceStatusService.ts | 23 --- src/core-services/WebSocketServerService.ts | 25 ++- src/core-services/core/Events.ts | 16 +- src/database/DatabaseWatcher.ts | 148 +++------------ src/database/client.ts | 2 +- src/database/convertChangeStreamPayload.ts | 9 +- src/database/convertOplogPayload.ts | 76 -------- src/database/escapeRegExp.ts | 3 - src/database/example/createInstance.ts | 60 ------- src/database/example/node.ts | 52 ------ src/database/models/Events.ts | 170 +++++++++++++++--- src/database/models/InstanceStatus.ts | 81 --------- src/database/models/Users.ts | 9 +- src/database/models/index.ts | 2 - src/database/types/IEvent.ts | 18 -- src/database/types/IInstanceStatus.ts | 29 --- src/database/types/IRecord.ts | 5 - src/database/types/IUser.ts | 8 - src/database/types/index.ts | 4 - src/database/watchCollections.ts | 4 +- src/database/watchers.ts | 14 +- src/factories/core-services-factory.ts | 7 +- src/factories/payments-processor-factory.ts | 2 +- .../default-event-strategy.ts | 4 - .../event-strategies/delete-event-strategy.ts | 4 - ...arameterized-replaceable-event-strategy.ts | 4 - .../replaceable-event-strategy.ts | 3 - .../get-invoice-request-handler.ts | 2 +- .../get-terms-request-handler.ts | 2 +- .../rate-limiter-middleware.ts | 2 +- src/handlers/subscribe-message-handler.ts | 3 +- .../lnbits-payment-processor.ts | 2 +- .../lnurl-payments-processor.ts | 2 +- .../zebedee-payments-processor.ts | 2 +- src/repositories/event-repository.ts | 104 +---------- src/repositories/user-repository.ts | 2 +- src/tor/client.ts | 8 +- src/utils/event.ts | 5 +- src/utils/settings.ts | 6 +- src/utils/stream.ts | 2 +- src/utils/transform.ts | 2 +- test/integration/features/helpers.ts | 7 +- test/integration/features/shared.ts | 22 ++- .../delegated-event-message-handler.test.ts | 2 +- .../handlers/event-message-handler.test.ts | 2 +- .../default-event-strategy.test.ts | 10 +- .../delete-event-strategy.test.ts | 10 +- ...terized-replaceable-event-strategy.test.ts | 10 +- .../replaceable-event-strategy.test.ts | 10 +- .../subscribe-message-handler.test.ts | 4 +- test/unit/tor/onion.test.ts | 6 +- test/unit/utils/settings.test.ts | 4 +- 65 files changed, 418 insertions(+), 873 deletions(-) delete mode 100644 src/core-services/InstanceStatusService.ts delete mode 100644 src/database/convertOplogPayload.ts delete mode 100644 src/database/escapeRegExp.ts delete mode 100644 src/database/example/createInstance.ts delete mode 100644 src/database/example/node.ts delete mode 100644 src/database/models/InstanceStatus.ts delete mode 100644 src/database/models/index.ts delete mode 100644 src/database/types/IEvent.ts delete mode 100644 src/database/types/IInstanceStatus.ts delete mode 100644 src/database/types/IRecord.ts delete mode 100644 src/database/types/IUser.ts delete mode 100644 src/database/types/index.ts diff --git a/deno.jsonc b/deno.jsonc index 9264cbc8..d4ff6d1c 100644 --- a/deno.jsonc +++ b/deno.jsonc @@ -5,8 +5,11 @@ "test": "deno test --no-check -A", "test:unit": "deno test test/unit --no-check -A", "test:integration": "deno test test/integration --no-check -A", - "example:db:createInstance": "deno run --allow-all src/database/example/createInstance.ts", - "example:db:node": "deno run --allow-all src/database/example/node.ts", + "test:integration:nip01": "deno test test/integration/features/nip-01/nip-01.feature.test.ts --no-check -A", + "test:integration:nip09": "deno test test/integration/features/nip-09/nip-09.feature.test.ts --no-check -A", + "test:integration:nip16": "deno test test/integration/features/nip-16/nip-16.feature.test.ts --no-check -A", + "test:integration:nip28": "deno test test/integration/features/nip-28/nip-28.feature.test.ts --no-check -A", + "test:integration:nip33": "deno test test/integration/features/nip-33/nip-33.feature.test.ts --no-check -A", "git-hooks": "deno run --no-check=remote --allow-run=deno,git --allow-read --allow-write=.git-hooks https://deno.land/x/githooked/mod.ts install", "bump": "deno run --allow-all .bump/bump.ts" }, @@ -29,7 +32,24 @@ "joi": "https://cdn.skypack.dev/joi@17.7.0?dts", "rxjs": "https://cdn.skypack.dev/rxjs@7.8.0?dts", "chai-as-promised": "https://cdn.skypack.dev/chai-as-promised@7.1.1?dts", - "events": "https://deno.land/std@0.177.0/node/events.ts" + "events": "node:events", + "stream": "node:stream", + "stream/promises": "node:stream/promises", + "dns": "node:dns", + "net": "node:net", + "tls": "node:tls", + "path": "node:path", + "os": "node:os", + "fs": "node:fs", + "fs/promises": "node:fs/promises", + "bson": "npm:bson@5.3.0", + "@isaacs/ttlcache": "npm:@isaacs/ttlcache@1.4.0", + "mongodb": "npm:mongodb@5.3.0", + "mongoose": "npm:mongoose@7.1.1", + "axios": "npm:axios@1.2.6", + "tor-control-ts": "npm:tor-control-ts@1.0.0", + "js-yaml": "npm:js-yaml@4.1.0", + "bech32": "npm:bech32@2.0.0" }, "fmt": { "files": { diff --git a/deno.lock b/deno.lock index d34ab36c..c76955d2 100644 --- a/deno.lock +++ b/deno.lock @@ -14,17 +14,44 @@ "https://cdn.skypack.dev/-/joi@v17.7.0-RK0tA5T7StpATAhyOHd0/dist=es2019,mode=imports/optimized/joi.js": "302fe80da79cf00792631c966c91f2067e1fc6c335f70079579323ff28b0f041", "https://cdn.skypack.dev/-/joi@v17.7.0-RK0tA5T7StpATAhyOHd0/dist=es2019,mode=types/lib/index.d.ts": "356aa4e888eedc8b29ae1532aea69c6d819b4795552d34cc9a04f8ac97b7e60c", "https://cdn.skypack.dev/-/pathval@v1.1.1-4hf44tKt2vdiXpyywiwN/dist=es2019,mode=imports/optimized/pathval.js": "778f821f97d08e946a894e3b94635a63121d8bd5000aeb3d00785318d5ed32de", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/common/AsyncScheduler-1be47af6.js": "eb94849b5a274c8cd075dec4312cba3998ccaf820149462c490ceebb75fbc987", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/common/EmptyError-41db9d3f.js": "b9406314bc6936aaf61de500a900f4fd8bf1012bdd8dc301c1ecd324e65057b9", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/common/Notification-5a9034fd.js": "053ac309d71f0e752343b659fd8440305612bc7ba5665acf3180798fa8ff3d13", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/common/Observable-7e44db0b.js": "4850765ea35eba1d219484bced5d58c088bb212f2c4de037ded304c66a5a2435", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/common/OperatorSubscriber-6d72c4ab.js": "81f1ae08d9c0c5ec618f6cfb8dd4abec3b85c411f198694e54f4d624a20e95fc", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/common/Subject-289b09f9.js": "f226d38bbd77961774a1df0219e0bac5a22b256276167d326b2c480fffab39dd", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/common/Subscriber-368decef.js": "15405901be614ace9de7787172c8510a903eaa8831d91c407696099d70f1b562", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/common/Subscription-b18e6761.js": "a0862b17da978e5f9b61ab3d92494368c4c9866eb0b9f487dfdedb2e77c5b37f", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/common/VirtualTimeScheduler-9142a7bb.js": "1a742b7d31a597e7024a38b26b5a6353956a9fbcf32ca491acf93a010819edc6", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/common/createErrorClass-d37b97e3.js": "4398cce2f98e5b5a965e278fe97749fe2849bdfc2f7c836af40cd0718ae07b3f", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/common/dateTimestampProvider-43e4031c.js": "91b5af8cb2394e9168701e79872557ed1da75522642bf0179ae5c1ad26617eb1", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/common/innerFrom-911ff024.js": "4249e9884ee94c33208868321f903fc7f7e8dc2d49fb9bc79290140af3fc4874", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/common/lift-8243c0ff.js": "af4bfefee1b2bf408b6530591b8c8cabff34d0160bc56c66ebe6baf6deea6b51", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/common/map-3df1de07.js": "234ebbf399f55766008765d9e2fb68b998ac54f58330b89d6b8db0489568c65f", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/common/zipWith-ee4749e2.js": "5f2a9a01d80f89e2d74f361e966d16315c57fad4d0999393c395ba448d65fb55", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/rxjs.js": "53f80634624765127e4af3087e5c9e998ff04ac68f994b518328d29eb7978fe0", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/rxjs/internal/AsyncSubject.js": "ddc8e59238dd34ebcfca85a240de7c46a7258288cddb369e4da19cefe7df1360", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/rxjs/internal/BehaviorSubject.js": "038dbfe6b372347c522eaaae60b0c993aebf2ebc083f99d221474caad14dc427", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/rxjs/internal/NotificationFactories.js": "bc822ec4b2d05f3726c8bd3735dcb44775ec85aed8435ab473e8d992bd4dfead", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/rxjs/internal/ReplaySubject.js": "7791dcdb33a9fe41e2692e42485519fd62fe2dd4aacd391449db3b4c209e6eb4", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/rxjs/internal/Scheduler.js": "ce5d2f71ad5e76ffa039827b091a10da299695f93c46376e46cdc1e082e0a87c", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/rxjs/internal/config.js": "78900e946906e3f588b78a53d433b9ce8deb63db0fa84c150ffaf2f54ab3f420", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/rxjs/internal/firstValueFrom.js": "7ddb48503167cfa36f0c184bd8d50b59af83b58afae3b44cde1385667a13a736", + "https://cdn.skypack.dev/-/rxjs@v7.8.0-oE0GhbhAdiS0mpw9Iw6u/dist=es2019,mode=imports/optimized/rxjs/internal/lastValueFrom.js": "7bab19ffb1619b9f4d2f6f1e43e3c4a0e71659dd1c2094a869cdaf3089c57253", "https://cdn.skypack.dev/-/sinon-chai@v3.7.0-sPO3yztXU6JvAbBcqOA6/dist=es2019,mode=imports/optimized/sinon-chai.js": "da69ffb373302464defc70b76f18a103fedad00eca82751840388a4881d452bb", "https://cdn.skypack.dev/-/sinon-chai@v3.7.0-sPO3yztXU6JvAbBcqOA6/dist=es2019,mode=types/index.d.ts": "1b96f339dbcb43da644b021168045ac6236520a3f8017c622dcc0503b23e54f5", "https://cdn.skypack.dev/-/sinon@v11.1.1-a4QDxvQ2C1lyBOKwzLcr/dist=es2019,mode=types/index.d.ts": "194a06aac2c91f395d387a41a7b74a1cf3b153049c68f698f3da5c106bc71968", "https://cdn.skypack.dev/-/sinon@v15.0.1-qlKtgxOm72taiA3pbkK2/dist=es2019,mode=imports/optimized/sinon.js": "4fa78f658343ba3153a936f7ff0488ad9845324c1854d6d73a0a6d7a5d4bc7d5", "https://cdn.skypack.dev/-/sinon@v15.0.1-qlKtgxOm72taiA3pbkK2/dist=es2019,mode=types/index.d.ts": "e49a3c9e2a9a7386a21a6c1ebf9d3883b2712c99fc14ecf8ea1433f6e7a5845c", + "https://cdn.skypack.dev/-/tslib@v2.4.1-EnXszYOb70PNwhRTrKtW/dist=es2019,mode=imports/optimized/common/tslib-5f663465.js": "5962d7e4d3723d7189d43ed6143cf8a00fe688624c4e541e0bea70fff1ae0745", + "https://cdn.skypack.dev/-/tslib@v2.4.1-EnXszYOb70PNwhRTrKtW/dist=es2019,mode=imports/optimized/tslib.js": "ce8027a3f38ac0e0d3c85d525de72b2c6934ff125b6a0449038df975dbcc2967", "https://cdn.skypack.dev/-/type-detect@v4.0.8-3tmjjwwFw2jHiLYy7SbM/dist=es2019,mode=imports/optimized/type-detect.js": "eb7ac639d3de6a0b64aaf9032a5b01ab79afb7c768cc02c07956613ecbd0bd32", "https://cdn.skypack.dev/chai-as-promised@7.1.1?dts": "830017e3a2f15f0f888639655c861d666ffa3c290b30fb095609c6781cd7b9d3", "https://cdn.skypack.dev/chai@4.3.4?dts": "19489c45018b0bcfb63455624ade9adf84e2a3c99a08379160d90763ba7e3f41", "https://cdn.skypack.dev/error/unknown:chai?from=chai-as-promised": "d81c24bdeb6353bd4c69635a787f47511d41c71940b159e88e0e7a15d7573b95", "https://cdn.skypack.dev/error/unknown:chai?from=sinon-chai": "c5eea0fab0f428ffb949645e4adcb0da74722dc5b483bb13bf48d0723436a185", "https://cdn.skypack.dev/joi@17.7.0?dts": "70e396b701dea0c5a59bf8a5578457cb04a98e5ba37d77b10900c7c7aed8e0e3", + "https://cdn.skypack.dev/rxjs@7.8.0?dts": "e5c758eebc71433a4c75900690399c74ca86bc31085a40c69509f377aeacccc1", "https://cdn.skypack.dev/sinon-chai@3.7.0?dts": "e1773fa39d5bebee9e9f4a32b62ebc67c7330043a725ecf0e4c9bf5730738db1", "https://cdn.skypack.dev/sinon@15.0.1?dts": "220487f84273811340f37025cfa8c092492aaa700d9318fe2938f4adb04fea5e", "https://deno.land/std@0.139.0/bytes/equals.ts": "fc16dff2090cced02497f16483de123dfa91e591029f985029193dfaa9d894c9", @@ -686,29 +713,24 @@ "npm": { "specifiers": { "@isaacs/ttlcache": "@isaacs/ttlcache@1.4.0", + "@isaacs/ttlcache@1.4.0": "@isaacs/ttlcache@1.4.0", "axios@1.2.6": "axios@1.2.6", "bech32@2.0.0": "bech32@2.0.0", "js-yaml@4.1.0": "js-yaml@4.1.0", "knex@2.4.2": "knex@2.4.2", - "mongodb": "mongodb@5.1.0", - "mongoose": "mongoose@7.1.0", - "rxjs@7.8.0": "rxjs@7.8.0" + "mongodb": "mongodb@5.5.0", + "mongodb@5.3.0": "mongodb@5.3.0", + "mongodb@5.5.0": "mongodb@5.5.0", + "mongoose": "mongoose@7.1.1", + "mongoose@7.1.1": "mongoose@7.1.1" }, "packages": { - "@isaacs/ttlcache@1.2.1": { - "integrity": "sha512-6hUKl0TcmeenJXitePBS/vPn1l/C8+sO4vvSmRh/hW4CeBm+QselPM6AyiM7WON6jApouCJGUfHYbaNObcMFrQ==", - "dependencies": {} - }, - "@isaacs/ttlcache@1.2.2": { - "integrity": "sha512-UvS9r/yugpqc/uLRdmxfFkaPGfjvVQPjHn3cGFWiT5K1H8o0t3MsRUAB5xs//4dNEmSSi6WpMqVpMjKbsVv82g==", - "dependencies": {} - }, "@isaacs/ttlcache@1.4.0": { "integrity": "sha512-2i5IL+VRvSOAhyqOI6lVrPzHiLhmC+gZEfG/4z8YA7/zRvQGs75SdF9qsZiNWPgKd1SK06NirpIU+Hd+XWDPfQ==", "dependencies": {} }, - "@types/node@18.15.5": { - "integrity": "sha512-Ark2WDjjZO7GmvsyFFf81MXuGTA/d6oP38anyxWOL6EREyBKAxKoFHwBhaZxCfLRLpO8JgVXwqOwSwa7jRcjew==", + "@types/node@20.2.0": { + "integrity": "sha512-3iD2jaCCziTx04uudpJKwe39QxXgSUnpxXSvRQjRvHPxFQfmfP4NXIm/NURVeNlTCc+ru4WqjYGTmpXrW9uMlw==", "dependencies": {} }, "@types/webidl-conversions@7.0.0": { @@ -718,7 +740,7 @@ "@types/whatwg-url@8.2.2": { "integrity": "sha512-FtQu10RWgn3D9U4aazdwIE2yzphmTJREDqNdODHrbrZmmMqI0vMheC/6NE/J1Yveaj8H+ela+YwWTjq5PGmuhA==", "dependencies": { - "@types/node": "@types/node@18.15.5", + "@types/node": "@types/node@20.2.0", "@types/webidl-conversions": "@types/webidl-conversions@7.0.0" } }, @@ -742,12 +764,8 @@ "integrity": "sha512-LcknSilhIGatDAsY1ak2I8VtGaHNhgMSYVxFrGLXv+xLHytaKZKcaUJJUE7qmBr7h33o5YQwP55pMI0xmkpJwg==", "dependencies": {} }, - "bson@5.1.0": { - "integrity": "sha512-FEecNHkhYRBe7X9KDkdG12xNuz5VHGeH6mCE0B5sBmYtiR/Ux/9vUH/v4NUoBCDr6NuEhvahjoLiiRogptVW0A==", - "dependencies": {} - }, - "bson@5.2.0": { - "integrity": "sha512-HevkSpDbpUfsrHWmWiAsNavANKYIErV2ePXllp1bwq5CDreAaFVj6RVlZpJnxK4WWDCJ/5jMUpaY6G526q3Hjg==", + "bson@5.3.0": { + "integrity": "sha512-ukmCZMneMlaC5ebPHXIkP8YJzNl5DC41N5MAIvKDqLggdao342t4McltoJBQfQya/nHBWAcSsYRqlXPoQkTJag==", "dependencies": {} }, "colorette@2.0.19": { @@ -820,8 +838,8 @@ "integrity": "sha512-WKa+XuLG1A1R0UWhl2+1XQSi+fZWMsYKffMZTTYsiZaUD8k2yDAj5atimTUD2TZkyCkNEeYE5NhFZmupOGtjYQ==", "dependencies": {} }, - "is-core-module@2.11.0": { - "integrity": "sha512-RRjxlvLDkD1YJwDbroBHMb+cukurkDWNyHx7D3oNB5x9rb5ogcksMC5wHCadcXoo67gVr/+3GFySh3134zi6rw==", + "is-core-module@2.12.1": { + "integrity": "sha512-Q4ZuBAe2FUsKtyQJoQHlvP8OvBERxO3jEmy1I7hcRXcJBGGHFh/aJBswbXuS9sgrDH2QUO8ilkwNPHvHMd8clg==", "dependencies": { "has": "has@1.0.3" } @@ -880,52 +898,28 @@ "whatwg-url": "whatwg-url@11.0.0" } }, - "mongodb@5.1.0": { - "integrity": "sha512-qgKb7y+EI90y4weY3z5+lIgm8wmexbonz0GalHkSElQXVKtRuwqXuhXKccyvIjXCJVy9qPV82zsinY0W1FBnJw==", + "mongodb@5.3.0": { + "integrity": "sha512-Wy/sbahguL8c3TXQWXmuBabiLD+iVmz+tOgQf+FwkCjhUIorqbAxRbbz00g4ZoN4sXIPwpAlTANMaGRjGGTikQ==", "dependencies": { - "bson": "bson@5.1.0", + "bson": "bson@5.3.0", "mongodb-connection-string-url": "mongodb-connection-string-url@2.6.0", "saslprep": "saslprep@1.0.3", "socks": "socks@2.7.1" } }, - "mongodb@5.3.0": { - "integrity": "sha512-Wy/sbahguL8c3TXQWXmuBabiLD+iVmz+tOgQf+FwkCjhUIorqbAxRbbz00g4ZoN4sXIPwpAlTANMaGRjGGTikQ==", + "mongodb@5.5.0": { + "integrity": "sha512-XgrkUgAAdfnZKQfk5AsYL8j7O99WHd4YXPxYxnh8dZxD+ekYWFRA3JktUsBnfg+455Smf75/+asoU/YLwNGoQQ==", "dependencies": { - "bson": "bson@5.2.0", + "bson": "bson@5.3.0", "mongodb-connection-string-url": "mongodb-connection-string-url@2.6.0", "saslprep": "saslprep@1.0.3", "socks": "socks@2.7.1" } }, - "mongoose@7.0.2": { - "integrity": "sha512-whX+5lAOLOs6VXRr9w+6m5qb8m/IXWLLb9+0/HRUh2TiIYtTt7UvajK92zW6wllCjBkrrnz/MDIOTCWMbs8K4g==", + "mongoose@7.1.1": { + "integrity": "sha512-AIxaWwGY+td7QOMk4NgK6fbRuGovFyDzv65nU1uj1DsUh3lpjfP3iFYHSR+sUKrs7nbp19ksLlRXkmInBteSCA==", "dependencies": { - "bson": "bson@5.1.0", - "kareem": "kareem@2.5.1", - "mongodb": "mongodb@5.1.0", - "mpath": "mpath@0.9.0", - "mquery": "mquery@5.0.0", - "ms": "ms@2.1.3", - "sift": "sift@16.0.1" - } - }, - "mongoose@7.0.4": { - "integrity": "sha512-MEmQOOqQUvW1PJcji64NtA2EFGHrEvk9o4g//isVYSJW2+8Y8u49C2qFBKzn1t6/l9onQn012o/PcFqR6ixQpQ==", - "dependencies": { - "bson": "bson@5.1.0", - "kareem": "kareem@2.5.1", - "mongodb": "mongodb@5.1.0", - "mpath": "mpath@0.9.0", - "mquery": "mquery@5.0.0", - "ms": "ms@2.1.3", - "sift": "sift@16.0.1" - } - }, - "mongoose@7.1.0": { - "integrity": "sha512-shoo9z/7o96Ojx69wpJn65+EC+Mt3q1SWTducW+F2Y4ieCXo0lZwpCZedgC841MIvJ7V8o6gmzoN1NfcnOTOuw==", - "dependencies": { - "bson": "bson@5.2.0", + "bson": "bson@5.3.0", "kareem": "kareem@2.5.1", "mongodb": "mongodb@5.3.0", "mpath": "mpath@0.9.0", @@ -971,27 +965,21 @@ "rechoir@0.8.0": { "integrity": "sha512-/vxpCXddiX8NGfGO/mTafwjq4aFa/71pvamip0++IQk3zG8cbCj0fifNPrjjF1XMXUne91jL9OoxmdykoEtifQ==", "dependencies": { - "resolve": "resolve@1.22.1" + "resolve": "resolve@1.22.3" } }, "resolve-from@5.0.0": { "integrity": "sha512-qYg9KP24dD5qka9J47d0aVky0N+b4fTU89LN9iDnjB5waksiC49rvMB0PrUJQGoTmH50XPiqOvAjDfaijGxYZw==", "dependencies": {} }, - "resolve@1.22.1": { - "integrity": "sha512-nBpuuYuY5jFsli/JIs1oldw6fOQCBioohqWZg/2hiaOybXOft4lonv85uDOKXdf8rhyK159cxU5cDcK/NKk8zw==", + "resolve@1.22.3": { + "integrity": "sha512-P8ur/gp/AmbEzjr729bZnLjXK5Z+4P0zhIJgBgzqRih7hL7BOukHGtSTA3ACMY467GRFz3duQsi0bDZdR7DKdw==", "dependencies": { - "is-core-module": "is-core-module@2.11.0", + "is-core-module": "is-core-module@2.12.1", "path-parse": "path-parse@1.0.7", "supports-preserve-symlinks-flag": "supports-preserve-symlinks-flag@1.0.0" } }, - "rxjs@7.8.0": { - "integrity": "sha512-F2+gxDshqmIub1KdvZkaEfGDwLNpPvk9Fs6LD/MyQxNgMds/WH9OdDDXOmxUZpME+iSK3rQCctkL0DYyytUqMg==", - "dependencies": { - "tslib": "tslib@2.5.0" - } - }, "saslprep@1.0.3": { "integrity": "sha512-/MY/PEMbk2SuY5sScONwhUDsV2p77Znkb/q3nSVstq/yQzYJOH/Azh29p9oJLsl3LnQwSvZDKagDGBsBwSooag==", "dependencies": { @@ -1037,10 +1025,6 @@ "punycode": "punycode@2.3.0" } }, - "tslib@2.5.0": { - "integrity": "sha512-336iVw3rtn2BUK7ORdIAHTyxHGRIHVReokCR3XjbckJMK7ms8FysBfhLR8IXnAgy7T0PTPNBWKiH514FOW/WSg==", - "dependencies": {} - }, "webidl-conversions@7.0.0": { "integrity": "sha512-VwddBukDzu71offAQR975unBIGqfKZpM+8ZX6ySk8nYhVoo5CYaZyzt3YBvYtRtO+aoGlqxPg/B87NGVZ/fu6g==", "dependencies": {} diff --git a/src/@types/adapters.ts b/src/@types/adapters.ts index 7520a2bb..5e604259 100644 --- a/src/@types/adapters.ts +++ b/src/@types/adapters.ts @@ -1,4 +1,4 @@ -import { EventEmitter } from 'node:events' +import { EventEmitter } from 'events' import { Bulk } from 'redis' diff --git a/src/@types/base.ts b/src/@types/base.ts index e972273b..dd15478b 100644 --- a/src/@types/base.ts +++ b/src/@types/base.ts @@ -1,7 +1,7 @@ -import net from 'node:net' +import net from 'net' import { Knex } from 'npm:knex@2.4.2' -import mongoose from 'npm:mongoose' +import mongoose from 'mongoose' import { EventTags } from '../constants/base.ts' diff --git a/src/@types/event.ts b/src/@types/event.ts index b248b348..938d9687 100644 --- a/src/@types/event.ts +++ b/src/@types/event.ts @@ -1,4 +1,5 @@ import { Buffer } from 'Buffer' +import { ObjectId } from 'mongodb' import { ContextMetadataKey, EventDeduplicationMetadataKey, EventDelegatorMetadataKey, EventExpirationTimeMetadataKey, EventKinds } from '../constants/base.ts' import { ContextMetadata, EventId, Pubkey, Tag } from './base.ts' @@ -36,7 +37,7 @@ export interface ParameterizedReplaceableEvent extends Event { } export interface DBEvent { - id: string + _id: ObjectId event_id: Buffer event_pubkey: Buffer event_kind: number diff --git a/src/@types/repositories.ts b/src/@types/repositories.ts index f43f471c..f53dab4a 100644 --- a/src/@types/repositories.ts +++ b/src/@types/repositories.ts @@ -1,10 +1,9 @@ -import { PassThrough } from 'node:stream' +import { PassThrough } from 'stream' -import mongoose from 'npm:mongoose' +import mongoose from 'mongoose' -import { IEvent } from '../database/types/IEvent.ts' import { DatabaseClient, EventId, Pubkey } from './base.ts' -import { Event } from './event.ts' +import { Event, DBEvent } from './event.ts' import { Invoice } from './invoice.ts' import { SubscriptionFilter } from './subscription.ts' import { User } from './user.ts' @@ -18,7 +17,7 @@ export interface IQueryResult extends Pick, keyof Promise & Exp export interface IEventRepository { create(event: Event): Promise upsert(event: Event): Promise - findByFilters(filters: SubscriptionFilter[]): mongoose.Aggregate + findByFilters(filters: SubscriptionFilter[]): mongoose.Aggregate insertStubs(pubkey: string, eventIdsToDelete: EventId[]): Promise deleteByPubkeyAndIds(pubkey: Pubkey, ids: EventId[]): Promise } diff --git a/src/adapters/inmemory-cache-adapter.ts b/src/adapters/inmemory-cache-adapter.ts index e32481cc..6283c455 100644 --- a/src/adapters/inmemory-cache-adapter.ts +++ b/src/adapters/inmemory-cache-adapter.ts @@ -1,5 +1,5 @@ import { Bulk } from 'redis' -import TTLCache from 'npm:@isaacs/ttlcache' +import TTLCache from '@isaacs/ttlcache' import { ICacheAdapter } from '../@types/adapters.ts' import { createLogger } from '../factories/logger-factory.ts' diff --git a/src/adapters/web-server-adapter.ts b/src/adapters/web-server-adapter.ts index 38e90658..271ed6ef 100644 --- a/src/adapters/web-server-adapter.ts +++ b/src/adapters/web-server-adapter.ts @@ -1,5 +1,4 @@ -import { EventEmitter } from 'node:events' -import { Duplex } from 'node:stream' +import { EventEmitter } from 'events' import { Application } from 'oak' @@ -15,11 +14,6 @@ export class WebServerAdapter extends EventEmitter implements IWebServerAdapter ) { debug('created') super() - // this.webServer - // .on('error', this.onError.bind(this)) - // .on('clientError', this.onClientError.bind(this)) - // .once('close', this.onClose.bind(this)) - // .once('listening', this.onListening.bind(this)) } public listen(port: number): void { @@ -27,27 +21,13 @@ export class WebServerAdapter extends EventEmitter implements IWebServerAdapter this.controller = new AbortController() const { signal } = this.controller this.webServer.listen({ port, signal }) - } - - private onListening() { - debug('olistening for incoming connections') - debug('listening for incoming connections') + this.webServer.addEventListener('error', ({ error }) => this.onError(error)) } private onError(error: Error) { console.error('web-server-adapter: error:', error) } - private onClientError(error: Error, socket: Duplex) { - debug('onClientError', error, socket) - - console.error('web-server-adapter: client socket error:', error) - if (error['code'] === 'ECONNRESET' || !socket.writable) { - return - } - socket.end('HTTP/1.1 400 Bad Request\r\nContent-Type: text/html\r\n') - } - public close(callback?: () => void): void { debug('closing') this.controller?.abort?.() @@ -56,9 +36,7 @@ export class WebServerAdapter extends EventEmitter implements IWebServerAdapter debug('closed') } - protected onClose(e) { - debug('stopped listening to incoming connections', e) - + protected onClose() { debug('stopped listening to incoming connections') } } diff --git a/src/adapters/web-socket-adapter.ts b/src/adapters/web-socket-adapter.ts index 5de530c1..fcf894d9 100644 --- a/src/adapters/web-socket-adapter.ts +++ b/src/adapters/web-socket-adapter.ts @@ -1,4 +1,4 @@ -import { EventEmitter } from 'node:events' +import { EventEmitter } from 'events' import { Buffer } from 'Buffer' import { Request } from 'oak' diff --git a/src/adapters/web-socket-server-adapter.ts b/src/adapters/web-socket-server-adapter.ts index 99337d38..f2e0dc40 100644 --- a/src/adapters/web-socket-server-adapter.ts +++ b/src/adapters/web-socket-server-adapter.ts @@ -4,12 +4,13 @@ import { propEq } from 'ramda' import { IWebSocketAdapter, IWebSocketServerAdapter } from '../@types/adapters.ts' import { Factory } from '../@types/base.ts' import { Event } from '../@types/event.ts' -// import { getRemoteAddress } from '../utils/http.ts' -// import { isRateLimited } from '../handlers/request-handlers/rate-limiter-middleware.ts' +import { getRemoteAddress } from '../utils/http.ts' +import { isRateLimited } from '../handlers/request-handlers/rate-limiter-middleware.ts' import { Settings } from '../@types/settings.ts' import { WebSocketAdapterEvent, WebSocketServerAdapterEvent } from '../constants/adapter.ts' import { createLogger } from '../factories/logger-factory.ts' import { WebServerAdapter } from './web-server-adapter.ts' +import { Context } from '../@types/controllers.ts' const debug = createLogger('web-socket-server-adapter') @@ -30,24 +31,15 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock this.on(WebSocketServerAdapterEvent.Broadcast, this.onBroadcast.bind(this)) this.initMiddleWare() - // this.webSocketServer - // .on(WebSocketServerAdapterEvent.Connection, this.onConnection.bind(this)) - // .on('error', (error: any) => { - // debug('error: %o', error) - // }) - // this.heartbeatInterval = setInterval(this.onHeartbeat.bind(this), WSS_CLIENT_HEALTH_PROBE_INTERVAL) } public initMiddleWare() { this.webServer.use(async (ctx, next) => { if (ctx.isUpgradable) { const webSocket = ctx.upgrade() - const req = ctx.request - webSocket.onopen = () => { - this.webSocketsAdapters.set( - webSocket, - this.createWebSocketAdapter([webSocket, req, this]), - ) + webSocket.onopen = () => this.onConnection(ctx, webSocket) + webSocket.onerror = (error) => { + debug('error: %o', error) } } else { await next() @@ -57,8 +49,6 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock public close(callback?: () => void): void { super.close(() => { debug('closing') - // clearInterval(this.heartbeatInterval) - this.webSocketsAdapters.forEach( (webSocketAdapter: IWebSocketAdapter, webSocket: WebSocket) => { if (webSocketAdapter) { @@ -72,14 +62,7 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock }, ) callback?.() - debug('closing web socket server') - // this.webSocketServer.close(() => { - // this.webSocketServer.removeAllListeners() - // if (typeof callback !== 'undefined') { - // callback() - // } - // debug('closed') - // }) + debug('closed') }) this.removeAllListeners() } @@ -97,41 +80,29 @@ export class WebSocketServerAdapter extends WebServerAdapter implements IWebSock }, ) } + public removeClient = (client: WebSocket) => { this.webSocketsAdapters.delete(client) } + public getConnectedClients(): number { return Array.from(this.webSocketsAdapters).filter( propEq('readyState', WebSocket.OPEN), ).length } - // private async onConnection(client: WebSocket, req: Request) { - // try { - // const currentSettings = this.settings() - // const remoteAddress = getRemoteAddress(req, currentSettings) - // // const remoteAddress = '192.168.0.126' - // debug('client %s connected: %o', remoteAddress, req.headers) + private async onConnection(ctx: Context, client: WebSocket) { + const req: Request = ctx.request + const currentSettings = this.settings() + const remoteAddress = getRemoteAddress(req, currentSettings) + debug('client %s connected: %o', remoteAddress, req.headers) - // if (await isRateLimited(remoteAddress, currentSettings)) { - // debug('client %s terminated: rate-limited', remoteAddress) - // client.close() - // return - // } + if (await isRateLimited(remoteAddress, currentSettings)) { + debug('client %s terminated: rate-limited', remoteAddress) + client.close() + return + } - // this.webSocketsAdapters.set(client, this.createWebSocketAdapter([client, req, this])) - // } catch (e) { - // console.info('链接错误的', e) - // } - - // } - - // private onHeartbeat() { - // this.webSocketsAdapters.forEach((webSocket) => { - // const webSocketAdapter = this.webSocketsAdapters.get(webSocket) as IWebSocketAdapter - // if (webSocketAdapter) { - // webSocketAdapter.emit(WebSocketAdapterEvent.Heartbeat) - // } - // }) - // } + this.webSocketsAdapters.set(client, this.createWebSocketAdapter([client, req, this])) + } } diff --git a/src/app/worker.ts b/src/app/worker.ts index 200d2f0c..170a3453 100644 --- a/src/app/worker.ts +++ b/src/app/worker.ts @@ -1,4 +1,4 @@ -import { FSWatcher } from 'node:fs' +import { FSWatcher } from 'fs' import { IWebSocketServerAdapter } from '../@types/adapters.ts' import { IRunnable } from '../@types/base.ts' diff --git a/src/constants/base.ts b/src/constants/base.ts index 00175566..ce71f705 100644 --- a/src/constants/base.ts +++ b/src/constants/base.ts @@ -58,3 +58,8 @@ export const EventDelegatorMetadataKey = Symbol('Delegator') export const EventDeduplicationMetadataKey = Symbol('Deduplication') export const ContextMetadataKey = Symbol('Context') export const EventExpirationTimeMetadataKey = Symbol('Expiration') + +export enum Sort { + ASC = 1, + DESC = -1, +} diff --git a/src/controllers/invoices/post-invoice-controller.ts b/src/controllers/invoices/post-invoice-controller.ts index ef994f01..418749f2 100644 --- a/src/controllers/invoices/post-invoice-controller.ts +++ b/src/controllers/invoices/post-invoice-controller.ts @@ -1,4 +1,4 @@ -import { readFileSync } from 'node:fs' +import { readFileSync } from 'fs' import { path } from 'ramda' diff --git a/src/core-services/InstanceStatusService.ts b/src/core-services/InstanceStatusService.ts deleted file mode 100644 index f1741e98..00000000 --- a/src/core-services/InstanceStatusService.ts +++ /dev/null @@ -1,23 +0,0 @@ -import { ServiceClass } from './index.ts' - -export class InstanceStatusService extends ServiceClass { - protected name = 'instances' - - constructor() { - super() - - this.onEvent('watch.instanceStatus', (event) => { - const { clientAction, data } = event - if (clientAction === 'inserted') { - console.log( - '[services] InstanceStatus onEvent(watch.instanceStatus)', - data, - ) - } - }) - } - - async started(): Promise { - console.log('[services] InstanceStatus started.') - } -} diff --git a/src/core-services/WebSocketServerService.ts b/src/core-services/WebSocketServerService.ts index 103ac086..f95d7550 100644 --- a/src/core-services/WebSocketServerService.ts +++ b/src/core-services/WebSocketServerService.ts @@ -3,22 +3,35 @@ import { WebSocketServerAdapterEvent } from '../constants/adapter.ts' import { createLogger } from '../factories/logger-factory.ts' import { ServiceClass } from './index.ts' import { toNostrEvent } from '../utils/event.ts' -import { IEvent } from '../database/types/IEvent.ts' +import { DBEvent } from '../@types/event.ts' +import { readReplicaEventsModel } from '../database/models/Events.ts' const debug = createLogger('core-service:web-socket-server-service') export class WebSocketServerService extends ServiceClass { - protected name = 'instances' + protected name = 'WebSocketServerService' constructor(adapter: WebSocketServerAdapter) { super() - this.onEvent('WebSocketServer.broadcast', (event) => { - const { clientAction, data } = event + this.onEvent('events.broadcast', async (event) => { + const { clientAction, data, diff, id } = event if (clientAction === 'inserted') { - debug('WebSocketServer.broadcast %s data: %o', clientAction, data) + debug('events.broadcast %s data: %o', clientAction, data) if (data && typeof data !== 'undefined') { - adapter.emit(WebSocketServerAdapterEvent.Broadcast, toNostrEvent(data as IEvent)) + // ignore draft + if (data.event_kind === 5 && data.event_signature) { + return + } + adapter.emit(WebSocketServerAdapterEvent.Broadcast, toNostrEvent(data as DBEvent)) + } + } + + if (clientAction === 'updated') { + debug('events.broadcast %s diff: %o', clientAction, diff) + const data = await readReplicaEventsModel.findById(id) as DBEvent | null + if (data) { + adapter.emit(WebSocketServerAdapterEvent.Broadcast, toNostrEvent(data as DBEvent)) } } }) diff --git a/src/core-services/core/Events.ts b/src/core-services/core/Events.ts index f434ea90..da199f6e 100644 --- a/src/core-services/core/Events.ts +++ b/src/core-services/core/Events.ts @@ -1,18 +1,14 @@ -import type { IEvent, IInstanceStatus } from '../../database/types/index.ts' +import { ObjectId } from 'mongodb' + +import type { DBEvent } from '../../@types/event.ts' export type ClientAction = 'inserted' | 'updated' | 'removed' | 'changed' export type EventSignatures = { - 'watch.instanceStatus'(data: { - clientAction: ClientAction - data?: undefined | Partial - diff?: undefined | Record - id: string - }): void - 'WebSocketServer.broadcast'(data: { + 'events.broadcast'(data: { clientAction: ClientAction - data?: undefined | Partial + data?: undefined | Partial diff?: undefined | Record - id: string + id: ObjectId }): void } diff --git a/src/database/DatabaseWatcher.ts b/src/database/DatabaseWatcher.ts index 75d0666a..c5c3537c 100644 --- a/src/database/DatabaseWatcher.ts +++ b/src/database/DatabaseWatcher.ts @@ -1,11 +1,7 @@ import EventEmitter from 'events' -import type { ChangeStreamDeleteDocument, ChangeStreamInsertDocument, ChangeStreamUpdateDocument, Db, Timestamp, WithId } from 'npm:mongodb' -import { MongoClient } from 'npm:mongodb' +import type { ChangeStreamDeleteDocument, ChangeStreamInsertDocument, ChangeStreamUpdateDocument, Db, WithId, ObjectId, ChangeStream } from 'mongodb' -import { convertChangeStreamPayload } from './convertChangeStreamPayload.ts' -import { convertOplogPayload } from './convertOplogPayload.ts' -import { escapeRegExp } from './escapeRegExp.ts' -import type { IRecord } from './types/index.ts' +import { convertChangeStreamPayload, IRecord } from './convertChangeStreamPayload.ts' import watchCollections from './watchCollections.ts' export type RecordDeleted = WithId & { @@ -19,7 +15,7 @@ const instancePing = parseInt(String(Deno.env.get('MULTIPLE_INSTANCES_PING_INTER const maxDocMs = instancePing * 4 // 4 times the ping interval export type RealTimeData = { - id: string + id: ObjectId action: 'insert' | 'update' | 'remove' clientAction: 'inserted' | 'updated' | 'removed' data?: T @@ -28,144 +24,41 @@ export type RealTimeData = { oplog?: true } -const ignoreChangeStream = ['yes', 'true'].includes( - String(Deno.env.get('IGNORE_CHANGE_STREAM')).toLowerCase(), -) - -const useMeteorOplog = ['yes', 'true'].includes( - String(Deno.env.get('USE_NATIVE_OPLOG')).toLowerCase(), -) - export class DatabaseWatcher extends EventEmitter { private db: Db - private _oplogHandle?: any - private metrics?: any + private changeStream: ChangeStream + /** * Last doc timestamp received from a real time event */ private lastDocTS: Date | undefined constructor( - { db, _oplogHandle, metrics }: { + { db, metrics }: { db: Db - _oplogHandle?: any metrics?: any }, ) { super() this.db = db - this._oplogHandle = _oplogHandle this.metrics = metrics } async watch(): Promise { - if (useMeteorOplog) { - // TODO remove this when updating to Meteor 2.8 - console.warn( - 'Using USE_NATIVE_OPLOG=true is currently discouraged due to known performance issues. Please use IGNORE_CHANGE_STREAM=true instead.', - ) - this.watchMeteorOplog() - return - } - - if (ignoreChangeStream) { - await this.watchOplog() - return - } - try { this.watchChangeStream() } catch (_err: unknown) { - await this.watchOplog() + throw new Error('MongoDB ChangeStream is not supported') } } - private async watchOplog(): Promise { - if (!Deno.env.has('MONGO_OPLOG_URL')) { - throw Error('No $MONGO_OPLOG_URL provided') - } - - const isMasterDoc = await this.db.admin().command({ ismaster: 1 }) - if (!isMasterDoc || !isMasterDoc.setName) { - throw Error('$MONGO_URL should be a replica set\'s URL') - } - - const dbName = this.db.databaseName - - const client = new MongoClient(Deno.env.get('MONGO_OPLOG_URL') as string, { - maxPoolSize: 1, - }) - - if (client.db().databaseName !== 'local') { - throw Error( - '$MONGO_OPLOG_URL must be set to the \'local\' database of a Mongo replica set', - ) - } - - await client.connect() - - console.log('Using oplog') - - const db = client.db() - - const oplogCollection = db.collection('oplog.rs') - - const lastOplogEntry = await oplogCollection.findOne<{ ts: Timestamp }>( - {}, - { sort: { $natural: -1 }, projection: { _id: 0, ts: 1 } }, - ) - - const oplogSelector = { - ns: new RegExp(`^(?:${[escapeRegExp(`${dbName}.`)].join('|')})`), - op: { $in: ['i', 'u', 'd'] }, - ...(lastOplogEntry && { ts: { $gt: lastOplogEntry.ts } }), - } - - const cursor = oplogCollection.find(oplogSelector) - - cursor.addCursorFlag('tailable', true) - cursor.addCursorFlag('awaitData', true) - cursor.addCursorFlag('oplogReplay', true) - - const stream = cursor.stream() - - stream.on('data', (doc) => { - const doesMatter = watchCollections.some((collection) => doc.ns === `${dbName}.${collection}`) - if (!doesMatter) { - return - } - - this.emitDoc( - doc.ns.slice(dbName.length + 1), - convertOplogPayload({ - id: doc.op === 'u' ? doc.o2._id : doc.o._id, - op: doc, - }), - ) - }) - } - - private watchMeteorOplog(): void { - if (!this._oplogHandle) { - throw new Error('no-oplog-handle') - } - - console.log('Using Meteor oplog') - - watchCollections.forEach((collection) => { - this._oplogHandle.onOplogEntry({ collection }, (event: any) => { - this.emitDoc(collection, convertOplogPayload(event)) - }) - }) - } - private watchChangeStream(): void { try { - const changeStream = this.db.watch< + this.changeStream = this.db.watch< IRecord, | ChangeStreamInsertDocument | ChangeStreamUpdateDocument @@ -178,13 +71,8 @@ export class DatabaseWatcher extends EventEmitter { }, }, ]) - changeStream.on('change', (event: any) => { - this.emitDoc(event.ns.coll, convertChangeStreamPayload(event)) - }) - - changeStream.on('error', (err: any) => { - throw err - }) + this.changeStream.on('change', this.onChangeStreamChange) + this.changeStream.on('error', this.onChangeStreamError) console.log('Using change streams') } catch (err: unknown) { @@ -194,6 +82,22 @@ export class DatabaseWatcher extends EventEmitter { } } + public async close() { + if (this.changeStream) { + this.removeAllListeners() + this.changeStream.removeAllListeners() + await this.changeStream.close() + } + } + + private onChangeStreamChange = (event: any) => { + this.emitDoc(event.ns.coll, convertChangeStreamPayload(event)) + } + + private onChangeStreamError = (err: any) => { + throw err + } + private emitDoc( collection: string, doc: RealTimeData | void, diff --git a/src/database/client.ts b/src/database/client.ts index dd208f65..1d0ce1bb 100644 --- a/src/database/client.ts +++ b/src/database/client.ts @@ -1,4 +1,4 @@ -import mongoose from 'npm:mongoose' +import mongoose from 'mongoose' import Config from '../config/index.ts' import { createLogger } from '../factories/logger-factory.ts' diff --git a/src/database/convertChangeStreamPayload.ts b/src/database/convertChangeStreamPayload.ts index f178f2f3..98dde983 100644 --- a/src/database/convertChangeStreamPayload.ts +++ b/src/database/convertChangeStreamPayload.ts @@ -1,8 +1,13 @@ // deno-lint-ignore-file no-explicit-any -import type { ChangeStreamDeleteDocument, ChangeStreamInsertDocument, ChangeStreamUpdateDocument } from 'npm:mongodb' +import type { ChangeStreamDeleteDocument, ChangeStreamInsertDocument, ChangeStreamUpdateDocument } from 'mongodb' import type { RealTimeData } from './DatabaseWatcher.ts' -import type { IRecord } from './types/IRecord.ts' + +export interface IRecord { + _id: string + created_at: Date + updated_at: Date +} export function convertChangeStreamPayload( event: diff --git a/src/database/convertOplogPayload.ts b/src/database/convertOplogPayload.ts deleted file mode 100644 index be81ff82..00000000 --- a/src/database/convertOplogPayload.ts +++ /dev/null @@ -1,76 +0,0 @@ -// deno-lint-ignore-file no-explicit-any no-prototype-builtins -import type { RealTimeData } from './DatabaseWatcher.ts' -import type { IRecord } from './types/IRecord.ts' - -const actions = { - i: 'insert', - u: 'update', - d: 'remove', -} - -export function convertOplogPayload({ - id, - op, -}: { - id: string - op: { op: 'i' | 'u' | 'd'; o: any } -}): RealTimeData | void { - const action = actions[op.op] - if (action === 'insert') { - return { - action, - clientAction: 'inserted', - id: op.o._id, - data: op.o, - oplog: true, - } - } - - if (action === 'update') { - if (!op.o.$set && !op.o.$unset) { - return { - action, - clientAction: 'updated', - id, - data: op.o, - oplog: true, - } - } - - const diff: any = {} - if (op.o.$set) { - for (const key in op.o.$set) { - if (op.o.$set.hasOwnProperty(key)) { - diff[key] = op.o.$set[key] - } - } - } - const unset: any = {} - if (op.o.$unset) { - for (const key in op.o.$unset) { - if (op.o.$unset.hasOwnProperty(key)) { - diff[key] = undefined - unset[key] = 1 - } - } - } - - return { - action, - clientAction: 'updated', - id, - diff, - unset, - oplog: true, - } - } - - if (action === 'remove') { - return { - action, - clientAction: 'removed', - id, - oplog: true, - } - } -} diff --git a/src/database/escapeRegExp.ts b/src/database/escapeRegExp.ts deleted file mode 100644 index 587b5718..00000000 --- a/src/database/escapeRegExp.ts +++ /dev/null @@ -1,3 +0,0 @@ -const toString = (object: unknown): string => (object ? `${object}` : '') - -export const escapeRegExp = (input: string): string => toString(input).replace(/[-.*+?^=!:${}()|[\]\/\\]/g, '\\$&') diff --git a/src/database/example/createInstance.ts b/src/database/example/createInstance.ts deleted file mode 100644 index 629c6545..00000000 --- a/src/database/example/createInstance.ts +++ /dev/null @@ -1,60 +0,0 @@ -// no-explicit-any -import { faker } from 'npm:@faker-js/faker' -import mongoose from 'npm:mongoose' - -import { InstanceStatusModel } from '../models/InstanceStatus.ts' - -const sleep = (ms = 1000) => new Promise((resolve) => setTimeout(resolve, ms)) - -async function createInstance() { - const mongoUri = Deno.env.get('MONGO_URI') as string - await mongoose.connect(mongoUri, { - keepAlive: true, - }) - console.log('Connected to database') - - await sleep(10 * 1000) - - faker.locale = 'zh_CN' - - const InstanceStatus = new InstanceStatusModel() - const instance = { - _id: faker.database.mongodbObjectId(), - name: faker.internet.userName(), - pid: faker.datatype.number({ min: 1, max: 65535 }), - extraInformation: { - host: faker.internet.ipv4(), - port: faker.internet.port(), - os: { - vendor: Deno.build.vendor, - platform: Deno.build.os, - arch: Deno.build.arch, - uptime: Deno.osUptime(), - loadavg: Deno.loadavg(), - totalmem: Deno.systemMemoryInfo().total, - freemem: Deno.systemMemoryInfo().free, - cpus: navigator.hardwareConcurrency, - usageMemory: Deno.memoryUsage(), - }, - denoVersion: Deno.version.deno, - }, - } - InstanceStatus.set(instance) - InstanceStatus._createdAt = new Date() - InstanceStatus._updatedAt = new Date() - const model = await InstanceStatus.save() - console.log('doc -----', model) - - await sleep(5 * 1000) - model.set('name', 'hello') - const _updateData = await InstanceStatus.save() - console.log('updateData -----', _updateData) - - await sleep(10 * 1000) - const _removeData = await InstanceStatus.deleteOne() - console.log('removeData -----', _removeData) - - await mongoose.disconnect() -} - -createInstance() diff --git a/src/database/example/node.ts b/src/database/example/node.ts deleted file mode 100644 index 5d2d77ee..00000000 --- a/src/database/example/node.ts +++ /dev/null @@ -1,52 +0,0 @@ -/* eslint-disable @typescript-eslint/ban-ts-comment */ -// deno-lint-ignore-file ban-ts-comment -import mongoose from 'npm:mongoose' - -import { api, LocalBroker } from '../../core-services/index.ts' -import { InstanceStatusService } from '../../core-services/InstanceStatusService.ts' -import { DatabaseWatcher } from '../DatabaseWatcher.ts' -import { initWatchers } from '../watchers.ts' - -async function run() { - const mongoUri = Deno.env.get('MONGO_URI') as string - - const conn = mongoose.createConnection(mongoUri, { - keepAlive: true, - }) - conn.on('open', () => { - console.log('Connected to database') - }) - await conn.asPromise() - - const mongo = conn as mongoose.Connection - const db = mongo.db - // @ts-ignore - const _oplogHandle = mongo?._oplogHandle - const watcher = new DatabaseWatcher({ - db, - _oplogHandle, - }) - watcher.watch().catch((err: Error) => { - console.error(err, 'Fatal error occurred when watching database') - Deno.exit(1) - }) - - initWatchers(watcher, api.broadcastLocal.bind(api)) - - setInterval(function _checkDatabaseWatcher() { - if (watcher.isLastDocDelayed()) { - console.error('No real time data received recently') - } - }, 20000) - - const broker = new LocalBroker() - broker.onBroadcast((eventName, ...args) => { - console.log('broadcast', [{ eventName, args }]) - }) - - api.registerService(new InstanceStatusService()) - api.setBroker(broker) - api.start() -} - -run() diff --git a/src/database/models/Events.ts b/src/database/models/Events.ts index 8da814ca..6f4bc4fd 100644 --- a/src/database/models/Events.ts +++ b/src/database/models/Events.ts @@ -1,27 +1,12 @@ -import mongoose from 'npm:mongoose' +import mongoose, { FilterQuery } from 'mongoose' -import { Tag } from '../../@types/base.ts' import { getMasterDbClient, getReadReplicaDbClient } from '../client.ts' import { Buffer } from 'Buffer' - -export interface EventInput { - event_id: Buffer - event_pubkey: Buffer - event_kind: number - event_created_at: number - event_content: string - event_tags: Tag[][] - event_signature: Buffer - event_delegator?: Buffer | null - event_deduplication?: string[] | null - first_seen: Date -} - -export interface EventDocument extends EventInput, mongoose.Document { - created_at: Date - updated_at: Date - expires_at?: number -} +import { DBEvent } from '../../@types/event.ts' +import { SubscriptionFilter } from '../../@types/subscription.ts' +import { isGenericTagQuery } from '../../utils/filter.ts' +import { Sort } from '../../constants/base.ts' +import { toBuffer } from '../../utils/transform.ts' const EventSchema = new mongoose.Schema({ event_id: { @@ -58,6 +43,7 @@ const EventSchema = new mongoose.Schema({ event_deduplication: [mongoose.Schema.Types.Mixed], first_seen: { type: Date }, deleted_at: { type: Date }, + expires_at: { type: Number }, }) EventSchema.index({ 'event_id': 1 }, { @@ -73,21 +59,157 @@ EventSchema.index({ 'event_kind': 1 }, { EventSchema.index({ 'event_signature': 1 }, { background: true, }) +EventSchema.index({ 'event_created_at': 1 }, { + background: true, +}) EventSchema.index({ 'event_tags.0.0': 1 }, { background: true, + sparse: true, }) EventSchema.index({ 'event_tags.0.1': 1 }, { background: true, + sparse: true, +}) +EventSchema.index({ 'event_tags.1.0': 1 }, { + background: true, + sparse: true, +}) +EventSchema.index({ 'event_tags.1.1': 1 }, { + background: true, + sparse: true, }) EventSchema.index({ 'remote_address': 1 }, { background: true, }) +EventSchema.index({ 'expires_at': 1 }, { + background: true, + sparse: true, +}) +EventSchema.index({ 'deleted_at': 1 }, { + background: true, + sparse: true, +}) + +export const buildMongoFilter = ( + filters: SubscriptionFilter[], +) => { + const now = Math.floor(Date.now() / 1000) + const filterQueries = filters.map( + (filter) => { + const filterQuery: FilterQuery = {} + + if (filter?.ids?.length) { + filterQuery.event_id = { + $in: filter.ids.map(toBuffer), + } + } + + if (filter?.authors?.length) { + const authors = filter.authors.map(toBuffer) + filterQuery.$or = [ + { event_pubkey: { $in: authors } }, + { event_delegator: { $in: authors } }, + ] + } + + if (filter?.kinds?.length) { + filterQuery.event_kind = { $in: filter.kinds } + } + + if (filter?.since) { + filterQuery.event_created_at = { $gte: filter.since } + } + + if (filter?.until) { + if (!filterQuery.event_created_at) { + filterQuery.event_created_at = {} + } + filterQuery.event_created_at.$lte = filter.until + } + + const tagFilters = Object.entries(filter) + .filter(([filterName]) => isGenericTagQuery(filterName)) + .map(([tagName, tagValues]) => ({ + event_tags: { + $elemMatch: { + '0': tagName.slice(1), + '1': { $in: Array.isArray(tagValues) ? [...tagValues] : [] }, + }, + }, + })) + if (tagFilters.length > 0) { + filterQuery.$and = tagFilters + } + + if (!filterQuery.$and) { + filterQuery.$and = [] + } + + // deletion events + const deletionQueries = { + deleted_at: { + $eq: null, + }, + } + + filterQuery.$and.push({ + $or: [ + { + expires_at: { + $exists: true, + $gte: now, + }, + ...deletionQueries, + }, + { + expires_at: { + $eq: null, + $exists: true, + }, + ...deletionQueries, + }, + ], + }) + + return filterQuery + }, + ) + + return filterQueries.length === 1 ? filterQueries[0] : { + $or: filterQueries, + } +} + +EventSchema.static('findBySubscriptionFilter', function (filters: SubscriptionFilter[], maxLimit: number) { + const query = buildMongoFilter(filters) + const defaultLimit = 500 + let sort = Sort.ASC + let limit = Math.max(...filters.map((filter) => { + if (typeof filter.limit !== 'undefined') { + sort = Sort.DESC + } + return filter.limit ?? defaultLimit + })) + if (limit > maxLimit) { + limit = maxLimit + } + + return this.find(query).limit(limit).sort({ created_at: sort }) +}) + +EventSchema.static('countBySubscriptionFilter', function (filters: SubscriptionFilter[]) { + const query = buildMongoFilter(filters) + return this.countDocuments(query) +}) + +export const EventsModelName = 'Events' +export const EventsCollectionName = 'events' export const EventsModel = (dbClient: mongoose.Connection) => - dbClient.model( - 'Events', + dbClient.model( + EventsModelName, EventSchema, - 'events', + EventsCollectionName, ) export const masterEventsModel = EventsModel(getMasterDbClient()) diff --git a/src/database/models/InstanceStatus.ts b/src/database/models/InstanceStatus.ts deleted file mode 100644 index 11177a10..00000000 --- a/src/database/models/InstanceStatus.ts +++ /dev/null @@ -1,81 +0,0 @@ -import mongoose from 'npm:mongoose' - -import { getMasterDbClient } from '../client.ts' - -export interface InstanceStatusInput { - name: string - pid: number - extraInformation: { - host: string - port: string | number - tcpPort?: number - os: { - vendor: string - platform: string - arch: string - uptime: number - loadavg: number[] - totalmem: number - freemem: number - cpus: number - usageMemory: { - rss: number - heapTotal: number - heapUsed: number - external: number - } - } - denoVersion: string - } -} - -export interface InstanceStatusDocument extends InstanceStatusInput, mongoose.Document { - _createdAt: Date - _updatedAt: Date -} - -const InstanceStatusSchema = new mongoose.Schema({ - _id: String, - name: String, - pid: Number, - extraInformation: { - host: { - type: String, - required: true, - default: '127.0.0.1', - }, - port: { - type: mongoose.Schema.Types.Mixed, - required: true, - default: '8008', - }, - tcpPort: Number, - os: { - vendor: String, - platform: String, - arch: String, - uptime: Number, - loadavg: Array, - totalmem: Number, - freemem: Number, - cpus: Number, - usageMemory: { - rss: Number, - heapTotal: Number, - heapUsed: Number, - external: Number, - }, - }, - denoVersion: String, - }, -}) - -InstanceStatusSchema.index({ 'extraInformation.tcpPort': 1 }, { - background: true, -}) - -export const InstanceStatusModel = getMasterDbClient().model( - 'InstanceStatus', - InstanceStatusSchema, - 'instances', -) diff --git a/src/database/models/Users.ts b/src/database/models/Users.ts index 9cede161..861615a1 100644 --- a/src/database/models/Users.ts +++ b/src/database/models/Users.ts @@ -1,4 +1,4 @@ -import mongoose from 'npm:mongoose' +import mongoose from 'mongoose' import { getMasterDbClient, getReadReplicaDbClient } from '../client.ts' import { Buffer } from 'Buffer' @@ -33,11 +33,14 @@ UserSchema.index({ 'balance': 1 }, { background: true, }) +export const UsersModelName = 'Users' +export const UsersCollectionName = 'users' + export const UsersModel = (dbClient: mongoose.Connection) => dbClient.model( - 'Users', + UsersModelName, UserSchema, - 'users', + UsersCollectionName, ) export const masterUsersModel = UsersModel(getMasterDbClient()) diff --git a/src/database/models/index.ts b/src/database/models/index.ts deleted file mode 100644 index b288ac35..00000000 --- a/src/database/models/index.ts +++ /dev/null @@ -1,2 +0,0 @@ -export * from './Events.ts' -export * from './Users.ts' diff --git a/src/database/types/IEvent.ts b/src/database/types/IEvent.ts deleted file mode 100644 index d6cca0a9..00000000 --- a/src/database/types/IEvent.ts +++ /dev/null @@ -1,18 +0,0 @@ -import type { Tag } from '../../@types/base.ts' -import type { IRecord } from './IRecord.ts' -import { Buffer } from 'Buffer' - -export interface IEvent extends IRecord { - event_id: Buffer - event_pubkey: Buffer - event_kind: number - event_created_at: number - event_content: string - event_tags: Tag[][] - event_signature: Buffer - event_delegator?: Buffer | null - event_deduplication?: (string | number)[] | null - first_seen: Date - deleted_at?: Date - expires_at?: number -} diff --git a/src/database/types/IInstanceStatus.ts b/src/database/types/IInstanceStatus.ts deleted file mode 100644 index a66ff91b..00000000 --- a/src/database/types/IInstanceStatus.ts +++ /dev/null @@ -1,29 +0,0 @@ -import type { IRecord } from './IRecord.ts' - -export interface IInstanceStatus extends IRecord { - name: string - pid: number - extraInformation: { - host: string - version: string - port: string | number - tcpPort: number - os: { - vendor: string - platform: string - arch: string - uptime: number - loadavg: number[] - totalmem: number - freemem: number - cpus: number - usageMemory: { - rss: number - heapTotal: number - heapUsed: number - external: number - } - } - denoVersion: string - } -} diff --git a/src/database/types/IRecord.ts b/src/database/types/IRecord.ts deleted file mode 100644 index 42685b03..00000000 --- a/src/database/types/IRecord.ts +++ /dev/null @@ -1,5 +0,0 @@ -export interface IRecord { - _id: string - created_at: Date - updated_at: Date -} diff --git a/src/database/types/IUser.ts b/src/database/types/IUser.ts deleted file mode 100644 index 2bddc70b..00000000 --- a/src/database/types/IUser.ts +++ /dev/null @@ -1,8 +0,0 @@ -import type { IRecord } from './IRecord.ts' -import { Buffer } from 'Buffer' - -export interface IUser extends IRecord { - pubkey: Buffer - is_admitted: boolean - balance: bigint -} diff --git a/src/database/types/index.ts b/src/database/types/index.ts deleted file mode 100644 index 12843f06..00000000 --- a/src/database/types/index.ts +++ /dev/null @@ -1,4 +0,0 @@ -export * from './IEvent.ts' -export * from './IInstanceStatus.ts' -export * from './IRecord.ts' -export * from './IUser.ts' diff --git a/src/database/watchCollections.ts b/src/database/watchCollections.ts index 6ab75738..5b6fd7c1 100644 --- a/src/database/watchCollections.ts +++ b/src/database/watchCollections.ts @@ -1,5 +1,5 @@ -import { masterEventsModel } from './models/index.ts' +import { EventsCollectionName } from './models/Events.ts' export default [ - masterEventsModel.collection.collectionName, + EventsCollectionName, ] diff --git a/src/database/watchers.ts b/src/database/watchers.ts index 50a34b2d..e1adb0bd 100644 --- a/src/database/watchers.ts +++ b/src/database/watchers.ts @@ -1,10 +1,10 @@ -import { ChangeStreamDocument } from 'npm:mongodb' -import mongoose from 'npm:mongoose' +import { ChangeStreamDocument } from 'mongodb' +import mongoose from 'mongoose' import type { EventSignatures } from '../core-services/index.ts' import { DatabaseWatcher } from './DatabaseWatcher.ts' -import { masterEventsModel } from './models/index.ts' -import type { IEvent } from './types/index.ts' +import { EventsCollectionName } from './models/Events.ts' +import type { DBEvent } from '../@types/event.ts' import { createLogger } from '../factories/logger-factory.ts' export type Watcher = ( @@ -12,7 +12,7 @@ export type Watcher = ( fn: (event: ChangeStreamDocument) => void | Promise, ) => void -export type ClientAction = 'inserted' | 'updated' | 'removed' | 'changed' +export type ClientAction = 'inserted' | 'updated' | 'removed' export type BroadcastCallback = ( event: T, @@ -25,10 +25,10 @@ export function initWatchers( watcher: DatabaseWatcher, broadcast: BroadcastCallback, ): void { - watcher.on(masterEventsModel.collection.collectionName, (event) => { + watcher.on(EventsCollectionName, (event) => { debug('events %o', event) const { clientAction, data, diff, id } = event - broadcast('WebSocketServer.broadcast', { clientAction, data, diff, id }) + broadcast('events.broadcast', { clientAction, data, diff, id }) }) } diff --git a/src/factories/core-services-factory.ts b/src/factories/core-services-factory.ts index 85c94d19..128b562f 100644 --- a/src/factories/core-services-factory.ts +++ b/src/factories/core-services-factory.ts @@ -1,11 +1,10 @@ /* eslint-disable @typescript-eslint/ban-ts-comment */ -// deno-lint-ignore-file ban-ts-comment import { api } from '../core-services/index.ts' import { getMasterDbClient, getReadReplicaDbClient } from '../database/client.ts' import { DatabaseWatcher } from '../database/DatabaseWatcher.ts' import { initWatchers } from '../database/watchers.ts' -export let watcher: DatabaseWatcher +// export let watcher: DatabaseWatcher export const coreServicesFactory = async () => { const primaryConn = getMasterDbClient() @@ -14,10 +13,8 @@ export const coreServicesFactory = async () => { const secondaryConn = getReadReplicaDbClient() await secondaryConn.asPromise() - watcher = new DatabaseWatcher({ + const watcher = new DatabaseWatcher({ db: primaryConn.db, - // @ts-ignore - _oplogHandle: primaryConn?._oplogHandle, }) watcher.watch().catch((err: Error) => { console.error(err, 'Fatal error occurred when watching database') diff --git a/src/factories/payments-processor-factory.ts b/src/factories/payments-processor-factory.ts index 65a8a675..5ee4b5dc 100644 --- a/src/factories/payments-processor-factory.ts +++ b/src/factories/payments-processor-factory.ts @@ -1,4 +1,4 @@ -import axios, { CreateAxiosDefaults } from 'npm:axios@1.2.6' +import axios, { CreateAxiosDefaults } from 'axios' import { path } from 'ramda' import { IPaymentsProcessor } from '../@types/clients.ts' diff --git a/src/handlers/event-strategies/default-event-strategy.ts b/src/handlers/event-strategies/default-event-strategy.ts index 06a92ea2..9cb2c19d 100644 --- a/src/handlers/event-strategies/default-event-strategy.ts +++ b/src/handlers/event-strategies/default-event-strategy.ts @@ -21,9 +21,5 @@ export class DefaultEventStrategy implements IEventStrategy WebSocketAdapterEvent.Message, createCommandResult(event.id, true, (count) ? '' : 'duplicate:'), ) - - if (count) { - this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event) - } } } diff --git a/src/handlers/event-strategies/delete-event-strategy.ts b/src/handlers/event-strategies/delete-event-strategy.ts index 36b94ba2..2d49b9bc 100644 --- a/src/handlers/event-strategies/delete-event-strategy.ts +++ b/src/handlers/event-strategies/delete-event-strategy.ts @@ -47,9 +47,5 @@ export class DeleteEventStrategy implements IEventStrategy> WebSocketAdapterEvent.Message, createCommandResult(event.id, true, (count) ? '' : 'duplicate:'), ) - - if (count) { - this.webSocket.emit(WebSocketAdapterEvent.Broadcast, event) - } } } diff --git a/src/handlers/event-strategies/parameterized-replaceable-event-strategy.ts b/src/handlers/event-strategies/parameterized-replaceable-event-strategy.ts index 98d6c812..31999762 100644 --- a/src/handlers/event-strategies/parameterized-replaceable-event-strategy.ts +++ b/src/handlers/event-strategies/parameterized-replaceable-event-strategy.ts @@ -33,9 +33,5 @@ export class ParameterizedReplaceableEventStrategy implements IEventStrategy Settings) {} - public findByFilters(filters: SubscriptionFilter[]): mongoose.Aggregate { + public findByFilters(filters: SubscriptionFilter[]): mongoose.Aggregate { debug('querying for %o', filters) if (!Array.isArray(filters) || !filters.length) { throw new Error('Filters cannot be empty') @@ -35,95 +33,8 @@ export class EventRepository implements IEventRepository { const subscriptionLimits = this.settings().limits?.client?.subscription const maxLimit = subscriptionLimits?.maxLimit ?? 0 - const $match: any = {} - const $or: any[] = [] - const $sort = { event_created_at: 1 } - const limit = { - $limit: maxLimit, - } - const pipelines: any[] = [ - { - $match, - }, - { - $sort, - }, - limit, - ] - - filters.forEach((currentFilter: SubscriptionFilter) => { - const subFilter: any = {} - const subFilterOr: any = [] - for (const [filterName, filterValue] of Object.entries(currentFilter)) { - const isGenericTag = isGenericTagQuery(filterName) - if (isGenericTag) { - if (Array.isArray(filterValue) && filterValue.length > 0) { - subFilterOr.push({ event_tags: { $size: 0 } }) - subFilterOr.push({ - event_tags: { - $elemMatch: { - $elemMatch: { - $in: [filterName[1], ...filterValue], - }, - }, - }, - }) - } - } else { - const fieldNames = ['kinds', 'limit', 'until', 'since'] - if (fieldNames.includes(filterName)) { - if (filterName === 'kinds' && Array.isArray(filterValue)) { - subFilter['event_kind'] = { $in: filterValue } - } - - if (filterName === 'since' && typeof filterValue === 'number') { - subFilter['event_created_at'] = { $gte: filterValue } - } - - if (filterName === 'until' && typeof filterValue === 'number') { - subFilter['event_created_at'] = { $lte: filterValue } - } - - if (filterName === 'limit' && typeof filterValue === 'number' && filterValue > 0) { - if (filterValue >= maxLimit) { - limit.$limit = maxLimit - } else { - limit.$limit = filterValue - } - $sort.event_created_at = -1 - } - } - } - } - - forEachObjIndexed( - (tableFields: string[], fieldName: any) => { - const filterValue = currentFilter[fieldName] - if (filterValue) { - tableFields.forEach((field: any) => { - subFilterOr.push({ [field]: { $in: filterValue.map(toBuffer) } }) - }) - } - }, - )({ - authors: ['event_pubkey', 'event_delegator'], - ids: ['event_id'], - }) - - if (subFilterOr.length > 0) { - subFilter['$or'] = subFilterOr - } - - if (Object.keys(subFilter).length > 0) { - $or.push(subFilter) - } - }) - - if ($or.length > 0) { - $match.$or = $or - } - - return readReplicaEventsModel.aggregate(pipelines) + // @ts-ignore: Model static method has been added + return readReplicaEventsModel.findBySubscriptionFilter(filters, maxLimit) } public async create(event: Event): Promise { @@ -171,7 +82,7 @@ export class EventRepository implements IEventRepository { public async upsert(event: Event): Promise { debug('upserting event: %o', event) - const row: IEvent = applySpec({ + const row: DBEvent = applySpec({ event_id: pipe(prop('id'), toBuffer), event_pubkey: pipe(prop('pubkey'), toBuffer), event_created_at: prop('created_at'), @@ -285,6 +196,7 @@ export class EventRepository implements IEventRepository { return ignoreUpdateConflicts(query) } } + async function ignoreUpdateConflicts(query: any) { try { const result = await query diff --git a/src/repositories/user-repository.ts b/src/repositories/user-repository.ts index 5ee62be5..249b83a6 100644 --- a/src/repositories/user-repository.ts +++ b/src/repositories/user-repository.ts @@ -2,7 +2,7 @@ import { Pubkey } from '../@types/base.ts' import { IUserRepository } from '../@types/repositories.ts' import { User } from '../@types/user.ts' import { Settings } from '../@types/settings.ts' -import { masterUsersModel } from '../database/models/index.ts' +import { masterUsersModel } from '../database/models/Users.ts' import { createLogger } from '../factories/logger-factory.ts' import { fromDBUser, toBuffer } from '../utils/transform.ts' diff --git a/src/tor/client.ts b/src/tor/client.ts index c719157e..d58d3b1b 100644 --- a/src/tor/client.ts +++ b/src/tor/client.ts @@ -1,8 +1,8 @@ -import { readFile, writeFile } from 'node:fs/promises' -import { homedir } from 'node:os' -import { join } from 'node:path' +import { readFile, writeFile } from 'fs/promises' +import { homedir } from 'os' +import { join } from 'path' -import { Tor } from 'npm:tor-control-ts@1.0.0' +import { Tor } from 'tor-control-ts' import { TorConfig } from '../@types/tor.ts' import Config from '../config/index.ts' diff --git a/src/utils/event.ts b/src/utils/event.ts index fe7e7684..aae32870 100644 --- a/src/utils/event.ts +++ b/src/utils/event.ts @@ -3,7 +3,7 @@ import { applySpec, converge, curry, mergeLeft, nth, omit, pipe, prop, reduceBy import * as secp256k1 from 'secp256k1' import { EventId, Pubkey, Tag } from '../@types/base.ts' -import { CanonicalEvent, Event, UnidentifiedEvent, UnsignedEvent } from '../@types/event.ts' +import { CanonicalEvent, DBEvent, Event, UnidentifiedEvent, UnsignedEvent } from '../@types/event.ts' import { EventKindsRange } from '../@types/settings.ts' import { SubscriptionFilter } from '../@types/subscription.ts' import Config from '../config/index.ts' @@ -14,7 +14,6 @@ import { getLeadingZeroBits } from './proof-of-work.ts' import { RuneLike } from './runes/rune-like.ts' import { deriveFromSecret } from './secret.ts' import { fromBuffer } from './transform.ts' -import { IEvent } from '../database/types/IEvent.ts' export const serializeEvent = (event: UnidentifiedEvent): CanonicalEvent => [ 0, @@ -25,7 +24,7 @@ export const serializeEvent = (event: UnidentifiedEvent): CanonicalEvent => [ event.content, ] -export const toNostrEvent: (event: IEvent) => Event = applySpec({ +export const toNostrEvent: (event: DBEvent) => Event = applySpec({ id: pipe(prop('event_id') as () => Buffer, fromBuffer), kind: prop('event_kind') as () => number, pubkey: pipe(prop('event_pubkey') as () => Buffer, fromBuffer), diff --git a/src/utils/settings.ts b/src/utils/settings.ts index 78ef140f..bbb9b21d 100644 --- a/src/utils/settings.ts +++ b/src/utils/settings.ts @@ -1,7 +1,7 @@ -import fs from 'node:fs' -import { extname, join } from 'node:path' +import fs from 'fs' +import { extname, join } from 'path' -import yaml from 'npm:js-yaml@4.1.0' +import yaml from 'js-yaml' import { mergeDeepRight } from 'ramda' import { Settings } from '../@types/settings.ts' diff --git a/src/utils/stream.ts b/src/utils/stream.ts index c376cbdd..021e3843 100644 --- a/src/utils/stream.ts +++ b/src/utils/stream.ts @@ -1,4 +1,4 @@ -import { PassThrough, Transform } from 'node:stream' +import { PassThrough, Transform } from 'stream' export const streamMap = (fn: (chunk) => any) => new Transform({ diff --git a/src/utils/transform.ts b/src/utils/transform.ts index cf826ed9..39fae439 100644 --- a/src/utils/transform.ts +++ b/src/utils/transform.ts @@ -1,4 +1,4 @@ -import { bech32 } from 'npm:bech32@2.0.0' +import { bech32 } from 'bech32' import { Buffer } from 'Buffer' import { always, applySpec, ifElse, is, isNil, path, pipe, prop, propSatisfies } from 'ramda' diff --git a/test/integration/features/helpers.ts b/test/integration/features/helpers.ts index 0bd22c32..cedde1b5 100644 --- a/test/integration/features/helpers.ts +++ b/test/integration/features/helpers.ts @@ -1,8 +1,8 @@ import { Buffer } from 'Buffer' import { createHash, createHmac } from 'crypto' -import { EventEmitter } from 'node:events' +import { EventEmitter } from 'events' -import { Observable } from 'npm:rxjs@7.8.0' +import { Observable } from 'rxjs' import * as secp256k1 from 'secp256k1' import { Event } from '../../../src/@types/event.ts' @@ -182,8 +182,7 @@ export async function waitForNextEvent( return new Promise((resolve, reject) => { const observable = streams.get(ws) as Observable - observable.subscribe(async (message: OutgoingMessage) => { - await new Promise((a) => setTimeout(a, 100)) + observable.subscribe((message: OutgoingMessage) => { if (message[0] === MessageType.EVENT && message[1] === subscription) { const event = message[2] as Event if (typeof content !== 'string' || event.content === content) { diff --git a/test/integration/features/shared.ts b/test/integration/features/shared.ts index e2ee04a3..a1833aba 100644 --- a/test/integration/features/shared.ts +++ b/test/integration/features/shared.ts @@ -1,9 +1,9 @@ // deno-lint-ignore-file import { Buffer } from 'Buffer' -import fs from 'node:fs' +import fs from 'fs' import { assocPath, pipe } from 'ramda' -import { fromEvent, map, Observable, ReplaySubject, Subject, takeUntil } from 'npm:rxjs@7.8.0' +import { fromEvent, map, Observable, ReplaySubject, Subject, takeUntil } from 'rxjs' import Sinon from 'sinon' import { afterAll, beforeAll, describe, it } from 'jest' import { DatabaseClient1 as DatabaseClient } from '../../../src/@types/base.ts' @@ -16,7 +16,9 @@ import { SettingsStatic } from '../../../src/utils/settings.ts' import type { IWorld } from './types.ts' import { connect, createIdentity, createSubscription, sendEvent, WebSocketWrapper } from './helpers.ts' import { masterEventsModel } from '../../../src/database/models/Events.ts' -import { resolve } from 'node:path' +import { api } from '../../../src/core-services/index.ts' +import { DatabaseWatcher } from '../../../src/database/DatabaseWatcher.ts' +import { initWatchers } from '../../../src/database/watchers.ts' export const isDraft = Symbol('draft') @@ -45,6 +47,7 @@ let worker: AppWorker let dbClient: DatabaseClient let rrDbClient: DatabaseClient +let watcher: DatabaseWatcher export const streams = new WeakMap>() @@ -132,9 +135,19 @@ export const startTest = async (pathUrl: string, registerEvent: Function) => { Config.RELAY_PORT = '18808' Config.SECRET = Math.random().toString().repeat(6) - rrDbClient = rrDbClient = getReadReplicaDbClient() + rrDbClient = getReadReplicaDbClient() await rrDbClient.asPromise() + watcher = new DatabaseWatcher({ + db: dbClient.db, + }) + watcher.watch().catch((err: Error) => { + console.error(err, 'Fatal error occurred when watching database') + Deno.exit(1) + }) + + initWatchers(watcher, api.broadcastLocal.bind(api)) + Sinon.stub(SettingsStatic, 'watchSettings') const settings = SettingsStatic.createSettings() @@ -153,6 +166,7 @@ export const startTest = async (pathUrl: string, registerEvent: Function) => { afterAll(async function () { worker.close(async () => { try { + await watcher.close() await Promise.all([ dbClient.destroy(true), rrDbClient.destroy(true), diff --git a/test/unit/handlers/delegated-event-message-handler.test.ts b/test/unit/handlers/delegated-event-message-handler.test.ts index f1e37978..1bc3bc45 100644 --- a/test/unit/handlers/delegated-event-message-handler.test.ts +++ b/test/unit/handlers/delegated-event-message-handler.test.ts @@ -1,4 +1,4 @@ -import EventEmitter from 'node:events' +import EventEmitter from 'events' import chai from 'chai' import chaiAsPromised from 'chai-as-promised' diff --git a/test/unit/handlers/event-message-handler.test.ts b/test/unit/handlers/event-message-handler.test.ts index f6ed41af..5e3e7006 100644 --- a/test/unit/handlers/event-message-handler.test.ts +++ b/test/unit/handlers/event-message-handler.test.ts @@ -1,4 +1,4 @@ -import EventEmitter from 'node:events' +import EventEmitter from 'events' import chai from 'chai' import chaiAsPromised from 'chai-as-promised' diff --git a/test/unit/handlers/event-strategies/default-event-strategy.test.ts b/test/unit/handlers/event-strategies/default-event-strategy.test.ts index a2640775..94937b5a 100644 --- a/test/unit/handlers/event-strategies/default-event-strategy.test.ts +++ b/test/unit/handlers/event-strategies/default-event-strategy.test.ts @@ -81,15 +81,15 @@ describe({ expect(eventRepositoryCreateStub).to.have.been.calledOnceWithExactly( event, ) - expect(webSocketEmitStub).to.have.been.calledTwice + // expect(webSocketEmitStub).to.have.been.calledTwice expect(webSocketEmitStub).to.have.been.calledWithExactly( WebSocketAdapterEvent.Message, [MessageType.OK, 'id', true, ''], ) - expect(webSocketEmitStub).to.have.been.calledWithExactly( - WebSocketAdapterEvent.Broadcast, - event, - ) + // expect(webSocketEmitStub).to.have.been.calledWithExactly( + // WebSocketAdapterEvent.Broadcast, + // event, + // ) }) it('does not broadcast event if event is duplicate', async () => { diff --git a/test/unit/handlers/event-strategies/delete-event-strategy.test.ts b/test/unit/handlers/event-strategies/delete-event-strategy.test.ts index d41c794c..a50e6394 100644 --- a/test/unit/handlers/event-strategies/delete-event-strategy.test.ts +++ b/test/unit/handlers/event-strategies/delete-event-strategy.test.ts @@ -130,15 +130,15 @@ describe({ expect(eventRepositoryCreateStub).to.have.been.calledOnceWithExactly( event, ) - expect(webSocketEmitStub).to.have.been.calledTwice + // expect(webSocketEmitStub).to.have.been.calledTwice expect(webSocketEmitStub).to.have.been.calledWithExactly( WebSocketAdapterEvent.Message, [MessageType.OK, 'id', true, ''], ) - expect(webSocketEmitStub).to.have.been.calledWithExactly( - WebSocketAdapterEvent.Broadcast, - event, - ) + // expect(webSocketEmitStub).to.have.been.calledWithExactly( + // WebSocketAdapterEvent.Broadcast, + // event, + // ) }) it('does not broadcast event if duplicate', async () => { diff --git a/test/unit/handlers/event-strategies/parameterized-replaceable-event-strategy.test.ts b/test/unit/handlers/event-strategies/parameterized-replaceable-event-strategy.test.ts index fbab5530..f403a46f 100644 --- a/test/unit/handlers/event-strategies/parameterized-replaceable-event-strategy.test.ts +++ b/test/unit/handlers/event-strategies/parameterized-replaceable-event-strategy.test.ts @@ -116,15 +116,15 @@ describe({ expect(eventRepositoryUpsertStub).to.have.been.calledOnceWithExactly( event, ) - expect(webSocketEmitStub).to.have.been.calledTwice + // expect(webSocketEmitStub).to.have.been.calledTwice expect(webSocketEmitStub).to.have.been.calledWithExactly( WebSocketAdapterEvent.Message, [MessageType.OK, 'id', true, ''], ) - expect(webSocketEmitStub).to.have.been.calledWithExactly( - WebSocketAdapterEvent.Broadcast, - event, - ) + // expect(webSocketEmitStub).to.have.been.calledWithExactly( + // WebSocketAdapterEvent.Broadcast, + // event, + // ) }) it('does not broadcast event if event is duplicate', async () => { diff --git a/test/unit/handlers/event-strategies/replaceable-event-strategy.test.ts b/test/unit/handlers/event-strategies/replaceable-event-strategy.test.ts index 92b8b31b..7ddd24f9 100644 --- a/test/unit/handlers/event-strategies/replaceable-event-strategy.test.ts +++ b/test/unit/handlers/event-strategies/replaceable-event-strategy.test.ts @@ -82,15 +82,15 @@ describe({ expect(eventRepositoryUpsertStub).to.have.been.calledOnceWithExactly( event, ) - expect(webSocketEmitStub).to.have.been.calledTwice + // expect(webSocketEmitStub).to.have.been.calledTwice expect(webSocketEmitStub).to.have.been.calledWithExactly( WebSocketAdapterEvent.Message, [MessageType.OK, 'id', true, ''], ) - expect(webSocketEmitStub).to.have.been.calledWithExactly( - WebSocketAdapterEvent.Broadcast, - event, - ) + // expect(webSocketEmitStub).to.have.been.calledWithExactly( + // WebSocketAdapterEvent.Broadcast, + // event, + // ) }) it('does not broadcast event if event is duplicate', async () => { diff --git a/test/unit/handlers/subscribe-message-handler.test.ts b/test/unit/handlers/subscribe-message-handler.test.ts index 36edda4b..e2d81679 100644 --- a/test/unit/handlers/subscribe-message-handler.test.ts +++ b/test/unit/handlers/subscribe-message-handler.test.ts @@ -1,5 +1,5 @@ -import EventEmitter from 'node:events' -import { PassThrough } from 'node:stream' +import EventEmitter from 'events' +import { PassThrough } from 'stream' import { Buffer } from 'Buffer' import chai from 'chai' diff --git a/test/unit/tor/onion.test.ts b/test/unit/tor/onion.test.ts index 0a788ba8..85333583 100644 --- a/test/unit/tor/onion.test.ts +++ b/test/unit/tor/onion.test.ts @@ -1,10 +1,10 @@ -// import fs from 'node:fs/promises' -// import { hostname } from 'node:os' +// import fs from 'fs/promises' +// import { hostname } from 'os' // import { expect } from 'chai' // import { afterEach, beforeEach, describe, it } from 'jest' // import Sinon from 'sinon' -// import { hiddenService, Tor } from 'npm:tor-control-ts@1.0.0' +// import { hiddenService, Tor } from 'tor-control-ts' // import Config from '../../../src/config/index.ts' // import { addOnion, closeTorClient, createTorConfig, getTorClient } from '../../../src/tor/client.ts' diff --git a/test/unit/utils/settings.test.ts b/test/unit/utils/settings.test.ts index 25a581bf..05a33211 100644 --- a/test/unit/utils/settings.test.ts +++ b/test/unit/utils/settings.test.ts @@ -1,5 +1,5 @@ -import fs from 'node:fs' -import { join } from 'node:path' +import fs from 'fs' +import { join } from 'path' import chai, { expect } from 'chai' import { afterEach, beforeEach, describe, it } from 'jest'