Initial commit of th-air-quality-etl-ml app

This commit is contained in:
Flook 2025-04-28 17:10:48 +07:00
commit 2b7cdd3475
13 changed files with 599 additions and 0 deletions

35
.gitignore vendored Normal file
View File

@ -0,0 +1,35 @@
# Python compiled files
__pycache__/
*.pyc
*.pyo
*.pyd
*.py[cod]
# Virtual Environment
.venv/
env/
venv/
# Security
.env
# IDE specific files
.idea/
.vscode/
*.suo
*.ntvs*
*.njsproj
*.sln
*.iml
*.sublime-workspace
*.sublime-project
# Ignore unnecessary system files
.DS_Store
Thumbs.db
Desktop.ini
Icon?
# OS-specific swap files
*.swp
*.swo

76
README.md Normal file
View File

@ -0,0 +1,76 @@
# Thailand Air Quality ETL & Streaming Pipeline
โปรเจคนี้ออกแบบมาเพื่อทำ **ETL (Extract, Transform, Load)** และ **Streaming Pipeline** สำหรับข้อมูลคุณภาพอากาศในประเทศไทย
โดยสามารถนำข้อมูลที่ได้ไปใช้งานด้าน **Data Visualization** หรือ **AI Model Training** ในอนาคตได้
---
## 📋 ฟีเจอร์หลัก
- **Extract:** ดึงข้อมูลคุณภาพอากาศจากแหล่งข้อมูล (API)
- **Save:** เก็บข้อมูลดิบ (Raw Data) ลง **MinIO** (S3-compatible storage)
- **Stream:** ส่งข้อมูลเข้า **Apache Kafka** ผ่าน **Kafka REST Proxy**
- **Transform:** แปลงข้อมูลให้อยู่ในรูปแบบที่เหมาะสม
- **Load:** บันทึกข้อมูลลงฐานข้อมูล **ClickHouse** เพื่อการวิเคราะห์ต่อไป
---
## ⚙️ เทคโนโลยีที่ใช้
- **Python** (ETL Core)
- **ClickHouse** (Data Warehouse)
- **Kafka** (Message Streaming)
- **MinIO** (S3-compatible Object Storage)
- **Pydantic** (Data Validation)
- **requests** (API Communication)
---
## 🛠️ โครงสร้างระบบ (Data Flow)
```plaintext
[Air Quality API]
[fetch_air_quality()]
[save_to_minio()]
[send_to_kafka()]
[consume_kafka_and_process()]
[transform_json()]
[insert_to_clickhouse()]
```
## 📈 โอกาสในการต่อยอด
โปรเจคนี้สามารถนำไปใช้ในการพัฒนาและขยายระบบได้ตามแนวทางต่าง ๆ ที่กล่าวถึงด้านล่าง:
### 1. **เชื่อมต่อข้อมูลไปยังเครื่องมือ BI**
- สามารถใช้ **BI** เพื่อสร้างแดชบอร์ดสำหรับการวิเคราะห์ข้อมูลคุณภาพอากาศแบบ **Real-Time** หรือ **Historical Data** จาก **ClickHouse**.
- การเชื่อมต่อสามารถทำได้ง่าย ๆ โดยการตั้งค่า **Data Source** สำหรับ **ClickHouse** ในเครื่องมือ BI ที่เลือก โดยการตั้งค่าให้ตรงกับรายละเอียดการเชื่อมต่อในไฟล์ `.env` ที่ได้กำหนดไว้ในโปรเจค
- **BI** สามารถช่วยให้คุณสร้างการวิเคราะห์ในรูปแบบกราฟ แผนภูมิ และรายงานต่าง ๆ เพื่อแสดงผลคุณภาพอากาศในพื้นที่ต่าง ๆ
### 2. **พัฒนาระบบ Real-Time Air Quality Monitoring Dashboard**
- ใช้ข้อมูลจาก **Kafka** ที่กำลังไหลเข้ามาในระบบเพื่อนำมาวิเคราะห์และแสดงผล **Real-Time** บน **Dashboard**.
- สามารถสร้าง **Web Dashboard** ที่แสดงผลแบบ **Live Updates** โดยการใช้งาน **Grafana** หรือ **Custom Web App** ที่ดึงข้อมูลจาก **Kafka** หรือ **ClickHouse**.
- **Real-Time Monitoring** สามารถใช้เพื่อเฝ้าระวังคุณภาพอากาศในแต่ละพื้นที่ได้ทันที พร้อมทั้งแสดงค่าดัชนีคุณภาพอากาศ (AQI) และสารมลพิษต่าง ๆ ที่มีผลกระทบต่อสุขภาพ
### 3. **สร้างโมเดล AI เพื่อทำนายคุณภาพอากาศในอนาคต**
- ข้อมูลจาก **ClickHouse** สามารถนำไปฝึกโมเดล **AI/ML** สำหรับการทำนาย **Air Quality Index (AQI)** ในอนาคต โดยใช้ข้อมูลจากประวัติย้อนหลัง (Historical Data).
- โมเดลที่ได้สามารถช่วยในการคาดการณ์ระดับ **PM2.5**, **PM10**, และสารมลพิษอื่น ๆ เพื่อให้สามารถแจ้งเตือนล่วงหน้าหรือวางแผนเพื่อรับมือกับภาวะมลพิษ
- ตัวอย่างเทคนิคที่สามารถนำมาใช้ เช่น **Time Series Forecasting** ด้วยเครื่องมืออย่าง **ARIMA**, **Prophet**, หรือ **LSTM (Long Short-Term Memory)**
### 4. **สร้างระบบแจ้งเตือนอัตโนมัติเมื่อ AQI เกินเกณฑ์**
- พัฒนาระบบ **Alert System** เพื่อแจ้งเตือนเมื่อค่า **AQI** หรือ **ระดับมลพิษ** เกินเกณฑ์ที่กำหนด
- ระบบสามารถใช้ **Kafka Consumer** ที่รับข้อมูล **Real-Time** เพื่อเช็คว่า AQI เกินเกณฑ์ที่ตั้งไว้หรือไม่ และแจ้งเตือนผ่าน **Email**, **SMS**, หรือ **Push Notification** (ผ่านช่องทางต่าง ๆ เช่น **Twilio**, **Firebase Cloud Messaging**, หรือ **Telegram Bot**)
- การแจ้งเตือนนี้สามารถช่วยในการรับมือกับปัญหามลพิษและเตือนประชาชนให้หลีกเลี่ยงพื้นที่ที่มีมลพิษสูง
---
## 📜 License
โปรเจคนี้แจกจ่ายภายใต้ **MIT License**

