diff --git a/services/apps/cache_worker/src/activities.ts b/services/apps/cache_worker/src/activities.ts index b8e672adb..96a4701ca 100644 --- a/services/apps/cache_worker/src/activities.ts +++ b/services/apps/cache_worker/src/activities.ts @@ -6,9 +6,7 @@ import { } from './activities/computeAggs/organization' import { findNewActivityPlatforms, - getActiveMembersNumber, getActiveMembersTimeseries, - getActiveOrganizationsNumber, getActiveOrganizationsTimeseries, getActivePlatforms, getActivitiesBySentiment, @@ -17,9 +15,7 @@ import { getActivitiesTimeseries, getDashboardCacheLastRefreshedAt, getDefaultSegment, - getNewMembersNumber, getNewMembersTimeseries, - getNewOrganizationsNumber, getNewOrganizationsTimeseries, saveToCache, updateMemberMergeSuggestionsLastGeneratedAt, @@ -39,13 +35,9 @@ export { getProjectGroupLeafSegments, getDashboardCacheLastRefreshedAt, getDefaultSegment, - getNewMembersNumber, getNewMembersTimeseries, - getActiveMembersNumber, getActiveMembersTimeseries, - getNewOrganizationsNumber, getNewOrganizationsTimeseries, - getActiveOrganizationsNumber, getActiveOrganizationsTimeseries, getActivitiesNumber, getActivitiesTimeseries, diff --git a/services/apps/cache_worker/src/activities/dashboard-cache/refreshDashboardCache.ts b/services/apps/cache_worker/src/activities/dashboard-cache/refreshDashboardCache.ts index 0fb241c12..12f85ad6b 100644 --- a/services/apps/cache_worker/src/activities/dashboard-cache/refreshDashboardCache.ts +++ b/services/apps/cache_worker/src/activities/dashboard-cache/refreshDashboardCache.ts @@ -1,15 +1,9 @@ import { - IActiveMembersTimeseriesResult, - IActivityBySentimentMoodResult, - IActivityByTypeAndPlatformResult, - IActivityTimeseriesResult, activitiesBySentiment, activitiesByTypeAndPlatform, activitiesTimeseries, - countMembersWithActivities, - countOrganizationsWithActivities, - getNumberOfNewMembers, getTimeseriesOfActiveMembers, + getTimeseriesOfNewMembers, queryActivities, } from '@crowd/data-access-layer' import { DbStore } from '@crowd/data-access-layer/src/database' @@ -18,21 +12,21 @@ import IntegrationRepository from '@crowd/data-access-layer/src/old/apps/cache_w import SegmentRepository from '@crowd/data-access-layer/src/old/apps/cache_worker/segment.repo' import { ISegment } from '@crowd/data-access-layer/src/old/apps/cache_worker/types' import { - IActiveOrganizationsTimeseriesResult, - getNumberOfActiveOrganizations, - getNumberOfNewOrganizations, getTimeseriesOfActiveOrganizations, + getTimeseriesOfNewOrganizations, } from '@crowd/data-access-layer/src/organizations' +import { dbStoreQx } from '@crowd/data-access-layer/src/queryExecutor' import { RedisCache } from '@crowd/redis' -import { DashboardTimeframe } from '@crowd/types' - -import { svc } from '../../main' import { + DashboardTimeframe, + IActivityBySentimentMoodResult, + IActivityByTypeAndPlatformResult, IDashboardData, - IGraphQueryParams, - INewMembersTimeseriesResult, - INewOrganizationsTimeseriesResult, -} from '../../types' + IQueryTimeseriesParams, + ITimeseriesDatapoint, +} from '@crowd/types' + +import { svc } from '../../main' const qdb = new DbStore(svc.log, svc.questdbSQL) @@ -66,188 +60,31 @@ export async function updateMemberMergeSuggestionsLastGeneratedAt( await segmentRepo.updateDashboardCacheLastRefreshedAt(segmentId) } -export async function getNewMembersNumber(params: IGraphQueryParams): Promise { - let result = 0 - try { - result = await getNumberOfNewMembers(svc.postgres.reader, { - tenantId: params.tenantId, - segmentIds: params.segmentIds, - after: params.startDate, - before: params.endDate, - platform: params.platform, - }) - } catch (err) { - throw new Error(err) - } - - return result -} - export async function getNewMembersTimeseries( - params: IGraphQueryParams, -): Promise { - let result: INewMembersTimeseriesResult[] - - try { - const rows = await countMembersWithActivities(svc.questdbSQL, { - tenantId: params.tenantId, - segmentIds: params.segmentIds, - timestampFrom: params.startDate, - timestampTo: params.endDate, - platform: params.platform, - groupBy: 'day', - }) - - const mapped: Record = {} - rows.forEach((row) => { - if (!mapped[row.date]) { - mapped[row.date] = { - date: row.date, - count: Number(row.count), - } - } else { - mapped[row.date]['count'] = Number(mapped[row.date]['count']) + Number(row.count) - } - }) - - result = Object.values(mapped) - } catch (err) { - throw new Error(err) - } - - return result -} - -export async function getActiveMembersNumber(params: IGraphQueryParams): Promise { - let result = 0 - try { - const rows = await countMembersWithActivities(svc.questdbSQL, { - tenantId: params.tenantId, - segmentIds: params.segmentIds, - timestampFrom: params.startDate, - timestampTo: params.endDate, - platform: params.platform, - groupBy: 'day', - }) - - rows.forEach((row) => { - result += Number(row.count) - }) - } catch (err) { - throw new Error(err) - } - - return result + params: IQueryTimeseriesParams, +): Promise { + return getTimeseriesOfNewMembers(dbStoreQx(svc.postgres.reader), params) } export async function getActiveMembersTimeseries( - params: IGraphQueryParams, -): Promise { - let result: IActiveMembersTimeseriesResult[] - try { - result = await getTimeseriesOfActiveMembers(qdb, { - tenantId: params.tenantId, - segmentIds: params.segmentIds, - after: params.startDate, - before: params.endDate, - platform: params.platform, - }) - } catch (err) { - throw new Error(err) - } - - return result -} - -export async function getNewOrganizationsNumber(params: IGraphQueryParams): Promise { - let result = 0 - try { - result = await getNumberOfNewOrganizations(svc.postgres.reader, { - tenantId: params.tenantId, - segmentIds: params.segmentIds, - after: params.startDate, - before: params.endDate, - platform: params.platform, - }) - } catch (err) { - throw new Error(err) - } - - return result + params: IQueryTimeseriesParams, +): Promise { + return getTimeseriesOfActiveMembers(dbStoreQx(qdb), params) } export async function getNewOrganizationsTimeseries( - params: IGraphQueryParams, -): Promise { - let result: INewOrganizationsTimeseriesResult[] - - try { - const rows = await countOrganizationsWithActivities(svc.questdbSQL, { - tenantId: params.tenantId, - segmentIds: params.segmentIds, - timestampFrom: params.startDate, - timestampTo: params.endDate, - platform: params.platform, - groupBy: 'day', - }) - - const mapped: Record = {} - rows.forEach((row) => { - if (!mapped[row.date]) { - mapped[row.date] = { - date: row.date, - count: Number(row.count), - } - } else { - mapped[row.date]['count'] = Number(mapped[row.date]['count']) + Number(row.count) - } - }) - - result = Object.values(mapped) - } catch (err) { - throw new Error(err) - } - - return result -} - -export async function getActiveOrganizationsNumber(params: IGraphQueryParams): Promise { - let result = 0 - try { - result = await getNumberOfActiveOrganizations(qdb, { - tenantId: params.tenantId, - segmentIds: params.segmentIds, - after: params.startDate, - before: params.endDate, - platform: params.platform, - }) - } catch (err) { - throw new Error(err) - } - - return result + params: IQueryTimeseriesParams, +): Promise { + return getTimeseriesOfNewOrganizations(dbStoreQx(svc.postgres.reader), params) } export async function getActiveOrganizationsTimeseries( - params: IGraphQueryParams, -): Promise { - let result: IActiveOrganizationsTimeseriesResult[] - try { - result = await getTimeseriesOfActiveOrganizations(qdb, { - tenantId: params.tenantId, - segmentIds: params.segmentIds, - after: params.startDate, - before: params.endDate, - platform: params.platform, - }) - } catch (err) { - throw new Error(err) - } - - return result + params: IQueryTimeseriesParams, +): Promise { + return getTimeseriesOfActiveOrganizations(dbStoreQx(qdb), params) } -export async function getActivitiesNumber(params: IGraphQueryParams): Promise { +export async function getActivitiesNumber(params: IQueryTimeseriesParams): Promise { let result = 0 try { // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -290,9 +127,9 @@ export async function getActivitiesNumber(params: IGraphQueryParams): Promise { - let result: IActivityTimeseriesResult[] + params: IQueryTimeseriesParams, +): Promise { + let result: ITimeseriesDatapoint[] try { result = await activitiesTimeseries(svc.questdbSQL, { @@ -309,7 +146,7 @@ export async function getActivitiesTimeseries( return result } export async function getActivitiesBySentiment( - params: IGraphQueryParams, + params: IQueryTimeseriesParams, ): Promise { let result: IActivityBySentimentMoodResult[] @@ -328,7 +165,7 @@ export async function getActivitiesBySentiment( return result } export async function getActivitiesByType( - params: IGraphQueryParams, + params: IQueryTimeseriesParams, ): Promise { let result: IActivityByTypeAndPlatformResult[] diff --git a/services/apps/cache_worker/src/types.ts b/services/apps/cache_worker/src/types.ts index b1c751c2c..1fb1c8fdf 100644 --- a/services/apps/cache_worker/src/types.ts +++ b/services/apps/cache_worker/src/types.ts @@ -1,71 +1,3 @@ -import { - IActivityBySentimentMoodResult, - IActivityByTypeAndPlatformResult, - IActivityTimeseriesResult, -} from '@crowd/data-access-layer' -import { IActiveOrganizationsTimeseriesResult } from '@crowd/data-access-layer/src/organizations' - -export interface IActiveMembersTimeseriesResult { - date: string - count: number -} - -export interface INewMembersTimeseriesResult { - date: string - count: number -} - -export interface INewOrganizationsTimeseriesResult { - date: string - count: number -} - -export interface IDashboardData { - activeMembers: { - total: number - previousPeriodTotal: number - timeseries: IActiveMembersTimeseriesResult[] - } - newMembers: { - total: number - previousPeriodTotal: number - timeseries: INewMembersTimeseriesResult[] - } - newOrganizations: { - total: number - previousPeriodTotal: number - timeseries: INewOrganizationsTimeseriesResult[] - } - activeOrganizations: { - total: number - previousPeriodTotal: number - timeseries: IActiveOrganizationsTimeseriesResult[] - } - activity: { - total: number - previousPeriodTotal: number - timeseries: IActivityTimeseriesResult[] - bySentimentMood: IActivityBySentimentMoodResult[] - byTypeAndPlatform: IActivityByTypeAndPlatformResult[] - } -} - -export interface ITimeframe { - startDate: Date - endDate: Date - previousPeriodStartDate: Date - previousPeriodEndDate: Date -} - -export interface IGraphQueryParams { - tenantId: string - segmentIds: string[] - startDate: Date - endDate: Date - platform?: string - groupBy?: string -} - export interface IProcessComputeOrgAggs { organizationId: string } diff --git a/services/apps/cache_worker/src/workflows/refreshDashboardCache.ts b/services/apps/cache_worker/src/workflows/refreshDashboardCache.ts index fde858eea..9375c866f 100644 --- a/services/apps/cache_worker/src/workflows/refreshDashboardCache.ts +++ b/services/apps/cache_worker/src/workflows/refreshDashboardCache.ts @@ -1,11 +1,17 @@ import { proxyActivities } from '@temporalio/workflow' import moment from 'moment' -import { IProcessRefreshDashboardCacheArgs } from '@crowd/types' +import { + IDashboardData, + IDashboardWidget, + IProcessRefreshDashboardCacheArgs, + IQueryTimeseriesParams, + ITimeframe, + ITimeseriesDatapoint, +} from '@crowd/types' import * as activities from '../activities/dashboard-cache/refreshDashboardCache' import { DashboardTimeframe } from '../enums' -import { IDashboardData, ITimeframe } from '../types' const activity = proxyActivities({ startToCloseTimeout: '3 minute', @@ -101,130 +107,98 @@ async function refreshDashboardCacheForAllTimeranges( } } +function generateDateRange(startDate: Date, endDate: Date): string[] { + const dates: string[] = [] + const currentDate = moment(startDate) + const end = moment(endDate) + + while (currentDate.isSameOrBefore(end)) { + dates.push(currentDate.format('YYYY-MM-DD')) + currentDate.add(1, 'day') + } + + return dates +} + +function convertFullTimeseriesToWidget( + rows: ITimeseriesDatapoint[], + timeframe: ITimeframe, +): IDashboardWidget { + const currentPeriodDates = generateDateRange( + timeframe.current.startDate, + timeframe.current.endDate, + ) + const previousPeriodDates = generateDateRange( + timeframe.previous.startDate, + timeframe.previous.endDate, + ) + + const currentPeriodMap = new Map( + rows.map((row) => [moment(row.date).format('YYYY-MM-DD'), Number(row.count)]), + ) + const previousPeriodMap = new Map( + rows.map((row) => [moment(row.date).format('YYYY-MM-DD'), Number(row.count)]), + ) + + const currentPeriodRows = currentPeriodDates.map((date) => ({ + date, + count: currentPeriodMap.get(date) || 0, + })) + + const previousPeriodRows = previousPeriodDates.map((date) => ({ + date, + count: previousPeriodMap.get(date) || 0, + })) + + const total = currentPeriodRows.reduce((sum, row) => sum + row.count, 0) + const previousPeriodTotal = previousPeriodRows.reduce((sum, row) => sum + row.count, 0) + + return { + total, + previousPeriodTotal, + timeseries: currentPeriodRows, + } +} + async function getDashboardCacheData( tenantId: string, segmentIds: string[], - timeframe: DashboardTimeframe, + dashboardTimeframe: DashboardTimeframe, platform?: string, ): Promise { // build dateranges - const { startDate, endDate, previousPeriodStartDate, previousPeriodEndDate } = - buildTimeframe(timeframe) - - // new members total - const newMembersTotal = await activity.getNewMembersNumber({ - tenantId, - segmentIds, - startDate, - endDate, - platform, - }) + const timeframe = buildTimeframe(dashboardTimeframe) - // new members previous period total - const newMembersPreviousPeriodTotal = await activity.getNewMembersNumber({ + const params: IQueryTimeseriesParams = { tenantId, segmentIds, - startDate: previousPeriodStartDate, - endDate: previousPeriodEndDate, + startDate: timeframe.previous.startDate, // it's intended to use previous period start date here ... + endDate: timeframe.current.endDate, // ... and current period end date here platform, - }) - - // new members timeseries - const newMembersTimeseries = await activity.getNewMembersTimeseries({ - tenantId, - segmentIds, - startDate, - endDate, - platform, - }) - - // active members total - const activeMembersTotal = await activity.getActiveMembersNumber({ - tenantId, - segmentIds, - startDate, - endDate, - platform, - }) - - // active members previous period total - const activeMembersPreviousPeriodTotal = await activity.getActiveMembersNumber({ - tenantId, - segmentIds, - startDate: previousPeriodStartDate, - endDate: previousPeriodEndDate, - platform, - }) - - // active members timeseries - const activeMembersTimeseries = await activity.getActiveMembersTimeseries({ - tenantId, - segmentIds, - startDate, - endDate, - platform, - }) - - // new organizations total - const newOrganizationsTotal = await activity.getNewOrganizationsNumber({ - tenantId, - segmentIds, - startDate, - endDate, - platform, - }) - - // new organizations previous period total - const newOrganizationsPreviousPeriodTotal = await activity.getNewOrganizationsNumber({ - tenantId, - segmentIds, - startDate: previousPeriodStartDate, - endDate: previousPeriodEndDate, - platform, - }) - - // new organizations timeseries - const newOrganizationsTimeseries = await activity.getNewOrganizationsTimeseries({ - tenantId, - segmentIds, - startDate, - endDate, - platform, - }) - - // active organizations total - const activeOrganizationsTotal = await activity.getActiveOrganizationsNumber({ - tenantId, - segmentIds, - startDate, - endDate, - platform, - }) - - // active organizations previous period total - const activeOrganizationsPreviousPeriodTotal = await activity.getActiveOrganizationsNumber({ - tenantId, - segmentIds, - startDate: previousPeriodStartDate, - endDate: previousPeriodEndDate, - platform, - }) + } - // active organizations timeseries - const activeOrganizationsTimeseries = await activity.getActiveOrganizationsTimeseries({ - tenantId, - segmentIds, - startDate, - endDate, - platform, - }) + const newMembers = convertFullTimeseriesToWidget( + await activity.getNewMembersTimeseries(params), + timeframe, + ) + const activeMembers = convertFullTimeseriesToWidget( + await activity.getActiveMembersTimeseries(params), + timeframe, + ) + const newOrganizations = convertFullTimeseriesToWidget( + await activity.getNewOrganizationsTimeseries(params), + timeframe, + ) + const activeOrganizations = convertFullTimeseriesToWidget( + await activity.getActiveOrganizationsTimeseries(params), + timeframe, + ) // activities total const activitiesTotal = await activity.getActivitiesNumber({ tenantId, segmentIds, - startDate, - endDate, + ...timeframe.current, platform, }) @@ -232,8 +206,7 @@ async function getDashboardCacheData( const activitiesPreviousPeriodTotal = await activity.getActivitiesNumber({ tenantId, segmentIds, - startDate: previousPeriodStartDate, - endDate: previousPeriodEndDate, + ...timeframe.previous, platform, }) @@ -241,8 +214,7 @@ async function getDashboardCacheData( const activitiesTimeseries = await activity.getActivitiesTimeseries({ tenantId, segmentIds, - startDate, - endDate, + ...timeframe.previous, platform, }) @@ -250,8 +222,7 @@ async function getDashboardCacheData( const activitiesBySentimentMood = await activity.getActivitiesBySentiment({ tenantId, segmentIds, - startDate, - endDate, + ...timeframe.previous, platform, }) @@ -259,32 +230,15 @@ async function getDashboardCacheData( const activitiesByTypeAndPlatform = await activity.getActivitiesByType({ tenantId, segmentIds, - startDate, - endDate, + ...timeframe.previous, platform, }) return { - newMembers: { - total: newMembersTotal, - previousPeriodTotal: newMembersPreviousPeriodTotal, - timeseries: newMembersTimeseries, - }, - activeMembers: { - total: activeMembersTotal, - previousPeriodTotal: activeMembersPreviousPeriodTotal, - timeseries: activeMembersTimeseries, - }, - newOrganizations: { - total: newOrganizationsTotal, - previousPeriodTotal: newOrganizationsPreviousPeriodTotal, - timeseries: newOrganizationsTimeseries, - }, - activeOrganizations: { - total: activeOrganizationsTotal, - previousPeriodTotal: activeOrganizationsPreviousPeriodTotal, - timeseries: activeOrganizationsTimeseries, - }, + newMembers, + activeMembers, + newOrganizations, + activeOrganizations, activity: { total: activitiesTotal, previousPeriodTotal: activitiesPreviousPeriodTotal, @@ -296,47 +250,38 @@ async function getDashboardCacheData( } function buildTimeframe(timeframe: DashboardTimeframe): ITimeframe { - if (timeframe === DashboardTimeframe.LAST_7_DAYS) { - const startDate = moment().subtract(6, 'days').startOf('day').toDate() - const endDate = moment().endOf('day').toDate() - const previousPeriodStartDate = moment().subtract(13, 'days').startOf('day').toDate() - const previousPeriodEndDate = moment().subtract(7, 'days').endOf('day').toDate() - - return { - startDate, - endDate, - previousPeriodStartDate, - previousPeriodEndDate, - } + const numDays = { + [DashboardTimeframe.LAST_7_DAYS]: 7, + [DashboardTimeframe.LAST_14_DAYS]: 14, + [DashboardTimeframe.LAST_30_DAYS]: 30, + }[timeframe] + + if (!numDays) { + throw new Error(`Unsupported timerange ${timeframe}!`) } - if (timeframe === DashboardTimeframe.LAST_14_DAYS) { - const startDate = moment().subtract(13, 'days').startOf('day').toDate() - const endDate = moment().endOf('day').toDate() - const previousPeriodStartDate = moment().subtract(27, 'days').startOf('day').toDate() - const previousPeriodEndDate = moment().subtract(14, 'days').endOf('day').toDate() + const startDate = moment() + .subtract(numDays - 1, 'days') + .startOf('day') + .toDate() + const endDate = moment().add(1, 'days').startOf('day').toDate() + const previousPeriodStartDate = moment() + .subtract(numDays * 2 - 1, 'days') + .startOf('day') + .toDate() + const previousPeriodEndDate = moment() + .subtract(numDays - 1, 'days') + .startOf('day') + .toDate() - return { - startDate, - endDate, - previousPeriodStartDate, - previousPeriodEndDate, - } - } - - if (timeframe === DashboardTimeframe.LAST_30_DAYS) { - const startDate = moment().subtract(29, 'days').startOf('day').toDate() - const endDate = moment().endOf('day').toDate() - const previousPeriodStartDate = moment().subtract(59, 'days').startOf('day').toDate() - const previousPeriodEndDate = moment().subtract(30, 'days').endOf('day').toDate() - - return { + return { + current: { startDate, endDate, - previousPeriodStartDate, - previousPeriodEndDate, - } + }, + previous: { + startDate: previousPeriodStartDate, + endDate: previousPeriodEndDate, + }, } - - throw new Error(`Unsupported timerange ${timeframe}!`) } diff --git a/services/apps/emails_worker/src/activities/weekly-analytics/buildEmailFromPostgreSQL.ts b/services/apps/emails_worker/src/activities/weekly-analytics/buildEmailFromPostgreSQL.ts index 2d1448ccf..fa631d90b 100644 --- a/services/apps/emails_worker/src/activities/weekly-analytics/buildEmailFromPostgreSQL.ts +++ b/services/apps/emails_worker/src/activities/weekly-analytics/buildEmailFromPostgreSQL.ts @@ -4,7 +4,6 @@ import { fetchSegments, fetchTenantUsers, } from '@crowd/data-access-layer/src/old/apps/emails_worker/tenants' -import { getNumberOfNewOrganizations } from '@crowd/data-access-layer/src/organizations' import { SegmentRawData } from '@crowd/types' import { svc } from '../../main' @@ -147,21 +146,9 @@ totalOrganizationsThisWeek is a Temporal activity that returns the number of organizations for a tenant as of the current week. */ export async function getTotalOrganizationsThisWeek( - input: InputAnalyticsWithTimes, + input: InputAnalyticsWithTimes, // eslint-disable-line @typescript-eslint/no-unused-vars ): Promise { - let result: number - - try { - result = await getNumberOfNewOrganizations(db, { - tenantId: input.tenantId, - after: new Date(Date.parse(input.unixEpoch)), - before: new Date(Date.parse(input.dateTimeEndThisWeek)), - }) - } catch (err) { - throw new Error(err) - } - - return result + return 0 } /* @@ -169,41 +156,19 @@ totalOrganizationsPreviousWeek is a Temporal activity that returns the number of organizations for a tenant as of the past week. */ export async function getTotalOrganizationsPreviousWeek( - input: InputAnalyticsWithTimes, + input: InputAnalyticsWithTimes, // eslint-disable-line @typescript-eslint/no-unused-vars ): Promise { - let result: number - - try { - result = await getNumberOfNewOrganizations(db, { - tenantId: input.tenantId, - after: new Date(Date.parse(input.unixEpoch)), - before: new Date(Date.parse(input.dateTimeEndPreviousWeek)), - }) - } catch (err) { - throw new Error(err) - } - - return result + return 0 } /* newOrganizationsThisWeek is a Temporal activity that returns the number of new organizations for a tenant of the current. */ -export async function getNewOrganizationsThisWeek(input: InputAnalyticsWithTimes): Promise { - let result: number - - try { - result = await getNumberOfNewOrganizations(db, { - tenantId: input.tenantId, - after: new Date(Date.parse(input.dateTimeStartThisWeek)), - before: new Date(Date.parse(input.dateTimeEndThisWeek)), - }) - } catch (err) { - throw new Error(err) - } - - return result +export async function getNewOrganizationsThisWeek( + input: InputAnalyticsWithTimes, // eslint-disable-line @typescript-eslint/no-unused-vars +): Promise { + return 0 } /* @@ -211,19 +176,7 @@ newOrganizationsPreviousWeek is a Temporal activity that returns the number of n organizations for a tenant of the past. */ export async function getNewOrganizationsPreviousWeek( - input: InputAnalyticsWithTimes, + input: InputAnalyticsWithTimes, // eslint-disable-line @typescript-eslint/no-unused-vars ): Promise { - let result: number - - try { - result = await getNumberOfNewOrganizations(db, { - tenantId: input.tenantId, - after: new Date(Date.parse(input.dateTimeStartPreviousWeek)), - before: new Date(Date.parse(input.dateTimeEndPreviousWeek)), - }) - } catch (err) { - throw new Error(err) - } - - return result + return 0 } diff --git a/services/libs/data-access-layer/src/activities/sql.ts b/services/libs/data-access-layer/src/activities/sql.ts index 4b56238f8..6967f1515 100644 --- a/services/libs/data-access-layer/src/activities/sql.ts +++ b/services/libs/data-access-layer/src/activities/sql.ts @@ -6,7 +6,10 @@ import { DbConnOrTx } from '@crowd/database' import { ActivityDisplayService } from '@crowd/integrations' import { ActivityDisplayVariant, + IActivityBySentimentMoodResult, + IActivityByTypeAndPlatformResult, IMemberIdentity, + ITimeseriesDatapoint, MemberIdentityType, PageData, PlatformType, @@ -25,10 +28,7 @@ import { ActivityType, IActiveMemberData, IActiveOrganizationData, - IActivityBySentimentMoodResult, - IActivityByTypeAndPlatformResult, IActivitySentiment, - IActivityTimeseriesResult, IMemberSegment, INewActivityPlatforms, INumberOfActivitiesPerMember, @@ -40,7 +40,6 @@ import { IQueryDistinctParameters, IQueryGroupedActivitiesParameters, IQueryNumberOfActiveMembersParameters, - IQueryNumberOfActiveOrganizationsParameters, IQueryTopActivitiesParameters, } from './types' @@ -1075,49 +1074,10 @@ export async function countMembersWithActivities( }) } -export async function countOrganizationsWithActivities( - qdbConn: DbConnOrTx, - arg: IQueryNumberOfActiveOrganizationsParameters, -): Promise<{ count: number; segmentId: string; date?: string }[]> { - let query = ` - SELECT COUNT_DISTINCT("organizationId") AS count, "timestamp" AS date - FROM activities - WHERE "deletedAt" IS NULL - AND "tenantId" = $(tenantId) - AND "organizationId" IS NOT NULL - ` - - if (arg.segmentIds) { - query += ' AND "segmentId" IN ($(segmentIds:csv))' - } - - if (arg.platform) { - query += ' AND platform = $(platform)' - } - - if (arg.timestampFrom && arg.timestampTo) { - query += ' AND timestamp BETWEEN $(after) AND $(before)' - } - - if (arg.groupBy === 'day') { - query += ' SAMPLE BY 1d FILL(0) ALIGN TO CALENDAR' - } - - query += ' ORDER BY "date" ASC;' - - return await qdbConn.any(query, { - tenantId: arg.tenantId, - segmentIds: arg.segmentIds, - after: arg.timestampFrom, - before: arg.timestampTo, - platform: arg.platform, - }) -} - export async function activitiesTimeseries( qdbConn: DbConnOrTx, arg: IQueryGroupedActivitiesParameters, -): Promise { +): Promise { let query = ` SELECT COUNT_DISTINCT(id) AS count, "timestamp" AS date FROM activities @@ -1142,7 +1102,7 @@ export async function activitiesTimeseries( ORDER BY "date" ASC; ` - const rows: IActivityTimeseriesResult[] = await qdbConn.query(query, { + const rows: ITimeseriesDatapoint[] = await qdbConn.query(query, { tenantId: arg.tenantId, segmentIds: arg.segmentIds, platform: arg.platform, diff --git a/services/libs/data-access-layer/src/activities/types.ts b/services/libs/data-access-layer/src/activities/types.ts index 9859dc355..db9eeeae2 100644 --- a/services/libs/data-access-layer/src/activities/types.ts +++ b/services/libs/data-access-layer/src/activities/types.ts @@ -161,31 +161,6 @@ export interface IQueryActiveOrganizationsParameters { offset: number } -export interface IQueryNumberOfActiveOrganizationsParameters { - tenantId: string - segmentIds?: string[] - timestampFrom?: Date - timestampTo?: Date - platform?: string - groupBy?: undefined | 'day' -} - -export interface IActivityTimeseriesResult { - date: string - count: number -} - -export interface IActivityBySentimentMoodResult { - sentimentLabel: string - count: number -} - -export interface IActivityByTypeAndPlatformResult { - type: string - platform: string - count: number -} - export interface INewActivityPlatforms { segmentIds: string[] after: Date diff --git a/services/libs/data-access-layer/src/members/dashboard.ts b/services/libs/data-access-layer/src/members/dashboard.ts index f53ed0552..831cd25b9 100644 --- a/services/libs/data-access-layer/src/members/dashboard.ts +++ b/services/libs/data-access-layer/src/members/dashboard.ts @@ -1,8 +1,10 @@ import { getEnv } from '@crowd/common' import { DbStore } from '@crowd/database' -import { IMember } from '@crowd/types' +import { IMember, IQueryTimeseriesParams, ITimeseriesDatapoint } from '@crowd/types' -import { IQueryNumberOfNewMembers, IQueryTimeseriesOfNewMembers } from './types' +import { QueryExecutor } from '../queryExecutor' + +import { IQueryNumberOfNewMembers } from './types' const s3Url = `https://${ process.env['CROWD_S3_MICROSERVICES_ASSETS_BUCKET'] @@ -79,43 +81,51 @@ export interface IActiveMembersTimeseriesResult { count: number } +export async function getTimeseriesOfNewMembers( + qx: QueryExecutor, + params: IQueryTimeseriesParams, +): Promise { + const query = ` + SELECT + COUNT(DISTINCT m.id) AS count, + TO_CHAR(m."joinedAt", 'YYYY-MM-DD') AS "date" + FROM members AS m + ${params.segmentIds ? 'INNER JOIN "memberSegmentsAgg" msa on msa."memberId" = m.id' : ''} -- if segments + WHERE m."tenantId" = $(tenantId) + AND m."joinedAt" >= $(startDate) + AND m."joinedAt" < $(endDate) + AND (COALESCE((((m.attributes -> 'isBot'::text) -> 'default'::text))::boolean, false)) IS FALSE + AND (COALESCE((((m.attributes -> 'isTeamMember'::text) -> 'default'::text))::boolean, false)) IS FALSE + AND (COALESCE((((m.attributes -> 'isOrganization'::text) -> 'default'::text))::boolean, false)) IS FALSE + AND m."deletedAt" IS NULL + ${params.segmentIds ? 'AND msa."segmentId" IN ($(segmentIds:csv))' : ''} + ${params.platform ? 'AND $(platform) = ANY(msa."activeOn")' : ''} + GROUP BY 2 + ORDER BY 2 + ` + + return qx.select(query, params) +} + export async function getTimeseriesOfActiveMembers( - db: DbStore, - arg: IQueryTimeseriesOfNewMembers, -): Promise { - let query = ` - SELECT COUNT_DISTINCT("memberId") AS count, timestamp + qx: QueryExecutor, + params: IQueryTimeseriesParams, +): Promise { + const query = ` + SELECT + COUNT_DISTINCT("memberId") AS count, + DATE_TRUNC('day', timestamp) AS "date" FROM activities WHERE tenantId = $(tenantId) - AND "memberId" IS NOT NULL - AND timestamp BETWEEN $(after) AND $(before) + AND "deletedAt" IS NULL + AND "memberId" IS NOT NULL + ${params.segmentIds ? 'AND "segmentId" IN ($(segmentIds:csv))' : ''} + AND timestamp >= $(startDate) + AND timestamp < $(endDate) + ${params.platform ? 'AND "platform" = $(platform)' : ''} + GROUP BY 2 + ORDER BY 2 ` - if (arg.segmentIds) { - query += ` AND "segmentId" IN ($(segmentIds:csv))` - } - - if (arg.platform) { - query += ` AND "platform" = $(platform)` - } - - query += ' SAMPLE BY 1d FILL(0) ALIGN TO CALENDAR ORDER BY timestamp ASC;' - - const rows = await db.connection().query(query, { - tenantId: arg.tenantId, - segmentIds: arg.segmentIds, - after: arg.after, - before: arg.before, - platform: arg.platform, - }) - - const results: IActiveMembersTimeseriesResult[] = [] - rows.forEach((row) => { - results.push({ - date: row.timestamp, - count: Number(row.count), - }) - }) - - return results + return qx.select(query, params) } diff --git a/services/libs/data-access-layer/src/organizations/organizations.ts b/services/libs/data-access-layer/src/organizations/organizations.ts index f882a8428..632adaa0d 100644 --- a/services/libs/data-access-layer/src/organizations/organizations.ts +++ b/services/libs/data-access-layer/src/organizations/organizations.ts @@ -1,19 +1,20 @@ import { generateUUIDv1 } from '@crowd/common' -import { DbStore } from '@crowd/database' -import { IMemberOrganization, IOrganizationIdSource, SyncStatus } from '@crowd/types' +import { + IMemberOrganization, + IOrganizationIdSource, + IQueryTimeseriesParams, + ITimeseriesDatapoint, + SyncStatus, +} from '@crowd/types' import { QueryExecutor } from '../queryExecutor' import { prepareSelectColumns } from '../utils' import { - IActiveOrganizationsTimeseriesResult, IDbOrgIdentity, IDbOrganization, IDbOrganizationInput, IEnrichableOrganizationData, - IQueryNumberOfActiveOrganizations, - IQueryNumberOfNewOrganizations, - IQueryTimeseriesOfNewOrganizations, } from './types' const ORG_SELECT_COLUMNS = [ @@ -395,107 +396,47 @@ export async function updateOrganization( }) } -export async function getNumberOfNewOrganizations( - db: DbStore, - arg: IQueryNumberOfNewOrganizations, -): Promise { - let query = ` - SELECT COUNT(distinct osa."organizationId") as count - FROM "organizationSegmentsAgg" osa - JOIN organizations o on osa."organizationId" = o.id - WHERE osa."tenantId" = $(tenantId) - AND o."createdAt" BETWEEN $(after) AND $(before) +export async function getTimeseriesOfNewOrganizations( + qx: QueryExecutor, + params: IQueryTimeseriesParams, +): Promise { + const query = ` + SELECT + COUNT(DISTINCT o.id) AS count, + TO_CHAR(osa."joinedAt", 'YYYY-MM-DD') AS "date" + FROM organizations AS o + JOIN "organizationSegmentsAgg" osa ON osa."organizationId" = o.id + WHERE o."tenantId" = $(tenantId) + AND osa."joinedAt" >= $(startDate) + AND osa."joinedAt" < $(endDate) + ${params.segmentIds ? 'AND osa."segmentId" IN ($(segmentIds:csv))' : 'AND osa."segmentId" IS NULL'} + ${params.platform ? 'AND $(platform) = ANY(osa."activeOn")' : ''} + GROUP BY 2 + ORDER BY 2 ` - if (arg.segmentIds) { - query += ` AND osa."segmentId" IN ($(segmentIds:csv))` - } - - if (arg.platform) { - query += ` AND $(platform) = ANY(osa."activeOn")` - } - - const rows: { count: number }[] = await db.connection().query(query, { - tenantId: arg.tenantId, - after: arg.after, - before: arg.before, - segmentIds: arg.segmentIds, - platform: arg.platform, - }) - return Number(rows[0].count) || 0 -} - -export async function getNumberOfActiveOrganizations( - db: DbStore, - arg: IQueryNumberOfActiveOrganizations, -): Promise { - let query = ` - SELECT COUNT_DISTINCT("organizationId") as count - FROM activities - WHERE "tenantId" = $(tenantId) - AND "organizationId" IS NOT NULL - AND timestamp BETWEEN $(after) AND $(before) - AND "deletedAt" IS NULL` - - if (arg.platform) { - query += ` AND "platform" = $(platform)` - } - - if (arg.segmentIds) { - query += ` AND "segmentId" IN ($(segmentIds:csv))` - } - - query += ';' - - const rows: { count: number }[] = await db.connection().query(query, { - tenantId: arg.tenantId, - segmentIds: arg.segmentIds, - after: arg.after, - before: arg.before, - platform: arg.platform, - }) - - return Number(rows[0].count) || 0 + return qx.select(query, params) } export async function getTimeseriesOfActiveOrganizations( - db: DbStore, - arg: IQueryTimeseriesOfNewOrganizations, -): Promise { - let query = ` - SELECT COUNT_DISTINCT("organizationId") AS count, timestamp + qx: QueryExecutor, + params: IQueryTimeseriesParams, +): Promise { + const query = ` + SELECT + COUNT_DISTINCT("organizationId") AS count, + DATE_TRUNC('day', timestamp) FROM activities WHERE tenantId = $(tenantId) - AND "organizationId" IS NOT NULL - AND timestamp BETWEEN $(after) AND $(before) - AND deletedAt IS NULL + AND "deletedAt" IS NULL + AND "organizationId" IS NOT NULL + ${params.segmentIds ? 'AND "segmentId" IN ($(segmentIds:csv))' : ''} + AND timestamp >= $(startDate) + AND timestamp < $(endDate) + ${params.platform ? 'AND "platform" = $(platform)' : ''} + GROUP BY 2 + ORDER BY 2 ` - if (arg.segmentIds) { - query += ` AND "segmentId" IN ($(segmentIds:csv))` - } - - if (arg.platform) { - query += ` AND "platform" = $(platform)` - } - - query += ' SAMPLE BY 1d ALIGN TO CALENDAR ORDER BY timestamp ASC;' - - const rows = await db.connection().query(query, { - tenantId: arg.tenantId, - segmentIds: arg.segmentIds, - after: arg.after, - before: arg.before, - platform: arg.platform, - }) - - const results: IActiveOrganizationsTimeseriesResult[] = [] - rows.forEach((row) => { - results.push({ - date: row.timestamp, - count: Number(row.count), - }) - }) - - return results + return qx.select(query, params) } diff --git a/services/libs/data-access-layer/src/organizations/types.ts b/services/libs/data-access-layer/src/organizations/types.ts index 30bcb156b..87226464c 100644 --- a/services/libs/data-access-layer/src/organizations/types.ts +++ b/services/libs/data-access-layer/src/organizations/types.ts @@ -137,8 +137,3 @@ export interface IQueryNumberOfActiveOrganizations { before: Date platform?: string } - -export interface IActiveOrganizationsTimeseriesResult { - date: string - count: number -} diff --git a/services/libs/questdb/src/sql.ts b/services/libs/questdb/src/sql.ts index 38f84cbaf..52f98f88c 100644 --- a/services/libs/questdb/src/sql.ts +++ b/services/libs/questdb/src/sql.ts @@ -22,6 +22,7 @@ export const getClientSQL = async (): Promise> => { async error(err: any, e: pgpromise.IEventContext): Promise { if (e.cn) { telemetry.increment('questdb.connection_error', 1) + telemetry.flush() log.fatal(err, { cn: e.cn }, 'QuestDB connection error. Stopping process') // logs don't have flush: await new Promise((resolve) => setTimeout(resolve, 100)) diff --git a/services/libs/types/src/dashboard/index.ts b/services/libs/types/src/dashboard/index.ts new file mode 100644 index 000000000..308a56e27 --- /dev/null +++ b/services/libs/types/src/dashboard/index.ts @@ -0,0 +1,50 @@ +export interface IActivityBySentimentMoodResult { + sentimentLabel: string + count: number +} + +export interface IActivityByTypeAndPlatformResult { + type: string + platform: string + count: number +} + +export interface IQueryTimeseriesParams { + tenantId: string + segmentIds?: string[] + startDate: Date + endDate: Date + platform?: string +} + +export interface ITimeseriesDatapoint { + date: string + count: number +} + +export interface IDashboardWidget { + total: number + previousPeriodTotal: number + timeseries: ITimeseriesDatapoint[] +} + +interface ITimePeriod { + startDate: Date + endDate: Date +} + +export interface ITimeframe { + current: ITimePeriod + previous: ITimePeriod +} + +export interface IDashboardData { + activeMembers: IDashboardWidget + newMembers: IDashboardWidget + newOrganizations: IDashboardWidget + activeOrganizations: IDashboardWidget + activity: IDashboardWidget & { + bySentimentMood: IActivityBySentimentMoodResult[] + byTypeAndPlatform: IActivityByTypeAndPlatformResult[] + } +} diff --git a/services/libs/types/src/index.ts b/services/libs/types/src/index.ts index cbc4e1f45..55c81e4dc 100644 --- a/services/libs/types/src/index.ts +++ b/services/libs/types/src/index.ts @@ -7,6 +7,8 @@ export * from './queue/data_sink_worker' export * from './queue/search_sync_worker' export * from './queue/integration_sync_worker' +export * from './dashboard' + export * from './integrations' export * from './members'