help-desk/backend/api/services/health_service.py
2025-11-18 21:53:33 +00:00

102 lines
4.3 KiB
Python

import os
import time
from django.db import connection, Error as DjangoDBError
from django.core.cache import cache
from django.conf import settings
import boto3
from botocore.client import Config
from botocore.exceptions import ClientError
from datetime import datetime, timezone
# สร้าง Exception ขึ้นมาใหม่แทนการใช้ของ model_registry
class HealthCheckError(Exception):
"""Custom exception raised for health check errors."""
pass
class HealthService:
def __init__(self):
pass
def _check_database(self):
# Logic ตรวจสอบ CockroachDB
start_time = time.time()
try:
with connection.cursor() as cursor:
cursor.execute("SELECT 1")
# ใช้ round() ปกติ
latency = round((time.time() - start_time) * 1000, 2)
return "UP", f"Query executed successfully. Latency: {latency}ms"
except DjangoDBError as e:
return "DOWN", f"DB Connection Error: {e}"
except Exception as e:
return "DOWN", f"Unknown DB Error: {e}"
def _check_cache(self):
# Logic ตรวจสอบ Redis
start_time = time.time()
test_key = 'health_test_key'
try:
cache.set(test_key, 'ok', timeout=1)
result = cache.get(test_key)
latency = round((time.time() - start_time) * 1000, 2)
if result == 'ok':
return "UP", f"Read/Write successful. Latency: {latency}ms"
return "DOWN", "Failed to retrieve test key."
except Exception as e:
return "DOWN", f"Redis Error: {e}"
def _check_minio(self):
"""Logic ตรวจสอบ Object Storage (MinIO) โดยใช้ boto3"""
try:
# 1. สร้าง S3 Client ด้วย boto3
s3_client = boto3.client(
"s3",
endpoint_url=os.getenv("MINIO_ENDPOINT", "http://localhost:9000"),
aws_access_key_id=os.getenv("MINIO_ACCESS_KEY", "minio_admin"),
aws_secret_access_key=os.getenv("MINIO_SECRET_KEY", "minio_p@ssw0rd!"),
# ใช้ Config เพื่อจัดการ timeout/signature version
config=Config(signature_version="s3v4", connect_timeout=5, read_timeout=10)
)
# ควรเปลี่ยนชื่อ MODEL_BUCKET เป็นชื่อที่สื่อถึงการใช้งานอื่น (แผนอนาคต)
bucket_name = os.getenv("MODEL_BUCKET", "models")
# 2. ทดสอบการเข้าถึง Bucket
s3_client.head_bucket(Bucket=bucket_name)
return "UP", f"Bucket '{bucket_name}' accessible via boto3."
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code == '404':
return "DOWN", f"Bucket '{bucket_name}' not found."
elif error_code == '403':
return "DOWN", f"MinIO S3 Access Denied. Check Key/Secret."
return "DOWN", f"MinIO S3 Error (Code {error_code}): {e}"
except Exception as e:
return "DOWN", f"MinIO Connection Error: {e}"
def get_system_health(self):
"""เมธอดหลักที่รวมผลลัพธ์ทั้งหมด"""
results = {}
overall_status = "Healthy"
# รัน Check ทั้งหมด
db_status, db_details = self._check_database()
results['database'] = {"name": "CockroachDB", "status": db_status, "details": db_details}
if db_status != 'UP': overall_status = "Degraded"
cache_status, cache_details = self._check_cache()
results['cache'] = {"name": "Redis Cache", "status": cache_status, "details": cache_details}
if cache_status != 'UP': overall_status = "Degraded"
minio_status, minio_details = self._check_minio()
results['storage'] = {"name": "MinIO S3", "status": minio_status, "details": minio_details}
if minio_status != 'UP': overall_status = "Degraded"
return {
"status": overall_status,
"services": results,
"last_checked": datetime.now(timezone.utc).isoformat()
}