IP기반 Seed 데이터 생성, 업데이트에 반영
This commit is contained in:
@@ -1,12 +1,23 @@
|
||||
package userprogram
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"geoip-rest/internal/geo"
|
||||
)
|
||||
|
||||
type Backend string
|
||||
|
||||
const (
|
||||
BackendMMDB Backend = "mmdb"
|
||||
BackendPostgres Backend = "postgres"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -88,6 +99,42 @@ func NewPathsFromEnv() (Paths, error) {
|
||||
return paths, nil
|
||||
}
|
||||
|
||||
func BackendFromEnv() Backend {
|
||||
val := strings.ToLower(env("GEOIP_BACKEND", string(BackendMMDB)))
|
||||
switch val {
|
||||
case string(BackendMMDB), "":
|
||||
return BackendMMDB
|
||||
case string(BackendPostgres):
|
||||
return BackendPostgres
|
||||
default:
|
||||
return BackendMMDB
|
||||
}
|
||||
}
|
||||
|
||||
func ResolveBackend(cfg geo.Config) (geo.Resolver, error) {
|
||||
switch Backend(cfg.Backend) {
|
||||
case BackendMMDB, "":
|
||||
if cfg.MMDBPath == "" {
|
||||
return nil, errors.New("MMDBPath required for mmdb backend")
|
||||
}
|
||||
return geo.NewResolver(geo.Config{
|
||||
Backend: geo.BackendMMDB,
|
||||
MMDBPath: cfg.MMDBPath,
|
||||
})
|
||||
case BackendPostgres:
|
||||
if cfg.DatabaseURL == "" {
|
||||
return nil, errors.New("DatabaseURL required for postgres backend")
|
||||
}
|
||||
return geo.NewResolver(geo.Config{
|
||||
Backend: geo.BackendPostgres,
|
||||
DatabaseURL: cfg.DatabaseURL,
|
||||
LookupQuery: cfg.LookupQuery,
|
||||
})
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported backend %s", cfg.Backend)
|
||||
}
|
||||
}
|
||||
|
||||
func ParseTargetDate(raw string) (time.Time, error) {
|
||||
if raw == "" {
|
||||
return yesterdayKST(), nil
|
||||
|
||||
192
internal/userprogram/ip_geoinfo.go
Normal file
192
internal/userprogram/ip_geoinfo.go
Normal file
@@ -0,0 +1,192 @@
|
||||
package userprogram
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
|
||||
"geoip-rest/internal/geo"
|
||||
)
|
||||
|
||||
func EnsureIPGeoInfoTable(ctx context.Context, conn *pgx.Conn, schema string) error {
|
||||
ddl := fmt.Sprintf(`
|
||||
CREATE TABLE IF NOT EXISTS %s.ip_geoinfo (
|
||||
id bigserial PRIMARY KEY,
|
||||
ip inet UNIQUE NOT NULL,
|
||||
country text,
|
||||
region text,
|
||||
city text,
|
||||
address text,
|
||||
latitude double precision,
|
||||
longitude double precision
|
||||
);`, pgx.Identifier{schema}.Sanitize())
|
||||
_, err := conn.Exec(ctx, ddl)
|
||||
return err
|
||||
}
|
||||
|
||||
// ExportPublicIPs writes distinct login_public_ip values to a CSV file with header.
|
||||
func ExportPublicIPs(ctx context.Context, conn *pgx.Conn, schema, path string) error {
|
||||
rows, err := conn.Query(ctx, fmt.Sprintf(`
|
||||
SELECT DISTINCT login_public_ip
|
||||
FROM %s.user_program_info_replica
|
||||
WHERE login_public_ip IS NOT NULL AND login_public_ip <> ''
|
||||
ORDER BY login_public_ip;`, pgx.Identifier{schema}.Sanitize()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var ips []string
|
||||
for rows.Next() {
|
||||
var ip string
|
||||
if err := rows.Scan(&ip); err != nil {
|
||||
return err
|
||||
}
|
||||
ips = append(ips, ip)
|
||||
}
|
||||
if rows.Err() != nil {
|
||||
return rows.Err()
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
|
||||
return err
|
||||
}
|
||||
f, err := os.Create(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
if _, err := f.WriteString(`"login_public_ip"` + "\n"); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, ip := range ips {
|
||||
if _, err := f.WriteString(fmt.Sprintf(`"%s"`+"\n", ip)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GenerateIPGeoInfoSQL builds an upsert SQL file for IPs. If onlyNew is true, it skips
|
||||
// IPs already present in ip_geoinfo.
|
||||
func GenerateIPGeoInfoSQL(ctx context.Context, conn *pgx.Conn, schema string, resolver geo.Resolver, output string, onlyNew bool) (int, error) {
|
||||
query := fmt.Sprintf(`
|
||||
SELECT DISTINCT login_public_ip
|
||||
FROM %s.user_program_info_replica r
|
||||
WHERE login_public_ip IS NOT NULL AND login_public_ip <> ''`, pgx.Identifier{schema}.Sanitize())
|
||||
if onlyNew {
|
||||
query += fmt.Sprintf(`
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM %s.ip_geoinfo g WHERE g.ip = r.login_public_ip::inet
|
||||
)`, pgx.Identifier{schema}.Sanitize())
|
||||
}
|
||||
query += ";"
|
||||
|
||||
rows, err := conn.Query(ctx, query)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var ips []string
|
||||
for rows.Next() {
|
||||
var ip string
|
||||
if err := rows.Scan(&ip); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
ips = append(ips, ip)
|
||||
}
|
||||
if rows.Err() != nil {
|
||||
return 0, rows.Err()
|
||||
}
|
||||
if len(ips) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
sort.Strings(ips)
|
||||
|
||||
if err := os.MkdirAll(filepath.Dir(output), 0o755); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
f, err := os.Create(output)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
header := fmt.Sprintf("-- generated at %s KST\n", time.Now().In(kst()).Format(time.RFC3339))
|
||||
header += fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s;\n", schemaIdent(schema))
|
||||
header += fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.ip_geoinfo (
|
||||
id bigserial PRIMARY KEY,
|
||||
ip inet UNIQUE NOT NULL,
|
||||
country text,
|
||||
region text,
|
||||
city text,
|
||||
address text,
|
||||
latitude double precision,
|
||||
longitude double precision
|
||||
);`+"\n", schemaIdent(schema))
|
||||
if _, err := f.WriteString(header); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
count := 0
|
||||
for _, ip := range ips {
|
||||
loc, err := resolver.Lookup(ip)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
stmt := fmt.Sprintf(`INSERT INTO %s.ip_geoinfo (ip, country, region, city, address, latitude, longitude)
|
||||
VALUES ('%s', %s, %s, %s, %s, %f, %f)
|
||||
ON CONFLICT (ip) DO UPDATE SET
|
||||
country = EXCLUDED.country,
|
||||
region = EXCLUDED.region,
|
||||
city = EXCLUDED.city,
|
||||
address = EXCLUDED.address,
|
||||
latitude = EXCLUDED.latitude,
|
||||
longitude = EXCLUDED.longitude;
|
||||
`, schemaIdent(schema), toHostCIDR(ip), quote(loc.Country), quote(loc.Region), quote(loc.City), quote(loc.Address), loc.Latitude, loc.Longitude)
|
||||
if _, err := f.WriteString(stmt); err != nil {
|
||||
return count, err
|
||||
}
|
||||
count++
|
||||
}
|
||||
|
||||
return count, nil
|
||||
}
|
||||
|
||||
func ExecuteSQLFile(ctx context.Context, conn *pgx.Conn, path string) error {
|
||||
content, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = conn.Exec(ctx, string(content))
|
||||
return err
|
||||
}
|
||||
|
||||
func toHostCIDR(ipStr string) string {
|
||||
ip := net.ParseIP(ipStr)
|
||||
if ip == nil {
|
||||
return ""
|
||||
}
|
||||
if ip.To4() != nil {
|
||||
return ip.String() + "/32"
|
||||
}
|
||||
return ip.String() + "/128"
|
||||
}
|
||||
|
||||
func quote(s string) string {
|
||||
return fmt.Sprintf("'%s'", strings.ReplaceAll(s, "'", "''"))
|
||||
}
|
||||
|
||||
func schemaIdent(s string) string {
|
||||
return `"` + strings.ReplaceAll(s, `"`, `""`) + `"`
|
||||
}
|
||||
@@ -4,16 +4,23 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
|
||||
"geoip-rest/internal/geo"
|
||||
"geoip-rest/internal/importer"
|
||||
)
|
||||
|
||||
const defaultMMDBPath = "/initial_data/GeoLite2-City.mmdb"
|
||||
|
||||
type SyncConfig struct {
|
||||
MySQL MySQLConfig
|
||||
DatabaseURL string
|
||||
Backend Backend
|
||||
MMDBPath string
|
||||
LookupQuery string
|
||||
InitialCSV string
|
||||
UpdateDir string
|
||||
LogDir string
|
||||
@@ -34,6 +41,9 @@ func (c *SyncConfig) defaults() {
|
||||
if c.Schema == "" {
|
||||
c.Schema = DefaultSchema
|
||||
}
|
||||
if c.MMDBPath == "" {
|
||||
c.MMDBPath = defaultMMDBPath
|
||||
}
|
||||
if c.Logger == nil {
|
||||
c.Logger = log.Default()
|
||||
}
|
||||
@@ -90,6 +100,10 @@ func Sync(ctx context.Context, cfg SyncConfig) error {
|
||||
return fmt.Errorf("import updates: %w", err)
|
||||
}
|
||||
|
||||
if err := ensureIPGeoInfo(ctx, cfg, conn); err != nil {
|
||||
cfg.Logger.Printf("ip_geoinfo update warning: %v", err)
|
||||
}
|
||||
|
||||
cfg.Logger.Printf("sync complete (last_id=%d -> %d)", lastID, upperID)
|
||||
|
||||
if err := verifyCounts(ctx, cfg, dumper, conn, upperID); err != nil {
|
||||
@@ -116,3 +130,39 @@ func verifyCounts(ctx context.Context, cfg SyncConfig, dumper *Dumper, conn *pgx
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func ensureIPGeoInfo(ctx context.Context, cfg SyncConfig, conn *pgx.Conn) error {
|
||||
if err := EnsureIPGeoInfoTable(ctx, conn, cfg.Schema); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ts := time.Now().In(kst()).Format("20060102-150405")
|
||||
ipListPath := filepath.Join(cfg.UpdateDir, fmt.Sprintf("public_ip_list_%s.csv", ts))
|
||||
if err := ExportPublicIPs(ctx, conn, cfg.Schema, ipListPath); err != nil {
|
||||
return fmt.Errorf("export public ip list: %w", err)
|
||||
}
|
||||
|
||||
resolver, err := ResolveBackend(geo.Config{
|
||||
Backend: geo.Backend(cfg.Backend),
|
||||
MMDBPath: cfg.MMDBPath,
|
||||
DatabaseURL: cfg.DatabaseURL,
|
||||
LookupQuery: cfg.LookupQuery,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("init resolver for ip_geoinfo: %w", err)
|
||||
}
|
||||
defer resolver.Close()
|
||||
|
||||
sqlPath := filepath.Join(cfg.UpdateDir, fmt.Sprintf("ip_geoinfo_update-%s.sql", ts))
|
||||
count, err := GenerateIPGeoInfoSQL(ctx, conn, cfg.Schema, resolver, sqlPath, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("generate ip_geoinfo sql: %w", err)
|
||||
}
|
||||
if count == 0 {
|
||||
return nil
|
||||
}
|
||||
if err := ExecuteSQLFile(ctx, conn, sqlPath); err != nil {
|
||||
return fmt.Errorf("execute ip_geoinfo sql: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user