Многопоточность
Encodexx позволяет вам удобно работать с SharedArrayBuffer
и Atomics
для создания многопоточных приложений. Тут представлен базовый пример создания очереди задач.
Опишите схему для задачи:
import { Serializer, t } from "encodexx";
export const TaskSchema = new Serializer([ { id: t.str, name: t.str, createdAt: t.optional(t.date), payload: t.optional(t.str), },]);
Далее используем SharedArrayBuffer
и Atomics
для создания очереди задач:
import { Worker } from "worker_threads";import { TaskSchema } from "./schema.js";
const TASKS = [ { id: "1", name: "Task 1" }, { id: "2", name: "Task 2" }, { id: "3", name: "Task 3" }, { id: "4", name: "Task 4" }, { id: "5", name: "Task 5" },];
const encoded = TaskSchema.encode(TASKS).uint8Array;
// Создаём sharedBuffer и view с запасом на размер и lockconst sharedBuffer = new SharedArrayBuffer(encoded.byteLength + 4);
// 4 байта выделяем отдельно под lock в начале буфераconst lockView = new Int32Array(sharedBuffer, 0, 1);const tasksView = new Uint8Array(sharedBuffer, 4);
// Изначально устанавливаем lock = 0 (свободно)Atomics.store(lockView, 0, 0);tasksView.set(encoded);
// Запускаем потокиconst numWorkers = 2;for (let i = 0; i < numWorkers; i++) { new Worker("./worker.js", { workerData: sharedBuffer });}
import { workerData } from "worker_threads";import { TaskSchema } from "./schema.js";
const lockView = new Int32Array(workerData, 0, 1);const tasksView = new Uint8Array(workerData, 4);
// Функции блокировкиfunction lock() { while (Atomics.compareExchange(lockView, 0, 0, 1) !== 0) { Atomics.wait(lockView, 0, 1); }}
function release() { Atomics.store(lockView, 0, 0); Atomics.notify(lockView, 0, 1);}
async function processTasks() { lock();
const tasks = TaskSchema.decode(tasksView); const pendingTask = tasks.shift(); tasksView.set(TaskSchema.encode(tasks).uint8Array);
release();
if (pendingTask) { console.log(`Worker ${process.pid} взял в работу задачу ${pendingTask.name}`);
setTimeout(() => { console.log(`Worker ${process.pid} завершил задачу ${pendingTask.name}`); processTasks(); }, (Math.random() + 0.5) * 3000); } else { console.log(`Worker ${process.pid} - задач больше нет`); }}
processTasks();
Запустим код:
node main.js
Worker 1 взял в работу задачу Task 1Worker 2 взял в работу задачу Task 2Worker 1 завершил задачу Task 1Worker 1 взял в работу задачу Task 3Worker 2 завершил задачу Task 2Worker 2 взял в работу задачу Task 4Worker 1 завершил задачу Task 3Worker 1 взял в работу задачу Task 5Worker 2 завершил задачу Task 4Worker 2 - задач больше нетWorker 1 завершил задачу Task 5Worker 1 - задач больше нет