// Package userprogram synchronizes the user-program replica table from an
// upstream MySQL source into Postgres and maintains the derived ip_geoinfo
// table for newly observed public IPs.
package userprogram

import (
	"context"
	"fmt"
	"log"
	"os"
	"path/filepath"
	"time"

	"github.com/jackc/pgx/v5"

	"geoip-rest/internal/geo"
	"geoip-rest/internal/importer"
)

// defaultMMDBPath is the fallback MaxMind database location used when
// SyncConfig.MMDBPath is left empty.
const defaultMMDBPath = "/initial_data/GeoLite2-City.mmdb"

// SyncConfig carries all settings for a single Sync run. Zero-valued fields
// are filled in by defaults().
type SyncConfig struct {
	MySQL       MySQLConfig // upstream MySQL connection settings
	DatabaseURL string      // target Postgres connection string
	Backend     Backend     // geo-resolution backend selector
	MMDBPath    string      // MaxMind DB path; defaults to defaultMMDBPath
	LookupQuery string      // lookup query passed to the geo backend
	InitialCSV  string      // initial bulk-load CSV; defaults to DefaultInitialCSV
	UpdateDir   string      // directory for dump/update artifacts; defaults to DefaultUpdateDir
	LogDir      string      // import log directory; defaults to DefaultLogDir
	Schema      string      // target Postgres schema; defaults to DefaultSchema
	Logger      *log.Logger // defaults to log.Default()
}

// defaults fills in zero-valued configuration fields with package defaults.
func (c *SyncConfig) defaults() {
	if c.InitialCSV == "" {
		c.InitialCSV = DefaultInitialCSV
	}
	if c.UpdateDir == "" {
		c.UpdateDir = DefaultUpdateDir
	}
	if c.LogDir == "" {
		c.LogDir = DefaultLogDir
	}
	if c.Schema == "" {
		c.Schema = DefaultSchema
	}
	if c.MMDBPath == "" {
		c.MMDBPath = defaultMMDBPath
	}
	if c.Logger == nil {
		c.Logger = log.Default()
	}
}

// Sync ensures replica table exists and imports initial data, then dumps and imports
// updates using the primary key high-water mark up to yesterday (KST).
func Sync(ctx context.Context, cfg SyncConfig) error {
	cfg.defaults()

	dumper, err := NewDumper(cfg.MySQL, cfg.UpdateDir)
	if err != nil {
		return fmt.Errorf("init dumper: %w", err)
	}
	defer dumper.Close()

	conn, err := pgx.Connect(ctx, cfg.DatabaseURL)
	if err != nil {
		return fmt.Errorf("connect postgres: %w", err)
	}
	// Close with a fresh context so the connection is still released
	// cleanly even if ctx has already been canceled.
	defer conn.Close(context.Background())

	if err := importer.EnsureUserProgramReplica(ctx, conn, cfg.InitialCSV, cfg.Schema, cfg.LogDir); err != nil {
		return fmt.Errorf("ensure replica: %w", err)
	}

	// High-water mark already present in the replica.
	lastID, err := importer.LatestID(ctx, conn, cfg.Schema, importer.ReplicaTable)
	if err != nil {
		return fmt.Errorf("read latest id: %w", err)
	}

	endDate := yesterdayKST()
	upperID, err := dumper.MaxIDUntil(ctx, endDate)
	if err != nil {
		return fmt.Errorf("read upstream max id: %w", err)
	}
	if upperID <= lastID {
		cfg.Logger.Printf("no dump needed (last_id=%d upstream_max=%d)", lastID, upperID)
		return nil
	}

	cfg.Logger.Printf("dumping ids (%d, %d] to %s", lastID, upperID, cfg.UpdateDir)
	csvPath, err := dumper.DumpRange(ctx, lastID, upperID, endDate)
	if err != nil {
		return fmt.Errorf("dump range: %w", err)
	}
	if csvPath == "" {
		cfg.Logger.Printf("no rows dumped (last_id=%d upstream_max=%d)", lastID, upperID)
		return nil
	}

	if err := importer.ImportUserProgramUpdates(ctx, conn, csvPath, cfg.Schema, cfg.LogDir); err != nil {
		return fmt.Errorf("import updates: %w", err)
	}

	// Best effort: a geo-info refresh failure is logged, not fatal.
	if err := ensureIPGeoInfo(ctx, cfg, conn); err != nil {
		cfg.Logger.Printf("ip_geoinfo update warning: %v", err)
	}

	cfg.Logger.Printf("sync complete (last_id=%d -> %d)", lastID, upperID)

	// Best effort: a count mismatch is logged, not fatal.
	if err := verifyCounts(ctx, cfg, dumper, conn, upperID); err != nil {
		cfg.Logger.Printf("sync verification warning: %v", err)
	}
	return nil
}

