mission map management with defered delete

This commit is contained in:
Julian Krauser 2025-03-03 11:00:55 +01:00
parent 584e017cd7
commit 50c48746df
4 changed files with 67 additions and 5 deletions

View file

@ -24,6 +24,7 @@ export default abstract class MissionDocHelper {
awareness.setLocalState(null); awareness.setLocalState(null);
MissionMap.write(missionId, { missionId, doc, awareness, timestamp: 0 }); MissionMap.write(missionId, { missionId, doc, awareness, timestamp: 0 });
console.log(`created local doc ${missionId}`);
} }
public static async saveDoc(missionId: string) { public static async saveDoc(missionId: string) {

View file

@ -41,8 +41,10 @@ httpServer.listen(process.env.NODE_ENV ? SERVER_PORT : 5000, () => {
import schedule from "node-schedule"; import schedule from "node-schedule";
import RefreshCommandHandler from "./command/refreshCommandHandler"; import RefreshCommandHandler from "./command/refreshCommandHandler";
import { MissionMap } from "./storage/missionMap";
const job = schedule.scheduleJob("0 0 * * *", async () => { const job = schedule.scheduleJob("0 0 * * *", async () => {
console.log(`${new Date().toISOString()}: running Cron`); console.log(`${new Date().toISOString()}: running Cron`);
await RefreshCommandHandler.deleteExpired(); await RefreshCommandHandler.deleteExpired();
await BackupHelper.createBackupOnInterval(); await BackupHelper.createBackupOnInterval();
MissionMap.deleteExpired();
}); });

View file

@ -1,5 +1,6 @@
import * as Y from "yjs"; import * as Y from "yjs";
import * as AwarenessProtocol from "y-protocols/dist/awareness.cjs"; import * as AwarenessProtocol from "y-protocols/dist/awareness.cjs";
import ms from "ms";
export interface missionStoreModel { export interface missionStoreModel {
missionId: string; missionId: string;
@ -13,38 +14,93 @@ export interface missionStoreModel {
*/ */
export abstract class MissionMap { export abstract class MissionMap {
private static store = new Map<string, missionStoreModel>(); private static store = new Map<string, missionStoreModel>();
private static timer = new Map<string, NodeJS.Timeout>();
private static usage = new Map<string, number>();
public static write(identifier: string, data: missionStoreModel, overwrite: boolean = false): void { public static write(identifier: string, data: missionStoreModel, overwrite: boolean = false): void {
if (!this.exists(identifier) || overwrite) this.store.set(identifier, data); this.monitorAccess(identifier);
if (!this.exists(identifier) || overwrite) {
this.store.set(identifier, data);
}
} }
public static updateState(identifier: string, data: Uint8Array): void { public static updateState(identifier: string, data: Uint8Array): void {
this.monitorAccess(identifier);
let mission = this.read(identifier); let mission = this.read(identifier);
Y.applyUpdate(mission.doc, data); Y.applyUpdate(mission.doc, data);
this.write(identifier, mission, true); this.write(identifier, mission, true);
} }
public static updateAwareness(identifier: string, data: Uint8Array, socketId: string): void { public static updateAwareness(identifier: string, data: Uint8Array, socketId: string): void {
this.monitorAccess(identifier);
let mission = this.read(identifier); let mission = this.read(identifier);
AwarenessProtocol.applyAwarenessUpdate(mission.awareness, data, socketId); AwarenessProtocol.applyAwarenessUpdate(mission.awareness, data, socketId);
this.write(identifier, mission, true); this.write(identifier, mission, true);
} }
public static updateTimestamp(identifier: string, data: number): void { public static updateTimestamp(identifier: string, data: number): void {
this.monitorAccess(identifier);
let mission = this.read(identifier); let mission = this.read(identifier);
mission.timestamp = data; mission.timestamp = data;
this.write(identifier, mission, true); this.write(identifier, mission, true);
} }
public static read(identifier: string): missionStoreModel { public static read(identifier: string): missionStoreModel {
this.monitorAccess(identifier);
return this.store.get(identifier); return this.store.get(identifier);
} }
public static exists(identifier: string): boolean { public static exists(identifier: string): boolean {
this.monitorAccess(identifier);
return this.store.has(identifier); return this.store.has(identifier);
} }
public static delete(identifier: string): void { public static delete(identifier: string): void {
this.abortDelete(identifier);
const copyIdentifier = identifier;
const timer = setTimeout(() => {
this.finalDelete(copyIdentifier);
}, ms("10s"));
this.timer.set(identifier, timer);
}
public static deleteExpired(): void {
this.usage.forEach((val, key) => {
if (Date.now() - val > ms("10s")) {
this.finalDelete(key);
}
});
}
private static monitorAccess(identifier: string): void {
this.usage.set(identifier, Date.now());
this.abortDelete(identifier);
}
private static abortDelete(identifier: string): void {
const timer = this.timer.get(identifier);
if (timer) {
clearTimeout(timer);
this.timer.delete(identifier);
}
}
private static finalDelete(identifier: string): void {
const time = this.usage.get(identifier);
if (Date.now() - time < ms("10s")) return;
this.store.delete(identifier); this.store.delete(identifier);
this.timer.delete(identifier);
this.usage.delete(identifier);
console.log(`cleared local doc ${identifier}`);
} }
} }

View file

@ -18,10 +18,12 @@ export default abstract class SocketServer {
credentials: true, credentials: true,
}, },
}); });
instrument(this.io, { if (process.env.NODE_ENV) {
auth: false, instrument(this.io, {
mode: "development", auth: false,
}); mode: "development",
});
}
this.io.engine.use(helmet()); this.io.engine.use(helmet());
@ -29,6 +31,7 @@ export default abstract class SocketServer {
.of("/") .of("/")
.use(authenticateSocket) .use(authenticateSocket)
.on("connection", (socket) => { .on("connection", (socket) => {
console.log("socket connection: ", socket.id);
socket.use((packet, next) => authenticateSocket(socket, next)); socket.use((packet, next) => authenticateSocket(socket, next));
socket.use((packet, next) => perRequestCheck(socket, packet, next)); socket.use((packet, next) => perRequestCheck(socket, packet, next));