Package Exports
- durable-cf-streams
- durable-cf-streams/storage
- durable-cf-streams/storage/d1
- durable-cf-streams/storage/kv
- durable-cf-streams/storage/memory
- durable-cf-streams/storage/r2
- durable-cf-streams/storage/sqlite
Readme
durable-cf-streams
building blocks for durable streams on cloudflare. storage backends and utilities. the idea is that you can borrow utilities and wire up http (or whatever) however you want.
install
pnpm add durable-cf-streams
storage backends
import { MemoryStore } from "durable-cf-streams/storage/memory";
import { SqliteStore } from "durable-cf-streams/storage/sqlite";
import { D1Store } from "durable-cf-streams/storage/d1";
import { KVStore } from "durable-cf-streams/storage/kv";
import { R2Store } from "durable-cf-streams/storage/r2";
// in-memory (for durable objects without persistence)
const store = new MemoryStore();
// sqlite (for durable objects with persistence via SqlStorage)
const store = new SqliteStore(state.storage.sql);
store.initialize(); // creates table
// d1 database
const store = new D1Store(env.DB);
await store.initialize(); // creates table
// workers kv
const store = new KVStore(env.KV);
// r2 bucket
const store = new R2Store(env.BUCKET);
streamstore interface
interface StreamStore {
put(path: string, options: PutOptions): Promise<PutResult>;
append(path: string, data: Uint8Array, options?: AppendOptions): Promise<AppendResult>;
get(path: string, options?: GetOptions): Promise<GetResult>;
head(path: string): Promise<HeadResult | null>;
delete(path: string): Promise<void>;
has(path: string): boolean;
waitForData(path: string, offset: string, timeoutMs: number): Promise<WaitResult>;
formatResponse(path: string, messages: StreamMessage[]): Uint8Array;
}
protocol constants
compatible with the durable streams protocol:
import {
// header constants
STREAM_OFFSET_HEADER, // "Stream-Next-Offset"
STREAM_CURSOR_HEADER, // "Stream-Cursor"
STREAM_UP_TO_DATE_HEADER, // "Stream-Up-To-Date"
STREAM_SEQ_HEADER, // "Stream-Seq"
STREAM_TTL_HEADER, // "Stream-TTL"
STREAM_EXPIRES_AT_HEADER, // "Stream-Expires-At"
STREAM_SSE_DATA_ENCODING_HEADER, // "Stream-SSE-Data-Encoding"
STREAM_CLOSED_HEADER, // "Stream-Closed"
STREAM_FORKED_FROM_HEADER, // "Stream-Forked-From"
STREAM_FORK_OFFSET_HEADER, // "Stream-Fork-Offset"
STREAM_FORK_SUB_OFFSET_HEADER, // "Stream-Fork-Sub-Offset"
RESERVED_CONTROL_PATH_SEGMENT, // "__ds"
PRODUCER_ID_HEADER, // "Producer-Id"
PRODUCER_EPOCH_HEADER, // "Producer-Epoch"
PRODUCER_SEQ_HEADER, // "Producer-Seq"
PRODUCER_EXPECTED_SEQ_HEADER, // "Producer-Expected-Seq"
PRODUCER_RECEIVED_SEQ_HEADER, // "Producer-Received-Seq"
CACHE_CONTROL_HEADER, // "Cache-Control"
CONTENT_TYPE_OPTIONS_HEADER, // "X-Content-Type-Options"
CROSS_ORIGIN_RESOURCE_POLICY_HEADER, // "Cross-Origin-Resource-Policy"
// response header values
PROTOCOL_SECURITY_HEADERS,
HEAD_CACHE_CONTROL_VALUE, // "no-store"
SSE_CACHE_CONTROL_VALUE, // "no-cache"
DEFAULT_CONTENT_TYPE, // "application/octet-stream"
// query param constants
OFFSET_QUERY_PARAM, // "offset"
TAIL_OFFSET_QUERY_VALUE, // "now"
LIVE_QUERY_PARAM, // "live"
CURSOR_QUERY_PARAM, // "cursor"
// sse
SSE_OFFSET_FIELD, // "streamNextOffset"
SSE_CURSOR_FIELD, // "streamCursor"
SSE_CLOSED_FIELD, // "streamClosed"
SSE_COMPATIBLE_CONTENT_TYPES,
// path encoding
encodeStreamPath,
decodeStreamPath,
// cursor utilities
calculateCursor,
generateResponseCursor,
DEFAULT_CURSOR_EPOCH,
DEFAULT_CURSOR_INTERVAL_SECONDS,
} from "durable-cf-streams";
branded protocol types
import {
CursorSchema,
ETagSchema,
OffsetSchema,
ProducerStateMapSchema,
ProducerStateSchema,
type Cursor,
type ETag,
type Offset,
type ProducerState,
type ProducerStateMap,
} from "durable-cf-streams";
utilities
import {
// offsets
parseOffset,
formatOffset,
compareOffsets,
isValidOffset,
initialOffset,
isSentinelOffset,
normalizeOffset,
advanceOffset,
incrementSeq,
// protocol
normalizeContentType,
isJsonContentType,
isSSETextCompatibleContentType,
validateTTL,
validateForkSubOffset,
validateExpiresAt,
generateETag,
parseETag,
processJsonAppend,
formatJsonResponse,
validateJsonCreate,
encodeSSEData,
encodeBase64Data,
// producer idempotency
parseProducerHeaders,
evaluateProducerAppend,
commitProducerAppend,
} from "durable-cf-streams";
errors
tagged errors for pattern matching:
import {
ContentTypeMismatchError,
InvalidJsonError,
InvalidOffsetError,
InvalidProducerError,
PayloadTooLargeError,
ProducerFencedError,
ProducerSequenceConflictError,
SequenceConflictError,
StreamClosedError,
StreamConflictError,
StreamGoneError,
StreamNotFoundError,
isStreamError,
streamErrorHeaders,
streamErrorStatus,
} from "durable-cf-streams";
// check error type
if (error instanceof StreamNotFoundError) {
return new Response("not found", { status: 404 });
}
// or map any known stream error to its protocol status
if (isStreamError(error)) {
return new Response(error.message, {
headers: streamErrorHeaders(error),
status: streamErrorStatus(error),
});
}
example
import { SqliteStore } from "durable-cf-streams/storage/sqlite";
import {
normalizeContentType,
STREAM_OFFSET_HEADER,
} from "durable-cf-streams";
export class StreamDO extends DurableObject {
private store: SqliteStore;
constructor(state: DurableObjectState, env: Env) {
super(state, env);
this.store = new SqliteStore(state.storage.sql);
this.store.initialize();
}
async fetch(request: Request): Promise<Response> {
const path = new URL(request.url).pathname;
if (request.method === "PUT") {
const contentType = request.headers.get("content-type");
const body = new Uint8Array(await request.arrayBuffer());
const result = await this.store.put(path, {
contentType: contentType ? normalizeContentType(contentType) : undefined,
data: body.length > 0 ? body : undefined,
});
return new Response(null, {
status: result.created ? 201 : 200,
headers: {
[STREAM_OFFSET_HEADER]: result.nextOffset,
"Content-Type": result.contentType,
},
});
}
// ...
}
}
see examples for complete implementations.
license
mit