Package Exports
- port_agent
- port_agent/dist/index.js
This package does not declare an exports field, so the exports above have been automatically detected and optimized by JSPM instead. If any package subpath is missing, it is recommended to post an issue to the original package (port_agent) to support the "exports" field. If that is not possible, create a JSPM override to customize the exports field for this package.
Readme
Port Agent
A RPC-like facility for making inter-thread function calls.
Introduction
Port Agent provides a simple and intuitive interface that makes inter-thread function calls easy.
Features
- Port Agent will marshal the return value or
Errorfrom the other thread back to the caller. - The other thread may be the main thread or a worker thread.
- Registered functions (i.e.,
Agent.register) persist until deregistered (i.e.,Agent.deregister) . - Late binding registrants will be called with previously awaited invocations.
Table of Contents
Concepts
Agent
An instance of an Agent facilitates communication across threads. The Agent can be used in order to register a function in one thread and call it from another thread. Calls may be made from the main thread to a worker thread, and conversely from a worker thread to the main thread.
Late binding registrants will be called with previously awaited invocations; thus preventing a race condition. This means that you may await a call to a function that has not yet been registered. Once the function is registered in the other thread it will be called and its return value or Error will be marshalled back to the caller.
Please see the Examples for variations on the Agent's usage.
API
The Agent Class
port_agent.Agent(port)
- port
<threads.MessagePort>or<threads.Worker>The message port.
agent.call<T>(name, ...args)
name
<string>The name of the registered function....args
<Array<unknown>>Arguments to be passed to the registered function.Returns:
<Promise<T>>Errors:
- If the registered function in the other thread throws an
Error, theErrorwill be marshalled back from the other thread to this thread and thePromisewill reject with theErroras its failure reason. - If a worker thread throws an unhandled exception while a call is awaited, the
Errorwill be marshalled back from the other thread to this thread and thePromisewill reject with the unhandled exception as its failure reason. - If a worker exits while a call is awaited, the
Errorwill be marshalled back from the other thread to this thread and thePromisewill reject with the exit code as its failure reason.
- If the registered function in the other thread throws an
agent.register(name, fn)
name
<string>The name of the registered function.fn
<(...args: Array<any>) => any>The registered function.Returns:
<void>
agent.deregister(name)
name
<string>The name of the registered function.Returns:
<void>
Usage
How to create an Agent instance.
You can create a new Agent by passing a parentPort or a Worker instance to the Agent constructor:
In the main thread,
const worker = new Worker(fileURLToPath(import.meta.url));
const agent = new Agent(worker);or, in a worker thread,
const agent = new Agent(worker_threads.parentPort);How to use an Agent instance.
You can register a function in the main thread or in a worker thread using the Agent.register method:
agent.register('hello_world', (value: string): string => `Hello, ${value} world!`);You can call a function registered in another thread (i.e., the main thread or a worker thread) using the Agent.call method:
const greeting = await agent.call<string>('hello_world', 'happy');Examples
A Simple Example
In this example you will:
- Instantiate a worker thread.
- Instantiate an
Agentin the main thread. - Use the
Agentto call thehello_worldfunction. - Instantiate an
Agentin the worker thread. - Use the
Agentin order to register a function to handle calls to thehello_worldfunction. - Resolve (3) and log the result to the console.
examples/simple/index.js
import { Worker, isMainThread, parentPort } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';
import { Agent } from 'port_agent';
if (isMainThread) { // This is the main thread.
void (async () => {
const worker = new Worker(fileURLToPath(import.meta.url)); // (1)
const agent = new Agent(worker); // (2)
try {
const greeting = await agent.call('hello_world', 'another'); // (3)
console.log(greeting); // (6)
}
catch (err) {
console.error(err);
}
finally {
worker.terminate();
}
})();
}
else { // This is a worker thread.
if (parentPort) {
const agent = new Agent(parentPort); // (4)
agent.register('hello_world', (value) => `Hello, ${value} world!`); // (5)
}
}The example should log to the console:
Hello, another world!Please see the Simple Example for a working implementation.
Test
In this test you will:
- Instantiate a worker thread.
- Instantiate an
Agentin the main thread. - Use the
Agentto call thehello_worldfunction and await resolution.- At this point the
hello_worldfunction has not yet been registered in the worker thread. The function will be called once it is registered.
- At this point the
- Wait for the worker to come online.
- Instantiate an
Agentin the worker thread. - Use the
Agentto register thehello_worldfunction in the worker. - Use the
Agentto register thea_reasonable_assertionfunction in the worker. - Use the
Agentto call avery_late_bindingfunction in the main thread that is not yet registered. - Use the
Agentto call the function registered ashello_worldand await resolution. - Resolve (3) and log the return value.
- Resolve (8) and log the return value.
- Use the
Agentto call the function registered asa_reasonable_assertionand await resolution. - Resolve (11) and catch the Error and log the stack trace in the main thread.
- The Error was marshalled from the Error produced by the reasonable assertion that was made in the
nowThrowAnErrorfunction in the worker thread.
- The Error was marshalled from the Error produced by the reasonable assertion that was made in the
- Terminate the worker thread asynchronously.
- Await abends.
- The worker thread exited; hence, log the exit code.
- If an unhandled exception had occurred in the worker thread it would have been handled accordingly.
- Use the
Agentto register avery_late_bindingfunction in the main thread and log the long disposed thread's ID.
Please see the comments in the code that specify each of the steps above. The output of the test is printed below.
./tests/test/index.ts
import { Worker, isMainThread, parentPort, threadId } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';
import { strict as assert } from 'node:assert';
import { Agent } from 'port_agent';
if (isMainThread) { // This is the main thread.
void (async () => {
const worker = new Worker(fileURLToPath(import.meta.url)); // (1)
const agent = new Agent(worker); // (2)
worker.on('online', /*(4)*/ async () => {
try {
const greeting = await agent.call<string>('hello_world', 'again, another'); // (9)
console.log(greeting); // (11)
await agent.call('a_reasonable_assertion', 'To err is Human.'); // (12)
}
catch (err) {
console.error(`Now, back in the main thread, we will handle the`, err); // (13)
}
finally {
void worker.terminate(); // (14)
setTimeout(async () => {
try {
await agent.call<string>('hello_world', 'no more...'); // (15)
}
catch (err) {
if (err instanceof Error) {
console.error(err);
}
else if (typeof err == 'number') {
console.log(`Exit code: ${err.toString()}`); // (16)
}
}
agent.register('very_late_binding', (value: number): void => console.log(`The worker's thread ID was ${value}.`)); // (17)
}, 4);
}
});
try {
// This call will be invoked once the `hello_world` function has been bound in the worker.
const greeting = await agent.call<string>('hello_world', 'another'); // (3)
console.log(greeting); // (10)
}
catch (err) {
console.error(err);
}
})();
} else { // This is a worker thread.
function nowThrowAnError(message: string) {
// This seems reasonable...
assert.notEqual(typeof new Object(), typeof null, message);
}
function callAFunction(message: string) {
nowThrowAnError(message);
}
if (parentPort) {
try {
const agent = new Agent(parentPort); // (5)
agent.register('hello_world', (value: string): string => `Hello, ${value} world!`); // (6)
// This will throw in the main thread.
agent.register('a_reasonable_assertion', callAFunction); // (7).
await agent.call<void>('very_late_binding', threadId); // (8)
}
catch(err) {
console.error(err);
}
}
} This test should log to the console something that looks similar to this:
Hello, another world!
Hello, again, another world!
Now, back in the Main Thread, we will handle the AssertionError [ERR_ASSERTION]: To err is Human.
at nowThrowAnError (file:///port_agent/tests/test/dist/index.js:31:16)
at callAFunction (file:///port_agent/tests/test/dist/index.js:34:9)
at Agent.tryPost (/port_agent/dist/index.js:92:33)
at MessagePort.<anonymous> (/port_agent/dist/index.js:62:36)
at [nodejs.internal.kHybridDispatch] (node:internal/event_target:762:20)
at exports.emitMessage (node:internal/per_context/messageport:23:28) {
generatedMessage: false,
code: 'ERR_ASSERTION',
actual: 'object',
expected: 'object',
operator: 'notStrictEqual'
}
Exit code: 1
The worker's thread ID was 1.Run the Test
Clone the repository.
git clone https://github.com/faranalytics/port_agent.gitChange directory into the root of the repository.
cd port_agentRun the test.
npm run test