Add background jobs to merge document & node updates by hakanshehu · Pull Request #90 · colanode/colanode · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Add background jobs to merge document & node updates #90

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { sql, Migration } from 'kysely';

/**
 * Migration that removes the revision trigger from the `node_updates` table.
 *
 * `up` drops the `trg_update_node_update_revision` trigger and the
 * `update_node_update_revision()` function it executes.
 * `down` recreates both: a `BEFORE UPDATE ... FOR EACH ROW` trigger whose
 * function assigns `NEW.revision` the next value of
 * `node_updates_revision_sequence`.
 */
export const removeNodeUpdateRevisionTrigger: Migration = {
// Both statements use IF EXISTS; the trigger is dropped before the function
// it references.
up: async (db) => {
await sql`
DROP TRIGGER IF EXISTS trg_update_node_update_revision ON node_updates;
DROP FUNCTION IF EXISTS update_node_update_revision();
`.execute(db);
},
// Recreates the function first (CREATE OR REPLACE), then the trigger that
// executes it on every row update.
down: async (db) => {
await sql`
CREATE OR REPLACE FUNCTION update_node_update_revision() RETURNS TRIGGER AS $$
BEGIN
NEW.revision = nextval('node_updates_revision_sequence');
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_update_node_update_revision
BEFORE UPDATE ON node_updates
FOR EACH ROW
EXECUTE FUNCTION update_node_update_revision();
`.execute(db);
},
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { sql, Migration } from 'kysely';

/**
 * Migration that removes the revision trigger from the `document_updates`
 * table.
 *
 * `up` drops the `trg_update_document_update_revision` trigger and the
 * `update_document_update_revision()` function it executes.
 * `down` recreates both: a `BEFORE UPDATE ... FOR EACH ROW` trigger whose
 * function assigns `NEW.revision` the next value of
 * `document_updates_revision_sequence`.
 */
export const removeDocumentUpdateRevisionTrigger: Migration = {
// Both statements use IF EXISTS; the trigger is dropped before the function
// it references.
up: async (db) => {
await sql`
DROP TRIGGER IF EXISTS trg_update_document_update_revision ON document_updates;
DROP FUNCTION IF EXISTS update_document_update_revision();
`.execute(db);
},
// Recreates the function first (CREATE OR REPLACE), then the trigger that
// executes it on every row update.
down: async (db) => {
await sql`
CREATE OR REPLACE FUNCTION update_document_update_revision() RETURNS TRIGGER AS $$
BEGIN
NEW.revision = nextval('document_updates_revision_sequence');
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_update_document_update_revision
BEFORE UPDATE ON document_updates
FOR EACH ROW
EXECUTE FUNCTION update_document_update_revision();
`.execute(db);
},
};
5 changes: 5 additions & 0 deletions apps/server/src/data/migrations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import { createWorkspaceUserCounterTriggers } from './00023-create-workspace-use
import { createWorkspaceNodeCounterTriggers } from './00024-create-workspace-node-counter-triggers';
import { createWorkspaceUploadCounterTriggers } from './00025-create-workspace-upload-counter-triggers';
import { createUserUploadCounterTriggers } from './00026-create-user-upload-counter-triggers';
import { removeNodeUpdateRevisionTrigger } from './00027-remove-node-update-revision-trigger';
import { removeDocumentUpdateRevisionTrigger } from './00028-remove-document-update-revision-trigger';

export const databaseMigrations: Record<string, Migration> = {
'00001_create_accounts_table': createAccountsTable,
Expand Down Expand Up @@ -57,4 +59,7 @@ export const databaseMigrations: Record<string, Migration> = {
'00025_create_workspace_upload_counter_triggers':
createWorkspaceUploadCounterTriggers,
'00026_create_user_upload_counter_triggers': createUserUploadCounterTriggers,
'00027_remove_node_update_revision_trigger': removeNodeUpdateRevisionTrigger,
'00028_remove_document_update_revision_trigger':
removeDocumentUpdateRevisionTrigger,
};
224 changes: 224 additions & 0 deletions apps/server/src/jobs/document-updates-merge.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
import { createDebugger, UpdateMergeMetadata } from '@colanode/core';
import { mergeUpdates } from '@colanode/crdt';
import { database } from '@colanode/server/data/database';
import { SelectDocumentUpdate } from '@colanode/server/data/schema';
import { JobHandler } from '@colanode/server/jobs';
import { config } from '@colanode/server/lib/config';
import { fetchCounter, setCounter } from '@colanode/server/lib/counters';

// Debug logger for this job, namespaced as 'server:job:document-updates-merge'.
const debug = createDebugger('server:job:document-updates-merge');

/**
 * Input payload of the document updates merge job. It carries no data beyond
 * the `type` literal that identifies the job.
 */
export type DocumentUpdatesMergeInput = {
type: 'document.updates.merge';
};

// Module augmentation: adds the 'document.updates.merge' entry to the JobMap
// interface declared in '@colanode/server/jobs', pairing the job name with its
// input type.
declare module '@colanode/server/jobs' {
interface JobMap {
'document.updates.merge': {
input: DocumentUpdatesMergeInput;
};
}
}

/**
 * Job handler that merges rows of `document_updates`.
 *
 * Flow:
 * 1. Returns immediately when `config.jobs.documentUpdatesMerge.enabled` is
 *    falsy.
 * 2. Reads the last processed revision from the
 *    `document.updates.merge.cursor` counter.
 * 3. Repeatedly loads a batch (at most `batchSize` rows) of updates whose
 *    revision is greater than the cursor and whose `created_at` is older than
 *    `now - cutoffWindow` seconds, ordered by revision.
 * 4. For every distinct document in the batch, delegates the actual merging to
 *    `processDocumentUpdates`.
 * 5. Persists the highest revision of the batch as the new cursor and moves on
 *    to the next batch. The loop stops on an empty or short batch.
 *
 * The in-memory cursor is advanced after every batch. Without that, each
 * iteration would re-run the same query, and a backlog of `batchSize` or more
 * rows that cannot be merged would keep the loop spinning forever.
 */
export const documentUpdatesMergeHandler: JobHandler<
  DocumentUpdatesMergeInput
> = async () => {
  if (!config.jobs.documentUpdatesMerge.enabled) {
    return;
  }

  debug('Starting document updates merge job');

  const cursor = await fetchCounter(database, 'document.updates.merge.cursor');

  // Only updates older than the cutoff are considered; cutoffWindow is in
  // seconds (converted to milliseconds here).
  const cutoffTime = new Date();
  cutoffTime.setTime(
    cutoffTime.getTime() - config.jobs.documentUpdatesMerge.cutoffWindow * 1000
  );

  let mergedGroups = 0;
  let deletedUpdates = 0;
  // Advanced after each batch so the next query starts where this one ended.
  let currentCursor = cursor;

  while (true) {
    const updates = await database
      .selectFrom('document_updates')
      .selectAll()
      .where('revision', '>', currentCursor.toString())
      .where('created_at', '<', cutoffTime)
      .orderBy('revision', 'asc')
      .limit(config.jobs.documentUpdatesMerge.batchSize)
      .execute();

    if (updates.length === 0) {
      break;
    }

    debug(`Processing batch of ${updates.length} updates`);

    const documentIds = [
      ...new Set(updates.map((update) => update.document_id)),
    ];

    // Revisions are compared as bigint, not as the raw column values.
    const maxRevision = updates.reduce((max, update) => {
      const rev = BigInt(update.revision);
      return rev > max ? rev : max;
    }, BigInt(0));

    for (const documentId of documentIds) {
      const result = await processDocumentUpdates(
        documentId,
        maxRevision,
        cutoffTime,
        config.jobs.documentUpdatesMerge.mergeWindow
      );
      mergedGroups += result.mergedGroups;
      deletedUpdates += result.deletedUpdates;
    }

    await setCounter(database, 'document.updates.merge.cursor', maxRevision);
    currentCursor = maxRevision;

    // A short batch means there is nothing left beyond it.
    if (updates.length < config.jobs.documentUpdatesMerge.batchSize) {
      break;
    }
  }

  debug(
    `Document updates merge job completed. Merged ${mergedGroups} groups, deleted ${deletedUpdates} redundant updates`
  );
};

/**
 * Merges the eligible updates of a single document.
 *
 * Loads every update of `documentId` with a revision up to `maxRevision` and a
 * `created_at` earlier than `cutoffTime`, splits them into time groups via
 * `groupUpdatesByMergeWindow`, and merges each group holding two or more
 * updates with `mergeUpdatesGroup`.
 *
 * @param documentId - Document whose updates are processed.
 * @param maxRevision - Inclusive upper bound on the revisions to load.
 * @param cutoffTime - Exclusive upper bound on `created_at`.
 * @param mergeWindow - Window, in seconds, passed to the grouping helper.
 * @returns Number of groups merged successfully and number of rows those
 * merges removed (group size minus one per merged group).
 */
const processDocumentUpdates = async (
  documentId: string,
  maxRevision: bigint,
  cutoffTime: Date,
  mergeWindow: number
): Promise<{ mergedGroups: number; deletedUpdates: number }> => {
  const candidates = await database
    .selectFrom('document_updates')
    .selectAll()
    .where('document_id', '=', documentId)
    .where('revision', '<=', maxRevision.toString())
    .where('created_at', '<', cutoffTime)
    .orderBy('created_at', 'asc')
    .execute();

  const totals = { mergedGroups: 0, deletedUpdates: 0 };

  // Fewer than two rows: nothing can be merged.
  if (candidates.length < 2) {
    return totals;
  }

  const mergeableGroups = groupUpdatesByMergeWindow(
    candidates,
    mergeWindow
  ).filter((group) => group.length >= 2);

  // Groups are merged one after another; a failed group is simply not counted.
  for (const group of mergeableGroups) {
    const merged = await mergeUpdatesGroup(documentId, group);
    if (!merged) {
      continue;
    }

    totals.mergedGroups += 1;
    totals.deletedUpdates += group.length - 1;
  }

  return totals;
};

/**
 * Splits updates into chronological groups.
 *
 * The updates are sorted by `created_at` (the input array is not mutated).
 * An update joins the current group when it was created at most `mergeWindow`
 * seconds after the previous update in that group; otherwise it opens a new
 * group. The window is measured between neighbours, so a group may span more
 * than `mergeWindow` seconds overall.
 *
 * @param updates - Updates to group, in any order.
 * @param mergeWindow - Maximum gap, in seconds, between neighbouring updates
 * of the same group.
 * @returns The groups in chronological order; an empty array for empty input.
 */
const groupUpdatesByMergeWindow = (
  updates: SelectDocumentUpdate[],
  mergeWindow: number
): SelectDocumentUpdate[][] => {
  const chronological = [...updates].sort(
    (left, right) => left.created_at.getTime() - right.created_at.getTime()
  );

  const buckets: SelectDocumentUpdate[][] = [];

  for (const update of chronological) {
    const openBucket = buckets[buckets.length - 1];
    const previous = openBucket?.[openBucket.length - 1];

    const withinWindow =
      previous !== undefined &&
      (update.created_at.getTime() - previous.created_at.getTime()) / 1000 <=
        mergeWindow;

    if (openBucket !== undefined && withinWindow) {
      openBucket.push(update);
    } else {
      buckets.push([update]);
    }
  }

  return buckets;
};

/**
 * Collapses a group of updates into the update with the highest revision.
 *
 * The updates are ordered by revision, their `data` is combined with
 * `mergeUpdates`, and inside one transaction the highest-revision row receives
 * the merged data plus metadata (id, creation time, author) of every other
 * update in the group, after which those other rows are deleted.
 *
 * @param documentId - Used only in the failure log message.
 * @param updates - Updates to merge; fewer than two is a no-op.
 * @returns `true` when the transaction completed, `false` when the group was
 * too small or any step threw (the error is logged, not rethrown).
 */
const mergeUpdatesGroup = async (
  documentId: string,
  updates: SelectDocumentUpdate[]
): Promise<boolean> => {
  if (updates.length < 2) {
    return false;
  }

  try {
    // Revisions are compared as bigint.
    const byRevision = [...updates].sort((left, right) => {
      const leftRevision = BigInt(left.revision);
      const rightRevision = BigInt(right.revision);
      if (leftRevision === rightRevision) {
        return 0;
      }
      return leftRevision > rightRevision ? 1 : -1;
    });

    const mergedState = mergeUpdates(byRevision.map((update) => update.data));

    // The highest revision survives; everything before it is absorbed.
    const survivor = byRevision[byRevision.length - 1]!;
    const absorbed = byRevision.slice(0, -1);

    const absorbedMetadata: UpdateMergeMetadata[] = absorbed.map((update) => ({
      id: update.id,
      createdAt: update.created_at.toISOString(),
      createdBy: update.created_by,
    }));
    const absorbedIds = absorbed.map((update) => update.id);

    await database.transaction().execute(async (trx) => {
      // Metadata already recorded on the survivor is kept and extended.
      const mergedUpdates = JSON.stringify([
        ...(survivor.merged_updates ?? []),
        ...absorbedMetadata,
      ]);

      await trx
        .updateTable('document_updates')
        .set({ data: mergedState, merged_updates: mergedUpdates })
        .where('id', '=', survivor.id)
        .execute();

      if (absorbedIds.length > 0) {
        await trx
          .deleteFrom('document_updates')
          .where('id', 'in', absorbedIds)
          .execute();
      }
    });

    return true;
  } catch (error) {
    debug(`Failed to merge updates for document ${documentId}: ${error}`);
    return false;
  }
};
4 changes: 4 additions & 0 deletions apps/server/src/jobs/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { assistantRespondHandler } from '@colanode/server/jobs/assistant-response';
import { documentEmbedHandler } from '@colanode/server/jobs/document-embed';
import { documentEmbedScanHandler } from '@colanode/server/jobs/document-embed-scan';
import { documentUpdatesMergeHandler } from '@colanode/server/jobs/document-updates-merge';
import { emailPasswordResetSendHandler } from '@colanode/server/jobs/email-password-reset-sent';
import { emailVerifySendHandler } from '@colanode/server/jobs/email-verify-send';
import { nodeCleanHandler } from '@colanode/server/jobs/node-clean';
import { nodeEmbedHandler } from '@colanode/server/jobs/node-embed';
import { nodeEmbedScanHandler } from '@colanode/server/jobs/node-embed-scan';
import { nodeUpdatesMergeHandler } from '@colanode/server/jobs/node-updates-merge';
import { workspaceCleanHandler } from '@colanode/server/jobs/workspace-clean';

// eslint-disable-next-line @typescript-eslint/no-empty-object-type
Expand All @@ -29,4 +31,6 @@ export const jobHandlerMap: JobHandlerMap = {
'assistant.respond': assistantRespondHandler,
'node.embed.scan': nodeEmbedScanHandler,
'document.embed.scan': documentEmbedScanHandler,
'node.updates.merge': nodeUpdatesMergeHandler,
'document.updates.merge': documentUpdatesMergeHandler,
};
Loading