Health Check & Monitoring
Monitor the health and availability of the SpeechLytics API and related services.
Overview
The Health Check endpoint provides:
- System availability status
- Service component health
- Queue status and metrics
- Processing statistics
- Error diagnostics
Health Check Endpoint
Request
```bash
curl -X GET "https://api.example.com/api/v1/health"
```
No authentication is required for the health check endpoint.
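The same request from Python, as a minimal sketch using the requests library (the base URL is a placeholder, as in the curl example above):

```python
import requests

# Minimal health probe; no authentication header is needed.
# https://api.example.com is a placeholder - substitute your deployment's host.
response = requests.get("https://api.example.com/api/v1/health", timeout=5)
response.raise_for_status()
print(response.json())
```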
Response
```json
{
  "database": true,
  "hangfire": true,
  "isLive": true,
  "queueSize": 125,
  "rabbitMq": true,
  "errorCode": null,
  "averageProcessingTime": 2.5
}
```
Response Fields
| Field | Type | Description |
|---|---|---|
| database | boolean | Database connectivity status |
| hangfire | boolean | Background job processor status |
| isLive | boolean | Live transcription service status |
| queueSize | integer | Current number of jobs in queue |
| rabbitMq | boolean | Message queue service status |
| errorCode | string or null | Error code if any issues detected |
| averageProcessingTime | number | Average processing time in seconds |
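For typed access to this payload in Python, a TypedDict along the lines of the illustrative sketch below (field names taken from the table above) keeps parsing explicit; it is not part of any official client.

```python
from typing import Optional, TypedDict

class HealthResponse(TypedDict):
    """Shape of the health check payload, per the field table above."""
    database: bool
    hangfire: bool
    isLive: bool
    queueSize: int
    rabbitMq: bool
    errorCode: Optional[str]       # null (None) when no issues are detected
    averageProcessingTime: float   # seconds
```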
Status Interpretation
Healthy Status
```json
{
  "database": true,
  "hangfire": true,
  "isLive": true,
  "queueSize": 10,
  "rabbitMq": true,
  "errorCode": null,
  "averageProcessingTime": 1.8
}
```
All services operational, normal queue size
Degraded Status
```json
{
  "database": true,
  "hangfire": true,
  "isLive": true,
  "queueSize": 500,
  "rabbitMq": true,
  "errorCode": "QUEUE_BACKED_UP",
  "averageProcessingTime": 5.2
}
```
Services running but processing slower due to backlog
Unhealthy Status
```json
{
  "database": true,
  "hangfire": false,
  "isLive": false,
  "queueSize": 0,
  "rabbitMq": false,
  "errorCode": "SERVICE_UNAVAILABLE",
  "averageProcessingTime": 0
}
```
Critical services down, API unavailable
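To automate this interpretation, a client can bucket a health payload into the three states above. The sketch below is illustrative: the API does not return a combined status field, and the queue threshold simply mirrors the queue-size guidance later on this page.

```python
def classify_health(health: dict) -> str:
    """Classify a health payload as 'healthy', 'degraded', or 'unhealthy'.

    Illustrative rules: core services must all be up to avoid 'unhealthy';
    any errorCode or a large queue downgrades the status to 'degraded'.
    """
    core_up = (
        health.get('database')
        and health.get('hangfire')
        and health.get('rabbitMq')
    )
    if not core_up:
        return 'unhealthy'
    if health.get('errorCode') is not None or health.get('queueSize', 0) > 100:
        return 'degraded'
    return 'healthy'
```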
Monitoring Implementation
Python Health Monitor
```python
import requests
import time
from datetime import datetime

class HealthMonitor:
    def __init__(self, api_base, check_interval=60):
        self.api_base = api_base
        self.check_interval = check_interval
        self.health_history = []

    def check_health(self):
        """Get current health status"""
        try:
            response = requests.get(
                f"{self.api_base}/api/v1/health",
                timeout=5
            )
            return response.json()
        except Exception as e:
            return {
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            }

    def is_healthy(self):
        """Check if all critical services are healthy"""
        health = self.check_health()
        return (
            health.get('database') and
            health.get('hangfire') and
            health.get('rabbitMq') and
            health.get('errorCode') is None
        )

    def get_status_summary(self):
        """Get human-readable status"""
        health = self.check_health()
        status = {
            'healthy': self.is_healthy(),
            'database': 'OK' if health.get('database') else 'DOWN',
            'hangfire': 'OK' if health.get('hangfire') else 'DOWN',
            'live': 'OK' if health.get('isLive') else 'DOWN',
            'queue': health.get('queueSize', 0),
            'rabbitMq': 'OK' if health.get('rabbitMq') else 'DOWN',
            'avgProcessTime': f"{health.get('averageProcessingTime', 0):.2f}s",
            'error': health.get('errorCode')
        }
        return status

    def start_monitoring(self, callback=None):
        """Start continuous monitoring"""
        while True:
            health = self.check_health()
            self.health_history.append({
                'timestamp': datetime.now().isoformat(),
                'health': health
            })
            if callback:
                callback(self.get_status_summary())
            time.sleep(self.check_interval)

# Usage
monitor = HealthMonitor("https://api.example.com")

def alert_handler(status):
    """Handle health alerts"""
    if not status['healthy']:
        print("⚠️ ALERT: System unhealthy")
        print(f"  Database: {status['database']}")
        print(f"  Hangfire: {status['hangfire']}")
        print(f"  Queue Size: {status['queue']}")
    else:
        print(f"✓ System healthy - Queue: {status['queue']}, Avg Time: {status['avgProcessTime']}")

# Check current health
status = monitor.get_status_summary()
print("Current Status:", status)

# Start monitoring (in background thread)
import threading
monitor_thread = threading.Thread(
    target=monitor.start_monitoring,
    args=(alert_handler,),
    daemon=True
)
monitor_thread.start()
```
Service Descriptions
Database
- Purpose: Stores transcripts, metadata, and system data
- Impact if down: No data persistence; jobs fail
- Typical timeout: <500ms
Hangfire
- Purpose: Distributed background job processing
- Impact if down: Transcription queue not processed
- Typical timeout: <1s
Live Transcription Service (isLive)
- Purpose: Real-time transcription WebSocket connections
- Impact if down: Live transcription unavailable
- Typical timeout: <1s
RabbitMQ
- Purpose: Message queue for job distribution
- Impact if down: Job distribution fails
- Typical timeout: <500ms
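Because each component maps to a boolean field in the health response, a small helper can list what is down and what that implies; the impact strings below simply restate the descriptions above.

```python
# Maps each boolean field in the health response to the impact described above.
COMPONENT_IMPACT = {
    'database': 'No data persistence; jobs fail',
    'hangfire': 'Transcription queue not processed',
    'isLive': 'Live transcription unavailable',
    'rabbitMq': 'Job distribution fails',
}

def report_down_components(health: dict) -> list[str]:
    """Return one human-readable line per component reported as down."""
    return [
        f"{name}: {impact}"
        for name, impact in COMPONENT_IMPACT.items()
        if not health.get(name, False)
    ]
```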
Queue Management
Understanding Queue Size
| Size | Meaning | Action |
|---|---|---|
| 0-10 | Normal | No action needed |
| 11-100 | Moderate load | Monitor closely |
| 101-500 | Heavy load | Consider reducing submissions |
| 500+ | Severe backlog | Contact support, reduce load |
Queue Monitoring Strategy
```python
def monitor_queue_health(api_base):
    """Monitor and alert on queue issues"""
    monitor = HealthMonitor(api_base, check_interval=10)
    queue_warning_threshold = 100
    queue_critical_threshold = 500

    while True:
        health = monitor.check_health()
        queue_size = health.get('queueSize', 0)

        if queue_size >= queue_critical_threshold:
            print(f"🔴 CRITICAL: Queue size {queue_size}")
            # Take action: notify admin, slow down submissions
        elif queue_size >= queue_warning_threshold:
            print(f"🟠 WARNING: Queue size {queue_size}")
            # Take action: monitor more frequently
        else:
            print(f"🟢 HEALTHY: Queue size {queue_size}")

        time.sleep(10)
```
Processing Time Analysis
Understanding averageProcessingTime
- < 2 seconds: Optimal
- 2-5 seconds: Normal
- 5-10 seconds: Acceptable with load
- > 10 seconds: Degraded performance
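These thresholds are straightforward to encode when logging or alerting on the health payload; a possible bucketing helper:

```python
def classify_processing_time(avg_seconds: float) -> str:
    """Bucket averageProcessingTime using the thresholds listed above."""
    if avg_seconds < 2:
        return 'optimal'
    if avg_seconds <= 5:
        return 'normal'
    if avg_seconds <= 10:
        return 'acceptable under load'
    return 'degraded'
```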
Trend Analysis
```python
def analyze_processing_trends(api_base, history_length=100):
    """Analyze processing time trends"""
    monitor = HealthMonitor(api_base)
    times = []

    for _ in range(history_length):
        health = monitor.check_health()
        avg_time = health.get('averageProcessingTime', 0)
        times.append(avg_time)
        time.sleep(1)

    # Calculate statistics
    import statistics
    stats = {
        'mean': statistics.mean(times),
        'median': statistics.median(times),
        'stdev': statistics.stdev(times) if len(times) > 1 else 0,
        'min': min(times),
        'max': max(times)
    }
    return stats
```
Error Codes
| Error Code | Meaning | Resolution |
|---|---|---|
| null | No errors | System operating normally |
| DATABASE_ERROR | Database connectivity issue | Check database connection |
| QUEUE_BACKED_UP | High job queue backlog | Wait or reduce load |
| SERVICE_UNAVAILABLE | Critical service down | Contact support |
| MEMORY_PRESSURE | System memory constraint | Clear cache, reduce load |
| HIGH_LATENCY | Processing delays | Monitor and wait |
| RATE_LIMIT | Rate limiting active | Reduce request rate |
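A client can branch on errorCode to decide how to react; the sketch below maps each documented code to a coarse action (the action strings are illustrative, not defined by the API).

```python
# Coarse client-side reactions to each documented errorCode.
# The action strings are illustrative; adapt them to your operations.
ERROR_CODE_ACTIONS = {
    None: 'no action needed',
    'DATABASE_ERROR': 'check database connectivity; page on-call',
    'QUEUE_BACKED_UP': 'pause or slow new submissions',
    'SERVICE_UNAVAILABLE': 'contact support; page on-call',
    'MEMORY_PRESSURE': 'clear caches; reduce load',
    'HIGH_LATENCY': 'monitor and wait',
    'RATE_LIMIT': 'reduce request rate and back off',
}

def action_for(health: dict) -> str:
    """Look up the suggested action for the current errorCode."""
    return ERROR_CODE_ACTIONS.get(health.get('errorCode'), 'unknown code - investigate')
```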
Alerting Rules
Create Custom Alerts
```python
class HealthAlert:
    def __init__(self, api_base, email_config):
        self.monitor = HealthMonitor(api_base)
        self.email_config = email_config
        self.alerts_sent = {}

    def check_alert_condition(self, condition_name, condition_func):
        """Check if alert condition is met"""
        if condition_func():
            self._send_alert(condition_name)
            return True
        return False

    def _send_alert(self, condition_name):
        """Send alert notification"""
        # Prevent alert spam
        if condition_name in self.alerts_sent:
            if time.time() - self.alerts_sent[condition_name] < 300:  # 5 minutes
                return

        # Send email
        self._send_email(
            subject=f"SpeechLytics Alert: {condition_name}",
            body=self._format_alert(condition_name)
        )
        self.alerts_sent[condition_name] = time.time()

    def _format_alert(self, condition_name):
        """Format alert message"""
        status = self.monitor.get_status_summary()
        return f"""
        Alert: {condition_name}

        System Status:
        - Database: {status['database']}
        - Hangfire: {status['hangfire']}
        - Queue Size: {status['queue']}
        - RabbitMQ: {status['rabbitMq']}
        - Error: {status['error']}

        Please investigate immediately.
        """

    def _send_email(self, subject, body):
        """Send email alert"""
        # Implement email sending
        pass

# Usage
alerter = HealthAlert(
    "https://api.example.com",
    email_config={'to': 'ops@company.com'}
)

# Monitor in loop
while True:
    alerter.check_alert_condition(
        "Database Down",
        lambda: not alerter.monitor.check_health().get('database', False)
    )
    alerter.check_alert_condition(
        "Queue Backlog",
        lambda: alerter.monitor.check_health().get('queueSize', 0) > 500
    )
    time.sleep(30)
```
Dashboard Integration
Health Status Widget
```python
def get_health_widget(api_base):
    """Get health status for dashboard"""
    monitor = HealthMonitor(api_base)
    status = monitor.get_status_summary()

    widget = {
        'title': 'System Health',
        'status': '🟢 Healthy' if status['healthy'] else '🔴 Unhealthy',
        'components': {
            'Database': status['database'],
            'Processing': status['hangfire'],
            'Live Service': status['live'],
            'Message Queue': status['rabbitMq']
        },
        'queue': {
            'current': status['queue'],
            'avgProcessTime': status['avgProcessTime']
        }
    }
    return widget
```
Best Practices
1. Regular Monitoring
- Check health every 30-60 seconds
- Log health history
- Track trends over time
- Set up alerts
2. Alert Configuration
- Critical: Any service down
- Warning: Queue > 100
- Info: Processing time > 5s
- Don't spam alerts (cooldown)
3. Response Procedures
- Document escalation paths
- Have on-call rotation
- Maintain runbooks
- Track MTTR (Mean Time to Resolution)
4. Capacity Planning
- Monitor long-term trends
- Forecast capacity needs
- Plan scaling ahead
- Test failover procedures
Troubleshooting
Database Showing Down
- Check database connectivity
- Verify firewall rules
- Check database process
- Review error logs
Queue Growing Indefinitely
- Check if Hangfire is processing
- Review job error logs
- Manually clear stuck jobs
- Contact support if the issue persists
High Processing Times
- Check system resources (CPU, memory)
- Review concurrent load
- Check for long-running operations
- Optimize database queries
Next Steps
- Speech-to-Text - Start transcribing
- Authentication - Set up API access
- Insights - Analytics and reporting
- API Reference - Full documentation