"""Forecast next-day AQI: read the newest ClickHouse row, ask MindsDB, store the result.

Configuration is read from environment variables (optionally loaded from a
``.env`` file): CLICKHOUSE_HOST, CLICKHOUSE_USER, CLICKHOUSE_PASSWORD,
MINDSDB_REST_API_URL, MINDSDB_USER and MINDSDB_PASSWORD.
"""

import json
import os
from datetime import datetime, timedelta
from typing import Any, Dict, Optional

import requests
from clickhouse_connect import get_client
from dotenv import load_dotenv
from requests.auth import HTTPBasicAuth

load_dotenv()


def get_clickhouse_client():
    """Create a ClickHouse client over HTTPS (port 443) from environment settings.

    Raises:
        RuntimeError: if CLICKHOUSE_HOST is not set, instead of the opaque
            ``AttributeError`` that calling ``.replace`` on ``None`` would give.
    """
    host = os.getenv("CLICKHOUSE_HOST")
    if not host:
        raise RuntimeError("CLICKHOUSE_HOST environment variable is not set.")

    return get_client(
        # The driver expects a bare host name, so drop the scheme and any
        # trailing slash that may have been copied along with the URL.
        host=host.strip().replace("https://", "").rstrip("/"),
        port=443,
        username=os.getenv("CLICKHOUSE_USER"),
        password=os.getenv("CLICKHOUSE_PASSWORD"),
        secure=True,
    )


def get_latest_record(client) -> Optional[Dict[str, Any]]:
    """Return the row of ``air_quality_db`` with the greatest ``record_time``.

    Returns:
        The row as a column-name -> value mapping, or ``None`` when the query
        yields no rows.
    """
    query = """
        SELECT *
        FROM air_quality_db
        ORDER BY record_time DESC
        LIMIT 1
    """
    result = client.query(query)
    # named_results() is a generator; only its first item is needed.
    return next(iter(result.named_results()), None)


def _extract_prediction(payload: Any) -> Optional[Any]:
    """Pick the first prediction out of a decoded MindsDB response body.

    Two shapes are accepted: a non-empty list, or a dict whose ``"data"``
    entry is a non-empty list. Any other shape yields ``None``.
    """
    if isinstance(payload, list):
        return payload[0] if payload else None
    if isinstance(payload, dict):
        rows = payload.get("data")
        if isinstance(rows, list) and rows:
            return rows[0]
    return None


def predict_aqi(latest_record):
    """Send the latest measurement to the MindsDB REST endpoint.

    Args:
        latest_record: mapping that contains ``record_time`` and, optionally,
            the pollutant readings; readings that are absent are sent as 0.

    Returns:
        The first prediction in the response, or ``None`` when the request
        fails, the status is not 200, the body is not valid JSON, or the
        response carries no prediction. Failures are reported on stdout.
    """
    mindsdb_api_url = os.getenv("MINDSDB_REST_API_URL")
    auth = HTTPBasicAuth(os.getenv("MINDSDB_USER"), os.getenv("MINDSDB_PASSWORD"))
    headers = {"Content-Type": "application/json"}

    record_time = latest_record["record_time"]
    if isinstance(record_time, datetime):
        record_time_str = record_time.isoformat()
    else:
        record_time_str = str(record_time)

    # Request body: a single-row "data" list.
    row = {"record_time": record_time_str}
    # "aqi" is included as well, in case it is the target variable the model
    # was trained on.
    for field in ("pm25", "pm10", "o3", "co", "no2", "so2", "aqi"):
        row[field] = latest_record.get(field, 0)
    input_data = {"data": [row]}

    try:
        response = requests.post(
            mindsdb_api_url,
            headers=headers,
            auth=auth,
            timeout=20,
            data=json.dumps(input_data),
        )
        if response.status_code != 200:
            print("❌ Failed to query MindsDB:", response.status_code, response.text)
            return None
        prediction = _extract_prediction(response.json())
    except requests.exceptions.RequestException as e:
        print("❌ Request error:", str(e))
        return None
    except ValueError as e:
        # Older versions of requests raise a plain ValueError for a body that
        # is not valid JSON.
        print("❌ Invalid JSON in MindsDB response:", str(e))
        return None

    if prediction is None:
        print("⚠️ No prediction returned.")
    return prediction


def store_prediction(client, predicted_data, target_datetime):
    """Insert one forecast row into ``air_quality_forecast``.

    Values are passed as bound parameters rather than interpolated into the
    SQL text, so quotes inside ``aqi_explain`` cannot break or alter the
    statement.

    Args:
        client: ClickHouse client used to run the INSERT.
        predicted_data: mapping that provides ``aqi`` and, optionally,
            ``aqi_explain``.
        target_datetime: datetime the forecast applies to; stored in ISO-8601
            form.

    Nothing is inserted when ``aqi`` is missing or is a non-numeric string.
    """
    aqi = predicted_data.get("aqi")
    if isinstance(aqi, str):
        # Numeric strings are converted so the value is bound as a number.
        try:
            aqi = float(aqi)
        except ValueError:
            aqi = None
    if aqi is None:
        print("⚠️ Prediction has no usable 'aqi' value; nothing stored.")
        return

    aqi_explain = predicted_data.get("aqi_explain")
    if isinstance(aqi_explain, (dict, list)):
        aqi_explain = json.dumps(aqi_explain)
    elif aqi_explain is not None and not isinstance(aqi_explain, str):
        aqi_explain = str(aqi_explain)

    client.command(
        "INSERT INTO air_quality_forecast "
        "(predicted_for_date, predicted_aqi, aqi_explain) "
        "VALUES (%(predicted_for_date)s, %(predicted_aqi)s, %(aqi_explain)s)",
        parameters={
            "predicted_for_date": target_datetime.isoformat(),
            "predicted_aqi": aqi,
            "aqi_explain": aqi_explain,
        },
    )
    print(f"✅ Prediction for {target_datetime} saved to ClickHouse.")


def run_prediction_pipeline():
    """Fetch the latest record, request a forecast and store it for the next day."""
    client = get_clickhouse_client()
    try:
        latest_record = get_latest_record(client)
        if not latest_record:
            print("❌ No recent data found in air_quality_db.")
            return

        prediction = predict_aqi(latest_record)
        if not prediction:
            return

        record_time = latest_record["record_time"]
        if isinstance(record_time, str):
            record_time = datetime.fromisoformat(record_time)
        # The forecast applies to the day after the newest measurement.
        target_datetime = record_time + timedelta(days=1)
        store_prediction(client, prediction, target_datetime)
    finally:
        # Release the connection on every exit path.
        client.close()


if __name__ == "__main__":
    run_prediction_pipeline()