From d90b233bbc277f0c7b171175bcd6aa97f27c399c Mon Sep 17 00:00:00 2001 From: Flook Date: Mon, 12 May 2025 09:46:18 +0700 Subject: [PATCH] =?UTF-8?q?=E0=B8=9B=E0=B8=A3=E0=B8=B1=E0=B8=9A=E0=B8=95?= =?UTF-8?q?=E0=B8=B2=E0=B8=A3=E0=B8=B2=E0=B8=87=E0=B9=83=E0=B8=AB=E0=B9=89?= =?UTF-8?q?=E0=B9=81=E0=B8=AA=E0=B8=94=E0=B8=87=E0=B8=82=E0=B9=89=E0=B8=AD?= =?UTF-8?q?=E0=B8=A1=E0=B8=B9=E0=B8=A5=E0=B9=80=E0=B8=A7=E0=B8=A5=E0=B8=B2?= =?UTF-8?q?=E0=B8=97=E0=B8=B3=E0=B8=99=E0=B8=B2=E0=B8=A2=E0=B9=84=E0=B8=94?= =?UTF-8?q?=E0=B9=89=20=E0=B8=AA=E0=B8=B3=E0=B8=AB=E0=B8=A3=E0=B8=B1?= =?UTF-8?q?=E0=B8=9A=E0=B9=81=E0=B8=AA=E0=B8=94=E0=B8=87=E0=B8=9C=E0=B8=A5?= =?UTF-8?q?=E0=B9=83=E0=B8=99=20Dashboard?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- clickhouse/air_quality_forecast.py | 42 +++++++++++++++++------------- pipelines/predict_and_store.py | 15 +++++------ 2 files changed, 31 insertions(+), 26 deletions(-) diff --git a/clickhouse/air_quality_forecast.py b/clickhouse/air_quality_forecast.py index 59a6e4f..1be47b7 100644 --- a/clickhouse/air_quality_forecast.py +++ b/clickhouse/air_quality_forecast.py @@ -4,7 +4,7 @@ import os load_dotenv() -# ----- สร้าง Table ถ้ายังไม่มี ----- +# ----- สร้าง Table ใหม่ โดยลบของเดิมถ้ามี ----- def air_quality_forecast(): try: # อ่านค่าจาก .env @@ -14,34 +14,40 @@ def air_quality_forecast(): # สร้าง client ด้วย clickhouse-connect client = clickhouse_connect.get_client( - host=ch_host, # ระบุ Host โดยไม่ต้องใช้ https:// อีก - port=443, # ใช้พอร์ต HTTPS 443 - username=ch_user, # ชื่อผู้ใช้ - password=ch_password, # รหัสผ่าน - secure=True # ใช้การเชื่อมต่อที่ปลอดภัย (HTTPS) + host=ch_host, + port=443, + username=ch_user, + password=ch_password, + secure=True ) - # สร้าง Table ด้วยคำสั่ง SQL + # 🔥 ลบตารางเดิม (ถ้ามี) ก่อนสร้างใหม่ + drop_table_sql = "DROP TABLE IF EXISTS air_quality_forecast" + client.command(drop_table_sql) + print("🗑️ Table 'air_quality_forecast' dropped (if existed).") + + # ✅ สร้าง Table ใหม่ที่ใช้ DateTime create_table_sql = """ - CREATE TABLE IF NOT EXISTS air_quality_forecast ( - predicted_for_date Date, -- วันที่ที่ทำการทำนาย - predicted_aqi Int32, -- ค่า AQI ที่ทำนายได้ - aqi_explain String, -- คำอธิบายหรือเหตุผลจากโมเดล - predicted_at DateTime DEFAULT now() -- วันที่-เวลาที่สร้างบันทึกนี้ - ) ENGINE = MergeTree() - ORDER BY predicted_for_date; + CREATE TABLE air_quality_forecast ( + predicted_for_date DateTime, -- วันที่-เวลาที่ทำการทำนาย + predicted_aqi Int32, -- ค่า AQI ที่ทำนายได้ + aqi_explain String, -- คำอธิบายหรือเหตุผลจากโมเดล + predicted_at DateTime DEFAULT now() -- วันที่-เวลาที่สร้างบันทึกนี้ + ) ENGINE = MergeTree() + ORDER BY predicted_for_date; \ """ client.command(create_table_sql) + print("✅ Table 'air_quality_forecast' created successfully.") + verify_air_quality_forecast(client) except Exception as e: print("❌ ClickHouse Error:", e) +# ----- ตรวจสอบการสร้าง Table ----- def verify_air_quality_forecast(client): try: - verify_table_sql = """ - SHOW TABLES LIKE 'air_quality_forecast' - """ + verify_table_sql = "SHOW TABLES LIKE 'air_quality_forecast'" result = client.command(verify_table_sql) if result.strip(): @@ -52,4 +58,4 @@ def verify_air_quality_forecast(client): print("❌ Error while verifying table:", e) if __name__ == '__main__': - air_quality_forecast() \ No newline at end of file + air_quality_forecast() diff --git a/pipelines/predict_and_store.py b/pipelines/predict_and_store.py index 374d8f5..82eb15b 100644 --- a/pipelines/predict_and_store.py +++ b/pipelines/predict_and_store.py @@ -4,7 +4,6 @@ import json from datetime import datetime, timedelta from clickhouse_connect import get_client from dotenv import load_dotenv -from requests.auth import HTTPBasicAuth load_dotenv() @@ -90,23 +89,23 @@ def predict_aqi(latest_record): return None # บันทึกผลทำนายลง ClickHouse -def store_prediction(client, predicted_data, target_date): +def store_prediction(client, predicted_data, target_datetime): aqi = predicted_data.get('aqi') aqi_explain = predicted_data.get('aqi_explain') if isinstance(aqi_explain, dict): aqi_explain_json = json.dumps(aqi_explain) else: - aqi_explain_json = aqi_explain # assume already JSON string + aqi_explain_json = aqi_explain insert_query = """ INSERT INTO air_quality_forecast (predicted_for_date, predicted_aqi, aqi_explain) - VALUES \ + VALUES """ client.command( - insert_query + f"('{target_date}', {aqi}, '{aqi_explain_json}')" + insert_query + f"('{target_datetime.isoformat()}', {aqi}, '{aqi_explain_json}')" ) - print(f"✅ Prediction for {target_date} saved to ClickHouse.") + print(f"✅ Prediction for {target_datetime} saved to ClickHouse.") def run_prediction_pipeline(): client = get_clickhouse_client() @@ -120,8 +119,8 @@ def run_prediction_pipeline(): record_time = latest_record['record_time'] if isinstance(record_time, str): record_time = datetime.fromisoformat(record_time) - target_date = (record_time + timedelta(days=1)).date() - store_prediction(client, prediction, target_date) + target_datetime = record_time + timedelta(days=1) + store_prediction(client, prediction, target_datetime) if __name__ == "__main__": run_prediction_pipeline()