// Spawns N worker threads (one per searchTYPEi.json chunk) and orchestrates // queries across them. Workers are kept alive between requests so the chunks // stay loaded in memory (replaces the per-request `php searchmultithreads.php` // fork from the PHP version). // // A filesystem watcher detects when the cron rewrites the chunks and recycles // the worker pool transparently — no server restart needed. import { existsSync, watch } from 'node:fs'; import { dirname, join } from 'node:path'; import { fileURLToPath } from 'node:url'; import { Worker } from 'node:worker_threads'; import { NB_WORKERS, TMDBINTEGRAL_DIR } from '../config.js'; const __dirname = dirname(fileURLToPath(import.meta.url)); const WORKER_PATH = join(__dirname, 'searchWorker.js'); const pools = new Map(); let watcher = null; let reloadTimer = null; const RELOAD_DEBOUNCE_MS = 5000; class WorkerPool { constructor(type) { this.type = type; this.workers = []; this.nextId = 1; this.pending = new Map(); for (let i = 0; i < NB_WORKERS; i++) { const chunkPath = join(TMDBINTEGRAL_DIR, `search${type}${i}.json`); if (!existsSync(chunkPath)) { console.warn(`Missing search chunk: ${chunkPath}`); continue; } const w = new Worker(WORKER_PATH, { workerData: { chunkPath } }); w.on('message', (msg) => this._onMessage(msg)); w.on('error', (err) => console.error(`Worker ${type}/${i} error:`, err)); w.unref(); this.workers.push(w); } } _onMessage(msg) { const entry = this.pending.get(msg.id); if (!entry) return; if (msg.type === 'result') entry.results.push(...msg.results); entry.remaining--; if (entry.remaining === 0) { this.pending.delete(msg.id); entry.resolve(entry.results); } } search(payload) { return new Promise((resolve) => { const id = this.nextId++; this.pending.set(id, { results: [], remaining: this.workers.length, resolve }); for (const w of this.workers) { w.postMessage({ type: 'search', id, payload }); } }); } async terminate() { await Promise.allSettled(this.workers.map((w) => w.terminate())); this.workers = []; } } async function reloadAllPools() { const types = [...pools.keys()]; console.log(`Reloading search pools: ${types.join(', ')}`); for (const type of types) { const old = pools.get(type); pools.set(type, new WorkerPool(type)); old.terminate().catch(() => { /* ignore */ }); } } function ensureWatcher() { if (watcher) return; try { watcher = watch(TMDBINTEGRAL_DIR, (_event, filename) => { if (!filename) return; if (!/^search(movie|tv)\d+\.json$/.test(filename)) return; clearTimeout(reloadTimer); reloadTimer = setTimeout(reloadAllPools, RELOAD_DEBOUNCE_MS); }); watcher.unref(); } catch (err) { console.warn(`Cannot watch ${TMDBINTEGRAL_DIR} for chunk reload:`, err.message); } } export function getPool(type) { if (!pools.has(type)) { pools.set(type, new WorkerPool(type)); ensureWatcher(); } return pools.get(type); } export async function search(type, filteredTitleIn, yearIn) { const pool = getPool(type); const results = await pool.search({ filteredTitleIn, yearIn }); // Sort by delta ASC, then -popularity ASC (i.e. popularity DESC), // then deltaYear ASC, then tmdb ASC. Equivalent to PHP's // array_multisort($deltas, $pops, $deltayears, $tmdbs, ...). results.sort((a, b) => { if (a.delta !== b.delta) return a.delta - b.delta; if (a.pop !== b.pop) return a.pop - b.pop; if (a.deltaYear !== b.deltaYear) return a.deltaYear - b.deltaYear; return a.tmdb - b.tmdb; }); return results; }