# Source: th-air-quality-etl-ml/pipelines/load_to_clickhouse.py
# (viewer metadata: 65 lines, 2.1 KiB, Python)

from clickhouse_connect import get_client
import os
from dotenv import load_dotenv
load_dotenv()

# Read connection settings from the environment (populated from .env above).
ch_host = os.getenv("CLICKHOUSE_HOST")
if ch_host is None:
    # os.getenv returns None for an unset variable; fail with a message that
    # names the variable instead of an AttributeError from None.replace().
    raise RuntimeError(
        "CLICKHOUSE_HOST is not set; define it in the environment or .env file."
    )
# Drop the URL scheme so only the host part is passed as `host`.
ch_host = ch_host.replace("https://", "")
ch_user = os.getenv("CLICKHOUSE_USER")
ch_password = os.getenv("CLICKHOUSE_PASSWORD")

# Create the shared client with clickhouse-connect (secure connection, port 443).
clickhouse_client = get_client(
    host=ch_host,
    port=443,
    username=ch_user,
    password=ch_password,
    secure=True,
)
def insert_to_clickhouse(record):
    """Insert a single air-quality record into the ``air_quality_db`` table.

    Args:
        record: Mapping-like object (read via ``.get``) holding the station
            fields named in ``column_names`` below. Text and time fields are
            passed through unchanged (``None`` when the key is absent);
            numeric fields are normalised with ``safe_float`` / ``safe_int``.

    Returns:
        None. A falsy ``record`` only prints a warning. Any exception raised
        while converting values or inserting is caught and printed, not
        re-raised, so callers are never interrupted by a failed insert.
    """
    if not record:
        print("⚠️ No data to insert.")
        return

    # NOTE(review): -1 is presumably the upstream "no reading" sentinel;
    # only numeric -1 is recognised here, a string "-1" is converted as-is.
    def safe_float(value):
        """Return ``value`` as a float; ``None`` and ``-1`` become ``0.0``."""
        if value is None or value == -1:
            return 0.0
        return float(value)

    def safe_int(value):
        """Return ``value`` as an int; ``None`` and ``-1`` become ``0``."""
        # Handling None here keeps 'aqi' consistent with the float columns;
        # previously an explicit None reached int() and the row was dropped.
        if value is None or value == -1:
            return 0
        return int(value)

    try:
        # The row is built inside the try block so that a conversion failure
        # is reported the same way as a failed insert.
        row = [
            record.get('station_id'),
            record.get('station_nameTH'),
            record.get('station_nameEN'),
            record.get('areaTH'),
            record.get('areaEN'),
            record.get('station_type'),
            safe_float(record.get('latitude')),
            safe_float(record.get('longitude')),
            safe_float(record.get('pm25')),
            safe_float(record.get('pm10')),
            safe_float(record.get('o3')),
            safe_float(record.get('co')),
            safe_float(record.get('no2')),
            safe_float(record.get('so2')),
            safe_int(record.get('aqi')),
            record.get('main_pollutant'),
            record.get('record_time'),
        ]
        clickhouse_client.insert(
            table='air_quality_db',
            column_names=[
                'station_id', 'station_nameTH', 'station_nameEN',
                'areaTH', 'areaEN', 'station_type',
                'latitude', 'longitude',
                'pm25', 'pm10', 'o3', 'co', 'no2', 'so2',
                'aqi', 'main_pollutant', 'record_time',
            ],
            data=[row],
        )
        print("Inserted 1 record to ClickHouse.")
    except Exception as exc:
        # Deliberately best-effort: report the failure and carry on.
        print(f"Error inserting record into ClickHouse: {exc}")