Update object storage module to use the official Replit Object Storage SDK and add a route to serve files from storage. Replit-Commit-Author: Agent Replit-Commit-Session-Id: cd9a7d26-a4e5-4215-975c-c59f4ed1f06d Replit-Commit-Checkpoint-Type: full_checkpoint Replit-Commit-Event-Id: db73fc59-d876-4299-b896-e021264939d1 Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/d0a1d46d-d203-4308-bc6a-312ac7c0243b/cd9a7d26-a4e5-4215-975c-c59f4ed1f06d/05bPjFc
442 lines
15 KiB
Python
442 lines
15 KiB
Python
import os
|
|
import psycopg2
|
|
from psycopg2.extras import RealDictCursor
|
|
from flask import Flask, render_template, request, redirect, url_for, session, flash, jsonify
|
|
from werkzeug.utils import secure_filename
|
|
from datetime import datetime
|
|
from object_storage import ObjectStorageService
|
|
|
|
# Flask application object that every route in this module registers on.
app = Flask(__name__)

# Session-signing key. The literal fallback is visible in the source tree, so
# real deployments should provide SECRET_KEY through the environment.
app.secret_key = os.environ.get('SECRET_KEY', 'techturb-secret-key-change-in-production')

# Directory used when uploads are written to local disk.
app.config['UPLOAD_FOLDER'] = 'static/images'

# Upper bound on the size of an incoming request body: 16 MiB.
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024

# Shared admin password checked by the login route. It can be overridden with
# the ADMIN_PASSWORD environment variable; the literal is kept as the fallback
# so existing setups behave exactly as before.
ADMIN_PASSWORD = os.environ.get('ADMIN_PASSWORD', 'techturb123')
|
|
|
|
def _init_object_storage():
    """Build the object storage client.

    Returns a ``(service, enabled)`` pair: the client and True on success, or
    ``(None, False)`` when construction (or reporting the bucket name) fails.
    A ValueError is reported as "not configured"; any other exception is
    reported as an initialization error.
    """
    try:
        service = ObjectStorageService()
        print(f"Object Storage initialized successfully - using bucket: {service.bucket_name}")
        return service, True
    except ValueError as e:
        print(f"Warning: Object Storage not configured - {e}")
        print("Files will be saved locally (ephemeral storage)")
        return None, False
    except Exception as e:
        print(f"Error initializing Object Storage: {e}")
        return None, False


# Module-level handles consulted by the upload helper and the /storage route.
storage_service, USING_OBJECT_STORAGE = _init_object_storage()
|
|
|
|
def get_db_connection():
    """Open and return a new psycopg2 connection for the DATABASE_URL setting.

    Raises KeyError when the DATABASE_URL environment variable is missing.
    The caller is responsible for closing the returned connection.
    """
    dsn = os.environ['DATABASE_URL']
    return psycopg2.connect(dsn)
|
|
|
|
def _save_upload_locally(file):
    """Save an uploaded file under UPLOAD_FOLDER and return 'images/<name>'.

    Returns None when the client-supplied name sanitizes to an empty string,
    since there is then no usable file name to write to.
    """
    filename = secure_filename(file.filename)
    if not filename:
        return None
    upload_dir = app.config['UPLOAD_FOLDER']
    # Make sure the target directory exists before writing into it.
    os.makedirs(upload_dir, exist_ok=True)
    file.save(os.path.join(upload_dir, filename))
    return f'images/{filename}'


def upload_file_to_storage(file, folder='uploads'):
    """Store an uploaded file and return the path/URL to record for it.

    Object storage is used when it was initialized successfully; if it is
    unavailable, or the upload raises, the file is written to the local
    upload folder instead.

    Args:
        file: The uploaded file object (must expose ``filename`` and ``save``).
        folder: Destination folder name passed to the object storage service.

    Returns:
        The value returned by the storage service, ``'images/<filename>'`` for
        a local save, or None when there is no file / no usable file name.
    """
    if not file or not file.filename:
        return None

    if USING_OBJECT_STORAGE and storage_service:
        try:
            result = storage_service.upload_file(file, folder)
            print(f"File uploaded to object storage: {result}")
            return result
        except Exception as e:
            print(f"Error uploading to object storage: {e}")
            # The failed attempt may already have read from the stream; rewind
            # it so the local copy is not empty or truncated.
            try:
                file.stream.seek(0)
            except (AttributeError, OSError, ValueError):
                pass

    return _save_upload_locally(file)
