Package Exports
- port_agent
- port_agent/dist/index.js
This package does not declare an exports field, so the exports above have been automatically detected and optimized by JSPM instead. If any package subpath is missing, it is recommended to post an issue to the original package (port_agent) to support the "exports" field. If that is not possible, create a JSPM override to customize the exports field for this package.
Readme
Port Agent
A RPC-like facility for making inter-thread function calls.
Features
- Port Agent will marshall the return value or
Errorfrom the other thread back to the caller. - The other thread may be the main thread or a Worker thread.
- Registered functions (i.e.,
Agent.register) persist until deregistered (i.e.,Agent.deregister) . - Late binding registrants will be called with previously awaited invocations.
Table of Contents
Concepts
Agent
An instance of an Agent facilitates communication across threads. The Agent can be used in order to register a function in one thread and call it from another thread. Calls may be made from the main thread to a worker thread, and conversely from a worker thread to the main thread.
Late binding registrants will be called with previously awaited invocations; thus addressing the race condition. This means that you may await a call to a function that has not yet been registered. Once the function is registered in the other thread it will be called and its return value or Error will be marshalled back to the caller.
Please see the Examples for variations on the Agent's usage.
API
The Agent Class
port_agent.Agent(port)
- port
<threads.MessagePort>or<threads.Worker>The message port.
agent.call<T>(name, ...args)
name
<string>The name of the registered function....args
<Array<unknown>>Arguments to be passed to the registered function.Returns:
<Promise<T>>Errors:
- If the registered function in the other thread throws an
Error, theErrorwill be marshalled back from the other thread to this thread and thePromisewill reject with theErroras its failure reason. - If a worker thread throws an unhandled exception while a call is awaited, the
Errorwill be marshalled back from the other thread to this thread and thePromisewill reject with the unhandled exception as its failure reason. - If a worker exits while a call is awaited, the
Errorwill be marshalled back from the other thread to this thread and thePromisewill reject with the exit code as its failure reason.
- If the registered function in the other thread throws an
agent.register(name, fn)
name
<string>The name of the registered function.fn
<(...args: Array<any>) => any>The registered function.Returns:
<void>
agent.deregister(name)
name
<string>The name of the registered function.Returns:
<void>
Usage
How to create an Agent instance.
You can create a new Agent by passing a parentPort or a Worker instance to the Agent constructor:
In the Main thread,
const worker = new Worker(fileURLToPath(import.meta.url));
const agent = new Agent(worker);or, in a Worker thread,
const agent = new Agent(worker_threads.parentPort);How to use an Agent instance.
You can register a function in the main thread or in a worker thread using the Agent.register method:
agent.register('hello_world', (value: string): string => `Hello, ${value} world!`);You can call a function registered in another thread (i.e., the main thread or a worker thread) using the Agent.call method:
const greeting = await agent.call<string>('hello_world', 'happy');Examples
An Example
In this example you will:
- Instantiate a worker thread.
- Instantiate an Agent in the Main thread.
- Use the Agent to call the
hello_worldfunction and await resolution.- At this point the
hello_worldfunction has not yet been registered in the Worker thread. The function will be called once it is registered.
- At this point the
- Wait for the worker to come online.
- Instantiate an Agent in the worker thread.
- Use the Agent to register the
hello_worldfunction in the worker. - Use the Agent to register the
a_reasonable_assertionfunction in the worker. - Use the Agent to call the function registered as
hello_worldand await resolution. - Resolve (3) and log the return value.
- Resolve (8) and log the return value.
- Use the Agent to call the function registered as
a_reasonable_assertionand await resolution. - Resolve (11) and catch the Error and log the stack trace in the Main thread.
- The Error was marshalled from the Error produced by the reasonable assertion that was made in the
nowThrowAnErrorfunction in the Worker thread.
- The Error was marshalled from the Error produced by the reasonable assertion that was made in the
- Terminate the worker thread asynchronously.
- Await abends.
- The worker thread exited; hence, log the exit code.
- If an unhandled exception had occurred in the worker thread it would have been handled accordingly.
Please see the comments in the code that specify each of the steps above.
./tests/test/index.ts
import { Worker, isMainThread, parentPort } from 'node:worker_threads';
import { fileURLToPath } from 'node:url';
import { strict as assert } from 'node:assert';
import { Agent } from 'port_agent';
if (isMainThread) { // This is the Main Thread.
void (async () => {
const worker = new Worker(fileURLToPath(import.meta.url)); // (1)
const agent = new Agent(worker); // (2)
worker.on('online', /*(4)*/ async () => {
try {
const greeting = await agent.call<string>('hello_world', 'again, another'); // (8)
console.log(greeting); // (10)
await agent.call('a_reasonable_assertion', 'To err is Human.'); // (11)
}
catch (err) {
console.error(`Now, back in the Main Thread, we will handle the`, err); // (12)
}
finally {
void worker.terminate(); // (13)
try {
await agent.call<string>('hello_world', 'no more...'); // (14)
}
catch (err) {
if (err instanceof Error) {
console.error(err);
}
else if (typeof err == 'number') {
console.log(`Exit code: ${err.toString()}`); // (15)
}
}
}
});
// The `hello_world` function call will be invoked in the Worker thread once the function is registered.
const greeting = await agent.call<string>('hello_world', 'another'); // (3)
console.log(greeting); // (9)
})();
} else { // This is a Worker Thread.
function nowThrowAnError(message: string) {
// This *would* seem reasonable, no?...
assert.notEqual(typeof new Object(), typeof null, message);
}
function callAFunction(message: string) {
nowThrowAnError(message);
}
if (parentPort) {
const agent = new Agent(parentPort); // (5)
agent.register('hello_world', (value: string): string => `Hello, ${value} world!`); // (6)
// This will throw in the Main thread
agent.register('a_reasonable_assertion', callAFunction); // (7).
}
} This example should log to the console something that looks similar to this:
Hello, another world!
Hello, again, another world!
Now, back in the Main Thread, we will handle the AssertionError [ERR_ASSERTION]: To err is Human.
at nowThrowAnError (file:///port_agent/tests/test/dist/index.js:31:16)
at callAFunction (file:///port_agent/tests/test/dist/index.js:34:9)
at Agent.tryPost (/port_agent/dist/index.js:92:33)
at MessagePort.<anonymous> (/port_agent/dist/index.js:62:36)
at [nodejs.internal.kHybridDispatch] (node:internal/event_target:762:20)
at exports.emitMessage (node:internal/per_context/messageport:23:28) {
generatedMessage: false,
code: 'ERR_ASSERTION',
actual: 'object',
expected: 'object',
operator: 'notStrictEqual'
}
Exit code: 1Run the test.
You can run the test using:
npm run test