  • License 0BSD

a small web framework for handling XRPC operations

Package Exports

  • @atcute/xrpc-server
  • @atcute/xrpc-server/auth
  • @atcute/xrpc-server/middlewares/cors

Readme

@atcute/xrpc-server

web framework for XRPC servers.

npm install @atcute/xrpc-server

prerequisites

this framework relies on schemas generated by @atcute/lex-cli. follow its quick start guide to set it up before continuing.

for these examples, we'll use a simple query operation that greets a name:

// file: lexicons/com/example/greet.json
{
    "lexicon": 1,
    "id": "com.example.greet",
    "defs": {
        "main": {
            "type": "query",
            "parameters": {
                "type": "params",
                "required": ["name"],
                "properties": {
                    "name": { "type": "string" }
                }
            },
            "output": {
                "encoding": "application/json",
                "schema": {
                    "type": "object",
                    "required": ["message"],
                    "properties": {
                        "message": { "type": "string" }
                    }
                }
            }
        }
    }
}
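to make the lexicon above concrete, here is a rough hand-written sketch of the shapes it describes. the names `GreetParams`, `GreetOutput`, `parseGreetParams` and `greet` are illustrative assumptions; the code generated by @atcute/lex-cli will look different and does its own validation.

```typescript
// hand-written stand-ins for what the lexicon describes (not lex-cli output)
interface GreetParams {
    name: string; // listed under "required" in the lexicon
}

interface GreetOutput {
    message: string;
}

/** checks raw query values against the lexicon's `parameters` definition */
function parseGreetParams(raw: Record<string, string | undefined>): GreetParams {
    const name = raw.name;
    if (typeof name !== 'string') {
        throw new TypeError('missing required parameter: name');
    }
    return { name };
}

/** produces a value matching the lexicon's `output.schema` */
function greet(params: GreetParams): GreetOutput {
    return { message: `hello ${params.name}!` };
}
```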

usage

handling requests

use addQuery() for queries (GET) and addProcedure() for procedures (POST). handlers receive typed params and input, and return responses using the json() helper:

import { XRPCRouter, json } from '@atcute/xrpc-server';
import { cors } from '@atcute/xrpc-server/middlewares/cors';

import { ComExampleGreet, ComExampleCreatePost } from './lexicons/index.js';

const router = new XRPCRouter({ middlewares: [cors()] });

router.addQuery(ComExampleGreet, {
    async handler({ params }) {
        return json({ message: `hello ${params.name}!` });
    },
});

router.addProcedure(ComExampleCreatePost, {
    async handler({ input }) {
        const post = await db.createPost(input);
        return json(post);
    },
});

export default router;

serving the router

on Deno, Bun or Cloudflare Workers, you can export the router directly and expect it to work out of the box.

for Node.js, you'll need a fetch-to-Node adapter like @remix-run/node-fetch-server since the router works with standard Web Request/Response:

import * as http from 'node:http';
import { createRequestListener } from '@remix-run/node-fetch-server';
import { XRPCRouter } from '@atcute/xrpc-server';

const router = new XRPCRouter();

// ... add handlers ...

const server = http.createServer(createRequestListener(router.fetch));
server.listen(3000, () => {
    console.log('listening on port 3000');
});

standalone handlers

if you only need a single XRPC operation, you can skip creating a router and export a handler directly:

import { createXrpcHandler, json } from '@atcute/xrpc-server';
import { AppBskyFeedGetFeedSkeleton } from '@atcute/bluesky';

export default createXrpcHandler({
    lxm: AppBskyFeedGetFeedSkeleton,
    async handler({ params }) {
        return json({ feed: [] });
    },
});

requests should be routed to /xrpc/<nsid>.
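if you are wiring a standalone handler into an existing server, you may want to pull the NSID out of the path yourself. the sketch below assumes a hypothetical `nsidFromPath` helper and uses a loose pattern rather than the full NSID grammar; it is not part of the library.

```typescript
const XRPC_PREFIX = '/xrpc/';

/**
 * returns the NSID portion of an `/xrpc/<nsid>` path, or null if the path
 * does not look like an XRPC route. the pattern is a loose approximation:
 * three or more dot-separated segments of letters, digits and hyphens.
 */
function nsidFromPath(pathname: string): string | null {
    if (!pathname.startsWith(XRPC_PREFIX)) {
        return null;
    }

    const nsid = pathname.slice(XRPC_PREFIX.length);
    const looksLikeNsid = /^[a-zA-Z0-9-]+(\.[a-zA-Z0-9-]+){2,}$/.test(nsid);

    return looksLikeNsid ? nsid : null;
}
```

a dispatcher could then compare the result against the NSID of the operation it serves and fall through to other routes on null.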

error handling

throw XRPCError in handlers to return error responses:

import { XRPCError, json } from '@atcute/xrpc-server';

router.addQuery(ComExampleGetPost, {
    async handler({ params, request }) {
        const session = await getSession(request);
        if (!session) {
            throw new XRPCError({ status: 401, error: 'AuthenticationRequired' });
        }

        const post = await db.getPost(params.uri);
        if (!post) {
            throw new XRPCError({ status: 400, error: 'InvalidRequest', message: `post not found` });
        }

        return json(post);
    },
});

convenience subclasses are also available: InvalidRequestError, AuthRequiredError, ForbiddenError, RateLimitExceededError, InternalServerError, UpstreamFailureError, NotEnoughResourcesError, UpstreamTimeoutError.

AuthRequiredError accepts a wwwAuthenticate option that auto-formats an RFC 7235 WWW-Authenticate header on the response (and appends access-control-expose-headers so browsers can read it from CORS responses):

import { AuthRequiredError } from '@atcute/xrpc-server';

throw new AuthRequiredError({
    message: 'invalid token',
    wwwAuthenticate: { scheme: 'Bearer', params: { error: 'BadJwtSignature' } },
});
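for a sense of what the formatted header looks like, here is a minimal sketch of RFC 7235 challenge formatting. `formatChallenge` and the `Challenge` type are hypothetical names, and this is not the library's implementation; it only shows the scheme followed by comma-separated, quoted auth-params.

```typescript
interface Challenge {
    scheme: string;
    params?: Record<string, string>;
}

/** formats a single challenge as `scheme key="value", key="value"` */
function formatChallenge({ scheme, params = {} }: Challenge): string {
    const parts = Object.entries(params).map(([key, value]) => {
        // quoted-string values escape backslashes and double quotes
        const escaped = value.replace(/(["\\])/g, '\\$1');
        return `${key}="${escaped}"`;
    });

    return parts.length > 0 ? `${scheme} ${parts.join(', ')}` : scheme;
}
```

with the options from the example above, this yields `Bearer error="BadJwtSignature"`.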

observing errors

for logs or metrics, use the onError / onSocketError router options. these are fire-and-forget hooks invoked alongside response generation, and they are NOT called for client-induced errors (aborted requests, XRPCError, XRPCSubscriptionError) — only for bugs worth reporting:

const router = new XRPCRouter({
    onError({ error, request }) {
        reportToSentry(error, { url: request.url });
    },
    onSocketError({ error, request }) {
        reportToSentry(error, { url: request.url, subscription: true });
    },
});

health check

the router can optionally answer /xrpc/_health if you pass handleHealthCheck. this endpoint is non-standard — consumers decide the response shape and status.

const router = new XRPCRouter({
    async handleHealthCheck() {
        const healthy = await pingDatabase();
        return Response.json({ status: healthy ? 'ok' : 'degraded' }, { status: healthy ? 200 : 503 });
    },
});

subscriptions

subscriptions provide real-time streaming over WebSocket. they require a runtime-specific adapter:

runtime               adapter package
Bun                   @atcute/xrpc-server-bun
Node.js               @atcute/xrpc-server-node
Deno                  @atcute/xrpc-server-deno
Cloudflare Workers    @atcute/xrpc-server-cloudflare

here's an example using Bun:

import { XRPCRouter } from '@atcute/xrpc-server';
import { createBunWebSocket } from '@atcute/xrpc-server-bun';

import { ComExampleSubscribe } from './lexicons/index.js';

const ws = createBunWebSocket();

const router = new XRPCRouter({ websocket: ws.adapter });

router.addSubscription(ComExampleSubscribe, {
    async *handler({ params, signal }) {
        // yield messages until the client disconnects
        while (!signal.aborted) {
            const events = await getNewEvents(params.cursor);

            for (const event of events) {
                yield event;
            }

            await sleep(1000);
        }
    },
});

export default ws.wrap(router);

the handler is an async generator that yields messages. each yielded value is encoded as a CBOR frame and sent to the client. the signal is aborted when the client disconnects.
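the example above calls a `sleep` helper that isn't defined here. one way to write it so the loop stops promptly on disconnect is to make the sleep abort-aware. the sketch below is self-contained and uses a stand-in `ticker` generator instead of a real subscription handler; both names are assumptions for illustration.

```typescript
/** resolves after `ms`, or as soon as `signal` aborts, whichever comes first */
function sleep(ms: number, signal?: AbortSignal): Promise<void> {
    return new Promise((resolve) => {
        if (signal?.aborted) {
            resolve();
            return;
        }

        const done = () => {
            clearTimeout(timer);
            signal?.removeEventListener('abort', done);
            resolve();
        };

        const timer = setTimeout(done, ms);
        signal?.addEventListener('abort', done, { once: true });
    });
}

/** stand-in for a subscription handler: yields a counter until aborted */
async function* ticker(signal: AbortSignal): AsyncGenerator<number> {
    let seq = 0;
    while (!signal.aborted) {
        yield seq++;
        await sleep(10, signal);
    }
}
```

passing the handler's `signal` into `sleep` means the generator re-checks `signal.aborted` right after an abort instead of waiting out the full delay.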

for subscription errors, use XRPCSubscriptionError:

import { XRPCSubscriptionError } from '@atcute/xrpc-server';

router.addSubscription(ComExampleSubscribe, {
    async *handler({ params }) {
        // isCursorInFuture is a hypothetical helper from your own code
        if (params.cursor && isCursorInFuture(params.cursor)) {
            throw new XRPCSubscriptionError({
                error: 'FutureCursor',
                message: `cursor is ahead of the latest event`,
            });
        }

        // ...
    },
});

backpressure is handled per adapter: each adapter's create*WebSocket() factory accepts highWaterMark / lowWaterMark options (default 250 KB / 50 KB) that throttle the send loop when the outgoing buffer grows. the Cloudflare Workers adapter does not apply backpressure — the runtime does not expose the outgoing WebSocket buffer.
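the two watermarks form a hysteresis band: sending pauses once the buffer reaches the high mark and only resumes after it drains to the low mark. the sketch below illustrates that idea with a hypothetical `WatermarkGate` class. it is not the adapters' actual code, and the byte values assume decimal kilobytes.

```typescript
class WatermarkGate {
    private paused = false;
    private readonly highWaterMark: number;
    private readonly lowWaterMark: number;

    constructor(highWaterMark = 250_000, lowWaterMark = 50_000) {
        this.highWaterMark = highWaterMark;
        this.lowWaterMark = lowWaterMark;
    }

    /** feeds the current outgoing buffer size; returns true while sending should pause */
    update(bufferedBytes: number): boolean {
        if (!this.paused && bufferedBytes >= this.highWaterMark) {
            this.paused = true;
        } else if (this.paused && bufferedBytes <= this.lowWaterMark) {
            this.paused = false;
        }
        return this.paused;
    }
}
```

keeping a gap between the two marks avoids rapidly toggling between paused and resumed when the buffer hovers around a single threshold.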

service authentication

the @atcute/xrpc-server/auth subpackage provides utilities for service-to-service authentication using JWTs.

verifying incoming JWTs:

import { ServiceJwtVerifier } from '@atcute/xrpc-server/auth';
import {
    CompositeDidDocumentResolver,
    PlcDidDocumentResolver,
    WebDidDocumentResolver,
} from '@atcute/identity-resolver';

const jwtVerifier = new ServiceJwtVerifier({
    // list of audience values this service accepts. during the transition to proposal 0014
    // (atproto service auth audience), configure both the bare DID and the DID-with-service-ref
    // form so that tokens from older issuers keep working alongside new ones.
    acceptAudiences: ['did:web:my-service.example.com', 'did:web:my-service.example.com#atproto_pds'],
    resolver: new CompositeDidDocumentResolver({
        methods: {
            plc: new PlcDidDocumentResolver(),
            web: new WebDidDocumentResolver(),
        },
    }),
});

router.addQuery(ComExampleProtectedEndpoint, {
    async handler({ request }) {
        const auth = await jwtVerifier.verifyRequest(request, { lxm: 'com.example.protectedEndpoint' });
        return json({ caller: auth.issuer });
    },
});

verifyRequest parses the Authorization: Bearer header, verifies the token, and throws an AuthRequiredError with a populated WWW-Authenticate: Bearer error="…" challenge on every failure path. it forwards request.signal into DID resolution so aborted requests don't keep network calls alive.
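as a rough illustration of the first step, here is how a bearer token can be pulled out of an Authorization header value. `parseBearer` is a hypothetical helper, not the library's parser; it follows the RFC 6750 token character set and treats the scheme name as case-insensitive.

```typescript
/** returns the bearer token from an Authorization header value, or null */
function parseBearer(header: string | null): string | null {
    if (header === null) {
        return null;
    }

    // b64token per RFC 6750: token characters followed by optional "=" padding
    const match = /^Bearer +([A-Za-z0-9\-._~+\/]+=*)$/i.exec(header.trim());
    return match?.[1] ?? null;
}
```

with a Web Request you would call it as `parseBearer(request.headers.get('authorization'))`.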

additional options tune verification:

const jwtVerifier = new ServiceJwtVerifier({
    acceptAudiences: [...],
    resolver: ...,
    maxAge: 300,       // max token lifetime window in seconds (default 300)
    clockLeeway: 5,    // leeway applied to nbf/exp comparisons (default 5)
    replayStore: {     // optional replay protection; requires jti in tokens
        async check({ iss, jti }, ttlSeconds) {
            const key = `${iss}:${jti}`;
            const isNew = await redis.setnx(key, '1');
            if (isNew === 1) await redis.expire(key, ttlSeconds);
            return isNew === 1;
        },
    },
});
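for local development or tests, an in-memory store can stand in for Redis. the sketch below assumes the same `check({ iss, jti }, ttlSeconds)` shape shown above, returning true when the token has not been seen before. `MemoryReplayStore` is a hypothetical name, and a single-process Map is not suitable once you run more than one instance.

```typescript
class MemoryReplayStore {
    // key -> expiry time in milliseconds
    private seen = new Map<string, number>();
    private now: () => number;

    constructor(now: () => number = Date.now) {
        this.now = now;
    }

    /** returns true the first time an (iss, jti) pair is seen within its ttl */
    async check({ iss, jti }: { iss: string; jti: string }, ttlSeconds: number): Promise<boolean> {
        const current = this.now();

        // drop entries whose ttl has elapsed
        this.seen.forEach((expiresAt, key) => {
            if (expiresAt <= current) {
                this.seen.delete(key);
            }
        });

        const key = `${iss}:${jti}`;
        if (this.seen.has(key)) {
            return false;
        }

        this.seen.set(key, current + ttlSeconds * 1000);
        return true;
    }
}
```

the injectable `now` function is there only to make expiry easy to test.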

creating outgoing JWTs:

import { createServiceJwt } from '@atcute/xrpc-server/auth';

const jwt = await createServiceJwt({
    keypair: myServiceKeypair,
    issuer: 'did:web:my-service.example.com',
    audience: 'did:plc:targetservice',
    lxm: 'com.example.someEndpoint',
});

// use jwt in Authorization header when calling other services
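attaching the token to an outgoing call might look like the sketch below. `buildServiceCall` and `ServiceCall` are hypothetical names, and the target's service URL is assumed to be known already (in practice it would come from the target's DID document).

```typescript
interface ServiceCall {
    url: string;
    headers: Record<string, string>;
}

/** builds the URL and headers for an authenticated XRPC call */
function buildServiceCall(serviceUrl: string, nsid: string, jwt: string): ServiceCall {
    // strip trailing slashes so the path is not doubled up
    const base = serviceUrl.replace(/\/+$/, '');

    return {
        url: `${base}/xrpc/${nsid}`,
        headers: { authorization: `Bearer ${jwt}` },
    };
}
```

the result can be passed to fetch as `fetch(call.url, { headers: call.headers })`. the `nsid` should match the `lxm` the token was issued for.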

internal calls

you can make typed calls to your own endpoints using @atcute/client:

import { Client, ok } from '@atcute/client';

const client = new Client({
    handler(pathname, init) {
        return router.fetch(new Request(new URL(pathname, 'http://localhost'), init));
    },
});

const data = await ok(
    client.get('com.example.greet', {
        params: { name: 'world' },
    }),
);

console.log(data.message); // fully typed!