Events & Monitoring

Monitor your queues with a comprehensive event system and real-time updates. Build dashboards, alerts, and analytics.

Event System

Reliable Queue provides an event system for monitoring queue operations, tracking the task lifecycle, and responding to state changes in real time.

Basic Event Subscription
typescript
import { ReliableQueue } from '@aplanka/reliable-queue';
const queue = new ReliableQueue();
// Subscribe to events; each call to on() returns an unsubscribe function
const unsubscribeTaskCompleted = queue.on('taskCompleted', (task) => {
  console.log(`Task ${task.id} completed successfully`);
});
const unsubscribeTaskFailed = queue.on('taskFailed', (task, error) => {
  console.error(`Task ${task.id} failed: ${error.message}`);
});
const unsubscribeQueueUpdated = queue.on('queueUpdated', (tasks) => {
  console.log(`Queue now has ${tasks.length} tasks`);
});
// Unsubscribe when done
unsubscribeTaskCompleted();
unsubscribeTaskFailed();
unsubscribeQueueUpdated();
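
When several handlers share a lifetime, their unsubscribe functions can be collected and released in one call. The following is a minimal sketch, not part of the library: `SubscriptionBag` and the stand-in `FakeQueue` are hypothetical names used for illustration, and the only behavior assumed from the example above is that `on()` returns an unsubscribe function.

```typescript
type Unsubscribe = () => void;
type Handler = (...args: unknown[]) => void;

// Hypothetical stand-in for a queue. It only models on() returning an
// unsubscribe function, which is the behavior shown in the example above.
class FakeQueue {
  private handlers = new Map<string, Set<Handler>>();

  on(eventName: string, handler: Handler): Unsubscribe {
    const set = this.handlers.get(eventName) ?? new Set<Handler>();
    set.add(handler);
    this.handlers.set(eventName, set);
    return () => {
      set.delete(handler);
    };
  }

  emit(eventName: string, ...args: unknown[]): void {
    this.handlers.get(eventName)?.forEach((handler) => handler(...args));
  }
}

// Collects unsubscribe functions so they can all be released together.
class SubscriptionBag {
  private unsubscribers: Unsubscribe[] = [];

  add(unsubscribe: Unsubscribe): void {
    this.unsubscribers.push(unsubscribe);
  }

  // Safe to call more than once; later calls find nothing left to release.
  dispose(): void {
    for (const unsubscribe of this.unsubscribers.splice(0)) {
      unsubscribe();
    }
  }
}
```

With a real queue, each `queue.on(...)` result would be passed to `add()`, and `dispose()` would be called from whatever teardown hook the surrounding code provides.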

Available Events

taskAdded

Fired when a new task is added to the queue.

Event Signature
(task: QueuedTask<T>) => void
Common Use Cases
  • Logging new tasks
  • Updating UI counters
  • Triggering notifications
Example Usage
taskAdded example
typescript
queue.on('taskAdded', (task) => {
  console.log(`New task: ${task.id}`);
  updateTaskCounter(queue.getStats().total);
});

taskStarted

Fired when a task begins processing.

Event Signature
(task: QueuedTask<T>) => void
Common Use Cases
  • Performance tracking
  • Progress indicators
  • Resource monitoring
Example Usage
taskStarted example
typescript
queue.on('taskStarted', (task) => {
  task.startTime = Date.now();
  showProgressIndicator(task.id);
});

taskCompleted

Fired when a task completes successfully.

Event Signature
(task: QueuedTask<T>) => void
Common Use Cases
  • Success tracking
  • UI updates
  • Analytics
  • Cleanup operations
Example Usage
taskCompleted example
typescript
queue.on('taskCompleted', (task) => {
  const duration = Date.now() - task.startTime;
  analytics.track('task_completed', { duration });
  hideProgressIndicator(task.id);
});

taskFailed

Fired when a task fails after exhausting all retries.

Event Signature
(task: QueuedTask<T>, error: Error) => void
Common Use Cases
  • Error logging
  • User notifications
  • Alerting systems
Example Usage
taskFailed example
typescript
queue.on('taskFailed', (task, error) => {
  errorLogger.log(error, { taskId: task.id });
  notifyUser(`Task ${task.id} failed: ${error.message}`);
});

taskRetried

Fired when a task is retried (manually or automatically).

Event Signature
(task: QueuedTask<T>) => void
Common Use Cases
  • Retry tracking
  • Debugging
  • Performance monitoring
Example Usage
taskRetried example
typescript
queue.on('taskRetried', (task) => {
  console.log(`Retry #${task.retryCount} for task ${task.id}`);
  metrics.incrementRetryCount();
});

queueUpdated

Fired whenever the queue state changes.

Event Signature
(tasks: QueuedTask<T>[]) => void
Common Use Cases
  • Real-time dashboards
  • State synchronization
  • React re-renders
Example Usage
queueUpdated example
typescript
queue.on('queueUpdated', (tasks) => {
  const stats = queue.getStats();
  updateDashboard(stats);
  syncToLocalStorage(tasks);
});

Monitoring Tools

Build comprehensive monitoring solutions using queue events and statistics.

Real-time Dashboard

Live view of queue statistics, task status, and performance metrics.

Features

  • Task counts by status
  • Processing rate
  • Error rate
  • Average processing time

Performance Tracking

Track task performance over time to identify bottlenecks and trends.

Features

  • Processing time trends
  • Throughput analysis
  • Memory usage
  • Success rates

Alerting System

Automated alerts for failures, performance issues, and anomalies.

Features

  • Failure rate alerts
  • Stuck task detection
  • Custom thresholds
  • Multiple channels

Debug Logging

Detailed logging of queue operations for debugging and analysis.

Features

  • Event timeline
  • Task history
  • Error details
  • State snapshots

Real-time Monitoring

Build real-time dashboards and monitoring systems using queue events and statistics.

Queue Monitoring System
typescript
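import { ReliableQueue } from '@aplanka/reliable-queue';
const queue = new ReliableQueue();
// Real-time monitoring dashboard
class QueueMonitor {
  constructor(queue) {
    this.queue = queue;
    this.metrics = {
      totalProcessed: 0,
      totalFailed: 0,
      averageProcessingTime: 0,
      lastError: null,
      throughputPerMinute: 0
    };
    // Completion timestamps, used to derive throughput
    this.completionTimes = [];
    this.setupEventListeners();
    this.startMetricsCollection();
  }
  setupEventListeners() {
    // Track task lifecycle
    this.queue.on('taskStarted', (task) => {
      task.startTime = Date.now();
      this.updateDashboard();
    });
    this.queue.on('taskCompleted', (task) => {
      const processingTime = Date.now() - task.startTime;
      this.metrics.totalProcessed++;
      this.completionTimes.push(Date.now());
      this.updateAverageProcessingTime(processingTime);
      this.updateDashboard();
    });
    this.queue.on('taskFailed', (task, error) => {
      this.metrics.totalFailed++;
      this.metrics.lastError = {
        taskId: task.id,
        error: error.message,
        timestamp: Date.now()
      };
      this.sendAlert('Task Failed', `Task ${task.id} failed: ${error.message}`);
      this.updateDashboard();
    });
    // Monitor queue state changes
    this.queue.on('queueUpdated', () => {
      this.calculateThroughput();
      this.updateDashboard();
    });
  }
  startMetricsCollection() {
    // Refresh time-based metrics periodically so they decay while the queue is idle
    this.metricsTimer = setInterval(() => {
      this.calculateThroughput();
      this.updateDashboard();
    }, 10000);
  }
  updateAverageProcessingTime(processingTime) {
    // Incremental mean; totalProcessed already includes the current task
    const count = this.metrics.totalProcessed;
    this.metrics.averageProcessingTime +=
      (processingTime - this.metrics.averageProcessingTime) / count;
  }
  calculateThroughput() {
    // Number of tasks completed within the last 60 seconds
    const cutoff = Date.now() - 60000;
    this.completionTimes = this.completionTimes.filter((time) => time >= cutoff);
    this.metrics.throughputPerMinute = this.completionTimes.length;
  }
  sendAlert(title, message) {
    // Placeholder: forward to the alerting channel of your choice
    console.warn(`${title}: ${message}`);
  }
  updateDashboard() {
    const stats = this.queue.getStats();
    console.log('📊 Queue Metrics:', {
      ...stats,
      ...this.metrics,
      successRate: this.calculateSuccessRate()
    });
  }
  calculateSuccessRate() {
    // Percentage of finished tasks that succeeded, rounded to two decimals
    const total = this.metrics.totalProcessed + this.metrics.totalFailed;
    return total > 0 ? Number((this.metrics.totalProcessed / total * 100).toFixed(2)) : 0;
  }
}
const monitor = new QueueMonitor(queue);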

Key Metrics

  • Tasks processed per minute
  • Average processing time
  • Success/failure rates
  • Queue size over time

Performance Tracking

  • Response time trends
  • Throughput analysis
  • Resource utilization
  • Error rate patterns

Real-time Updates

  • Live task status
  • Queue state changes
  • Instant notifications
  • Dynamic dashboards

Alerting System

Set up automated alerts to detect issues early and maintain system reliability.

Advanced Alerting System
typescript
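// Advanced alerting system
class QueueAlerts {
  constructor(queue, options = {}) {
    this.queue = queue;
    this.options = {
      failureThreshold: 5,
      stuckTaskThreshold: 300000, // 5 minutes
      errorRateThreshold: 0.1, // 10%
      ...options
    };
    this.consecutiveFailures = 0;
    this.errorHistory = [];
    // Outcome counters for the current error-rate window
    this.completedInWindow = 0;
    this.failedInWindow = 0;
    this.setupAlerts();
  }
  setupAlerts() {
    // Alert on consecutive failures
    this.queue.on('taskCompleted', () => {
      this.consecutiveFailures = 0;
      this.completedInWindow++;
    });
    this.queue.on('taskFailed', (task, error) => {
      this.consecutiveFailures++;
      this.recordError(task, error);
      if (this.consecutiveFailures >= this.options.failureThreshold) {
        this.sendCriticalAlert('High Failure Rate',
          `${this.consecutiveFailures} consecutive task failures detected`);
      }
    });
    // Check for stuck tasks
    setInterval(() => {
      this.checkForStuckTasks();
    }, 60000); // Check every minute
    // Monitor error rate
    setInterval(() => {
      this.checkErrorRate();
    }, 300000); // Check every 5 minutes
  }
  recordError(task, error) {
    this.failedInWindow++;
    this.errorHistory.push({
      taskId: task.id,
      message: error.message,
      timestamp: Date.now()
    });
    // Keep only the last 100 errors
    if (this.errorHistory.length > 100) {
      this.errorHistory.shift();
    }
  }
  checkErrorRate() {
    // Error rate over the tasks that finished since the previous check
    const finished = this.completedInWindow + this.failedInWindow;
    const errorRate = finished > 0 ? this.failedInWindow / finished : 0;
    if (errorRate > this.options.errorRateThreshold) {
      this.sendAlert('High Error Rate',
        `${(errorRate * 100).toFixed(1)}% of tasks failed in the last 5 minutes`);
    }
    this.completedInWindow = 0;
    this.failedInWindow = 0;
  }
  checkForStuckTasks() {
    const tasks = this.queue.getTasks();
    const stuckTasks = tasks.filter(task =>
      task.status === 'processing' &&
      Date.now() - task.updatedAt > this.options.stuckTaskThreshold
    );
    if (stuckTasks.length > 0) {
      this.sendAlert('Stuck Tasks',
        `${stuckTasks.length} tasks have been processing for too long`);
    }
  }
  sendAlert(title, message) {
    console.warn(`⚠️ ${title} - ${message}`);
  }
  sendCriticalAlert(title, message) {
    console.error(`🚨 CRITICAL: ${title} - ${message}`);
    // Integration with alerting services:
    // - Send to Slack, Discord, or Teams
    // - Email notifications
    // - Push notifications
    // - PagerDuty integration
  }
}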

Failure Alerts

  • Consecutive failure detection
  • High error rate warnings
  • Critical task failures
  • Custom failure thresholds

Performance Alerts

  • Stuck task detection
  • Processing time anomalies
  • Queue backlog warnings
  • Throughput degradation
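
Processing time anomalies can be flagged by comparing each task duration with the running mean and standard deviation of earlier durations. The sketch below uses Welford's online algorithm so that no history needs to be stored. `DurationAnomalyDetector`, its z-score threshold of 3, and its warm-up of 30 samples are illustrative assumptions, not library features or recommended values.

```typescript
// Flags durations that deviate strongly from what has been seen so far.
class DurationAnomalyDetector {
  private count = 0;
  private mean = 0;
  private m2 = 0; // running sum of squared deviations from the mean
  private readonly zThreshold: number;
  private readonly minSamples: number;

  constructor(zThreshold = 3, minSamples = 30) {
    this.zThreshold = zThreshold;
    // At least two samples are needed for a sample standard deviation.
    this.minSamples = Math.max(2, minSamples);
  }

  // Returns true if the duration is anomalous relative to earlier samples,
  // then folds the duration into the running statistics.
  observe(durationMs: number): boolean {
    const anomalous = this.isAnomalous(durationMs);
    this.count += 1;
    const delta = durationMs - this.mean;
    this.mean += delta / this.count;
    this.m2 += delta * (durationMs - this.mean);
    return anomalous;
  }

  private isAnomalous(durationMs: number): boolean {
    if (this.count < this.minSamples) return false; // still warming up
    const stdDev = Math.sqrt(this.m2 / (this.count - 1));
    if (stdDev === 0) return durationMs !== this.mean;
    return Math.abs(durationMs - this.mean) / stdDev > this.zThreshold;
  }
}
```

A `taskCompleted` handler could pass each measured duration to `observe()` and raise an alert when it returns `true`. Because anomalous samples are still folded into the statistics, a sustained slowdown gradually becomes the new baseline, which may or may not be the desired behavior.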

Metrics Collection

Collect comprehensive metrics for analysis and integration with monitoring platforms.

Comprehensive Metrics Collection
typescript
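// Comprehensive metrics collection
class QueueMetrics {
  constructor(queue) {
    this.queue = queue;
    this.history = [];
    this.taskTimings = new Map();
    this.addedAt = new Map();
    this.waitTimes = [];
    this.durations = [];
    this.completedSinceSnapshot = 0;
    this.failedCount = 0;
    this.setupMetricsCollection();
  }
  setupMetricsCollection() {
    // Collect metrics every 30 seconds
    setInterval(() => {
      this.collectSnapshot();
    }, 30000);
    // Track individual task performance, including time spent waiting
    this.queue.on('taskAdded', (task) => {
      this.addedAt.set(task.id, Date.now());
    });
    this.queue.on('taskStarted', (task) => {
      const now = Date.now();
      // Wait time is measured from taskAdded to the first taskStarted
      if (this.addedAt.has(task.id)) {
        this.waitTimes.push(now - this.addedAt.get(task.id));
        this.addedAt.delete(task.id);
      }
      this.taskTimings.set(task.id, {
        startTime: now,
        retryCount: task.retryCount
      });
    });
    this.queue.on('taskCompleted', (task) => {
      this.recordTaskCompletion(task);
    });
    this.queue.on('taskFailed', (task) => {
      this.recordTaskFailure(task);
    });
  }
  recordTaskCompletion(task) {
    const timing = this.taskTimings.get(task.id);
    if (timing) {
      this.durations.push(Date.now() - timing.startTime);
      this.taskTimings.delete(task.id);
    }
    this.completedSinceSnapshot++;
  }
  recordTaskFailure(task) {
    this.taskTimings.delete(task.id);
    this.addedAt.delete(task.id);
    this.failedCount++;
  }
  average(values) {
    return values.length > 0 ? values.reduce((sum, value) => sum + value, 0) / values.length : 0;
  }
  calculateMemoryUsage() {
    // Heap usage is only available in Node.js; report null elsewhere
    return typeof process !== 'undefined' && process.memoryUsage ? process.memoryUsage().heapUsed : null;
  }
  calculateAverageWaitTime() {
    return this.average(this.waitTimes);
  }
  calculateThroughput() {
    // Completions since the previous 30-second snapshot, scaled to tasks per minute
    const perMinute = this.completedSinceSnapshot * 2;
    this.completedSinceSnapshot = 0;
    return perMinute;
  }
  getPerformanceMetrics() {
    return {
      averageProcessingTime: this.average(this.durations),
      averageWaitTime: this.calculateAverageWaitTime(),
      totalFailed: this.failedCount
    };
  }
  collectSnapshot() {
    const stats = this.queue.getStats();
    const snapshot = {
      timestamp: Date.now(),
      ...stats,
      activeMemory: this.calculateMemoryUsage(),
      averageWaitTime: this.calculateAverageWaitTime(),
      throughput: this.calculateThroughput()
    };
    this.history.push(snapshot);
    // Keep only last 100 snapshots
    if (this.history.length > 100) {
      this.history.shift();
    }
  }
  // Export metrics for external monitoring tools
  exportMetrics() {
    return {
      current: this.queue.getStats(),
      history: this.history,
      performance: this.getPerformanceMetrics()
    };
  }
  // Integration with monitoring services
  sendToPrometheus() {
    // Send metrics to Prometheus
  }
  sendToDatadog() {
    // Send metrics to Datadog
  }
}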
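
As one possible starting point for the Prometheus integration, queue statistics can be rendered in the Prometheus text exposition format and served from an endpoint that Prometheus scrapes. This sketch assumes the stats form a flat object of numeric values, which this page does not confirm beyond the `total` field used earlier. `toPrometheusText` and the `reliable_queue` prefix are hypothetical names.

```typescript
// Renders numeric stats as Prometheus gauges in the text exposition format.
function toPrometheusText(
  stats: Record<string, number>,
  prefix = 'reliable_queue',
): string {
  const lines: string[] = [];
  for (const [key, value] of Object.entries(stats)) {
    if (!Number.isFinite(value)) continue; // skip NaN and infinite values
    // Replace characters that are not allowed in Prometheus metric names.
    const metricName = `${prefix}_${key}`.replace(/[^a-zA-Z0-9_:]/g, '_');
    lines.push(`# TYPE ${metricName} gauge`);
    lines.push(`${metricName} ${value}`);
  }
  // The exposition format expects the last line to end with a newline.
  return lines.length > 0 ? lines.join('\n') + '\n' : '';
}
```

Gauges are used because queue counts can go up and down. Monotonic totals, such as all tasks ever processed, would be better exposed as counters.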

Integration Options

Time Series Databases

Prometheus, InfluxDB, CloudWatch

APM Tools

Datadog, New Relic, AppDynamics

Analytics Platforms

Google Analytics, Mixpanel, Amplitude

Debugging Tools

Debug queue issues with detailed logging and task history tracking.

Queue Debugging Utilities
typescript
// Queue debugging utilities
class QueueDebugger {
  constructor(queue) {
    this.queue = queue;
    this.debugMode = false;
    this.logs = [];
    this.startTimes = new Map();
  }
  enableDebugMode() {
    // Subscribe only once, even if called repeatedly
    if (this.debugMode) return;
    this.debugMode = true;
    // Log all events with detailed information
    this.queue.on('taskAdded', (task) => {
      this.log('TASK_ADDED', task.id, `Task ${task.id} added with priority ${task.priority}`);
    });
    this.queue.on('taskStarted', (task) => {
      this.startTimes.set(task.id, Date.now());
      this.log('TASK_STARTED', task.id, `Task ${task.id} started processing`);
    });
    this.queue.on('taskCompleted', (task) => {
      // The start time is unknown if the task began before debug mode was enabled
      const startedAt = this.startTimes.get(task.id);
      const duration = startedAt === undefined ? 'unknown time' : `${Date.now() - startedAt}ms`;
      this.startTimes.delete(task.id);
      this.log('TASK_COMPLETED', task.id, `Task ${task.id} completed in ${duration}`);
    });
    this.queue.on('taskFailed', (task, error) => {
      this.startTimes.delete(task.id);
      this.log('TASK_FAILED', task.id, `Task ${task.id} failed: ${error.message}`);
    });
    this.queue.on('taskRetried', (task) => {
      this.log('TASK_RETRIED', task.id, `Task ${task.id} retry #${task.retryCount}`);
    });
  }
  log(event, taskId, message) {
    const logEntry = {
      timestamp: new Date().toISOString(),
      event,
      taskId,
      message,
      queueState: this.queue.getStats()
    };
    this.logs.push(logEntry);
    if (this.debugMode) {
      console.log(`[QUEUE DEBUG] ${event}: ${message}`);
    }
    // Keep only last 1000 logs
    if (this.logs.length > 1000) {
      this.logs.shift();
    }
  }
  exportLogs() {
    return this.logs;
  }
  findTaskHistory(taskId) {
    // Match on the stored ID so that "1" does not also match "10"
    return this.logs.filter(log => log.taskId === taskId);
  }
}

Debug Features

  • Detailed event logging
  • Task lifecycle tracking
  • State change history
  • Error context capture

Log Analysis

  • Task history queries
  • Error pattern analysis
  • Performance bottlenecks
  • State transition logs
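
Error pattern analysis can start from the exported logs by grouping failures by their error message. The sketch below assumes log entries shaped like those produced by `QueueDebugger.log` above, with `TASK_FAILED` messages of the form `Task <id> failed: <reason>`. `LogEntry` and `summarizeFailures` are hypothetical names.

```typescript
// Assumed subset of the fields written by QueueDebugger.log.
interface LogEntry {
  timestamp: string;
  event: string;
  message: string;
}

// Returns [reason, count] pairs for failed tasks, most frequent first.
function summarizeFailures(logs: readonly LogEntry[]): Array<[string, number]> {
  const marker = ' failed: ';
  const counts = new Map<string, number>();
  for (const entry of logs) {
    if (entry.event !== 'TASK_FAILED') continue;
    // Everything after the marker is treated as the failure reason.
    const at = entry.message.indexOf(marker);
    const reason = at >= 0 ? entry.message.slice(at + marker.length) : entry.message;
    counts.set(reason, (counts.get(reason) ?? 0) + 1);
  }
  return Array.from(counts.entries()).sort((a, b) => b[1] - a[1]);
}
```

Parsing the message text is fragile. If the log entries can be extended, storing the error message in its own field would make this grouping more robust.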

Monitoring Best Practices

Event Handling

  • Always unsubscribe from events when components unmount
  • Use debouncing for high-frequency events like queueUpdated
  • Handle event errors gracefully to prevent queue disruption
  • Keep event handlers lightweight and non-blocking
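
Two of the practices above, debouncing and error isolation, can be packaged as small handler wrappers. This is a minimal sketch: `debounce`, `isolateErrors`, and the injectable `Schedule` type are hypothetical helpers written for illustration, not library APIs. The scheduler parameter exists only so the timing can be replaced in tests; by default it uses `setTimeout`.

```typescript
type Cancel = () => void;
type Schedule = (fn: () => void, ms: number) => Cancel;

// Default scheduler backed by setTimeout/clearTimeout.
const defaultSchedule: Schedule = (fn, ms) => {
  const handle = setTimeout(fn, ms);
  return () => clearTimeout(handle);
};

// Trailing-edge debounce: the handler runs once, with the latest arguments,
// after waitMs has passed without a new call.
function debounce<A extends unknown[]>(
  handler: (...args: A) => void,
  waitMs: number,
  schedule: Schedule = defaultSchedule,
): (...args: A) => void {
  let cancel: Cancel | null = null;
  return (...args: A) => {
    if (cancel) cancel();
    cancel = schedule(() => {
      cancel = null;
      handler(...args);
    }, waitMs);
  };
}

// Wraps a handler so that an exception is reported instead of propagating
// to whatever code emitted the event.
function isolateErrors<A extends unknown[]>(
  handler: (...args: A) => void,
  onError: (error: unknown) => void = (error) => console.error(error),
): (...args: A) => void {
  return (...args: A) => {
    try {
      handler(...args);
    } catch (error) {
      onError(error);
    }
  };
}

// Possible wiring with a real queue (commented out; render is hypothetical):
// queue.on('queueUpdated', debounce(isolateErrors((tasks) => render(tasks)), 250));
```

Debouncing suits `queueUpdated` because only the latest state matters. It is a poor fit for events such as `taskFailed`, where dropping intermediate calls would lose information.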

Performance Monitoring

  • Monitor key metrics: throughput, latency, error rates
  • Set up alerts for abnormal patterns and thresholds
  • Track task processing times and queue depths
  • Use sampling for high-volume queues to reduce overhead
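
Sampling can be as simple as forwarding only every n-th event to an expensive handler. The sketch below is one hedged way to do that; `sampleEvery` is a hypothetical helper, and the right sampling interval depends on the queue's volume.

```typescript
// Forwards every n-th call to the handler, starting with the n-th call.
function sampleEvery<A extends unknown[]>(
  n: number,
  handler: (...args: A) => void,
): (...args: A) => void {
  if (!Number.isInteger(n) || n < 1) {
    throw new RangeError('n must be a positive integer');
  }
  let seen = 0;
  return (...args: A) => {
    seen += 1;
    if (seen % n === 0) handler(...args);
  };
}

// Possible wiring with a real queue (commented out; recordTiming is hypothetical):
// queue.on('taskCompleted', sampleEvery(10, (task) => recordTiming(task)));
```

Counter-based sampling is deterministic and cheap, but it can align with periodic patterns in the workload. Random sampling avoids that at the cost of less predictable volume.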