Skip to content

Commit

Permalink
fix(mis): mis-server启动时,不完整同步一次封锁状态 (#1374)
Browse files Browse the repository at this point in the history
mis-server启动时,不完整同步一次封锁状态,而是只在后台运行,以减少mis-server启动时间。

UI和API修改:

- `AdminService.UpdateBlockStatus`
API行为改变:当调用API时后台正在进行一次同步,那么调用此API会返回`ALREADY_EXISTS`
- UI上,当用户手动点击同步封锁状态时,如果后台正在进行一次同步,那么会报错
  • Loading branch information
ddadaal authored Jul 24, 2024
1 parent 6428d9b commit c214bd2
Show file tree
Hide file tree
Showing 12 changed files with 77 additions and 31 deletions.
5 changes: 5 additions & 0 deletions .changeset/ninety-zoos-think.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@scow/grpc-api": patch
---

当 mis-server 正在进行一次封锁状态同步时,调用 server/AdminService.UpdateBlockStatus API 会抛出`AlreadyExists`错误
6 changes: 6 additions & 0 deletions .changeset/shaggy-maps-hunt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@scow/mis-server": patch
"@scow/mis-web": patch
---

mis-server 启动时,不完整运行一次封锁状态同步
3 changes: 3 additions & 0 deletions apps/mis-server/config/mis.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ db:
password: mysqlrootpassword
dbName: scow_server_${JEST_WORKER_ID}

periodicSyncUserAccountBlockStatus:
enabled: false


2 changes: 0 additions & 2 deletions apps/mis-server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,5 @@ export async function createServer() {
await server.register(misConfigServiceServer);
await server.register(exportServiceServer);

await server.ext.syncBlockStatus.sync();

return server;
}
58 changes: 34 additions & 24 deletions apps/mis-server/src/plugins/syncBlockStatus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,32 +23,43 @@ export interface SyncBlockStatusPlugin {
stop: () => void;
schedule: string;
lastSyncTime: () => Date | null;
sync: () => Promise<SyncBlockStatusResponse>;
run: () => Promise<SyncBlockStatusResponse | undefined>;
}
}

export const syncBlockStatusPlugin = plugin(async (f) => {
const synchronizeCron = misConfig.periodicSyncUserAccountBlockStatus?.cron ?? "0 4 * * *";
let synchronizeStarted = !!misConfig.periodicSyncUserAccountBlockStatus?.enabled;
let synchronizeEnabled = !!misConfig.periodicSyncUserAccountBlockStatus?.enabled;
let synchronizeIsRunning = false;

const logger = f.logger.child({ plugin: "syncBlockStatus" });
logger.info("misConfig.periodicSyncStatus?.cron: %s", misConfig.periodicSyncUserAccountBlockStatus?.cron);

const trigger = () => {
if (synchronizeIsRunning) return;
const trigger = async () => {

const sublogger = logger.child({ time: new Date() });

if (synchronizeIsRunning) {
sublogger.info("Sync is already running.");
return Promise.resolve(undefined);
}

synchronizeIsRunning = true;
return synchronizeBlockStatus(f.ext.orm.em.fork(), logger, f.ext)
.finally(() => { synchronizeIsRunning = false; });
sublogger.info("Sync starts to run.");

try {
return await synchronizeBlockStatus(f.ext.orm.em.fork(), sublogger, f.ext);
} finally {
synchronizeIsRunning = false;
}
};

const task = cron.schedule(
synchronizeCron,
() => { void trigger(); },
{
timezone: "Asia/Shanghai",
scheduled: misConfig.periodicSyncUserAccountBlockStatus?.enabled,
scheduled: synchronizeEnabled,
},
);

Expand All @@ -60,27 +71,26 @@ export const syncBlockStatusPlugin = plugin(async (f) => {
});

f.addExtension("syncBlockStatus", ({
started: () => synchronizeStarted,
started: () => synchronizeEnabled,
start: () => {
if (synchronizeStarted) {
logger.info("Sync is requested to start but already started");
} else {
task.start();
synchronizeStarted = true;
logger.info("Sync started");
}
logger.info("Sync is started");
synchronizeEnabled = true;
task.start();
},
stop: () => {
if (!synchronizeStarted) {
logger.info("Sync is requested to stop but already stopped");
} else {
task.stop();
synchronizeStarted = false;
logger.info("Sync stopped");
}
logger.info("Sync is started");
synchronizeEnabled = false;
task.stop();
},
schedule: synchronizeCron,
lastSyncTime: () => lastSyncTime,
sync: trigger,
} as SyncBlockStatusPlugin["syncBlockStatus"]));
run: trigger,
} satisfies SyncBlockStatusPlugin["syncBlockStatus"]));

if (synchronizeEnabled) {
logger.info("Started a new synchronization");
void trigger();
} else {
logger.info("Account/Account block sychronization is disabled.");
}
});
10 changes: 8 additions & 2 deletions apps/mis-server/src/services/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
*/

