-
Notifications
You must be signed in to change notification settings - Fork 5
Conversation
src/ingestion/kafka-queue.ts
Outdated
for (const removedOffset of offsetMap.values()) { | ||
resolveOffset(removedOffset) | ||
await heartbeat() | ||
await commitOffsetsIfNecessary() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This changes almost nothing from the current situation, as it only resolves offsets of events that were discarded after the offsets of not discarded ones. It doesn't make much sense to resolve offsets completely out of order like this, only the last resolveOffset
in this will even actually matter. This is just
resolveOffset(rawEvents[rawEvents.length-1].kafka_offset)
await commitOffsetsIfNecessary()
with extra steps.
The reason for doing resolveOffset
is to keep track of processed events INSIDE the for (const event of processedEvents)
loop, so that if we have, say, 5 events, and for some reason the plugin server exits while processing event no. 4, only managing to save 1, 2, and 3 to CH, it will know to then restart from event no. 4. The problem is, if in the current case if events 2 and 3 are discarded, neither of their offsets will be resolved, and after restarting the server will start processing with event no. 2, in effect processing 2 and 3 twice.
This change doesn't improve that. It only fixes the problem in between batches if the last n events of the batch are discarded, which #123 handles too, except more deliberately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So actually we should just enable eachBatchAutoResolve: true
? From the docs:
eachBatchAutoResolve
configures auto-resolve of batch processing. If set to true, KafkaJS will automatically commit the last offset of the batch ifeachBatch
doesn't throw an error. Default: true.
Am I missing something? I see you had a long comment there about some dragons ahead.
The way understand it, If we crash, it'll commit whatever was marked with resolveOffset
. Otherwise it'll always commit the last offset of the batch, even if the last event was discarded. Sounds like what we need? In the worst case, when crashing at event "6" in the sequence "1", "2", "skip", "skip", "skip", "6", it'll retry events 3-5... but it's highly likely they will be skipped again... and I'm OK with more straightforward code here vs more accurate skipping. In practice, most batches contain just an event or two anyway.
Both #123 and this implemented one thing wrong though. The events might be returned from await this.processEventBatch
in any order (and indeed with random events added to the mix). They could just be reversed. I now added some logic to ingest the events in the original order.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh and holy macaroni, all tests are green??
const processedEvents = await this.processEventBatch(pluginEvents) | ||
|
||
// Sort in the original order that the events came in, putting any randomly added events to the end. | ||
// This is so we would resolve the correct kafka offsets in order. | ||
processedEvents.sort( | ||
(a, b) => (uuidOrder.get(a.uuid!) || pluginEvents.length) - (uuidOrder.get(b.uuid!) || pluginEvents.length) | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice that the tacked on events are last!
Though as for the non-null assertion, as far as I understand, processEventBatch
can return an array of practically anything, including non-objects falsy or truthy. 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should always return this or crash: https://github.com/PostHog/plugin-server/pull/124/files#diff-5d37292cf21755714262793e7553ee317a12b157d0601e27a0ca26e096a60c00L267
Though, yeah, there's no real sanitization. The plugins could return just { "n00b": "lololololol" }
as an event and it would make its way here...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose the get
is going to return undefined
in any such case, so this should be fine even if the assertion is too optimistic.
Side note, I think we make this assumption in some more dangerous places too, so probably will have to take a look at some event structure validation in another PR.
// TODO: eachBatchAutoResolve: false, // don't autoresolve whole batch in case we exit it early | ||
// The issue is right now we'd miss some messages and not resolve them as processEventBatch COMPETELY | ||
// discards some events, leaving us with no kafka_offset to resolve when in fact it should be resolved. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment was a bit more cryptic than it should have been, but the concern was with potentially resolving offsets for unsaved events in two cases:
- if we return early because of the
isRunning
check - however now my understanding is that if the consumer is not running, then it won't be possible to commit offsets, not completely certain though - if we return early because of the
isStale
check - may happen ifconsumer.seek()
or something like that is used to move partition offset somewhere, this is a bit more concerning, but also I'm unsure about the specifics of such a situation
In the end though, just to control this behavior without any doubt (and we should always prefer at-least-once over at-most-once), I'd rather uncomment this (setting eachBatchAutoResolve
to false
) and instead just resolve the offset deliberately.
Here's the relevant comment from KafkaJS source:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
uuidOrder.set(event.uuid, index) | ||
uuidOffset.set(event.uuid, message.offset) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, so this still doesn't address cases like: we've got events a
, b
, c
; b
and c
gets discarded; we never resolve b
's and c
's offsets; in case of restart b
and c
, get processed again. But then also it's true that plugins can return whatever the hell they want in any order, so no method can be completely trusted here (though in most cases there won't be shenanigans). Even sorting by UUIDs (which, the ones generated by us, are time-sortable with UUIDT), the uuid
property may be randomized, so… Alright, whatever. In any case it's better to process an event at-least-once than at-most-once.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, this can go deep...
* Integrate fastify-postgres and fastify-kafka * Explicitly add node-rdkafka for its types * Port UUIDT from Python * Consume events from Kafka * Update yarn.lock * Port handle_timestamp * Enhance startFastifyInstance with config * Prettier * Handle dates with Luxon for similarity to Python's datetime * Fix UUIDT stringification * Port _capture_ee * Create EventsProcessor with plugins server access * Rework ingestion to separate from Fastify, port over more Django * Update utils.test.ts * Remove fastify-kafka and fastify-postgres * Add protobuf * Add types * Update yarn.lock * Improve info * Update server.ts * Port element functions * Use Kafka producer stream and make overall optimizations * Add session recording * Add plugin processing * Fix castTimestampOrNow * Update yarn.lock * Improve typing and consume event messages in batches of 100 * Improve code clarity * Add timing with StatsD * Update style * Format * Merge and alias * Are last stuff * Reimplement Protobuf with protos compiled to JS+TS * Fix things * Fix UUID#toString implementation * Update UUID number handling * Unify * Prettier * Unify utils * Add newline * Prettier but correctly * Fix types * Improve types * Bump version to 0.5.0 * Update worker.test.ts * Update types.ts * Update yarn.lock * Update compile:protobuf with eslint --fix * Fix typing * Don't bump version * Fix style * Use @posthog/node-rdkafka for Worker thread support * Improve logging * Unify Redis and Kafka approach to queuing events * Fix consuming from Kafka * Make some optimizations * Don't introduce KAFKA_EVENTS_HANDOFF * Consume 1 kiloevent / 50 ms = 20 kiloevents / s * Don't start web server by default We don't need it at all yet. * Improve Redis logging * Fix connecting to dockerized Redis No idea why this fixes the issue, but it does. * Update yarn.lock * Fix merge * Clear Kafka consumption interval on graceful exit * Improve logging and error handling * Smooth out ingestion errors * Fix StatsD prefixing * Move @types/luxon to devDependencies * Use Kafka.js Producer instead of node-rdkafka * Use EventProto#encodeDelimited instead of #encode * Start UUIDT series from 0 instead of 1 * Use event UUID from Django server * Make some fixes and improvements * Remove console.logs * Use plugin-scaffold 0.2.7 * Fix style camelCase FTW * Simplify compile:typescript * await startQueue * Clean up castTimestampOrNow * Don't use pesky ||= * Change consumer group to 'group1' from main repo * Revert "await startQueue" This reverts commit 1ca29ba. * Exit if queue failed to start * Clean up elements handling * Don't commit compiled protos * Rename consumer group to clickhouse-ingestion * Consume from topic events_ingestion_handoff * ee dev script * Backport set_once and event name sanitization * Add direct ClickHouse support base * Upgrade plugin-scaffold to 0.2.8 * Add clickhouse to server object * Support ClickHouse persons and person distinct IDs * Update README with ClickHouse settings * Fix sanitizeEventName * Await queue start * Prettier README * Try ClickHouse CI skeleton * Update yarn.lock * Possibly fix CI * Revert basic tests job to old name * Cache pip install * Fix ClickHouse teardown and add Yarn installation * Await startQueue in tests * Try existing test in new workflow * Don't cache pip requirements * Add Redis to CH CI * Make some env var changes * Try test_default as Django test DB name * Try test_posthog as Django test DB name * Don't tear down ClickHouse * Make env vars common to all tests-clickhouse steps * Debug Postgres * Debug Postgres actually * Revert "Make env vars common to all tests-clickhouse steps" This reverts commit 07bfd29. * Try fixing DB name discrepancy * Make some env vars common * Debug Postgres better * Remove Postgres debug code * Fix inserting * Fix posthog_team.app_urls * Catch insertRow errors * Add plugin.organization_id in base tests * DELETE FROM posthog_organization * Rework CI to use Django everywhere * Reorder tests for readability * Fix missing KAFKA_HOSTS in CI * Optimize createPosthog * Start separating Postgres- and ClickHouse-based suites * Fix regexes possibly * Fix regexes actually possibly * Improve fatal error handling * Fix type * Fix import * Debug Kafka queue crash * Debug consumer * Fix organizationId * Make some more suites Postgres-specific * Check out master branch of main repo of Django server in CI * Start schedule before queue * Clean up logging * Update Dockerfile * Refactor Postgres processing to DB class * Use more of DB methods * Quit Redis * Update test database name * Debug consumer * Don't mock KafkaJS! * Add new Kafka testing system incomplete * Clean up new testing system * Make prettier compatible with ESLint * Add compatibility with Plugin.latest_tag in test * Fix minor issues * Try not terminating program on Kafka consumer crash * Change KAFKA_ADVERTISED_HOST_NAME to localhost * Address feedback * Separate Postgres and DB classes * Consume from kafka_events topic in KafkaObserver * Fix process-event test file name * Check for UUID in handed off events * Return from EventsProcessor.processEvent * use `db.postgresQuery` instead of vague `db.query` * Run the plugins suite universally * Fix closeServer * javaScriptVariableCase * Capture handleTimestamp error with Sentry * Hand off message in test * Fix plugins suite imports * Run the vm suite universally * more javaScriptVariableCase * disconnect without waiting for pending replies (.diconnect() instead of .quit()) * dot quit seems to work better? * Add sanitizeSqlIdentifier * Fix updating person properties * Fix problems * Don't use status in piscina.js * Revert "Run the vm suite universally" This reverts commit 966056f. * Fix sanitizeSqlIdentifier test * Use waitForProcessedMessages * Optimize updatePersonProperties * Rework DummyPostHog to capturing internal server events efficiently * Fix nested test files not being ran * Run prettier over Markdown * Fix mock paths * Fix issues * Fix some * Don't truncate Kafka tables * Start kafkaObserver inside test * Support Team.is_demo * Increase process-event tests timeout to 60 s * Cache Python location in ClickHouse CI * Debug KafkaObserver * Cache pip in all CI jobs * Add bash script for running tests locally * Try out a different way of watching Kafka messages * Simplify DummyPostHog back again * Use posthog-js-lite in EventsProcessor * Sanitize SQL identifiers more strictly Remove all characters in identifiers that are neither letter, digit or underscore. * Don't resolve KafkaObserver start before connection * Add ee.tasks.webhooks_ee.post_event_to_webhook_ee * Expect Client to have been called twice * Adjust resetTestDatabase for Organization.personalization * Update Cliient call asserts * Postgres ingestion + process event tests (#116) * start with the postgres event ingestion process event tests * get first test to work * remove siteUrl * pass partial event to test that it still works and retain parity with the django test * another test * refactor * add more tests * opt out of posthog in test mode * add first alias test * always use UTC times when talking to postgres * prevent a crash * bit of clarity to help debug * fix bug with table name * fix bug with passing object instead of id * add some alias tests (all green now) * save merged properties * few more tests * more missing tests * fix test * team event properties test * fix bug * another test (partial) * clarify postgres magic * different timestamp format for creating event in clickhouse & postgresql * make element tests fail * capture first team event test * insert session recording events * generate element hashes * create elements and element groups * "key in object" only works with objects, not arrays * test an extra thing * add missing awaits that caused things to be done out of order * few extra tests * another test * await for things to happen * fix to work with latest master * client is called twice - it's initialized for sending celery tasks on webhooks as well * check webhook celery client queue * split into postgres & shared process event test * add query counter * clickhouse process event tests v0.1 * fix vm test * fix "Your test suite must contain at least one test." error for shared tests * also run non-ingestion tests under test:postgres * Clean up utils.ts * Clean up TimestampFormat * Put get* type Postgres functions in DB class Co-authored-by: Michael Matloka <[email protected]> * postgres e2e ingestion test (#120) * Kafka e2e tests (#121) * kafka e2e test * kafka e2e test * kafka host from env * get some ch process event tests working * more working clickhouse tests * wait a bit longer * add chainToElements, fix elementsToString bug * remove quotes from inside sanitizeSqlIdentifier to also work with clickhouse * split dev setup command * fetch elements from clickhouse * more elements * bugfix * refactor and reuse wait function * ingest kafka events with the right structure * remove leftover * fix clickhouse timestamp * simplify process event creation and call the methods directly * fix uuid test * refactor delayed event fetching to support session recording events * catch bad uuids * wait for session recording events in test * use right timestamp for session recording events * use the same database as posthog (the app) * use local db * deserialize clickhouse session recording events * split dev scripts * try to make tests work by specifying db * increase kafka log level * cleanup * pass idl protos to clickhouse in github actions * start the clickhouse container in another step * let's try like this * WIP * WIP * sudo * also alias zookeeper * export zookeeper * use docker-compose.ch.yml * detached * element group test * create tests * debug test * remove some redundancy * reduce some noise * try to make topics * compatible with posthog migration 0122 * hide error * try to close e2e open handles * reuse kafkaProducer on server * Add DB.clickhouseQuery * Put isUUIDFormat on the UUID class Co-authored-by: Michael Matloka <[email protected]> * Update README.md with PLUGIN_SERVER_INGESTION * rename CLICKHOUSE_USERNAME --> CLICKHOUSE_USER for consistency with the rest of the app * Delete run-tests-locally.sh * Resolve all kafka offsets (#124) * resolve all kafka offsets * remove a few lines * clean up * sort events according to offsets * add sort info * simplify offset line * add comment * Type pluginEvents as PluginEvent[] * Commit last offset of batch manually Co-authored-by: Michael Matloka <[email protected]> * Add KAFKA_INCOMING_TOPIC and clean up unused code (#125) * ingest to clickhouse only if PLUGIN_SERVER_INGESTION is enabled * KAFKA_INCOMING_TOPIC * test clickhouse connection * remove some unused code * Replace _INCOMING_ with _CONSUMPTION_ * Add KAFKA_CONSUMPTION_TOPIC to README.md * add // Co-authored-by: Michael Matloka <[email protected]> * Postgres parity tests (#126) * create postgres parity tests, fix some bugs * create all topics * person and distinct id parity tests * add TODOs * fetch distinct ids from clickhouse * create a specialised function for moving distinct ids and fix postgres/clickhouse person_id difference (number vs string) * test for updating distinct ids * createPerson, updatePerson and deletePerson * add a debug line to help debug flaky github action * remove falsehood * Remove "_handoff" from Kafka topic (#127) * remove _HANDOFF from topic * add plugin_ to plugin server ingestion topic * User merge test (#129) * merge test and postgres query simplification * postgres fix * remove "_HANDOFF" * small comment * Switch ClickHouse driver (#128) * Use driver @PostHog/clickhouse instead of clickhouse * Update CH querying * Fix clickhouseQuery usage * Don't quote 64-bit ints in JSON from CH Co-authored-by: Marius Andra <[email protected]>
Changes
Checklist