Package Exports
- @classytic/flow
- @classytic/flow/counting
- @classytic/flow/domain
- @classytic/flow/domain/contracts
- @classytic/flow/domain/enums
- @classytic/flow/domain/policies
- @classytic/flow/events
- @classytic/flow/matching
- @classytic/flow/models
- @classytic/flow/packaging
- @classytic/flow/procurement
- @classytic/flow/reporting
- @classytic/flow/repositories
- @classytic/flow/reservations
- @classytic/flow/routing
- @classytic/flow/scanning
- @classytic/flow/scope
- @classytic/flow/services
- @classytic/flow/traceability
- @classytic/flow/types
- @classytic/flow/valuation
Readme
@classytic/flow
Mongokit-native WMS + inventory primitives for Node.js.
A pure library — no Fastify, no HTTP, no framework coupling. Drop it into Express, NestJS, Arc, a CLI, a Temporal activity, or a Lambda and get a production-grade warehouse kernel powered by MongoDB.
- Single-standard event system. Typed
FlowDomainEventcatalog with a pluggable transport (Arc outbox / Redis streams / Kafka / Temporal signals / in-process fan-out). Every emit also persists to an append-onlyStockEventledger for replay + outbox relay. - Atomic ordered-lock reservation. Concurrent workers never overcommit.
Saleor's deadlock-free pattern implemented against MongoDB via
findOneAndUpdate+$exprguards. - Two-tier move model.
StockMoveholds the plan,StockMoveLineholds the realised slices (lot, package, cost). Odoo'sstock.move/stock.move.linesplit — the foundation for mixed-lot picks, partial availability, backorders, and serial traceability. - Declarative routing.
StockRule+StockRouteexpress push/pull chains as data. TheRoutingServiceexpands them into route plans the caller can execute. - Ownership-aware quants.
owned,consignment,dropship,pending_auth,in_transit,customer_held. The same lot in the same bin can coexist in multiple commercial states — enough to model Toyota VMI, StockX authentication, dropship phantom stock, and transit handoffs. - Removal strategies as first-class location fields. Odoo's
stock.location.removal_strategypattern — setfifo,lifo,fefo,closest,highest_cost,lowest_costper bin. - Denormalized for read speed. Hot-path listings never need
.populate(). When you DO want populate, everyObjectIdfield carriesref:so it resolves cleanly. - Pluggable demand-forecast port. Wire your own Croston / ETS / S&OP
output into
ReplenishmentServiceand triggered rules size suggested POs to forecast-over-lead-time + safety stock.
Install
npm install @classytic/flow \
mongoose \
zod \
@classytic/mongokit \
@classytic/primitives \
@classytic/repo-corePeer dependencies:
mongoose>= 9.4.1zod>= 4.0.0@classytic/mongokit>= 3.11.0@classytic/primitives>= 0.1.0@classytic/repo-core>= 0.2.0
Quick start
import { createFlowEngine, ensureFlowReady } from '@classytic/flow';
import mongoose from 'mongoose';
const conn = await mongoose
.createConnection(process.env.MONGO_URL, { dbName: 'myapp' })
.asPromise();
const flow = createFlowEngine({
mongoose: conn,
mode: 'standard', // 'simple' | 'standard' | 'enterprise'
catalog: {
async resolveSku(skuRef) {
const sku = await mySkuLookup(skuRef);
return {
skuRef,
sku: sku.code,
displayName: sku.name,
trackingMode: sku.trackingMode ?? 'none',
uom: sku.uom ?? 'unit',
isActive: true,
};
},
},
});
// Required once at startup — materialises collections + drains index
// builds so the first transactional call never fails.
await ensureFlowReady(flow);
// Reserve 10 units — ordered-lock atomic, race-free under concurrency
const ctx = { organizationId: 'org_1', actorId: 'user_1' };
const result = await flow.services.moveLine.reserveLinesForMove(
{
moveId: 'mv_1',
moveGroupId: 'grp_1',
skuRef: 'WIDGET-A',
sourceLocationId: 'STOCK',
destinationLocationId: 'OUTPUT',
quantity: 10,
strategy: 'fefo',
},
ctx,
);
console.log(result.lines); // [{ lotId, quantity, lotExpiresAt, … }]Integration examples
Express
import express from 'express';
const app = express();
app.post('/reserve', async (req, res) => {
const result = await flow.services.moveLine.reserveLinesForMove(req.body, {
organizationId: req.user.orgId,
actorId: req.user.id,
});
res.json(result);
});Arc (@classytic/arc)
import { createFlowEngine, ensureFlowReady } from '@classytic/flow';
import { MemoryEventTransport } from '@classytic/arc/events';
// Arc's EventTransport is structurally identical to the `EventTransport`
// from `@classytic/primitives/events` that flow accepts.
// Pass it directly — no adapter, no bridge, one shared bus.
const transport = new MemoryEventTransport();
const flow = createFlowEngine({
mongoose: conn,
mode: 'standard',
catalog,
eventTransport: transport, // flow events land on Arc's bus natively
});
await ensureFlowReady(flow);
// Arc subscribers receive flow events directly:
// subscribe('flow.reservation.created', handler)
// subscribe('flow.package.sealed', handler)NestJS
@Module({
providers: [
{
provide: 'FLOW',
useFactory: () => createFlowEngine({ mongoose: conn, mode: 'standard', catalog }),
},
],
})
export class InventoryModule {}
@Injectable()
export class InventoryService {
constructor(@Inject('FLOW') private flow: FlowEngine) {}
reserve(dto, ctx) {
return this.flow.services.moveLine.reserveLinesForMove(dto, ctx);
}
}CLI / worker / Temporal activity
// Outbox relay cron — call every few seconds
const count = await flow.services.eventLog.relayUndelivered(ctx, 500);
console.log(`Published ${count} events`);Plug a demand forecast (optional)
import type { ForecastPort } from '@classytic/flow/procurement';
const forecastPort: ForecastPort = {
async getForecast({ skuRef, locationId, horizonDays, now }, ctx) {
const series = await myPlanner.demand({ skuRef, locationId, horizonDays, ctx });
return { points: series, method: 'croston' };
},
};
const flow = createFlowEngine({ mongoose: conn, mode: 'standard', catalog, forecastPort });
// ReplenishmentService now sizes triggered rules to
// max(min/max baseline, expectedDemand + safetyStock − projected).Subscribe to events
// subscribe() returns Promise<() => void> — same as Arc's EventTransport
const unsub = await flow.events.subscribe('flow.package.sealed', async (event) => {
await transit.createParcel({
packageId: event.payload.packageId,
destination: event.payload.destinationAddress,
});
});
// Later
unsub();Glob patterns: flow.reservation.*, flow.move_line.*, *.
Event transport compatibility
Flow accepts any EventTransport from @classytic/primitives/events.
The interface matches arc's EventTransport structurally. Any transport
that works with arc works with flow:
// Arc's MemoryEventTransport (in-process)
import { MemoryEventTransport } from '@classytic/arc/events';
createFlowEngine({ ..., eventTransport: new MemoryEventTransport() });
// Redis (when you scale out)
import { RedisEventTransport } from '@classytic/arc/events';
createFlowEngine({ ..., eventTransport: new RedisEventTransport(ioredis) });
// Custom Kafka / Temporal / NATS — implement publish + subscribe
createFlowEngine({ ..., eventTransport: myKafkaTransport });Flow events also persist to the StockEvent ledger regardless of
transport, so eventLog.relayUndelivered() works as an outbox for any
transport that was down at emit time.
Mode matrix
| Feature | simple | standard | enterprise |
|---|---|---|---|
| Basic moves, quants, reservations, allocation, posting | ✓ | ✓ | ✓ |
| Procurement, replenishment, cycle count, cost layers (FIFO/FEFO) | — | ✓ | ✓ |
| Traceability, recall, package content, routing rules | — | ✓ | ✓ |
| Quality inspection, task dispatch, RFID, dock + manifest, offline sync | — | — | ✓ |
Documentation
- docs/architecture.md — Ports/adapters, event flow, UoW, repository pattern.
- docs/recipes.md — Copy-paste snippets for the 20 most common operations.
- CHANGELOG.md — Release notes.
License
MIT