|
|
|
|
@app.route('/')
@app.route('/home')
def home():
    """Render the home page with every row of the stats table, ordered by key."""
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute('SELECT * FROM stats ORDER BY key')
            stats = cur.fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()
    return render_template('home.html', stats=stats)
|
|
|
|
@app.route('/contributors')
def contributors():
    """Render the contributors page with members and mentors, each ordered by display_order."""
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute('SELECT * FROM members ORDER BY display_order')
            members = cur.fetchall()
            cur.execute('SELECT * FROM mentors ORDER BY display_order')
            mentors = cur.fetchall()
    finally:
        # Close the connection even when a query raises.
        conn.close()
    return render_template('contributors.html', members=members, mentors=mentors)
|
|
|
|
@app.route('/robot')
def robot():
    """Render the static robot page."""
    page = render_template('robot.html')
    return page
|
|
|
|
@app.route('/competitions')
def competitions():
    """Render the competitions page with competitions grouped by season.

    Rows are read in display_order; seasons appear in the order in which they
    are first encountered, and each season keeps its rows in that same order.
    """
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute('SELECT * FROM competitions ORDER BY display_order')
            rows = cur.fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()

    competitions_by_season = {}
    for comp in rows:
        competitions_by_season.setdefault(comp['season'], []).append(comp)

    return render_template('competitions.html', competitions_by_season=competitions_by_season)
|
|
|
|
@app.route('/contact')
def contact():
    """Render the static contact page."""
    page = render_template('contact.html')
    return page
|
|
|
|
@app.route('/sponsors')
def sponsors():
    """Render the sponsors page with all sponsors ordered by display_order."""
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute('SELECT * FROM sponsors ORDER BY display_order')
            sponsor_rows = cur.fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()
    return render_template('sponsors.html', sponsors=sponsor_rows)
|
|
|
|
@app.route('/robots')
@app.route('/robots/<type>')
def robots(type=None):
    """Render the robots overview, or the 'robots-<type>.html' page when a type is given."""
    # The parameter name must stay `type` because it is bound by the URL rule.
    template_name = 'robots.html' if type is None else f'robots-{type}.html'
    return render_template(template_name)
|
|
|
|
@app.route('/admin/login', methods=['GET', 'POST'])
def admin_login():
    """Show the admin login form and process password submissions.

    On POST, a password matching ADMIN_PASSWORD marks the session as logged in
    and redirects to the stats admin page; otherwise an error is flashed and
    the login form is rendered again. GET just renders the form.
    """
    import hmac

    if request.method == 'POST':
        # A missing field is treated as an empty (and therefore wrong) password.
        submitted = request.form.get('password') or ''
        # Constant-time comparison avoids leaking how much of the password
        # matched; bytes are used because compare_digest rejects non-ASCII str.
        if hmac.compare_digest(submitted.encode('utf-8'), ADMIN_PASSWORD.encode('utf-8')):
            session['admin_logged_in'] = True
            return redirect(url_for('admin_stats'))
        flash('Incorrect password', 'error')
    return render_template('admin/login.html')
|
|
|
|
@app.route('/admin/logout')
def admin_logout():
    """Drop the admin flag from the session (if present) and redirect to the home page."""
    session.pop('admin_logged_in', None)
    destination = url_for('home')
    return redirect(destination)
|
|
|
|
def admin_required(f):
    """Decorator that restricts a view to logged-in admins.

    When the session lacks the 'admin_logged_in' flag the wrapped view is not
    called and the visitor is redirected to the admin login page; otherwise
    the view runs with the original arguments.
    """
    from functools import wraps

    # wraps() copies __name__ (which Flask uses as the endpoint name) together
    # with __doc__, __module__ and friends, and records __wrapped__.
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not session.get('admin_logged_in'):
            return redirect(url_for('admin_login'))
        return f(*args, **kwargs)

    return decorated_function