21
check_packages.py Normal file
View File

@ -0,0 +1,21 @@
import importlib
# รายชื่อแพ็กเกจที่ต้องตรวจสอบ
packages = [
"pydantic",
"kafka",
"requests",
"dotenv",
"clickhouse_connect",
"minio"
]
for package in packages:
try:
# ตรวจสอบการนำเข้าโมดูล
module = importlib.import_module(package)
# แสดงเวอร์ชัน (ถ้ามี attribute __version__)
version = getattr(module, "__version__", "ไม่พบข้อมูลเวอร์ชัน")
print(f"{package}: ติดตั้งเรียบร้อย (เวอร์ชัน: {version})")
except ImportError:
print(f"{package}: ยังไม่ได้ติดตั้ง")

View File

@ -0,0 +1,68 @@
import clickhouse_connect
from dotenv import load_dotenv
import os
load_dotenv()
# ----- สร้าง Table ถ้ายังไม่มี -----
def create_air_quality_table():
try:
# อ่านค่าจาก .env
ch_host = os.getenv("CLICKHOUSE_HOST").replace("https://", "")
ch_user = os.getenv("CLICKHOUSE_USER")
ch_password = os.getenv("CLICKHOUSE_PASSWORD")
# สร้าง client ด้วย clickhouse-connect
client = clickhouse_connect.get_client(
host=ch_host, # ระบุ Host โดยไม่ต้องใช้ https:// อีก
port=443, # ใช้พอร์ต HTTPS 443
username=ch_user, # ชื่อผู้ใช้
password=ch_password, # รหัสผ่าน
secure=True # ใช้การเชื่อมต่อที่ปลอดภัย (HTTPS)
)
# สร้าง Table ด้วยคำสั่ง SQL
create_table_sql = """
CREATE TABLE IF NOT EXISTS air_quality_db (
station_id String,
station_nameTH String,
station_nameEN String,
areaTH String,
areaEN String,
station_type String,
latitude Float64,
longitude Float64,
pm25 Float32,
pm10 Float32,
o3 Float32,
co Float32,
no2 Float32,
so2 Float32,
aqi Int32,
main_pollutant String,
record_time DateTime
) ENGINE = MergeTree()
ORDER BY (station_id, record_time)
"""
client.command(create_table_sql)
verify_air_quality_table(client)
except Exception as e:
print("❌ ClickHouse Error:", e)
def verify_air_quality_table(client):
try:
verify_table_sql = """
SHOW TABLES LIKE 'air_quality_db'
"""
result = client.command(verify_table_sql)
if result.strip():
print("✅ Table 'air_quality_db' exists.")
else:
print("❌ Table 'air_quality_db' not found.")
except Exception as e:
print("❌ Error while verifying table:", e)
if __name__ == '__main__':
create_air_quality_table()

