# test_connections.py
"""Smoke-test connectivity to the Kafka REST Proxy, MinIO and ClickHouse.

Connection settings are read from environment variables; ``load_dotenv()``
is called at import time to populate them. Each check prints a success or
failure line instead of raising, so all three checks always run.
"""
import os

import clickhouse_connect
import requests
from dotenv import load_dotenv
from minio import Minio
from requests.auth import HTTPBasicAuth

load_dotenv()


def _require_env(name):
    """Return the value of environment variable *name*.

    Raises:
        RuntimeError: if the variable is unset or empty, so the caller
            reports the missing setting by name instead of failing later
            with an unrelated error.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"environment variable {name} is not set")
    return value


def _strip_scheme(url):
    """Return *url* without a leading http(s):// and without trailing slashes."""
    for prefix in ("https://", "http://"):
        if url.startswith(prefix):
            url = url[len(prefix):]
            break
    return url.rstrip("/")


# ----- 1. Test Kafka REST Proxy -----
def test_kafka_rest():
    """List topics through the Kafka REST Proxy and print the outcome.

    Reads KAFKA_REST_PROXY_URL (required) and KAFKA_API_USER /
    KAFKA_API_PASS (optional). Any failure is caught and printed.
    """
    try:
        # Strip trailing slashes so the request path never becomes "//topics".
        base_url = _require_env("KAFKA_REST_PROXY_URL").rstrip("/")
        url = f"{base_url}/topics"

        user = os.getenv("KAFKA_API_USER")
        password = os.getenv("KAFKA_API_PASS")
        # Only send basic auth when both credentials are configured.
        auth = None
        if user is not None and password is not None:
            auth = HTTPBasicAuth(user, password)

        r = requests.get(url, auth=auth, timeout=5)
        r.raise_for_status()
        print("✅ Kafka REST Proxy Connected. Topics:", r.json())
    except Exception as e:
        print("❌ Kafka REST Proxy Error:", e)


# ----- 2. Test MinIO -----
def test_minio():
    """List MinIO buckets and print the outcome.

    Reads MINIO_ENDPOINT (required), MINIO_ACCESS_KEY and MINIO_SECRET_KEY.
    TLS is used when the endpoint starts with "https". Any failure is
    caught and printed.
    """
    try:
        endpoint = _require_env("MINIO_ENDPOINT")
        minio_client = Minio(
            # The client takes a bare host[:port], so drop scheme and slash.
            endpoint=_strip_scheme(endpoint),
            access_key=os.getenv("MINIO_ACCESS_KEY"),
            secret_key=os.getenv("MINIO_SECRET_KEY"),
            secure=endpoint.startswith("https"),
        )
        buckets = minio_client.list_buckets()
        print("✅ MinIO Connected. Buckets:", [b.name for b in buckets])
    except Exception as e:
        print("❌ MinIO Error:", e)


# ----- 3. Test ClickHouse -----
def test_clickhouse():
    """Run ``SELECT now()`` against ClickHouse and print the outcome.

    Reads CLICKHOUSE_HOST (required), CLICKHOUSE_USER and
    CLICKHOUSE_PASSWORD. The connection is always made over HTTPS on
    port 443. Any failure is caught and printed.
    """
    try:
        # Read the settings from the environment (.env).
        ch_host = _strip_scheme(_require_env("CLICKHOUSE_HOST"))
        ch_user = os.getenv("CLICKHOUSE_USER")
        ch_password = os.getenv("CLICKHOUSE_PASSWORD")

        # Create the client with clickhouse-connect.
        client = clickhouse_connect.get_client(
            host=ch_host,          # bare host name, without the URL scheme
            port=443,              # HTTPS port
            username=ch_user,
            password=ch_password,
            secure=True,           # use a secure (HTTPS) connection
        )
        try:
            # Verify the connection with a trivial SQL statement.
            result = client.query('SELECT now()')
            print("✅ ClickHouse Connected. Time:", result.result_rows[0][0])
        finally:
            # Release the client's connection even when the query fails.
            client.close()
    except Exception as e:
        print("❌ ClickHouse Error:", e)


if __name__ == '__main__':
    test_kafka_rest()
    test_minio()
    test_clickhouse()
    # test_prefect_api()  # disabled; not defined in the visible part of this file