// Spawns N worker threads (one per searchTYPEi.json chunk) and orchestrates
// queries across them. Workers are kept alive between requests so the chunks
// stay loaded in memory (replaces the per-request `php searchmultithreads.php`
// fork from the PHP version).
//
// A filesystem watcher detects when the cron rewrites the chunks and recycles
// the worker pool transparently — no server restart needed.

import { existsSync, watch } from 'node:fs';
import { dirname, join } from 'node:path';
import { fileURLToPath } from 'node:url';
import { Worker } from 'node:worker_threads';
import { NB_WORKERS, TMDBINTEGRAL_DIR } from '../config.js';
const __dirname = dirname(fileURLToPath(import.meta.url));
|
|
|
|
|
const WORKER_PATH = join(__dirname, 'searchWorker.js');
|
|
|
|
|
|
|
|
|
|
const pools = new Map();
|
Phase 1: lock cron, reload chaud, argon2, providers, IMDb lookup, cache LRU, /health, /metrics, rate limit, UI dark, biome
2026-04-24 07:35:10 +02:00
|
|
|
let watcher = null;
|
|
|
|
|
let reloadTimer = null;
|
|
|
|
|
const RELOAD_DEBOUNCE_MS = 5000;
|
2026-04-23 08:37:48 +02:00
|
|
|
|
|
|
|
|
class WorkerPool {
|
|
|
|
|
constructor(type) {
|
|
|
|
|
this.type = type;
|
|
|
|
|
this.workers = [];
|
|
|
|
|
this.nextId = 1;
|
|
|
|
|
this.pending = new Map();
|
|
|
|
|
for (let i = 0; i < NB_WORKERS; i++) {
|
|
|
|
|
const chunkPath = join(TMDBINTEGRAL_DIR, `search${type}${i}.json`);
|
|
|
|
|
if (!existsSync(chunkPath)) {
|
|
|
|
|
console.warn(`Missing search chunk: ${chunkPath}`);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
const w = new Worker(WORKER_PATH, { workerData: { chunkPath } });
|
|
|
|
|
w.on('message', (msg) => this._onMessage(msg));
|
|
|
|
|
w.on('error', (err) => console.error(`Worker ${type}/${i} error:`, err));
|
|
|
|
|
w.unref();
|
|
|
|
|
this.workers.push(w);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_onMessage(msg) {
|
|
|
|
|
const entry = this.pending.get(msg.id);
|
|
|
|
|
if (!entry) return;
|
|
|
|
|
if (msg.type === 'result') entry.results.push(...msg.results);
|
|
|
|
|
entry.remaining--;
|
|
|
|
|
if (entry.remaining === 0) {
|
|
|
|
|
this.pending.delete(msg.id);
|
|
|
|
|
entry.resolve(entry.results);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
search(payload) {
|
|
|
|
|
return new Promise((resolve) => {
|
|
|
|
|
const id = this.nextId++;
|
|
|
|
|
this.pending.set(id, { results: [], remaining: this.workers.length, resolve });
|
|
|
|
|
for (const w of this.workers) {
|
|
|
|
|
w.postMessage({ type: 'search', id, payload });
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
Phase 1: lock cron, reload chaud, argon2, providers, IMDb lookup, cache LRU, /health, /metrics, rate limit, UI dark, biome
2026-04-24 07:35:10 +02:00
|
|
|
|
|
|
|
|
async terminate() {
|
|
|
|
|
await Promise.allSettled(this.workers.map((w) => w.terminate()));
|
|
|
|
|
this.workers = [];
|
|
|
|
|
}
|
|
|
|
|
}
async function reloadAllPools() {
|
|
|
|
|
const types = [...pools.keys()];
|
|
|
|
|
console.log(`Reloading search pools: ${types.join(', ')}`);
|
|
|
|
|
for (const type of types) {
|
|
|
|
|
const old = pools.get(type);
|
|
|
|
|
pools.set(type, new WorkerPool(type));
|
|
|
|
|
old.terminate().catch(() => {
|
|
|
|
|
/* ignore */
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
function ensureWatcher() {
|
|
|
|
|
if (watcher) return;
|
|
|
|
|
try {
|
|
|
|
|
watcher = watch(TMDBINTEGRAL_DIR, (_event, filename) => {
|
|
|
|
|
if (!filename) return;
|
|
|
|
|
if (!/^search(movie|tv)\d+\.json$/.test(filename)) return;
|
|
|
|
|
clearTimeout(reloadTimer);
|
|
|
|
|
reloadTimer = setTimeout(reloadAllPools, RELOAD_DEBOUNCE_MS);
|
|
|
|
|
});
|
|
|
|
|
watcher.unref();
|
|
|
|
|
} catch (err) {
|
|
|
|
|
console.warn(`Cannot watch ${TMDBINTEGRAL_DIR} for chunk reload:`, err.message);
|
|
|
|
|
}
|
2026-04-23 08:37:48 +02:00
|
|
|
}
export function getPool(type) {
|
Phase 1: lock cron, reload chaud, argon2, providers, IMDb lookup, cache LRU, /health, /metrics, rate limit, UI dark, biome
2026-04-24 07:35:10 +02:00
|
|
|
if (!pools.has(type)) {
|
|
|
|
|
pools.set(type, new WorkerPool(type));
|
|
|
|
|
ensureWatcher();
|
|
|
|
|
}
|
2026-04-23 08:37:48 +02:00
|
|
|
return pools.get(type);
|
|
|
|
|
}
export async function search(type, filteredTitleIn, yearIn) {
|
|
|
|
|
const pool = getPool(type);
|
|
|
|
|
const results = await pool.search({ filteredTitleIn, yearIn });
|
|
|
|
|
|
|
|
|
|
// Sort by delta ASC, then -popularity ASC (i.e. popularity DESC),
|
|
|
|
|
// then deltaYear ASC, then tmdb ASC. Equivalent to PHP's
|
|
|
|
|
// array_multisort($deltas, $pops, $deltayears, $tmdbs, ...).
|
|
|
|
|
results.sort((a, b) => {
|
|
|
|
|
if (a.delta !== b.delta) return a.delta - b.delta;
|
|
|
|
|
if (a.pop !== b.pop) return a.pop - b.pop;
|
|
|
|
|
if (a.deltaYear !== b.deltaYear) return a.deltaYear - b.deltaYear;
|
|
|
|
|
return a.tmdb - b.tmdb;
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
return results;
|
|
|
|
|
}