JSPM

durable-cf-streams

0.1.0
  • ESM via JSPM
  • ES Module Entrypoint
  • Export Map
  • Keywords
  • License
  • Repository URL
  • TypeScript Types
  • README
  • Created
  • Published
  • Downloads 10
  • Score
    100M100P100Q42463F
  • License MIT

pure building blocks for durable streams on cloudflare

Package Exports

  • durable-cf-streams
  • durable-cf-streams/storage
  • durable-cf-streams/storage/d1
  • durable-cf-streams/storage/kv
  • durable-cf-streams/storage/memory
  • durable-cf-streams/storage/r2

Readme

durable-cf-streams

building blocks for durable streams on cloudflare: storage backends and utilities. the idea is that you can borrow just the pieces you need and wire up http (or any other transport) however you want.

install

pnpm add durable-cf-streams

storage backends

import { MemoryStore } from "durable-cf-streams/storage/memory";
import { D1Store } from "durable-cf-streams/storage/d1";
import { KVStore } from "durable-cf-streams/storage/kv";
import { R2Store } from "durable-cf-streams/storage/r2";

// each backend is constructed on its own — pick the one that matches your binding.
// the bindings get distinct names so the snippet can be pasted as a whole
// without redeclaring the same const.

// in-memory (for durable objects)
const memoryStore = new MemoryStore();

// d1 database
const d1Store = new D1Store(env.DB);
await d1Store.initialize(); // creates table

// workers kv
const kvStore = new KVStore(env.KV);

// r2 bucket
const r2Store = new R2Store(env.BUCKET);

streamstore interface

// shape of a stream store as listed in this readme. the notes below are read
// off the signatures only — NOTE(review): confirm semantics against the
// actual backends before relying on them.
interface StreamStore {
  // takes a path and PutOptions, resolves to a PutResult.
  put(path: string, options: PutOptions): Promise<PutResult>;
  // takes raw bytes for a path, with optional AppendOptions; resolves to an AppendResult.
  append(path: string, data: Uint8Array, options?: AppendOptions): Promise<AppendResult>;
  // resolves to a GetResult for a path; GetOptions may be omitted.
  get(path: string, options?: GetOptions): Promise<GetResult>;
  // resolves to a HeadResult or null — presumably null means no stream at that path; verify.
  head(path: string): Promise<HeadResult | null>;
  // resolves with no value.
  delete(path: string): Promise<void>;
  // synchronous: returns a boolean directly rather than a promise, unlike the methods above.
  has(path: string): boolean;
  // takes a path, an offset string and a timeout; resolves to a WaitResult.
  // presumably timeoutMs is in milliseconds, going by the name — TODO confirm.
  waitForData(path: string, offset: string, timeoutMs: number): Promise<WaitResult>;
  // synchronous: turns a list of StreamMessage values for a path into bytes.
  formatResponse(path: string, messages: StreamMessage[]): Uint8Array;
}

utilities

import {
  // offsets
  parseOffset,
  formatOffset,
  compareOffsets,
  isValidOffset,
  initialOffset,
  
  // cursors
  generateCursor,
  getNextCursor,
  
  // protocol
  normalizeContentType,
  validateTTL,
  validateExpiresAt,
  generateETag,
  isJsonContentType,
} from "durable-cf-streams";

errors

tagged errors for pattern matching:

import {
  StreamNotFoundError,
  SequenceConflictError,
  ContentTypeMismatchError,
  StreamConflictError,
  InvalidJsonError,
  PayloadTooLargeError,
} from "durable-cf-streams";

// option 1: narrow with instanceof and map the error class to a response
// (a missing stream becomes a 404 here)
if (error instanceof StreamNotFoundError) {
  return new Response("not found", { status: 404 });
}

// option 2: switch on the error's _tag string instead of its class.
// only two tags are mapped in this snippet (404 and 409); any other tag
// falls out of the switch without returning.
switch (error._tag) {
  case "StreamNotFoundError": return new Response("not found", { status: 404 });
  case "SequenceConflictError": return new Response("conflict", { status: 409 });
}

example

import { MemoryStore } from "durable-cf-streams/storage/memory";
import { normalizeContentType } from "durable-cf-streams";

/**
 * example durable object that serves streams out of an in-memory store.
 *
 * only PUT is implemented here; every other method gets an explicit 405
 * until a handler for it is added.
 */
export class StreamDO implements DurableObject {
  // one store per object instance; never reassigned after construction
  private readonly store = new MemoryStore();

  /**
   * handles a request, using the url pathname as the stream path.
   *
   * @param request incoming request
   * @returns for PUT: 201 when `result.created` is truthy, otherwise 200,
   *   with the `Stream-Next-Offset` header set from the put result;
   *   for any method without a handler: 405
   */
  async fetch(request: Request): Promise<Response> {
    const path = new URL(request.url).pathname;

    if (request.method === "PUT") {
      const contentType = request.headers.get("content-type") ?? "application/octet-stream";
      const body = new Uint8Array(await request.arrayBuffer());

      const result = await this.store.put(path, {
        contentType: normalizeContentType(contentType),
        // an empty body is passed as undefined rather than as zero bytes
        data: body.length > 0 ? body : undefined,
      });

      return new Response(null, {
        status: result.created ? 201 : 200,
        headers: { "Stream-Next-Offset": result.nextOffset },
      });
    }

    // ... handle other methods

    // fallback so fetch always resolves to a Response, as its return type
    // promises, instead of falling off the end with undefined.
    // extend the Allow header as handlers for other methods are added.
    return new Response("method not allowed", {
      status: 405,
      headers: { Allow: "PUT" },
    });
  }
}

see examples for complete implementations.

license

mit