1
0
forked from baron/baron-sso
Files
baron-sso/backend/internal/repository/clickhouse_repo.go

127 lines
2.8 KiB
Go

package repository
import (
"baron-sso-backend/internal/domain"
"context"
"fmt"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)
// ClickHouseRepository reads and writes audit log records through a
// ClickHouse driver connection.
type ClickHouseRepository struct {
	// conn is the driver connection used by every query in this repository.
	conn driver.Conn
}
// NewClickHouseRepository opens a connection to the ClickHouse server at
// host:port, authenticating as user/password against database db, verifies
// the connection with a ping, and creates the audit_logs table if it does
// not exist yet.
//
// It returns a wrapped error if the connection cannot be opened, the ping
// fails, or the table cannot be created. When the ping or the table creation
// fails, the freshly opened connection is closed before returning so that it
// is not leaked.
func NewClickHouseRepository(host string, port int, user, password, db string) (*ClickHouseRepository, error) {
	conn, err := clickhouse.Open(&clickhouse.Options{
		Addr: []string{fmt.Sprintf("%s:%d", host, port)},
		Auth: clickhouse.Auth{
			Database: db,
			Username: user,
			Password: password,
		},
		Debug: false,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to open clickhouse connection: %w", err)
	}

	ctx := context.Background()

	if err := conn.Ping(ctx); err != nil {
		// Best-effort cleanup: the ping error is the one worth reporting,
		// so a secondary Close error is intentionally dropped.
		_ = conn.Close()
		return nil, fmt.Errorf("failed to ping clickhouse: %w", err)
	}

	// Ensure Table Exists
	// Note: In production, use migrations.
	query := `
CREATE TABLE IF NOT EXISTS audit_logs (
timestamp DateTime DEFAULT now(),
user_id String,
event_type String,
status String,
ip_address String,
user_agent String,
device_id String,
details String
) ENGINE = MergeTree()
ORDER BY timestamp
`
	if err := conn.Exec(ctx, query); err != nil {
		// Same best-effort cleanup as above: do not leak the connection
		// when the repository cannot be constructed.
		_ = conn.Close()
		return nil, fmt.Errorf("failed to create table: %w", err)
	}

	return &ClickHouseRepository{conn: conn}, nil
}
// Create inserts a single audit log entry into the audit_logs table.
//
// If log.Timestamp is the zero time it is set to time.Now() before the
// insert; this change is written back to the caller's struct. The insert
// runs under a 5-second timeout derived from context.Background().
//
// It returns an error if log is nil or if the insert fails; the driver error
// is wrapped with %w so callers can still inspect it with errors.Is/As.
func (r *ClickHouseRepository) Create(log *domain.AuditLog) error {
	// Guard against a nil entry, which would otherwise panic on the
	// Timestamp access below.
	if log == nil {
		return fmt.Errorf("audit log is nil")
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	if log.Timestamp.IsZero() {
		log.Timestamp = time.Now()
	}

	query := `
INSERT INTO audit_logs (timestamp, user_id, event_type, status, ip_address, user_agent, device_id, details)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
`
	if err := r.conn.Exec(ctx, query,
		log.Timestamp,
		log.UserID,
		log.EventType,
		log.Status,
		log.IPAddress,
		log.UserAgent,
		log.DeviceID,
		log.Details,
	); err != nil {
		return fmt.Errorf("failed to insert audit log: %w", err)
	}
	return nil
}
// FindAll returns one page of audit log entries ordered by timestamp,
// newest first.
//
// A limit of zero or less falls back to 50, and a negative offset is treated
// as 0. The returned slice is nil when the query yields no rows.
//
// It returns a wrapped error if the query fails, if a row cannot be scanned,
// or if row iteration stops early because of an error.
func (r *ClickHouseRepository) FindAll(ctx context.Context, limit, offset int) ([]domain.AuditLog, error) {
	if limit <= 0 {
		limit = 50
	}
	if offset < 0 {
		offset = 0
	}

	query := `
SELECT timestamp, user_id, event_type, status, ip_address, user_agent, device_id, details
FROM audit_logs
ORDER BY timestamp DESC
LIMIT ? OFFSET ?
`
	rows, err := r.conn.Query(ctx, query, limit, offset)
	if err != nil {
		return nil, fmt.Errorf("failed to query audit logs: %w", err)
	}
	defer rows.Close()

	var logs []domain.AuditLog
	for rows.Next() {
		var entry domain.AuditLog
		if err := rows.Scan(
			&entry.Timestamp,
			&entry.UserID,
			&entry.EventType,
			&entry.Status,
			&entry.IPAddress,
			&entry.UserAgent,
			&entry.DeviceID,
			&entry.Details,
		); err != nil {
			return nil, fmt.Errorf("failed to scan audit log: %w", err)
		}
		logs = append(logs, entry)
	}

	// Next returning false can mean either "no more rows" or "iteration
	// failed"; without this check a mid-stream error would be reported as a
	// successful but truncated result.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to iterate audit logs: %w", err)
	}

	return logs, nil
}
// Ping checks the underlying ClickHouse connection by delegating to the
// driver's Ping with the supplied context. It returns an error without
// touching the network when the repository holds no connection.
func (r *ClickHouseRepository) Ping(ctx context.Context) error {
	conn := r.conn
	if conn != nil {
		return conn.Ping(ctx)
	}
	return fmt.Errorf("clickhouse connection is nil")
}