From 2b7cdd34753d2ae1ba64bc448e888902c4d96839 Mon Sep 17 00:00:00 2001 From: Flook Date: Mon, 28 Apr 2025 17:10:48 +0700 Subject: [PATCH] Initial commit of th-air-quality-etl-ml app --- .gitignore | 35 ++++++++ README.md | 76 ++++++++++++++++ check_packages.py | 21 +++++ clickhouse/create_air_quality_table.py | 68 ++++++++++++++ pipelines/__init__.py | 0 pipelines/fetch_air_quality.py | 18 ++++ pipelines/load_to_clickhouse.py | 64 ++++++++++++++ pipelines/my_flow.py | 118 +++++++++++++++++++++++++ pipelines/save_to_minio.py | 32 +++++++ pipelines/send_to_kafka.py | 29 ++++++ pipelines/transform_clean.py | 68 ++++++++++++++ requirements.txt | 6 ++ test_connections.py | 64 ++++++++++++++ 13 files changed, 599 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 check_packages.py create mode 100644 clickhouse/create_air_quality_table.py create mode 100644 pipelines/__init__.py create mode 100644 pipelines/fetch_air_quality.py create mode 100644 pipelines/load_to_clickhouse.py create mode 100644 pipelines/my_flow.py create mode 100644 pipelines/save_to_minio.py create mode 100644 pipelines/send_to_kafka.py create mode 100644 pipelines/transform_clean.py create mode 100644 requirements.txt create mode 100644 test_connections.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d43c6ed --- /dev/null +++ b/.gitignore @@ -0,0 +1,35 @@ +# Python compiled files +__pycache__/ +*.pyc +*.pyo +*.pyd +*.py[cod] + +# Virtual Environment +.venv/ +env/ +venv/ + +# Security +.env + +# IDE specific files +.idea/ +.vscode/ +*.suo +*.ntvs* +*.njsproj +*.sln +*.iml +*.sublime-workspace +*.sublime-project + +# Ignore unnecessary system files +.DS_Store +Thumbs.db +Desktop.ini +Icon? + +# OS-specific swap files +*.swp +*.swo diff --git a/README.md b/README.md new file mode 100644 index 0000000..4beef31 --- /dev/null +++ b/README.md @@ -0,0 +1,76 @@ +# Thailand Air Quality ETL & Streaming Pipeline + +โปรเจคนี้ออกแบบมาเพื่อทำ **ETL (Extract, Transform, Load)** และ **Streaming Pipeline** สำหรับข้อมูลคุณภาพอากาศในประเทศไทย +โดยสามารถนำข้อมูลที่ได้ไปใช้งานด้าน **Data Visualization** หรือ **AI Model Training** ในอนาคตได้ + +--- + +## 📋 ฟีเจอร์หลัก + +- **Extract:** ดึงข้อมูลคุณภาพอากาศจากแหล่งข้อมูล (API) +- **Save:** เก็บข้อมูลดิบ (Raw Data) ลง **MinIO** (S3-compatible storage) +- **Stream:** ส่งข้อมูลเข้า **Apache Kafka** ผ่าน **Kafka REST Proxy** +- **Transform:** แปลงข้อมูลให้อยู่ในรูปแบบที่เหมาะสม +- **Load:** บันทึกข้อมูลลงฐานข้อมูล **ClickHouse** เพื่อการวิเคราะห์ต่อไป + +--- + +## ⚙️ เทคโนโลยีที่ใช้ + +- **Python** (ETL Core) +- **ClickHouse** (Data Warehouse) +- **Kafka** (Message Streaming) +- **MinIO** (S3-compatible Object Storage) +- **Pydantic** (Data Validation) +- **requests** (API Communication) + +--- + +## 🛠️ โครงสร้างระบบ (Data Flow) + +```plaintext +[Air Quality API] + ↓ +[fetch_air_quality()] + ↓ +[save_to_minio()] + ↓ +[send_to_kafka()] + ↓ +[consume_kafka_and_process()] + ↓ +[transform_json()] + ↓ +[insert_to_clickhouse()] +``` + +## 📈 โอกาสในการต่อยอด + +โปรเจคนี้สามารถนำไปใช้ในการพัฒนาและขยายระบบได้ตามแนวทางต่าง ๆ ที่กล่าวถึงด้านล่าง: + +### 1. **เชื่อมต่อข้อมูลไปยังเครื่องมือ BI** + - สามารถใช้ **BI** เพื่อสร้างแดชบอร์ดสำหรับการวิเคราะห์ข้อมูลคุณภาพอากาศแบบ **Real-Time** หรือ **Historical Data** จาก **ClickHouse**. + - การเชื่อมต่อสามารถทำได้ง่าย ๆ โดยการตั้งค่า **Data Source** สำหรับ **ClickHouse** ในเครื่องมือ BI ที่เลือก โดยการตั้งค่าให้ตรงกับรายละเอียดการเชื่อมต่อในไฟล์ `.env` ที่ได้กำหนดไว้ในโปรเจค + - **BI** สามารถช่วยให้คุณสร้างการวิเคราะห์ในรูปแบบกราฟ แผนภูมิ และรายงานต่าง ๆ เพื่อแสดงผลคุณภาพอากาศในพื้นที่ต่าง ๆ + +### 2. **พัฒนาระบบ Real-Time Air Quality Monitoring Dashboard** + - ใช้ข้อมูลจาก **Kafka** ที่กำลังไหลเข้ามาในระบบเพื่อนำมาวิเคราะห์และแสดงผล **Real-Time** บน **Dashboard**. + - สามารถสร้าง **Web Dashboard** ที่แสดงผลแบบ **Live Updates** โดยการใช้งาน **Grafana** หรือ **Custom Web App** ที่ดึงข้อมูลจาก **Kafka** หรือ **ClickHouse**. + - **Real-Time Monitoring** สามารถใช้เพื่อเฝ้าระวังคุณภาพอากาศในแต่ละพื้นที่ได้ทันที พร้อมทั้งแสดงค่าดัชนีคุณภาพอากาศ (AQI) และสารมลพิษต่าง ๆ ที่มีผลกระทบต่อสุขภาพ + +### 3. **สร้างโมเดล AI เพื่อทำนายคุณภาพอากาศในอนาคต** + - ข้อมูลจาก **ClickHouse** สามารถนำไปฝึกโมเดล **AI/ML** สำหรับการทำนาย **Air Quality Index (AQI)** ในอนาคต โดยใช้ข้อมูลจากประวัติย้อนหลัง (Historical Data). + - โมเดลที่ได้สามารถช่วยในการคาดการณ์ระดับ **PM2.5**, **PM10**, และสารมลพิษอื่น ๆ เพื่อให้สามารถแจ้งเตือนล่วงหน้าหรือวางแผนเพื่อรับมือกับภาวะมลพิษ + - ตัวอย่างเทคนิคที่สามารถนำมาใช้ เช่น **Time Series Forecasting** ด้วยเครื่องมืออย่าง **ARIMA**, **Prophet**, หรือ **LSTM (Long Short-Term Memory)** + +### 4. **สร้างระบบแจ้งเตือนอัตโนมัติเมื่อ AQI เกินเกณฑ์** + - พัฒนาระบบ **Alert System** เพื่อแจ้งเตือนเมื่อค่า **AQI** หรือ **ระดับมลพิษ** เกินเกณฑ์ที่กำหนด + - ระบบสามารถใช้ **Kafka Consumer** ที่รับข้อมูล **Real-Time** เพื่อเช็คว่า AQI เกินเกณฑ์ที่ตั้งไว้หรือไม่ และแจ้งเตือนผ่าน **Email**, **SMS**, หรือ **Push Notification** (ผ่านช่องทางต่าง ๆ เช่น **Twilio**, **Firebase Cloud Messaging**, หรือ **Telegram Bot**) + - การแจ้งเตือนนี้สามารถช่วยในการรับมือกับปัญหามลพิษและเตือนประชาชนให้หลีกเลี่ยงพื้นที่ที่มีมลพิษสูง + +--- + +## 📜 License + +โปรเจคนี้แจกจ่ายภายใต้ **MIT License** + diff --git a/check_packages.py b/check_packages.py new file mode 100644 index 0000000..7717e5f --- /dev/null +++ b/check_packages.py @@ -0,0 +1,21 @@ +import importlib + +# รายชื่อแพ็กเกจที่ต้องตรวจสอบ +packages = [ + "pydantic", + "kafka", + "requests", + "dotenv", + "clickhouse_connect", + "minio" +] + +for package in packages: + try: + # ตรวจสอบการนำเข้าโมดูล + module = importlib.import_module(package) + # แสดงเวอร์ชัน (ถ้ามี attribute __version__) + version = getattr(module, "__version__", "ไม่พบข้อมูลเวอร์ชัน") + print(f"{package}: ติดตั้งเรียบร้อย (เวอร์ชัน: {version})") + except ImportError: + print(f"{package}: ยังไม่ได้ติดตั้ง") \ No newline at end of file diff --git a/clickhouse/create_air_quality_table.py b/clickhouse/create_air_quality_table.py new file mode 100644 index 0000000..4b9012c --- /dev/null +++ b/clickhouse/create_air_quality_table.py @@ -0,0 +1,68 @@ +import clickhouse_connect +from dotenv import load_dotenv +import os + +load_dotenv() + +# ----- สร้าง Table ถ้ายังไม่มี ----- +def create_air_quality_table(): + try: + # อ่านค่าจาก .env + ch_host = os.getenv("CLICKHOUSE_HOST").replace("https://", "") + ch_user = os.getenv("CLICKHOUSE_USER") + ch_password = os.getenv("CLICKHOUSE_PASSWORD") + + # สร้าง client ด้วย clickhouse-connect + client = clickhouse_connect.get_client( + host=ch_host, # ระบุ Host โดยไม่ต้องใช้ https:// อีก + port=443, # ใช้พอร์ต HTTPS 443 + username=ch_user, # ชื่อผู้ใช้ + password=ch_password, # รหัสผ่าน + secure=True # ใช้การเชื่อมต่อที่ปลอดภัย (HTTPS) + ) + + # สร้าง Table ด้วยคำสั่ง SQL + create_table_sql = """ + CREATE TABLE IF NOT EXISTS air_quality_db ( + station_id String, + station_nameTH String, + station_nameEN String, + areaTH String, + areaEN String, + station_type String, + latitude Float64, + longitude Float64, + pm25 Float32, + pm10 Float32, + o3 Float32, + co Float32, + no2 Float32, + so2 Float32, + aqi Int32, + main_pollutant String, + record_time DateTime + ) ENGINE = MergeTree() + ORDER BY (station_id, record_time) + """ + client.command(create_table_sql) + verify_air_quality_table(client) + + except Exception as e: + print("❌ ClickHouse Error:", e) + +def verify_air_quality_table(client): + try: + verify_table_sql = """ + SHOW TABLES LIKE 'air_quality_db' + """ + result = client.command(verify_table_sql) + + if result.strip(): + print("✅ Table 'air_quality_db' exists.") + else: + print("❌ Table 'air_quality_db' not found.") + except Exception as e: + print("❌ Error while verifying table:", e) + +if __name__ == '__main__': + create_air_quality_table() diff --git a/pipelines/__init__.py b/pipelines/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pipelines/fetch_air_quality.py b/pipelines/fetch_air_quality.py new file mode 100644 index 0000000..144c64e --- /dev/null +++ b/pipelines/fetch_air_quality.py @@ -0,0 +1,18 @@ +import os +from dotenv import load_dotenv +import requests + +load_dotenv() + +def fetch_air_quality(): + url = os.getenv('AIR4THAI_URL') + if not url: + raise ValueError("Environment variable 'AIR4THAI_URL' is not set.") + + try: + response = requests.get(url) + response.raise_for_status() # ตรวจสอบว่าไม่มีข้อผิดพลาด + return response.json() + except requests.exceptions.RequestException as e: + print("Error fetching air quality data:", e) + return None diff --git a/pipelines/load_to_clickhouse.py b/pipelines/load_to_clickhouse.py new file mode 100644 index 0000000..1b887ce --- /dev/null +++ b/pipelines/load_to_clickhouse.py @@ -0,0 +1,64 @@ +from clickhouse_connect import get_client +import os +from dotenv import load_dotenv + +load_dotenv() + +# อ่านค่าจาก .env +ch_host = os.getenv("CLICKHOUSE_HOST").replace("https://", "") +ch_user = os.getenv("CLICKHOUSE_USER") +ch_password = os.getenv("CLICKHOUSE_PASSWORD") + +# สร้าง client ด้วย clickhouse-connect +clickhouse_client = get_client( + host=ch_host, + port=443, + username=ch_user, + password=ch_password, + secure=True +) + +def insert_to_clickhouse(record): + if not record: + print("⚠️ No data to insert.") + return + + def safe_float(value): + if value is None or value == -1 or value == -1.0: + return 0.0 + return float(value) + + try: + clickhouse_client.insert( + table='air_quality_db', + column_names=[ + 'station_id', 'station_nameTH', 'station_nameEN', + 'areaTH', 'areaEN', 'station_type', + 'latitude', 'longitude', + 'pm25', 'pm10', 'o3', 'co', 'no2', 'so2', + 'aqi', 'main_pollutant', 'record_time' + ], + data=[[ + record.get('station_id'), + record.get('station_nameTH'), + record.get('station_nameEN'), + record.get('areaTH'), + record.get('areaEN'), + record.get('station_type'), + safe_float(record.get('latitude')), + safe_float(record.get('longitude')), + safe_float(record.get('pm25')), + safe_float(record.get('pm10')), + safe_float(record.get('o3')), + safe_float(record.get('co')), + safe_float(record.get('no2')), + safe_float(record.get('so2')), + int(record.get('aqi')) if record.get('aqi', -1) != -1 else 0, + record.get('main_pollutant'), + record.get('record_time') + ]] + ) + print(f"Inserted 1 record to ClickHouse.") + except Exception as e: + print(f"Error inserting record into ClickHouse: {e}") + diff --git a/pipelines/my_flow.py b/pipelines/my_flow.py new file mode 100644 index 0000000..415c032 --- /dev/null +++ b/pipelines/my_flow.py @@ -0,0 +1,118 @@ +from pipelines.fetch_air_quality import fetch_air_quality +from pipelines.save_to_minio import save_to_minio +from pipelines.send_to_kafka import send_to_kafka +from pipelines.transform_clean import transform_json +from pipelines.load_to_clickhouse import insert_to_clickhouse + +import os +import json +import time +import requests +from requests.auth import HTTPBasicAuth +from dotenv import load_dotenv + +load_dotenv() + +auth = HTTPBasicAuth(os.getenv('KAFKA_API_USER'), os.getenv('KAFKA_API_PASS')) +url = f"{os.getenv('KAFKA_REST_PROXY_URL')}/consumers/my-group" + +def fetch_and_send_to_kafka(kafka_topic): + raw_data = fetch_air_quality() + if raw_data: + object_name = save_to_minio(raw_data) + send_to_kafka(kafka_topic, raw_data) + +def transform_and_load_to_clickhouse(raw_data): + transformed_data = transform_json(raw_data) + if transformed_data: + for record in transformed_data: + #print("record.model_dump(): ", record.model_dump()) + insert_to_clickhouse(record.model_dump()) + +def consume_kafka_and_process(kafka_topic): + # สร้าง Consumer Instance + headers = { + "Content-Type": "application/vnd.kafka.v2+json", + } + data = { + "name": "my_consumer_instance", + "format": "json", + "auto.offset.reset": "earliest", + } + + response = requests.post( + url, + headers=headers, + data=json.dumps(data), + auth=auth, + verify=False # ระวังถ้า SSL เป็น self-signed + ) + if response.status_code != 200: + print(f"Failed to create consumer instance: {response.text}") + return + + consumer_instance = response.json() + base_uri = consumer_instance['base_uri'] + + # Subscribe to topic + subscribe_data = { + "topics": [kafka_topic] + } + sub_resp = requests.post( + f"{base_uri}/subscription", + headers=headers, + data=json.dumps(subscribe_data), + auth=auth, + verify=False + ) + if sub_resp.status_code != 204: + print(f"Failed to subscribe to topic: {sub_resp.text}") + return + + print(f"✅ Subscribed to topic: {kafka_topic}") + + try: + while True: + # Poll for messages + resp = requests.get( + f"{base_uri}/records", + headers={"Accept": "application/vnd.kafka.json.v2+json"}, + auth=auth, + verify=False + ) + if resp.status_code == 200: + records = resp.json() + if records: + for record in records: + raw_data = record['value'] + + # แสดง raw message ที่รับมาจาก Kafka + #print(f"Raw message from Kafka: {raw_data}") + + transform_and_load_to_clickhouse(raw_data) + else: + print("ℹ️ No new records.") + + else: + print(f"⚠️ Error fetching records: {resp.text}") + time.sleep(2) # เพิ่มพัก 2 วินาที ก่อนวนรอบใหม่ + finally: + # Delete Consumer Instance when done + requests.delete( + base_uri, + headers={"Accept": "application/vnd.kafka.v2+json"}, + auth=auth, + verify=False + ) + +def run_flow(): + kafka_topic = "air4thai-stream" + + # Step 1: Fetch and send data to Kafka + fetch_and_send_to_kafka(kafka_topic) + + # Step 2: Consume data from Kafka and load to ClickHouse + consume_kafka_and_process(kafka_topic) + +if __name__ == "__main__": + run_flow() diff --git a/pipelines/save_to_minio.py b/pipelines/save_to_minio.py new file mode 100644 index 0000000..3f11baa --- /dev/null +++ b/pipelines/save_to_minio.py @@ -0,0 +1,32 @@ +import os +from dotenv import load_dotenv +from minio import Minio +import json +from io import BytesIO +from datetime import datetime, timezone + +load_dotenv() + +def save_to_minio(data): + bucket_name = "air-quality" + try: + minio_client = Minio( + endpoint=os.getenv('MINIO_ENDPOINT').replace('https://', '').replace('http://', ''), + access_key=os.getenv('MINIO_ACCESS_KEY'), + secret_key=os.getenv('MINIO_SECRET_KEY'), + secure=os.getenv('MINIO_ENDPOINT').startswith('https') + ) + # ใช้ timezone-aware datetime + timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%d-%H%M%S') + object_name = f"raw/{timestamp}.json" + + # เปลี่ยนข้อมูลเป็น JSON และใช้ ensure_ascii=False เพื่อรองรับภาษาไทย + data_json = json.dumps(data, ensure_ascii=False, indent=4) + + # แปลงข้อมูล JSON เป็น bytes และบันทึกลงใน MinIO + data_bytes = BytesIO(data_json.encode('utf-8')) + minio_client.put_object(bucket_name, object_name, data_bytes, length=data_bytes.getbuffer().nbytes) + + return object_name + except Exception as e: + print("MinIO Error:", e) \ No newline at end of file diff --git a/pipelines/send_to_kafka.py b/pipelines/send_to_kafka.py new file mode 100644 index 0000000..1f03be5 --- /dev/null +++ b/pipelines/send_to_kafka.py @@ -0,0 +1,29 @@ +import os +import json +import requests +from dotenv import load_dotenv +from requests.auth import HTTPBasicAuth + +load_dotenv() + +KAFKA_REST_PROXY = "https://kafka-rest-proxy.softwarecraft.tech" # เปลี่ยนให้เป็น https:// +auth = HTTPBasicAuth(os.getenv('KAFKA_API_USER'), os.getenv('KAFKA_API_PASS')) + +def send_to_kafka(topic, data): + headers = { + 'Content-Type': 'application/vnd.kafka.json.v2+json' + } + + payload = { + "records": [ + {"value": data} + ] + } + + url = f"{KAFKA_REST_PROXY}/topics/{topic}" + response = requests.post(url, headers=headers, auth=auth, timeout=5 , data=json.dumps(payload), verify=True) + + if response.status_code == 200 or response.status_code == 202: + print(f"Sent data to Kafka topic: {topic}") + else: + print(f"Failed to send to Kafka: {response.status_code} {response.text}") diff --git a/pipelines/transform_clean.py b/pipelines/transform_clean.py new file mode 100644 index 0000000..92d5284 --- /dev/null +++ b/pipelines/transform_clean.py @@ -0,0 +1,68 @@ +from pydantic import BaseModel +from datetime import datetime + +class AirQualitySchema(BaseModel): + station_id: str + station_nameTH: str + station_nameEN: str + areaTH: str + areaEN: str + station_type: str + latitude: float + longitude: float + pm25: float + pm10: float + o3: float + co: float + no2: float + so2: float + aqi: int + main_pollutant: str + record_time: datetime + + class Config: + from_attributes = True # เพื่อให้สามารถแปลงจาก ORM object ได้ + +def transform_json(raw_data: dict): + try: + stations = raw_data.get('stations', []) + transformed = [] + + for station in stations: + AQILast = station.get('AQILast', {}) + AQI = AQILast.get('AQI', {}) + PM25 = AQILast.get('PM25', {}) + PM10 = AQILast.get('PM10', {}) + O3 = AQILast.get('O3', {}) + CO = AQILast.get('CO', {}) + NO2 = AQILast.get('NO2', {}) + SO2 = AQILast.get('SO2', {}) + + # เตรียมข้อมูลให้ตรงกับ AirQualitySchema + data = AirQualitySchema( + station_id=station.get('stationID'), + station_nameTH=station.get('nameTH'), + station_nameEN=station.get('nameEN'), + areaTH=station.get('areaTH'), + areaEN=station.get('areaEN'), + station_type=station.get('stationType'), + latitude=float(station.get('lat', 0)), + longitude=float(station.get('long', 0)), + pm25=float(PM25.get('value', -1)), + pm10=float(PM10.get('value', -1)), + o3=float(O3.get('value', -1)), + co=float(CO.get('value', -1)), + no2=float(NO2.get('value', -1)), + so2=float(SO2.get('value', -1)), + aqi=int(AQI.get('aqi', -1)), + main_pollutant=AQI.get('param', ''), + record_time=datetime.strptime(f"{AQILast.get('date')} {AQILast.get('time')}", "%Y-%m-%d %H:%M") + ) + + transformed.append(data) + + return transformed + except Exception as e: + print(f"Error transforming data: {e}") + return None + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..46b3ae3 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +pydantic +kafka-python +requests +python-dotenv +clickhouse-connect +minio \ No newline at end of file diff --git a/test_connections.py b/test_connections.py new file mode 100644 index 0000000..d7abb9e --- /dev/null +++ b/test_connections.py @@ -0,0 +1,64 @@ +# test_connections.py +import os +from dotenv import load_dotenv +import requests +from minio import Minio +import clickhouse_connect +from requests.auth import HTTPBasicAuth + +load_dotenv() + +# ----- 1. Test Kafka REST Proxy ----- +def test_kafka_rest(): + url = f"{os.getenv('KAFKA_REST_PROXY_URL')}/topics" + auth = HTTPBasicAuth(os.getenv('KAFKA_API_USER'), os.getenv('KAFKA_API_PASS')) + try: + r = requests.get(url, auth=auth, timeout=5) + r.raise_for_status() + print("✅ Kafka REST Proxy Connected. Topics:", r.json()) + except Exception as e: + print("❌ Kafka REST Proxy Error:", e) + +# ----- 2. Test MinIO ----- +def test_minio(): + try: + minio_client = Minio( + endpoint=os.getenv('MINIO_ENDPOINT').replace('https://', '').replace('http://', ''), + access_key=os.getenv('MINIO_ACCESS_KEY'), + secret_key=os.getenv('MINIO_SECRET_KEY'), + secure=os.getenv('MINIO_ENDPOINT').startswith('https') + ) + buckets = minio_client.list_buckets() + print("✅ MinIO Connected. Buckets:", [b.name for b in buckets]) + except Exception as e: + print("❌ MinIO Error:", e) + +# ----- 3. Test ClickHouse ----- +def test_clickhouse(): + try: + # อ่านค่าจาก .env + ch_host = os.getenv("CLICKHOUSE_HOST").replace("https://", "") + ch_user = os.getenv("CLICKHOUSE_USER") + ch_password = os.getenv("CLICKHOUSE_PASSWORD") + + # สร้าง client ด้วย clickhouse-connect + client = clickhouse_connect.get_client( + host=ch_host, # ระบุ Host โดยไม่ต้องใช้ https:// อีก + port=443, # ใช้พอร์ต HTTPS 443 + username=ch_user, # ชื่อผู้ใช้ + password=ch_password, # รหัสผ่าน + secure=True # ใช้การเชื่อมต่อที่ปลอดภัย (HTTPS) + ) + + # ทดสอบการเชื่อมต่อด้วยคำสั่ง SQL + result = client.query('SELECT now()') + print("✅ ClickHouse Connected. Time:", result.result_rows[0][0]) + + except Exception as e: + print("❌ ClickHouse Error:", e) + +if __name__ == '__main__': + test_kafka_rest() + test_minio() + test_clickhouse() + #test_prefect_api() \ No newline at end of file