From 0af387adcc573f1bcbd5eeb48feeb61029fb9daa Mon Sep 17 00:00:00 2001 From: Flook Date: Sun, 11 May 2025 06:54:56 +0700 Subject: [PATCH] =?UTF-8?q?=E0=B9=80=E0=B8=9E=E0=B8=B4=E0=B9=88=E0=B8=A1?= =?UTF-8?q?=20AI=20=E0=B8=97=E0=B8=B5=E0=B9=88=E0=B9=80=E0=B8=81=E0=B8=B4?= =?UTF-8?q?=E0=B8=94=E0=B8=88=E0=B8=B2=E0=B8=81=E0=B8=82=E0=B9=89=E0=B8=AD?= =?UTF-8?q?=E0=B8=A1=E0=B8=B9=E0=B8=A5=20clickhouse=20=E0=B8=AA=E0=B8=B3?= =?UTF-8?q?=E0=B8=AB=E0=B8=A3=E0=B8=B1=E0=B8=9A=E0=B9=81=E0=B8=AA=E0=B8=94?= =?UTF-8?q?=E0=B8=87=E0=B8=9C=E0=B8=A5=E0=B9=83=E0=B8=99=20Dashboard?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- clickhouse/air_quality_forecast.py | 55 +++++++++++++ img.png | Bin 227 -> 0 bytes pipelines/predict_and_store.py | 127 +++++++++++++++++++++++++++++ 4 files changed, 183 insertions(+), 1 deletion(-) create mode 100644 clickhouse/air_quality_forecast.py delete mode 100644 img.png create mode 100644 pipelines/predict_and_store.py diff --git a/README.md b/README.md index f4b5c75..473cff8 100644 --- a/README.md +++ b/README.md @@ -72,7 +72,7 @@ ## 📈 ตัวอย่าง Dashboard -ตัวอย่างการแสดงข้อมูลที่เก็บใน ClickHouse ผ่านทาง Dashboard: +ตัวอย่างการสร้าง Dashboard ที่แสดงข้อมูลคุณภาพอากาศและการทำนาย AQI ในอนาคตโดยใช้ข้อมูลจาก ClickHouse และ AI สนับสนุน Data-driven decision: https://metabase.softwarecraft.tech/public/dashboard/bb9e4187-f60e-46f2-ba79-5e220ebe0684 --- diff --git a/clickhouse/air_quality_forecast.py b/clickhouse/air_quality_forecast.py new file mode 100644 index 0000000..59a6e4f --- /dev/null +++ b/clickhouse/air_quality_forecast.py @@ -0,0 +1,55 @@ +import clickhouse_connect +from dotenv import load_dotenv +import os + +load_dotenv() + +# ----- สร้าง Table ถ้ายังไม่มี ----- +def air_quality_forecast(): + try: + # อ่านค่าจาก .env + ch_host = os.getenv("CLICKHOUSE_HOST").replace("https://", "") + ch_user = os.getenv("CLICKHOUSE_USER") + ch_password = os.getenv("CLICKHOUSE_PASSWORD") + + # สร้าง client ด้วย clickhouse-connect + client = clickhouse_connect.get_client( + host=ch_host, # ระบุ Host โดยไม่ต้องใช้ https:// อีก + port=443, # ใช้พอร์ต HTTPS 443 + username=ch_user, # ชื่อผู้ใช้ + password=ch_password, # รหัสผ่าน + secure=True # ใช้การเชื่อมต่อที่ปลอดภัย (HTTPS) + ) + + # สร้าง Table ด้วยคำสั่ง SQL + create_table_sql = """ + CREATE TABLE IF NOT EXISTS air_quality_forecast ( + predicted_for_date Date, -- วันที่ที่ทำการทำนาย + predicted_aqi Int32, -- ค่า AQI ที่ทำนายได้ + aqi_explain String, -- คำอธิบายหรือเหตุผลจากโมเดล + predicted_at DateTime DEFAULT now() -- วันที่-เวลาที่สร้างบันทึกนี้ + ) ENGINE = MergeTree() + ORDER BY predicted_for_date; + """ + client.command(create_table_sql) + verify_air_quality_forecast(client) + + except Exception as e: + print("❌ ClickHouse Error:", e) + +def verify_air_quality_forecast(client): + try: + verify_table_sql = """ + SHOW TABLES LIKE 'air_quality_forecast' + """ + result = client.command(verify_table_sql) + + if result.strip(): + print("✅ Table 'air_quality_forecast' exists.") + else: + print("❌ Table 'air_quality_forecast' not found.") + except Exception as e: + print("❌ Error while verifying table:", e) + +if __name__ == '__main__': + air_quality_forecast() \ No newline at end of file diff --git a/img.png b/img.png deleted file mode 100644 index 9e20f70179bad8e46ce2c9a0e0686bb445d5c8d5..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 227 zcmeAS@N?(olHy`uVBq!ia0y~yV2%Z{tvQ&0B!5i#B_OrR)5S5QBJS-$Mn)j7N#Xzc hJP!vDbC7`p{fxoO7#OEqHc 0: + return data[0] # สมมุติว่า API คืนผลลัพธ์ใน list + elif "data" in data and len(data["data"]) > 0: + return data["data"][0] + else: + print("⚠️ No prediction returned.") + else: + print("❌ Failed to query MindsDB:", response.status_code, response.text) + except requests.exceptions.RequestException as e: + print("❌ Request error:", str(e)) + + return None + +# บันทึกผลทำนายลง ClickHouse +def store_prediction(client, predicted_data, target_date): + aqi = predicted_data.get('aqi') + aqi_explain = predicted_data.get('aqi_explain') + + if isinstance(aqi_explain, dict): + aqi_explain_json = json.dumps(aqi_explain) + else: + aqi_explain_json = aqi_explain # assume already JSON string + + insert_query = """ + INSERT INTO air_quality_forecast (predicted_for_date, predicted_aqi, aqi_explain) + VALUES \ + """ + client.command( + insert_query + f"('{target_date}', {aqi}, '{aqi_explain_json}')" + ) + print(f"✅ Prediction for {target_date} saved to ClickHouse.") + +def run_prediction_pipeline(): + client = get_clickhouse_client() + latest_record = get_latest_record(client) + if not latest_record: + print("❌ No recent data found in air_quality_db.") + return + + prediction = predict_aqi(latest_record) + if prediction: + record_time = latest_record['record_time'] + if isinstance(record_time, str): + record_time = datetime.fromisoformat(record_time) + target_date = (record_time + timedelta(days=1)).date() + store_prediction(client, prediction, target_date) + +if __name__ == "__main__": + run_prediction_pipeline()