import { asyncClientCall } from "@ddadaal/tsgrpc-client";
import { ServiceError } from "@ddadaal/tsgrpc-common";
import { plugin } from "@ddadaal/tsgrpc-server";
import { ServiceError } from "@grpc/grpc-js";
import { Status } from "@grpc/grpc-js/build/src/constants";
import { libCheckActivatedClusters } from "@scow/lib-server/build/misCommon/clustersActivation";
import {
Expand Down Expand Up @@ -230,7 +230,13 @@ export const adminServiceServer = plugin((server) => {
// check whether there is activated cluster in SCOW
// cause syncBlockStatus in plugin will skip the check
await getActivatedClusters(em, logger);
const reply = await server.ext.syncBlockStatus.sync();
const reply = await server.ext.syncBlockStatus.run();
if (!reply) {
throw new ServiceError({
code: Status.ALREADY_EXISTS,
message: "Sync is already running. Please wait for its completion before starting a new one.",
});
}
return [reply];
},

Expand Down
3 changes: 2 additions & 1 deletion apps/mis-server/tests/admin/updateBlockStatus.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ afterEach(async () => {
await server.close();
});

it("test whether the block update time exists at startup", async () => {
// Test server will not sync block status at startup
it.skip("test whether the block update time exists at startup", async () => {
const em = server.ext.orm.em.fork();
const updateTime = await em.findOne(SystemState, { key: SystemState.KEYS.UPDATE_SLURM_BLOCK_STATUS });
expect(updateTime).not.toBeNull();
Expand Down
2 changes: 2 additions & 0 deletions apps/mis-web/src/i18n/en.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,8 @@ export default {
alertInfo: "SCOW will regularly synchronize the blocking status of accounts and users to the scheduler. "
+ "You can click Sync Now to perform a manual synchronization.",
periodicSyncUserAccountBlockStatusInfo: "Periodically Synchronize Scheduler Account And User Blocked Status",
syncAlreadyStarted:
"Synchronization is already started. Please wait for its completion before starting a new run.",
turnedOn: "Turned On",
paused: "Paused",
stopSync: "Stop Synchronization",
Expand Down
2 changes: 1 addition & 1 deletion apps/mis-web/src/i18n/zh_cn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,7 @@ export default {
slurmBlockStatus: {
syncUserAccountBlockingStatus: "用户账户封锁状态同步",
alertInfo: "SCOW会定期向调度器同步SCOW数据库中账户和用户的封锁状态,您可以点击立刻同步执行一次手动同步",

syncAlreadyStarted: "正在进行一次同步。请等待本次同步执行完成后,再重新同步。",
periodicSyncUserAccountBlockStatusInfo:"周期性同步调度器账户和用户的封锁状态",
turnedOn: "已开启",
paused: "已暂停",
Expand Down
3 changes: 3 additions & 0 deletions apps/mis-web/src/pages/admin/systemDebug/slurmBlockStatus.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ export const SlurmBlockStatusPage: NextPage = requireAuth((u) => u.platformRoles
onClick={() => {
setFetching(true);
api.syncBlockStatus({})
.httpError(409, () => {
message.error(t(p("syncAlreadyStarted")));
})
.then(({ blockedFailedAccounts, unblockedFailedAccounts, blockedFailedUserAccounts }) => {
if (!(blockedFailedAccounts.length || unblockedFailedAccounts.length
|| blockedFailedUserAccounts.length)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@

import { typeboxRouteSchema } from "@ddadaal/next-typed-api-routes-runtime";
import { asyncClientCall } from "@ddadaal/tsgrpc-client";
import { status } from "@grpc/grpc-js";
import { AdminServiceClient } from "@scow/protos/build/server/admin";
import { Type } from "@sinclair/typebox";
import { authenticate } from "src/auth/server";
import { PlatformRole } from "src/models/User";
import { getClient } from "src/utils/client";
import { route } from "src/utils/route";
import { handlegRPCError } from "src/utils/server";

export const SyncBlockStatusSchema = typeboxRouteSchema({
method: "PUT",
Expand All @@ -31,6 +33,7 @@ export const SyncBlockStatusSchema = typeboxRouteSchema({
})),
unblockedFailedAccounts: Type.Array(Type.String()),
}),
409: Type.Null(),
},
});
const auth = authenticate((info) => info.platformRoles.includes(PlatformRole.PLATFORM_ADMIN));
Expand All @@ -43,6 +46,10 @@ export default route(SyncBlockStatusSchema,

const client = getClient(AdminServiceClient);

return await asyncClientCall(client, "syncBlockStatus", {}).then((x) => ({ 200: x }));
return await asyncClientCall(client, "syncBlockStatus", {})
.then((x) => ({ 200: x }))
.catch(handlegRPCError({
[status.ALREADY_EXISTS]: () => ({ 409: null }),
}));

});
5 changes: 5 additions & 0 deletions protos/server/admin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ service AdminService {
rpc GetFetchInfo(GetFetchInfoRequest) returns (GetFetchInfoResponse);
rpc SetFetchState(SetFetchStateRequest) returns (SetFetchStateResponse);
rpc FetchJobs(FetchJobsRequest) returns (FetchJobsResponse);

/*
* Synchronize block status of account and account user to the backing scheduler
* If the synchronization is already running when the API is called, it throws ALREADY_EXISTS
*/
rpc UpdateBlockStatus(UpdateBlockStatusRequest)
returns (UpdateBlockStatusResponse);
rpc GetAdminInfo(GetAdminInfoRequest) returns (GetAdminInfoResponse) {
Expand Down

0 comments on commit c214bd2

Please sign in to comment.