Skip to content

Commit

Permalink
extended reset all to include resetting licenses on accounts + AI ser…
Browse files Browse the repository at this point in the history
…ver; moved migration to be last
  • Loading branch information
techsmyth committed Nov 8, 2024
1 parent 2d41f91 commit e4c0df1
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/domain/space/account/account.resolver.mutations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ export class AccountResolverMutations {
agentInfo,
account.authorization,
AuthorizationPrivilege.LICENSE_RESET,
`reset license definition on Space: ${agentInfo.email}`
`reset license definition on Account: ${agentInfo.userID}`
);
const accountLicenses = await this.accountLicenseService.applyLicensePolicy(
account.id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { MigrationInterface, QueryRunner } from 'typeorm';
import { randomUUID } from 'crypto';
import { query } from 'express';

export class LicenseEntitlements1729511643666 implements MigrationInterface {
name = 'LicenseEntitlements1729511643666';
export class LicenseEntitlements1730877510666 implements MigrationInterface {
name = 'LicenseEntitlements1730877510666';

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
Expand Down
4 changes: 2 additions & 2 deletions src/services/auth-reset/auth-reset.payload.interface.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { AUTH_RESET_EVENT_TYPE } from './event.type';
import { RESET_EVENT_TYPE } from './reset.event.type';

/**
* The payload type of the event
Expand All @@ -7,7 +7,7 @@ export interface AuthResetEventPayload {
/**
* the type of the event
*/
type: AUTH_RESET_EVENT_TYPE;
type: RESET_EVENT_TYPE;
/**
* the uuid of the entity
*/
Expand Down
6 changes: 0 additions & 6 deletions src/services/auth-reset/event.type.ts

This file was deleted.

77 changes: 61 additions & 16 deletions src/services/auth-reset/publisher/auth-reset.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { Organization } from '@domain/community/organization';
import { AUTH_RESET_SERVICE } from '@common/constants';
import { AlkemioErrorStatus, LogContext } from '@common/enums';
import { TaskService } from '@services/task/task.service';
import { AUTH_RESET_EVENT_TYPE } from '../event.type';
import { RESET_EVENT_TYPE } from '../reset.event.type';
import { AuthResetEventPayload } from '../auth-reset.payload.interface';
import { BaseException } from '@common/exceptions/base.exception';
import { Account } from '@domain/space/account/account.entity';
Expand All @@ -28,10 +28,13 @@ export class AuthResetService {
const task = taskId ? { id: taskId } : await this.taskService.create();

try {
await this.publishAllAccountsReset(task.id);
await this.publishAllOrganizationsReset(task.id);
await this.publishAllUsersReset(task.id);
await this.publishPlatformReset();
await this.publishAuthorizationResetAllAccounts(task.id);
await this.publishAuthorizationResetAllOrganizations(task.id);
await this.publishAuthorizatinoResetAllUsers(task.id);
await this.publishAuthorizationResetPlatform();
await this.publishAuthorizationResetAiServer();
// And reset licenses
await this.publishLicenseResetAllAccounts(task.id);
} catch (error) {
throw new BaseException(
`Error while initializing authorization reset: ${error}`,
Expand All @@ -43,7 +46,7 @@ export class AuthResetService {
return task.id;
}

public async publishAllAccountsReset(taskId?: string) {
public async publishAuthorizationResetAllAccounts(taskId?: string) {
const accounts = await this.manager.find(Account, {
select: { id: true },
});
Expand All @@ -54,15 +57,42 @@ export class AuthResetService {

accounts.forEach(({ id }) =>
this.authResetQueue.emit<any, AuthResetEventPayload>(
AUTH_RESET_EVENT_TYPE.ACCOUNT,
{ id, type: AUTH_RESET_EVENT_TYPE.ACCOUNT, task: task.id }
RESET_EVENT_TYPE.AUTHORIZATION_RESET_ACCOUNT,
{
id,
type: RESET_EVENT_TYPE.AUTHORIZATION_RESET_ACCOUNT,
task: task.id,
}
)
);

return task.id;
}

public async publishAllUsersReset(taskId?: string) {
public async publishLicenseResetAllAccounts(taskId?: string) {
const accounts = await this.manager.find(Account, {
select: { id: true },
});

const task = taskId
? { id: taskId }
: await this.taskService.create(accounts.length);

accounts.forEach(({ id }) =>
this.authResetQueue.emit<any, AuthResetEventPayload>(
RESET_EVENT_TYPE.LICENSE_RESET_ACCOUNT,
{
id,
type: RESET_EVENT_TYPE.LICENSE_RESET_ACCOUNT,
task: task.id,
}
)
);

return task.id;
}

public async publishAuthorizatinoResetAllUsers(taskId?: string) {
const users = await this.manager.find(User, {
select: { id: true },
});
Expand All @@ -73,10 +103,10 @@ export class AuthResetService {

users.forEach(({ id }) =>
this.authResetQueue.emit<any, AuthResetEventPayload>(
AUTH_RESET_EVENT_TYPE.USER,
RESET_EVENT_TYPE.AUTHORIZATION_RESET_USER,
{
id,
type: AUTH_RESET_EVENT_TYPE.USER,
type: RESET_EVENT_TYPE.AUTHORIZATION_RESET_USER,
task: task.id,
}
)
Expand All @@ -85,7 +115,7 @@ export class AuthResetService {
return task.id;
}

public async publishAllOrganizationsReset(taskId?: string) {
public async publishAuthorizationResetAllOrganizations(taskId?: string) {
const organizations = await this.manager.find(Organization, {
select: { id: true },
});
Expand All @@ -96,16 +126,31 @@ export class AuthResetService {

organizations.forEach(({ id }) =>
this.authResetQueue.emit<any, AuthResetEventPayload>(
AUTH_RESET_EVENT_TYPE.ORGANIZATION,
{ id, type: AUTH_RESET_EVENT_TYPE.ORGANIZATION, task: task.id }
RESET_EVENT_TYPE.AUTHORIZATION_RESET_ORGANIZATION,
{
id,
type: RESET_EVENT_TYPE.AUTHORIZATION_RESET_ORGANIZATION,
task: task.id,
}
)
);

return task.id;
}

public async publishPlatformReset() {
public async publishAuthorizationResetPlatform() {
// does not need a task
this.authResetQueue.emit<any, any>(
RESET_EVENT_TYPE.AUTHORIZATION_RESET_PLATFORM,
{}
);
}

public async publishAuthorizationResetAiServer() {
// does not need a task
this.authResetQueue.emit<any, any>(AUTH_RESET_EVENT_TYPE.PLATFORM, {});
this.authResetQueue.emit<any, any>(
RESET_EVENT_TYPE.AUTHORIZATION_RESET_AI_SERVER,
{}
);
}
}
8 changes: 8 additions & 0 deletions src/services/auth-reset/reset.event.type.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
export enum RESET_EVENT_TYPE {
AUTHORIZATION_RESET_ACCOUNT = 'auth-reset-account',
AUTHORIZATION_RESET_USER = 'auth-reset-user',
AUTHORIZATION_RESET_ORGANIZATION = 'auth-reset-organization',
AUTHORIZATION_RESET_PLATFORM = 'auth-reset-platform',
AUTHORIZATION_RESET_AI_SERVER = 'auth-reset-ai-server',
LICENSE_RESET_ACCOUNT = 'license-reset-account',
}
110 changes: 105 additions & 5 deletions src/services/auth-reset/subscriber/auth-reset.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ import { OrganizationService } from '@domain/community/organization/organization
import { OrganizationAuthorizationService } from '@domain/community/organization/organization.service.authorization';
import { UserService } from '@domain/community/user/user.service';
import { UserAuthorizationService } from '@domain/community/user/user.service.authorization';
import { AUTH_RESET_EVENT_TYPE } from '../event.type';
import { RESET_EVENT_TYPE } from '../reset.event.type';
import { TaskService } from '@services/task/task.service';
import { AuthResetEventPayload } from '../auth-reset.payload.interface';
import { AccountAuthorizationService } from '@domain/space/account/account.service.authorization';
import { AccountService } from '@domain/space/account/account.service';
import { AuthorizationPolicyService } from '@domain/common/authorization-policy/authorization.policy.service';
import { AccountLicenseService } from '@domain/space/account/account.service.license';
import { LicenseService } from '@domain/common/license/license.service';
import { AiServerAuthorizationService } from '@services/ai-server/ai-server/ai.server.service.authorization';

const MAX_RETRIES = 5;
const RETRY_HEADER = 'x-retry-count';
Expand All @@ -30,16 +33,19 @@ export class AuthResetController {
private readonly logger: LoggerService,
private accountService: AccountService,
private authorizationPolicyService: AuthorizationPolicyService,
private licenseService: LicenseService,
private accountAuthorizationService: AccountAuthorizationService,
private accountLicenseService: AccountLicenseService,
private platformAuthorizationService: PlatformAuthorizationService,
private organizationAuthorizationService: OrganizationAuthorizationService,
private userAuthorizationService: UserAuthorizationService,
private organizationService: OrganizationService,
private aiServerAuthorizationService: AiServerAuthorizationService,
private userService: UserService,
private taskService: TaskService
) {}

@EventPattern(AUTH_RESET_EVENT_TYPE.ACCOUNT, Transport.RMQ)
@EventPattern(RESET_EVENT_TYPE.AUTHORIZATION_RESET_ACCOUNT, Transport.RMQ)
public async authResetAccount(
@Payload() payload: AuthResetEventPayload,
@Ctx() context: RmqContext
Expand Down Expand Up @@ -88,7 +94,54 @@ export class AuthResetController {
}
}

@EventPattern(AUTH_RESET_EVENT_TYPE.PLATFORM, Transport.RMQ)
@EventPattern(RESET_EVENT_TYPE.LICENSE_RESET_ACCOUNT, Transport.RMQ)
public async licenseResetAccount(
@Payload() payload: AuthResetEventPayload,
@Ctx() context: RmqContext
) {
this.logger.verbose?.(
`Starting reset of license for account with id ${payload.id}.`,
LogContext.AUTH_POLICY
);
const channel: Channel = context.getChannelRef();
const originalMsg = context.getMessage() as Message;

const retryCount = originalMsg.properties.headers?.[RETRY_HEADER] ?? 0;

try {
const account = await this.accountService.getAccountOrFail(payload.id);
const updatedLicenses =
await this.accountLicenseService.applyLicensePolicy(account.id);
await this.licenseService.saveAll(updatedLicenses);

const message = `Finished resetting license for account with id ${payload.id}.`;
this.logger.verbose?.(message, LogContext.AUTH_POLICY);
this.taskService.updateTaskResults(payload.task, message);
channel.ack(originalMsg);
} catch (error: any) {
if (retryCount >= MAX_RETRIES) {
const message = `Resetting license for account with id ${payload.id} failed! Max retries reached. Rejecting message.`;
this.logger.error(message, error?.stack, LogContext.AUTH);
this.taskService.updateTaskErrors(payload.task, message);

channel.reject(originalMsg, false); // Reject and don't requeue
} else {
this.logger.warn(
`Processing license reset for account with id ${
payload.id
} failed. Retrying (${retryCount + 1}/${MAX_RETRIES})`,
LogContext.AUTH
);
channel.publish('', MessagingQueue.AUTH_RESET, originalMsg.content, {
headers: { [RETRY_HEADER]: retryCount + 1 },
persistent: true, // Make the message durable
});
channel.ack(originalMsg); // Acknowledge the original message
}
}
}

@EventPattern(RESET_EVENT_TYPE.AUTHORIZATION_RESET_PLATFORM, Transport.RMQ)
public async authResetPlatform(@Ctx() context: RmqContext) {
this.logger.verbose?.(
'Starting reset of authorization for platform',
Expand Down Expand Up @@ -132,7 +185,51 @@ export class AuthResetController {
}
}

@EventPattern(AUTH_RESET_EVENT_TYPE.USER, Transport.RMQ)
@EventPattern(RESET_EVENT_TYPE.AUTHORIZATION_RESET_AI_SERVER, Transport.RMQ)
public async authResetAiServer(@Ctx() context: RmqContext) {
this.logger.verbose?.(
'Starting reset of authorization for AI Server',
LogContext.AUTH_POLICY
);
const channel: Channel = context.getChannelRef();
const originalMsg = context.getMessage() as Message;

const retryCount = originalMsg.properties.headers?.[RETRY_HEADER] ?? 0;

try {
const authorizations =
await this.aiServerAuthorizationService.applyAuthorizationPolicy();
await this.authorizationPolicyService.saveAll(authorizations);
this.logger.verbose?.(
'Finished resetting authorization for AI Server.',
LogContext.AUTH_POLICY
);
channel.ack(originalMsg);
} catch (error: any) {
if (retryCount >= MAX_RETRIES) {
this.logger.error(
'Resetting authorization for AI Server failed! Max retries reached. Rejecting message.',
error?.stack,
LogContext.AUTH
);
channel.reject(originalMsg, false); // Reject and don't requeue
} else {
this.logger.warn(
`Processing authorization reset for AI Server failed. Retrying (${
retryCount + 1
}/${MAX_RETRIES})`,
LogContext.AUTH
);
channel.publish('', MessagingQueue.AUTH_RESET, originalMsg.content, {
headers: { [RETRY_HEADER]: retryCount + 1 },
persistent: true, // Make the message durable
});
channel.ack(originalMsg); // Acknowledge the original message
}
}
}

@EventPattern(RESET_EVENT_TYPE.AUTHORIZATION_RESET_USER, Transport.RMQ)
public async authResetUser(
@Payload() payload: AuthResetEventPayload,
@Ctx() context: RmqContext
Expand Down Expand Up @@ -182,7 +279,10 @@ export class AuthResetController {
}
}

@EventPattern(AUTH_RESET_EVENT_TYPE.ORGANIZATION, Transport.RMQ)
@EventPattern(
RESET_EVENT_TYPE.AUTHORIZATION_RESET_ORGANIZATION,
Transport.RMQ
)
public async authResetOrganization(
@Payload() payload: AuthResetEventPayload,
@Ctx() context: RmqContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { OrganizationModule } from '@domain/community/organization/organization.
import { TaskModule } from '@services/task/task.module';
import { AccountModule } from '@domain/space/account/account.module';
import { AuthorizationPolicyModule } from '@domain/common/authorization-policy/authorization.policy.module';
import { AiServerModule } from '@services/ai-server/ai-server/ai.server.module';
import { LicenseModule } from '@domain/common/license/license.module';

@Global()
@Module({
Expand All @@ -18,6 +20,8 @@ import { AuthorizationPolicyModule } from '@domain/common/authorization-policy/a
PlatformModule,
OrganizationModule,
TaskModule,
AiServerModule,
LicenseModule,
],
controllers: [AuthResetController],
})
Expand Down

0 comments on commit e4c0df1

Please sign in to comment.