|
|
|
|
@app.route('/admin/stats')
@admin_required
def admin_stats():
    """Render the stats admin page with every row of the stats table, ordered by key."""
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute('SELECT * FROM stats ORDER BY key')
            stats = cur.fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()
    return render_template('admin/stats.html', stats=stats)
|
|
|
|
@app.route('/admin/stats/update', methods=['POST'])
@admin_required
def update_stat():
    """Update the value and label of the stat identified by the posted id.

    Reads 'id', 'value' and 'label' from the form, then redirects back to the
    stats admin page with a success message.
    """
    stat_id = request.form.get('id')
    value = request.form.get('value')
    label = request.form.get('label')

    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute('UPDATE stats SET value = %s, label = %s WHERE id = %s', (value, label, stat_id))
        conn.commit()
    finally:
        # Close the connection even when the update raises.
        conn.close()

    flash('Stat updated successfully', 'success')
    return redirect(url_for('admin_stats'))
|
|
|
|
@app.route('/admin/members')
@admin_required
def admin_members():
    """Render the members admin page with members and mentors, each ordered by display_order."""
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute('SELECT * FROM members ORDER BY display_order')
            members = cur.fetchall()
            cur.execute('SELECT * FROM mentors ORDER BY display_order')
            mentors = cur.fetchall()
    finally:
        # Close the connection even when a query raises.
        conn.close()
    return render_template('admin/members.html', members=members, mentors=mentors)
|
|
|
|
@app.route('/admin/member/add', methods=['POST'])
@admin_required
def add_member():
    """Insert a new member, or a new mentor when the posted type is 'mentor'.

    Reads 'name', 'role' and 'type' from the form and an optional 'image'
    upload. When no image is stored, 'images/default.jpg' is recorded.
    Redirects to the members admin page with a success message.
    """
    name = request.form.get('name')
    role = request.form.get('role')
    member_type = request.form.get('type')
    is_mentor = member_type == 'mentor'

    # The upload helper returns None for a missing or empty upload.
    uploaded_path = upload_file_to_storage(request.files.get('image'), folder='members')
    image_path = uploaded_path or 'images/default.jpg'

    if is_mentor:
        insert_sql = 'INSERT INTO mentors (name, role, image_path) VALUES (%s, %s, %s)'
    else:
        insert_sql = 'INSERT INTO members (name, role, image_path) VALUES (%s, %s, %s)'

    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(insert_sql, (name, role, image_path))
        conn.commit()
    finally:
        # Close the connection even when the insert raises.
        conn.close()

    label = 'Mentor' if is_mentor else 'Member'
    flash(f'{label} added successfully', 'success')
    return redirect(url_for('admin_members'))
|
|
|
|
@app.route('/admin/member/update', methods=['POST'])
@admin_required
def update_member():
    """Update the name and role (and optionally the image) of a member or mentor.

    Reads 'id', 'name', 'role' and 'type' from the form and an optional
    'image' upload. The mentors table is used when type is 'mentor', the
    members table otherwise. image_path is only overwritten when a new image
    was stored. Redirects to the members admin page with a success message.
    """
    member_id = request.form.get('id')
    name = request.form.get('name')
    role = request.form.get('role')
    member_type = request.form.get('type')
    is_mentor = member_type == 'mentor'

    # Store the image before opening a database connection so that a slow or
    # failing upload never holds (or leaks) a connection.
    image_path = upload_file_to_storage(request.files.get('image'), folder='members')

    if is_mentor:
        with_image_sql = 'UPDATE mentors SET name = %s, role = %s, image_path = %s WHERE id = %s'
        without_image_sql = 'UPDATE mentors SET name = %s, role = %s WHERE id = %s'
    else:
        with_image_sql = 'UPDATE members SET name = %s, role = %s, image_path = %s WHERE id = %s'
        without_image_sql = 'UPDATE members SET name = %s, role = %s WHERE id = %s'

    if image_path:
        update_sql, params = with_image_sql, (name, role, image_path, member_id)
    else:
        update_sql, params = without_image_sql, (name, role, member_id)

    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(update_sql, params)
        conn.commit()
    finally:
        # Close the connection even when the update raises.
        conn.close()

    label = 'Mentor' if is_mentor else 'Member'
    flash(f'{label} updated successfully', 'success')
    return redirect(url_for('admin_members'))
