Health Check & Monitoring

Monitor the health and availability of the SpeechLytics API and related services.

Overview

The Health Check endpoint provides:

  • System availability status
  • Service component health
  • Queue status and metrics
  • Processing statistics
  • Error diagnostics

Health Check Endpoint

Request

curl -X GET "https://api.example.com/api/v1/health"

No authentication is required for the health check endpoint.
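
The same check can be made from code; below is a minimal sketch using Python's requests library (the base URL is the placeholder used throughout this page):

import requests

# Minimal sketch: call the health endpoint (no Authorization header needed)
response = requests.get("https://api.example.com/api/v1/health", timeout=5)
response.raise_for_status()
print(response.json())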

Response

{
  "database": true,
  "hangfire": true,
  "isLive": true,
  "queueSize": 125,
  "rabbitMq": true,
  "errorCode": null,
  "averageProcessingTime": 2.5
}

Response Fields

Field                    Type             Description
database                 boolean          Database connectivity status
hangfire                 boolean          Background job processor status
isLive                   boolean          Live transcription service status
queueSize                integer          Current number of jobs in queue
rabbitMq                 boolean          Message queue service status
errorCode                string | null    Error code if any issues detected
averageProcessingTime    number           Average processing time in seconds
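
If you want the payload typed in client code, here is a minimal sketch of the fields above as a Python TypedDict (illustrative only, not an official schema):

from typing import Optional, TypedDict

class HealthStatus(TypedDict):
    # Mirrors the response fields documented above; illustrative, not an official schema
    database: bool
    hangfire: bool
    isLive: bool
    queueSize: int
    rabbitMq: bool
    errorCode: Optional[str]
    averageProcessingTime: float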

Status Interpretation

Healthy Status

{
  "database": true,
  "hangfire": true,
  "isLive": true,
  "queueSize": 10,
  "rabbitMq": true,
  "errorCode": null,
  "averageProcessingTime": 1.8
}

All services are operational and the queue size is normal.

Degraded Status

{
  "database": true,
  "hangfire": true,
  "isLive": true,
  "queueSize": 500,
  "rabbitMq": true,
  "errorCode": "QUEUE_BACKED_UP",
  "averageProcessingTime": 5.2
}

Services are running, but processing is slower than usual due to a backlog.

Unhealthy Status

{
  "database": true,
  "hangfire": false,
  "isLive": false,
  "queueSize": 0,
  "rabbitMq": false,
  "errorCode": "SERVICE_UNAVAILABLE",
  "averageProcessingTime": 0
}

Critical services are down; the API is unavailable.
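
A small helper can fold a health payload into one of these three states. The sketch below is illustrative; the cutoffs are assumptions drawn from this page's guidance, not states reported by the API:

def classify_health(health):
    """Classify a health payload as healthy, degraded, or unhealthy.
    Thresholds are assumptions based on this page's guidance, not API-defined states."""
    core_up = (health.get('database') and health.get('hangfire')
               and health.get('rabbitMq'))
    if not core_up:
        return 'unhealthy'
    if health.get('errorCode') or health.get('queueSize', 0) > 100:
        return 'degraded'
    return 'healthy'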

Monitoring Implementation

Python Health Monitor

import requests
import time
from datetime import datetime

class HealthMonitor:
    def __init__(self, api_base, check_interval=60):
        self.api_base = api_base
        self.check_interval = check_interval
        self.health_history = []

    def check_health(self):
        """Get current health status"""
        try:
            response = requests.get(
                f"{self.api_base}/api/v1/health",
                timeout=5
            )
            return response.json()
        except Exception as e:
            return {
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            }

    def is_healthy(self):
        """Check if all critical services are healthy"""
        health = self.check_health()

        return (
            health.get('database') and
            health.get('hangfire') and
            health.get('rabbitMq') and
            health.get('errorCode') is None
        )

    def get_status_summary(self):
        """Get human-readable status"""
        health = self.check_health()

        status = {
            'healthy': self.is_healthy(),
            'database': 'OK' if health.get('database') else 'DOWN',
            'hangfire': 'OK' if health.get('hangfire') else 'DOWN',
            'live': 'OK' if health.get('isLive') else 'DOWN',
            'queue': health.get('queueSize', 0),
            'rabbitMq': 'OK' if health.get('rabbitMq') else 'DOWN',
            'avgProcessTime': f"{health.get('averageProcessingTime', 0):.2f}s",
            'error': health.get('errorCode')
        }

        return status

    def start_monitoring(self, callback=None):
        """Start continuous monitoring"""
        while True:
            health = self.check_health()
            self.health_history.append({
                'timestamp': datetime.now().isoformat(),
                'health': health
            })

            if callback:
                callback(self.get_status_summary())

            time.sleep(self.check_interval)

# Usage
monitor = HealthMonitor("https://api.example.com")

def alert_handler(status):
    """Handle health alerts"""
    if not status['healthy']:
        print("⚠️ ALERT: System unhealthy")
        print(f"  Database: {status['database']}")
        print(f"  Hangfire: {status['hangfire']}")
        print(f"  Queue Size: {status['queue']}")
    else:
        print(f"✓ System healthy - Queue: {status['queue']}, Avg Time: {status['avgProcessTime']}")

# Check current health
status = monitor.get_status_summary()
print("Current Status:", status)

# Start monitoring (in background thread)
import threading
monitor_thread = threading.Thread(
    target=monitor.start_monitoring,
    args=(alert_handler,),
    daemon=True
)
monitor_thread.start()

Service Descriptions

Database

  • Purpose: Stores transcripts, metadata, and system data
  • Impact if down: No data persistence; jobs fail
  • Typical timeout: <500ms

Hangfire

  • Purpose: Distributed background job processing
  • Impact if down: Transcription queue not processed
  • Typical timeout: <1s

Live Transcription Service (isLive)

  • Purpose: Real-time transcription WebSocket connections
  • Impact if down: Live transcription unavailable
  • Typical timeout: <1s

RabbitMQ

  • Purpose: Message queue for job distribution
  • Impact if down: Job distribution fails
  • Typical timeout: <500ms
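
To report all four components in one place, here is a minimal sketch that pairs each flag with the impact described above (the mapping is illustrative, not an API response):

# Impact strings mirror the service descriptions above (illustrative mapping)
COMPONENT_IMPACTS = {
    'database': 'No data persistence; jobs fail',
    'hangfire': 'Transcription queue not processed',
    'isLive': 'Live transcription unavailable',
    'rabbitMq': 'Job distribution fails',
}

def report_components(health):
    """Print OK/DOWN per component, with the documented impact when a component is down."""
    for field, impact in COMPONENT_IMPACTS.items():
        if health.get(field):
            print(f"{field}: OK")
        else:
            print(f"{field}: DOWN ({impact})")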

Queue Management

Understanding Queue Size

Size       Meaning           Action
0-10       Normal            No action needed
10-100     Moderate load     Monitor closely
100-500    Heavy load        Consider reducing submissions
500+       Severe backlog    Contact support, reduce load

Queue Monitoring Strategy

def monitor_queue_health(api_base):
    """Monitor and alert on queue issues"""
    monitor = HealthMonitor(api_base, check_interval=10)

    queue_warning_threshold = 100
    queue_critical_threshold = 500

    while True:
        health = monitor.check_health()
        queue_size = health.get('queueSize', 0)

        if queue_size >= queue_critical_threshold:
            print(f"🔴 CRITICAL: Queue size {queue_size}")
            # Take action: notify admin, slow down submissions
        elif queue_size >= queue_warning_threshold:
            print(f"🟠 WARNING: Queue size {queue_size}")
            # Take action: monitor more frequently
        else:
            print(f"🟢 HEALTHY: Queue size {queue_size}")

        time.sleep(10)

Processing Time Analysis

Understanding averageProcessingTime

  • < 2 seconds: Optimal
  • 2-5 seconds: Normal
  • 5-10 seconds: Acceptable with load
  • > 10 seconds: Degraded performance
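
A small helper that maps averageProcessingTime onto these bands can make the value easier to act on. The band names come from the list above; the function itself is just an illustration:

def processing_time_band(avg_seconds):
    """Map averageProcessingTime (seconds) onto the bands listed above."""
    if avg_seconds < 2:
        return 'optimal'
    if avg_seconds < 5:
        return 'normal'
    if avg_seconds <= 10:
        return 'acceptable with load'
    return 'degraded'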

Trend Analysis

def analyze_processing_trends(api_base, history_length=100):
    """Analyze processing time trends"""
    monitor = HealthMonitor(api_base)
    times = []

    for _ in range(history_length):
        health = monitor.check_health()
        avg_time = health.get('averageProcessingTime', 0)
        times.append(avg_time)
        time.sleep(1)

    # Calculate statistics
    import statistics
    stats = {
        'mean': statistics.mean(times),
        'median': statistics.median(times),
        'stdev': statistics.stdev(times) if len(times) > 1 else 0,
        'min': min(times),
        'max': max(times)
    }

    return stats

Error Codes

Error Code             Meaning                        Resolution
null                   No errors                      System operating normally
DATABASE_ERROR         Database connectivity issue    Check database connection
QUEUE_BACKED_UP        High job queue backlog         Wait or reduce load
SERVICE_UNAVAILABLE    Critical service down          Contact support
MEMORY_PRESSURE        System memory constraint       Clear cache, reduce load
HIGH_LATENCY           Processing delays              Monitor and wait
RATE_LIMIT             Rate limiting active           Reduce request rate
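
A client can translate these codes into coarse reactions. The following sketch paraphrases the Resolution column; the reactions are suggestions, not API behavior:

# Reactions paraphrase the Resolution column above; suggestions only
ERROR_ACTIONS = {
    'DATABASE_ERROR': 'check database connectivity, escalate if it persists',
    'QUEUE_BACKED_UP': 'pause or slow new submissions',
    'SERVICE_UNAVAILABLE': 'contact support',
    'MEMORY_PRESSURE': 'reduce load',
    'HIGH_LATENCY': 'monitor and wait',
    'RATE_LIMIT': 'reduce request rate',
}

def suggested_action(health):
    """Return a suggested reaction for the errorCode in a health payload."""
    code = health.get('errorCode')
    if code is None:
        return 'system operating normally, no action needed'
    return ERROR_ACTIONS.get(code, f'unrecognized error code: {code}')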

Alerting Rules

Create Custom Alerts

class HealthAlert:
    def __init__(self, api_base, email_config):
        self.monitor = HealthMonitor(api_base)
        self.email_config = email_config
        self.alerts_sent = {}

    def check_alert_condition(self, condition_name, condition_func):
        """Check if alert condition is met"""
        if condition_func():
            self._send_alert(condition_name)
            return True
        return False

    def _send_alert(self, condition_name):
        """Send alert notification"""
        # Prevent alert spam
        if condition_name in self.alerts_sent:
            if time.time() - self.alerts_sent[condition_name] < 300:  # 5 minutes
                return

        # Send email
        self._send_email(
            subject=f"SpeechLytics Alert: {condition_name}",
            body=self._format_alert(condition_name)
        )

        self.alerts_sent[condition_name] = time.time()

    def _format_alert(self, condition_name):
        """Format alert message"""
        status = self.monitor.get_status_summary()
        return f"""
Alert: {condition_name}

System Status:
- Database: {status['database']}
- Hangfire: {status['hangfire']}
- Queue Size: {status['queue']}
- RabbitMQ: {status['rabbitMq']}
- Error: {status['error']}

Please investigate immediately.
"""

    def _send_email(self, subject, body):
        """Send email alert"""
        # Implement email sending
        pass

# Usage
alerter = HealthAlert(
    "https://api.example.com",
    email_config={'to': 'ops@company.com'}
)

# Monitor in loop
while True:
    alerter.check_alert_condition(
        "Database Down",
        lambda: not alerter.monitor.check_health().get('database', False)
    )

    alerter.check_alert_condition(
        "Queue Backlog",
        lambda: alerter.monitor.check_health().get('queueSize', 0) > 500
    )

    time.sleep(30)
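
_send_email is left as a stub above. One possible implementation uses Python's standard smtplib; the smtp_host, smtp_port, from, and to keys read from email_config in this sketch are assumptions for illustration, not part of the API:

import smtplib
from email.message import EmailMessage

def send_email_via_smtp(email_config, subject, body):
    """Possible body for HealthAlert._send_email.
    The email_config keys used here (smtp_host, smtp_port, from, to) are assumed for this sketch."""
    msg = EmailMessage()
    msg['Subject'] = subject
    msg['From'] = email_config['from']
    msg['To'] = email_config['to']
    msg.set_content(body)

    with smtplib.SMTP(email_config['smtp_host'], email_config.get('smtp_port', 587)) as smtp:
        smtp.starttls()
        smtp.send_message(msg)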

Dashboard Integration

Health Status Widget

def get_health_widget(api_base):
    """Get health status for dashboard"""
    monitor = HealthMonitor(api_base)
    status = monitor.get_status_summary()

    widget = {
        'title': 'System Health',
        'status': '🟢 Healthy' if status['healthy'] else '🔴 Unhealthy',
        'components': {
            'Database': status['database'],
            'Processing': status['hangfire'],
            'Live Service': status['live'],
            'Message Queue': status['rabbitMq']
        },
        'queue': {
            'current': status['queue'],
            'avgProcessTime': status['avgProcessTime']
        }
    }

    return widget

Best Practices

1. Regular Monitoring

  • Check health every 30-60 seconds
  • Log health history (see the sketch after this list)
  • Track trends over time
  • Set up alerts
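
For the history-logging point above, here is a minimal sketch that appends each check to a JSON Lines file (the file name is arbitrary):

import json
from datetime import datetime

def log_health_history(monitor, path="health_history.jsonl"):
    """Append one health check result to a JSON Lines file for later trend analysis."""
    entry = {
        'timestamp': datetime.now().isoformat(),
        'health': monitor.check_health(),
    }
    with open(path, 'a') as f:
        f.write(json.dumps(entry) + "\n")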

2. Alert Configuration

  • Critical: Any service down
  • Warning: Queue > 100
  • Info: Processing time > 5s
  • Don't spam alerts (cooldown)

3. Response Procedures

  • Document escalation paths
  • Have on-call rotation
  • Maintain runbooks
  • Track MTTR (Mean Time To Resolve)

4. Capacity Planning

  • Monitor long-term trends
  • Forecast capacity needs
  • Plan scaling ahead
  • Test failover procedures

Troubleshooting

Database Showing Down

  • Check database connectivity
  • Verify firewall rules
  • Check database process
  • Review error logs

Queue Growing Indefinitely

  • Check if Hangfire is processing
  • Review job error logs
  • Manually clear stuck jobs
  • Contact support if persists

High Processing Times

  • Check system resources (CPU, memory)
  • Review concurrent load
  • Check for long-running operations
  • Optimize database queries
