import json
import os
from urllib.parse import quote

import requests
from dotenv import load_dotenv
from requests.auth import HTTPBasicAuth

load_dotenv()

# Read the REST Proxy URL from an environment variable instead of hardcoding it.
KAFKA_REST_PROXY = os.getenv("KAFKA_REST_PROXY_URL")
auth = HTTPBasicAuth(os.getenv('KAFKA_API_USER'), os.getenv('KAFKA_API_PASS'))


def send_to_kafka(topic, data):
    """Publish one record to a Kafka topic through the REST Proxy.

    Sends a POST to ``{KAFKA_REST_PROXY}/topics/{topic}`` whose JSON body is
    ``{"records": [{"value": data}]}``, using the module-level basic-auth
    credentials, a 5-second timeout, and TLS certificate verification.

    Args:
        topic: Name of the destination topic. It is percent-encoded before
            being placed in the URL path.
        data: JSON-serializable value stored as the record's ``value``.

    Returns:
        True if the proxy answered with HTTP 200 or 202, False otherwise.
        The outcome is also printed to stdout in both cases.

    Raises:
        requests.exceptions.MissingSchema: If ``KAFKA_REST_PROXY_URL`` is
            not set, so no request URL can be built.
        requests.exceptions.RequestException: On connection errors or
            timeouts; these are not caught here.
        TypeError: If ``data`` cannot be serialized by ``json.dumps``.
    """
    if not KAFKA_REST_PROXY:
        # Without this guard the URL would become "None/topics/<topic>" and
        # requests would fail with the same exception type but a confusing
        # message; raise it up front with an explicit explanation instead.
        raise requests.exceptions.MissingSchema(
            "KAFKA_REST_PROXY_URL is not set; cannot build the Kafka REST Proxy URL"
        )

    headers = {
        'Content-Type': 'application/vnd.kafka.json.v2+json'
    }
    payload = {
        "records": [
            {"value": data}
        ]
    }

    # Strip a trailing slash from the base URL to avoid "//topics/...", and
    # percent-encode the topic so characters such as "/" or "?" cannot alter
    # the request path.
    base_url = KAFKA_REST_PROXY.rstrip("/")
    url = f"{base_url}/topics/{quote(str(topic), safe='')}"

    response = requests.post(
        url,
        headers=headers,
        auth=auth,
        timeout=5,
        # The body is serialized by hand (rather than passed via ``json=``)
        # so the Kafka-specific Content-Type header above is what gets sent.
        data=json.dumps(payload),
        verify=True,
    )

    if response.status_code in (200, 202):
        print(f"Sent data to Kafka topic: {topic}")
        return True

    print(f"Failed to send to Kafka: {response.status_code} {response.text}")
    return False