forked from baron/baron-sso
107 lines
2.7 KiB
Go
107 lines
2.7 KiB
Go
package repository
|
|
|
|
import (
|
|
"baron-sso-backend/internal/domain"
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/ClickHouse/clickhouse-go/v2"
|
|
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
|
)
|
|
|
|
type OathkeeperClickHouseRepository struct {
|
|
conn driver.Conn
|
|
}
|
|
|
|
func NewOathkeeperClickHouseRepository(host string, port int, user, password, db string) (*OathkeeperClickHouseRepository, error) {
|
|
conn, err := clickhouse.Open(&clickhouse.Options{
|
|
Addr: []string{fmt.Sprintf("%s:%d", host, port)},
|
|
Auth: clickhouse.Auth{
|
|
Database: db,
|
|
Username: user,
|
|
Password: password,
|
|
},
|
|
Debug: false,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open ory clickhouse connection: %w", err)
|
|
}
|
|
if err := conn.Ping(context.Background()); err != nil {
|
|
return nil, fmt.Errorf("failed to ping ory clickhouse: %w", err)
|
|
}
|
|
return &OathkeeperClickHouseRepository{conn: conn}, nil
|
|
}
|
|
|
|
func (r *OathkeeperClickHouseRepository) FindPageBySubject(ctx context.Context, subject string, limit int, cursor *domain.AuditCursor) ([]domain.OathkeeperAccessLog, error) {
|
|
if limit <= 0 {
|
|
limit = 50
|
|
}
|
|
query := `
|
|
SELECT timestamp, request_id, method, path, status, latency_ms, rp, action, target, subject, client_ip, user_agent, decision, trace_id, span_id, raw
|
|
FROM oathkeeper_access_logs
|
|
`
|
|
args := make([]any, 0, 5)
|
|
if subject != "" {
|
|
query += `
|
|
WHERE subject = ?
|
|
`
|
|
args = append(args, subject)
|
|
if cursor != nil {
|
|
query += `
|
|
AND ((timestamp < ?) OR (timestamp = ? AND request_id < ?))
|
|
`
|
|
args = append(args, cursor.Timestamp, cursor.Timestamp, cursor.EventID)
|
|
}
|
|
} else if cursor != nil {
|
|
query += `
|
|
WHERE (timestamp < ?) OR (timestamp = ? AND request_id < ?)
|
|
`
|
|
args = append(args, cursor.Timestamp, cursor.Timestamp, cursor.EventID)
|
|
}
|
|
query += `
|
|
ORDER BY timestamp DESC, request_id DESC
|
|
LIMIT ?
|
|
`
|
|
args = append(args, limit)
|
|
|
|
rows, err := r.conn.Query(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query oathkeeper logs: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var logs []domain.OathkeeperAccessLog
|
|
for rows.Next() {
|
|
var log domain.OathkeeperAccessLog
|
|
if err := rows.Scan(
|
|
&log.Timestamp,
|
|
&log.RequestID,
|
|
&log.Method,
|
|
&log.Path,
|
|
&log.Status,
|
|
&log.LatencyMs,
|
|
&log.RP,
|
|
&log.Action,
|
|
&log.Target,
|
|
&log.Subject,
|
|
&log.ClientIP,
|
|
&log.UserAgent,
|
|
&log.Decision,
|
|
&log.TraceID,
|
|
&log.SpanID,
|
|
&log.Raw,
|
|
); err != nil {
|
|
return nil, fmt.Errorf("failed to scan oathkeeper log: %w", err)
|
|
}
|
|
logs = append(logs, log)
|
|
}
|
|
return logs, nil
|
|
}
|
|
|
|
func (r *OathkeeperClickHouseRepository) Ping(ctx context.Context) error {
|
|
if r == nil || r.conn == nil {
|
|
return fmt.Errorf("ory clickhouse connection is nil")
|
|
}
|
|
return r.conn.Ping(ctx)
|
|
}
|