Update object storage module to use the official Replit Object Storage SDK and add a route to serve files from storage. Replit-Commit-Author: Agent Replit-Commit-Session-Id: cd9a7d26-a4e5-4215-975c-c59f4ed1f06d Replit-Commit-Checkpoint-Type: full_checkpoint Replit-Commit-Event-Id: db73fc59-d876-4299-b896-e021264939d1 Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/d0a1d46d-d203-4308-bc6a-312ac7c0243b/cd9a7d26-a4e5-4215-975c-c59f4ed1f06d/05bPjFc
60 lines
1.8 KiB
Python
60 lines
1.8 KiB
Python
"""
Object Storage Service for FTC Team Website
Uses official Replit Object Storage SDK
"""
|
|
|
|
from replit.object_storage import Client
|
|
from uuid import uuid4
|
|
from werkzeug.utils import secure_filename
|
|
import os
|
|
|
|
|
|
class ObjectStorageService:
    """Wrapper around the Replit Object Storage client for uploaded files.

    Callers address stored objects through a ``/storage/<object_name>`` path
    (the value returned by :meth:`upload_file`); the other methods translate
    that path back into the object name expected by the storage client.
    """

    # Prefix placed in front of object names in paths handed back to callers.
    _URL_PREFIX = "/storage/"

    def __init__(self):
        """Initialize the Replit Object Storage client"""
        self.client = Client()
        # Read from the environment; this class does not pass it to the client.
        self.bucket_name = os.environ.get('OBJECT_STORAGE_BUCKET', 'default')

    @classmethod
    def _object_name(cls, path):
        """Return the object name for *path*, stripping one leading prefix.

        Only a leading ``/storage/`` is removed. Occurrences of the same text
        later in the path belong to the object name and are left untouched.
        A path without the prefix is returned unchanged.
        """
        if path.startswith(cls._URL_PREFIX):
            return path[len(cls._URL_PREFIX):]
        return path

    def upload_file(self, file, folder='uploads'):
        """
        Upload a file to object storage

        The stored object gets a random UUID-based name that keeps only the
        extension of the sanitized original filename.

        Args:
            file: Werkzeug FileStorage object
            folder: Folder name (prefix) for organization

        Returns:
            str: Path to the uploaded file for serving
                (``/storage/<folder>/<uuid><ext>``), or None when no file or
                no filename was supplied.
        """
        if not file or not file.filename:
            return None

        original_filename = secure_filename(file.filename)
        file_extension = os.path.splitext(original_filename)[1]
        unique_filename = f"{uuid4()}{file_extension}"
        object_name = f"{folder}/{unique_filename}"

        file_content = file.read()
        self.client.upload_from_bytes(object_name, file_content)

        return f"{self._URL_PREFIX}{object_name}"

    def get_file_bytes(self, path):
        """Download a file as bytes

        Args:
            path: Serving path (``/storage/<object_name>``) or a bare object
                name.

        Returns:
            Whatever the client's ``download_as_bytes`` returns for the
            object. Errors raised by the client propagate to the caller.
        """
        return self.client.download_as_bytes(self._object_name(path))

    def delete_file(self, path):
        """Delete a file from object storage

        Best-effort: any ordinary error (including a malformed *path*) is
        reported as ``False`` instead of being raised.

        Args:
            path: Serving path (``/storage/<object_name>``) or a bare object
                name.

        Returns:
            bool: True if the delete call completed, False if it raised.
        """
        try:
            self.client.delete(self._object_name(path))
        except Exception:
            # Deliberately not a bare ``except``: KeyboardInterrupt and
            # SystemExit must still be able to stop the process.
            return False
        return True

    def list_files(self):
        """List all files in the bucket

        Returns:
            The result of the client's ``list()`` call, unmodified.
        """
        return self.client.list()
|