Spaces:
Sleeping
Sleeping
File size: 4,391 Bytes
79b10c5 ff80fbe 4d0ec3e f700a32 79b10c5 4d0ec3e 79b10c5 4d0ec3e 26e0ddc 79b10c5 e7c157d 79b10c5 ff80fbe 3e8d00a ff80fbe 1ed50ec ff80fbe 79b10c5 ff80fbe 79b10c5 ff80fbe 79b10c5 ff80fbe 79b10c5 f700a32 79b10c5 ff80fbe f700a32 ff80fbe 79b10c5 ff80fbe 79b10c5 ff80fbe 01f7187 ff80fbe 79b10c5 ff80fbe f700a32 ff80fbe 79b10c5 ff80fbe 54210e7 ff80fbe |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from pathlib import Path
import shutil
import zipfile
import logging
import tempfile
from typing import Set
# Root logging is configured at INFO level; this module logs through `logger`.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The single application object that all routes and site mounts attach to.
app = FastAPI()

# Permissive CORS policy: any origin, method and header, with credentials.
# NOTE(review): wildcard origins combined with allow_credentials=True is
# discouraged by the FastAPI CORS documentation -- confirm this is intended.
app.add_middleware(
    CORSMiddleware,
    **{
        "allow_origins": ["*"],
        "allow_credentials": True,
        "allow_methods": ["*"],
        "allow_headers": ["*"],
    },
)
class SiteManager:
    """Deploy, track and remove static sites served by the FastAPI app.

    Every site is a directory below ``sites_dir`` and is exposed through a
    ``StaticFiles`` mount at ``/<unique_id>``.  ``active_sites`` holds the ids
    that are currently considered deployed.
    """

    # Number of bytes read from the upload per iteration while spooling it
    # to disk, so the whole archive is never held in memory at once.
    _CHUNK_SIZE = 1024 * 1024

    def __init__(self):
        """Create the sites directory if needed and register existing sites."""
        self.sites_dir = Path("/app/sites")
        self.sites_dir.mkdir(parents=True, exist_ok=True)
        self.active_sites: Set[str] = set()
        self._load_existing_sites()

    @staticmethod
    def _validate_site_id(unique_id: str) -> None:
        """Reject ids that would not name a direct child of ``sites_dir``.

        Raises:
            HTTPException: 400 when the id is empty, ``.`` or ``..``, or
                contains a path separator or NUL byte.
        """
        has_forbidden_char = any(ch in unique_id for ch in ("/", "\\", "\x00"))
        if unique_id in ("", ".", "..") or has_forbidden_char:
            raise HTTPException(status_code=400, detail="Invalid site id")

    def _unmount_site(self, unique_id: str) -> None:
        """Drop every ``StaticFiles`` mount registered at ``/<unique_id>``."""
        mount_path = f"/{unique_id}"
        # NOTE(review): relies on the router exposing its routes as a mutable
        # list and on mounts exposing ``path``/``app`` -- confirm against the
        # installed Starlette version.
        app.router.routes[:] = [
            route
            for route in app.router.routes
            if not (
                getattr(route, "path", None) == mount_path
                and isinstance(getattr(route, "app", None), StaticFiles)
            )
        ]

    def _mount_site(self, unique_id: str, site_path: Path) -> None:
        """Serve ``site_path`` at ``/<unique_id>``, replacing any older mount."""
        # Without this, every redeploy would append one more mount for the
        # same path to the application's route list.
        self._unmount_site(unique_id)
        app.mount(
            f"/{unique_id}",
            StaticFiles(directory=str(site_path), html=True),
            name=unique_id,
        )

    def _load_existing_sites(self):
        """Register and mount every directory in ``sites_dir`` with an index.html."""
        # NOTE(review): these mounts are created when the manager is
        # constructed; if that happens before the API routes are declared, a
        # directory named like an API prefix may take precedence -- verify.
        for site_dir in self.sites_dir.iterdir():
            if site_dir.is_dir() and (site_dir / 'index.html').exists():
                self.active_sites.add(site_dir.name)
                self._mount_site(site_dir.name, site_dir)
                logger.info(f"Loaded site: {site_dir.name}")

    async def deploy_site(self, unique_id: str, zip_file: UploadFile) -> dict:
        """Deploy a site from an uploaded ZIP archive.

        The archive is spooled to disk and extracted into a private staging
        directory first.  Only after extraction succeeds is an existing site
        with the same id replaced, so a broken upload leaves the previous
        deployment untouched.

        Args:
            unique_id: Site id; becomes the directory name and the URL prefix.
            zip_file: Uploaded ZIP archive.

        Returns:
            A dict with ``status``, ``message`` and ``url`` keys.

        Raises:
            HTTPException: 400 for an invalid id, an invalid ZIP archive or
                an archive that extracts to nothing; 500 for any other
                failure.
        """
        # Validate before building any path: the id is joined onto sites_dir
        # and the result is passed to shutil.rmtree below.
        self._validate_site_id(unique_id)
        site_path = self.sites_dir / unique_id
        work_dir = None
        publishing = False
        try:
            work_dir = Path(tempfile.mkdtemp(prefix="site-deploy-"))
            archive_path = work_dir / "upload.zip"
            staged_path = work_dir / "site"

            # Spool the upload to disk chunk by chunk
            with archive_path.open("wb") as archive:
                while True:
                    chunk = await zip_file.read(self._CHUNK_SIZE)
                    if not chunk:
                        break
                    archive.write(chunk)

            # Extract into the staging directory, not the live site
            with zipfile.ZipFile(str(archive_path)) as zip_ref:
                zip_ref.extractall(str(staged_path))

            # An archive without members never creates the target directory
            if not staged_path.is_dir():
                raise HTTPException(
                    status_code=400, detail="ZIP archive contains no files"
                )

            # From here on the live site directory is being modified
            publishing = True
            if site_path.exists():
                shutil.rmtree(site_path)
            shutil.move(str(staged_path), str(site_path))

            self._mount_site(unique_id, site_path)
            self.active_sites.add(unique_id)

            if not (site_path / 'index.html').exists():
                # _load_existing_sites only registers directories that
                # contain index.html, so this site would vanish on restart.
                logger.warning(
                    f"Site {unique_id} has no index.html and will not be reloaded on startup"
                )

            return {
                "status": "success",
                "message": f"Site deployed at /{unique_id}",
                "url": f"/{unique_id}"
            }
        except HTTPException:
            # Already carries the intended status code
            raise
        except zipfile.BadZipFile as e:
            # A malformed upload is a client error; the live site was not touched
            logger.error(f"Error deploying site {unique_id}: {str(e)}")
            raise HTTPException(status_code=400, detail=str(e)) from e
        except Exception as e:
            logger.error(f"Error deploying site {unique_id}: {str(e)}")
            if publishing:
                # The live directory may be half-replaced: remove it and keep
                # the bookkeeping consistent with what is on disk.
                if site_path.exists():
                    shutil.rmtree(site_path, ignore_errors=True)
                self.active_sites.discard(unique_id)
                self._unmount_site(unique_id)
            raise HTTPException(status_code=500, detail=str(e)) from e
        finally:
            # Best-effort removal of the spooled archive and staging directory
            if work_dir is not None:
                shutil.rmtree(work_dir, ignore_errors=True)

    def remove_site(self, unique_id: str) -> bool:
        """Remove a deployed site from disk, from the registry and from the app.

        Args:
            unique_id: Id of the site to remove.

        Returns:
            True when the site was active and has been removed, False when
            the id is not an active site.
        """
        if unique_id not in self.active_sites:
            return False
        site_path = self.sites_dir / unique_id
        if site_path.exists():
            shutil.rmtree(site_path)
        self.active_sites.remove(unique_id)
        # Also drop the mount so no stale route is left behind
        self._unmount_site(unique_id)
        return True
