31 lines
885 B
Python
31 lines
885 B
Python
import os
|
|
import json
|
|
import requests
|
|
from dotenv import load_dotenv
|
|
from requests.auth import HTTPBasicAuth
|
|
|
|
# Pull settings from a .env file (when one is present) into the environment.
load_dotenv()

# Connection settings come from environment variables instead of a hardcoded URL.
KAFKA_REST_PROXY = os.environ.get("KAFKA_REST_PROXY_URL")
auth = HTTPBasicAuth(
    username=os.environ.get("KAFKA_API_USER"),
    password=os.environ.get("KAFKA_API_PASS"),
)
|
|
|
|
def send_to_kafka(topic, data):
    """Publish a single JSON record to a Kafka topic through the REST proxy.

    The record is POSTed to ``{KAFKA_REST_PROXY}/topics/{topic}`` using the
    module-level basic-auth credentials and a 5 second timeout. The outcome
    is reported on stdout; transport and HTTP failures are reported the same
    way instead of raising.

    Args:
        topic: Name of the destination topic (used as the last URL segment).
        data: JSON-serializable value sent as the record's ``value``.

    Returns:
        True if the proxy answered with status 200 or 202, False otherwise
        (missing configuration, transport error, or any other status code).

    Raises:
        TypeError: If ``data`` cannot be serialized by ``json.dumps``.
    """
    if not KAFKA_REST_PROXY:
        # Without a base URL the request would go to "None/topics/...".
        print("Failed to send to Kafka: KAFKA_REST_PROXY_URL is not set")
        return False

    headers = {"Content-Type": "application/vnd.kafka.json.v2+json"}
    payload = {"records": [{"value": data}]}

    # Tolerate a trailing slash in the configured base URL ("//topics/...").
    base_url = KAFKA_REST_PROXY.rstrip("/")
    url = f"{base_url}/topics/{topic}"

    try:
        # The body is serialized by hand (rather than via ``json=``) so the
        # Kafka-specific Content-Type header above is the one that is sent.
        response = requests.post(
            url,
            headers=headers,
            auth=auth,
            timeout=5,
            data=json.dumps(payload),
            verify=True,
        )
    except requests.RequestException as exc:
        # Timeouts, connection errors and malformed URLs are reported the
        # same way as a non-2xx answer.
        print(f"Failed to send to Kafka: {exc}")
        return False

    if response.status_code in (200, 202):
        print(f"Sent data to Kafka topic: {topic}")
        return True

    print(f"Failed to send to Kafka: {response.status_code} {response.text}")
    return False
|