Multithreading
Encodexx allows you to conveniently work with SharedArrayBuffer
and Atomics
to create multithreaded applications. Below is a basic example of creating a task queue.
Describe the schema for the task:
import { Serializer, t } from "encodexx";
export const TaskSchema = new Serializer([ { id: t.str, name: t.str, createdAt: t.optional(t.date), payload: t.optional(t.str), },]);
Next, we use SharedArrayBuffer
and Atomics
to create a task queue:
import { Worker } from "worker_threads";import { TaskSchema } from "./schema.js";
const TASKS = [ { id: "1", name: "Task 1" }, { id: "2", name: "Task 2" }, { id: "3", name: "Task 3" }, { id: "4", name: "Task 4" }, { id: "5", name: "Task 5" },];
const encoded = TaskSchema.encode(TASKS).uint8Array;
// Create a sharedBuffer and a view with space for data plus the lockconst sharedBuffer = new SharedArrayBuffer(encoded.byteLength + 4);
// Allocate 4 bytes at the start of the buffer for the lockconst lockView = new Int32Array(sharedBuffer, 0, 1);const tasksView = new Uint8Array(sharedBuffer, 4);
// Initially set lock = 0 (unlocked)Atomics.store(lockView, 0, 0);tasksView.set(encoded);
// Launch the threadsconst numWorkers = 2;for (let i = 0; i < numWorkers; i++) { new Worker("./worker.js", { workerData: sharedBuffer });}
import { workerData } from "worker_threads";import { TaskSchema } from "./schema.js";
const lockView = new Int32Array(workerData, 0, 1);const tasksView = new Uint8Array(workerData, 4);
// Locking functionsfunction lock() { while (Atomics.compareExchange(lockView, 0, 0, 1) !== 0) { Atomics.wait(lockView, 0, 1); }}
function release() { Atomics.store(lockView, 0, 0); Atomics.notify(lockView, 0, 1);}
async function processTasks() { lock();
const tasks = TaskSchema.decode(tasksView); const pendingTask = tasks.shift(); tasksView.set(TaskSchema.encode(tasks).uint8Array);
release();
if (pendingTask) { console.log(`Worker ${process.pid} took the task ${pendingTask.name} to process`);
setTimeout(() => { console.log(`Worker ${process.pid} finished the task ${pendingTask.name}`); processTasks(); }, (Math.random() + 0.5) * 3000); } else { console.log(`Worker ${process.pid} - no more tasks`); }}
processTasks();
Run the code:
node main.js
Worker 1 took the task Task 1 to processWorker 2 took the task Task 2 to processWorker 2 finished the task Task 2Worker 2 took the task Task 3 to processWorker 1 finished the task Task 1Worker 1 took the task Task 4 to processWorker 2 finished the task Task 3Worker 2 took the task Task 5 to processWorker 1 finished the task Task 4Worker 1 - no more tasksWorker 2 finished the task Task 5Worker 2 - no more tasks