443 lines
20 KiB
Python
443 lines
20 KiB
Python
import os
|
|
import asyncio
|
|
import logging
|
|
from dotenv import load_dotenv
|
|
import httpx
|
|
from datetime import datetime, timezone
|
|
|
|
# การ import จาก module
|
|
# และ Pydantic models ก็จะอยู่ใน transform_clean
|
|
from pipelines.fetch_air_quality import fetch_air_quality
|
|
from pipelines.save_to_minio import save_to_minio
|
|
from pipelines.send_to_kafka import send_to_kafka
|
|
from pipelines.transform_clean import transform_json
|
|
from pipelines.load_to_clickhouse import insert_to_clickhouse
|
|
from services.logger import AsyncPipelineDBLogger
|
|
from services.ckan_service import CKANService
|
|
|
|
# ===============================
|
|
# โหลดตัวแปรสภาพแวดล้อม
|
|
# ===============================
|
|
load_dotenv()
|
|
|
|
KAFKA_USER = os.getenv("KAFKA_API_USER")
|
|
KAFKA_PASS = os.getenv("KAFKA_API_PASS")
|
|
KAFKA_REST_URL = os.getenv("KAFKA_REST_PROXY_URL")
|
|
DLQ_TOPIC = os.getenv("KAFKA_DLQ_TOPIC")
|
|
FORCE_HTTPS = os.getenv("FORCE_HTTPS", "false").lower() == "true"
|
|
CKAN_ORG_ID = os.getenv("CKAN_ORGANIZATION_ID")
|
|
|
|
# ===============================
|
|
# การตั้งค่าการบันทึกข้อมูล
|
|
# ===============================
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
|
|
)
|
|
logger = logging.getLogger("air-quality-pipeline")
|
|
|
|
ckan_service = CKANService()
|
|
|
|
# ===============================
|
|
# Retry helper แบบ async
|
|
# ===============================
|
|
async def retry_async(func, max_retries=3, backoff=2):
|
|
for attempt in range(max_retries):
|
|
try:
|
|
return await func()
|
|
except httpx.HTTPError as e:
|
|
wait_time = backoff ** attempt
|
|
logger.warning(f"การร้องขอล้มเหลว (ครั้งที่ {attempt+1}/{max_retries}): {e}")
|
|
if attempt < max_retries - 1:
|
|
logger.info(f"กำลังลองใหม่อีกครั้งใน {wait_time} วินาที...")
|
|
await asyncio.sleep(wait_time)
|
|
else:
|
|
logger.error("ลองใหม่ครบตามจำนวนสูงสุดแล้ว ยกเลิกการดำเนินการ")
|
|
raise
|
|
return None
|
|
|
|
# ===============================
|
|
# Fetch -> MinIO -> Kafka
|
|
# (ฟังก์ชันนี้ไม่ได้ใช้ใน main_async ที่ปรับปรุงแล้ว แต่เก็บไว้)
|
|
# ===============================
|
|
async def fetch_and_send_to_kafka(kafka_topic):
|
|
logger.info("กำลังดึงข้อมูลคุณภาพอากาศ...")
|
|
raw_data = await asyncio.to_thread(fetch_air_quality)
|
|
|
|
if raw_data:
|
|
logger.info("กำลังบันทึกข้อมูลดิบลงใน MinIO...")
|
|
object_name = await asyncio.to_thread(save_to_minio, raw_data)
|
|
logger.info(f"บันทึกข้อมูลลงใน MinIO เรียบร้อยแล้ว: {object_name}")
|
|
|
|
logger.info(f"กำลังส่งข้อมูลไปยัง Kafka topic '{kafka_topic}'...")
|
|
await asyncio.to_thread(send_to_kafka, kafka_topic, raw_data)
|
|
else:
|
|
logger.warning("ไม่พบข้อมูลที่ดึงมา ข้ามการส่งข้อมูลไปยัง Kafka")
|
|
|
|
# ===============================
|
|
# Flatten records helper
|
|
# ===============================
|
|
def flatten_records(records):
|
|
"""
|
|
แปลง list ซ้อน list หรือ object เป็น list ของ dict
|
|
"""
|
|
flat = []
|
|
if not records:
|
|
return flat
|
|
for rec in records:
|
|
if isinstance(rec, list):
|
|
flat.extend(flatten_records(rec))
|
|
# สมมติว่า model_dump ใช้ได้กับ Pydantic models
|
|
elif hasattr(rec, "model_dump"):
|
|
flat.append(rec.model_dump())
|
|
elif isinstance(rec, dict):
|
|
flat.append(rec)
|
|
else:
|
|
logger.warning(f"ไม่สามารถ insert record ประเภท {type(rec)}: {rec}")
|
|
return flat
|
|
|
|
# ===============================
|
|
# Transform & Load -> ClickHouse + DLQ
|
|
# (ฟังก์ชันนี้ไม่ได้ใช้ใน main_async ที่ปรับปรุงแล้ว)
|
|
# ===============================
|
|
async def transform_and_load(raw_data, kafka_topic=DLQ_TOPIC, batch_size=100):
|
|
try:
|
|
transformed_data = await asyncio.to_thread(transform_json, raw_data)
|
|
if not transformed_data:
|
|
raise ValueError("ไม่มีข้อมูลที่ถูกแปลงแล้ว")
|
|
|
|
flat_records = flatten_records(transformed_data)
|
|
total_inserted = 0
|
|
batch = []
|
|
|
|
for record in flat_records:
|
|
batch.append(record)
|
|
if len(batch) >= batch_size:
|
|
for rec in batch:
|
|
await asyncio.to_thread(insert_to_clickhouse, rec)
|
|
total_inserted += len(batch)
|
|
batch.clear()
|
|
|
|
if batch:
|
|
for rec in batch:
|
|
await asyncio.to_thread(insert_to_clickhouse, rec)
|
|
total_inserted += len(batch)
|
|
|
|
logger.info(f"แปลงและโหลดข้อมูลสำเร็จ {total_inserted} รายการ")
|
|
|
|
except Exception as e:
|
|
logger.error(f"การแปลง/โหลดข้อมูลล้มเหลว: {e}")
|
|
await asyncio.to_thread(send_to_kafka, kafka_topic, raw_data)
|
|
|
|
# ===============================
|
|
# Async Kafka consumer (once + batch insert + sequential + smart retry)
|
|
# ปรับปรุง: ใช้ run_id ในชื่อ Consumer
|
|
# ปรับปรุง: เพิ่ม Retry Loop ในการรอข้อมูลที่ถูกส่งเข้ามา
|
|
# ===============================
|
|
async def consume_kafka_once(kafka_topic, run_id: int, batch_size=100, max_retries=3) -> int:
|
|
# ใช้ run_id ในการสร้างชื่อ Consumer ที่ไม่ซ้ำกัน
|
|
consumer_name = f"air-quality-consumer-{run_id}"
|
|
group_url = f"{KAFKA_REST_URL}/consumers/my-group"
|
|
total_inserted = 0
|
|
|
|
async with httpx.AsyncClient(
|
|
auth=(KAFKA_USER, KAFKA_PASS),
|
|
timeout=30.0,
|
|
follow_redirects=True
|
|
) as client:
|
|
|
|
# สร้าง consumer หรือใช้ consumer เดิม
|
|
async def create_consumer():
|
|
try:
|
|
resp = await client.post(
|
|
group_url,
|
|
headers={"Content-Type": "application/vnd.kafka.v2+json"},
|
|
json={
|
|
"name": consumer_name,
|
|
"format": "json",
|
|
"auto.offset.reset": "earliest"
|
|
}
|
|
)
|
|
if resp.status_code in (200, 201):
|
|
return resp.json()["base_uri"]
|
|
elif resp.status_code == 409:
|
|
logger.info(f"Consumer {consumer_name} มีอยู่แล้ว ใช้งานต่อ")
|
|
return f"{group_url}/instances/{consumer_name}"
|
|
else:
|
|
logger.error(f"สร้าง consumer ล้มเหลว: {resp.text}")
|
|
return None
|
|
except httpx.HTTPError as e:
|
|
logger.error(f"เกิดข้อผิดพลาด HTTP ขณะสร้าง consumer: {e}")
|
|
return None
|
|
|
|
base_uri = await create_consumer()
|
|
if not base_uri:
|
|
return 0
|
|
|
|
# ปรับปรุง: จัดการ base_uri ให้ยืดหยุ่นตาม FORCE_HTTPS
|
|
if FORCE_HTTPS and base_uri.startswith("http://"):
|
|
base_uri = base_uri.replace("http://", "https://")
|
|
logger.info("บังคับใช้ HTTPS สำหรับ Kafka REST Proxy")
|
|
|
|
# สมัคร topic
|
|
try:
|
|
sub_resp = await client.post(
|
|
f"{base_uri}/subscription",
|
|
headers={"Content-Type": "application/vnd.kafka.v2+json"},
|
|
json={"topics": [kafka_topic]}
|
|
)
|
|
if sub_resp.status_code not in (204, 409):
|
|
logger.error(f"สมัครรับ topic ล้มเหลว: {sub_resp.text}")
|
|
return 0
|
|
logger.info(f"สมัครรับ topic '{kafka_topic}' เรียบร้อยแล้ว")
|
|
except httpx.HTTPError as e:
|
|
logger.error(f"เกิดข้อผิดพลาด HTTP ขณะสมัคร topic: {e}")
|
|
return 0
|
|
|
|
# Fetch records + Smart Retry Loop
|
|
for attempt in range(max_retries):
|
|
try:
|
|
# ใช้ Long Polling Timeout 20.0 วินาที
|
|
resp = await client.get(
|
|
f"{base_uri}/records",
|
|
headers={"Accept": "application/vnd.kafka.json.v2+json"},
|
|
timeout=20.0
|
|
)
|
|
|
|
if resp.status_code == 200:
|
|
records = resp.json()
|
|
|
|
if not records:
|
|
# การควบคุม Retry: หากดึงแล้วได้ Array ว่าง
|
|
if attempt < max_retries - 1:
|
|
wait_time = 2 + (attempt * 1) # Backoff 2s, 3s, 4s...
|
|
logger.warning(
|
|
f"ไม่พบข้อมูลใหม่ใน Kafka (ครั้งที่ {attempt+1}/{max_retries}) กำลังรอ {wait_time} วินาที..."
|
|
)
|
|
await asyncio.sleep(wait_time)
|
|
continue # ลองดึงใหม่ใน loop ถัดไป
|
|
|
|
logger.info("ไม่พบข้อมูลใหม่ใน Kafka (จบการดึง)")
|
|
return total_inserted # ไม่มีข้อมูลใหม่ หรือ Retry ครบแล้ว
|
|
|
|
# --- พบข้อมูล: ทำการ Transform & Load ---
|
|
|
|
# Transform & flatten using global function
|
|
flat_records = []
|
|
for record in records:
|
|
transformed = await asyncio.to_thread(transform_json, record["value"])
|
|
flat_records.extend(flatten_records(transformed))
|
|
|
|
# Batch insert (Sequential)
|
|
batch = []
|
|
|
|
for rec in flat_records:
|
|
batch.append(rec)
|
|
if len(batch) >= batch_size:
|
|
|
|
# Sequential Insert
|
|
for r in batch:
|
|
await asyncio.to_thread(insert_to_clickhouse, r)
|
|
|
|
total_inserted += len(batch)
|
|
batch.clear()
|
|
|
|
if batch:
|
|
# Sequential Insert สำหรับ Batch สุดท้าย
|
|
for r in batch:
|
|
await asyncio.to_thread(insert_to_clickhouse, r)
|
|
|
|
total_inserted += len(batch)
|
|
|
|
logger.info(f"แปลงและโหลดข้อมูล Kafka จำนวน {len(records)} รายการ (รวมโหลด: {total_inserted} รายการ)")
|
|
return total_inserted # จบการดึงครั้งเดียวและ return จำนวนที่โหลดสำเร็จ
|
|
|
|
else:
|
|
logger.warning(f"เกิดข้อผิดพลาดจาก Kafka ({resp.status_code}): {resp.text}")
|
|
await asyncio.sleep(2)
|
|
|
|
except httpx.HTTPError as e:
|
|
logger.warning(f"เกิดข้อผิดพลาด HTTP ขณะดึงข้อมูลจาก Kafka: {e}")
|
|
await asyncio.sleep(2)
|
|
except Exception as e:
|
|
logger.error(f"เกิดข้อผิดพลาดที่ไม่คาดคิดในการประมวลผล Kafka: {e}")
|
|
return total_inserted
|
|
|
|
logger.error("ไม่สามารถดึงข้อมูลจาก Kafka หลังจาก retry ครบจำนวนครั้ง")
|
|
return total_inserted
|
|
|
|
# ===============================
|
|
# Pipeline: Fetch -> MinIO -> CKAN -> Kafka
|
|
# ===============================
|
|
async def fetch_minio_ckan_kafka():
|
|
logger.info("กำลังดึงข้อมูลคุณภาพอากาศ...")
|
|
raw_data = await asyncio.to_thread(fetch_air_quality)
|
|
if not raw_data:
|
|
logger.warning("ไม่พบข้อมูลจาก AIR4THAI")
|
|
return None, None
|
|
|
|
timestamp = datetime.now(timezone.utc).isoformat()
|
|
|
|
# Save raw data to MinIO
|
|
object_name = None
|
|
minio_url = None
|
|
try:
|
|
object_name = await asyncio.to_thread(save_to_minio, raw_data)
|
|
minio_url = f"s3://air-quality/{object_name}"
|
|
logger.info(f"บันทึกข้อมูลลงใน MinIO เรียบร้อยแล้ว: {object_name}")
|
|
except Exception as e:
|
|
logger.error(f"การบันทึก MinIO ล้มเหลว: {e}")
|
|
|
|
# Register raw dataset to CKAN
|
|
if object_name and minio_url:
|
|
try:
|
|
await ckan_service.register_dataset(
|
|
dataset_name="air_quality_raw",
|
|
resource_name=object_name,
|
|
resource_url=minio_url,
|
|
owner_org=CKAN_ORG_ID,
|
|
metadata={
|
|
"source_url": "http://air4thai.com",
|
|
"fetch_time": timestamp,
|
|
"format": "json",
|
|
"media_type": "application/json",
|
|
"description": "Raw air quality data from AIR4THAI"
|
|
}
|
|
)
|
|
logger.info("ลงทะเบียน Raw dataset ใน CKAN เรียบร้อยแล้ว")
|
|
except Exception as e:
|
|
logger.warning(f"การลงทะเบียน CKAN (Raw) ล้มเหลว: {e}")
|
|
pass
|
|
|
|
# ส่งไป Kafka
|
|
try:
|
|
await asyncio.to_thread(send_to_kafka, "air4thai-stream", raw_data)
|
|
logger.info("ส่งข้อมูลไปยัง Kafka เรียบร้อยแล้ว")
|
|
except Exception as e:
|
|
logger.error(f"การส่ง Kafka ล้มเหลว: {e}")
|
|
return None, None
|
|
|
|
return raw_data, timestamp
|
|
|
|
# ===============================
|
|
# Pipeline: Consume Kafka -> Transform -> ClickHouse -> CKAN
|
|
# ปรับปรุง: รับ run_id และส่งต่อไปยัง consume_kafka_once
|
|
# ===============================
|
|
async def consume_transform_clickhouse_ckan(kafka_topic, timestamp, run_id: int, batch_size=100) -> int:
|
|
# ใช้ run_id ในการเรียก consumer
|
|
total_inserted = await consume_kafka_once(kafka_topic, run_id=run_id, batch_size=batch_size)
|
|
|
|
# หลังจากโหลดไป ClickHouse ลงทะเบียน analytical dataset
|
|
clickhouse_url = "clickhouse://clickhouse-server/air_quality_db"
|
|
|
|
try:
|
|
await ckan_service.register_dataset(
|
|
dataset_name="air_quality_analytics",
|
|
resource_name="air_quality_db",
|
|
resource_url=clickhouse_url,
|
|
owner_org=CKAN_ORG_ID,
|
|
metadata={
|
|
"row_count": total_inserted,
|
|
"last_updated": timestamp,
|
|
"format": "table",
|
|
"media_type": "application/sql",
|
|
"description": "Analytical dataset in ClickHouse"
|
|
}
|
|
)
|
|
logger.info("ลงทะเบียน analytical dataset ใน CKAN เรียบร้อยแล้ว")
|
|
except Exception as e:
|
|
logger.warning(f"การลงทะเบียน CKAN (Analytics) ล้มเหลว: {e}")
|
|
|
|
return total_inserted
|
|
|
|
# ===============================
|
|
# Async main pipeline
|
|
# 💡 ปรับปรุง: ส่ง run_id ไปยัง Task 2
|
|
# ===============================
|
|
async def main_async():
|
|
pipeline_name = "air4thai-pipeline"
|
|
db_logger = AsyncPipelineDBLogger()
|
|
run_id = None
|
|
task_logs = []
|
|
|
|
try:
|
|
# 1. สร้าง run_id
|
|
run_id = await db_logger.insert_pipeline_run(pipeline_name, task_logs)
|
|
logger.info(f"เริ่มต้น Pipeline Run ID: {run_id}")
|
|
|
|
# ----------------------------------------------------
|
|
# 1. Fetch + MinIO + CKAN + Kafka
|
|
# ----------------------------------------------------
|
|
task_name_1 = "fetch_minio_ckan_kafka"
|
|
start_time_1 = asyncio.get_event_loop().time()
|
|
raw_data, timestamp = await fetch_minio_ckan_kafka()
|
|
duration_ms_1 = (asyncio.get_event_loop().time() - start_time_1) * 1000
|
|
|
|
status_1 = "success" if raw_data else "failed"
|
|
message_1 = "Pipeline Fetch->MinIO->CKAN->Kafka"
|
|
task_logs.append({
|
|
"task_name": task_name_1,
|
|
"status": status_1,
|
|
"message": message_1,
|
|
"duration_ms": duration_ms_1
|
|
})
|
|
logger.info(f"Task '{task_name_1}' สถานะ: {status_1}")
|
|
|
|
logger.info("หน่วงเวลา 3 วินาที เพื่อให้ข้อมูล Kafka พร้อมสำหรับการดึง...")
|
|
await asyncio.sleep(3)
|
|
|
|
if raw_data:
|
|
# ----------------------------------------------------
|
|
# 2. Consume Kafka + ClickHouse + CKAN
|
|
# ----------------------------------------------------
|
|
task_name_2 = "consume_clickhouse_ckan"
|
|
start_time_2 = asyncio.get_event_loop().time()
|
|
# ส่ง run_id เข้าไป
|
|
total_inserted = await consume_transform_clickhouse_ckan("air4thai-stream", timestamp, run_id=run_id)
|
|
duration_ms_2 = (asyncio.get_event_loop().time() - start_time_2) * 1000
|
|
|
|
status_2 = "success"
|
|
message_2 = f"Pipeline Kafka->ClickHouse->CKAN. โหลด {total_inserted} รายการ"
|
|
task_logs.append({
|
|
"task_name": task_name_2,
|
|
"status": status_2,
|
|
"message": message_2,
|
|
"duration_ms": duration_ms_2
|
|
})
|
|
logger.info(f"Task '{task_name_2}' สถานะ: {status_2}")
|
|
|
|
# สิ้นสุดสำเร็จ
|
|
final_status = "success"
|
|
else:
|
|
# Task 1 ล้มเหลว
|
|
final_status = "failed"
|
|
|
|
# อัปเดตสถานะและ task_logs ครั้งเดียวเมื่อจบ
|
|
await db_logger.update_run_status(run_id, final_status, task_logs=task_logs)
|
|
logger.info(f"Pipeline Run ID: {run_id} เสร็จสมบูรณ์ด้วยสถานะ: {final_status}")
|
|
|
|
|
|
except Exception as e:
|
|
error_msg = f"เกิดข้อผิดพลาดร้ายแรง: {e}"
|
|
logger.error(error_msg, exc_info=True)
|
|
|
|
# อัปเดตสถานะเป็น failed
|
|
if run_id:
|
|
task_logs.append({
|
|
"task_name": "overall_error",
|
|
"status": "failed",
|
|
"message": error_msg,
|
|
"duration_ms": 0
|
|
})
|
|
await db_logger.update_run_status(run_id, "failed", str(e), task_logs=task_logs)
|
|
logger.info(f"Pipeline Run ID: {run_id} อัปเดตสถานะเป็น Failed")
|
|
else:
|
|
logger.error("ไม่สามารถบันทึกสถานะลงฐานข้อมูลได้เนื่องจากไม่มี Run ID")
|
|
|
|
finally:
|
|
# ปิด httpx client ของ logger
|
|
await db_logger.http_client.aclose()
|
|
logger.info("ปิดการเชื่อมต่อ DB Logger แล้ว")
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main_async()) |