diff --git a/README.md b/README.md index f4b5c75..473cff8 100644 --- a/README.md +++ b/README.md @@ -72,7 +72,7 @@ ## 📈 ตัวอย่าง Dashboard -ตัวอย่างการแสดงข้อมูลที่เก็บใน ClickHouse ผ่านทาง Dashboard: +ตัวอย่างการสร้าง Dashboard ที่แสดงข้อมูลคุณภาพอากาศและการทำนาย AQI ในอนาคตโดยใช้ข้อมูลจาก ClickHouse และ AI สนับสนุน Data-driven decision: https://metabase.softwarecraft.tech/public/dashboard/bb9e4187-f60e-46f2-ba79-5e220ebe0684 --- diff --git a/clickhouse/air_quality_forecast.py b/clickhouse/air_quality_forecast.py new file mode 100644 index 0000000..59a6e4f --- /dev/null +++ b/clickhouse/air_quality_forecast.py @@ -0,0 +1,55 @@ +import clickhouse_connect +from dotenv import load_dotenv +import os + +load_dotenv() + +# ----- สร้าง Table ถ้ายังไม่มี ----- +def air_quality_forecast(): + try: + # อ่านค่าจาก .env + ch_host = os.getenv("CLICKHOUSE_HOST").replace("https://", "") + ch_user = os.getenv("CLICKHOUSE_USER") + ch_password = os.getenv("CLICKHOUSE_PASSWORD") + + # สร้าง client ด้วย clickhouse-connect + client = clickhouse_connect.get_client( + host=ch_host, # ระบุ Host โดยไม่ต้องใช้ https:// อีก + port=443, # ใช้พอร์ต HTTPS 443 + username=ch_user, # ชื่อผู้ใช้ + password=ch_password, # รหัสผ่าน + secure=True # ใช้การเชื่อมต่อที่ปลอดภัย (HTTPS) + ) + + # สร้าง Table ด้วยคำสั่ง SQL + create_table_sql = """ + CREATE TABLE IF NOT EXISTS air_quality_forecast ( + predicted_for_date Date, -- วันที่ที่ทำการทำนาย + predicted_aqi Int32, -- ค่า AQI ที่ทำนายได้ + aqi_explain String, -- คำอธิบายหรือเหตุผลจากโมเดล + predicted_at DateTime DEFAULT now() -- วันที่-เวลาที่สร้างบันทึกนี้ + ) ENGINE = MergeTree() + ORDER BY predicted_for_date; + """ + client.command(create_table_sql) + verify_air_quality_forecast(client) + + except Exception as e: + print("❌ ClickHouse Error:", e) + +def verify_air_quality_forecast(client): + try: + verify_table_sql = """ + SHOW TABLES LIKE 'air_quality_forecast' + """ + result = client.command(verify_table_sql) + + if result.strip(): + print("✅ Table 'air_quality_forecast' exists.") + else: + print("❌ Table 'air_quality_forecast' not found.") + except Exception as e: + print("❌ Error while verifying table:", e) + +if __name__ == '__main__': + air_quality_forecast() \ No newline at end of file diff --git a/img.png b/img.png deleted file mode 100644 index 9e20f70..0000000 Binary files a/img.png and /dev/null differ diff --git a/pipelines/predict_and_store.py b/pipelines/predict_and_store.py new file mode 100644 index 0000000..374d8f5 --- /dev/null +++ b/pipelines/predict_and_store.py @@ -0,0 +1,127 @@ +import os +import requests +import json +from datetime import datetime, timedelta +from clickhouse_connect import get_client +from dotenv import load_dotenv +from requests.auth import HTTPBasicAuth + +load_dotenv() + +# เชื่อมต่อ ClickHouse +def get_clickhouse_client(): + return get_client( + host=os.getenv("CLICKHOUSE_HOST").replace("https://", ""), + port=443, + username=os.getenv("CLICKHOUSE_USER"), + password=os.getenv("CLICKHOUSE_PASSWORD"), + secure=True + ) + +# ดึง record ล่าสุดจาก ClickHouse +def get_latest_record(client): + query = """ + SELECT * + FROM air_quality_db + ORDER BY record_time DESC + LIMIT 1 + """ + result = client.query(query) + named_results = list(result.named_results()) # แปลง generator เป็น list + if named_results: + return named_results[0] # เข้าถึงผลลัพธ์แรก + return None + +# เรียกใช้งาน MindsDB API +def predict_aqi(latest_record): + from requests.auth import HTTPBasicAuth + + # อ่านค่าจาก .env + mindsdb_api_url = os.getenv("MINDSDB_REST_API_URL", "https://mindsdb.softwarecraft.tech/api/projects/mindsdb/models/aqi_forecaster/predict") + mindsdb_user = os.getenv("MINDSDB_USER") + mindsdb_pass = os.getenv("MINDSDB_PASSWORD") + + headers = {"Content-Type": "application/json"} + auth = HTTPBasicAuth(mindsdb_user, mindsdb_pass) + + record_time = latest_record['record_time'] + if isinstance(record_time, datetime): + record_time_str = record_time.isoformat() + else: + record_time_str = str(record_time) + + # เตรียมข้อมูลในรูปแบบที่ MindsDB คาดหวัง + input_data = { + "data": [ + { + "record_time": record_time_str, + "pm25": latest_record.get("pm25", 0), + "pm10": latest_record.get("pm10", 0), + "o3": latest_record.get("o3", 0), + "co": latest_record.get("co", 0), + "no2": latest_record.get("no2", 0), + "so2": latest_record.get("so2", 0), + "aqi": latest_record.get("aqi", 0), # หาก aqi เป็น target variable ที่ใช้ train ให้ส่งมาด้วย + } + ] + } + + try: + response = requests.post( + mindsdb_api_url, + headers=headers, + auth=auth, + timeout=20, + data=json.dumps(input_data) + ) + if response.status_code == 200: + data = response.json() + if isinstance(data, list) and len(data) > 0: + return data[0] # สมมุติว่า API คืนผลลัพธ์ใน list + elif "data" in data and len(data["data"]) > 0: + return data["data"][0] + else: + print("⚠️ No prediction returned.") + else: + print("❌ Failed to query MindsDB:", response.status_code, response.text) + except requests.exceptions.RequestException as e: + print("❌ Request error:", str(e)) + + return None + +# บันทึกผลทำนายลง ClickHouse +def store_prediction(client, predicted_data, target_date): + aqi = predicted_data.get('aqi') + aqi_explain = predicted_data.get('aqi_explain') + + if isinstance(aqi_explain, dict): + aqi_explain_json = json.dumps(aqi_explain) + else: + aqi_explain_json = aqi_explain # assume already JSON string + + insert_query = """ + INSERT INTO air_quality_forecast (predicted_for_date, predicted_aqi, aqi_explain) + VALUES \ + """ + client.command( + insert_query + f"('{target_date}', {aqi}, '{aqi_explain_json}')" + ) + print(f"✅ Prediction for {target_date} saved to ClickHouse.") + +def run_prediction_pipeline(): + client = get_clickhouse_client() + latest_record = get_latest_record(client) + if not latest_record: + print("❌ No recent data found in air_quality_db.") + return + + prediction = predict_aqi(latest_record) + if prediction: + record_time = latest_record['record_time'] + if isinstance(record_time, str): + record_time = datetime.fromisoformat(record_time) + target_date = (record_time + timedelta(days=1)).date() + store_prediction(client, prediction, target_date) + +if __name__ == "__main__": + run_prediction_pipeline()