diff --git a/Dockerfile b/Dockerfile
index e801839..9df13a2 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -18,17 +18,17 @@ FROM debian:trixie-slim
 RUN useradd --create-home --shell /usr/sbin/nologin appuser
 
-WORKDIR /app
+WORKDIR /
 
 COPY --from=builder /bin/geoip /usr/local/bin/geoip
 COPY --from=builder /bin/geoip-loader /usr/local/bin/geoip-loader
 COPY --from=builder /bin/user-program-import /usr/local/bin/user-program-import
 COPY --from=builder /bin/user-program-dump /usr/local/bin/user-program-dump
 COPY --from=builder /bin/user-program-sync /usr/local/bin/user-program-sync
 
-COPY initial_data /app/initial_data
-RUN mkdir -p /app/update_data
+COPY initial_data /initial_data
+RUN mkdir -p /update_data /log && chown appuser:appuser /update_data /log
 
-ENV GEOIP_DB_PATH=/app/initial_data/GeoLite2-City.mmdb
+ENV GEOIP_DB_PATH=/initial_data/GeoLite2-City.mmdb
 
 USER appuser
 EXPOSE 8080
diff --git a/cmd/user_program_dump/main.go b/cmd/user_program_dump/main.go
index eb60a2d..fd50a60 100644
--- a/cmd/user_program_dump/main.go
+++ b/cmd/user_program_dump/main.go
@@ -2,294 +2,62 @@ package main
 
 import (
 	"context"
-	"database/sql"
-	"encoding/csv"
-	"fmt"
 	"log"
 	"os"
-	"path/filepath"
-	"regexp"
 	"strconv"
 	"time"
 
-	"github.com/go-sql-driver/mysql"
+	"geoip-rest/internal/userprogram"
)
 
-const (
-	defaultUpdateDir   = "./update_data"
-	defaultTable       = "user_program_info"
-	defaultDB          = "user_program_info"
-	defaultDumpTimeout = 5 * time.Minute
-)
-
-type config struct {
-	host      string
-	port      int
-	user      string
-	password  string
-	database  string
-	table     string
-	updateDir string
-	target    time.Time
-}
+const defaultDumpTimeout = 5 * time.Minute
 
 func main() {
 	logger := log.New(os.Stdout, "[dump] ", log.LstdFlags)
 
-	cfg, err := loadConfig()
+	mysqlCfg, err := userprogram.NewMySQLConfigFromEnv()
 	if err != nil {
 		log.Fatalf("config error: %v", err)
 	}
+	updateDir := userprogram.DefaultUpdateDir
+	if val := os.Getenv("USER_PROGRAM_UPDATE_DIR"); val != "" {
+		updateDir = val
+	}
+	target, err := userprogram.ParseTargetDate(os.Getenv("USER_PROGRAM_TARGET_DATE"))
+	if err != nil {
+		log.Fatalf("target date error: %v", err)
+	}
+	startID := int64(0)
+	if val := os.Getenv("USER_PROGRAM_START_ID"); val != "" {
+		parsed, parseErr := strconv.ParseInt(val, 10, 64)
+		if parseErr != nil {
+			log.Fatalf("invalid USER_PROGRAM_START_ID: %v", parseErr)
+		}
+		startID = parsed
+	}
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultDumpTimeout)
 	defer cancel()
 
-	dsn := (&mysql.Config{
-		User:                 cfg.user,
-		Passwd:               cfg.password,
-		Net:                  "tcp",
-		Addr:                 netAddr(cfg.host, cfg.port),
-		DBName:               cfg.database,
-		Params:               map[string]string{"parseTime": "true", "loc": "UTC", "charset": "utf8mb4"},
-		AllowNativePasswords: true,
-	}).FormatDSN()
-
-	db, err := sql.Open("mysql", dsn)
+	dumper, err := userprogram.NewDumper(mysqlCfg, updateDir)
 	if err != nil {
-		log.Fatalf("failed to open mysql connection: %v", err)
+		log.Fatalf("init dumper failed: %v", err)
 	}
-	defer db.Close()
+	defer dumper.Close()
 
-	if err := db.PingContext(ctx); err != nil {
-		log.Fatalf("failed to ping mysql: %v", err)
+	endID, err := dumper.MaxIDUntil(ctx, target)
+	if err != nil {
+		log.Fatalf("determine end id failed: %v", err)
+	}
+	if endID <= startID {
+		logger.Printf("no rows to dump (start_id=%d end_id=%d)", startID, endID)
+		return
 	}
 
-	if _, err := db.ExecContext(ctx, "SET time_zone = '+00:00'"); err != nil {
-		log.Fatalf("failed to set timezone: %v", err)
-	}
-
-	outPath, err := dumpToCSV(ctx, db, cfg)
+	outPath, err := dumper.DumpRange(ctx, startID, endID, target)
 	if err != nil {
 		log.Fatalf("dump failed: 
%v", err) } - logger.Printf("dumped %s to %s", cfg.target.Format("2006-01-02"), outPath) -} - -func loadConfig() (config, error) { - port, err := strconv.Atoi(env("USER_PROGRAM_INFO_PORT", "3306")) - if err != nil { - return config{}, fmt.Errorf("invalid USER_PROGRAM_INFO_PORT: %w", err) - } - - target, err := resolveTargetDate(env("USER_PROGRAM_TARGET_DATE", "")) - if err != nil { - return config{}, err - } - - table := env("USER_PROGRAM_INFO_TABLE", defaultTable) - if !regexp.MustCompile(`^[a-zA-Z0-9_]+$`).MatchString(table) { - return config{}, fmt.Errorf("invalid table name: %s", table) - } - - updateDir := env("USER_PROGRAM_UPDATE_DIR", defaultUpdateDir) - if updateDir == "" { - updateDir = defaultUpdateDir - } - if err := os.MkdirAll(updateDir, 0o755); err != nil { - return config{}, fmt.Errorf("creating update dir: %w", err) - } - - return config{ - host: envRequired("USER_PROGRAM_INFO_HOST"), - port: port, - user: envRequired("USER_PROGRAM_INFO_USERNAME"), - password: envRequired("USER_PROGRAM_INFO_PASSWORD"), - database: env("USER_PROGRAM_INFO_DB", defaultDB), - table: table, - updateDir: updateDir, - target: target, - }, nil -} - -func dumpToCSV(ctx context.Context, db *sql.DB, cfg config) (string, error) { - query := fmt.Sprintf(` -SELECT - id, - product_name, - login_id, - user_employee_id, - login_version, - login_public_ip, - login_local_ip, - user_company, - user_department, - user_position, - user_login_time, - created_at, - user_family_flag -FROM %s -WHERE DATE(CONVERT_TZ(created_at, '+00:00', '+09:00')) = ?;`, cfg.table) - - rows, err := db.QueryContext(ctx, query, cfg.target.Format("2006-01-02")) - if err != nil { - return "", err - } - defer rows.Close() - - filename := fmt.Sprintf("user_program_info_%s.csv", cfg.target.Format("20060102")) - outPath := filepath.Join(cfg.updateDir, filename) - tmpPath := outPath + ".tmp" - - f, err := os.Create(tmpPath) - if err != nil { - return "", err - } - defer f.Close() - - writer := csv.NewWriter(f) - defer writer.Flush() - - header := []string{ - "id", - "product_name", - "login_id", - "user_employee_id", - "login_version", - "login_public_ip", - "login_local_ip", - "user_company", - "user_department", - "user_position", - "user_login_time", - "created_at", - "user_family_flag", - } - if err := writer.Write(header); err != nil { - return "", err - } - - for rows.Next() { - record, err := scanRow(rows) - if err != nil { - return "", err - } - if err := writer.Write(record); err != nil { - return "", err - } - } - if err := rows.Err(); err != nil { - return "", err - } - writer.Flush() - if err := writer.Error(); err != nil { - return "", err - } - - if err := os.Rename(tmpPath, outPath); err != nil { - return "", err - } - return outPath, nil -} - -func scanRow(rows *sql.Rows) ([]string, error) { - var ( - id sql.NullInt64 - productName sql.NullString - loginID sql.NullString - employeeID sql.NullString - loginVersion sql.NullString - loginPublicIP sql.NullString - loginLocalIP sql.NullString - userCompany sql.NullString - userDepartment sql.NullString - userPosition sql.NullString - userLoginTime sql.NullString - createdAt sql.NullString - userFamilyFlag sql.NullString - ) - - if err := rows.Scan( - &id, - &productName, - &loginID, - &employeeID, - &loginVersion, - &loginPublicIP, - &loginLocalIP, - &userCompany, - &userDepartment, - &userPosition, - &userLoginTime, - &createdAt, - &userFamilyFlag, - ); err != nil { - return nil, err - } - if !id.Valid { - return nil, fmt.Errorf("row missing id") - } - - return []string{ - 
strconv.FormatInt(id.Int64, 10), - nullToString(productName), - nullToString(loginID), - nullToString(employeeID), - nullToString(loginVersion), - nullToString(loginPublicIP), - nullToString(loginLocalIP), - nullToString(userCompany), - nullToString(userDepartment), - nullToString(userPosition), - nullToString(userLoginTime), - nullToString(createdAt), - nullToString(userFamilyFlag), - }, nil -} - -func resolveTargetDate(raw string) (time.Time, error) { - if raw == "" { - now := time.Now().In(kst()) - yesterday := now.AddDate(0, 0, -1) - return time.Date(yesterday.Year(), yesterday.Month(), yesterday.Day(), 0, 0, 0, 0, kst()), nil - } - t, err := time.ParseInLocation("2006-01-02", raw, kst()) - if err != nil { - return time.Time{}, fmt.Errorf("invalid USER_PROGRAM_TARGET_DATE: %w", err) - } - return t, nil -} - -func kst() *time.Location { - loc, err := time.LoadLocation("Asia/Seoul") - if err != nil { - return time.FixedZone("KST", 9*60*60) - } - return loc -} - -func env(key, fallback string) string { - if v := os.Getenv(key); v != "" { - return v - } - return fallback -} - -func envRequired(key string) string { - v := os.Getenv(key) - if v == "" { - log.Fatalf("%s is required", key) - } - return v -} - -func netAddr(host string, port int) string { - return fmt.Sprintf("%s:%d", host, port) -} - -func nullToString(v sql.NullString) string { - if v.Valid { - return v.String - } - return "" + logger.Printf("dumped ids (%d, %d] to %s", startID, endID, outPath) } diff --git a/cmd/user_program_sync/main.go b/cmd/user_program_sync/main.go index bcdc509..984d4d9 100644 --- a/cmd/user_program_sync/main.go +++ b/cmd/user_program_sync/main.go @@ -4,72 +4,42 @@ import ( "context" "log" "os" - "os/exec" - "path/filepath" "time" + + "geoip-rest/internal/userprogram" ) -const ( - defaultUpdateDir = "/app/update_data" - defaultLogDir = "/app/log" - defaultSchema = "public" - defaultTimeout = 15 * time.Minute -) +const defaultTimeout = 30 * time.Minute func main() { logger := log.New(os.Stdout, "[sync] ", log.LstdFlags) + dbURL := os.Getenv("DATABASE_URL") + if dbURL == "" { + logger.Fatal("DATABASE_URL is required") + } + + mysqlCfg, err := userprogram.NewMySQLConfigFromEnv() + if err != nil { + logger.Fatalf("mysql config: %v", err) + } + paths, err := userprogram.NewPathsFromEnv() + if err != nil { + logger.Fatalf("paths config: %v", err) + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) defer cancel() - updateDir := env("USER_PROGRAM_UPDATE_DIR", defaultUpdateDir) - logDir := env("USER_PROGRAM_IMPORT_LOG_DIR", defaultLogDir) - schema := env("USER_PROGRAM_INFO_SCHEMA", defaultSchema) - - ensureDir(updateDir, logger) - ensureDir(logDir, logger) - - if err := runCmd(ctx, logger, "user-program-dump", map[string]string{ - "USER_PROGRAM_UPDATE_DIR": updateDir, + if err := userprogram.Sync(ctx, userprogram.SyncConfig{ + MySQL: mysqlCfg, + DatabaseURL: dbURL, + InitialCSV: paths.InitialCSV, + UpdateDir: paths.UpdateDir, + LogDir: paths.LogDir, + Schema: paths.Schema, + Logger: logger, }); err != nil { - logger.Fatalf("dump failed: %v", err) - } - - if err := runCmd(ctx, logger, "user-program-import", map[string]string{ - "USER_PROGRAM_UPDATE_DIR": updateDir, - "USER_PROGRAM_IMPORT_LOG_DIR": logDir, - "USER_PROGRAM_INFO_SCHEMA": schema, - }); err != nil { - logger.Fatalf("import failed: %v", err) - } - - logger.Printf("sync completed (update_dir=%s, log_dir=%s, schema=%s)", updateDir, logDir, schema) -} - -func runCmd(ctx context.Context, logger *log.Logger, command string, 
extraEnv map[string]string) error { - cmd := exec.CommandContext(ctx, command) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - cmd.Env = os.Environ() - for k, v := range extraEnv { - if v == "" { - continue - } - cmd.Env = append(cmd.Env, k+"="+v) - } - logger.Printf("running %s", filepath.Base(command)) - return cmd.Run() -} - -func ensureDir(path string, logger *log.Logger) { - if err := os.MkdirAll(path, 0o755); err != nil { - logger.Fatalf("failed to create dir %s: %v", path, err) + logger.Fatalf("sync failed: %v", err) } } - -func env(key, fallback string) string { - if v := os.Getenv(key); v != "" { - return v - } - return fallback -} diff --git a/docker-compose.yml b/docker-compose.yml index 17a2c50..28d519a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,7 +10,7 @@ services: - "${SERVICE_PORT:-8080}:8080" environment: - SERVICE_PORT=${SERVICE_PORT:-8080} - - GEOIP_DB_PATH=${GEOIP_DB_PATH:-/app/initial_data/GeoLite2-City.mmdb} + - GEOIP_DB_PATH=${GEOIP_DB_PATH:-/initial_data/GeoLite2-City.mmdb} - GEOIP_BACKEND=${GEOIP_BACKEND:-mmdb} - GEOIP_LOADER_TIMEOUT=${GEOIP_LOADER_TIMEOUT:-30m} - DATABASE_URL=postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST:-db}:${POSTGRES_PORT:-5432}/${POSTGRES_DB}?sslmode=disable @@ -26,9 +26,9 @@ services: exec geoip ' volumes: - - ./initial_data:/app/initial_data:ro - - ./update_data:/app/update_data - - ./log:/app/log + - ./initial_data:/initial_data:ro + - ./update_data:/update_data + - ./log:/log networks: - geo-ip diff --git a/internal/importer/helpers.go b/internal/importer/helpers.go new file mode 100644 index 0000000..cc28951 --- /dev/null +++ b/internal/importer/helpers.go @@ -0,0 +1,35 @@ +package importer + +import ( + "context" + "database/sql" + "fmt" + + "github.com/jackc/pgx/v5" +) + +// LatestID returns the maximum id in the replica table. +func LatestID(ctx context.Context, conn *pgx.Conn, schema, table string) (int64, error) { + var id sql.NullInt64 + query := fmt.Sprintf("SELECT MAX(id) FROM %s", pgx.Identifier{schema, table}.Sanitize()) + if err := conn.QueryRow(ctx, query).Scan(&id); err != nil { + return 0, err + } + if !id.Valid { + return 0, nil + } + return id.Int64, nil +} + +// CountUpToID returns the number of rows with id <= maxID. 
+func CountUpToID(ctx context.Context, conn *pgx.Conn, schema, table string, maxID int64) (int64, error) { + var count sql.NullInt64 + query := fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE id <= $1", pgx.Identifier{schema, table}.Sanitize()) + if err := conn.QueryRow(ctx, query, maxID).Scan(&count); err != nil { + return 0, err + } + if !count.Valid { + return 0, nil + } + return count.Int64, nil +} diff --git a/internal/importer/user_program_info.go b/internal/importer/user_program_info.go index 32de762..14cc32e 100644 --- a/internal/importer/user_program_info.go +++ b/internal/importer/user_program_info.go @@ -20,7 +20,7 @@ import ( const ( defaultSchema = "public" - replicaTable = "user_program_info_replica" + ReplicaTable = "user_program_info_replica" ) var ( @@ -63,7 +63,7 @@ func EnsureUserProgramReplica(ctx context.Context, conn *pgx.Conn, csvPath, sche logDir = "log" } - if err := createReplicaTable(ctx, conn, schema, replicaTable); err != nil { + if err := createReplicaTable(ctx, conn, schema, ReplicaTable); err != nil { return err } @@ -97,38 +97,10 @@ func ImportUserProgramUpdates(ctx context.Context, conn *pgx.Conn, updateDir, sc return nil } - latestDate, err := latestCreatedDate(ctx, conn, schema, replicaTable) - if err != nil { - return err - } - for _, file := range files { - fileDate, err := dateFromFilename(file) - if err != nil { - _ = writeImportLog(logDir, importLog{ - StartedAt: time.Now(), - CSVPath: file, - Status: "skipped", - Error: fmt.Sprintf("cannot parse date from filename: %v", err), - LatestDate: latestDate, - }) - continue - } - if !fileDate.After(latestDate) { - _ = writeImportLog(logDir, importLog{ - StartedAt: time.Now(), - CSVPath: file, - Status: "skipped", - Error: fmt.Sprintf("file date %s not after latest date %s", fileDate.Format("2006-01-02"), latestDate.Format("2006-01-02")), - LatestDate: latestDate, - }) - continue - } - if err := importSingle(ctx, conn, file, schema, logDir); err != nil { return err } - latestDate = fileDate } return nil } @@ -362,7 +334,7 @@ func nullOrString(val string) any { func importSingle(ctx context.Context, conn *pgx.Conn, csvPath, schema, logDir string) error { startedAt := time.Now() - res, err := copyAndUpsertCSV(ctx, conn, csvPath, schema, replicaTable) + res, err := copyAndUpsertCSV(ctx, conn, csvPath, schema, ReplicaTable) logStatus := "succeeded" logErrMsg := "" if err != nil { @@ -479,7 +451,7 @@ func quoteColumns(cols []string) []string { return out } -func latestCreatedDate(ctx context.Context, conn *pgx.Conn, schema, table string) (time.Time, error) { +func LatestCreatedDate(ctx context.Context, conn *pgx.Conn, schema, table string) (time.Time, error) { var ts sql.NullTime query := fmt.Sprintf("SELECT MAX(created_at) FROM %s", pgx.Identifier{schema, table}.Sanitize()) if err := conn.QueryRow(ctx, query).Scan(&ts); err != nil { diff --git a/internal/userprogram/config.go b/internal/userprogram/config.go new file mode 100644 index 0000000..43311bc --- /dev/null +++ b/internal/userprogram/config.go @@ -0,0 +1,138 @@ +package userprogram + +import ( + "fmt" + "os" + "path/filepath" + "regexp" + "strconv" + "time" +) + +const ( + DefaultUpdateDir = "/update_data" + DefaultLogDir = "/log" + DefaultSchema = "public" + DefaultInitialCSV = "/initial_data/user_program_info_init_20251208.csv" + DefaultTable = "user_program_info" + DefaultDatabase = "user_program_info" + defaultTargetRange = "20060102" +) + +type MySQLConfig struct { + Host string + Port int + User string + Password string + Database string + Table 
string
+}
+
+type Paths struct {
+	UpdateDir  string
+	LogDir     string
+	InitialCSV string
+	Schema     string
+}
+
+func NewMySQLConfigFromEnv() (MySQLConfig, error) {
+	port, err := strconv.Atoi(env("USER_PROGRAM_INFO_PORT", "3306"))
+	if err != nil {
+		return MySQLConfig{}, fmt.Errorf("invalid USER_PROGRAM_INFO_PORT: %w", err)
+	}
+
+	host, err := envRequiredValue("USER_PROGRAM_INFO_HOST")
+	if err != nil {
+		return MySQLConfig{}, err
+	}
+	user, err := envRequiredValue("USER_PROGRAM_INFO_USERNAME")
+	if err != nil {
+		return MySQLConfig{}, err
+	}
+	password, err := envRequiredValue("USER_PROGRAM_INFO_PASSWORD")
+	if err != nil {
+		return MySQLConfig{}, err
+	}
+
+	cfg := MySQLConfig{
+		Host:     host,
+		Port:     port,
+		User:     user,
+		Password: password,
+		Database: env("USER_PROGRAM_INFO_DB", DefaultDatabase),
+		Table:    env("USER_PROGRAM_INFO_TABLE", DefaultTable),
+	}
+	if cfg.Host == "" || cfg.User == "" || cfg.Password == "" {
+		return MySQLConfig{}, fmt.Errorf("mysql connection envs are required")
+	}
+	return cfg, nil
+}
+
+func NewPathsFromEnv() (Paths, error) {
+	paths := Paths{
+		UpdateDir:  env("USER_PROGRAM_UPDATE_DIR", DefaultUpdateDir),
+		LogDir:     env("USER_PROGRAM_IMPORT_LOG_DIR", DefaultLogDir),
+		InitialCSV: env("USER_PROGRAM_INFO_CSV", DefaultInitialCSV),
+		Schema:     env("USER_PROGRAM_INFO_SCHEMA", DefaultSchema),
+	}
+
+	for _, dir := range []string{paths.UpdateDir, paths.LogDir} {
+		if dir == "" {
+			continue
+		}
+		if err := os.MkdirAll(dir, 0o755); err != nil {
+			return Paths{}, fmt.Errorf("create dir %s: %w", dir, err)
+		}
+	}
+	return paths, nil
+}
+
+func ParseTargetDate(raw string) (time.Time, error) {
+	if raw == "" {
+		return yesterdayKST(), nil
+	}
+	t, err := time.ParseInLocation("2006-01-02", raw, kst())
+	if err != nil {
+		return time.Time{}, fmt.Errorf("invalid date %q (expected YYYY-MM-DD)", raw)
+	}
+	return t, nil
+}
+
+func DateFromFilename(path string) (time.Time, error) {
+	base := filepath.Base(path)
+	re := regexp.MustCompile(`(\d{8})`)
+	match := re.FindStringSubmatch(base)
+	if len(match) < 2 {
+		return time.Time{}, fmt.Errorf("no date in filename: %s", base)
+	}
+	return time.ParseInLocation(defaultTargetRange, match[1], kst())
+}
+
+func yesterdayKST() time.Time {
+	now := time.Now().In(kst())
+	yesterday := now.AddDate(0, 0, -1)
+	return time.Date(yesterday.Year(), yesterday.Month(), yesterday.Day(), 0, 0, 0, 0, kst())
+}
+
+func kst() *time.Location {
+	loc, err := time.LoadLocation("Asia/Seoul")
+	if err != nil {
+		return time.FixedZone("KST", 9*60*60)
+	}
+	return loc
+}
+
+func env(key, fallback string) string {
+	if v := os.Getenv(key); v != "" {
+		return v
+	}
+	return fallback
+}
+
+func envRequiredValue(key string) (string, error) {
+	v := os.Getenv(key)
+	if v == "" {
+		return "", fmt.Errorf("%s is required", key)
+	}
+	return v, nil
+}
diff --git a/internal/userprogram/dumper.go b/internal/userprogram/dumper.go
new file mode 100644
index 0000000..4971f51
--- /dev/null
+++ b/internal/userprogram/dumper.go
@@ -0,0 +1,248 @@
+package userprogram
+
+import (
+	"context"
+	"database/sql"
+	"encoding/csv"
+	"fmt"
+	"os"
+	"path/filepath"
+	"strconv"
+	"time"
+
+	"github.com/go-sql-driver/mysql"
+)
+
+type Dumper struct {
+	cfg       MySQLConfig
+	updateDir string
+	db        *sql.DB
+}
+
+func NewDumper(cfg MySQLConfig, updateDir string) (*Dumper, error) {
+	if updateDir == "" {
+		updateDir = DefaultUpdateDir
+	}
+	if err := os.MkdirAll(updateDir, 0o755); err != nil {
+		return nil, err
+	}
+
+	// Pass time_zone as a connection parameter so the driver applies it to
+	// every pooled connection; a one-off SET after opening the pool would
+	// only configure the single session it happens to run on.
+	connector, err := mysql.NewConnector(&mysql.Config{
+		User:                 cfg.User,
+		Passwd:               cfg.Password,
+		Net:                  "tcp",
+		Addr:                 netAddr(cfg.Host, cfg.Port),
+		DBName:               cfg.Database,
+		ParseTime:            true,
+		Loc:                  time.UTC,
+		Params:               map[string]string{"charset": "utf8mb4", "time_zone": "'+00:00'"},
+		AllowNativePasswords: true,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("mysql config: %w", err)
+	}
+
+	db := sql.OpenDB(connector)
+	db.SetMaxOpenConns(5)
+	db.SetMaxIdleConns(2)
+	db.SetConnMaxIdleTime(5 * time.Minute)
+
+	if err := db.Ping(); err != nil {
+		_ = db.Close()
+		return nil, fmt.Errorf("ping mysql: %w", err)
+	}
+
+	return &Dumper{
+		cfg:       cfg,
+		updateDir: updateDir,
+		db:        db,
+	}, nil
+}
+
+func (d *Dumper) Close() error {
+	if d.db == nil {
+		return nil
+	}
+	return d.db.Close()
+}
+
+// MaxIDUntil returns the maximum id with created_at up to and including cutoff (KST).
+func (d *Dumper) MaxIDUntil(ctx context.Context, cutoff time.Time) (int64, error) {
+	query := fmt.Sprintf(`SELECT COALESCE(MAX(id), 0) FROM %s WHERE DATE(CONVERT_TZ(created_at, '+00:00', '+09:00')) <= ?`, d.cfg.Table)
+	var maxID sql.NullInt64
+	if err := d.db.QueryRowContext(ctx, query, cutoff.In(kst()).Format("2006-01-02")).Scan(&maxID); err != nil {
+		return 0, err
+	}
+	if !maxID.Valid {
+		return 0, nil
+	}
+	return maxID.Int64, nil
+}
+
+// CountUpToID returns count(*) where id <= maxID in source.
+func (d *Dumper) CountUpToID(ctx context.Context, maxID int64) (int64, error) {
+	query := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE id <= ?`, d.cfg.Table)
+	var count sql.NullInt64
+	if err := d.db.QueryRowContext(ctx, query, maxID).Scan(&count); err != nil {
+		return 0, err
+	}
+	if !count.Valid {
+		return 0, nil
+	}
+	return count.Int64, nil
+}
+
+// DumpRange exports rows with id in (startID, endID] to a CSV file.
+func (d *Dumper) DumpRange(ctx context.Context, startID, endID int64, label time.Time) (string, error) {
+	if endID <= startID {
+		return "", nil
+	}
+
+	query := fmt.Sprintf(`
+SELECT
+	id,
+	product_name,
+	login_id,
+	user_employee_id,
+	login_version,
+	login_public_ip,
+	login_local_ip,
+	user_company,
+	user_department,
+	user_position,
+	user_login_time,
+	created_at,
+	user_family_flag
+FROM %s
+WHERE id > ? AND id <= ? 
+ORDER BY id;`, d.cfg.Table) + + rows, err := d.db.QueryContext(ctx, query, startID, endID) + if err != nil { + return "", fmt.Errorf("query: %w", err) + } + defer rows.Close() + + filename := fmt.Sprintf("user_program_info_%s.csv", label.In(kst()).Format(defaultTargetRange)) + outPath := filepath.Join(d.updateDir, filename) + tmpPath := outPath + ".tmp" + + f, err := os.Create(tmpPath) + if err != nil { + return "", err + } + defer f.Close() + + writer := csv.NewWriter(f) + defer writer.Flush() + + header := []string{ + "id", + "product_name", + "login_id", + "user_employee_id", + "login_version", + "login_public_ip", + "login_local_ip", + "user_company", + "user_department", + "user_position", + "user_login_time", + "created_at", + "user_family_flag", + } + if err := writer.Write(header); err != nil { + return "", err + } + + for rows.Next() { + record, err := scanRow(rows) + if err != nil { + return "", err + } + if err := writer.Write(record); err != nil { + return "", err + } + } + if err := rows.Err(); err != nil { + return "", err + } + writer.Flush() + if err := writer.Error(); err != nil { + return "", err + } + + if err := os.Rename(tmpPath, outPath); err != nil { + return "", err + } + return outPath, nil +} + +func scanRow(rows *sql.Rows) ([]string, error) { + var ( + id sql.NullInt64 + productName sql.NullString + loginID sql.NullString + employeeID sql.NullString + loginVersion sql.NullString + loginPublicIP sql.NullString + loginLocalIP sql.NullString + userCompany sql.NullString + userDepartment sql.NullString + userPosition sql.NullString + userLoginTime sql.NullString + createdAt sql.NullString + userFamilyFlag sql.NullString + ) + + if err := rows.Scan( + &id, + &productName, + &loginID, + &employeeID, + &loginVersion, + &loginPublicIP, + &loginLocalIP, + &userCompany, + &userDepartment, + &userPosition, + &userLoginTime, + &createdAt, + &userFamilyFlag, + ); err != nil { + return nil, err + } + if !id.Valid { + return nil, fmt.Errorf("row missing id") + } + + return []string{ + strconv.FormatInt(id.Int64, 10), + nullToString(productName), + nullToString(loginID), + nullToString(employeeID), + nullToString(loginVersion), + nullToString(loginPublicIP), + nullToString(loginLocalIP), + nullToString(userCompany), + nullToString(userDepartment), + nullToString(userPosition), + nullToString(userLoginTime), + nullToString(createdAt), + nullToString(userFamilyFlag), + }, nil +} + +func nullToString(v sql.NullString) string { + if v.Valid { + return v.String + } + return "" +} + +func netAddr(host string, port int) string { + return fmt.Sprintf("%s:%d", host, port) +} diff --git a/internal/userprogram/sync.go b/internal/userprogram/sync.go new file mode 100644 index 0000000..970bc54 --- /dev/null +++ b/internal/userprogram/sync.go @@ -0,0 +1,118 @@ +package userprogram + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/jackc/pgx/v5" + + "geoip-rest/internal/importer" +) + +type SyncConfig struct { + MySQL MySQLConfig + DatabaseURL string + InitialCSV string + UpdateDir string + LogDir string + Schema string + Logger *log.Logger +} + +func (c *SyncConfig) defaults() { + if c.InitialCSV == "" { + c.InitialCSV = DefaultInitialCSV + } + if c.UpdateDir == "" { + c.UpdateDir = DefaultUpdateDir + } + if c.LogDir == "" { + c.LogDir = DefaultLogDir + } + if c.Schema == "" { + c.Schema = DefaultSchema + } + if c.Logger == nil { + c.Logger = log.Default() + } +} + +// Sync ensures replica table exists and imports initial data, then dumps and imports +// updates using the 
primary key high-water mark up to yesterday (KST).
+func Sync(ctx context.Context, cfg SyncConfig) error {
+	cfg.defaults()
+
+	dumper, err := NewDumper(cfg.MySQL, cfg.UpdateDir)
+	if err != nil {
+		return fmt.Errorf("init dumper: %w", err)
+	}
+	defer dumper.Close()
+
+	conn, err := pgx.Connect(ctx, cfg.DatabaseURL)
+	if err != nil {
+		return fmt.Errorf("connect postgres: %w", err)
+	}
+	defer conn.Close(context.Background())
+
+	if err := importer.EnsureUserProgramReplica(ctx, conn, cfg.InitialCSV, cfg.Schema, cfg.LogDir); err != nil {
+		return fmt.Errorf("ensure replica: %w", err)
+	}
+
+	lastID, err := importer.LatestID(ctx, conn, cfg.Schema, importer.ReplicaTable)
+	if err != nil {
+		return fmt.Errorf("read latest id: %w", err)
+	}
+
+	endDate := yesterdayKST()
+	upperID, err := dumper.MaxIDUntil(ctx, endDate)
+	if err != nil {
+		return fmt.Errorf("read upstream max id: %w", err)
+	}
+
+	if upperID <= lastID {
+		cfg.Logger.Printf("no dump needed (last_id=%d upstream_max=%d)", lastID, upperID)
+		return nil
+	}
+
+	cfg.Logger.Printf("dumping ids (%d, %d] to %s", lastID, upperID, cfg.UpdateDir)
+	csvPath, err := dumper.DumpRange(ctx, lastID, upperID, endDate)
+	if err != nil {
+		return fmt.Errorf("dump range: %w", err)
+	}
+	if csvPath == "" {
+		cfg.Logger.Printf("no rows dumped (last_id=%d upstream_max=%d)", lastID, upperID)
+		return nil
+	}
+
+	if err := importer.ImportUserProgramUpdates(ctx, conn, csvPath, cfg.Schema, cfg.LogDir); err != nil {
+		return fmt.Errorf("import updates: %w", err)
+	}
+
+	cfg.Logger.Printf("sync complete (last_id=%d -> %d)", lastID, upperID)
+
+	if err := verifyCounts(ctx, cfg, dumper, conn, upperID); err != nil {
+		cfg.Logger.Printf("sync verification warning: %v", err)
+	}
+	return nil
+}
+
+func toKST(t time.Time) time.Time {
+	return t.In(kst())
+}
+
+func verifyCounts(ctx context.Context, cfg SyncConfig, dumper *Dumper, conn *pgx.Conn, upperID int64) error {
+	sourceCount, err := dumper.CountUpToID(ctx, upperID)
+	if err != nil {
+		return fmt.Errorf("source count: %w", err)
+	}
+	targetCount, err := importer.CountUpToID(ctx, conn, cfg.Schema, importer.ReplicaTable, upperID)
+	if err != nil {
+		return fmt.Errorf("target count: %w", err)
+	}
+	if targetCount != sourceCount {
+		return fmt.Errorf("count mismatch up to id %d (source=%d target=%d)", upperID, sourceCount, targetCount)
+	}
+	return nil
+}
diff --git a/log/user_program_import_20251210_102751.log b/log/user_program_import_20251210_102751.log
new file mode 100644
index 0000000..4829ebf
--- /dev/null
+++ b/log/user_program_import_20251210_102751.log
@@ -0,0 +1,7 @@
+status=failed
+csv_path=/update_data/user_program_info_20251209.csv
+started_at=2025-12-10T10:27:51+09:00
+finished_at=
+rows_copied=0
+rows_upserted=0
+error=ERROR: relation "user_program_info_replica_import_tmp" already exists (SQLSTATE 42P07)
diff --git a/to-do.md b/to-do.md
index d4dc93c..90a580e 100644
--- a/to-do.md
+++ b/to-do.md
@@ -17,6 +17,8 @@
 - [x] Added a scheduler toggle env (`USER_PROGRAM_CRON_ENABLE`); cron runs only when it is true. Done: 2025-12-10 09:45 KST
 - [x] Removed the cron expression env (`USER_PROGRAM_CRON`); a fixed KST 00:05 schedule is applied in code. Done: 2025-12-10 09:56 KST
 - [x] Dump+import now runs through the Go CLI (`user-program-sync`) with no bash script dependency; the scheduler invokes that CLI directly. Done: 2025-12-10 09:50 KST
+- [x] Consolidated initial load + backfill + daily updates into a Go library (`internal/userprogram`); refactored `user-program-sync` to import the initial CSV and then dump/load up to the most recent date. Done: 2025-12-10 10:03 KST
+- [x] Switched the incremental criterion from the created_at date to the PK (id); refactored the Sync/Dump path to dump/upsert from the last id up to yesterday's max id. Done: 2025-12-10 10:20 KST
 
 ## Planned
 - [x] Install `maxminddb_fdw` in a PostgreSQL-dedicated Docker image (or build stage), add a `postgres` service that mounts the `GeoLite2-City.mmdb` volume at `/data`, and expose 5432 externally
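
A minimal compose sketch for that last planned item, reusing the existing `geo-ip` network and Postgres env vars (`db` matches the `POSTGRES_HOST` default already baked into `DATABASE_URL` above); the `Dockerfile.postgres` build stage and the `./initial_data` source for the `/data` mount are assumptions, not part of this diff:

```yaml
  db:
    build:
      context: .
      dockerfile: Dockerfile.postgres   # assumed build stage that installs maxminddb_fdw
    environment:
      - POSTGRES_USER=${POSTGRES_USER}
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
      - POSTGRES_DB=${POSTGRES_DB}
    volumes:
      - ./initial_data:/data:ro         # GeoLite2-City.mmdb mounted read-only at /data for the FDW
    ports:
      - "${POSTGRES_PORT:-5432}:5432"   # external 5432 exposure, per the item above
    networks:
      - geo-ip
```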