1
0
forked from baron/baron-sso
Files
baron-sso/devfront/hydra-rp-dummy.py
2026-01-30 17:56:42 +09:00

189 lines
7.2 KiB
Python

from http.server import BaseHTTPRequestHandler, HTTPServer
from http import cookiejar
import json
import os
import threading
import urllib.parse
import urllib.request
CLIENT_ID = os.environ["CLIENT_ID"]
SUBJECT = os.environ["SUBJECT"]
REDIRECT_URI = os.environ["REDIRECT_URI"]
SCOPE = os.environ["SCOPE"]
STATE = os.environ["STATE"]
NONCE = os.environ["NONCE"]
ADMIN_BASE = os.environ.get("HYDRA_ADMIN_URL", "http://127.0.0.1:4445")
PUBLIC_BASE = os.environ.get("HYDRA_PUBLIC_URL", "http://127.0.0.1:4444")
def _put_json(url: str, payload: dict) -> dict:
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(url, data=data, method="PUT")
req.add_header("Content-Type", "application/json")
with urllib.request.urlopen(req, timeout=5) as resp:
body = resp.read().decode("utf-8")
return json.loads(body) if body else {}
def accept_login(challenge: str) -> str:
url = f"{ADMIN_BASE}/oauth2/auth/requests/login/accept?login_challenge={urllib.parse.quote(challenge)}"
payload = {"subject": SUBJECT, "remember": True, "remember_for": 3600}
data = _put_json(url, payload)
return data.get("redirect_to", "")
def accept_consent(challenge: str) -> str:
url = f"{ADMIN_BASE}/oauth2/auth/requests/consent/accept?consent_challenge={urllib.parse.quote(challenge)}"
payload = {"grant_scope": ["openid", "profile", "email"], "remember": True, "remember_for": 3600}
data = _put_json(url, payload)
return data.get("redirect_to", "")
def _location_from_response(url: str, cookie_header: str | None) -> str:
req = urllib.request.Request(url, method="GET")
if cookie_header:
req.add_header("Cookie", cookie_header)
opener = urllib.request.build_opener(NoRedirect())
try:
opener.open(req, timeout=5)
except urllib.error.HTTPError as err:
return err.headers.get("Location", "")
return ""
class NoRedirect(urllib.request.HTTPRedirectHandler):
def redirect_request(self, req, fp, code, msg, headers, newurl):
raise urllib.error.HTTPError(newurl, code, msg, headers, fp)
class Handler(BaseHTTPRequestHandler):
def do_GET(self):
parsed = urllib.parse.urlparse(self.path)
params = urllib.parse.parse_qs(parsed.query)
login_challenge = (params.get("login_challenge") or [""])[0]
consent_challenge = (params.get("consent_challenge") or [""])[0]
login_verifier = (params.get("login_verifier") or [""])[0]
consent_verifier = (params.get("consent_verifier") or [""])[0]
if parsed.path == "/oauth2/auth" and consent_verifier:
query = urllib.parse.urlencode({
"consent_verifier": consent_verifier,
"client_id": (params.get("client_id") or [""])[0],
"redirect_uri": (params.get("redirect_uri") or [""])[0],
"response_type": (params.get("response_type") or [""])[0],
"scope": (params.get("scope") or [""])[0],
"state": (params.get("state") or [""])[0],
"nonce": (params.get("nonce") or [""])[0],
})
public_url = f"{PUBLIC_BASE}/oauth2/auth?{query}"
location = _location_from_response(public_url, self.headers.get("Cookie"))
print(f"consent_verifier_location={location}")
if not location:
self.send_response(400)
self.end_headers()
self.wfile.write(b"missing redirect location")
return
self.send_response(302)
self.send_header("Location", location)
self.end_headers()
return
if parsed.path == "/oauth2/auth" and login_verifier:
query = urllib.parse.urlencode({
"login_verifier": login_verifier,
"client_id": (params.get("client_id") or [""])[0],
"redirect_uri": (params.get("redirect_uri") or [""])[0],
"response_type": (params.get("response_type") or [""])[0],
"scope": (params.get("scope") or [""])[0],
"state": (params.get("state") or [""])[0],
"nonce": (params.get("nonce") or [""])[0],
})
public_url = f"{PUBLIC_BASE}/oauth2/auth?{query}"
location = _location_from_response(public_url, self.headers.get("Cookie"))
print(f"login_verifier_location={location}")
if not location:
self.send_response(400)
self.end_headers()
self.wfile.write(b"missing redirect location")
return
consent_challenge = urllib.parse.parse_qs(urllib.parse.urlparse(location).query).get(
"consent_challenge",
[""],
)[0]
if not consent_challenge:
self.send_response(400)
self.end_headers()
self.wfile.write(f"missing consent_challenge location={location}".encode("utf-8"))
return
redirect_to = accept_consent(consent_challenge)
if not redirect_to:
self.send_response(500)
self.end_headers()
self.wfile.write(b"consent accept failed")
return
self.send_response(302)
self.send_header("Location", redirect_to)
self.end_headers()
return
if login_challenge:
redirect_to = accept_login(login_challenge)
elif consent_challenge:
redirect_to = accept_consent(consent_challenge)
else:
redirect_to = ""
if not redirect_to:
self.send_response(400)
self.end_headers()
self.wfile.write(b"missing challenge")
return
self.send_response(302)
self.send_header("Location", redirect_to)
self.end_headers()
def log_message(self, format, *args):
return
class StopAtRedirect(urllib.request.HTTPRedirectHandler):
def redirect_request(self, req, fp, code, msg, headers, newurl):
if newurl.startswith(REDIRECT_URI):
raise urllib.error.HTTPError(newurl, code, msg, headers, fp)
return super().redirect_request(req, fp, code, msg, headers, newurl)
def main():
server = HTTPServer(("127.0.0.1", 3000), Handler)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
encoded_redirect = urllib.parse.quote(REDIRECT_URI, safe="")
encoded_scope = urllib.parse.quote(SCOPE, safe="")
auth_url = (
f"{PUBLIC_BASE}/oauth2/auth?response_type=code"
f"&client_id={CLIENT_ID}"
f"&redirect_uri={encoded_redirect}"
f"&scope={encoded_scope}"
f"&state={STATE}"
f"&nonce={NONCE}"
)
jar = cookiejar.CookieJar()
opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(jar), StopAtRedirect())
try:
opener.open(auth_url, timeout=10)
except urllib.error.HTTPError as err:
body = err.read().decode("utf-8") if hasattr(err, "read") else ""
print(f"error_url={err.geturl()}")
print(f"error_code={err.code}")
if body:
print(f"error_body={body}")
finally:
server.shutdown()
if __name__ == "__main__":
main()