Events & Monitoring
Monitor your queues with comprehensive event system and real-time updates. Build dashboards, alerts, and analytics.
Quick Navigation
Event System
Reliable Queue provides a comprehensive event system that allows you to monitor queue operations, track task lifecycle, and respond to state changes in real-time.
import { ReliableQueue } from '@aplanka/reliable-queue';const queue = new ReliableQueue();// Subscribe to eventsconst unsubscribeTaskCompleted = queue.on('taskCompleted', (task) => {console.log(`Task ${task.id} completed successfully`);});const unsubscribeTaskFailed = queue.on('taskFailed', (task, error) => {console.error(`Task ${task.id} failed:, error.message`);});const unsubscribeQueueUpdated = queue.on('queueUpdated', (tasks) => {console.log(`Queue now has ${tasks.length} tasks`);});// Unsubscribe when doneunsubscribeTaskCompleted();unsubscribeTaskFailed();unsubscribeQueueUpdated();
Available Events
taskAdded
Fired when a new task is added to the queue.
Signature
(task: QueuedTask<T>) => void
Common Use Cases
Example Usage
queue.on('taskAdded', (task) => {console.log(`New task: ${task.id}`);updateTaskCounter(queue.getStats().total);});
taskStarted
Fired when a task begins processing.
Signature
(task: QueuedTask<T>) => void
Common Use Cases
Example Usage
queue.on('taskStarted', (task) => {task.startTime = Date.now();showProgressIndicator(task.id);});
taskCompleted
Fired when a task completes successfully.
Signature
(task: QueuedTask<T>) => void
Common Use Cases
Example Usage
queue.on('taskCompleted', (task) => {const duration = Date.now() - task.startTime;analytics.track('task_completed', { duration });hideProgressIndicator(task.id);});
taskFailed
Fired when a task fails after exhausting all retries.
Signature
(task: QueuedTask<T>, error: Error) => void
Common Use Cases
Example Usage
queue.on('taskFailed', (task, error) => {errorLogger.log(error, { taskId: task.id });notifyUser(`Task ${task.id} failed: ${error.message}`);});
taskRetried
Fired when a task is retried (manually or automatically).
Signature
(task: QueuedTask<T>) => void
Common Use Cases
Example Usage
queue.on('taskRetried', (task) => {console.log(`Retry #${task.retryCount} for task ${task.id}`);metrics.incrementRetryCount();});
queueUpdated
Fired whenever the queue state changes.
Signature
(tasks: QueuedTask<T>[]) => void
Common Use Cases
Example Usage
queue.on('queueUpdated', (tasks) => {const stats = queue.getStats();updateDashboard(stats);syncToLocalStorage(tasks);});
Monitoring Tools
Build comprehensive monitoring solutions using queue events and statistics.
Real-time Dashboard
Live view of queue statistics, task status, and performance metrics.
Features
- Task counts by status
- Processing rate
- Error rate
- Average processing time
Performance Tracking
Track task performance over time to identify bottlenecks and trends.
Features
- Processing time trends
- Throughput analysis
- Memory usage
- Success rates
Alerting System
Automated alerts for failures, performance issues, and anomalies.
Features
- Failure rate alerts
- Stuck task detection
- Custom thresholds
- Multiple channels
Debug Logging
Detailed logging of queue operations for debugging and analysis.
Features
- Event timeline
- Task history
- Error details
- State snapshots
Real-time Monitoring
Build real-time dashboards and monitoring systems using queue events and statistics.
import { ReliableQueue } from '@aplanka/reliable-queue';const queue = new ReliableQueue();// Real-time monitoring dashboardclass QueueMonitor {constructor(queue) {this.queue = queue;this.metrics = {totalProcessed: 0,totalFailed: 0,averageProcessingTime: 0,lastError: null,throughputPerMinute: 0};this.setupEventListeners();this.startMetricsCollection();}setupEventListeners() {// Track task lifecyclethis.queue.on('taskStarted', (task) => {task.startTime = Date.now();this.updateDashboard();});this.queue.on('taskCompleted', (task) => {const processingTime = Date.now() - task.startTime;this.metrics.totalProcessed++;this.updateAverageProcessingTime(processingTime);this.updateDashboard();});this.queue.on('taskFailed', (task, error) => {this.metrics.totalFailed++;this.metrics.lastError = {taskId: task.id,error: error.message,timestamp: Date.now()};this.sendAlert('Task Failed', `Task ${task.id} failed: ${error.message}`);this.updateDashboard();});// Monitor queue state changesthis.queue.on('queueUpdated', (tasks) => {this.calculateThroughput(tasks);this.updateDashboard();});}updateDashboard() {const stats = this.queue.getStats();console.log('📊 Queue Metrics:', {...stats,...this.metrics,successRate: this.calculateSuccessRate()});}calculateSuccessRate() {const total = this.metrics.totalProcessed + this.metrics.totalFailed;return total > 0 ? (this.metrics.totalProcessed / total * 100).toFixed(2) : 0;}}const monitor = new QueueMonitor(queue);
Key Metrics
- • Tasks processed per minute
- • Average processing time
- • Success/failure rates
- • Queue size over time
Performance Tracking
- • Response time trends
- • Throughput analysis
- • Resource utilization
- • Error rate patterns
Real-time Updates
- • Live task status
- • Queue state changes
- • Instant notifications
- • Dynamic dashboards
Alerting System
Set up automated alerts to detect issues early and maintain system reliability.
// Advanced alerting systemclass QueueAlerts {constructor(queue, options = {}) {this.queue = queue;this.options = {failureThreshold: options.failureThreshold || 5,stuckTaskThreshold: options.stuckTaskThreshold || 300000, // 5 minuteserrorRateThreshold: options.errorRateThreshold || 0.1, // 10%...options};this.consecutiveFailures = 0;this.errorHistory = [];this.setupAlerts();}setupAlerts() {// Alert on consecutive failuresthis.queue.on('taskCompleted', () => {this.consecutiveFailures = 0;});this.queue.on('taskFailed', (task, error) => {this.consecutiveFailures++;this.recordError(task, error);if (this.consecutiveFailures >= this.options.failureThreshold) {this.sendCriticalAlert('High Failure Rate',`${this.consecutiveFailures} consecutive task failures detected`);}});// Check for stuck taskssetInterval(() => {this.checkForStuckTasks();}, 60000); // Check every minute// Monitor error ratesetInterval(() => {this.checkErrorRate();}, 300000); // Check every 5 minutes}checkForStuckTasks() {const tasks = this.queue.getTasks();const stuckTasks = tasks.filter(task =>task.status === 'processing' &&Date.now() - task.updatedAt > this.options.stuckTaskThreshold);if (stuckTasks.length > 0) {this.sendAlert('Stuck Tasks',`${stuckTasks.length} tasks have been processing for too long`);}}sendCriticalAlert(title, message) {console.error(`🚨 CRITICAL: ${title} - ${message}`);// Integration with alerting services:// - Send to Slack, Discord, or Teams// - Email notifications// - Push notifications// - PagerDuty integration}}
Failure Alerts
- • Consecutive failure detection
- • High error rate warnings
- • Critical task failures
- • Custom failure thresholds
Performance Alerts
- • Stuck task detection
- • Processing time anomalies
- • Queue backlog warnings
- • Throughput degradation
Metrics Collection
Collect comprehensive metrics for analysis and integration with monitoring platforms.
// Comprehensive metrics collectionclass QueueMetrics {constructor(queue) {this.queue = queue;this.history = [];this.taskTimings = new Map();this.setupMetricsCollection();}setupMetricsCollection() {// Collect metrics every 30 secondssetInterval(() => {this.collectSnapshot();}, 30000);// Track individual task performancethis.queue.on('taskStarted', (task) => {this.taskTimings.set(task.id, {startTime: Date.now(),retryCount: task.retryCount});});this.queue.on('taskCompleted', (task) => {this.recordTaskCompletion(task);});this.queue.on('taskFailed', (task) => {this.recordTaskFailure(task);});}collectSnapshot() {const stats = this.queue.getStats();const snapshot = {timestamp: Date.now(),...stats,activeMemory: this.calculateMemoryUsage(),averageWaitTime: this.calculateAverageWaitTime(),throughput: this.calculateThroughput()};this.history.push(snapshot);// Keep only last 100 snapshotsif (this.history.length > 100) {this.history.shift();}}// Export metrics for external monitoring toolsexportMetrics() {return {current: this.queue.getStats(),history: this.history,performance: this.getPerformanceMetrics()};}// Integration with monitoring servicessendToPrometheus() {// Send metrics to Prometheus}sendToDatadog() {// Send metrics to Datadog}}
Integration Options
Time Series Databases
Prometheus, InfluxDB, CloudWatch
APM Tools
Datadog, New Relic, AppDynamics
Analytics Platforms
Google Analytics, Mixpanel, Amplitude
Debugging Tools
Debug queue issues with detailed logging and task history tracking.
// Queue debugging utilitiesclass QueueDebugger {constructor(queue) {this.queue = queue;this.debugMode = false;this.logs = [];}enableDebugMode() {this.debugMode = true;// Log all events with detailed informationthis.queue.on('taskAdded', (task) => {this.log('TASK_ADDED', `Task ${task.id} added with priority ${task.priority}`);});this.queue.on('taskStarted', (task) => {this.log('TASK_STARTED', `Task ${task.id} started processing`);});this.queue.on('taskCompleted', (task) => {const duration = Date.now() - task.startTime;this.log('TASK_COMPLETED', `Task ${task.id} completed in ${duration}ms`);});this.queue.on('taskFailed', (task, error) => {this.log('TASK_FAILED', `Task ${task.id} failed: ${error.message}`);});this.queue.on('taskRetried', (task) => {this.log('TASK_RETRIED', `Task ${task.id} retry #${task.retryCount}`);});}log(event, message) {const logEntry = {timestamp: new Date().toISOString(),event,message,queueState: this.queue.getStats()};this.logs.push(logEntry);if (this.debugMode) {console.log(`[QUEUE DEBUG] ${event}: ${message}`);}// Keep only last 1000 logsif (this.logs.length > 1000) {this.logs.shift();}}exportLogs() {return this.logs;}findTaskHistory(taskId) {return this.logs.filter(log => log.message.includes(taskId));}}
Debug Features
- • Detailed event logging
- • Task lifecycle tracking
- • State change history
- • Error context capture
Log Analysis
- • Task history queries
- • Error pattern analysis
- • Performance bottlenecks
- • State transition logs
Monitoring Best Practices
Event Handling
- • Always unsubscribe from events when components unmount
- • Use debouncing for high-frequency events like queueUpdated
- • Handle event errors gracefully to prevent queue disruption
- • Keep event handlers lightweight and non-blocking
Performance Monitoring
- • Monitor key metrics: throughput, latency, error rates
- • Set up alerts for abnormal patterns and thresholds
- • Track task processing times and queue depths
- • Use sampling for high-volume queues to reduce overhead