From 872174324255c1173ca7ac78b978fdd870a70efa Mon Sep 17 00:00:00 2001 From: Flook Date: Fri, 3 Oct 2025 05:04:38 +0700 Subject: [PATCH] Update README.md --- README.md | 20 + api/__init__.py | 0 .../air_quality_data_ingestion_pipeline.py | 443 ++++++++++++++++++ pipelines/my_flow.py | 118 ----- 4 files changed, 463 insertions(+), 118 deletions(-) create mode 100644 api/__init__.py create mode 100644 pipelines/air_quality_data_ingestion_pipeline.py delete mode 100644 pipelines/my_flow.py diff --git a/README.md b/README.md index 445286e..ebe4b35 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,26 @@ https://analytics.softwarecraft.tech/public/dashboard/0107425a-50aa-4fd6-8d78-d2 --- +## ตัวอย่าง Data-Driven Decision Platform + +MLOps Pipeline + Model Management API + BI Integration + Lakehouse Architecture ขนาดย่อม ที่พัฒนาบนพื้นฐานของโปรเจคนี้ ประกอบด้วย + +1 Data and Prediction Pipeline: มีการทำงานแบบ Async Pipeline (Ingestion และ Prediction) ที่มีการจัดการวงจรชีวิตโมเดล AI ตั้งแต่การพัฒนาไปจนถึงการใช้งานจริง โดยรองรับการรวบรวม จัดเตรียมข้อมูล และการใช้โมเดล MindsDB ในการวิเคราะห์และทำนายแนวโน้ม + +2 Model Registry & Serving: ระบบมี API ครบชุดสำหรับการจัดการและให้บริการโมเดล ซึ่งประกอบด้วย: + - การลงทะเบียน (Register): ลงทะเบียน Metadata ของโมเดล (ทั้ง MindsDB และ Custom Models) ลงใน CKAN Registry + - การให้บริการ (Unified Predict): มี Endpoint เดียว (/v1/model/predict) สำหรับให้บริการทำนายผล โดยระบบจะ Dispatch การเรียกใช้งานไปยัง MindsDB หรือ Custom Model โดยอัตโนมัติ + - การควบคุม (Governance): มีการควบคุมสิทธิ์การเข้าถึง (RBAC) และ Endpoint สำหรับตรวจสอบ สถานะสุขภาพ และ Metadata ของโมเดลใน Production ได้อย่างละเอียด + +3 รองรับการเชื่อมต่อกับ BI Tools (เช่น Metabase) เพื่อให้ผู้ใช้งานสามารถวิเคราะห์สถานการณ์ในอดีตและปัจจุบันได้อย่างมีประสิทธิภาพ โดย BI Tools สมัยใหม่รองรับทั้ง Descriptive Analytics และ Predictive Analytics ผ่านการใช้ AI และ Machine Learning + +4 ระบบใช้แนวคิด Lakehouse Architecture ซึ่งรวมข้อดีของ Data Lake และ Data Warehouse เข้าด้วยกัน มีขีดความสามารถด้านความเร็ว ความหลากหลาย และการรองรับ AI/ML สูง + +ตัวอย่างระบบ (Backend) +https://data-decision-ops.softwarecraft.tech/docs + +--- + ## 📜 License โปรเจคนี้แจกจ่ายภายใต้ **MIT License** diff --git a/api/__init__.py b/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pipelines/air_quality_data_ingestion_pipeline.py b/pipelines/air_quality_data_ingestion_pipeline.py new file mode 100644 index 0000000..ef6eaa3 --- /dev/null +++ b/pipelines/air_quality_data_ingestion_pipeline.py @@ -0,0 +1,443 @@ +import os +import asyncio +import logging +from dotenv import load_dotenv +import httpx +from datetime import datetime, timezone + +# การ import จาก module +# และ Pydantic models ก็จะอยู่ใน transform_clean +from pipelines.fetch_air_quality import fetch_air_quality +from pipelines.save_to_minio import save_to_minio +from pipelines.send_to_kafka import send_to_kafka +from pipelines.transform_clean import transform_json +from pipelines.load_to_clickhouse import insert_to_clickhouse +from services.logger import AsyncPipelineDBLogger +from services.ckan_service import CKANService + +# =============================== +# โหลดตัวแปรสภาพแวดล้อม +# =============================== +load_dotenv() + +KAFKA_USER = os.getenv("KAFKA_API_USER") +KAFKA_PASS = os.getenv("KAFKA_API_PASS") +KAFKA_REST_URL = os.getenv("KAFKA_REST_PROXY_URL") +DLQ_TOPIC = os.getenv("KAFKA_DLQ_TOPIC") +FORCE_HTTPS = os.getenv("FORCE_HTTPS", "false").lower() == "true" +CKAN_ORG_ID = os.getenv("CKAN_ORGANIZATION_ID") + +# =============================== +# การตั้งค่าการบันทึกข้อมูล +# =============================== +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", +) +logger = logging.getLogger("air-quality-pipeline") + +ckan_service = CKANService() + +# =============================== +# Retry helper แบบ async +# =============================== +async def retry_async(func, max_retries=3, backoff=2): + for attempt in range(max_retries): + try: + return await func() + except httpx.HTTPError as e: + wait_time = backoff ** attempt + logger.warning(f"การร้องขอล้มเหลว (ครั้งที่ {attempt+1}/{max_retries}): {e}") + if attempt < max_retries - 1: + logger.info(f"กำลังลองใหม่อีกครั้งใน {wait_time} วินาที...") + await asyncio.sleep(wait_time) + else: + logger.error("ลองใหม่ครบตามจำนวนสูงสุดแล้ว ยกเลิกการดำเนินการ") + raise + return None + +# =============================== +# Fetch -> MinIO -> Kafka +# (ฟังก์ชันนี้ไม่ได้ใช้ใน main_async ที่ปรับปรุงแล้ว แต่เก็บไว้) +# =============================== +async def fetch_and_send_to_kafka(kafka_topic): + logger.info("กำลังดึงข้อมูลคุณภาพอากาศ...") + raw_data = await asyncio.to_thread(fetch_air_quality) + + if raw_data: + logger.info("กำลังบันทึกข้อมูลดิบลงใน MinIO...") + object_name = await asyncio.to_thread(save_to_minio, raw_data) + logger.info(f"บันทึกข้อมูลลงใน MinIO เรียบร้อยแล้ว: {object_name}") + + logger.info(f"กำลังส่งข้อมูลไปยัง Kafka topic '{kafka_topic}'...") + await asyncio.to_thread(send_to_kafka, kafka_topic, raw_data) + else: + logger.warning("ไม่พบข้อมูลที่ดึงมา ข้ามการส่งข้อมูลไปยัง Kafka") + +# =============================== +# Flatten records helper +# =============================== +def flatten_records(records): + """ + แปลง list ซ้อน list หรือ object เป็น list ของ dict + """ + flat = [] + if not records: + return flat + for rec in records: + if isinstance(rec, list): + flat.extend(flatten_records(rec)) + # สมมติว่า model_dump ใช้ได้กับ Pydantic models + elif hasattr(rec, "model_dump"): + flat.append(rec.model_dump()) + elif isinstance(rec, dict): + flat.append(rec) + else: + logger.warning(f"ไม่สามารถ insert record ประเภท {type(rec)}: {rec}") + return flat + +# =============================== +# Transform & Load -> ClickHouse + DLQ +# (ฟังก์ชันนี้ไม่ได้ใช้ใน main_async ที่ปรับปรุงแล้ว) +# =============================== +async def transform_and_load(raw_data, kafka_topic=DLQ_TOPIC, batch_size=100): + try: + transformed_data = await asyncio.to_thread(transform_json, raw_data) + if not transformed_data: + raise ValueError("ไม่มีข้อมูลที่ถูกแปลงแล้ว") + + flat_records = flatten_records(transformed_data) + total_inserted = 0 + batch = [] + + for record in flat_records: + batch.append(record) + if len(batch) >= batch_size: + for rec in batch: + await asyncio.to_thread(insert_to_clickhouse, rec) + total_inserted += len(batch) + batch.clear() + + if batch: + for rec in batch: + await asyncio.to_thread(insert_to_clickhouse, rec) + total_inserted += len(batch) + + logger.info(f"แปลงและโหลดข้อมูลสำเร็จ {total_inserted} รายการ") + + except Exception as e: + logger.error(f"การแปลง/โหลดข้อมูลล้มเหลว: {e}") + await asyncio.to_thread(send_to_kafka, kafka_topic, raw_data) + +# =============================== +# Async Kafka consumer (once + batch insert + sequential + smart retry) +# ปรับปรุง: ใช้ run_id ในชื่อ Consumer +# ปรับปรุง: เพิ่ม Retry Loop ในการรอข้อมูลที่ถูกส่งเข้ามา +# =============================== +async def consume_kafka_once(kafka_topic, run_id: int, batch_size=100, max_retries=3) -> int: + # ใช้ run_id ในการสร้างชื่อ Consumer ที่ไม่ซ้ำกัน + consumer_name = f"air-quality-consumer-{run_id}" + group_url = f"{KAFKA_REST_URL}/consumers/my-group" + total_inserted = 0 + + async with httpx.AsyncClient( + auth=(KAFKA_USER, KAFKA_PASS), + timeout=30.0, + follow_redirects=True + ) as client: + + # สร้าง consumer หรือใช้ consumer เดิม + async def create_consumer(): + try: + resp = await client.post( + group_url, + headers={"Content-Type": "application/vnd.kafka.v2+json"}, + json={ + "name": consumer_name, + "format": "json", + "auto.offset.reset": "earliest" + } + ) + if resp.status_code in (200, 201): + return resp.json()["base_uri"] + elif resp.status_code == 409: + logger.info(f"Consumer {consumer_name} มีอยู่แล้ว ใช้งานต่อ") + return f"{group_url}/instances/{consumer_name}" + else: + logger.error(f"สร้าง consumer ล้มเหลว: {resp.text}") + return None + except httpx.HTTPError as e: + logger.error(f"เกิดข้อผิดพลาด HTTP ขณะสร้าง consumer: {e}") + return None + + base_uri = await create_consumer() + if not base_uri: + return 0 + + # ปรับปรุง: จัดการ base_uri ให้ยืดหยุ่นตาม FORCE_HTTPS + if FORCE_HTTPS and base_uri.startswith("http://"): + base_uri = base_uri.replace("http://", "https://") + logger.info("บังคับใช้ HTTPS สำหรับ Kafka REST Proxy") + + # สมัคร topic + try: + sub_resp = await client.post( + f"{base_uri}/subscription", + headers={"Content-Type": "application/vnd.kafka.v2+json"}, + json={"topics": [kafka_topic]} + ) + if sub_resp.status_code not in (204, 409): + logger.error(f"สมัครรับ topic ล้มเหลว: {sub_resp.text}") + return 0 + logger.info(f"สมัครรับ topic '{kafka_topic}' เรียบร้อยแล้ว") + except httpx.HTTPError as e: + logger.error(f"เกิดข้อผิดพลาด HTTP ขณะสมัคร topic: {e}") + return 0 + + # Fetch records + Smart Retry Loop + for attempt in range(max_retries): + try: + # ใช้ Long Polling Timeout 20.0 วินาที + resp = await client.get( + f"{base_uri}/records", + headers={"Accept": "application/vnd.kafka.json.v2+json"}, + timeout=20.0 + ) + + if resp.status_code == 200: + records = resp.json() + + if not records: + # การควบคุม Retry: หากดึงแล้วได้ Array ว่าง + if attempt < max_retries - 1: + wait_time = 2 + (attempt * 1) # Backoff 2s, 3s, 4s... + logger.warning( + f"ไม่พบข้อมูลใหม่ใน Kafka (ครั้งที่ {attempt+1}/{max_retries}) กำลังรอ {wait_time} วินาที..." + ) + await asyncio.sleep(wait_time) + continue # ลองดึงใหม่ใน loop ถัดไป + + logger.info("ไม่พบข้อมูลใหม่ใน Kafka (จบการดึง)") + return total_inserted # ไม่มีข้อมูลใหม่ หรือ Retry ครบแล้ว + + # --- พบข้อมูล: ทำการ Transform & Load --- + + # Transform & flatten using global function + flat_records = [] + for record in records: + transformed = await asyncio.to_thread(transform_json, record["value"]) + flat_records.extend(flatten_records(transformed)) + + # Batch insert (Sequential) + batch = [] + + for rec in flat_records: + batch.append(rec) + if len(batch) >= batch_size: + + # Sequential Insert + for r in batch: + await asyncio.to_thread(insert_to_clickhouse, r) + + total_inserted += len(batch) + batch.clear() + + if batch: + # Sequential Insert สำหรับ Batch สุดท้าย + for r in batch: + await asyncio.to_thread(insert_to_clickhouse, r) + + total_inserted += len(batch) + + logger.info(f"แปลงและโหลดข้อมูล Kafka จำนวน {len(records)} รายการ (รวมโหลด: {total_inserted} รายการ)") + return total_inserted # จบการดึงครั้งเดียวและ return จำนวนที่โหลดสำเร็จ + + else: + logger.warning(f"เกิดข้อผิดพลาดจาก Kafka ({resp.status_code}): {resp.text}") + await asyncio.sleep(2) + + except httpx.HTTPError as e: + logger.warning(f"เกิดข้อผิดพลาด HTTP ขณะดึงข้อมูลจาก Kafka: {e}") + await asyncio.sleep(2) + except Exception as e: + logger.error(f"เกิดข้อผิดพลาดที่ไม่คาดคิดในการประมวลผล Kafka: {e}") + return total_inserted + + logger.error("ไม่สามารถดึงข้อมูลจาก Kafka หลังจาก retry ครบจำนวนครั้ง") + return total_inserted + +# =============================== +# Pipeline: Fetch -> MinIO -> CKAN -> Kafka +# =============================== +async def fetch_minio_ckan_kafka(): + logger.info("กำลังดึงข้อมูลคุณภาพอากาศ...") + raw_data = await asyncio.to_thread(fetch_air_quality) + if not raw_data: + logger.warning("ไม่พบข้อมูลจาก AIR4THAI") + return None, None + + timestamp = datetime.now(timezone.utc).isoformat() + + # Save raw data to MinIO + object_name = None + minio_url = None + try: + object_name = await asyncio.to_thread(save_to_minio, raw_data) + minio_url = f"s3://air-quality/{object_name}" + logger.info(f"บันทึกข้อมูลลงใน MinIO เรียบร้อยแล้ว: {object_name}") + except Exception as e: + logger.error(f"การบันทึก MinIO ล้มเหลว: {e}") + + # Register raw dataset to CKAN + if object_name and minio_url: + try: + await ckan_service.register_dataset( + dataset_name="air_quality_raw", + resource_name=object_name, + resource_url=minio_url, + owner_org=CKAN_ORG_ID, + metadata={ + "source_url": "http://air4thai.com", + "fetch_time": timestamp, + "format": "json", + "media_type": "application/json", + "description": "Raw air quality data from AIR4THAI" + } + ) + logger.info("ลงทะเบียน Raw dataset ใน CKAN เรียบร้อยแล้ว") + except Exception as e: + logger.warning(f"การลงทะเบียน CKAN (Raw) ล้มเหลว: {e}") + pass + + # ส่งไป Kafka + try: + await asyncio.to_thread(send_to_kafka, "air4thai-stream", raw_data) + logger.info("ส่งข้อมูลไปยัง Kafka เรียบร้อยแล้ว") + except Exception as e: + logger.error(f"การส่ง Kafka ล้มเหลว: {e}") + return None, None + + return raw_data, timestamp + +# =============================== +# Pipeline: Consume Kafka -> Transform -> ClickHouse -> CKAN +# ปรับปรุง: รับ run_id และส่งต่อไปยัง consume_kafka_once +# =============================== +async def consume_transform_clickhouse_ckan(kafka_topic, timestamp, run_id: int, batch_size=100) -> int: + # ใช้ run_id ในการเรียก consumer + total_inserted = await consume_kafka_once(kafka_topic, run_id=run_id, batch_size=batch_size) + + # หลังจากโหลดไป ClickHouse ลงทะเบียน analytical dataset + clickhouse_url = "clickhouse://clickhouse-server/air_quality_db" + + try: + await ckan_service.register_dataset( + dataset_name="air_quality_analytics", + resource_name="air_quality_db", + resource_url=clickhouse_url, + owner_org=CKAN_ORG_ID, + metadata={ + "row_count": total_inserted, + "last_updated": timestamp, + "format": "table", + "media_type": "application/sql", + "description": "Analytical dataset in ClickHouse" + } + ) + logger.info("ลงทะเบียน analytical dataset ใน CKAN เรียบร้อยแล้ว") + except Exception as e: + logger.warning(f"การลงทะเบียน CKAN (Analytics) ล้มเหลว: {e}") + + return total_inserted + +# =============================== +# Async main pipeline +# 💡 ปรับปรุง: ส่ง run_id ไปยัง Task 2 +# =============================== +async def main_async(): + pipeline_name = "air4thai-pipeline" + db_logger = AsyncPipelineDBLogger() + run_id = None + task_logs = [] + + try: + # 1. สร้าง run_id + run_id = await db_logger.insert_pipeline_run(pipeline_name, task_logs) + logger.info(f"เริ่มต้น Pipeline Run ID: {run_id}") + + # ---------------------------------------------------- + # 1. Fetch + MinIO + CKAN + Kafka + # ---------------------------------------------------- + task_name_1 = "fetch_minio_ckan_kafka" + start_time_1 = asyncio.get_event_loop().time() + raw_data, timestamp = await fetch_minio_ckan_kafka() + duration_ms_1 = (asyncio.get_event_loop().time() - start_time_1) * 1000 + + status_1 = "success" if raw_data else "failed" + message_1 = "Pipeline Fetch->MinIO->CKAN->Kafka" + task_logs.append({ + "task_name": task_name_1, + "status": status_1, + "message": message_1, + "duration_ms": duration_ms_1 + }) + logger.info(f"Task '{task_name_1}' สถานะ: {status_1}") + + logger.info("หน่วงเวลา 3 วินาที เพื่อให้ข้อมูล Kafka พร้อมสำหรับการดึง...") + await asyncio.sleep(3) + + if raw_data: + # ---------------------------------------------------- + # 2. Consume Kafka + ClickHouse + CKAN + # ---------------------------------------------------- + task_name_2 = "consume_clickhouse_ckan" + start_time_2 = asyncio.get_event_loop().time() + # ส่ง run_id เข้าไป + total_inserted = await consume_transform_clickhouse_ckan("air4thai-stream", timestamp, run_id=run_id) + duration_ms_2 = (asyncio.get_event_loop().time() - start_time_2) * 1000 + + status_2 = "success" + message_2 = f"Pipeline Kafka->ClickHouse->CKAN. โหลด {total_inserted} รายการ" + task_logs.append({ + "task_name": task_name_2, + "status": status_2, + "message": message_2, + "duration_ms": duration_ms_2 + }) + logger.info(f"Task '{task_name_2}' สถานะ: {status_2}") + + # สิ้นสุดสำเร็จ + final_status = "success" + else: + # Task 1 ล้มเหลว + final_status = "failed" + + # อัปเดตสถานะและ task_logs ครั้งเดียวเมื่อจบ + await db_logger.update_run_status(run_id, final_status, task_logs=task_logs) + logger.info(f"Pipeline Run ID: {run_id} เสร็จสมบูรณ์ด้วยสถานะ: {final_status}") + + + except Exception as e: + error_msg = f"เกิดข้อผิดพลาดร้ายแรง: {e}" + logger.error(error_msg, exc_info=True) + + # อัปเดตสถานะเป็น failed + if run_id: + task_logs.append({ + "task_name": "overall_error", + "status": "failed", + "message": error_msg, + "duration_ms": 0 + }) + await db_logger.update_run_status(run_id, "failed", str(e), task_logs=task_logs) + logger.info(f"Pipeline Run ID: {run_id} อัปเดตสถานะเป็น Failed") + else: + logger.error("ไม่สามารถบันทึกสถานะลงฐานข้อมูลได้เนื่องจากไม่มี Run ID") + + finally: + # ปิด httpx client ของ logger + await db_logger.http_client.aclose() + logger.info("ปิดการเชื่อมต่อ DB Logger แล้ว") + +if __name__ == "__main__": + asyncio.run(main_async()) \ No newline at end of file diff --git a/pipelines/my_flow.py b/pipelines/my_flow.py deleted file mode 100644 index 415c032..0000000 --- a/pipelines/my_flow.py +++ /dev/null @@ -1,118 +0,0 @@ -from pipelines.fetch_air_quality import fetch_air_quality -from pipelines.save_to_minio import save_to_minio -from pipelines.send_to_kafka import send_to_kafka -from pipelines.transform_clean import transform_json -from pipelines.load_to_clickhouse import insert_to_clickhouse - -import os -import json -import time -import requests -from requests.auth import HTTPBasicAuth -from dotenv import load_dotenv - -load_dotenv() - -auth = HTTPBasicAuth(os.getenv('KAFKA_API_USER'), os.getenv('KAFKA_API_PASS')) -url = f"{os.getenv('KAFKA_REST_PROXY_URL')}/consumers/my-group" - -def fetch_and_send_to_kafka(kafka_topic): - raw_data = fetch_air_quality() - if raw_data: - object_name = save_to_minio(raw_data) - send_to_kafka(kafka_topic, raw_data) - -def transform_and_load_to_clickhouse(raw_data): - transformed_data = transform_json(raw_data) - if transformed_data: - for record in transformed_data: - #print("record.model_dump(): ", record.model_dump()) - insert_to_clickhouse(record.model_dump()) - -def consume_kafka_and_process(kafka_topic): - # สร้าง Consumer Instance - headers = { - "Content-Type": "application/vnd.kafka.v2+json", - } - data = { - "name": "my_consumer_instance", - "format": "json", - "auto.offset.reset": "earliest", - } - - response = requests.post( - url, - headers=headers, - data=json.dumps(data), - auth=auth, - verify=False # ระวังถ้า SSL เป็น self-signed - ) - if response.status_code != 200: - print(f"Failed to create consumer instance: {response.text}") - return - - consumer_instance = response.json() - base_uri = consumer_instance['base_uri'] - - # Subscribe to topic - subscribe_data = { - "topics": [kafka_topic] - } - sub_resp = requests.post( - f"{base_uri}/subscription", - headers=headers, - data=json.dumps(subscribe_data), - auth=auth, - verify=False - ) - if sub_resp.status_code != 204: - print(f"Failed to subscribe to topic: {sub_resp.text}") - return - - print(f"✅ Subscribed to topic: {kafka_topic}") - - try: - while True: - # Poll for messages - resp = requests.get( - f"{base_uri}/records", - headers={"Accept": "application/vnd.kafka.json.v2+json"}, - auth=auth, - verify=False - ) - if resp.status_code == 200: - records = resp.json() - if records: - for record in records: - raw_data = record['value'] - - # แสดง raw message ที่รับมาจาก Kafka - #print(f"Raw message from Kafka: {raw_data}") - - transform_and_load_to_clickhouse(raw_data) - else: - print("ℹ️ No new records.") - - else: - print(f"⚠️ Error fetching records: {resp.text}") - time.sleep(2) # เพิ่มพัก 2 วินาที ก่อนวนรอบใหม่ - finally: - # Delete Consumer Instance when done - requests.delete( - base_uri, - headers={"Accept": "application/vnd.kafka.v2+json"}, - auth=auth, - verify=False - ) - -def run_flow(): - kafka_topic = "air4thai-stream" - - # Step 1: Fetch and send data to Kafka - fetch_and_send_to_kafka(kafka_topic) - - # Step 2: Consume data from Kafka and load to ClickHouse - consume_kafka_and_process(kafka_topic) - -if __name__ == "__main__": - run_flow()