from http.server import BaseHTTPRequestHandler, HTTPServer from http import cookiejar import json import os import threading import urllib.parse import urllib.request CLIENT_ID = os.environ["CLIENT_ID"] SUBJECT = os.environ["SUBJECT"] REDIRECT_URI = os.environ["REDIRECT_URI"] SCOPE = os.environ["SCOPE"] STATE = os.environ["STATE"] NONCE = os.environ["NONCE"] ADMIN_BASE = os.environ.get("HYDRA_ADMIN_URL", "http://127.0.0.1:4445") PUBLIC_BASE = os.environ.get("HYDRA_PUBLIC_URL", "http://127.0.0.1:4444") def _put_json(url: str, payload: dict) -> dict: data = json.dumps(payload).encode("utf-8") req = urllib.request.Request(url, data=data, method="PUT") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=5) as resp: body = resp.read().decode("utf-8") return json.loads(body) if body else {} def accept_login(challenge: str) -> str: url = f"{ADMIN_BASE}/oauth2/auth/requests/login/accept?login_challenge={urllib.parse.quote(challenge)}" payload = {"subject": SUBJECT, "remember": True, "remember_for": 3600} data = _put_json(url, payload) return data.get("redirect_to", "") def accept_consent(challenge: str) -> str: url = f"{ADMIN_BASE}/oauth2/auth/requests/consent/accept?consent_challenge={urllib.parse.quote(challenge)}" payload = {"grant_scope": ["openid", "profile", "email"], "remember": True, "remember_for": 3600} data = _put_json(url, payload) return data.get("redirect_to", "") def _location_from_response(url: str, cookie_header: str | None) -> str: req = urllib.request.Request(url, method="GET") if cookie_header: req.add_header("Cookie", cookie_header) opener = urllib.request.build_opener(NoRedirect()) try: opener.open(req, timeout=5) except urllib.error.HTTPError as err: return err.headers.get("Location", "") return "" class NoRedirect(urllib.request.HTTPRedirectHandler): def redirect_request(self, req, fp, code, msg, headers, newurl): raise urllib.error.HTTPError(newurl, code, msg, headers, fp) class Handler(BaseHTTPRequestHandler): def do_GET(self): parsed = urllib.parse.urlparse(self.path) params = urllib.parse.parse_qs(parsed.query) login_challenge = (params.get("login_challenge") or [""])[0] consent_challenge = (params.get("consent_challenge") or [""])[0] login_verifier = (params.get("login_verifier") or [""])[0] consent_verifier = (params.get("consent_verifier") or [""])[0] if parsed.path == "/oauth2/auth" and consent_verifier: query = urllib.parse.urlencode({ "consent_verifier": consent_verifier, "client_id": (params.get("client_id") or [""])[0], "redirect_uri": (params.get("redirect_uri") or [""])[0], "response_type": (params.get("response_type") or [""])[0], "scope": (params.get("scope") or [""])[0], "state": (params.get("state") or [""])[0], "nonce": (params.get("nonce") or [""])[0], }) public_url = f"{PUBLIC_BASE}/oauth2/auth?{query}" location = _location_from_response(public_url, self.headers.get("Cookie")) print(f"consent_verifier_location={location}") if not location: self.send_response(400) self.end_headers() self.wfile.write(b"missing redirect location") return self.send_response(302) self.send_header("Location", location) self.end_headers() return if parsed.path == "/oauth2/auth" and login_verifier: query = urllib.parse.urlencode({ "login_verifier": login_verifier, "client_id": (params.get("client_id") or [""])[0], "redirect_uri": (params.get("redirect_uri") or [""])[0], "response_type": (params.get("response_type") or [""])[0], "scope": (params.get("scope") or [""])[0], "state": (params.get("state") or [""])[0], "nonce": (params.get("nonce") or [""])[0], }) public_url = f"{PUBLIC_BASE}/oauth2/auth?{query}" location = _location_from_response(public_url, self.headers.get("Cookie")) print(f"login_verifier_location={location}") if not location: self.send_response(400) self.end_headers() self.wfile.write(b"missing redirect location") return consent_challenge = urllib.parse.parse_qs(urllib.parse.urlparse(location).query).get( "consent_challenge", [""], )[0] if not consent_challenge: self.send_response(400) self.end_headers() self.wfile.write(f"missing consent_challenge location={location}".encode("utf-8")) return redirect_to = accept_consent(consent_challenge) if not redirect_to: self.send_response(500) self.end_headers() self.wfile.write(b"consent accept failed") return self.send_response(302) self.send_header("Location", redirect_to) self.end_headers() return if login_challenge: redirect_to = accept_login(login_challenge) elif consent_challenge: redirect_to = accept_consent(consent_challenge) else: redirect_to = "" if not redirect_to: self.send_response(400) self.end_headers() self.wfile.write(b"missing challenge") return self.send_response(302) self.send_header("Location", redirect_to) self.end_headers() def log_message(self, format, *args): return class StopAtRedirect(urllib.request.HTTPRedirectHandler): def redirect_request(self, req, fp, code, msg, headers, newurl): if newurl.startswith(REDIRECT_URI): raise urllib.error.HTTPError(newurl, code, msg, headers, fp) return super().redirect_request(req, fp, code, msg, headers, newurl) def main(): server = HTTPServer(("127.0.0.1", 3000), Handler) thread = threading.Thread(target=server.serve_forever, daemon=True) thread.start() encoded_redirect = urllib.parse.quote(REDIRECT_URI, safe="") encoded_scope = urllib.parse.quote(SCOPE, safe="") auth_url = ( f"{PUBLIC_BASE}/oauth2/auth?response_type=code" f"&client_id={CLIENT_ID}" f"&redirect_uri={encoded_redirect}" f"&scope={encoded_scope}" f"&state={STATE}" f"&nonce={NONCE}" ) jar = cookiejar.CookieJar() opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(jar), StopAtRedirect()) try: opener.open(auth_url, timeout=10) except urllib.error.HTTPError as err: body = err.read().decode("utf-8") if hasattr(err, "read") else "" print(f"error_url={err.geturl()}") print(f"error_code={err.code}") if body: print(f"error_body={body}") finally: server.shutdown() if __name__ == "__main__": main()