import inspect import os from pathlib import Path from typing import Any from authlib.integrations.starlette_client import OAuth, OAuthError from dotenv import load_dotenv from fastapi import FastAPI, Request from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from starlette.middleware.sessions import SessionMiddleware load_dotenv() BASE_DIR = Path(__file__).resolve().parent ALLOWED_DOMAIN = "dbxcorp.co.kr" ALLOWED_EMAILS = { "king@dbxcorp.co.kr", "julie@dbxcorp.co.kr", "ellen@dbxcorp.co.kr", "bj@dbxcorp.co.kr", } def env(name: str, default: str = "") -> str: return os.getenv(name, default).strip() def build_google_oauth() -> OAuth: oauth = OAuth() oauth.register( name="google", client_id=env("GOOGLE_CLIENT_ID"), client_secret=env("GOOGLE_CLIENT_SECRET"), server_metadata_url="https://accounts.google.com/.well-known/openid-configuration", client_kwargs={"scope": "openid email profile"}, ) return oauth app = FastAPI(title="DBX 메인 페이지") app.add_middleware( SessionMiddleware, secret_key=env("SESSION_SECRET_KEY", "change-this-session-secret"), https_only=env("SESSION_COOKIE_SECURE", "true").lower() == "true", same_site="lax", max_age=60 * 60 * 8, ) app.mount("/static", StaticFiles(directory=str(BASE_DIR / "static")), name="static") templates = Jinja2Templates(directory=str(BASE_DIR / "templates")) oauth = build_google_oauth() def public_url_for(request: Request, route_name: str) -> str: public_base_url = env("PUBLIC_BASE_URL").rstrip("/") if public_base_url: return f"{public_base_url}{request.url_for(route_name).path}" return str(request.url_for(route_name)) def get_user(request: Request) -> dict[str, Any] | None: user = request.session.get("user") return user if isinstance(user, dict) else None def render_template( request: Request, name: str, context: dict[str, Any] | None = None, status_code: int = 200, ) -> HTMLResponse: template_context = {"request": request, **(context or {})} first_param = next(iter(inspect.signature(templates.TemplateResponse).parameters)) if first_param == "request": return templates.TemplateResponse( request, name, template_context, status_code=status_code, ) return templates.TemplateResponse( name, template_context, status_code=status_code, ) def is_allowed_google_user(userinfo: dict[str, Any]) -> tuple[bool, str]: email = str(userinfo.get("email", "")).lower().strip() email_verified = bool(userinfo.get("email_verified")) domain = email.rsplit("@", 1)[-1] if "@" in email else "" if not email_verified: return False, "Google 계정 이메일 인증이 확인되지 않았습니다." if domain != ALLOWED_DOMAIN: return False, "회사 Google Workspace 계정만 접속할 수 있습니다." if email not in ALLOWED_EMAILS: return False, "접속 허용 목록에 없는 계정입니다." return True, "" @app.get("/", response_class=HTMLResponse) async def home(request: Request) -> HTMLResponse: user = get_user(request) if not user: return render_template(request, "login.html") menu_items = [ { "title": "CS 발주 업무", "description": "CS 발주 업무 페이지로 이동", "url": env("CS_ORDER_URL", "#"), }, { "title": "고객 주문리스트 프로그램", "description": "고객 주문리스트 프로그램으로 이동", "url": env("CUSTOMER_ORDER_LIST_URL", "#"), }, ] return render_template( request, "main.html", {"user": user, "menu_items": menu_items}, ) @app.get("/login") async def login(request: Request): if not env("GOOGLE_CLIENT_ID") or not env("GOOGLE_CLIENT_SECRET"): return render_template( request, "denied.html", {"reason": "Google OAuth 환경 변수가 아직 설정되지 않았습니다."}, status_code=500, ) redirect_uri = public_url_for(request, "auth_google") return await oauth.google.authorize_redirect( request, redirect_uri, hd=ALLOWED_DOMAIN, prompt="select_account", ) @app.get("/auth/google") async def auth_google(request: Request): try: token = await oauth.google.authorize_access_token(request) userinfo = token.get("userinfo") if userinfo is None: userinfo = await oauth.google.userinfo(token=token) except OAuthError as exc: return render_template( request, "denied.html", {"reason": f"Google 로그인 실패: {exc.error}"}, status_code=401, ) allowed, reason = is_allowed_google_user(dict(userinfo)) if not allowed: request.session.clear() return render_template( request, "denied.html", {"reason": reason}, status_code=403, ) request.session["user"] = { "email": str(userinfo.get("email", "")).lower().strip(), "name": userinfo.get("name") or userinfo.get("email"), "picture": userinfo.get("picture", ""), } return RedirectResponse(url="/", status_code=303) @app.get("/logout") async def logout(request: Request) -> RedirectResponse: request.session.clear() return RedirectResponse(url="/", status_code=303) @app.get("/healthz") async def healthz() -> dict[str, str]: return {"status": "ok"}