"""Air-quality pipeline driver.

Fetches a payload with ``fetch_air_quality``, archives it with
``save_to_minio``, publishes it with ``send_to_kafka``, then consumes the
topic through the Kafka REST Proxy and loads each record into ClickHouse via
``transform_json`` + ``insert_to_clickhouse``.
"""

import json
import os
import time

import requests
from dotenv import load_dotenv
from requests.auth import HTTPBasicAuth

from pipelines.fetch_air_quality import fetch_air_quality
from pipelines.load_to_clickhouse import insert_to_clickhouse
from pipelines.save_to_minio import save_to_minio
from pipelines.send_to_kafka import send_to_kafka
from pipelines.transform_clean import transform_json

load_dotenv()

auth = HTTPBasicAuth(os.getenv('KAFKA_API_USER'), os.getenv('KAFKA_API_PASS'))
url = f"{os.getenv('KAFKA_REST_PROXY_URL')}/consumers/my-group"

# TLS verification stays OFF by default (the proxy may use a self-signed
# certificate, which is why the original code passed verify=False).
# Set KAFKA_REST_VERIFY_SSL=true to turn verification on.
VERIFY_SSL = os.getenv("KAFKA_REST_VERIFY_SSL", "false").strip().lower() in ("1", "true", "yes")

# Connect/read timeout in seconds for every REST Proxy call, so a hung proxy
# cannot block the pipeline indefinitely.
REQUEST_TIMEOUT = 30

# Content type used by the REST Proxy v2 consumer-management endpoints.
KAFKA_V2_HEADERS = {"Content-Type": "application/vnd.kafka.v2+json"}


def fetch_and_send_to_kafka(kafka_topic):
    """Fetch the latest air-quality payload, archive it in MinIO and publish it to Kafka.

    Args:
        kafka_topic: Topic name passed through to ``send_to_kafka``.

    Nothing is saved or sent when ``fetch_air_quality()`` returns a falsy value.
    """
    raw_data = fetch_air_quality()
    if not raw_data:
        return
    save_to_minio(raw_data)
    send_to_kafka(kafka_topic, raw_data)


def transform_and_load_to_clickhouse(raw_data):
    """Transform one raw payload and insert every resulting record into ClickHouse.

    Args:
        raw_data: One message value as received from Kafka.

    NOTE(review): assumes ``transform_json`` returns either a falsy value or an
    iterable of objects exposing ``model_dump()`` (presumably pydantic models
    -- confirm against pipelines.transform_clean).
    """
    transformed_data = transform_json(raw_data)
    if not transformed_data:
        return
    for record in transformed_data:
        insert_to_clickhouse(record.model_dump())


def _delete_consumer(base_uri):
    """Best-effort removal of a REST Proxy consumer instance; never raises.

    Called from ``finally`` blocks, so it must not raise and mask the error
    that caused the cleanup.
    """
    try:
        resp = requests.delete(
            base_uri,
            headers={**KAFKA_V2_HEADERS, "Accept": "application/vnd.kafka.v2+json"},
            auth=auth,
            verify=VERIFY_SSL,
            timeout=REQUEST_TIMEOUT,
        )
    except requests.RequestException as exc:
        print(f"⚠️ Failed to delete consumer instance: {exc}")
        return
    if resp.status_code != 204:
        print(f"⚠️ Failed to delete consumer instance: {resp.text}")


def _poll_once(base_uri):
    """Fetch one batch of records and load each one into ClickHouse.

    Returns:
        False when the consumer instance no longer exists on the proxy (HTTP
        404), meaning further polling is pointless; True otherwise.
    """
    try:
        resp = requests.get(
            f"{base_uri}/records",
            headers={"Accept": "application/vnd.kafka.json.v2+json"},
            auth=auth,
            verify=VERIFY_SSL,
            timeout=REQUEST_TIMEOUT,
        )
    except requests.RequestException as exc:
        # Treat transient network failures like a non-200 reply: log them and
        # retry on the next poll instead of killing the long-running consumer.
        print(f"⚠️ Error fetching records: {exc}")
        return True

    if resp.status_code == 404:
        # The instance is gone (e.g. expired on the proxy); retrying forever
        # would only spam errors.
        print(f"⚠️ Error fetching records: {resp.text}")
        return False
    if resp.status_code != 200:
        print(f"⚠️ Error fetching records: {resp.text}")
        return True

    records = resp.json()
    if not records:
        print("ℹ️ No new records.")
        return True
    for record in records:
        transform_and_load_to_clickhouse(record['value'])
    return True


def consume_kafka_and_process(kafka_topic, *, consumer_name="my_consumer_instance", poll_interval=2):
    """Consume ``kafka_topic`` via the Kafka REST Proxy and load records into ClickHouse.

    Creates a consumer instance in group ``my-group`` (JSON format, reading
    from the earliest offset), subscribes it to the topic, and polls every
    ``poll_interval`` seconds until interrupted (e.g. KeyboardInterrupt) or
    until the proxy reports that the instance no longer exists.

    The consumer instance is always deleted on exit, including when the
    subscription fails. Otherwise a leaked instance with the same fixed name
    would collide with the next run's create request.

    Args:
        kafka_topic: Topic to subscribe to.
        consumer_name: Name of the REST Proxy consumer instance.
        poll_interval: Seconds to sleep between polls.
    """
    # Create the consumer instance.
    create_body = {
        "name": consumer_name,
        "format": "json",
        "auto.offset.reset": "earliest",
    }
    response = requests.post(
        url,
        headers=KAFKA_V2_HEADERS,
        data=json.dumps(create_body),
        auth=auth,
        verify=VERIFY_SSL,
        timeout=REQUEST_TIMEOUT,
    )
    if response.status_code != 200:
        print(f"Failed to create consumer instance: {response.text}")
        return

    base_uri = response.json()['base_uri']

    # From here on the instance exists on the proxy, so every exit path must
    # go through the finally block that deletes it.
    try:
        sub_resp = requests.post(
            f"{base_uri}/subscription",
            headers=KAFKA_V2_HEADERS,
            data=json.dumps({"topics": [kafka_topic]}),
            auth=auth,
            verify=VERIFY_SSL,
            timeout=REQUEST_TIMEOUT,
        )
        if sub_resp.status_code != 204:
            print(f"Failed to subscribe to topic: {sub_resp.text}")
            return

        print(f"✅ Subscribed to topic: {kafka_topic}")

        while _poll_once(base_uri):
            # Pause before the next poll.
            time.sleep(poll_interval)
    finally:
        _delete_consumer(base_uri)


def run_flow():
    """Run the pipeline: publish the latest payload, then consume and load it.

    Blocks inside ``consume_kafka_and_process`` until it is interrupted or the
    consumer stops.
    """
    kafka_topic = "air4thai-stream"

    # Step 1: fetch data and send it to Kafka.
    fetch_and_send_to_kafka(kafka_topic)

    # Step 2: consume from Kafka and load into ClickHouse.
    consume_kafka_and_process(kafka_topic)


if __name__ == "__main__":
    run_flow()