From bf9f7a0f10463cc3e2fa353e7986d34ef2f819cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Mon, 7 Oct 2024 06:33:11 +0000 Subject: [PATCH 1/7] Rewrote conversation creation to fix some bugs and to use ILP to update relevant activities --- .../apps/activities_worker/src/activities.ts | 7 +- .../src/activities/createConversations.ts | 235 +++++++++++------- .../src/workflows/createConversations.ts | 4 +- .../data-access-layer/src/activities/ilp.ts | 6 +- .../data-access-layer/src/activities/sql.ts | 91 ++++--- .../src/conversations/ilp.ts | 6 +- 6 files changed, 199 insertions(+), 150 deletions(-) diff --git a/services/apps/activities_worker/src/activities.ts b/services/apps/activities_worker/src/activities.ts index cad0a0e97..c35892bd6 100644 --- a/services/apps/activities_worker/src/activities.ts +++ b/services/apps/activities_worker/src/activities.ts @@ -1,6 +1,3 @@ -import { - createConversations, - linkActivitiesToConversations, -} from './activities/createConversations' +import { createConversations } from './activities/createConversations' -export { createConversations, linkActivitiesToConversations } +export { createConversations } diff --git a/services/apps/activities_worker/src/activities/createConversations.ts b/services/apps/activities_worker/src/activities/createConversations.ts index 13468da30..1902e9346 100644 --- a/services/apps/activities_worker/src/activities/createConversations.ts +++ b/services/apps/activities_worker/src/activities/createConversations.ts @@ -1,140 +1,191 @@ -import { generateUUIDv4 } from '@crowd/common' -import { PlatformType } from '@crowd/types' -import { insertConversations } from '@crowd/data-access-layer' +import { generateUUIDv4, partition } from '@crowd/common' import { ConversationService } from '@crowd/conversations' +import { + ALL_COLUMNS_TO_SELECT, + insertActivities, + insertConversations, + mapActivityRowToResult, +} from '@crowd/data-access-layer' import { IDbConversationCreateData } from 
'@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/conversation.data' import { svc } from '../main' -type row = { - tenantId: string - segmentId: string - conversationId: string | null - child_id: string - child_sourceId: string - child_sourceParentId: string - child_conversationId: string | null - child_title: string | null - child_attributes: object - child_timestamp: string - parent_id: string - parent_sourceId: string - parent_sourceParentId: string | null - parent_conversationId: string | null - parent_platform: PlatformType - parent_title: string | null - parent_body: string - parent_attributes: object - parent_timestamp: string -} +/* eslint-disable @typescript-eslint/no-explicit-any */ const conversationService = new ConversationService(svc.postgres.writer, svc.questdbSQL, svc.log) -export async function createConversations(): Promise> { +export interface ICreateConversationsResult { + conversationsCreated: number + activitiesAddedToConversations: number +} + +export async function createConversations(): Promise { // Find all activities, and their parent activities - const rows: row[] = await svc.questdbSQL.query(` + const query = ` WITH activities_to_check_for_parentId AS ( - SELECT - id, - sourceId, - sourceParentId, - conversationId, - title, - attributes, - timestamp, - createdAt + SELECT * FROM activities child - WHERE sourceParentId IS NOT NULL + WHERE deletedAt IS NULL + AND sourceParentId IS NOT NULL AND conversationId IS NULL - AND createdAt >= dateadd('d', -1, now()) + AND createdAt >= dateadd('M', -1, now()) ) - SELECT - parent.tenantId AS tenantId, - parent.segmentId AS segmentId, conversation.id AS conversationId, - child.id AS child_id, - child.sourceId AS child_sourceId, - child.sourceParentId AS child_sourceParentId, - child.conversationId AS child_conversationId, - child.title AS child_title, - child.attributes AS child_attributes, - parent.id AS parent_id, - parent.sourceId AS parent_sourceId, - parent.sourceParentId AS 
parent_sourceParentId, - parent.conversationId AS parent_conversationId, - parent.platform AS parent_platform, - parent.title AS parent_title, - parent.body AS parent_body, - parent.attributes AS parent_attributes, - parent.timestamp AS parent_timestamp + ${ALL_COLUMNS_TO_SELECT.map((c) => `parent.${c} as parent_${c}`).join(', \n')}, + ${ALL_COLUMNS_TO_SELECT.map((c) => `child.${c} as child_${c}`).join(', \n')} FROM activities parent JOIN activities_to_check_for_parentId child ON parent.sourceId = child.sourceParentId LEFT JOIN conversations conversation ON parent.conversationId = conversation.id ORDER BY child.createdAt ASC LIMIT 5000; - `) + ` + const rows: any[] = await svc.questdbSQL.query(query) // For all rows found, store the conversation created for the source parent ID. const conversationsToCreate: Record = {} - const activitiesToLinkToConversation: Record = {} - for (const row of rows) { - if (row.conversationId) { - activitiesToLinkToConversation[row.parent_sourceId] = row.conversationId + + // conversationId -> [activityData] + const activitiesToLinkToConversation: Record = {} + + // conversationId -> parentActivityId + const conversationToParentActivity: Record = {} + + const linkActivityAndConversation = (a: any, cId: string) => { + if (activitiesToLinkToConversation[cId]) { + activitiesToLinkToConversation[cId].push(a) } else { - if (!activitiesToLinkToConversation[row.parent_sourceId]) { - const conversationId = generateUUIDv4() + activitiesToLinkToConversation[cId] = [a] + } + } + + for (const row of rows) { + // map parent and child activities to objects + const parent: any = {} + const child: any = {} + for (const key of Object.keys(row)) { + if (key.startsWith('parent_')) { + parent[key.replace('parent_', '')] = row[key] + delete row[key] + } else if (key.startsWith('child_')) { + child[key.replace('child_', '')] = row[key] + delete row[key] + } + } + + row.parent = parent + row.child = child + + // check first if parent activity already has a 
conversation created + if (!row.conversationId) { + // check if we already prepared the conversation + let conversationId: string + if (!conversationsToCreate[row.parent.sourceId]) { + // prepare new conversation + // if not then create a new conversation + conversationId = generateUUIDv4() const conversationTitle = await conversationService.generateTitle( - row.tenantId, - row.segmentId, - row.parent_title || row.parent_body, - ConversationService.hasHtmlActivities(row.parent_platform), + row.parent.tenantId, + row.parent.segmentId, + row.parent.title || row.parent.body, + ConversationService.hasHtmlActivities(row.parent.platform), ) - activitiesToLinkToConversation[row.parent_sourceId] = conversationId - conversationsToCreate[row.parent_sourceId] = { + // prepare the conversation data + conversationsToCreate[row.parent.sourceId] = { id: conversationId, title: conversationTitle, slug: await conversationService.generateSlug( - row.tenantId, - row.segmentId, + row.parent.tenantId, + row.parent.segmentId, conversationTitle, ), published: true, - timestamp: row.parent_timestamp || row.child_timestamp, - tenantId: row.tenantId, - segmentId: row.segmentId, + timestamp: row.parent.timestamp || row.child.timestamp, + tenantId: row.parent.tenantId, + segmentId: row.parent.segmentId, createdById: null, updatedById: null, } + + conversationToParentActivity[conversationId] = row.parent.id + } else { + conversationId = conversationsToCreate[row.parent.sourceId].id + conversationToParentActivity[conversationId] = row.parent.id } + + // link it with the parent activity + linkActivityAndConversation(row.parent, conversationId) + // link it with the child activity + linkActivityAndConversation(row.child, conversationId) + } else if (!row.child.conversationId) { + conversationToParentActivity[row.conversationId] = row.parent.id + // link conversation of the parent activity with the child activity + linkActivityAndConversation(row.child, row.conversationId) } } // Create all 
conversations not yet existing. - if (Object.values(conversationsToCreate).length > 0) { - await insertConversations(Object.values(conversationsToCreate)) + let conversationsCreated = 0 + const toCreate = Object.values(conversationsToCreate) + if (toCreate.length > 0) { + for (const batch of partition(toCreate, 100)) { + try { + const results = await insertConversations(batch) + conversationsCreated += results.length + } catch (err) { + svc.log.error(err, 'Error creating conversations') + throw err + } + } } - return activitiesToLinkToConversation -} + // link activities and conversations + let activitiesAddedToConversations = 0 + const toUpdate: any[] = [] -// Update all rows with the appropriate conversationId. -export async function linkActivitiesToConversations( - relations: Record, -): Promise { - const queries: string[] = [] - for (const [sourceParentId, conversationId] of Object.entries(relations)) { - queries.push(` - UPDATE activities SET conversationId = '${conversationId}' - WHERE sourceParentId = '${sourceParentId}'; - `) + for (const conversationId of Object.keys(activitiesToLinkToConversation)) { + for (const activity of activitiesToLinkToConversation[conversationId]) { + const mapped = mapActivityRowToResult(activity, ALL_COLUMNS_TO_SELECT) + + if (activity.member_isBot !== undefined && activity.member_isBot !== null) { + mapped.isBotActivity = activity.member_isBot + } + if (activity.member_isTeamMember !== undefined && activity.member_isTeamMember !== null) { + mapped.isTeamMemberActivity = activity.member_isTeamMember + } + + let parentId: string | undefined + if (activity.sourceParentId) { + parentId = conversationToParentActivity[conversationId] + if (!parentId) { + throw new Error('Parent activity ID not found!') + } + } + + toUpdate.push({ + ...mapped, + parentId, + conversationId, + }) + } } - await Promise.all( - queries.map(async (query) => { - return await svc.questdbSQL.query(query) - }), - ) + if (toUpdate.length > 0) { + for (const batch of 
partition(toUpdate, 1)) { + try { + const results = await insertActivities(batch) + activitiesAddedToConversations += results.length + } catch (err) { + svc.log.error(err, 'Error linking activities to conversations') + throw err + } + } + } + + return { + conversationsCreated, + activitiesAddedToConversations, + } } diff --git a/services/apps/activities_worker/src/workflows/createConversations.ts b/services/apps/activities_worker/src/workflows/createConversations.ts index 5eceae93b..87000fba5 100644 --- a/services/apps/activities_worker/src/workflows/createConversations.ts +++ b/services/apps/activities_worker/src/workflows/createConversations.ts @@ -7,7 +7,5 @@ const activity = proxyActivities({ }) export async function createConversations(): Promise { - const relations = await activity.createConversations() - - await activity.linkActivitiesToConversations(relations) + await activity.createConversations() } diff --git a/services/libs/data-access-layer/src/activities/ilp.ts b/services/libs/data-access-layer/src/activities/ilp.ts index 71905bc0f..bb77ed393 100644 --- a/services/libs/data-access-layer/src/activities/ilp.ts +++ b/services/libs/data-access-layer/src/activities/ilp.ts @@ -27,11 +27,7 @@ export async function insertActivities(activities: IDbActivityCreateData[]): Pro activity.createdAt ? new Date(activity.createdAt).getTime() : now, 'ms', ) - .timestampColumn( - 'updatedAt', - activity.updatedAt ? 
new Date(activity.updatedAt).getTime() : now, - 'ms', - ) + .timestampColumn('updatedAt', now, 'ms') .stringColumn('attributes', objectToBytes(activity.attributes)) .booleanColumn('member_isTeamMember', activity.isTeamMemberActivity || false) .booleanColumn('member_isBot', activity.isBotActivity || false) diff --git a/services/libs/data-access-layer/src/activities/sql.ts b/services/libs/data-access-layer/src/activities/sql.ts index b65203b9f..0657195a2 100644 --- a/services/libs/data-access-layer/src/activities/sql.ts +++ b/services/libs/data-access-layer/src/activities/sql.ts @@ -39,10 +39,10 @@ import { } from './types' import merge from 'lodash.merge' +import { IMemberSegmentAggregates } from '../members/types' import { IPlatforms } from '../old/apps/cache_worker/types' -import { checkUpdateRowCount } from '../utils' import { IDbOrganizationAggregateData } from '../organizations' -import { IMemberSegmentAggregates } from '../members/types' +import { checkUpdateRowCount } from '../utils' const s3Url = `https://${ process.env['CROWD_S3_MICROSERVICES_ASSETS_BUCKET'] @@ -455,6 +455,17 @@ export const DEFAULT_COLUMNS_TO_SELECT: ActivityColumn[] = [ 'url', ] +export const ALL_COLUMNS_TO_SELECT: ActivityColumn[] = DEFAULT_COLUMNS_TO_SELECT.concat([ + 'member_isBot', + 'member_isTeamMember', + 'gitIsMainBranch', + 'gitIsIndirectFork', + 'gitLines', + 'gitInsertions', + 'gitDeletions', + 'gitIsMerge', +]) + export async function queryActivities( qdbConn: DbConnOrTx, arg: IQueryActivitiesParameters, @@ -632,52 +643,52 @@ export async function queryActivities( count = countResults[0] ? countResults[0].count : 0 } - const results: any[] = [] - - for (const a of activities) { - const sentiment: IActivitySentiment | null = - a.sentimentLabel && - a.sentimentScore && - a.sentimentScoreMixed && - a.sentimentScoreNeutral && - a.sentimentScoreNegative && - a.sentimentScorePositive - ? 
{ - label: a.sentimentLabel, - sentiment: a.sentimentScore, - mixed: a.sentimentScoreMixed, - neutral: a.sentimentScoreNeutral, - negative: a.sentimentScoreNegative, - positive: a.sentimentScorePositive, - } - : null + const results: any[] = activities.map((a) => mapActivityRowToResult(a, columns)) - const data: any = {} - for (const column of columns) { - if (column.startsWith('sentiment')) { - continue - } + return { + count: Number(count), + rows: results, + limit: arg.limit, + offset: arg.offset, + } +} - if (column === 'attributes') { - data[column] = JSON.parse(a[column]) - } else { - data[column] = a[column] - } - } +export function mapActivityRowToResult(a: any, columns: string[]): any { + const sentiment: IActivitySentiment | null = + a.sentimentLabel && + a.sentimentScore && + a.sentimentScoreMixed && + a.sentimentScoreNeutral && + a.sentimentScoreNegative && + a.sentimentScorePositive + ? { + label: a.sentimentLabel, + sentiment: a.sentimentScore, + mixed: a.sentimentScoreMixed, + neutral: a.sentimentScoreNeutral, + negative: a.sentimentScoreNegative, + positive: a.sentimentScorePositive, + } + : null - if (sentiment) { - data.sentiment = sentiment + const data: any = {} + for (const column of columns) { + if (column.startsWith('sentiment')) { + continue } - results.push(data) + if (column === 'attributes') { + data[column] = JSON.parse(a[column]) + } else { + data[column] = a[column] + } } - return { - count: Number(count), - rows: results, - limit: arg.limit, - offset: arg.offset, + if (sentiment) { + data.sentiment = sentiment } + + return data } export async function findTopActivityTypes( diff --git a/services/libs/data-access-layer/src/conversations/ilp.ts b/services/libs/data-access-layer/src/conversations/ilp.ts index a3d793e97..74882097d 100644 --- a/services/libs/data-access-layer/src/conversations/ilp.ts +++ b/services/libs/data-access-layer/src/conversations/ilp.ts @@ -30,11 +30,7 @@ export async function insertConversations( 
conversation.createdAt ? new Date(conversation.createdAt).getTime() : now, 'ms', ) - .timestampColumn( - 'updatedAt', - conversation.updatedAt ? new Date(conversation.updatedAt).getTime() : now, - 'ms', - ) + .timestampColumn('updatedAt', now, 'ms') if (conversation.deletedAt) { row.timestampColumn('deletedAt', new Date(conversation.updatedAt).getTime()) From d0dfe77cf22fbdb51cdd55d9d66ea5dd253f25fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Mon, 7 Oct 2024 10:58:57 +0000 Subject: [PATCH 2/7] testing --- .../src/activities/createConversations.ts | 4 ++- .../data-access-layer/src/activities/ilp.ts | 29 +++++++++++++++---- .../src/conversations/ilp.ts | 18 ++++++++++-- 3 files changed, 42 insertions(+), 9 deletions(-) diff --git a/services/apps/activities_worker/src/activities/createConversations.ts b/services/apps/activities_worker/src/activities/createConversations.ts index 1902e9346..f99182e32 100644 --- a/services/apps/activities_worker/src/activities/createConversations.ts +++ b/services/apps/activities_worker/src/activities/createConversations.ts @@ -130,7 +130,7 @@ LIMIT 5000; let conversationsCreated = 0 const toCreate = Object.values(conversationsToCreate) if (toCreate.length > 0) { - for (const batch of partition(toCreate, 100)) { + for (const batch of partition(toCreate, 1)) { try { const results = await insertConversations(batch) conversationsCreated += results.length @@ -163,6 +163,8 @@ LIMIT 5000; throw new Error('Parent activity ID not found!') } } + // TODO: uros remove - test + delete mapped.sentiment toUpdate.push({ ...mapped, diff --git a/services/libs/data-access-layer/src/activities/ilp.ts b/services/libs/data-access-layer/src/activities/ilp.ts index bb77ed393..e30536362 100644 --- a/services/libs/data-access-layer/src/activities/ilp.ts +++ b/services/libs/data-access-layer/src/activities/ilp.ts @@ -4,9 +4,12 @@ import { getClientILP } from '@crowd/questdb' import { IDbActivityCreateData } from 
'../old/apps/data_sink_worker/repo/activity.data' import { Sender } from '@questdb/nodejs-client' +import { getServiceChildLogger } from '@crowd/logging' const ilp: Sender = getClientILP() +const log = getServiceChildLogger('data-access-layer/activities/ilp.ts') + export async function insertActivities(activities: IDbActivityCreateData[]): Promise { const ids: string[] = [] const now = Date.now() @@ -16,17 +19,22 @@ export async function insertActivities(activities: IDbActivityCreateData[]): Pro const id = activity.id || generateUUIDv4() ids.push(id) + let createdAt + if (activity.createdAt) { + const res = new Date(activity.createdAt) + log.info({ createdAt: res }, 'insertActivities.createdAt') + createdAt = res.getTime() + } else { + createdAt = now + } + const row = ilp .table('activities') .symbol('tenantId', activity.tenantId) .symbol('segmentId', activity.segmentId) .symbol('platform', activity.platform) .stringColumn('id', id) - .timestampColumn( - 'createdAt', - activity.createdAt ? new Date(activity.createdAt).getTime() : now, - 'ms', - ) + .timestampColumn('createdAt', createdAt, 'ms') .timestampColumn('updatedAt', now, 'ms') .stringColumn('attributes', objectToBytes(activity.attributes)) .booleanColumn('member_isTeamMember', activity.isTeamMemberActivity || false) @@ -170,7 +178,16 @@ export async function insertActivities(activities: IDbActivityCreateData[]): Pro row.stringColumn('updatedById', activity.updatedById) } - await row.at(activity.timestamp ? 
new Date(activity.timestamp).getTime() : now, 'ms') + let timestamp + if (activity.timestamp) { + const res = new Date(activity.timestamp) + log.info({ timestamp: res }, 'insertActivities.timestamp') + timestamp = res.getTime() + } else { + timestamp = now + } + + await row.at(timestamp, 'ms') } } diff --git a/services/libs/data-access-layer/src/conversations/ilp.ts b/services/libs/data-access-layer/src/conversations/ilp.ts index 74882097d..926034b1b 100644 --- a/services/libs/data-access-layer/src/conversations/ilp.ts +++ b/services/libs/data-access-layer/src/conversations/ilp.ts @@ -4,6 +4,9 @@ import { generateUUIDv4 } from '@crowd/common' import { getClientILP } from '@crowd/questdb' import { IDbConversationCreateData } from '../old/apps/data_sink_worker/repo/conversation.data' +import { getServiceChildLogger } from '@crowd/logging' + +const log = getServiceChildLogger('data-access-layer/conversations/ilp.ts') const ilp: Sender = getClientILP() export async function insertConversations( @@ -33,7 +36,9 @@ export async function insertConversations( .timestampColumn('updatedAt', now, 'ms') if (conversation.deletedAt) { - row.timestampColumn('deletedAt', new Date(conversation.updatedAt).getTime()) + const res = new Date(conversation.updatedAt) + log.info({ deletedAt: res }, 'insertConversations.deletedAt') + row.timestampColumn('deletedAt', res.getTime(), 'ms') } if (conversation.createdById) { @@ -44,7 +49,16 @@ export async function insertConversations( row.stringColumn('updatedById', conversation.updatedById) } - await row.at(conversation.timestamp ? 
new Date(conversation.timestamp).getTime() : now, 'ms') + let timestamp + if (conversation.timestamp) { + const res = new Date(conversation.timestamp) + log.info({ timestamp: res }, 'insertConversations.timestamp') + timestamp = res.getTime() + } else { + timestamp = now + } + + await row.at(timestamp, 'ms') } } From eace455253116af8a234754ddc81dd9b54d63bd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Tue, 8 Oct 2024 06:45:33 +0000 Subject: [PATCH 3/7] fixes --- .../activities_worker/src/activities/createConversations.ts | 2 -- services/libs/data-access-layer/src/activities/ilp.ts | 4 ++-- services/libs/data-access-layer/src/conversations/ilp.ts | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/services/apps/activities_worker/src/activities/createConversations.ts b/services/apps/activities_worker/src/activities/createConversations.ts index f99182e32..f6e80ae2f 100644 --- a/services/apps/activities_worker/src/activities/createConversations.ts +++ b/services/apps/activities_worker/src/activities/createConversations.ts @@ -163,8 +163,6 @@ LIMIT 5000; throw new Error('Parent activity ID not found!') } } - // TODO: uros remove - test - delete mapped.sentiment toUpdate.push({ ...mapped, diff --git a/services/libs/data-access-layer/src/activities/ilp.ts b/services/libs/data-access-layer/src/activities/ilp.ts index e30536362..4c4038467 100644 --- a/services/libs/data-access-layer/src/activities/ilp.ts +++ b/services/libs/data-access-layer/src/activities/ilp.ts @@ -22,7 +22,7 @@ export async function insertActivities(activities: IDbActivityCreateData[]): Pro let createdAt if (activity.createdAt) { const res = new Date(activity.createdAt) - log.info({ createdAt: res }, 'insertActivities.createdAt') + // log.info({ createdAt: res }, 'insertActivities.createdAt') createdAt = res.getTime() } else { createdAt = now @@ -181,7 +181,7 @@ export async function insertActivities(activities: IDbActivityCreateData[]): Pro let timestamp if 
(activity.timestamp) { const res = new Date(activity.timestamp) - log.info({ timestamp: res }, 'insertActivities.timestamp') + // log.info({ timestamp: res }, 'insertActivities.timestamp') timestamp = res.getTime() } else { timestamp = now diff --git a/services/libs/data-access-layer/src/conversations/ilp.ts b/services/libs/data-access-layer/src/conversations/ilp.ts index 926034b1b..88980f64d 100644 --- a/services/libs/data-access-layer/src/conversations/ilp.ts +++ b/services/libs/data-access-layer/src/conversations/ilp.ts @@ -52,7 +52,7 @@ export async function insertConversations( let timestamp if (conversation.timestamp) { const res = new Date(conversation.timestamp) - log.info({ timestamp: res }, 'insertConversations.timestamp') + // log.info({ timestamp: res }, 'insertConversations.timestamp') timestamp = res.getTime() } else { timestamp = now From 53a7d0b67bfa60dc6e4ead564c5cd135c5bf4f2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Tue, 8 Oct 2024 07:31:36 +0000 Subject: [PATCH 4/7] lint fixes and updated param for ILP functions --- .../database/repositories/memberRepository.ts | 22 +--------- .../src/activities/createConversations.ts | 2 +- .../src/service/activity.service.ts | 44 +++++++++---------- .../data-access-layer/src/activities/ilp.ts | 18 +++++--- .../src/conversations/ilp.ts | 11 ++++- 5 files changed, 47 insertions(+), 50 deletions(-) diff --git a/backend/src/database/repositories/memberRepository.ts b/backend/src/database/repositories/memberRepository.ts index 7d90024b8..e806d0398 100644 --- a/backend/src/database/repositories/memberRepository.ts +++ b/backend/src/database/repositories/memberRepository.ts @@ -1325,10 +1325,6 @@ class MemberRepository { options: IRepositoryOptions, segmentId?: string, ): Promise { - const transaction = SequelizeRepository.getTransaction(options) - const seq = SequelizeRepository.getSequelize(options) - const currentTenant = SequelizeRepository.getCurrentTenant(options) - if (segmentId) { // 
we load data for a specific segment (can be leaf, parent or grand parent id) const member = ( @@ -1353,22 +1349,7 @@ class MemberRepository { } } - const segmentIds = ( - await seq.query( - ` - select id from segments where "tenantId" = :tenantId and "parentSlug" is not null and "grandparentSlug" is not null - `, - { - replacements: { - tenantId: currentTenant.id, - }, - type: QueryTypes.SELECT, - transaction, - }, - ) - ).map((r: any) => r.id) - - const results = await getMemberAggregates(options.qdb, memberId, segmentIds) + const results = await getMemberAggregates(options.qdb, memberId) if (results.length > 0) { return results[0] @@ -1504,7 +1485,6 @@ class MemberRepository { segments: true, onlySubProjects: true, maintainers: true, - attributes: false, ...include, }, }, diff --git a/services/apps/activities_worker/src/activities/createConversations.ts b/services/apps/activities_worker/src/activities/createConversations.ts index f6e80ae2f..07ae1a65e 100644 --- a/services/apps/activities_worker/src/activities/createConversations.ts +++ b/services/apps/activities_worker/src/activities/createConversations.ts @@ -175,7 +175,7 @@ LIMIT 5000; if (toUpdate.length > 0) { for (const batch of partition(toUpdate, 1)) { try { - const results = await insertActivities(batch) + const results = await insertActivities(batch, true) activitiesAddedToConversations += results.length } catch (err) { svc.log.error(err, 'Error linking activities to conversations') diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 1a3f156ac..4ec68b49f 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -1,6 +1,6 @@ import { EDITION, escapeNullByte, isObjectEmpty, singleOrDefault } from '@crowd/common' import { SearchSyncWorkerEmitter } from '@crowd/common_services' -import { IQueryActivityResult, insertActivities 
} from '@crowd/data-access-layer' +import { insertActivities } from '@crowd/data-access-layer' import { DbStore, arePrimitivesDbEqual } from '@crowd/data-access-layer/src/database' import { IDbActivity, @@ -304,27 +304,27 @@ export default class ActivityService extends LoggerBase { }) if (updated) { - const activityToProcess: IQueryActivityResult = { - id: id, - tenantId: tenantId, - segmentId: segmentId, - type: toUpdate.type || original.type, - isContribution: toUpdate.isContribution || original.isContribution, - score: toUpdate.score || original.score, - sourceId: toUpdate.sourceId || original.sourceId, - sourceParentId: toUpdate.sourceParentId || original.sourceParentId, - memberId: toUpdate.memberId || original.memberId, - username: toUpdate.username || original.username, - sentiment: toUpdate.sentiment || original.sentiment, - attributes: toUpdate.attributes || original.attributes, - body: escapeNullByte(toUpdate.body || original.body), - title: escapeNullByte(toUpdate.title || original.title), - channel: toUpdate.channel || original.channel, - url: toUpdate.url || original.url, - organizationId: toUpdate.organizationId || original.organizationId, - platform: toUpdate.platform || (original.platform as PlatformType), - timestamp: original.timestamp, - } + // const activityToProcess: IQueryActivityResult = { + // id: id, + // tenantId: tenantId, + // segmentId: segmentId, + // type: toUpdate.type || original.type, + // isContribution: toUpdate.isContribution || original.isContribution, + // score: toUpdate.score || original.score, + // sourceId: toUpdate.sourceId || original.sourceId, + // sourceParentId: toUpdate.sourceParentId || original.sourceParentId, + // memberId: toUpdate.memberId || original.memberId, + // username: toUpdate.username || original.username, + // sentiment: toUpdate.sentiment || original.sentiment, + // attributes: toUpdate.attributes || original.attributes, + // body: escapeNullByte(toUpdate.body || original.body), + // title: 
escapeNullByte(toUpdate.title || original.title), + // channel: toUpdate.channel || original.channel, + // url: toUpdate.url || original.url, + // organizationId: toUpdate.organizationId || original.organizationId, + // platform: toUpdate.platform || (original.platform as PlatformType), + // timestamp: original.timestamp, + // } if (fireSync) { await this.searchSyncWorkerEmitter.triggerMemberSync( diff --git a/services/libs/data-access-layer/src/activities/ilp.ts b/services/libs/data-access-layer/src/activities/ilp.ts index 4c4038467..ac023752e 100644 --- a/services/libs/data-access-layer/src/activities/ilp.ts +++ b/services/libs/data-access-layer/src/activities/ilp.ts @@ -4,13 +4,13 @@ import { getClientILP } from '@crowd/questdb' import { IDbActivityCreateData } from '../old/apps/data_sink_worker/repo/activity.data' import { Sender } from '@questdb/nodejs-client' -import { getServiceChildLogger } from '@crowd/logging' const ilp: Sender = getClientILP() -const log = getServiceChildLogger('data-access-layer/activities/ilp.ts') - -export async function insertActivities(activities: IDbActivityCreateData[]): Promise { +export async function insertActivities( + activities: IDbActivityCreateData[], + update = false, +): Promise { const ids: string[] = [] const now = Date.now() @@ -28,6 +28,14 @@ export async function insertActivities(activities: IDbActivityCreateData[]): Pro createdAt = now } + let updatedAt + if (update || !activity.updatedAt) { + updatedAt = now + } else { + const res = new Date(activity.updatedAt) + updatedAt = res.getTime() + } + const row = ilp .table('activities') .symbol('tenantId', activity.tenantId) @@ -35,7 +43,7 @@ export async function insertActivities(activities: IDbActivityCreateData[]): Pro .symbol('platform', activity.platform) .stringColumn('id', id) .timestampColumn('createdAt', createdAt, 'ms') - .timestampColumn('updatedAt', now, 'ms') + .timestampColumn('updatedAt', updatedAt, 'ms') .stringColumn('attributes', 
objectToBytes(activity.attributes)) .booleanColumn('member_isTeamMember', activity.isTeamMemberActivity || false) .booleanColumn('member_isBot', activity.isBotActivity || false) diff --git a/services/libs/data-access-layer/src/conversations/ilp.ts b/services/libs/data-access-layer/src/conversations/ilp.ts index 88980f64d..3dafeb4a8 100644 --- a/services/libs/data-access-layer/src/conversations/ilp.ts +++ b/services/libs/data-access-layer/src/conversations/ilp.ts @@ -11,6 +11,7 @@ const log = getServiceChildLogger('data-access-layer/conversations/ilp.ts') const ilp: Sender = getClientILP() export async function insertConversations( conversations: IDbConversationCreateData[], + update = false, ): Promise { const ids: string[] = [] const now = Date.now() @@ -20,6 +21,14 @@ export async function insertConversations( const id = conversation.id || generateUUIDv4() ids.push(id) + let updatedAt + if (update || !conversation.updatedAt) { + updatedAt = now + } else { + const res = new Date(conversation.updatedAt) + updatedAt = res.getTime() + } + const row = ilp .table('conversations') .symbol('tenantId', conversation.tenantId) @@ -33,7 +42,7 @@ export async function insertConversations( conversation.createdAt ? 
new Date(conversation.createdAt).getTime() : now, 'ms', ) - .timestampColumn('updatedAt', now, 'ms') + .timestampColumn('updatedAt', updatedAt, 'ms') if (conversation.deletedAt) { const res = new Date(conversation.updatedAt) From f136b3514935655ea2835b122a2ba9b328c7a9dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Tue, 8 Oct 2024 07:44:56 +0000 Subject: [PATCH 5/7] bigger batch --- .../activities_worker/src/activities/createConversations.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/apps/activities_worker/src/activities/createConversations.ts b/services/apps/activities_worker/src/activities/createConversations.ts index 07ae1a65e..d800c5ae1 100644 --- a/services/apps/activities_worker/src/activities/createConversations.ts +++ b/services/apps/activities_worker/src/activities/createConversations.ts @@ -130,7 +130,7 @@ LIMIT 5000; let conversationsCreated = 0 const toCreate = Object.values(conversationsToCreate) if (toCreate.length > 0) { - for (const batch of partition(toCreate, 1)) { + for (const batch of partition(toCreate, 100)) { try { const results = await insertConversations(batch) conversationsCreated += results.length @@ -173,7 +173,7 @@ LIMIT 5000; } if (toUpdate.length > 0) { - for (const batch of partition(toUpdate, 1)) { + for (const batch of partition(toUpdate, 100)) { try { const results = await insertActivities(batch, true) activitiesAddedToConversations += results.length From 6d6bc7f2c63f788ed6e39606c19763e002419757 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Tue, 8 Oct 2024 13:45:12 +0000 Subject: [PATCH 6/7] fetching relevant activities based on timestamp for performance --- .../src/activities/createConversations.ts | 112 ++++++++++++++---- 1 file changed, 88 insertions(+), 24 deletions(-) diff --git a/services/apps/activities_worker/src/activities/createConversations.ts b/services/apps/activities_worker/src/activities/createConversations.ts index d800c5ae1..93e75cfff 
100644 --- a/services/apps/activities_worker/src/activities/createConversations.ts +++ b/services/apps/activities_worker/src/activities/createConversations.ts @@ -6,8 +6,8 @@ import { insertConversations, mapActivityRowToResult, } from '@crowd/data-access-layer' +import { DbConnOrTx } from '@crowd/data-access-layer/src/database' import { IDbConversationCreateData } from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/conversation.data' - import { svc } from '../main' /* eslint-disable @typescript-eslint/no-explicit-any */ @@ -20,28 +20,37 @@ export interface ICreateConversationsResult { } export async function createConversations(): Promise { - // Find all activities, and their parent activities - const query = ` - WITH - activities_to_check_for_parentId AS ( - SELECT * - FROM activities child - WHERE deletedAt IS NULL - AND sourceParentId IS NOT NULL - AND conversationId IS NULL - AND createdAt >= dateadd('M', -1, now()) - ) -SELECT - conversation.id AS conversationId, - ${ALL_COLUMNS_TO_SELECT.map((c) => `parent.${c} as parent_${c}`).join(', \n')}, - ${ALL_COLUMNS_TO_SELECT.map((c) => `child.${c} as child_${c}`).join(', \n')} -FROM activities parent -JOIN activities_to_check_for_parentId child ON parent.sourceId = child.sourceParentId -LEFT JOIN conversations conversation ON parent.conversationId = conversation.id -ORDER BY child.createdAt ASC -LIMIT 5000; - ` - const rows: any[] = await svc.questdbSQL.query(query) + // find the timestamp of the oldest activity so we can limit the query + const minTimestamp = await getMinActivityTimestamp(svc.questdbSQL) + if (!minTimestamp) { + return { + conversationsCreated: 0, + activitiesAddedToConversations: 0, + } + } + + const limit = new Date(minTimestamp) + + let current = new Date() + + const results = [] + while (limit < current) { + // fetch results for a timeframe of one week + // Find all activities, and their parent activities + const currentResults = await getRows(svc.questdbSQL, current) + if 
(currentResults.length > 0) { + results.push(...currentResults) + + if (results.length >= 1000) { + svc.log.info('Reached 1000 results...') + break + } + } + + current = subtractOneWeek(current) + } + + svc.log.info(`Found ${results.length} activities to process...`) // For all rows found, store the conversation created for the source parent ID. const conversationsToCreate: Record = {} @@ -60,7 +69,7 @@ LIMIT 5000; } } - for (const row of rows) { + for (const row of results) { // map parent and child activities to objects const parent: any = {} const child: any = {} @@ -189,3 +198,58 @@ LIMIT 5000; activitiesAddedToConversations, } } + +function subtractOneWeek(date: Date): Date { + const newDate = new Date(date) + newDate.setDate(date.getDate() - 7) + return newDate +} + +async function getRows(qdbConn: DbConnOrTx, current: Date): Promise { + const query = ` + WITH + activities_to_check_for_parentId AS ( + SELECT * + FROM activities child + WHERE deletedAt IS NULL + AND sourceParentId IS NOT NULL + AND conversationId IS NULL + AND timestamp > dateadd('w', -1, $(limit)) + AND timestamp <= $(limit) + ) +SELECT + conversation.id AS conversationId, + ${ALL_COLUMNS_TO_SELECT.map((c) => `parent.${c} as parent_${c}`).join(', \n')}, + ${ALL_COLUMNS_TO_SELECT.map((c) => `child.${c} as child_${c}`).join(', \n')} +FROM activities parent +JOIN activities_to_check_for_parentId child ON parent.sourceId = child.sourceParentId +LEFT JOIN conversations conversation ON parent.conversationId = conversation.id +ORDER BY child.createdAt ASC +LIMIT 1000; + ` + + const results = await qdbConn.query(query, { + limit: current, + }) + + return results +} + +async function getMinActivityTimestamp(qdbConn: DbConnOrTx): Promise { + const query = ` + select min(timestamp) as minTimestamp + from activities + where + deletedAt is null and + sourceParentId is not null and + conversationId is null; + ` + + const result = await qdbConn.oneOrNone(query) + + if (!result) { + return null + } + + 
return result.minTimestamp +} From 4008dff1060f9a9ad0fb3fa2cc85a8518d669d2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Thu, 10 Oct 2024 16:43:16 +0000 Subject: [PATCH 7/7] performance improvements --- .../src/activities/createConversations.ts | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/services/apps/activities_worker/src/activities/createConversations.ts b/services/apps/activities_worker/src/activities/createConversations.ts index 93e75cfff..6dbc9c394 100644 --- a/services/apps/activities_worker/src/activities/createConversations.ts +++ b/services/apps/activities_worker/src/activities/createConversations.ts @@ -87,7 +87,7 @@ export async function createConversations(): Promise row.child = child // check first if parent activity already has a conversation created - if (!row.conversationId) { + if (!row.parent.conversationId) { // check if we already prepared the conversation let conversationId: string if (!conversationsToCreate[row.parent.sourceId]) { @@ -129,9 +129,9 @@ export async function createConversations(): Promise // link it with the child activity linkActivityAndConversation(row.child, conversationId) } else if (!row.child.conversationId) { - conversationToParentActivity[row.conversationId] = row.parent.id + conversationToParentActivity[row.parent.conversationId] = row.parent.id // link conversation of the parent activity with the child activity - linkActivityAndConversation(row.child, row.conversationId) + linkActivityAndConversation(row.child, row.parent.conversationId) } } @@ -218,13 +218,12 @@ async function getRows(qdbConn: DbConnOrTx, current: Date): Promise { AND timestamp <= $(limit) ) SELECT - conversation.id AS conversationId, ${ALL_COLUMNS_TO_SELECT.map((c) => `parent.${c} as parent_${c}`).join(', \n')}, ${ALL_COLUMNS_TO_SELECT.map((c) => `child.${c} as child_${c}`).join(', \n')} FROM activities parent JOIN activities_to_check_for_parentId child ON parent.sourceId = 
child.sourceParentId -LEFT JOIN conversations conversation ON parent.conversationId = conversation.id -ORDER BY child.createdAt ASC +-- WHERE parent.timestamp > dateadd('y', -1, $(limit)) +ORDER BY child.timestamp ASC LIMIT 1000; ` @@ -237,7 +236,7 @@ LIMIT 1000; async function getMinActivityTimestamp(qdbConn: DbConnOrTx): Promise { const query = ` - select min(timestamp) as minTimestamp + select first(timestamp) as minTimestamp from activities where deletedAt is null and