119 lines
3.6 KiB
Python
119 lines
3.6 KiB
Python
from pipelines.fetch_air_quality import fetch_air_quality
|
||
from pipelines.save_to_minio import save_to_minio
|
||
from pipelines.send_to_kafka import send_to_kafka
|
||
from pipelines.transform_clean import transform_json
|
||
from pipelines.load_to_clickhouse import insert_to_clickhouse
|
||
|
||
import os
|
||
import json
|
||
import time
|
||
import requests
|
||
from requests.auth import HTTPBasicAuth
|
||
from dotenv import load_dotenv
|
||
|
||
# Load variables from a local .env file (if present) into the process
# environment before any os.getenv() lookups below.
load_dotenv()

# HTTP Basic credentials for the Kafka REST Proxy, read from the environment.
auth = HTTPBasicAuth(os.getenv('KAFKA_API_USER'), os.getenv('KAFKA_API_PASS'))

# Endpoint for creating consumer instances in the "my-group" consumer group.
# A trailing slash on KAFKA_REST_PROXY_URL is stripped so the path does not
# become "...//consumers/my-group". If the variable is unset the URL has no
# scheme and requests will reject it at call time.
_rest_proxy_base = (os.getenv('KAFKA_REST_PROXY_URL') or '').rstrip('/')
url = f"{_rest_proxy_base}/consumers/my-group"


def fetch_and_send_to_kafka(kafka_topic):
    """Fetch the latest air-quality payload, store it, and publish it to Kafka.

    The raw payload returned by ``fetch_air_quality`` is passed to
    ``save_to_minio`` and then sent to ``kafka_topic`` via ``send_to_kafka``.
    When the fetch returns a falsy value, nothing is stored or sent.

    Args:
        kafka_topic: Name of the Kafka topic to publish the raw payload to.
    """
    raw_data = fetch_air_quality()
    if not raw_data:
        print("ℹ️ No air-quality data fetched; nothing sent to Kafka.")
        return

    # save_to_minio's return value (an object name, judging by how it was
    # previously bound) is not needed; the call is made for its side effect.
    save_to_minio(raw_data)
    send_to_kafka(kafka_topic, raw_data)


def transform_and_load_to_clickhouse(raw_data):
    """Transform one raw Kafka message value and insert the result into ClickHouse.

    ``transform_json`` converts ``raw_data`` into an iterable of records.
    Each record's ``model_dump()`` dict is passed to ``insert_to_clickhouse``,
    one call per record. Nothing is inserted when the transform returns a
    falsy value (e.g. ``None`` or an empty list).

    Args:
        raw_data: The ``value`` field of a record returned by the Kafka REST
            Proxy's records endpoint.
    """
    transformed_data = transform_json(raw_data)
    if not transformed_data:
        return

    for record in transformed_data:
        # model_dump() suggests pydantic models -- TODO confirm in transform_clean.
        insert_to_clickhouse(record.model_dump())


def _create_consumer_instance(headers, request_timeout):
    """Create a consumer instance in the REST proxy group; return its base URI or None."""
    payload = {
        "name": "my_consumer_instance",
        "format": "json",
        "auto.offset.reset": "earliest",
    }
    response = requests.post(
        url,
        headers=headers,
        data=json.dumps(payload),
        auth=auth,
        # TLS verification is disabled (e.g. for a self-signed proxy
        # certificate); this is insecure, so beware.
        verify=False,
        timeout=request_timeout,
    )
    if response.status_code != 200:
        print(f"Failed to create consumer instance: {response.text}")
        return None
    return response.json()['base_uri']