0
pipelines/__init__.py Normal file
View File

View File

@ -0,0 +1,18 @@
import os
from dotenv import load_dotenv
import requests
load_dotenv()
def fetch_air_quality():
url = os.getenv('AIR4THAI_URL')
if not url:
raise ValueError("Environment variable 'AIR4THAI_URL' is not set.")
try:
response = requests.get(url)
response.raise_for_status() # ตรวจสอบว่าไม่มีข้อผิดพลาด
return response.json()
except requests.exceptions.RequestException as e:
print("Error fetching air quality data:", e)
return None

View File

@ -0,0 +1,64 @@
from clickhouse_connect import get_client
import os
from dotenv import load_dotenv
load_dotenv()
# อ่านค่าจาก .env
ch_host = os.getenv("CLICKHOUSE_HOST").replace("https://", "")
ch_user = os.getenv("CLICKHOUSE_USER")
ch_password = os.getenv("CLICKHOUSE_PASSWORD")
# สร้าง client ด้วย clickhouse-connect
clickhouse_client = get_client(
host=ch_host,
port=443,
username=ch_user,
password=ch_password,
secure=True
)
def insert_to_clickhouse(record):
if not record:
print("⚠️ No data to insert.")
return
def safe_float(value):
if value is None or value == -1 or value == -1.0:
return 0.0
return float(value)
try:
clickhouse_client.insert(
table='air_quality_db',
column_names=[
'station_id', 'station_nameTH', 'station_nameEN',
'areaTH', 'areaEN', 'station_type',
'latitude', 'longitude',
'pm25', 'pm10', 'o3', 'co', 'no2', 'so2',
'aqi', 'main_pollutant', 'record_time'
],
data=[[
record.get('station_id'),
record.get('station_nameTH'),
record.get('station_nameEN'),
record.get('areaTH'),
record.get('areaEN'),
record.get('station_type'),
safe_float(record.get('latitude')),
safe_float(record.get('longitude')),
safe_float(record.get('pm25')),
safe_float(record.get('pm10')),
safe_float(record.get('o3')),
safe_float(record.get('co')),
safe_float(record.get('no2')),
safe_float(record.get('so2')),
int(record.get('aqi')) if record.get('aqi', -1) != -1 else 0,
record.get('main_pollutant'),
record.get('record_time')
]]
)
print(f"Inserted 1 record to ClickHouse.")
except Exception as e:
print(f"Error inserting record into ClickHouse: {e}")

118
pipelines/my_flow.py Normal file
View File

