import os import asyncio import logging from dotenv import load_dotenv import httpx from datetime import datetime, timezone # การ import จาก module # และ Pydantic models ก็จะอยู่ใน transform_clean from pipelines.fetch_air_quality import fetch_air_quality from pipelines.save_to_minio import save_to_minio from pipelines.send_to_kafka import send_to_kafka from pipelines.transform_clean import transform_json from pipelines.load_to_clickhouse import insert_to_clickhouse from services.logger import AsyncPipelineDBLogger from services.ckan_service import CKANService # =============================== # โหลดตัวแปรสภาพแวดล้อม # =============================== load_dotenv() KAFKA_USER = os.getenv("KAFKA_API_USER") KAFKA_PASS = os.getenv("KAFKA_API_PASS") KAFKA_REST_URL = os.getenv("KAFKA_REST_PROXY_URL") DLQ_TOPIC = os.getenv("KAFKA_DLQ_TOPIC") FORCE_HTTPS = os.getenv("FORCE_HTTPS", "false").lower() == "true" CKAN_ORG_ID = os.getenv("CKAN_ORGANIZATION_ID") # =============================== # การตั้งค่าการบันทึกข้อมูล # =============================== logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) logger = logging.getLogger("air-quality-pipeline") ckan_service = CKANService() # =============================== # Retry helper แบบ async # =============================== async def retry_async(func, max_retries=3, backoff=2): for attempt in range(max_retries): try: return await func() except httpx.HTTPError as e: wait_time = backoff ** attempt logger.warning(f"การร้องขอล้มเหลว (ครั้งที่ {attempt+1}/{max_retries}): {e}") if attempt < max_retries - 1: logger.info(f"กำลังลองใหม่อีกครั้งใน {wait_time} วินาที...") await asyncio.sleep(wait_time) else: logger.error("ลองใหม่ครบตามจำนวนสูงสุดแล้ว ยกเลิกการดำเนินการ") raise return None # =============================== # Fetch -> MinIO -> Kafka # (ฟังก์ชันนี้ไม่ได้ใช้ใน main_async ที่ปรับปรุงแล้ว แต่เก็บไว้) # =============================== async def fetch_and_send_to_kafka(kafka_topic): logger.info("กำลังดึงข้อมูลคุณภาพอากาศ...") raw_data = await asyncio.to_thread(fetch_air_quality) if raw_data: logger.info("กำลังบันทึกข้อมูลดิบลงใน MinIO...") object_name = await asyncio.to_thread(save_to_minio, raw_data) logger.info(f"บันทึกข้อมูลลงใน MinIO เรียบร้อยแล้ว: {object_name}") logger.info(f"กำลังส่งข้อมูลไปยัง Kafka topic '{kafka_topic}'...") await asyncio.to_thread(send_to_kafka, kafka_topic, raw_data) else: logger.warning("ไม่พบข้อมูลที่ดึงมา ข้ามการส่งข้อมูลไปยัง Kafka") # =============================== # Flatten records helper # =============================== def flatten_records(records): """ แปลง list ซ้อน list หรือ object เป็น list ของ dict """ flat = [] if not records: return flat for rec in records: if isinstance(rec, list): flat.extend(flatten_records(rec)) # สมมติว่า model_dump ใช้ได้กับ Pydantic models elif hasattr(rec, "model_dump"): flat.append(rec.model_dump()) elif isinstance(rec, dict): flat.append(rec) else: logger.warning(f"ไม่สามารถ insert record ประเภท {type(rec)}: {rec}") return flat # =============================== # Transform & Load -> ClickHouse + DLQ # (ฟังก์ชันนี้ไม่ได้ใช้ใน main_async ที่ปรับปรุงแล้ว) # =============================== async def transform_and_load(raw_data, kafka_topic=DLQ_TOPIC, batch_size=100): try: transformed_data = await asyncio.to_thread(transform_json, raw_data) if not transformed_data: raise ValueError("ไม่มีข้อมูลที่ถูกแปลงแล้ว") flat_records = flatten_records(transformed_data) total_inserted = 0 batch = [] for record in flat_records: batch.append(record) if len(batch) >= batch_size: for rec in batch: await asyncio.to_thread(insert_to_clickhouse, rec) total_inserted += len(batch) batch.clear() if batch: for rec in batch: await asyncio.to_thread(insert_to_clickhouse, rec) total_inserted += len(batch) logger.info(f"แปลงและโหลดข้อมูลสำเร็จ {total_inserted} รายการ") except Exception as e: logger.error(f"การแปลง/โหลดข้อมูลล้มเหลว: {e}") await asyncio.to_thread(send_to_kafka, kafka_topic, raw_data) # =============================== # Async Kafka consumer (once + batch insert + sequential + smart retry) # ปรับปรุง: ใช้ run_id ในชื่อ Consumer # ปรับปรุง: เพิ่ม Retry Loop ในการรอข้อมูลที่ถูกส่งเข้ามา # =============================== async def consume_kafka_once(kafka_topic, run_id: int, batch_size=100, max_retries=3) -> int: # ใช้ run_id ในการสร้างชื่อ Consumer ที่ไม่ซ้ำกัน consumer_name = f"air-quality-consumer-{run_id}" group_url = f"{KAFKA_REST_URL}/consumers/my-group" total_inserted = 0 async with httpx.AsyncClient( auth=(KAFKA_USER, KAFKA_PASS), timeout=30.0, follow_redirects=True ) as client: # สร้าง consumer หรือใช้ consumer เดิม async def create_consumer(): try: resp = await client.post( group_url, headers={"Content-Type": "application/vnd.kafka.v2+json"}, json={ "name": consumer_name, "format": "json", "auto.offset.reset": "earliest" } ) if resp.status_code in (200, 201): return resp.json()["base_uri"] elif resp.status_code == 409: logger.info(f"Consumer {consumer_name} มีอยู่แล้ว ใช้งานต่อ") return f"{group_url}/instances/{consumer_name}" else: logger.error(f"สร้าง consumer ล้มเหลว: {resp.text}") return None except httpx.HTTPError as e: logger.error(f"เกิดข้อผิดพลาด HTTP ขณะสร้าง consumer: {e}") return None base_uri = await create_consumer() if not base_uri: return 0 # ปรับปรุง: จัดการ base_uri ให้ยืดหยุ่นตาม FORCE_HTTPS if FORCE_HTTPS and base_uri.startswith("http://"): base_uri = base_uri.replace("http://", "https://") logger.info("บังคับใช้ HTTPS สำหรับ Kafka REST Proxy") # สมัคร topic try: sub_resp = await client.post( f"{base_uri}/subscription", headers={"Content-Type": "application/vnd.kafka.v2+json"}, json={"topics": [kafka_topic]} ) if sub_resp.status_code not in (204, 409): logger.error(f"สมัครรับ topic ล้มเหลว: {sub_resp.text}") return 0 logger.info(f"สมัครรับ topic '{kafka_topic}' เรียบร้อยแล้ว") except httpx.HTTPError as e: logger.error(f"เกิดข้อผิดพลาด HTTP ขณะสมัคร topic: {e}") return 0 # Fetch records + Smart Retry Loop for attempt in range(max_retries): try: # ใช้ Long Polling Timeout 20.0 วินาที resp = await client.get( f"{base_uri}/records", headers={"Accept": "application/vnd.kafka.json.v2+json"}, timeout=20.0 ) if resp.status_code == 200: records = resp.json() if not records: # การควบคุม Retry: หากดึงแล้วได้ Array ว่าง if attempt < max_retries - 1: wait_time = 2 + (attempt * 1) # Backoff 2s, 3s, 4s... logger.warning( f"ไม่พบข้อมูลใหม่ใน Kafka (ครั้งที่ {attempt+1}/{max_retries}) กำลังรอ {wait_time} วินาที..." ) await asyncio.sleep(wait_time) continue # ลองดึงใหม่ใน loop ถัดไป logger.info("ไม่พบข้อมูลใหม่ใน Kafka (จบการดึง)") return total_inserted # ไม่มีข้อมูลใหม่ หรือ Retry ครบแล้ว # --- พบข้อมูล: ทำการ Transform & Load --- # Transform & flatten using global function flat_records = [] for record in records: transformed = await asyncio.to_thread(transform_json, record["value"]) flat_records.extend(flatten_records(transformed)) # Batch insert (Sequential) batch = [] for rec in flat_records: batch.append(rec) if len(batch) >= batch_size: # Sequential Insert for r in batch: await asyncio.to_thread(insert_to_clickhouse, r) total_inserted += len(batch) batch.clear() if batch: # Sequential Insert สำหรับ Batch สุดท้าย for r in batch: await asyncio.to_thread(insert_to_clickhouse, r) total_inserted += len(batch) logger.info(f"แปลงและโหลดข้อมูล Kafka จำนวน {len(records)} รายการ (รวมโหลด: {total_inserted} รายการ)") return total_inserted # จบการดึงครั้งเดียวและ return จำนวนที่โหลดสำเร็จ else: logger.warning(f"เกิดข้อผิดพลาดจาก Kafka ({resp.status_code}): {resp.text}") await asyncio.sleep(2) except httpx.HTTPError as e: logger.warning(f"เกิดข้อผิดพลาด HTTP ขณะดึงข้อมูลจาก Kafka: {e}") await asyncio.sleep(2) except Exception as e: logger.error(f"เกิดข้อผิดพลาดที่ไม่คาดคิดในการประมวลผล Kafka: {e}") return total_inserted logger.error("ไม่สามารถดึงข้อมูลจาก Kafka หลังจาก retry ครบจำนวนครั้ง") return total_inserted # =============================== # Pipeline: Fetch -> MinIO -> CKAN -> Kafka # =============================== async def fetch_minio_ckan_kafka(): logger.info("กำลังดึงข้อมูลคุณภาพอากาศ...") raw_data = await asyncio.to_thread(fetch_air_quality) if not raw_data: logger.warning("ไม่พบข้อมูลจาก AIR4THAI") return None, None timestamp = datetime.now(timezone.utc).isoformat() # Save raw data to MinIO object_name = None minio_url = None try: object_name = await asyncio.to_thread(save_to_minio, raw_data) minio_url = f"s3://air-quality/{object_name}" logger.info(f"บันทึกข้อมูลลงใน MinIO เรียบร้อยแล้ว: {object_name}") except Exception as e: logger.error(f"การบันทึก MinIO ล้มเหลว: {e}") # Register raw dataset to CKAN if object_name and minio_url: try: await ckan_service.register_dataset( dataset_name="air_quality_raw", resource_name=object_name, resource_url=minio_url, owner_org=CKAN_ORG_ID, metadata={ "source_url": "http://air4thai.com", "fetch_time": timestamp, "format": "json", "media_type": "application/json", "description": "Raw air quality data from AIR4THAI" } ) logger.info("ลงทะเบียน Raw dataset ใน CKAN เรียบร้อยแล้ว") except Exception as e: logger.warning(f"การลงทะเบียน CKAN (Raw) ล้มเหลว: {e}") pass # ส่งไป Kafka try: await asyncio.to_thread(send_to_kafka, "air4thai-stream", raw_data) logger.info("ส่งข้อมูลไปยัง Kafka เรียบร้อยแล้ว") except Exception as e: logger.error(f"การส่ง Kafka ล้มเหลว: {e}") return None, None return raw_data, timestamp # =============================== # Pipeline: Consume Kafka -> Transform -> ClickHouse -> CKAN # ปรับปรุง: รับ run_id และส่งต่อไปยัง consume_kafka_once # =============================== async def consume_transform_clickhouse_ckan(kafka_topic, timestamp, run_id: int, batch_size=100) -> int: # ใช้ run_id ในการเรียก consumer total_inserted = await consume_kafka_once(kafka_topic, run_id=run_id, batch_size=batch_size) # หลังจากโหลดไป ClickHouse ลงทะเบียน analytical dataset clickhouse_url = "clickhouse://clickhouse-server/air_quality_db" try: await ckan_service.register_dataset( dataset_name="air_quality_analytics", resource_name="air_quality_db", resource_url=clickhouse_url, owner_org=CKAN_ORG_ID, metadata={ "row_count": total_inserted, "last_updated": timestamp, "format": "table", "media_type": "application/sql", "description": "Analytical dataset in ClickHouse" } ) logger.info("ลงทะเบียน analytical dataset ใน CKAN เรียบร้อยแล้ว") except Exception as e: logger.warning(f"การลงทะเบียน CKAN (Analytics) ล้มเหลว: {e}") return total_inserted # =============================== # Async main pipeline # 💡 ปรับปรุง: ส่ง run_id ไปยัง Task 2 # =============================== async def main_async(): pipeline_name = "air4thai-pipeline" db_logger = AsyncPipelineDBLogger() run_id = None task_logs = [] try: # 1. สร้าง run_id run_id = await db_logger.insert_pipeline_run(pipeline_name, task_logs) logger.info(f"เริ่มต้น Pipeline Run ID: {run_id}") # ---------------------------------------------------- # 1. Fetch + MinIO + CKAN + Kafka # ---------------------------------------------------- task_name_1 = "fetch_minio_ckan_kafka" start_time_1 = asyncio.get_event_loop().time() raw_data, timestamp = await fetch_minio_ckan_kafka() duration_ms_1 = (asyncio.get_event_loop().time() - start_time_1) * 1000 status_1 = "success" if raw_data else "failed" message_1 = "Pipeline Fetch->MinIO->CKAN->Kafka" task_logs.append({ "task_name": task_name_1, "status": status_1, "message": message_1, "duration_ms": duration_ms_1 }) logger.info(f"Task '{task_name_1}' สถานะ: {status_1}") logger.info("หน่วงเวลา 3 วินาที เพื่อให้ข้อมูล Kafka พร้อมสำหรับการดึง...") await asyncio.sleep(3) if raw_data: # ---------------------------------------------------- # 2. Consume Kafka + ClickHouse + CKAN # ---------------------------------------------------- task_name_2 = "consume_clickhouse_ckan" start_time_2 = asyncio.get_event_loop().time() # ส่ง run_id เข้าไป total_inserted = await consume_transform_clickhouse_ckan("air4thai-stream", timestamp, run_id=run_id) duration_ms_2 = (asyncio.get_event_loop().time() - start_time_2) * 1000 status_2 = "success" message_2 = f"Pipeline Kafka->ClickHouse->CKAN. โหลด {total_inserted} รายการ" task_logs.append({ "task_name": task_name_2, "status": status_2, "message": message_2, "duration_ms": duration_ms_2 }) logger.info(f"Task '{task_name_2}' สถานะ: {status_2}") # สิ้นสุดสำเร็จ final_status = "success" else: # Task 1 ล้มเหลว final_status = "failed" # อัปเดตสถานะและ task_logs ครั้งเดียวเมื่อจบ await db_logger.update_run_status(run_id, final_status, task_logs=task_logs) logger.info(f"Pipeline Run ID: {run_id} เสร็จสมบูรณ์ด้วยสถานะ: {final_status}") except Exception as e: error_msg = f"เกิดข้อผิดพลาดร้ายแรง: {e}" logger.error(error_msg, exc_info=True) # อัปเดตสถานะเป็น failed if run_id: task_logs.append({ "task_name": "overall_error", "status": "failed", "message": error_msg, "duration_ms": 0 }) await db_logger.update_run_status(run_id, "failed", str(e), task_logs=task_logs) logger.info(f"Pipeline Run ID: {run_id} อัปเดตสถานะเป็น Failed") else: logger.error("ไม่สามารถบันทึกสถานะลงฐานข้อมูลได้เนื่องจากไม่มี Run ID") finally: # ปิด httpx client ของ logger await db_logger.http_client.aclose() logger.info("ปิดการเชื่อมต่อ DB Logger แล้ว") if __name__ == "__main__": asyncio.run(main_async())