def _delete_consumer_instance(base_uri, request_timeout):
    """Best-effort deletion of the consumer instance; reports errors instead of raising.

    Not raising matters because this runs in a ``finally`` block, where a new
    exception would hide the one already propagating.
    """
    try:
        resp = requests.delete(
            base_uri,
            headers={"Accept": "application/vnd.kafka.v2+json"},
            auth=auth,
            verify=False,
            timeout=request_timeout,
        )
    except requests.RequestException as exc:
        print(f"⚠️ Failed to delete consumer instance: {exc}")
        return
    if resp.status_code != 204:
        print(f"⚠️ Failed to delete consumer instance: {resp.text}")


def _poll_forever(base_uri, poll_interval, request_timeout):
    """Fetch records from the consumer instance and process them in an endless loop."""
    while True:
        try:
            resp = requests.get(
                f"{base_uri}/records",
                headers={"Accept": "application/vnd.kafka.json.v2+json"},
                auth=auth,
                verify=False,
                timeout=request_timeout,
            )
        except requests.RequestException as exc:
            # Transport failures (timeouts, dropped connections) are reported
            # and retried, the same way as non-200 HTTP responses.
            print(f"⚠️ Error fetching records: {exc}")
        else:
            if resp.status_code == 200:
                records = resp.json()
                if records:
                    for record in records:
                        transform_and_load_to_clickhouse(record['value'])
                else:
                    print("ℹ️ No new records.")
            else:
                print(f"⚠️ Error fetching records: {resp.text}")
        # Pause before the next polling round.
        time.sleep(poll_interval)


def consume_kafka_and_process(kafka_topic, *, poll_interval=2, request_timeout=30):
    """Consume ``kafka_topic`` via the Kafka REST Proxy and load every record.

    Creates a consumer instance in the ``my-group`` group and subscribes it to
    ``kafka_topic``. It then polls ``{base_uri}/records`` indefinitely, passing
    each record's ``value`` to ``transform_and_load_to_clickhouse``.

    The function returns early, after printing the proxy's error text, if the
    instance cannot be created or subscribed. Otherwise it runs until an
    exception (e.g. ``KeyboardInterrupt``) escapes the loop. Once the instance
    has been created, it is always deleted on the way out, including when
    subscription fails.

    Args:
        kafka_topic: Topic to subscribe to.
        poll_interval: Seconds to sleep between polls (default 2).
        request_timeout: Seconds passed as ``timeout`` to every HTTP request so
            that a stalled proxy cannot block the process forever.
    """
    headers = {"Content-Type": "application/vnd.kafka.v2+json"}

    base_uri = _create_consumer_instance(headers, request_timeout)
    if base_uri is None:
        return

    try:
        sub_resp = requests.post(
            f"{base_uri}/subscription",
            headers=headers,
            data=json.dumps({"topics": [kafka_topic]}),
            auth=auth,
            verify=False,
            timeout=request_timeout,
        )
        if sub_resp.status_code != 204:
            print(f"Failed to subscribe to topic: {sub_resp.text}")
            return

        print(f"✅ Subscribed to topic: {kafka_topic}")
        _poll_forever(base_uri, poll_interval, request_timeout)
    finally:
        # Covers subscribe failure, errors and Ctrl-C alike, so the instance
        # is not left behind on the proxy.
        _delete_consumer_instance(base_uri, request_timeout)


def run_flow(kafka_topic="air4thai-stream"):
    """Run the pipeline: publish fresh air-quality data, then consume the topic.

    Step 1 fetches data and sends it to ``kafka_topic``. Step 2 consumes
    ``kafka_topic`` and loads records into ClickHouse. Step 2 polls in an
    endless loop, so this call only returns if the consumer cannot be set up
    or an exception stops it.

    Args:
        kafka_topic: Topic used for both producing and consuming. Defaults to
            ``"air4thai-stream"``.
    """
    # Step 1: Fetch and send data to Kafka
    fetch_and_send_to_kafka(kafka_topic)

    # Step 2: Consume data from Kafka and load to ClickHouse
    consume_kafka_and_process(kafka_topic)


if __name__ == "__main__":
    # Script entry point: importing this module runs nothing.
    run_flow()