JSPM

durable-cf-streams

0.2.0
  • ESM via JSPM
  • ES Module Entrypoint
  • Export Map
  • Keywords
  • License
  • Repository URL
  • TypeScript Types
  • README
  • Created
  • Published
  • Downloads 179
  • Score
    100M100P100Q45928F
  • License MIT

pure building blocks for durable streams on cloudflare

Package Exports

  • durable-cf-streams
  • durable-cf-streams/storage
  • durable-cf-streams/storage/d1
  • durable-cf-streams/storage/kv
  • durable-cf-streams/storage/memory
  • durable-cf-streams/storage/r2
  • durable-cf-streams/storage/sqlite

Readme

durable-cf-streams

building blocks for durable streams on cloudflare. storage backends and utilities. the idea is that you can borrow utilities and wire up http (or whatever) however you want.

install

pnpm add durable-cf-streams

storage backends

import { MemoryStore } from "durable-cf-streams/storage/memory";
import { SqliteStore } from "durable-cf-streams/storage/sqlite";
import { D1Store } from "durable-cf-streams/storage/d1";
import { KVStore } from "durable-cf-streams/storage/kv";
import { R2Store } from "durable-cf-streams/storage/r2";

// in-memory (for durable objects without persistence)
const store = new MemoryStore();

// sqlite (for durable objects with persistence via SqlStorage)
const store = new SqliteStore(state.storage.sql);
store.initialize(); // creates table

// d1 database
const store = new D1Store(env.DB);
await store.initialize(); // creates table

// workers kv
const store = new KVStore(env.KV);

// r2 bucket
const store = new R2Store(env.BUCKET);

streamstore interface

interface StreamStore {
  put(path: string, options: PutOptions): Promise<PutResult>;
  append(path: string, data: Uint8Array, options?: AppendOptions): Promise<AppendResult>;
  get(path: string, options?: GetOptions): Promise<GetResult>;
  head(path: string): Promise<HeadResult | null>;
  delete(path: string): Promise<void>;
  has(path: string): boolean;
  waitForData(path: string, offset: string, timeoutMs: number): Promise<WaitResult>;
  formatResponse(path: string, messages: StreamMessage[]): Uint8Array;
}

protocol constants

compatible with the durable streams protocol:

import {
  // header constants
  STREAM_OFFSET_HEADER,     // "Stream-Next-Offset"
  STREAM_CURSOR_HEADER,     // "Stream-Cursor"
  STREAM_UP_TO_DATE_HEADER, // "Stream-Up-To-Date"
  STREAM_SEQ_HEADER,        // "Stream-Seq"
  STREAM_TTL_HEADER,        // "Stream-TTL"
  STREAM_EXPIRES_AT_HEADER, // "Stream-Expires-At"
  STREAM_SSE_DATA_ENCODING_HEADER, // "Stream-SSE-Data-Encoding"
  STREAM_CLOSED_HEADER,     // "Stream-Closed"
  STREAM_FORKED_FROM_HEADER, // "Stream-Forked-From"
  STREAM_FORK_OFFSET_HEADER, // "Stream-Fork-Offset"
  STREAM_FORK_SUB_OFFSET_HEADER, // "Stream-Fork-Sub-Offset"
  RESERVED_CONTROL_PATH_SEGMENT, // "__ds"
  PRODUCER_ID_HEADER,       // "Producer-Id"
  PRODUCER_EPOCH_HEADER,    // "Producer-Epoch"
  PRODUCER_SEQ_HEADER,      // "Producer-Seq"
  PRODUCER_EXPECTED_SEQ_HEADER, // "Producer-Expected-Seq"
  PRODUCER_RECEIVED_SEQ_HEADER, // "Producer-Received-Seq"
  CACHE_CONTROL_HEADER,     // "Cache-Control"
  CONTENT_TYPE_OPTIONS_HEADER,        // "X-Content-Type-Options"
  CROSS_ORIGIN_RESOURCE_POLICY_HEADER, // "Cross-Origin-Resource-Policy"

  // response header values
  PROTOCOL_SECURITY_HEADERS,
  HEAD_CACHE_CONTROL_VALUE, // "no-store"
  SSE_CACHE_CONTROL_VALUE,  // "no-cache"
  DEFAULT_CONTENT_TYPE,     // "application/octet-stream"

  // query param constants
  OFFSET_QUERY_PARAM,       // "offset"
  TAIL_OFFSET_QUERY_VALUE,  // "now"
  LIVE_QUERY_PARAM,         // "live"
  CURSOR_QUERY_PARAM,       // "cursor"

  // sse
  SSE_OFFSET_FIELD, // "streamNextOffset"
  SSE_CURSOR_FIELD, // "streamCursor"
  SSE_CLOSED_FIELD, // "streamClosed"
  SSE_COMPATIBLE_CONTENT_TYPES,

  // path encoding
  encodeStreamPath,
  decodeStreamPath,

  // cursor utilities
  calculateCursor,
  generateResponseCursor,
  DEFAULT_CURSOR_EPOCH,
  DEFAULT_CURSOR_INTERVAL_SECONDS,
} from "durable-cf-streams";

