Skip to content
This repository has been archived by the owner on Jan 31, 2024. It is now read-only.

Thread more imports #187

Merged
merged 4 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions packages/backend/src/core/QueueService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,14 +278,14 @@ export class QueueService {
}

@bindThis
public createImportMastoToDbJob(user: ThinUser, targets: string[]) {
const jobs = targets.map(rel => this.generateToDbJobData('importMastoToDb', { user, target: rel }));
public createImportMastoToDbJob(user: ThinUser, targets: string[], note: MiNote['id'] | null) {
const jobs = targets.map(rel => this.generateToDbJobData('importMastoToDb', { user, target: rel, note }));
return this.dbQueue.addBulk(jobs);
}

@bindThis
public createImportPleroToDbJob(user: ThinUser, targets: string[]) {
const jobs = targets.map(rel => this.generateToDbJobData('importPleroToDb', { user, target: rel }));
public createImportPleroToDbJob(user: ThinUser, targets: string[], note: MiNote['id'] | null) {
const jobs = targets.map(rel => this.generateToDbJobData('importPleroToDb', { user, target: rel, note }));
return this.dbQueue.addBulk(jobs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { extractApHashtagObjects } from '@/core/activitypub/models/tag.js';
import { IdService } from '@/core/IdService.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
import type { DbNoteImportToDbJobData, DbNoteImportJobData, DbKeyNoteImportToDbJobData } from '../types.js';
import type { DbNoteImportToDbJobData, DbNoteImportJobData, DbNoteWithParentImportToDbJobData } from '../types.js';

@Injectable()
export class ImportNotesProcessorService {
Expand Down Expand Up @@ -74,7 +74,7 @@ export class ImportNotesProcessorService {

// Function was taken from Firefish and modified for our needs
@bindThis
private async recreateChain(idField: string, replyField: string, arr: any[]): Promise<any[]> {
private async recreateChain(idFieldPath: string[], replyFieldPath: string[], arr: any[], includeOrphans: boolean): Promise<any[]> {
type NotesMap = {
[id: string]: any;
};
Expand All @@ -83,28 +83,42 @@ export class ImportNotesProcessorService {
const notesWaitingForParent: NotesMap = {};

for await (const note of arr) {
noteById[note[idField]] = note;
const noteId = idFieldPath.reduce(
(obj, step) => obj[step],
note,
);

noteById[noteId] = note;
note.childNotes = [];

const children = notesWaitingForParent[note[idField]];
const children = notesWaitingForParent[noteId];
if (children) {
note.childNotes.push(...children);
delete notesWaitingForParent[noteId];
}

if (note[replyField] == null) {
const noteReplyId = replyFieldPath.reduce(
(obj, step) => obj[step],
note,
);
if (noteReplyId == null) {
notesTree.push(note);
continue;
}

const parent = noteById[note[replyField]];
const parent = noteById[noteReplyId];
if (parent) {
parent.childNotes.push(note);
} else {
notesWaitingForParent[note[replyField]] ||= [];
notesWaitingForParent[note[replyField]].push(note);
notesWaitingForParent[noteReplyId] ||= [];
notesWaitingForParent[noteReplyId].push(note);
}
}

if (includeOrphans) {
notesTree.push(...Object.values(notesWaitingForParent).flat(1));
}

return notesTree;
}

Expand Down Expand Up @@ -176,7 +190,7 @@ export class ImportNotesProcessorService {
const tweets = Object.keys(fakeWindow.window.YTD.tweets.part0).reduce((m, key, i, obj) => {
return m.concat(fakeWindow.window.YTD.tweets.part0[key].tweet);
}, []);
const processedTweets = await this.recreateChain('id_str', 'in_reply_to_status_id_str', tweets);
const processedTweets = await this.recreateChain(['id_str'], ['in_reply_to_status_id_str'], tweets, false);
this.queueService.createImportTweetsToDbJob(job.data.user, processedTweets, null);
} finally {
cleanup();
Expand Down Expand Up @@ -254,7 +268,8 @@ export class ImportNotesProcessorService {
if (isPleroma) {
const outboxJson = fs.readFileSync(outputPath + '/outbox.json', 'utf-8');
const outbox = JSON.parse(outboxJson);
this.queueService.createImportPleroToDbJob(job.data.user, outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note'));
const processedToots = await this.recreateChain(['object', 'id'], ['object', 'inReplyTo'], outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note'), true);
this.queueService.createImportPleroToDbJob(job.data.user, processedToots, null);
} else {
const outboxJson = fs.readFileSync(outputPath + '/outbox.json', 'utf-8');
const outbox = JSON.parse(outboxJson);
Expand All @@ -266,7 +281,8 @@ export class ImportNotesProcessorService {
if (fs.existsSync(outputPath + '/media_attachments/files') && mastoFolder) {
await this.uploadFiles(outputPath + '/media_attachments/files', user, mastoFolder.id);
}
this.queueService.createImportMastoToDbJob(job.data.user, outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note'));
const processedToots = await this.recreateChain(['object', 'id'], ['object', 'inReplyTo'], outbox.orderedItems.filter((x: any) => x.type === 'Create' && x.object.type === 'Note'), true);
this.queueService.createImportMastoToDbJob(job.data.user, processedToots, null);
}
}
} finally {
Expand All @@ -289,7 +305,7 @@ export class ImportNotesProcessorService {

const notesJson = fs.readFileSync(path, 'utf-8');
const notes = JSON.parse(notesJson);
const processedNotes = await this.recreateChain('id', 'replyId', notes);
const processedNotes = await this.recreateChain(['id'], ['replyId'], notes, false);
this.queueService.createImportKeyNotesToDbJob(job.data.user, processedNotes, null);
cleanup();
}
Expand All @@ -298,7 +314,7 @@ export class ImportNotesProcessorService {
}

@bindThis
public async processKeyNotesToDb(job: Bull.Job<DbKeyNoteImportToDbJobData>): Promise<void> {
public async processKeyNotesToDb(job: Bull.Job<DbNoteWithParentImportToDbJobData>): Promise<void> {
const note = job.data.target;
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
if (user == null) {
Expand Down Expand Up @@ -355,28 +371,33 @@ export class ImportNotesProcessorService {
}

@bindThis
public async processMastoToDb(job: Bull.Job<DbNoteImportToDbJobData>): Promise<void> {
public async processMastoToDb(job: Bull.Job<DbNoteWithParentImportToDbJobData>): Promise<void> {
const toot = job.data.target;
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
if (user == null) {
return;
}

if (toot.directMessage) return;

const date = new Date(toot.object.published);
let text = undefined;
const files: MiDriveFile[] = [];
let reply: MiNote | null = null;

if (toot.object.inReplyTo != null) {
try {
reply = await this.apNoteService.resolveNote(toot.object.inReplyTo);
} catch (error) {
reply = null;
const parentNote = job.data.note ? await this.notesRepository.findOneBy({ id: job.data.note }) : null;
if (parentNote) {
reply = parentNote;
} else {
try {
reply = await this.apNoteService.resolveNote(toot.object.inReplyTo);
} catch (error) {
reply = null;
}
}
}

if (toot.directMessage) return;

const hashtags = extractApHashtagObjects(toot.object.tag).map((x) => x.name).filter((x): x is string => x != null);

try {
Expand All @@ -396,17 +417,20 @@ export class ImportNotesProcessorService {
}
}

await this.noteCreateService.import(user, { createdAt: date, text: text, files: files, apMentions: new Array(0), cw: toot.object.sensitive ? toot.object.summary : null, reply: reply });
const createdNote = await this.noteCreateService.import(user, { createdAt: date, text: text, files: files, apMentions: new Array(0), cw: toot.object.sensitive ? toot.object.summary : null, reply: reply });
if (toot.childNotes) this.queueService.createImportMastoToDbJob(user, toot.childNotes, createdNote.id);
}

@bindThis
public async processPleroToDb(job: Bull.Job<DbNoteImportToDbJobData>): Promise<void> {
public async processPleroToDb(job: Bull.Job<DbNoteWithParentImportToDbJobData>): Promise<void> {
const post = job.data.target;
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
if (user == null) {
return;
}

if (post.directMessage) return;

const date = new Date(post.object.published);
let text = undefined;
const files: MiDriveFile[] = [];
Expand All @@ -416,15 +440,18 @@ export class ImportNotesProcessorService {
if (folder == null) return;

if (post.object.inReplyTo != null) {
try {
reply = await this.apNoteService.resolveNote(post.object.inReplyTo);
} catch (error) {
reply = null;
const parentNote = job.data.note ? await this.notesRepository.findOneBy({ id: job.data.note }) : null;
if (parentNote) {
reply = parentNote;
} else {
try {
reply = await this.apNoteService.resolveNote(post.object.inReplyTo);
} catch (error) {
reply = null;
}
}
}

if (post.directMessage) return;

const hashtags = extractApHashtagObjects(post.object.tag).map((x) => x.name).filter((x): x is string => x != null);

try {
Expand Down Expand Up @@ -468,7 +495,8 @@ export class ImportNotesProcessorService {
}
}

await this.noteCreateService.import(user, { createdAt: date, text: text, files: files, apMentions: new Array(0), cw: post.object.sensitive ? post.object.summary : null, reply: reply });
const createdNote = await this.noteCreateService.import(user, { createdAt: date, text: text, files: files, apMentions: new Array(0), cw: post.object.sensitive ? post.object.summary : null, reply: reply });
if (post.childNotes) this.queueService.createImportPleroToDbJob(user, post.childNotes, createdNote.id);
}

@bindThis
Expand Down Expand Up @@ -517,7 +545,7 @@ export class ImportNotesProcessorService {
}

@bindThis
public async processTwitterDb(job: Bull.Job<DbKeyNoteImportToDbJobData>): Promise<void> {
public async processTwitterDb(job: Bull.Job<DbNoteWithParentImportToDbJobData>): Promise<void> {
const tweet = job.data.target;
const user = await this.usersRepository.findOneBy({ id: job.data.user.id });
if (user == null) {
Expand Down
10 changes: 5 additions & 5 deletions packages/backend/src/queue/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ export type DbJobMap = {
exportUserLists: DbJobDataWithUser;
importAntennas: DBAntennaImportJobData;
importNotes: DbNoteImportJobData;
importTweetsToDb: DbKeyNoteImportToDbJobData;
importTweetsToDb: DbNoteWithParentImportToDbJobData;
importIGToDb: DbNoteImportToDbJobData;
importFBToDb: DbNoteImportToDbJobData;
importMastoToDb: DbNoteImportToDbJobData;
importPleroToDb: DbNoteImportToDbJobData;
importKeyNotesToDb: DbKeyNoteImportToDbJobData;
importMastoToDb: DbNoteWithParentImportToDbJobData;
importPleroToDb: DbNoteWithParentImportToDbJobData;
importKeyNotesToDb: DbNoteWithParentImportToDbJobData;
importFollowing: DbUserImportJobData;
importFollowingToDb: DbUserImportToDbJobData;
importMuting: DbUserImportJobData;
Expand Down Expand Up @@ -113,7 +113,7 @@ export type DbNoteImportToDbJobData = {
target: any;
};

export type DbKeyNoteImportToDbJobData = {
export type DbNoteWithParentImportToDbJobData = {
user: ThinUser;
target: any;
note: MiNote['id'] | null;
Expand Down
Loading