Package Exports
- nx-mongo
- nx-mongo/dist/simpleMongoHelper.js
This package does not declare an exports field, so the exports above have been automatically detected and optimized by JSPM instead. If any package subpath is missing, it is recommended to post an issue to the original package (nx-mongo) to support the "exports" field. If that is not possible, create a JSPM override to customize the exports field for this package.
Readme
nx-mongo
Version: 3.5.0
A lightweight, feature-rich MongoDB helper library for Node.js and TypeScript. Provides a simple, intuitive API for common MongoDB operations with built-in retry logic, pagination, transactions, config-driven ref mapping, and signature-based deduplication.
Features
- ✅ Simple API - Easy-to-use methods for common MongoDB operations
- ✅ TypeScript Support - Full TypeScript support with type safety
- ✅ Connection Retry - Automatic retry with exponential backoff
- ✅ Pagination - Built-in pagination support with metadata
- ✅ Transactions - Full transaction support for multi-operation consistency
- ✅ Aggregation - Complete aggregation pipeline support
- ✅ Index Management - Create, drop, and list indexes
- ✅ Count Operations - Accurate and estimated document counting
- ✅ Session Support - Transaction sessions for complex operations
- ✅ Config-driven Ref Mapping - Map application-level refs to MongoDB collections
- ✅ Signature-based Deduplication - Automatic duplicate prevention using document signatures
- ✅ Append/Replace Modes - Flexible write modes for data pipelines
Installation
npm install nx-mongoQuick Start
import { SimpleMongoHelper } from 'nx-mongo';
const helper = new SimpleMongoHelper('mongodb://localhost:27017/my-database');
// Initialize connection
await helper.initialize();
// Insert a document
await helper.insert('users', {
name: 'John Doe',
email: 'john@example.com',
age: 30
});
// Find documents
const users = await helper.loadCollection('users');
// Find one document
const user = await helper.findOne('users', { email: 'john@example.com' });
// Update document
await helper.update(
'users',
{ email: 'john@example.com' },
{ $set: { age: 31 } }
);
// Delete document
await helper.delete('users', { email: 'john@example.com' });
// Disconnect
await helper.disconnect();API Reference
Constructor
new SimpleMongoHelper(connectionString: string, retryOptions?: RetryOptions)Parameters:
connectionString- MongoDB connection stringretryOptions(optional) - Retry configurationmaxRetries?: number- Maximum retry attempts (default: 3)retryDelay?: number- Initial retry delay in ms (default: 1000)exponentialBackoff?: boolean- Use exponential backoff (default: true)
Example:
const helper = new SimpleMongoHelper(
'mongodb://localhost:27017/my-db',
{ maxRetries: 5, retryDelay: 2000 }
);Connection Methods
testConnection(): Promise<{ success: boolean; error?: { type: string; message: string; details?: string } }>
Tests the MongoDB connection and returns detailed error information if it fails. This method does not establish a persistent connection - use initialize() for that.
Returns:
success: boolean- Whether the connection test succeedederror?: object- Error details if connection failedtype- Error type:'missing_credentials' | 'invalid_connection_string' | 'connection_failed' | 'authentication_failed' | 'config_error' | 'unknown'message- Human-readable error messagedetails- Detailed error information and troubleshooting tips
Example:
const result = await helper.testConnection();
if (!result.success) {
console.error('Connection test failed:', result.error?.message);
console.error('Details:', result.error?.details);
// Handle error based on result.error?.type
} else {
console.log('Connection test passed!');
await helper.initialize();
}Error Types:
missing_credentials- Username or password missing in connection stringinvalid_connection_string- Connection string format is invalidconnection_failed- Cannot reach MongoDB server (timeout, DNS, network, etc.)authentication_failed- Invalid credentials or insufficient permissionsconfig_error- Configuration issuesunknown- Unexpected error
initialize(): Promise<void>
Establishes MongoDB connection with automatic retry logic. Must be called before using other methods.
await helper.initialize();disconnect(): Promise<void>
Closes the MongoDB connection and cleans up resources.
await helper.disconnect();Query Methods
loadCollection<T>(collectionName: string, query?: Filter<T>, options?: PaginationOptions): Promise<WithId<T>[] | PaginatedResult<T>>
Loads documents from a collection with optional query filter and pagination.
Parameters:
collectionName- Name of the collectionquery(optional) - MongoDB query filteroptions(optional) - Pagination and sorting optionspage?: number- Page number (1-indexed)limit?: number- Documents per pagesort?: Sort- Sort specification
Returns:
- Without pagination:
WithId<T>[] - With pagination:
PaginatedResult<T>with metadata
Examples:
// Load all documents
const allUsers = await helper.loadCollection('users');
// Load with query
const activeUsers = await helper.loadCollection('users', { active: true });
// Load with pagination
const result = await helper.loadCollection('users', {}, {
page: 1,
limit: 10,
sort: { createdAt: -1 }
});
// result.data - array of documents
// result.total - total count
// result.page - current page
// result.totalPages - total pages
// result.hasNext - has next page
// result.hasPrev - has previous pagefindOne<T>(collectionName: string, query: Filter<T>, options?: { sort?: Sort; projection?: Document }): Promise<WithId<T> | null>
Finds a single document in a collection.
Parameters:
collectionName- Name of the collectionquery- MongoDB query filteroptions(optional) - Find optionssort?: Sort- Sort specificationprojection?: Document- Field projection
Example:
const user = await helper.findOne('users', { email: 'john@example.com' });
const latestUser = await helper.findOne('users', {}, { sort: { createdAt: -1 } });Insert Methods
insert<T>(collectionName: string, data: T | T[], options?: { session?: ClientSession }): Promise<any>
Inserts one or more documents into a collection.
Parameters:
collectionName- Name of the collectiondata- Single document or array of documentsoptions(optional) - Insert optionssession?: ClientSession- Transaction session
Examples:
// Insert single document
await helper.insert('users', {
name: 'John Doe',
email: 'john@example.com'
});
// Insert multiple documents
await helper.insert('users', [
{ name: 'John', email: 'john@example.com' },
{ name: 'Jane', email: 'jane@example.com' }
]);
// Insert within transaction
const session = helper.startSession();
await session.withTransaction(async () => {
await helper.insert('users', { name: 'John' }, { session });
});Update Methods
update<T>(collectionName: string, filter: Filter<T>, updateData: UpdateFilter<T>, options?: { upsert?: boolean; multi?: boolean; session?: ClientSession }): Promise<any>
Updates documents in a collection.
Parameters:
collectionName- Name of the collectionfilter- MongoDB query filterupdateData- Update operationsoptions(optional) - Update optionsupsert?: boolean- Create if not existsmulti?: boolean- Update multiple documents (default: false)session?: ClientSession- Transaction session
Examples:
// Update single document
await helper.update(
'users',
{ email: 'john@example.com' },
{ $set: { age: 31 } }
);
// Update multiple documents
await helper.update(
'users',
{ role: 'user' },
{ $set: { lastLogin: new Date() } },
{ multi: true }
);
// Upsert (create if not exists)
await helper.update(
'users',
{ email: 'john@example.com' },
{ $set: { name: 'John Doe', email: 'john@example.com' } },
{ upsert: true }
);Delete Methods
delete<T>(collectionName: string, filter: Filter<T>, options?: { multi?: boolean }): Promise<any>
Deletes documents from a collection.
Parameters:
collectionName- Name of the collectionfilter- MongoDB query filteroptions(optional) - Delete optionsmulti?: boolean- Delete multiple documents (default: false)
Examples:
// Delete single document
await helper.delete('users', { email: 'john@example.com' });
// Delete multiple documents
await helper.delete('users', { role: 'guest' }, { multi: true });Collection Merge Methods
mergeCollections(options: MergeCollectionsOptions): Promise<MergeCollectionsResult>
Merges two collections into a new target collection using various strategies (index-based, key-based, or composite-key). Useful for combining original records with assessment results or joining related data.
Parameters:
sourceCollection1- Name of first source collection (e.g., original records)sourceCollection2- Name of second source collection (e.g., assessment results)targetCollection- Name of target collection for merged resultsstrategy- Merge strategy:'index' | 'key' | 'composite'key- (For 'key' strategy) Field name to match on (supports dot notation)compositeKeys- (For 'composite' strategy) Array of field names for composite key matchingjoinType- (For 'key' and 'composite' strategies) SQL-style join type:'inner' | 'left' | 'right' | 'outer'(optional, overrides onUnmatched flags)fieldPrefix1- Prefix for fields from collection 1 (default: 'record')fieldPrefix2- Prefix for fields from collection 2 (default: 'assessment')includeIndex- Include original index in merged document (default: true for index strategy)onUnmatched1- (Deprecated: usejoinTypeinstead) What to do with unmatched records from collection 1: 'include' | 'skip' (default: 'include')onUnmatched2- (Deprecated: usejoinTypeinstead) What to do with unmatched records from collection 2: 'include' | 'skip' (default: 'include')session- Optional transaction session
Returns:
interface MergeCollectionsResult {
merged: number; // Total merged documents
unmatched1: number; // Unmatched documents from collection 1
unmatched2: number; // Unmatched documents from collection 2
errors: Array<{ index: number; error: Error; doc?: any }>;
}Strategies:
Index-based (
strategy: 'index'): Merges by array position. Assumes both collections are in the same order.const result = await helper.mergeCollections({ sourceCollection1: 'original_records', sourceCollection2: 'assessments', targetCollection: 'merged_results', strategy: 'index', fieldPrefix1: 'record', fieldPrefix2: 'assessment', includeIndex: true }); // Result: { recordIndex: 0, record: {...}, assessment: {...} }
Key-based (
strategy: 'key'): Merges by matching a single unique field. Supports SQL-style join types.// INNER JOIN - Only matched records const result = await helper.mergeCollections({ sourceCollection1: 'applications', sourceCollection2: 'assessments', targetCollection: 'merged', strategy: 'key', key: 'id', joinType: 'inner' // Only records with matching assessments }); // LEFT JOIN - All records, with assessments where available const result = await helper.mergeCollections({ sourceCollection1: 'applications', sourceCollection2: 'assessments', targetCollection: 'merged', strategy: 'key', key: 'id', joinType: 'left' // All apps, null assessment if no match }); // RIGHT JOIN - All assessments, with records where available const result = await helper.mergeCollections({ sourceCollection1: 'applications', sourceCollection2: 'assessments', targetCollection: 'merged', strategy: 'key', key: 'id', joinType: 'right' // All assessments, null record if no match }); // FULL OUTER JOIN - Everything from both sides const result = await helper.mergeCollections({ sourceCollection1: 'applications', sourceCollection2: 'assessments', targetCollection: 'merged', strategy: 'key', key: 'id', joinType: 'outer' // All apps and all assessments });
Composite-key (
strategy: 'composite'): Merges by matching multiple fields (e.g., name + ports + zones). Also supports join types.const result = await helper.mergeCollections({ sourceCollection1: 'original_records', sourceCollection2: 'assessments', targetCollection: 'merged', strategy: 'composite', compositeKeys: ['name', 'ports[]', 'zones[]'], // Arrays are sorted for matching joinType: 'left', // All records, assessments where match fieldPrefix1: 'record', fieldPrefix2: 'assessment' });
SQL-Style Join Types:
'inner'- INNER JOIN: Returns only records that have matches in both collections'left'- LEFT JOIN: Returns all records from collection 1, with matching records from collection 2 (null if no match)'right'- RIGHT JOIN: Returns all records from collection 2, with matching records from collection 1 (null if no match)'outer'- FULL OUTER JOIN: Returns all records from both collections, matching where possible
Multiple Matches: When a key appears multiple times in collection 2, the merge creates multiple rows (one per match), just like SQL joins. For example, if "app1" has 2 assessments, you'll get 2 merged rows.
Examples:
// Index-based merge (fast but requires same order)
const result1 = await helper.mergeCollections({
sourceCollection1: 'records',
sourceCollection2: 'assessments',
targetCollection: 'merged',
strategy: 'index'
});
console.log(`Merged ${result1.merged} documents, ${result1.unmatched1} unmatched from collection 1`);
// INNER JOIN - Only complete records (both sides matched)
const result2 = await helper.mergeCollections({
sourceCollection1: 'apps',
sourceCollection2: 'assessments',
targetCollection: 'merged',
strategy: 'key',
key: 'appId',
joinType: 'inner' // Only apps that have assessments
});
// LEFT JOIN - All apps, assessments where available
const result3 = await helper.mergeCollections({
sourceCollection1: 'apps',
sourceCollection2: 'assessments',
targetCollection: 'merged',
strategy: 'key',
key: 'appId',
joinType: 'left' // All apps, null assessment if no match
});
// RIGHT JOIN - All assessments, apps where available
const result4 = await helper.mergeCollections({
sourceCollection1: 'apps',
sourceCollection2: 'assessments',
targetCollection: 'merged',
strategy: 'key',
key: 'appId',
joinType: 'right' // All assessments, null app if no match
});
// FULL OUTER JOIN - Everything from both sides
const result5 = await helper.mergeCollections({
sourceCollection1: 'apps',
sourceCollection2: 'assessments',
targetCollection: 'merged',
strategy: 'key',
key: 'appId',
joinType: 'outer' // All apps and all assessments
});
// Composite-key merge with LEFT JOIN
const result6 = await helper.mergeCollections({
sourceCollection1: 'original',
sourceCollection2: 'assessments',
targetCollection: 'merged',
strategy: 'composite',
compositeKeys: ['name', 'ports[]', 'zones[]'],
joinType: 'left',
fieldPrefix1: 'record',
fieldPrefix2: 'assessment',
includeIndex: true
});
// Handling multiple matches (one app, multiple assessments)
// If "app1" has 2 assessments, you'll get 2 merged rows:
// - { record: {id: 1, name: "app1"}, assessment: {appId: 1, risk: "high"} }
// - { record: {id: 1, name: "app1"}, assessment: {appId: 1, risk: "medium"} }Notes:
- Index-based merging is fast but fragile if collections are reordered
- Key-based merging is safer and recommended when you have unique identifiers
- Composite-key merging handles cases where no single unique field exists
- SQL-style join types (
inner,left,right,outer) provide explicit control over unmatched records - Multiple matches create multiple rows (SQL-style) - if a key has duplicates, you get one row per match
- Array fields in composite keys are automatically sorted for consistent matching
- Supports dot notation for nested fields (e.g.,
'meta.id','ports[]') - Transaction support available via
sessionoption - Legacy
onUnmatched1/onUnmatched2flags still work but are deprecated in favor ofjoinType
Count Methods
countDocuments<T>(collectionName: string, query?: Filter<T>): Promise<number>
Counts documents matching a query (accurate count).
Example:
const userCount = await helper.countDocuments('users');
const activeUserCount = await helper.countDocuments('users', { active: true });estimatedDocumentCount(collectionName: string): Promise<number>
Gets estimated document count (faster but less accurate).
Example:
const estimatedCount = await helper.estimatedDocumentCount('users');Aggregation Methods
aggregate<T>(collectionName: string, pipeline: Document[]): Promise<T[]>
Runs an aggregation pipeline on a collection.
Example:
const result = await helper.aggregate('orders', [
{ $match: { status: 'completed' } },
{ $group: {
_id: '$customerId',
total: { $sum: '$amount' },
count: { $sum: 1 }
}},
{ $sort: { total: -1 } }
]);Index Methods
createIndex(collectionName: string, indexSpec: IndexSpecification, options?: CreateIndexesOptions): Promise<string>
Creates an index on a collection.
Example:
// Simple index
await helper.createIndex('users', { email: 1 });
// Unique index
await helper.createIndex('users', { email: 1 }, { unique: true });
// Compound index
await helper.createIndex('users', { email: 1, createdAt: -1 });dropIndex(collectionName: string, indexName: string): Promise<any>
Drops an index from a collection.
Example:
await helper.dropIndex('users', 'email_1');listIndexes(collectionName: string): Promise<Document[]>
Lists all indexes on a collection.
Example:
const indexes = await helper.listIndexes('users');
indexes.forEach(idx => console.log(idx.name));Transaction Methods
startSession(): ClientSession
Starts a new client session for transactions.
Example:
const session = helper.startSession();withTransaction<T>(callback: (session: ClientSession) => Promise<T>): Promise<T>
Executes a function within a transaction.
Example:
await helper.withTransaction(async (session) => {
await helper.insert('users', { name: 'John' }, { session });
await helper.update('accounts', { userId: '123' }, { $inc: { balance: 100 } }, { session });
return 'Transaction completed';
});Note: Transactions require a MongoDB replica set or sharded cluster.
Config-driven Ref Mapping and Signature-based Deduplication
Overview
The helper supports config-driven collection mapping and signature-based deduplication. All logic (queries, keys, hashing, append/replace) is generic and built into the helper - applications only pass refs and documents.
Configuration Schema
interface HelperConfig {
inputs: Array<{
ref: string; // Application-level reference name
collection: string; // MongoDB collection name
query?: Filter<any>; // Optional MongoDB query filter
}>;
outputs: Array<{
ref: string; // Application-level reference name
collection: string; // MongoDB collection name
keys?: string[]; // Optional: dot-paths for signature generation
mode?: "append" | "replace"; // Optional: write mode (default from global)
}>;
output?: {
mode?: "append" | "replace"; // Global default mode (default: "append")
};
progress?: {
collection?: string; // Progress collection name (default: "progress_states")
uniqueIndexKeys?: string[]; // Unique index keys (default: ["provider","key"])
provider?: string; // Default provider namespace for this helper instance
};
}Example Configuration:
const config = {
inputs: [
{ ref: "topology", collection: "topology-definition-neo-data", query: {} },
{ ref: "vulnerabilities", collection: "vulnerabilities-data", query: { severity: { "$in": ["high","critical"] } } },
{ ref: "entities", collection: "entities-data" },
{ ref: "crownJewels", collection: "entities-data", query: { type: "crown_jewel" } }
],
outputs: [
{ ref: "paths", collection: "paths-neo-data", keys: ["segments[]","edges[].from","edges[].to","target_role"], mode: "append" },
{ ref: "prioritizedPaths", collection: "prioritized_paths-neo-data", keys: ["segments[]","outside","contains_crown_jewel"], mode: "replace" },
{ ref: "assetPaths", collection: "asset_paths-neo-data", keys: ["asset_ip","segments[]"], mode: "append" }
],
output: { mode: "append" }
};Constructor with Config
new SimpleMongoHelper(connectionString: string, retryOptions?: RetryOptions, config?: HelperConfig)Example:
const helper = new SimpleMongoHelper(
'mongodb://localhost:27017/my-db',
{ maxRetries: 5 },
config
);Config Methods
useConfig(config: HelperConfig): this
Sets or updates the configuration for ref-based operations.
helper.useConfig(config);Ref-based Operations
loadByRef<T>(ref: string, options?: PaginationOptions & { session?: ClientSession }): Promise<WithId<T>[] | PaginatedResult<T>>
Loads data from a collection using a ref name from the configuration.
Parameters:
ref- Application-level reference name (must exist in config.inputs)options(optional) - Pagination and session options
Example:
// Load using ref (applies query automatically)
const topology = await helper.loadByRef('topology');
const vulns = await helper.loadByRef('vulnerabilities');
// With pagination
const result = await helper.loadByRef('topology', {
page: 1,
limit: 10,
sort: { createdAt: -1 }
});writeByRef(ref: string, documents: any[], options?: { session?: ClientSession; ensureIndex?: boolean }): Promise<WriteByRefResult>
Writes documents to a collection using a ref name from the configuration. Supports signature-based deduplication and append/replace modes.
Parameters:
ref- Application-level reference name (must exist in config.outputs)documents- Array of documents to writeoptions(optional) - Write optionssession?: ClientSession- Transaction sessionensureIndex?: boolean- Whether to ensure signature index exists (default: true)
Returns:
interface WriteByRefResult {
inserted: number;
updated: number;
errors: Array<{ index: number; error: Error; doc?: any }>;
indexCreated: boolean;
}Example:
// Write using ref (automatic deduplication, uses keys from config)
const result = await helper.writeByRef('paths', pathDocuments);
console.log(`Inserted: ${result.inserted}, Updated: ${result.updated}`);
console.log(`Index created: ${result.indexCreated}`);
// Replace mode (clears collection first)
await helper.writeByRef('prioritizedPaths', prioritizedDocs);writeStage(ref: string, documents: any[], options?: WriteStageOptions): Promise<WriteStageResult>
Writes documents to a collection and optionally marks a stage as complete atomically. See the Progress Tracking section for details and examples.
Example:
// Write and mark stage complete in one call
await helper.writeStage('tier1', documents, {
complete: {
key: 'tier1',
process: 'processA',
name: 'System Inventory',
provider: 'nessus',
metadata: { itemCount: documents.length }
}
});Signature Index Management
ensureSignatureIndex(collectionName: string, options?: { fieldName?: string; unique?: boolean }): Promise<EnsureSignatureIndexResult>
Ensures a unique index exists on the signature field for signature-based deduplication.
Parameters:
collectionName- Name of the collectionoptions(optional) - Index configurationfieldName?: string- Field name for signature (default: "_sig")unique?: boolean- Whether index should be unique (default: true)
Returns:
interface EnsureSignatureIndexResult {
created: boolean;
indexName: string;
}Example:
const result = await helper.ensureSignatureIndex('paths-neo-data');
console.log(`Index created: ${result.created}, Name: ${result.indexName}`);Progress Tracking
Overview
The helper provides built-in support for tracking provider-defined pipeline stages. This enables applications to:
- Track completion status of different stages (e.g., "tier1", "tier2", "enrichment")
- Skip already-completed stages on resumption
- Atomically write documents and mark stages complete
- Support multi-provider databases with provider namespaces
Configuration
Progress tracking is configured via the progress option in HelperConfig:
const config = {
// ... inputs and outputs
progress: {
collection: "progress_states", // Optional: default "progress_states"
uniqueIndexKeys: ["process", "provider", "key"], // Optional: default ["process","provider","key"]
provider: "nessus" // Optional: default provider for this instance
}
};Progress API
The progress API is available via helper.progress:
isCompleted(key: string, options?: { process?: string; provider?: string; session?: ClientSession }): Promise<boolean>
Checks if a stage is completed. Stages are scoped by process, so the same key can exist in different processes.
Example:
// Check stage in a specific process
if (await helper.progress.isCompleted('tier1', { process: 'processA', provider: 'nessus' })) {
console.log('Stage "tier1" in processA already completed, skipping...');
}
// Same key, different process
if (await helper.progress.isCompleted('tier1', { process: 'processB', provider: 'nessus' })) {
console.log('Stage "tier1" in processB already completed, skipping...');
}start(identity: StageIdentity, options?: { session?: ClientSession }): Promise<void>
Marks a stage as started. Idempotent - safe to call multiple times. Stages are scoped by process.
Example:
await helper.progress.start({
key: 'tier1',
process: 'processA',
name: 'System Inventory',
provider: 'nessus'
});complete(identity: StageIdentity & { metadata?: StageMetadata }, options?: { session?: ClientSession }): Promise<void>
Marks a stage as completed with optional metadata. Idempotent - safe to call multiple times. Stages are scoped by process.
Example:
await helper.progress.complete({
key: 'tier1',
process: 'processA',
name: 'System Inventory',
provider: 'nessus',
metadata: {
itemCount: 150,
durationMs: 5000
}
});getCompleted(options?: { process?: string; provider?: string; session?: ClientSession }): Promise<Array<{ key: string; name?: string; completedAt?: Date }>>
Gets a list of all completed stages, optionally filtered by process and/or provider.
Example:
// Get all completed stages for a specific process
const completed = await helper.progress.getCompleted({ process: 'processA', provider: 'nessus' });
// → [{ key: 'tier1', name: 'System Inventory', completedAt: Date }, ...]
// Get all completed stages across all processes for a provider
const allCompleted = await helper.progress.getCompleted({ provider: 'nessus' });getProgress(options?: { process?: string; provider?: string; session?: ClientSession }): Promise<StageRecord[]>
Gets all stage records (both completed and in-progress), optionally filtered by process and/or provider.
Example:
// Get all stages for a specific process
const allStages = await helper.progress.getProgress({ process: 'processA', provider: 'nessus' });
// Get all stages for a provider across all processes
const allProviderStages = await helper.progress.getProgress({ provider: 'nessus' });reset(key: string, options?: { process?: string; provider?: string; session?: ClientSession }): Promise<void>
Resets a stage to not-started state (clears completion status). Stages are scoped by process.
Example:
await helper.progress.reset('tier1', { process: 'processA', provider: 'nessus' });Stage-Aware Writes
writeStage(ref: string, documents: any[], options?: WriteStageOptions): Promise<WriteStageResult>
Writes documents to a collection and optionally marks a stage as complete in a single call. If a session is provided, both operations are atomic within the transaction.
Parameters:
ref- Application-level reference name (must exist in config.outputs)documents- Array of documents to writeoptions(optional) - Write and completion optionsensureIndex?: boolean- Whether to ensure signature index exists (default: true)session?: ClientSession- Transaction session (makes write and complete atomic)complete?: { key: string; name?: string; provider?: string; metadata?: StageMetadata }- Stage completion info
Returns:
interface WriteStageResult extends WriteByRefResult {
completed?: boolean; // true if stage was marked complete
}Examples:
// Skip completed stages, then save-and-complete in one call
const processName = 'processA';
if (!force && (await helper.progress.isCompleted('tier1', { process: processName, provider: 'nessus' }))) {
console.log('Skipping stage "tier1" in processA');
} else {
const docs = [
{ type: 'server_status', ...status },
...scanners.map(s => ({ type: 'scanner', ...s }))
];
await helper.writeStage('tier1', docs, {
complete: {
key: 'tier1',
process: processName,
name: 'System Inventory',
provider: 'nessus',
metadata: { itemCount: docs.length }
}
});
}
// Transactional multi-write with explicit completion
const session = helper.startSession();
try {
await session.withTransaction(async () => {
await helper.writeByRef('tier2_scans', scans, { session });
await helper.writeByRef('tier2_hosts', hosts, { session });
await helper.progress.complete({
key: 'tier2',
process: 'processA',
name: 'Scan Inventory',
provider: 'nessus',
metadata: { itemCount: hosts.length }
}, { session });
});
} finally {
await session.endSession();
}Usage Patterns
Resumption Pattern
const processName = 'processA';
const stages = ['tier1', 'tier2', 'tier3'];
for (const stageKey of stages) {
if (await helper.progress.isCompleted(stageKey, { process: processName, provider: 'nessus' })) {
console.log(`Skipping completed stage: ${stageKey} in ${processName}`);
continue;
}
await helper.progress.start({ key: stageKey, process: processName, provider: 'nessus' });
try {
const docs = await processStage(stageKey);
await helper.writeStage(`ref_${stageKey}`, docs, {
complete: { key: stageKey, process: processName, provider: 'nessus' }
});
} catch (error) {
console.error(`Stage ${stageKey} in ${processName} failed:`, error);
// Stage remains incomplete, can be retried
}
}
// Different process can have same stage keys independently
const processB = 'processB';
if (!await helper.progress.isCompleted('tier1', { process: processB, provider: 'nessus' })) {
// Process B's tier1 is independent from Process A's tier1
await helper.progress.start({ key: 'tier1', process: processB, provider: 'nessus' });
}Utility Functions
getByDotPath(value: any, path: string): any[]
Extracts values from an object using dot-notation paths with array wildcard support.
Parameters:
value- The object to extract values frompath- Dot-notation path (e.g., "meta.id", "edges[].from", "segments[]")
Returns: Array of extracted values (flattened and deduplicated for arrays)
Examples:
import { getByDotPath } from 'nx-mongo';
// Simple path
getByDotPath({ meta: { id: "123" } }, "meta.id"); // ["123"]
// Array wildcard
getByDotPath({ segments: [1, 2, 3] }, "segments[]"); // [1, 2, 3]
// Nested array access
getByDotPath({ edges: [{ from: "A" }, { from: "B" }] }, "edges[].from"); // ["A", "B"]computeSignature(doc: any, keys: string[], options?: { algorithm?: "sha256" | "sha1" | "md5" }): string
Computes a deterministic signature for a document based on specified keys.
Parameters:
doc- The document to compute signature forkeys- Array of dot-notation paths to extract values fromoptions(optional) - Configurationalgorithm?: "sha256" | "sha1" | "md5"- Hash algorithm (default: "sha256")
Returns: Hex string signature
Example:
import { computeSignature } from 'nx-mongo';
const sig = computeSignature(
{ segments: [1, 2], role: "admin" },
["segments[]", "role"]
);Signature Algorithm
The signature generation follows these steps:
- Extract values for each key using
getByDotPath - Normalize values:
- Strings: As-is
- Numbers:
String(value) - Booleans:
"true"or"false" - Dates:
value.toISOString()(UTC) - Null/Undefined:
"null" - Objects:
JSON.stringify(value, Object.keys(value).sort())(sorted keys) - Arrays: Flatten recursively, normalize each element, deduplicate, sort lexicographically
- Create canonical map:
{ key1: [normalized values], key2: [normalized values], ... } - Sort keys alphabetically
- Stringify:
JSON.stringify(canonicalMap) - Hash: SHA-256 (or configurable algorithm)
- Return: Hex string
Usage Examples
Basic Usage with Config
import { SimpleMongoHelper } from 'nx-mongo';
const config = {
inputs: [
{ ref: "topology", collection: "topology-definition", query: {} },
{ ref: "vulnerabilities", collection: "vulnerabilities", query: { severity: "high" } }
],
outputs: [
{ ref: "paths", collection: "paths", keys: ["segments[]", "target_role"], mode: "append" },
{ ref: "prioritizedPaths", collection: "prioritized_paths", keys: ["segments[]"], mode: "replace" }
],
output: { mode: "append" }
};
const helper = new SimpleMongoHelper('mongodb://localhost:27017/mydb', undefined, config);
await helper.initialize();
// Load using ref (applies query automatically)
const topology = await helper.loadByRef('topology');
const vulns = await helper.loadByRef('vulnerabilities');
// Write using ref (automatic deduplication, uses keys from config)
const result = await helper.writeByRef('paths', pathDocuments);
console.log(`Inserted: ${result.inserted}, Updated: ${result.updated}`);
// Replace mode (clears collection first)
await helper.writeByRef('prioritizedPaths', prioritizedDocs);With Transactions
const session = helper.startSession();
try {
await session.withTransaction(async () => {
await helper.writeByRef('paths', docs, { session });
await helper.writeByRef('prioritizedPaths', prioDocs, { session });
});
} finally {
await session.endSession();
}Standalone Utilities
import { getByDotPath, computeSignature } from 'nx-mongo';
// Extract values
const values = getByDotPath(doc, "edges[].from"); // ["A", "B", "C"]
// Compute signature
const sig = computeSignature(doc, ["segments[]", "target_role"]);TypeScript Interfaces
PaginationOptions
interface PaginationOptions {
page?: number;
limit?: number;
sort?: Sort;
}PaginatedResult
interface PaginatedResult<T> {
data: WithId<T>[];
total: number;
page: number;
limit: number;
totalPages: number;
hasNext: boolean;
hasPrev: boolean;
}RetryOptions
interface RetryOptions {
maxRetries?: number;
retryDelay?: number;
exponentialBackoff?: boolean;
}HelperConfig
interface HelperConfig {
inputs: InputConfig[];
outputs: OutputConfig[];
output?: {
mode?: 'append' | 'replace';
};
progress?: {
collection?: string;
uniqueIndexKeys?: string[];
provider?: string;
};
}
interface InputConfig {
ref: string;
collection: string;
query?: Filter<any>;
}
interface OutputConfig {
ref: string;
collection: string;
keys?: string[];
mode?: 'append' | 'replace';
}WriteByRefResult
interface WriteByRefResult {
inserted: number;
updated: number;
errors: Array<{ index: number; error: Error; doc?: any }>;
indexCreated: boolean;
}EnsureSignatureIndexResult
interface EnsureSignatureIndexResult {
created: boolean;
indexName: string;
}Progress Tracking Interfaces
interface StageIdentity {
key: string;
process?: string;
provider?: string;
name?: string;
}
interface StageMetadata {
itemCount?: number;
errorCount?: number;
durationMs?: number;
[key: string]: any;
}
interface StageRecord extends StageIdentity {
completed: boolean;
startedAt?: Date;
completedAt?: Date;
metadata?: StageMetadata;
}
interface WriteStageOptions {
ensureIndex?: boolean;
session?: ClientSession;
complete?: {
key: string;
process?: string;
name?: string;
provider?: string;
metadata?: StageMetadata;
};
}
interface WriteStageResult extends WriteByRefResult {
completed?: boolean;
}Merge Collections Interfaces
interface MergeCollectionsOptions {
sourceCollection1: string;
sourceCollection2: string;
targetCollection: string;
strategy: 'index' | 'key' | 'composite';
key?: string;
compositeKeys?: string[];
joinType?: 'inner' | 'left' | 'right' | 'outer'; // SQL-style join type
fieldPrefix1?: string;
fieldPrefix2?: string;
includeIndex?: boolean;
onUnmatched1?: 'include' | 'skip'; // Deprecated: use joinType instead
onUnmatched2?: 'include' | 'skip'; // Deprecated: use joinType instead
session?: ClientSession;
}
interface MergeCollectionsResult {
merged: number;
unmatched1: number;
unmatched2: number;
errors: Array<{ index: number; error: Error; doc?: any }>;
}Error Handling
All methods throw errors with descriptive messages. Always wrap operations in try-catch blocks:
try {
await helper.initialize();
const users = await helper.loadCollection('users');
} catch (error) {
console.error('Operation failed:', error.message);
}Best Practices
Always initialize before use:
await helper.initialize();
Use transactions for multi-operation consistency:
await helper.withTransaction(async (session) => { // Multiple operations });
Use pagination for large datasets:
const result = await helper.loadCollection('users', {}, { page: 1, limit: 50 });
Create indexes for frequently queried fields:
await helper.createIndex('users', { email: 1 }, { unique: true });
Always disconnect when done:
await helper.disconnect();
License
ISC
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Changelog
3.5.0
- Enhanced
mergeCollections()with SQL-style join types (inner,left,right,outer) - Multiple match handling: Now creates multiple rows when keys have duplicates (SQL-style behavior)
- Improved key-based and composite-key merging to handle one-to-many and many-to-many relationships
- Added explicit join type control for better clarity and SQL compatibility
- Legacy
onUnmatched1/onUnmatched2flags deprecated in favor ofjoinTypeparameter
3.4.0
- Added
mergeCollections()method for merging two collections into a new target collection - Supports three merge strategies: index-based, key-based, and composite-key merging
- Index-based merging for same-order collections
- Key-based merging using unique identifiers (supports dot notation)
- Composite-key merging using multiple fields (e.g., name + ports + zones)
- Configurable field prefixes and unmatched record handling
- Transaction support for atomic merge operations
3.3.0
- Added
testConnection()method for detailed connection testing and error diagnostics - Package renamed from
nx-mongodb-helpertonx-mongo(shorter, cleaner name) - Connection test provides detailed error messages for missing credentials, invalid connection strings, authentication failures, and network issues
3.2.0
- Added process-scoped stages support - stages can now be scoped by process identifier
- Updated default unique index to include
processfield:['process', 'provider', 'key'] - All ProgressAPI methods now accept
processparameter for process-scoped stage tracking - Updated
writeStage()to support process-scoped completion - Stages with the same key can exist independently in different processes
3.1.0
- Added built-in progress tracking API (
helper.progress) for provider-defined pipeline stages - Added
writeStage()method that combines document writing with stage completion - Added progress tracking configuration to
HelperConfig(collection, uniqueIndexKeys, provider) - Progress API supports idempotent operations, transactions, and provider namespaces
- All progress operations support optional transaction sessions for atomicity
3.0.0
- Added config-driven ref mapping (HelperConfig, InputConfig, OutputConfig)
- Added signature-based deduplication with automatic index management
- Added
loadByRef()method for loading data by ref name - Added
writeByRef()method with signature computation, bulk upsert, and append/replace modes - Added
ensureSignatureIndex()method for signature index management - Added
useConfig()method for runtime config updates - Added
getByDotPath()utility function for dot-notation path extraction with array wildcards - Added
computeSignature()utility function for deterministic document signatures - Enhanced constructor to accept optional config parameter
- All new methods support transaction sessions
2.0.1
- Package renamed from
nx-mongodb-helpertonx-mongo - Add version number to README header
2.0.0
- Added delete operations
- Added findOne operation
- Added count operations (countDocuments, estimatedDocumentCount)
- Added pagination support
- Added aggregation pipeline support
- Added transaction support
- Added connection retry logic with exponential backoff
- Added index management (createIndex, dropIndex, listIndexes)
- Enhanced insert and update methods with session support
1.0.0
- Initial release with basic CRUD operations