@ -0,0 +1,118 @@
from pipelines.fetch_air_quality import fetch_air_quality
from pipelines.save_to_minio import save_to_minio
from pipelines.send_to_kafka import send_to_kafka
from pipelines.transform_clean import transform_json
from pipelines.load_to_clickhouse import insert_to_clickhouse
import os
import json
import time
import requests
from requests.auth import HTTPBasicAuth
from dotenv import load_dotenv
load_dotenv()
auth = HTTPBasicAuth(os.getenv('KAFKA_API_USER'), os.getenv('KAFKA_API_PASS'))
url = f"{os.getenv('KAFKA_REST_PROXY_URL')}/consumers/my-group"
def fetch_and_send_to_kafka(kafka_topic):
raw_data = fetch_air_quality()
if raw_data:
object_name = save_to_minio(raw_data)
send_to_kafka(kafka_topic, raw_data)
def transform_and_load_to_clickhouse(raw_data):
transformed_data = transform_json(raw_data)
if transformed_data:
for record in transformed_data:
#print("record.model_dump(): ", record.model_dump())
insert_to_clickhouse(record.model_dump())
def consume_kafka_and_process(kafka_topic):
# สร้าง Consumer Instance
headers = {
"Content-Type": "application/vnd.kafka.v2+json",
}
data = {
"name": "my_consumer_instance",
"format": "json",
"auto.offset.reset": "earliest",
}
response = requests.post(
url,
headers=headers,
data=json.dumps(data),
auth=auth,
verify=False # ระวังถ้า SSL เป็น self-signed
)
if response.status_code != 200:
print(f"Failed to create consumer instance: {response.text}")
return
consumer_instance = response.json()
base_uri = consumer_instance['base_uri']
# Subscribe to topic
subscribe_data = {
"topics": [kafka_topic]
}
sub_resp = requests.post(
f"{base_uri}/subscription",
headers=headers,
data=json.dumps(subscribe_data),
auth=auth,
verify=False
)
if sub_resp.status_code != 204:
print(f"Failed to subscribe to topic: {sub_resp.text}")
return
print(f"✅ Subscribed to topic: {kafka_topic}")
try:
while True:
# Poll for messages
resp = requests.get(
f"{base_uri}/records",
headers={"Accept": "application/vnd.kafka.json.v2+json"},
auth=auth,
verify=False
)
if resp.status_code == 200:
records = resp.json()
if records:
for record in records:
raw_data = record['value']
# แสดง raw message ที่รับมาจาก Kafka
#print(f"Raw message from Kafka: {raw_data}")
transform_and_load_to_clickhouse(raw_data)
else:
print(" No new records.")
else:
print(f"⚠️ Error fetching records: {resp.text}")
time.sleep(2) # เพิ่มพัก 2 วินาที ก่อนวนรอบใหม่
finally:
# Delete Consumer Instance when done
requests.delete(
base_uri,
headers={"Accept": "application/vnd.kafka.v2+json"},
auth=auth,
verify=False
)
def run_flow():
kafka_topic = "air4thai-stream"
# Step 1: Fetch and send data to Kafka
fetch_and_send_to_kafka(kafka_topic)
# Step 2: Consume data from Kafka and load to ClickHouse
consume_kafka_and_process(kafka_topic)
if __name__ == "__main__":
run_flow()

View File

@ -0,0 +1,32 @@
import os
from dotenv import load_dotenv
from minio import Minio
import json
from io import BytesIO
from datetime import datetime, timezone
load_dotenv()
def save_to_minio(data):
bucket_name = "air-quality"
try:
minio_client = Minio(
endpoint=os.getenv('MINIO_ENDPOINT').replace('https://', '').replace('http://', ''),
access_key=os.getenv('MINIO_ACCESS_KEY'),
secret_key=os.getenv('MINIO_SECRET_KEY'),
secure=os.getenv('MINIO_ENDPOINT').startswith('https')
)
# ใช้ timezone-aware datetime
timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%d-%H%M%S')
object_name = f"raw/{timestamp}.json"
# เปลี่ยนข้อมูลเป็น JSON และใช้ ensure_ascii=False เพื่อรองรับภาษาไทย
data_json = json.dumps(data, ensure_ascii=False, indent=4)
# แปลงข้อมูล JSON เป็น bytes และบันทึกลงใน MinIO
data_bytes = BytesIO(data_json.encode('utf-8'))
minio_client.put_object(bucket_name, object_name, data_bytes, length=data_bytes.getbuffer().nbytes)
return object_name
except Exception as e:
print("MinIO Error:", e)

View File

@ -0,0 +1,29 @@
import os
import json
import requests
from dotenv import load_dotenv
from requests.auth import HTTPBasicAuth
load_dotenv()
KAFKA_REST_PROXY = "https://kafka-rest-proxy.softwarecraft.tech" # เปลี่ยนให้เป็น https://
auth = HTTPBasicAuth(os.getenv('KAFKA_API_USER'), os.getenv('KAFKA_API_PASS'))
def send_to_kafka(topic, data):
headers = {
'Content-Type': 'application/vnd.kafka.json.v2+json'
}
payload = {
"records": [
{"value": data}
]
}
url = f"{KAFKA_REST_PROXY}/topics/{topic}"
response = requests.post(url, headers=headers, auth=auth, timeout=5 , data=json.dumps(payload), verify=True)
if response.status_code == 200 or response.status_code == 202:
print(f"Sent data to Kafka topic: {topic}")
else:
print(f"Failed to send to Kafka: {response.status_code} {response.text}")

View File

