Postgres ingestion + process event tests (#116)
* start with the postgres event ingestion process event tests

* get first test to work

* remove siteUrl

* pass partial event to test that it still works and retain parity with the django test

* another test

* refactor

* add more tests

* opt out of posthog in test mode

* add first alias test

* always use UTC times when talking to postgres

* prevent a crash

* bit of clarity to help debug

* fix bug with table name

* fix bug with passing object instead of id

* add some alias tests (all green now)

* save merged properties

* few more tests

* more missing tests

* fix test

* team event properties test

* fix bug

* another test (partial)

* clarify postgres magic

* different timestamp format for creating event in clickhouse & postgresql

* make element tests fail

* capture first team event test

* insert session recording events

* generate element hashes

* create elements and element groups

* "key in object" only works with objects, not arrays

* test an extra thing

* add missing awaits that caused things to be done out of order

* few extra tests

* another test

* await for things to happen

* fix to work with latest master

* client is called twice - it's initialized for sending celery tasks on webhooks as well

* check webhook celery client queue

* split into postgres & shared process event test

* add query counter

* clickhouse process event tests v0.1

* fix vm test

* fix "Your test suite must contain at least one test." error for shared tests

* also run non-ingestion tests under test:postgres

* Clean up utils.ts

* Clean up TimestampFormat

* Put get* type Postgres functions in DB class

Co-authored-by: Michael Matloka <[email protected]>
mariusandra and Twixes authored Feb 1, 2021
1 parent 9856cd4 commit 3090e4a
Showing 15 changed files with 1,502 additions and 128 deletions.
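
A note on the '"key in object" only works with objects, not arrays' bullet above: in JavaScript/TypeScript the "in" operator tests property keys, so on an array it matches indices ('0', '1', ...) rather than values, and value membership needs Array.prototype.includes instead. A minimal TypeScript illustration (not code from this commit):

    // `in` checks property keys, so it behaves as expected on plain objects...
    const properties = { $browser: 'Chrome', $os: 'Mac OS X' }
    console.log('$browser' in properties) // true

    // ...but on an array it checks indices ('0', '1', ...), not values
    const keys = ['$browser', '$os']
    console.log('$browser' in keys) // false - '$browser' is a value here, not an index
    console.log(0 in keys) // true - index 0 exists

    // value membership on an array needs includes() (or indexOf())
    console.log(keys.includes('$browser')) // true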
package.json: 8 changes (5 additions & 3 deletions)
@@ -6,8 +6,8 @@
     "main": "dist/src/index.js",
     "scripts": {
         "test": "jest --runInBand tests/**/*.test.ts",
-        "test:postgres": "yarn test --testPathIgnorePatterns '.*/clickhouse'",
-        "test:clickhouse": "yarn test --testPathIgnorePatterns '.*/postgres'",
+        "test:postgres": "jest --runInBand tests/postgres/*.test.ts tests/*.test.ts",
+        "test:clickhouse": "jest --runInBand tests/clickhouse/*.test.ts",
         "benchmark": "node --expose-gc node_modules/.bin/jest --runInBand benchmarks/",
         "start": "yarn start:dev",
         "start:dist": "node dist/src/index.js --base-dir ../posthog",
@@ -23,7 +23,9 @@
         "prettier": "prettier --write .",
         "prettier:check": "prettier --check .",
         "prepare": "yarn compile:protobuf",
-        "prepublishOnly": "yarn build"
+        "prepublishOnly": "yarn build",
+        "setup:dev": "cd ../posthog && (dropdb test_posthog || echo 'no db to drop') && createdb test_posthog && source env/bin/activate && DATABASE_URL=postgres://localhost:5432/test_posthog DEBUG=1 python manage.py migrate",
+        "setup:dev:ee": "export DEBUG=1 PRIMARY_DB=clickhouse DATABASE_URL=postgres://posthog:posthog@localhost:5439/test_posthog PGPASSWORD=posthog && cd ../posthog && (dropdb -p 5439 -h localhost -U posthog test_posthog || echo 'no db to drop') && createdb -p 5439 -h localhost -U posthog test_posthog && source env/bin/activate && python manage.py migrate && python manage.py migrate_clickhouse"
     },
     "bin": {
         "posthog-plugin-server": "bin/posthog-plugin-server"
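
A note on the timestamp-related bullets above ('always use UTC times when talking to postgres', 'different timestamp format for creating event in clickhouse & postgresql') and the castTimestampOrNow/TimestampFormat clean-up: PostgreSQL timestamptz columns accept ISO 8601 strings, while ClickHouse DateTime columns are usually written as 'yyyy-MM-dd HH:mm:ss'. A rough sketch of such a helper using luxon (names and signature are illustrative assumptions, not the repo's actual implementation):

    import { DateTime } from 'luxon'

    // Illustrative only - loosely modelled on the castTimestampOrNow / TimestampFormat
    // names touched by this commit, not the actual PostHog code.
    enum SketchTimestampFormat {
        ISO = 'iso', // e.g. 2021-02-01T12:34:56.789Z - accepted by PostgreSQL timestamptz
        ClickHouse = 'clickhouse', // e.g. 2021-02-01 12:34:56 - what ClickHouse DateTime expects
    }

    function castTimestampSketch(timestamp: DateTime | null, format: SketchTimestampFormat): string {
        const dt = (timestamp ?? DateTime.utc()).toUTC() // always hand the databases UTC
        return format === SketchTimestampFormat.ClickHouse
            ? dt.toFormat('yyyy-MM-dd HH:mm:ss')
            : dt.toISO()! // luxon only returns null for invalid DateTimes
    }

Keeping the UTC conversion in a single helper is one way to avoid the local-time mismatches the bullets above describe.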
src/db.ts: 70 changes (63 additions & 7 deletions)
@@ -5,8 +5,16 @@ import { DateTime } from 'luxon'
 import { Pool, QueryConfig, QueryResult, QueryResultRow } from 'pg'
 import { KAFKA_PERSON, KAFKA_PERSON_UNIQUE_ID } from './ingestion/topics'
 import { unparsePersonPartial } from './ingestion/utils'
-import { Person, PersonDistinctId, RawPerson, RawOrganization } from './types'
-import { castTimestampOrNow, sanitizeSqlIdentifier } from './utils'
+import {
+    Person,
+    PersonDistinctId,
+    RawPerson,
+    RawOrganization,
+    Team,
+    PostgresSessionRecordingEvent,
+    Event,
+} from './types'
+import { castTimestampOrNow, sanitizeSqlIdentifier, UUIDT } from './utils'

 /** The recommended way of accessing the database. */
 export class DB {
@@ -30,6 +38,13 @@
         return this.postgres.query(queryTextOrConfig, values)
     }

+    // Person
+
+    public async fetchPersons(): Promise<Person[]> {
+        const result = await this.postgresQuery('SELECT * FROM posthog_person')
+        return result.rows as Person[]
+    }
+
     public async fetchPerson(teamId: number, distinctId: string): Promise<Person | undefined> {
         const selectResult = await this.postgresQuery(
             `SELECT
@@ -45,8 +60,10 @@
                 AND posthog_persondistinctid.distinct_id = $2`,
             [teamId, distinctId]
         )
-        const rawPerson: RawPerson = selectResult.rows[0]
-        return { ...rawPerson, created_at: DateTime.fromISO(rawPerson.created_at) }
+        if (selectResult.rows.length > 0) {
+            const rawPerson: RawPerson = selectResult.rows[0]
+            return { ...rawPerson, created_at: DateTime.fromISO(rawPerson.created_at) }
+        }
     }

     public async createPerson(
@@ -55,7 +72,8 @@
         teamId: number,
         isUserId: number | null,
         isIdentified: boolean,
-        uuid: string
+        uuid: string,
+        distinctIds?: string[]
     ): Promise<Person> {
         const insertResult = await this.postgresQuery(
             'INSERT INTO posthog_person (created_at, properties, team_id, is_user_id, is_identified, uuid) VALUES ($1, $2, $3, $4, $5, $6) RETURNING *',
@@ -75,16 +93,22 @@
                 messages: [{ value: Buffer.from(JSON.stringify(data)) }],
             })
         }
+
+        for (const distinctId of distinctIds || []) {
+            await this.addDistinctId(personCreated, distinctId)
+        }
+
         return personCreated
     }

     public async updatePerson(person: Person, update: Partial<Person>): Promise<Person> {
         const updatedPerson: Person = { ...person, ...update }
+        const values = [...Object.values(unparsePersonPartial(update)), person.id]
         await this.postgresQuery(
             `UPDATE posthog_person SET ${Object.keys(update).map(
                 (field, index) => sanitizeSqlIdentifier(field) + ' = $' + (index + 1)
             )} WHERE id = $${Object.values(update).length + 1}`,
-            [...Object.values(unparsePersonPartial(update)), person.id]
+            values
         )
         if (this.kafkaProducer) {
             const data = {
@@ -103,7 +127,7 @@
     }

     public async deletePerson(personId: number): Promise<void> {
-        await this.postgresQuery('DELETE FROM person_distinct_id WHERE person_id = $1', [personId])
+        await this.postgresQuery('DELETE FROM posthog_persondistinctid WHERE person_id = $1', [personId])
         await this.postgresQuery('DELETE FROM posthog_person WHERE id = $1', [personId])
         if (this.clickhouse) {
             await this.clickhouse.query(`ALTER TABLE person DELETE WHERE id = ${personId}`).toPromise()
@@ -113,6 +137,16 @@
         }
     }

+    // PersonDistinctId
+
+    public async fetchDistinctIdValues(person: Person): Promise<string[]> {
+        const result = await this.postgresQuery(
+            'SELECT * FROM posthog_persondistinctid WHERE person_id=$1 and team_id=$2 ORDER BY id',
+            [person.id, person.team_id]
+        )
+        return (result.rows as PersonDistinctId[]).map((pdi) => pdi.distinct_id)
+    }
+
     public async addDistinctId(person: Person, distinctId: string): Promise<void> {
         const insertResult = await this.postgresQuery(
             'INSERT INTO posthog_persondistinctid (distinct_id, person_id, team_id) VALUES ($1, $2, $3) RETURNING *',
@@ -146,11 +180,33 @@
         }
     }

+    // Organization
+
     public async fetchOrganization(organizationId: string): Promise<RawOrganization | undefined> {
         const selectResult = await this.postgresQuery(`SELECT * FROM posthog_organization WHERE id $1`, [
             organizationId,
         ])
         const rawOrganization: RawOrganization = selectResult.rows[0]
         return rawOrganization
     }
+
+    // Event
+
+    public async fetchEvents(): Promise<Event[]> {
+        const result = await this.postgresQuery('SELECT * FROM posthog_event')
+        return result.rows as Event[]
+    }
+
+    // SessionRecordingEvent
+
+    public async fetchSessionRecordingEvents(): Promise<PostgresSessionRecordingEvent[]> {
+        const result = await this.postgresQuery('SELECT * FROM posthog_sessionrecordingevent')
+        return result.rows as PostgresSessionRecordingEvent[]
+    }
+
+    // Element
+
+    public async fetchElements(): Promise<Element[]> {
+        return (await this.postgresQuery('SELECT * FROM posthog_element')).rows
+    }
 }
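
The fetch* helpers added above exist so the process-event tests can assert on database state after ingestion. A sketch of how they might be called from a test (how the db handle is obtained and how events get ingested is test-suite plumbing assumed here for illustration; only the DB methods themselves come from this commit):

    import { DB } from '../src/db'

    // Hypothetical test scaffolding - the real suite uses Jest expect() and its own
    // setup; only fetchEvents, fetchPersons and fetchDistinctIdValues are from this diff.
    async function checkIngestedState(db: DB): Promise<void> {
        const events = await db.fetchEvents()
        console.assert(events.length === 1, 'expected exactly one ingested event')

        const persons = await db.fetchPersons()
        console.assert(persons.length === 1, 'expected exactly one person')

        const distinctIds = await db.fetchDistinctIdValues(persons[0])
        console.assert(distinctIds.includes('some-distinct-id'), 'expected the distinct_id to be linked')
    }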