th-air-quality-etl-ml/pipelines/send_to_kafka.py

30 lines
895 B
Python

import os
import json
import requests
from dotenv import load_dotenv
from requests.auth import HTTPBasicAuth
load_dotenv()
KAFKA_REST_PROXY = "https://kafka-rest-proxy.softwarecraft.tech" # เปลี่ยนให้เป็น https://
auth = HTTPBasicAuth(os.getenv('KAFKA_API_USER'), os.getenv('KAFKA_API_PASS'))
def send_to_kafka(topic, data):
headers = {
'Content-Type': 'application/vnd.kafka.json.v2+json'
}
payload = {
"records": [
{"value": data}
]
}
url = f"{KAFKA_REST_PROXY}/topics/{topic}"
response = requests.post(url, headers=headers, auth=auth, timeout=5 , data=json.dumps(payload), verify=True)
if response.status_code == 200 or response.status_code == 202:
print(f"Sent data to Kafka topic: {topic}")
else:
print(f"Failed to send to Kafka: {response.status_code} {response.text}")