# Module-wide site manager shared by the route handlers below.
# Constructing it creates the sites directory if needed and mounts every
# site already present on disk.
site_manager = SiteManager()
@app.post("/deploy/{unique_id}")
async def deploy_site(unique_id: str, file: UploadFile = File(...)):
    """Deploy a new site from an uploaded ZIP file.

    Args:
        unique_id: Path segment identifying the site; forwarded to the
            site manager.
        file: Uploaded archive; its filename must end in ``.zip``
            (any letter case).

    Returns:
        The result of ``site_manager.deploy_site``.

    Raises:
        HTTPException: 400 when the upload has no filename or the filename
            does not end in ``.zip``.
    """
    # The filename may be missing; treat that like any other non-ZIP upload
    # instead of failing on ``None.endswith``.
    filename = file.filename or ""
    if not filename.lower().endswith(".zip"):
        raise HTTPException(status_code=400, detail="File must be a ZIP archive")
    return await site_manager.deploy_site(unique_id, file)
@app.delete("/site/{unique_id}")
async def remove_site(unique_id: str):
    """Delete the site identified by ``unique_id``.

    Returns a success payload when the site manager removed the site and
    raises a 404 ``HTTPException`` when it reports nothing was removed.
    """
    was_removed = site_manager.remove_site(unique_id)
    if not was_removed:
        raise HTTPException(status_code=404, detail="Site not found")
    return {"status": "success", "message": f"Site {unique_id} removed"}
@app.get("/sites")
async def list_sites():
    """Return the ids of all currently active sites under the ``sites`` key."""
    site_ids = [*site_manager.active_sites]
    return {"sites": site_ids}
@app.get("/health")
async def health_check():
    """Report a healthy status together with the number of active sites."""
    active_count = len(site_manager.active_sites)
    return {"status": "healthy", "sites_count": active_count}
# Script entry point: serve the app with uvicorn on all interfaces, port 8000.
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when run as a script
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)