node-red-contrib-questdb
Node-RED nodes for writing data to the QuestDB time-series database using the InfluxDB Line Protocol (ILP).
Features
- High-performance writes using QuestDB's native ILP protocol
- Connection pooling with automatic reconnection
- Multiple protocols: HTTP, HTTPS, TCP, TCPS
- Authentication: Basic auth and Bearer token support
- TLS/SSL: Full TLS support with certificate verification options
- Flexible data mapping: Map message fields to QuestDB columns
- Type support: Symbols, floats, integers, longs, booleans, strings, timestamps, arrays, and decimals
- Auto-flush: Configurable automatic flushing by row count or time interval
- Examples included: Ready-to-use flow examples
Installation
Via Node-RED Palette Manager
- Open Node-RED
- Go to Menu > Manage palette > Install
- Search for node-red-contrib-questdb
- Click Install
Via npm
cd ~/.node-red
npm install node-red-contrib-questdb
Then restart Node-RED.
Nodes
QuestDB Write
Writes data to QuestDB using the ILP protocol.
Configuration
Connection Settings:
- Protocol: HTTP (default), HTTPS, TCP, or TCPS
- Host: QuestDB server hostname or IP
- Port: 9000 (HTTP) or 9009 (TCP)
Security Settings:
- Enable Auth: Toggle authentication
- Auth Type: Username/Password or Bearer Token
- TLS Verify: Verify server certificate (for HTTPS/TCPS)
Advanced Settings:
- Auto Flush: Enable automatic flushing
- Flush Rows: Number of rows before auto-flush (default: 75000)
- Flush Interval: Time interval for auto-flush in ms (default: 1000)
- Request Timeout: HTTP request timeout in ms
- Buffer Size: Initial and maximum buffer sizes
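The settings above correspond to the connection string shown under "Connection String Format". As a rough sketch, and not the node's actual implementation, a hypothetical helper `buildConnectionString` could assemble that string from a settings object. The setting names (`protocol`, `host`, `port`, `autoFlush`, `flushRows`, `flushInterval`) are assumptions chosen for illustration.

```javascript
// Hypothetical helper: assembles a QuestDB-style connection string from
// settings that resemble the node's configuration fields.
function buildConnectionString(settings) {
    const {
        protocol = "http",                                 // http, https, tcp, or tcps
        host = "localhost",
        port = protocol.startsWith("http") ? 9000 : 9009,  // defaults listed above
        autoFlush = true,
        flushRows = 75000,
        flushInterval = 1000
    } = settings || {};

    const allowed = ["http", "https", "tcp", "tcps"];
    if (!allowed.includes(protocol)) {
        throw new Error("Unsupported protocol: " + protocol);
    }

    let conn = `${protocol}::addr=${host}:${port};`;
    if (autoFlush) {
        conn += `auto_flush_rows=${flushRows};auto_flush_interval=${flushInterval};`;
    } else {
        // Assumption: flushing is handled manually when auto-flush is disabled
        conn += "auto_flush=off;";
    }
    return conn;
}

console.log(buildConnectionString({}));
console.log(buildConnectionString({ protocol: "tcp", host: "questdb.local" }));
```

With no settings, the helper reproduces the default string quoted in "Connection String Format".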
Input Message Format
msg.topic = "table_name";
msg.payload = {
symbols: {
tag_name: "sensor1", // Indexed string columns
location: "warehouse"
},
columns: {
temperature: 23.5, // Auto-detected as float
humidity: 65, // Auto-detected as float
status: "active", // String column
alert: true // Boolean column
},
timestamp: Date.now() // Optional: milliseconds or Date object
};
Explicit Type Specification
For precise control over column types:
msg.payload = {
symbols: { device: "sensor1" },
columns: {
value: { value: 123456789, type: "long" },
price: { value: "123.456789", type: "decimal" },
readings: { value: [1.1, 2.2, 3.3], type: "array", elementType: "double" }
},
timestamp: Date.now()
};
Supported Types:
- int/integer - 32-bit signed integer
- long - 64-bit signed integer
- float - 32-bit floating point
- double - 64-bit floating point
- decimal - Arbitrary precision decimal
- string - Text value
- boolean - true/false
- timestamp - Date/time value
- array - Array with auto-detected element type
- array_double - Array of doubles
- array_long - Array of longs
- array_string - Array of strings
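When many columns need explicit types, a small wrapper can keep function nodes readable and catch misspelled type names early. The sketch below is illustrative only: `typed` is a hypothetical helper, not part of this package, and it only checks the type name against the list above.

```javascript
// Type names copied from the "Supported Types" list above.
const SUPPORTED_TYPES = new Set([
    "int", "integer", "long", "float", "double", "decimal", "string",
    "boolean", "timestamp", "array", "array_double", "array_long", "array_string"
]);

// Hypothetical helper: wraps a value in the { value, type } form used
// for explicit type specification.
function typed(value, type, elementType) {
    if (!SUPPORTED_TYPES.has(type)) {
        throw new Error("Unknown column type: " + type);
    }
    const column = { value, type };
    // elementType is only meaningful for the generic "array" type
    if (type === "array" && elementType) {
        column.elementType = elementType;
    }
    return column;
}

const exampleColumns = {
    value: typed(123456789, "long"),
    price: typed("123.456789", "decimal"),
    readings: typed([1.1, 2.2, 3.3], "array", "double")
};
console.log(JSON.stringify(exampleColumns, null, 2));
```

In a function node, `exampleColumns` would be assigned to `msg.payload.columns` before returning `msg`.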
QuestDB Mapper
Maps incoming message fields to QuestDB ILP structure. Useful for transforming data from various sources.
Configuration
- Table Name: Target table (or use msg.topic)
- Timestamp Field: Path to timestamp field in message
- Symbol Mappings: Map source fields to QuestDB symbols
- Column Mappings: Map source fields to columns with type conversion
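Conceptually, these settings describe a set of path lookups followed by a reshaping step. The following is a minimal sketch of that idea, not the Mapper node's source code. The config shape (`tableName`, `timestampField`, `symbolMappings`, `columnMappings` with `source`, `target`, `type`) is a hypothetical stand-in for the node's settings, and converting symbol values to strings is an assumption.

```javascript
// Resolve a dotted path such as "payload.device" against a message object.
function getPath(obj, path) {
    return path.split(".").reduce(
        (cur, key) => (cur === null || cur === undefined ? undefined : cur[key]),
        obj
    );
}

// Hypothetical mapper: builds the { symbols, columns, timestamp } structure.
function mapMessage(msg, config) {
    const symbols = {};
    for (const m of config.symbolMappings) {
        const v = getPath(msg, m.source);
        if (v !== undefined && v !== null) symbols[m.target] = String(v);
    }

    const columns = {};
    for (const m of config.columnMappings) {
        const v = getPath(msg, m.source);
        if (v === undefined || v === null) continue;
        if (m.type.startsWith("array_")) {
            // e.g. "array_double" becomes type "array" with elementType "double"
            columns[m.target] = { value: v, type: "array", elementType: m.type.slice(6) };
        } else {
            columns[m.target] = { value: v, type: m.type };
        }
    }

    const payload = { symbols, columns };
    const ts = config.timestampField ? getPath(msg, config.timestampField) : undefined;
    if (ts !== undefined) payload.timestamp = ts;

    // Fall back to msg.topic when no table name is configured
    return { topic: config.tableName || msg.topic, payload };
}

const mapped = mapMessage(
    { topic: "sensors", payload: { device: "sensor1", temp: 23.5, ts: 1699999999000 } },
    {
        tableName: "",
        timestampField: "payload.ts",
        symbolMappings: [{ source: "payload.device", target: "device_id" }],
        columnMappings: [{ source: "payload.temp", target: "temperature", type: "double" }]
    }
);
console.log(JSON.stringify(mapped));
```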
Example
Input message:
{
topic: "sensors",
payload: {
device: "sensor1",
temp: 23.5,
readings: [1.1, 2.2, 3.3],
ts: 1699999999000
}
}
With mappings:
- Symbol: payload.device → device_id
- Column: payload.temp → temperature (double)
- Column: payload.readings → values (array_double)
- Timestamp: payload.ts
Output:
{
topic: "sensors",
payload: {
symbols: { device_id: "sensor1" },
columns: {
temperature: { value: 23.5, type: "double" },
values: { value: [1.1, 2.2, 3.3], type: "array", elementType: "double" }
},
timestamp: 1699999999000
}
}
Examples
The package includes ready-to-use examples. After installation:
- Open Node-RED
- Go to Menu > Import
- Select Examples > node-red-contrib-questdb
Comprehensive Use Case Examples
1. Industrial IoT - Manufacturing Line Monitoring
Monitor production line equipment with high-frequency sensor data:
// Function node: Production Line Sensor Data
msg.topic = "production_line";
msg.payload = {
symbols: {
factory_id: "PLANT_001",
line_id: "LINE_A3",
machine_id: "CNC_MILL_007",
shift: "day"
},
columns: {
// Machine status
spindle_speed_rpm: { value: 12500, type: "int" },
feed_rate_mmpm: 450.5,
tool_wear_percent: 23.7,
// Vibration analysis (100 samples at 10kHz)
vibration_x: {
value: Array.from({length: 100}, () => (Math.random() - 0.5) * 2),
type: "array",
elementType: "double"
},
vibration_y: {
value: Array.from({length: 100}, () => (Math.random() - 0.5) * 2),
type: "array",
elementType: "double"
},
vibration_z: {
value: Array.from({length: 100}, () => (Math.random() - 0.5) * 2),
type: "array",
elementType: "double"
},
// Thermal monitoring
motor_temp_c: 67.3,
coolant_temp_c: 22.1,
ambient_temp_c: 24.5,
// Power consumption
power_kw: 15.7,
current_amps: { value: 42.3, type: "double" },
// Production metrics
parts_produced: { value: 1247, type: "long" },
cycle_time_ms: { value: 45230, type: "long" },
oee_percent: 87.3,
// Quality metrics
defect_count: { value: 3, type: "int" },
in_tolerance: true
},
timestamp: Date.now()
};
return msg;
Querying in QuestDB:
-- Real-time machine status
SELECT * FROM production_line
WHERE machine_id = 'CNC_MILL_007'
ORDER BY timestamp DESC LIMIT 1;
-- Hourly OEE trend
SELECT
timestamp,
avg(oee_percent) as avg_oee,
sum(parts_produced) as total_parts,
sum(defect_count) as total_defects
FROM production_line
WHERE factory_id = 'PLANT_001'
SAMPLE BY 1h;
-- Vibration anomaly detection
SELECT timestamp, machine_id,
array_avg(vibration_x) as avg_vib_x,
array_max(vibration_x) as max_vib_x
FROM production_line
WHERE array_max(vibration_x) > 1.5
ORDER BY timestamp DESC;
2. Smart Building - HVAC and Energy Management
Monitor building climate control and energy consumption:
// Function node: Building Management System Data
msg.topic = "building_hvac";
msg.payload = {
symbols: {
building_id: "HQ_TOWER_1",
floor: "F15",
zone: "ZONE_A",
hvac_unit: "AHU_15A"
},
columns: {
// Temperature readings
supply_air_temp_c: 14.2,
return_air_temp_c: 23.8,
zone_temp_c: 22.1,
setpoint_temp_c: 22.0,
outdoor_temp_c: 31.5,
// Humidity
zone_humidity_percent: 45.3,
setpoint_humidity_percent: 45.0,
// Airflow
supply_airflow_cfm: { value: 2500, type: "int" },
damper_position_percent: 75.0,
filter_pressure_drop_pa: 125.3,
// Energy
cooling_load_kw: 45.7,
heating_load_kw: 0.0,
fan_power_kw: 3.2,
// Equipment status
compressor_running: true,
economizer_active: false,
maintenance_due: false,
// Occupancy (from sensors)
occupancy_count: { value: 47, type: "int" },
co2_ppm: { value: 650, type: "int" },
// Comfort index (calculated)
pmv_index: 0.2, // Predicted Mean Vote (-3 to +3)
ppd_percent: 6.1 // Predicted Percentage Dissatisfied
},
timestamp: Date.now()
};
return msg;
Energy Dashboard Queries:
-- Daily energy consumption by floor
SELECT
floor,
sum(cooling_load_kw + heating_load_kw + fan_power_kw) as total_energy_kwh
FROM building_hvac
WHERE building_id = 'HQ_TOWER_1'
AND timestamp > dateadd('d', -1, now())
SAMPLE BY 1d;
-- Comfort tracking
SELECT
timestamp,
zone,
zone_temp_c,
zone_humidity_percent,
pmv_index,
occupancy_count
FROM building_hvac
WHERE building_id = 'HQ_TOWER_1'
SAMPLE BY 15m;
3. Financial Trading - Market Data Capture
Capture real-time market data and trading signals:
// Function node: Market Data Feed
const ticker = msg.payload.ticker || "AAPL";
const bid = 185.23 + (Math.random() - 0.5) * 0.1;
const ask = bid + 0.01;
msg.topic = "market_data";
msg.payload = {
symbols: {
exchange: "NASDAQ",
ticker: ticker,
asset_class: "equity"
},
columns: {
// Price data (use decimal for financial precision)
bid_price: { value: bid.toFixed(4), type: "decimal" },
ask_price: { value: ask.toFixed(4), type: "decimal" },
mid_price: { value: ((bid + ask) / 2).toFixed(4), type: "decimal" },
spread_bps: { value: ((ask - bid) / bid * 10000).toFixed(2), type: "decimal" },
// Volume
bid_size: { value: Math.floor(Math.random() * 1000) * 100, type: "long" },
ask_size: { value: Math.floor(Math.random() * 1000) * 100, type: "long" },
// Trade data
last_price: { value: (bid + Math.random() * (ask - bid)).toFixed(4), type: "decimal" },
last_size: { value: Math.floor(Math.random() * 500) * 100, type: "long" },
volume_today: { value: Math.floor(Math.random() * 10000000), type: "long" },
vwap: { value: (185.15 + Math.random() * 0.2).toFixed(4), type: "decimal" },
// Market microstructure
trade_count: { value: Math.floor(Math.random() * 100), type: "int" },
quote_count: { value: Math.floor(Math.random() * 500), type: "int" },
// Greeks (for options)
implied_vol: 0.235,
// Flags
halted: false,
auction_mode: false
},
timestamp: Date.now()
};
return msg;
Trading Analytics:
-- VWAP calculation
SELECT
ticker,
sum(last_price * last_size) / sum(last_size) as vwap,
sum(volume_today) as total_volume
FROM market_data
WHERE timestamp > dateadd('h', -1, now())
GROUP BY ticker;
-- Spread analysis
SELECT
timestamp,
ticker,
spread_bps,
bid_size,
ask_size
FROM market_data
WHERE ticker = 'AAPL'
SAMPLE BY 1s;
4. Weather Station Network
Collect meteorological data from distributed weather stations:
// Function node: Weather Station Data
msg.topic = "weather_stations";
msg.payload = {
symbols: {
station_id: "WS_" + String(Math.floor(Math.random() * 100)).padStart(3, "0"),
region: "pacific_northwest",
elevation_class: "lowland",
station_type: "automated"
},
columns: {
// Temperature
air_temp_c: 18.3 + (Math.random() - 0.5) * 5,
feels_like_c: 16.8 + (Math.random() - 0.5) * 5,
dew_point_c: 12.1 + (Math.random() - 0.5) * 3,
ground_temp_c: 15.2 + (Math.random() - 0.5) * 2,
// Humidity & Pressure
relative_humidity_percent: 65 + Math.random() * 20,
pressure_hpa: { value: (1013.25 + (Math.random() - 0.5) * 20).toFixed(2), type: "decimal" },
pressure_trend: { value: (Math.random() - 0.5) * 2, type: "double" },
// Wind
wind_speed_ms: Math.random() * 10,
wind_gust_ms: Math.random() * 15,
wind_direction_deg: { value: Math.floor(Math.random() * 360), type: "int" },
// Precipitation
rain_mm: Math.random() * 2,
rain_rate_mmh: Math.random() * 10,
rain_24h_mm: { value: (Math.random() * 20).toFixed(1), type: "decimal" },
// Solar
solar_radiation_wm2: Math.random() * 800,
uv_index: { value: Math.floor(Math.random() * 11), type: "int" },
// Visibility
visibility_km: 10 + Math.random() * 40,
cloud_base_m: { value: Math.floor(1000 + Math.random() * 2000), type: "int" },
// Air Quality
pm25_ugm3: Math.random() * 50,
pm10_ugm3: Math.random() * 100,
aqi: { value: Math.floor(Math.random() * 150), type: "int" },
// Battery & Status
battery_voltage: 12.4 + Math.random() * 0.4,
solar_charging: true,
sensor_status: "OK"
},
timestamp: Date.now()
};
return msg;
Weather Analysis Queries:
-- Regional temperature map
SELECT
station_id,
air_temp_c,
relative_humidity_percent,
wind_speed_ms
FROM weather_stations
WHERE region = 'pacific_northwest'
LATEST ON timestamp PARTITION BY station_id;
-- Precipitation totals by region
SELECT
region,
sum(rain_mm) as total_rain_mm,
max(rain_rate_mmh) as max_rain_rate
FROM weather_stations
WHERE timestamp > dateadd('h', -24, now())
SAMPLE BY 1h;
5. Fleet Tracking and Telematics
Monitor vehicle fleet with GPS and diagnostics:
// Function node: Vehicle Telematics
const vehicleId = "VH_" + String(Math.floor(Math.random() * 50) + 1).padStart(3, "0");
const baseLat = 37.7749 + (Math.random() - 0.5) * 0.1;
const baseLon = -122.4194 + (Math.random() - 0.5) * 0.1;
msg.topic = "fleet_telematics";
msg.payload = {
symbols: {
vehicle_id: vehicleId,
fleet_id: "DELIVERY_WEST",
vehicle_type: "van",
driver_id: "DRV_" + String(Math.floor(Math.random() * 20) + 1).padStart(3, "0")
},
columns: {
// GPS Position (high precision)
latitude: { value: baseLat.toFixed(7), type: "decimal" },
longitude: { value: baseLon.toFixed(7), type: "decimal" },
altitude_m: { value: Math.floor(Math.random() * 100), type: "int" },
gps_accuracy_m: 3.5,
heading_deg: { value: Math.floor(Math.random() * 360), type: "int" },
// Speed & Motion
speed_kmh: Math.floor(Math.random() * 80),
acceleration_ms2: (Math.random() - 0.5) * 3,
odometer_km: { value: 45678 + Math.floor(Math.random() * 100), type: "long" },
// Engine Diagnostics (OBD-II)
engine_rpm: { value: Math.floor(1000 + Math.random() * 3000), type: "int" },
engine_load_percent: Math.random() * 100,
coolant_temp_c: 85 + Math.random() * 15,
intake_temp_c: 25 + Math.random() * 20,
throttle_position_percent: Math.random() * 100,
// Fuel
fuel_level_percent: 45 + Math.random() * 30,
fuel_rate_lph: 8 + Math.random() * 12,
fuel_economy_kpl: { value: (12 + Math.random() * 4).toFixed(2), type: "decimal" },
// Battery & Electrical
battery_voltage: 13.8 + Math.random() * 0.5,
alternator_voltage: 14.2 + Math.random() * 0.3,
// DTC Codes (Diagnostic Trouble Codes)
dtc_count: { value: Math.floor(Math.random() * 3), type: "int" },
check_engine_light: Math.random() > 0.95,
// Trip Data
trip_distance_km: { value: (Math.random() * 50).toFixed(2), type: "decimal" },
idle_time_sec: { value: Math.floor(Math.random() * 300), type: "int" },
// Safety Events
harsh_braking: Math.random() > 0.9,
harsh_acceleration: Math.random() > 0.92,
harsh_cornering: Math.random() > 0.95,
// Cargo (for delivery vehicles)
cargo_door_open: false,
cargo_temp_c: 4.2 + Math.random() * 2,
deliveries_completed: { value: Math.floor(Math.random() * 15), type: "int" }
},
timestamp: Date.now()
};
return msg;
Fleet Management Queries:
-- Current fleet positions
SELECT
vehicle_id,
driver_id,
latitude,
longitude,
speed_kmh,
fuel_level_percent
FROM fleet_telematics
WHERE fleet_id = 'DELIVERY_WEST'
LATEST ON timestamp PARTITION BY vehicle_id;
-- Driver safety scores
SELECT
driver_id,
count(*) as total_events,
sum(case when harsh_braking then 1 else 0 end) as harsh_brakes,
sum(case when harsh_acceleration then 1 else 0 end) as harsh_accel,
sum(case when harsh_cornering then 1 else 0 end) as harsh_corners
FROM fleet_telematics
WHERE timestamp > dateadd('d', -7, now())
GROUP BY driver_id
ORDER BY total_events DESC;
-- Fuel efficiency by vehicle
SELECT
vehicle_id,
avg(fuel_economy_kpl) as avg_fuel_economy,
sum(trip_distance_km) as total_distance
FROM fleet_telematics
WHERE timestamp > dateadd('d', -30, now())
GROUP BY vehicle_id;
6. Healthcare - Patient Monitoring
Collect vital signs from patient monitoring devices:
// Function node: Patient Vitals
msg.topic = "patient_vitals";
msg.payload = {
symbols: {
patient_id: "PT_" + String(Math.floor(Math.random() * 100) + 1).padStart(4, "0"),
ward: "ICU_A",
bed: "BED_" + String(Math.floor(Math.random() * 20) + 1).padStart(2, "0"),
device_type: "bedside_monitor"
},
columns: {
// Heart
heart_rate_bpm: { value: Math.floor(60 + Math.random() * 40), type: "int" },
heart_rate_variability_ms: Math.floor(20 + Math.random() * 30),
// ECG Waveform (250Hz sampling, 1 second)
ecg_waveform: {
value: Array.from({length: 250}, () => Math.sin(Math.random() * Math.PI * 2) * 0.5 + Math.random() * 0.1),
type: "array",
elementType: "double"
},
// Blood Pressure
systolic_mmhg: { value: Math.floor(110 + Math.random() * 30), type: "int" },
diastolic_mmhg: { value: Math.floor(70 + Math.random() * 20), type: "int" },
map_mmhg: { value: Math.floor(80 + Math.random() * 20), type: "int" }, // Mean Arterial Pressure
// Respiratory
respiratory_rate: { value: Math.floor(12 + Math.random() * 8), type: "int" },
spo2_percent: { value: (95 + Math.random() * 4).toFixed(1), type: "decimal" },
etco2_mmhg: { value: Math.floor(35 + Math.random() * 10), type: "int" },
// Respiratory waveform
pleth_waveform: {
value: Array.from({length: 100}, (_, i) => Math.sin(i * 0.1) * 50 + 100),
type: "array",
elementType: "double"
},
// Temperature
body_temp_c: { value: (36.5 + Math.random() * 1.5).toFixed(1), type: "decimal" },
skin_temp_c: { value: (32 + Math.random() * 2).toFixed(1), type: "decimal" },
// Consciousness
gcs_score: { value: Math.floor(13 + Math.random() * 3), type: "int" }, // Glasgow Coma Scale
// Pain (if conscious)
pain_score: { value: Math.floor(Math.random() * 5), type: "int" },
// IV Fluids
iv_rate_mlh: { value: Math.floor(50 + Math.random() * 100), type: "int" },
iv_total_ml: { value: Math.floor(500 + Math.random() * 1500), type: "int" },
// Alarms
alarm_active: Math.random() > 0.9,
alarm_priority: Math.random() > 0.95 ? "high" : "low"
},
timestamp: Date.now()
};
return msg;
Clinical Queries:
-- Current patient status
SELECT
patient_id,
ward,
bed,
heart_rate_bpm,
systolic_mmhg,
diastolic_mmhg,
spo2_percent,
body_temp_c,
alarm_active
FROM patient_vitals
WHERE ward = 'ICU_A'
LATEST ON timestamp PARTITION BY patient_id;
-- Vital trends for specific patient
SELECT
timestamp,
heart_rate_bpm,
systolic_mmhg,
spo2_percent,
respiratory_rate
FROM patient_vitals
WHERE patient_id = 'PT_0042'
ORDER BY timestamp DESC
LIMIT 100;
7. Agriculture - Smart Farming
Monitor crop conditions and irrigation systems:
// Function node: Agricultural Sensors
msg.topic = "farm_sensors";
msg.payload = {
symbols: {
farm_id: "FARM_VALLEY_01",
field_id: "FIELD_A3",
crop_type: "corn",
sensor_node: "NODE_" + String(Math.floor(Math.random() * 50) + 1).padStart(3, "0")
},
columns: {
// Soil Sensors (multiple depths)
soil_moisture_10cm: Math.random() * 100,
soil_moisture_30cm: Math.random() * 100,
soil_moisture_60cm: Math.random() * 100,
soil_temp_10cm: 18 + Math.random() * 8,
soil_temp_30cm: 16 + Math.random() * 4,
// Soil Chemistry
soil_ph: { value: (6.0 + Math.random() * 1.5).toFixed(2), type: "decimal" },
soil_ec_dsm: { value: (1.0 + Math.random() * 2.0).toFixed(2), type: "decimal" }, // Electrical Conductivity
nitrogen_ppm: { value: Math.floor(20 + Math.random() * 40), type: "int" },
phosphorus_ppm: { value: Math.floor(10 + Math.random() * 30), type: "int" },
potassium_ppm: { value: Math.floor(100 + Math.random() * 100), type: "int" },
// Weather at field level
air_temp_c: 22 + (Math.random() - 0.5) * 10,
humidity_percent: 50 + Math.random() * 30,
wind_speed_ms: Math.random() * 8,
solar_radiation_wm2: Math.random() * 1000,
leaf_wetness_percent: Math.random() * 100,
// Crop Health (from imaging)
ndvi: { value: (0.3 + Math.random() * 0.5).toFixed(3), type: "decimal" }, // Vegetation Index
chlorophyll_index: { value: (40 + Math.random() * 20).toFixed(1), type: "decimal" },
canopy_temp_c: 20 + Math.random() * 10,
// Growth Stage
plant_height_cm: { value: Math.floor(50 + Math.random() * 150), type: "int" },
growth_stage: "V12", // Vegetative stage 12
days_since_planting: { value: 65, type: "int" },
// Irrigation
irrigation_active: Math.random() > 0.7,
water_applied_mm: { value: (Math.random() * 5).toFixed(1), type: "decimal" },
irrigation_efficiency: 0.85 + Math.random() * 0.1,
// Pest Pressure
pest_trap_count: { value: Math.floor(Math.random() * 20), type: "int" },
disease_risk_index: Math.random() * 100,
// Battery
battery_percent: 70 + Math.random() * 30,
solar_panel_watts: Math.random() * 5
},
timestamp: Date.now()
};
return msg;
Farm Analytics:
-- Field irrigation needs
SELECT
field_id,
avg(soil_moisture_30cm) as avg_moisture,
avg(ndvi) as avg_ndvi,
max(canopy_temp_c) as max_canopy_temp
FROM farm_sensors
WHERE farm_id = 'FARM_VALLEY_01'
AND timestamp > dateadd('h', -24, now())
GROUP BY field_id
HAVING avg_moisture < 40;
-- Growth tracking
SELECT
timestamp,
avg(plant_height_cm) as avg_height,
avg(ndvi) as avg_ndvi,
avg(chlorophyll_index) as avg_chlorophyll
FROM farm_sensors
WHERE field_id = 'FIELD_A3'
SAMPLE BY 1d;
8. Energy Grid - Smart Meter Data
Collect data from smart meters and grid equipment:
// Function node: Smart Meter Reading
msg.topic = "smart_meters";
msg.payload = {
symbols: {
meter_id: "MTR_" + String(Math.floor(Math.random() * 10000) + 1).padStart(6, "0"),
utility: "PACIFIC_POWER",
meter_type: "residential",
tariff_plan: "time_of_use",
feeder: "FDR_" + String(Math.floor(Math.random() * 50) + 1).padStart(3, "0")
},
columns: {
// Energy Consumption
active_energy_kwh: { value: (15234.567 + Math.random() * 10).toFixed(3), type: "decimal" },
reactive_energy_kvarh: { value: (1234.567 + Math.random() * 5).toFixed(3), type: "decimal" },
// Power
active_power_kw: { value: (Math.random() * 15).toFixed(3), type: "decimal" },
reactive_power_kvar: { value: (Math.random() * 3).toFixed(3), type: "decimal" },
apparent_power_kva: { value: (Math.random() * 16).toFixed(3), type: "decimal" },
power_factor: { value: (0.85 + Math.random() * 0.15).toFixed(3), type: "decimal" },
// Voltage (per phase)
voltage_l1: { value: (230 + (Math.random() - 0.5) * 10).toFixed(1), type: "decimal" },
voltage_l2: { value: (230 + (Math.random() - 0.5) * 10).toFixed(1), type: "decimal" },
voltage_l3: { value: (230 + (Math.random() - 0.5) * 10).toFixed(1), type: "decimal" },
// Current (per phase)
current_l1: { value: (Math.random() * 30).toFixed(2), type: "decimal" },
current_l2: { value: (Math.random() * 30).toFixed(2), type: "decimal" },
current_l3: { value: (Math.random() * 30).toFixed(2), type: "decimal" },
// Frequency
frequency_hz: { value: (50 + (Math.random() - 0.5) * 0.1).toFixed(3), type: "decimal" },
// Harmonics (THD - Total Harmonic Distortion)
thd_voltage_percent: Math.random() * 5,
thd_current_percent: Math.random() * 15,
// Power Quality Events
sag_count: { value: Math.floor(Math.random() * 3), type: "int" },
swell_count: { value: Math.floor(Math.random() * 2), type: "int" },
outage_duration_sec: { value: 0, type: "int" },
// Tamper Detection
cover_opened: false,
magnetic_tamper: false,
reverse_energy_flow: Math.random() > 0.95,
// Communication
signal_strength_dbm: { value: Math.floor(-90 + Math.random() * 40), type: "int" },
last_comm_success: true
},
timestamp: Date.now()
};
return msg;
9. Network Infrastructure - Server Monitoring
Monitor servers and network equipment:
// Function node: Server Metrics
const hostname = "srv-" + ["web", "db", "app", "cache"][Math.floor(Math.random() * 4)]
+ "-" + String(Math.floor(Math.random() * 10) + 1).padStart(2, "0");
msg.topic = "server_metrics";
msg.payload = {
symbols: {
hostname: hostname,
datacenter: "DC_WEST_1",
rack: "RACK_" + String(Math.floor(Math.random() * 20) + 1).padStart(2, "0"),
environment: "production",
service: hostname.split("-")[1]
},
columns: {
// CPU
cpu_usage_percent: Math.random() * 100,
cpu_user_percent: Math.random() * 60,
cpu_system_percent: Math.random() * 30,
cpu_iowait_percent: Math.random() * 20,
cpu_cores_used: { value: Math.floor(Math.random() * 32), type: "int" },
load_1m: Math.random() * 10,
load_5m: Math.random() * 8,
load_15m: Math.random() * 6,
// Memory
memory_total_gb: { value: 128, type: "int" },
memory_used_gb: { value: (Math.random() * 100).toFixed(2), type: "decimal" },
memory_cached_gb: { value: (Math.random() * 30).toFixed(2), type: "decimal" },
memory_buffers_gb: { value: (Math.random() * 10).toFixed(2), type: "decimal" },
swap_used_mb: { value: Math.floor(Math.random() * 1000), type: "int" },
// Disk
disk_read_mbps: Math.random() * 500,
disk_write_mbps: Math.random() * 300,
disk_iops: { value: Math.floor(Math.random() * 10000), type: "int" },
disk_latency_ms: Math.random() * 10,
disk_used_percent: 30 + Math.random() * 50,
// Network
network_rx_mbps: Math.random() * 1000,
network_tx_mbps: Math.random() * 800,
network_packets_rx: { value: Math.floor(Math.random() * 100000), type: "long" },
network_packets_tx: { value: Math.floor(Math.random() * 80000), type: "long" },
network_errors: { value: Math.floor(Math.random() * 10), type: "int" },
tcp_connections: { value: Math.floor(Math.random() * 5000), type: "int" },
// Process
process_count: { value: Math.floor(200 + Math.random() * 300), type: "int" },
thread_count: { value: Math.floor(500 + Math.random() * 1000), type: "int" },
zombie_count: { value: Math.floor(Math.random() * 3), type: "int" },
// File Descriptors
fd_used: { value: Math.floor(Math.random() * 10000), type: "int" },
fd_max: { value: 65535, type: "int" },
// Temperature
cpu_temp_c: { value: Math.floor(45 + Math.random() * 30), type: "int" },
// Uptime
uptime_seconds: { value: Math.floor(Math.random() * 10000000), type: "long" }
},
timestamp: Date.now()
};
return msg;
Infrastructure Queries:
-- Resource utilization dashboard
SELECT
hostname,
service,
cpu_usage_percent,
memory_used_gb * 100 / memory_total_gb as memory_percent,
disk_used_percent,
network_rx_mbps + network_tx_mbps as network_total_mbps
FROM server_metrics
WHERE environment = 'production'
ORDER BY timestamp DESC
LIMIT 100;
-- Service health aggregation
SELECT
service,
count(DISTINCT hostname) as host_count,
avg(cpu_usage_percent) as avg_cpu,
avg(memory_used_gb) as avg_memory_gb,
sum(network_errors) as total_errors
FROM server_metrics
WHERE timestamp > dateadd('m', -5, now())
GROUP BY service;
10. E-commerce - Real-time Analytics
Track user behavior and transactions:
// Function node: E-commerce Events
const eventTypes = ["page_view", "add_to_cart", "purchase", "search", "wishlist"];
const eventType = eventTypes[Math.floor(Math.random() * eventTypes.length)];
msg.topic = "ecommerce_events";
msg.payload = {
symbols: {
event_type: eventType,
session_id: "SES_" + Math.random().toString(36).substring(2, 15),
user_id: Math.random() > 0.3 ? "USR_" + String(Math.floor(Math.random() * 10000)).padStart(6, "0") : "anonymous",
device_type: ["desktop", "mobile", "tablet"][Math.floor(Math.random() * 3)],
country: ["US", "UK", "DE", "FR", "JP"][Math.floor(Math.random() * 5)],
channel: ["organic", "paid", "email", "social", "direct"][Math.floor(Math.random() * 5)]
},
columns: {
// Page/Product Info
page_url: "/products/category/item-" + Math.floor(Math.random() * 1000),
product_id: "PROD_" + String(Math.floor(Math.random() * 5000)).padStart(5, "0"),
category: ["electronics", "clothing", "home", "sports"][Math.floor(Math.random() * 4)],
// Pricing (decimal for accuracy)
product_price: { value: (Math.random() * 500 + 10).toFixed(2), type: "decimal" },
discount_percent: { value: Math.floor(Math.random() * 30), type: "int" },
cart_value: { value: (Math.random() * 1000).toFixed(2), type: "decimal" },
// Quantities
quantity: { value: Math.floor(Math.random() * 5) + 1, type: "int" },
cart_items: { value: Math.floor(Math.random() * 10), type: "int" },
// Transaction (for purchase events)
order_id: eventType === "purchase" ? "ORD_" + Date.now() : null,
payment_method: eventType === "purchase" ? ["card", "paypal", "apple_pay"][Math.floor(Math.random() * 3)] : null,
// User Behavior
time_on_page_sec: { value: Math.floor(Math.random() * 300), type: "int" },
scroll_depth_percent: Math.floor(Math.random() * 100),
click_count: { value: Math.floor(Math.random() * 20), type: "int" },
// Search (for search events)
search_query: eventType === "search" ? ["shoes", "laptop", "jacket", "headphones"][Math.floor(Math.random() * 4)] : null,
search_results_count: eventType === "search" ? { value: Math.floor(Math.random() * 500), type: "int" } : null,
// Technical
page_load_ms: { value: Math.floor(500 + Math.random() * 2000), type: "int" },
browser: ["Chrome", "Firefox", "Safari", "Edge"][Math.floor(Math.random() * 4)],
// Attribution
referrer: ["google", "facebook", "direct", "email"][Math.floor(Math.random() * 4)],
campaign_id: Math.random() > 0.5 ? "CAMP_" + Math.floor(Math.random() * 100) : null
},
timestamp: Date.now()
};
return msg;
E-commerce Analytics:
-- Real-time conversion funnel
SELECT
event_type,
count(*) as event_count,
count(DISTINCT session_id) as unique_sessions
FROM ecommerce_events
WHERE timestamp > dateadd('h', -1, now())
GROUP BY event_type;
-- Revenue by channel
SELECT
channel,
country,
sum(cart_value) as total_revenue,
count(*) as purchase_count
FROM ecommerce_events
WHERE event_type = 'purchase'
AND timestamp > dateadd('d', -1, now())
GROUP BY channel, country
ORDER BY total_revenue DESC;
Data Type Reference
| JavaScript Value | Auto-Detection | Explicit Type |
|---|---|---|
| 42 | float | { value: 42, type: "int" } |
| 42.5 | float | { value: 42.5, type: "double" } |
| 9876543210 | float | { value: 9876543210, type: "long" } |
| "123.45" | string | { value: "123.45", type: "decimal" } |
| "hello" | string | { value: "hello", type: "string" } |
| true | boolean | { value: true, type: "boolean" } |
| [1.1, 2.2] | array (double) | { value: [...], type: "array", elementType: "double" } |
| Date.now() | timestamp | { value: Date.now(), type: "timestamp" } |
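The auto-detection column can be read as a function from JavaScript values to column types. The sketch below is one reading of the table, not the node's detection code. `detectType` is a hypothetical name. Because a plain number cannot be told apart from a millisecond timestamp by its type alone, the sketch treats only `Date` objects as timestamps, and it guesses the array element type from the first element; both are assumptions.

```javascript
// Hypothetical illustration of the auto-detection rules in the table above.
function detectType(value) {
    if (typeof value === "boolean") return { type: "boolean" };
    if (typeof value === "number") return { type: "float" };   // 42, 42.5 and 9876543210 alike
    if (typeof value === "string") return { type: "string" };  // "123.45" stays a string
    if (value instanceof Date) return { type: "timestamp" };   // assumption, see lead-in
    if (Array.isArray(value)) {
        // Assumption: element type is inferred from the first element
        const elementType = typeof value[0] === "string" ? "string" : "double";
        return { type: "array", elementType };
    }
    return { type: "unknown" };
}

for (const sample of [42, 9876543210, "123.45", true, [1.1, 2.2]]) {
    console.log(JSON.stringify(sample), "->", JSON.stringify(detectType(sample)));
}
```

The practical consequence is visible in the first three rows of the table: integers, longs, and decimals all need an explicit type, since auto-detection stores every number as a float and every numeric string as a string.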
Best Practices
Symbol Selection
Use symbols (tags) for columns that will be frequently filtered:
- Device/sensor identifiers
- Geographic regions
- Service names
- Environment (prod/dev/staging)
Symbols are dictionary-encoded, and can optionally be indexed, which makes filtering on them fast.
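When source data arrives as a flat object, the symbol/column split can be made explicit with a list of tag keys. This is a minimal sketch under stated assumptions: `splitRecord` and the field names in the sample reading are hypothetical, and symbol values are converted to strings for consistency with the examples in this README.

```javascript
// Hypothetical helper: split a flat record into symbols and columns,
// given the keys that should be treated as symbols (tags).
function splitRecord(record, symbolKeys) {
    const symbols = {};
    const columns = {};
    for (const [key, value] of Object.entries(record)) {
        if (value === null || value === undefined) continue; // skip empty fields
        if (symbolKeys.includes(key)) {
            symbols[key] = String(value);
        } else {
            columns[key] = value;
        }
    }
    return { symbols, columns };
}

const reading = {
    device: "sensor1",
    region: "eu-west",
    environment: "prod",
    temperature: 23.5,
    alert: false
};
const split = splitRecord(reading, ["device", "region", "environment"]);
console.log(JSON.stringify(split));
```

Keeping the symbol key list short and stable matters more than the helper itself: values with very high cardinality, such as unique session identifiers, are usually better stored as string columns.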
Timestamp Handling
- Use Date.now() for millisecond precision
- Omit timestamp to use QuestDB server time
- ISO strings are auto-converted: "2024-01-15T10:30:00Z"
Array Columns
Ideal for:
- High-frequency sensor samples
- FFT/spectral data
- Batch readings from single collection window
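For the batch-readings case, the samples collected in one window can be collapsed into a single row instead of one row per sample. The following is a sketch only: `batchSamples`, the `device` symbol, and the `samples` and `sample_count` column names are hypothetical.

```javascript
// Hypothetical helper: turn one collection window into a single row
// with an array column.
function batchSamples(deviceId, samples, windowStartMs) {
    const values = samples.map(Number);
    if (values.some(Number.isNaN)) {
        throw new Error("Non-numeric sample in window");
    }
    return {
        symbols: { device: deviceId },
        columns: {
            samples: { value: values, type: "array", elementType: "double" },
            sample_count: { value: values.length, type: "int" }
        },
        timestamp: windowStartMs // one timestamp for the whole window
    };
}

const row = batchSamples("sensor1", [0.12, 0.15, "0.11", 0.09], Date.now());
console.log(JSON.stringify(row.columns));
```

In a function node, the result would be assigned to `msg.payload` with `msg.topic` set to the target table. The trade-off is that per-sample timestamps are lost, so the sampling rate needs to be known or stored separately.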
Decimal Type
Use for financial data requiring exact precision:
price: { value: "123.456789", type: "decimal" }
Performance Tips
- Batch related data into single rows when possible
- Use appropriate flush intervals for your write pattern
- Leverage symbols for frequently queried dimensions
- Use arrays for high-frequency sampled data
QuestDB Setup
Using Docker
docker run -p 9000:9000 -p 9009:9009 questdb/questdb
Connection String Format
The node uses QuestDB's connection string format internally:
http::addr=localhost:9000;auto_flush_rows=75000;auto_flush_interval=1000;
Compatibility
- Node-RED: >= 2.0.0
- Node.js: >= 14.0.0
- QuestDB: >= 6.0 (recommended: latest)
Troubleshooting
Connection Issues
- Verify QuestDB is running: curl http://localhost:9000
- Check firewall settings for ports 9000/9009
- For HTTPS/TCPS, ensure certificates are properly configured
Data Not Appearing
- Check the node status indicator (green = connected)
- Verify table creation in QuestDB console
- Enable debug output to see write confirmations
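When rows are missing, it can also help to check the message shape in a function node placed before the write node. The checks below are assumptions about what a well-formed message looks like, based on the "Input Message Format" section. They are not the node's own validation, and `validateMessage` is a hypothetical helper.

```javascript
// Hypothetical pre-flight check; returns a list of human-readable problems.
function validateMessage(msg) {
    const problems = [];
    if (typeof msg.topic !== "string" || msg.topic.length === 0) {
        problems.push("msg.topic (table name) is missing");
    }
    const payload = msg.payload;
    if (payload === null || typeof payload !== "object") {
        problems.push("msg.payload is not an object");
        return problems;
    }
    for (const [name, value] of Object.entries(payload.symbols || {})) {
        if (typeof value !== "string") {
            problems.push(`symbol "${name}" is not a string`);
        }
    }
    for (const [name, value] of Object.entries(payload.columns || {})) {
        if (typeof value === "number" && !Number.isFinite(value)) {
            problems.push(`column "${name}" is NaN or infinite`);
        }
    }
    return problems;
}

// Possible use inside a function node:
//   const problems = validateMessage(msg);
//   if (problems.length > 0) { node.warn(problems.join("; ")); return null; }
//   return msg;
console.log(validateMessage({ topic: "", payload: { columns: { temp: NaN } } }));
```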
Performance Tips
- Use symbols for frequently queried columns (they're indexed)
- Batch writes when possible using arrays
- Adjust auto-flush settings based on your write patterns
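To reason about how the Flush Rows and Flush Interval settings interact, a toy model can help. The class below is purely illustrative and says nothing about how the node or the QuestDB client decides when to flush. `FlushPolicy` and its option names are hypothetical, and the clock is injectable so the behavior can be explored without waiting.

```javascript
// Toy model: a flush is due when either the row threshold or the time
// interval has been reached, whichever comes first.
class FlushPolicy {
    constructor({ flushRows = 75000, flushIntervalMs = 1000, now = Date.now } = {}) {
        this.flushRows = flushRows;
        this.flushIntervalMs = flushIntervalMs;
        this.now = now;
        this.pendingRows = 0;
        this.lastFlush = now();
    }

    // Record one buffered row; returns true when a flush would be triggered.
    addRow() {
        this.pendingRows += 1;
        const due =
            this.pendingRows >= this.flushRows ||
            this.now() - this.lastFlush >= this.flushIntervalMs;
        if (due) {
            this.pendingRows = 0;
            this.lastFlush = this.now();
        }
        return due;
    }
}

// Simulated clock: one row every 10 ms with the default settings.
let clock = 0;
const policy = new FlushPolicy({ now: () => clock });
let flushes = 0;
for (let i = 0; i < 1000; i++) {
    clock += 10;
    if (policy.addRow()) flushes += 1;
}
console.log("flushes in 10 simulated seconds:", flushes);
```

At 100 rows per second the 75000-row threshold is never reached, so in this model the interval alone drives flushing. At much higher write rates the row threshold takes over.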
License
MIT
Author
Holger Amort