|
|
|
|
@app.route('/admin/member/delete', methods=['POST'])
@admin_required
def delete_member():
    """Delete the member, or mentor when the posted type is 'mentor', with the posted id.

    Redirects to the members admin page with a success message.
    """
    member_id = request.form.get('id')
    member_type = request.form.get('type')
    is_mentor = member_type == 'mentor'

    if is_mentor:
        delete_sql = 'DELETE FROM mentors WHERE id = %s'
    else:
        delete_sql = 'DELETE FROM members WHERE id = %s'

    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(delete_sql, (member_id,))
        conn.commit()
    finally:
        # Close the connection even when the delete raises.
        conn.close()

    label = 'Mentor' if is_mentor else 'Member'
    flash(f'{label} deleted successfully', 'success')
    return redirect(url_for('admin_members'))
|
|
|
|
@app.route('/admin/competitions')
@admin_required
def admin_competitions():
    """Render the competitions admin page.

    Passes every competition ordered by display_order, plus the list of
    distinct seasons in descending order.
    """
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute('SELECT * FROM competitions ORDER BY display_order')
            competition_rows = cur.fetchall()
            cur.execute('SELECT DISTINCT season FROM competitions ORDER BY season DESC')
            seasons = [row['season'] for row in cur.fetchall()]
    finally:
        # Close the connection even when a query raises.
        conn.close()
    return render_template('admin/competitions.html', competitions=competition_rows, seasons=seasons)
|
|
|
|
@app.route('/admin/competition/add', methods=['POST'])
@admin_required
def add_competition():
    """Insert a new competition from the posted form.

    Reads 'season', 'event_name', 'date', 'description' and 'awards' from the
    form and an optional 'image' upload. The awards text is stored as its
    non-empty, stripped lines joined with '|'. Redirects to the competitions
    admin page with a success message.
    """
    season = request.form.get('season')
    event_name = request.form.get('event_name')
    date = request.form.get('date')
    description = request.form.get('description')
    # A missing 'awards' field is treated as empty instead of raising on None,
    # matching how the edit route handles it.
    awards_raw = request.form.get('awards') or ''
    awards = '|'.join(line.strip() for line in awards_raw.split('\n') if line.strip())

    # The upload helper returns None for a missing or empty upload.
    image_path = upload_file_to_storage(request.files.get('image'), folder='competitions')

    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(
                'INSERT INTO competitions (season, event_name, date, description, awards, image_path) VALUES (%s, %s, %s, %s, %s, %s)',
                (season, event_name, date, description, awards, image_path),
            )
        conn.commit()
    finally:
        # Close the connection even when the insert raises.
        conn.close()

    flash('Competition added successfully', 'success')
    return redirect(url_for('admin_competitions'))
|
|
|
|
@app.route('/admin/competition/edit', methods=['POST'])
@admin_required
def edit_competition():
    """Update an existing competition from the posted form.

    Reads 'id', 'season', 'event_name', 'date', 'description' and 'awards'
    from the form and an optional 'image' upload. The awards text is stored
    as its non-empty, stripped lines joined with '|'. The stored image_path is
    kept unless a new image was stored. Redirects to the competitions admin
    page with a success message.
    """
    competition_id = request.form.get('id')
    season = request.form.get('season')
    event_name = request.form.get('event_name')
    date = request.form.get('date')
    description = request.form.get('description')
    awards_raw = request.form.get('awards') or ''
    awards = '|'.join(line.strip() for line in awards_raw.split('\n') if line.strip())

    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            # Start from the image already on record so it survives an edit
            # that does not upload a replacement.
            cur.execute('SELECT image_path FROM competitions WHERE id = %s', (competition_id,))
            current_comp = cur.fetchone()
            image_path = current_comp['image_path'] if current_comp else None

            uploaded_path = upload_file_to_storage(request.files.get('image'), folder='competitions')
            if uploaded_path:
                image_path = uploaded_path

            cur.execute(
                '''UPDATE competitions
                SET season = %s, event_name = %s, date = %s,
                description = %s, awards = %s, image_path = %s
                WHERE id = %s''',
                (season, event_name, date, description, awards, image_path, competition_id),
            )
        conn.commit()
    finally:
        # Close the connection even when the upload or a query raises.
        conn.close()

    flash('Competition updated successfully', 'success')
    return redirect(url_for('admin_competitions'))