branded protocol types

import {
  CursorSchema,
  ETagSchema,
  OffsetSchema,
  ProducerStateMapSchema,
  ProducerStateSchema,
  type Cursor,
  type ETag,
  type Offset,
  type ProducerState,
  type ProducerStateMap,
} from "durable-cf-streams";

utilities

import {
  // offsets
  parseOffset,
  formatOffset,
  compareOffsets,
  isValidOffset,
  initialOffset,
  isSentinelOffset,
  normalizeOffset,
  advanceOffset,
  incrementSeq,
  
  // protocol
  normalizeContentType,
  isJsonContentType,
  isSSETextCompatibleContentType,
  validateTTL,
  validateForkSubOffset,
  validateExpiresAt,
  generateETag,
  parseETag,
  processJsonAppend,
  formatJsonResponse,
  validateJsonCreate,
  encodeSSEData,
  encodeBase64Data,

  // producer idempotency
  parseProducerHeaders,
  evaluateProducerAppend,
  commitProducerAppend,
} from "durable-cf-streams";

errors

tagged errors for pattern matching:

import {
  ContentTypeMismatchError,
  InvalidJsonError,
  InvalidOffsetError,
  InvalidProducerError,
  PayloadTooLargeError,
  ProducerFencedError,
  ProducerSequenceConflictError,
  SequenceConflictError,
  StreamClosedError,
  StreamConflictError,
  StreamGoneError,
  StreamNotFoundError,
  isStreamError,
  streamErrorHeaders,
  streamErrorStatus,
} from "durable-cf-streams";

// check error type
if (error instanceof StreamNotFoundError) {
  return new Response("not found", { status: 404 });
}

// or map any known stream error to its protocol status
if (isStreamError(error)) {
  return new Response(error.message, {
    headers: streamErrorHeaders(error),
    status: streamErrorStatus(error),
  });
}

example

import { SqliteStore } from "durable-cf-streams/storage/sqlite";
import {
  normalizeContentType,
  STREAM_OFFSET_HEADER,
} from "durable-cf-streams";

export class StreamDO extends DurableObject {
  private store: SqliteStore;

  constructor(state: DurableObjectState, env: Env) {
    super(state, env);
    this.store = new SqliteStore(state.storage.sql);
    this.store.initialize();
  }

  async fetch(request: Request): Promise<Response> {
    const path = new URL(request.url).pathname;

    if (request.method === "PUT") {
      const contentType = request.headers.get("content-type");
      const body = new Uint8Array(await request.arrayBuffer());
      
      const result = await this.store.put(path, {
        contentType: contentType ? normalizeContentType(contentType) : undefined,
        data: body.length > 0 ? body : undefined,
      });

      return new Response(null, {
        status: result.created ? 201 : 200,
        headers: {
          [STREAM_OFFSET_HEADER]: result.nextOffset,
          "Content-Type": result.contentType,
        },
      });
    }

    // ...
  }
}

see examples for complete implementations.

license

mit