// toKST converts t into Korea Standard Time.
func toKST(t time.Time) time.Time {
	return t.In(kst())
}

// verifyCounts compares the number of rows with id <= upperID between the
// upstream source and the Postgres replica, returning an error on mismatch.
func verifyCounts(ctx context.Context, cfg SyncConfig, dumper *Dumper, conn *pgx.Conn, upperID int64) error {
	sourceCount, err := dumper.CountUpToID(ctx, upperID)
	if err != nil {
		return fmt.Errorf("source count: %w", err)
	}
	targetCount, err := importer.CountUpToID(ctx, conn, cfg.Schema, importer.ReplicaTable, upperID)
	if err != nil {
		return fmt.Errorf("target count: %w", err)
	}
	if targetCount != sourceCount {
		return fmt.Errorf("count mismatch up to id %d (source=%d target=%d)", upperID, sourceCount, targetCount)
	}
	return nil
}

// ensureIPGeoInfo makes sure the ip_geoinfo table exists (seeding it from a
// snapshot SQL file when one is shipped under /initial_data), exports the
// current public IP list, and generates and applies geo updates for any new
// IPs via the configured resolver backend.
func ensureIPGeoInfo(ctx context.Context, cfg SyncConfig, conn *pgx.Conn) error {
	exists, err := ipGeoInfoExists(ctx, conn, cfg.Schema)
	if err != nil {
		return err
	}
	if !exists {
		// Seed from the bundled snapshot if it is present on disk.
		seedPath := filepath.Join("/initial_data", "ip_geoinfo_seed_20251208.sql")
		if _, err := os.Stat(seedPath); err == nil {
			if err := ExecuteSQLFile(ctx, conn, seedPath); err != nil {
				return fmt.Errorf("execute seed sql: %w", err)
			}
			exists = true
		}
	}
	if err := EnsureIPGeoInfoTable(ctx, conn, cfg.Schema); err != nil {
		return err
	}

	// Timestamp (KST) used to name the artifacts of this run.
	ts := time.Now().In(kst()).Format("20060102-150405")
	ipListPath := filepath.Join(cfg.UpdateDir, fmt.Sprintf("public_ip_list_%s.csv", ts))
	if err := ExportPublicIPs(ctx, conn, cfg.Schema, ipListPath); err != nil {
		return fmt.Errorf("export public ip list: %w", err)
	}

	resolver, err := ResolveBackend(geo.Config{
		Backend:     geo.Backend(cfg.Backend),
		MMDBPath:    cfg.MMDBPath,
		DatabaseURL: cfg.DatabaseURL,
		LookupQuery: cfg.LookupQuery,
	})
	if err != nil {
		return fmt.Errorf("init resolver for ip_geoinfo: %w", err)
	}
	defer resolver.Close()

	sqlPath := filepath.Join(cfg.UpdateDir, fmt.Sprintf("ip_geoinfo_update-%s.sql", ts))
	count, err := GenerateIPGeoInfoSQL(ctx, conn, cfg.Schema, resolver, sqlPath, true)
	if err != nil {
		return fmt.Errorf("generate ip_geoinfo sql: %w", err)
	}
	if count == 0 {
		// A freshly created (unseeded) table with zero resolvable IPs
		// indicates something is wrong upstream.
		if !exists {
			return fmt.Errorf("seeded ip_geoinfo but no new IPs found for update")
		}
		return nil
	}
	if err := ExecuteSQLFile(ctx, conn, sqlPath); err != nil {
		return fmt.Errorf("execute ip_geoinfo sql: %w", err)
	}
	return nil
}