forked from baron/baron-sso
gateway 분리 아키텍처
This commit is contained in:
188
devfront/hydra-rp-dummy.py
Normal file
188
devfront/hydra-rp-dummy.py
Normal file
@@ -0,0 +1,188 @@
|
||||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||
from http import cookiejar
|
||||
import json
|
||||
import os
|
||||
import threading
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
|
||||
CLIENT_ID = os.environ["CLIENT_ID"]
|
||||
SUBJECT = os.environ["SUBJECT"]
|
||||
REDIRECT_URI = os.environ["REDIRECT_URI"]
|
||||
SCOPE = os.environ["SCOPE"]
|
||||
STATE = os.environ["STATE"]
|
||||
NONCE = os.environ["NONCE"]
|
||||
ADMIN_BASE = os.environ.get("HYDRA_ADMIN_URL", "http://127.0.0.1:4445")
|
||||
PUBLIC_BASE = os.environ.get("HYDRA_PUBLIC_URL", "http://127.0.0.1:4444")
|
||||
|
||||
|
||||
def _put_json(url: str, payload: dict) -> dict:
|
||||
data = json.dumps(payload).encode("utf-8")
|
||||
req = urllib.request.Request(url, data=data, method="PUT")
|
||||
req.add_header("Content-Type", "application/json")
|
||||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||||
body = resp.read().decode("utf-8")
|
||||
return json.loads(body) if body else {}
|
||||
|
||||
|
||||
def accept_login(challenge: str) -> str:
|
||||
url = f"{ADMIN_BASE}/oauth2/auth/requests/login/accept?login_challenge={urllib.parse.quote(challenge)}"
|
||||
payload = {"subject": SUBJECT, "remember": True, "remember_for": 3600}
|
||||
data = _put_json(url, payload)
|
||||
return data.get("redirect_to", "")
|
||||
|
||||
|
||||
def accept_consent(challenge: str) -> str:
|
||||
url = f"{ADMIN_BASE}/oauth2/auth/requests/consent/accept?consent_challenge={urllib.parse.quote(challenge)}"
|
||||
payload = {"grant_scope": ["openid", "profile", "email"], "remember": True, "remember_for": 3600}
|
||||
data = _put_json(url, payload)
|
||||
return data.get("redirect_to", "")
|
||||
|
||||
|
||||
def _location_from_response(url: str, cookie_header: str | None) -> str:
|
||||
req = urllib.request.Request(url, method="GET")
|
||||
if cookie_header:
|
||||
req.add_header("Cookie", cookie_header)
|
||||
opener = urllib.request.build_opener(NoRedirect())
|
||||
try:
|
||||
opener.open(req, timeout=5)
|
||||
except urllib.error.HTTPError as err:
|
||||
return err.headers.get("Location", "")
|
||||
return ""
|
||||
|
||||
|
||||
class NoRedirect(urllib.request.HTTPRedirectHandler):
|
||||
def redirect_request(self, req, fp, code, msg, headers, newurl):
|
||||
raise urllib.error.HTTPError(newurl, code, msg, headers, fp)
|
||||
|
||||
|
||||
class Handler(BaseHTTPRequestHandler):
|
||||
def do_GET(self):
|
||||
parsed = urllib.parse.urlparse(self.path)
|
||||
params = urllib.parse.parse_qs(parsed.query)
|
||||
login_challenge = (params.get("login_challenge") or [""])[0]
|
||||
consent_challenge = (params.get("consent_challenge") or [""])[0]
|
||||
login_verifier = (params.get("login_verifier") or [""])[0]
|
||||
consent_verifier = (params.get("consent_verifier") or [""])[0]
|
||||
|
||||
if parsed.path == "/oauth2/auth" and consent_verifier:
|
||||
query = urllib.parse.urlencode({
|
||||
"consent_verifier": consent_verifier,
|
||||
"client_id": (params.get("client_id") or [""])[0],
|
||||
"redirect_uri": (params.get("redirect_uri") or [""])[0],
|
||||
"response_type": (params.get("response_type") or [""])[0],
|
||||
"scope": (params.get("scope") or [""])[0],
|
||||
"state": (params.get("state") or [""])[0],
|
||||
"nonce": (params.get("nonce") or [""])[0],
|
||||
})
|
||||
public_url = f"{PUBLIC_BASE}/oauth2/auth?{query}"
|
||||
location = _location_from_response(public_url, self.headers.get("Cookie"))
|
||||
print(f"consent_verifier_location={location}")
|
||||
if not location:
|
||||
self.send_response(400)
|
||||
self.end_headers()
|
||||
self.wfile.write(b"missing redirect location")
|
||||
return
|
||||
self.send_response(302)
|
||||
self.send_header("Location", location)
|
||||
self.end_headers()
|
||||
return
|
||||
|
||||
if parsed.path == "/oauth2/auth" and login_verifier:
|
||||
query = urllib.parse.urlencode({
|
||||
"login_verifier": login_verifier,
|
||||
"client_id": (params.get("client_id") or [""])[0],
|
||||
"redirect_uri": (params.get("redirect_uri") or [""])[0],
|
||||
"response_type": (params.get("response_type") or [""])[0],
|
||||
"scope": (params.get("scope") or [""])[0],
|
||||
"state": (params.get("state") or [""])[0],
|
||||
"nonce": (params.get("nonce") or [""])[0],
|
||||
})
|
||||
public_url = f"{PUBLIC_BASE}/oauth2/auth?{query}"
|
||||
location = _location_from_response(public_url, self.headers.get("Cookie"))
|
||||
print(f"login_verifier_location={location}")
|
||||
if not location:
|
||||
self.send_response(400)
|
||||
self.end_headers()
|
||||
self.wfile.write(b"missing redirect location")
|
||||
return
|
||||
consent_challenge = urllib.parse.parse_qs(urllib.parse.urlparse(location).query).get(
|
||||
"consent_challenge",
|
||||
[""],
|
||||
)[0]
|
||||
if not consent_challenge:
|
||||
self.send_response(400)
|
||||
self.end_headers()
|
||||
self.wfile.write(f"missing consent_challenge location={location}".encode("utf-8"))
|
||||
return
|
||||
redirect_to = accept_consent(consent_challenge)
|
||||
if not redirect_to:
|
||||
self.send_response(500)
|
||||
self.end_headers()
|
||||
self.wfile.write(b"consent accept failed")
|
||||
return
|
||||
self.send_response(302)
|
||||
self.send_header("Location", redirect_to)
|
||||
self.end_headers()
|
||||
return
|
||||
|
||||
if login_challenge:
|
||||
redirect_to = accept_login(login_challenge)
|
||||
elif consent_challenge:
|
||||
redirect_to = accept_consent(consent_challenge)
|
||||
else:
|
||||
redirect_to = ""
|
||||
|
||||
if not redirect_to:
|
||||
self.send_response(400)
|
||||
self.end_headers()
|
||||
self.wfile.write(b"missing challenge")
|
||||
return
|
||||
|
||||
self.send_response(302)
|
||||
self.send_header("Location", redirect_to)
|
||||
self.end_headers()
|
||||
|
||||
def log_message(self, format, *args):
|
||||
return
|
||||
|
||||
|
||||
class StopAtRedirect(urllib.request.HTTPRedirectHandler):
|
||||
def redirect_request(self, req, fp, code, msg, headers, newurl):
|
||||
if newurl.startswith(REDIRECT_URI):
|
||||
raise urllib.error.HTTPError(newurl, code, msg, headers, fp)
|
||||
return super().redirect_request(req, fp, code, msg, headers, newurl)
|
||||
|
||||
|
||||
def main():
|
||||
server = HTTPServer(("127.0.0.1", 3000), Handler)
|
||||
thread = threading.Thread(target=server.serve_forever, daemon=True)
|
||||
thread.start()
|
||||
|
||||
encoded_redirect = urllib.parse.quote(REDIRECT_URI, safe="")
|
||||
encoded_scope = urllib.parse.quote(SCOPE, safe="")
|
||||
auth_url = (
|
||||
f"{PUBLIC_BASE}/oauth2/auth?response_type=code"
|
||||
f"&client_id={CLIENT_ID}"
|
||||
f"&redirect_uri={encoded_redirect}"
|
||||
f"&scope={encoded_scope}"
|
||||
f"&state={STATE}"
|
||||
f"&nonce={NONCE}"
|
||||
)
|
||||
|
||||
jar = cookiejar.CookieJar()
|
||||
opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(jar), StopAtRedirect())
|
||||
try:
|
||||
opener.open(auth_url, timeout=10)
|
||||
except urllib.error.HTTPError as err:
|
||||
body = err.read().decode("utf-8") if hasattr(err, "read") else ""
|
||||
print(f"error_url={err.geturl()}")
|
||||
print(f"error_code={err.code}")
|
||||
if body:
|
||||
print(f"error_body={body}")
|
||||
finally:
|
||||
server.shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user