32 lines
1.3 KiB
Python

import os
from dotenv import load_dotenv
from minio import Minio
import json
from io import BytesIO
from datetime import datetime, timezone
load_dotenv()
def save_to_minio(data):
bucket_name = "air-quality"
try:
minio_client = Minio(
endpoint=os.getenv('MINIO_ENDPOINT').replace('https://', '').replace('http://', ''),
access_key=os.getenv('MINIO_ACCESS_KEY'),
secret_key=os.getenv('MINIO_SECRET_KEY'),
secure=os.getenv('MINIO_ENDPOINT').startswith('https')
)
# ใช้ timezone-aware datetime
timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%d-%H%M%S')
object_name = f"raw/{timestamp}.json"
# เปลี่ยนข้อมูลเป็น JSON และใช้ ensure_ascii=False เพื่อรองรับภาษาไทย
data_json = json.dumps(data, ensure_ascii=False, indent=4)
# แปลงข้อมูล JSON เป็น bytes และบันทึกลงใน MinIO
data_bytes = BytesIO(data_json.encode('utf-8'))
minio_client.put_object(bucket_name, object_name, data_bytes, length=data_bytes.getbuffer().nbytes)
return object_name
except Exception as e:
print("MinIO Error:", e)