@ -0,0 +1,68 @@
from pydantic import BaseModel
from datetime import datetime
class AirQualitySchema(BaseModel):
station_id: str
station_nameTH: str
station_nameEN: str
areaTH: str
areaEN: str
station_type: str
latitude: float
longitude: float
pm25: float
pm10: float
o3: float
co: float
no2: float
so2: float
aqi: int
main_pollutant: str
record_time: datetime
class Config:
from_attributes = True # เพื่อให้สามารถแปลงจาก ORM object ได้
def transform_json(raw_data: dict):
try:
stations = raw_data.get('stations', [])
transformed = []
for station in stations:
AQILast = station.get('AQILast', {})
AQI = AQILast.get('AQI', {})
PM25 = AQILast.get('PM25', {})
PM10 = AQILast.get('PM10', {})
O3 = AQILast.get('O3', {})
CO = AQILast.get('CO', {})
NO2 = AQILast.get('NO2', {})
SO2 = AQILast.get('SO2', {})
# เตรียมข้อมูลให้ตรงกับ AirQualitySchema
data = AirQualitySchema(
station_id=station.get('stationID'),
station_nameTH=station.get('nameTH'),
station_nameEN=station.get('nameEN'),
areaTH=station.get('areaTH'),
areaEN=station.get('areaEN'),
station_type=station.get('stationType'),
latitude=float(station.get('lat', 0)),
longitude=float(station.get('long', 0)),
pm25=float(PM25.get('value', -1)),
pm10=float(PM10.get('value', -1)),
o3=float(O3.get('value', -1)),
co=float(CO.get('value', -1)),
no2=float(NO2.get('value', -1)),
so2=float(SO2.get('value', -1)),
aqi=int(AQI.get('aqi', -1)),
main_pollutant=AQI.get('param', ''),
record_time=datetime.strptime(f"{AQILast.get('date')} {AQILast.get('time')}", "%Y-%m-%d %H:%M")
)
transformed.append(data)
return transformed
except Exception as e:
print(f"Error transforming data: {e}")
return None

6
requirements.txt Normal file
View File

@ -0,0 +1,6 @@
pydantic
kafka-python
requests
python-dotenv
clickhouse-connect
minio

64
test_connections.py Normal file
View File

@ -0,0 +1,64 @@
# test_connections.py
import os
from dotenv import load_dotenv
import requests
from minio import Minio
import clickhouse_connect
from requests.auth import HTTPBasicAuth
load_dotenv()
# ----- 1. Test Kafka REST Proxy -----
def test_kafka_rest():
url = f"{os.getenv('KAFKA_REST_PROXY_URL')}/topics"
auth = HTTPBasicAuth(os.getenv('KAFKA_API_USER'), os.getenv('KAFKA_API_PASS'))
try:
r = requests.get(url, auth=auth, timeout=5)
r.raise_for_status()
print("✅ Kafka REST Proxy Connected. Topics:", r.json())
except Exception as e:
print("❌ Kafka REST Proxy Error:", e)
# ----- 2. Test MinIO -----
def test_minio():
try:
minio_client = Minio(
endpoint=os.getenv('MINIO_ENDPOINT').replace('https://', '').replace('http://', ''),
access_key=os.getenv('MINIO_ACCESS_KEY'),
secret_key=os.getenv('MINIO_SECRET_KEY'),
secure=os.getenv('MINIO_ENDPOINT').startswith('https')
)
buckets = minio_client.list_buckets()
print("✅ MinIO Connected. Buckets:", [b.name for b in buckets])
except Exception as e:
print("❌ MinIO Error:", e)
# ----- 3. Test ClickHouse -----
def test_clickhouse():
try:
# อ่านค่าจาก .env
ch_host = os.getenv("CLICKHOUSE_HOST").replace("https://", "")
ch_user = os.getenv("CLICKHOUSE_USER")
ch_password = os.getenv("CLICKHOUSE_PASSWORD")
# สร้าง client ด้วย clickhouse-connect
client = clickhouse_connect.get_client(
host=ch_host, # ระบุ Host โดยไม่ต้องใช้ https:// อีก
port=443, # ใช้พอร์ต HTTPS 443
username=ch_user, # ชื่อผู้ใช้
password=ch_password, # รหัสผ่าน
secure=True # ใช้การเชื่อมต่อที่ปลอดภัย (HTTPS)
)
# ทดสอบการเชื่อมต่อด้วยคำสั่ง SQL
result = client.query('SELECT now()')
print("✅ ClickHouse Connected. Time:", result.result_rows[0][0])
except Exception as e:
print("❌ ClickHouse Error:", e)
if __name__ == '__main__':
test_kafka_rest()
test_minio()
test_clickhouse()
#test_prefect_api()