|
|
|
|
@app.route('/admin/competition/delete', methods=['POST'])
@admin_required
def delete_competition():
    """Delete the competition with the posted id and redirect to the competitions admin page."""
    competition_id = request.form.get('id')

    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute('DELETE FROM competitions WHERE id = %s', (competition_id,))
        conn.commit()
    finally:
        # Close the connection even when the delete raises.
        conn.close()

    flash('Competition deleted successfully', 'success')
    return redirect(url_for('admin_competitions'))
|
|
|
|
@app.route('/admin/sponsors')
@admin_required
def admin_sponsors():
    """Render the sponsors admin page with all sponsors ordered by display_order."""
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute('SELECT * FROM sponsors ORDER BY display_order')
            sponsor_rows = cur.fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()
    return render_template('admin/sponsors.html', sponsors=sponsor_rows)
|
|
|
|
@app.route('/admin/sponsor/add', methods=['POST'])
@admin_required
def add_sponsor():
    """Insert a new sponsor from the posted form.

    Reads 'name' and 'website_url' from the form and an optional 'logo'
    upload; logo_path is stored as NULL when no logo was stored. Redirects to
    the sponsors admin page with a success message.
    """
    name = request.form.get('name')
    website_url = request.form.get('website_url')

    # The upload helper returns None for a missing or empty upload.
    logo_path = upload_file_to_storage(request.files.get('logo'), folder='sponsors')

    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(
                'INSERT INTO sponsors (name, logo_path, website_url) VALUES (%s, %s, %s)',
                (name, logo_path, website_url),
            )
        conn.commit()
    finally:
        # Close the connection even when the insert raises.
        conn.close()

    flash('Sponsor added successfully', 'success')
    return redirect(url_for('admin_sponsors'))
|
|
|
|
@app.route('/admin/sponsor/delete', methods=['POST'])
@admin_required
def delete_sponsor():
    """Delete the sponsor with the posted id and redirect to the sponsors admin page."""
    sponsor_id = request.form.get('id')

    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute('DELETE FROM sponsors WHERE id = %s', (sponsor_id,))
        conn.commit()
    finally:
        # Close the connection even when the delete raises.
        conn.close()

    flash('Sponsor deleted successfully', 'success')
    return redirect(url_for('admin_sponsors'))
|
|
|
|
@app.route('/storage/<path:filepath>')
def serve_storage_file(filepath):
    """Return the bytes of '/storage/<filepath>' from object storage.

    The response MIME type is chosen from the file extension (a handful of
    image types; anything else is 'application/octet-stream'). Responds with
    404 when object storage is not configured or when anything raises while
    fetching or building the response.
    """
    try:
        if not (USING_OBJECT_STORAGE and storage_service):
            return "Object storage not configured", 404

        payload = storage_service.get_file_bytes(f"/storage/{filepath}")

        # Text after the last dot, lower-cased; empty when there is no dot.
        _, dot, suffix = filepath.rpartition('.')
        extension = suffix.lower() if dot else ''

        mime_by_extension = {
            'jpg': 'image/jpeg',
            'jpeg': 'image/jpeg',
            'png': 'image/png',
            'gif': 'image/gif',
            'webp': 'image/webp',
            'svg': 'image/svg+xml',
        }
        mime = mime_by_extension.get(extension, 'application/octet-stream')

        from flask import Response
        return Response(payload, mimetype=mime)
    except Exception as e:
        print(f"Error serving file {filepath}: {e}")
        return "File not found", 404
|
|
|
|
if __name__ == '__main__':
    # The server listens on every interface, and the Werkzeug debugger lets a
    # browser run arbitrary code, so debug mode is opt-in: set FLASK_DEBUG to
    # 1/true/yes/on to enable it for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)
|