package importer

import (
	"context"
	"database/sql"
	"encoding/csv"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"regexp"
	"slices"
	"strconv"
	"strings"
	"time"

	"github.com/jackc/pgx/v5"
)

const (
	defaultSchema = "public"
	ReplicaTable  = "user_program_info_replica"
)

var (
	// kstLocation resolves Asia/Seoul, falling back to a fixed UTC+9 zone
	// when the tz database is unavailable.
	kstLocation = func() *time.Location {
		loc, err := time.LoadLocation("Asia/Seoul")
		if err != nil {
			return time.FixedZone("KST", 9*60*60)
		}
		return loc
	}()

	userProgramColumns = []string{
		"id",
		"product_name",
		"login_id",
		"user_employee_id",
		"login_version",
		"login_public_ip",
		"login_local_ip",
		"user_company",
		"user_department",
		"user_position",
		"user_login_time",
		"created_at",
		"user_family_flag",
	}

	timeLayouts = []string{
		"2006-01-02 15:04:05.000",
		"2006-01-02 15:04:05",
	}
)

// EnsureUserProgramReplica ensures the target table exists, then imports one or more CSVs.
// csvPath can point to a single file or a directory (all *.csv will be processed in name order).
// Logs are written to logDir for every processed file.
func EnsureUserProgramReplica(ctx context.Context, conn *pgx.Conn, csvPath, schema, logDir string) error {
	if schema == "" {
		schema = defaultSchema
	}
	if logDir == "" {
		logDir = "log"
	}
	if err := createReplicaTable(ctx, conn, schema, ReplicaTable); err != nil {
		return err
	}
	files, err := resolveCSVTargets(csvPath)
	if err != nil {
		return err
	}
	if len(files) == 0 {
		return fmt.Errorf("no csv files found at %s", csvPath)
	}
	for _, file := range files {
		if err := importSingle(ctx, conn, file, schema, logDir); err != nil {
			return err
		}
	}
	return nil
}

// ImportUserProgramUpdates imports all CSV files under updateDir (non-recursive) into an existing replica table.
// Each file is processed independently; a failure stops the sequence and is recorded in the import log.
func ImportUserProgramUpdates(ctx context.Context, conn *pgx.Conn, updateDir, schema, logDir string) error {
	if updateDir == "" {
		return nil
	}
	if schema == "" {
		schema = defaultSchema
	}
	if logDir == "" {
		logDir = "log"
	}
	files, err := resolveCSVTargets(updateDir)
	if err != nil {
		return err
	}
	if len(files) == 0 {
		return nil
	}
	for _, file := range files {
		if err := importSingle(ctx, conn, file, schema, logDir); err != nil {
			return err
		}
	}
	return nil
}

// tableExists reports whether schema.table is present in information_schema.tables.
func tableExists(ctx context.Context, conn *pgx.Conn, schema, table string) (bool, error) {
	const q = `
		SELECT EXISTS (
			SELECT 1
			FROM information_schema.tables
			WHERE table_schema = $1 AND table_name = $2
		);`
	var exists bool
	if err := conn.QueryRow(ctx, q, schema, table).Scan(&exists); err != nil {
		return false, err
	}
	return exists, nil
}

// createReplicaTable creates the replica table if it does not already exist.
func createReplicaTable(ctx context.Context, conn *pgx.Conn, schema, table string) error {
	identifier := pgx.Identifier{schema, table}.Sanitize()
	ddl := fmt.Sprintf(`
		CREATE TABLE IF NOT EXISTS %s (
			id bigint PRIMARY KEY,
			product_name text,
			login_id text,
			user_employee_id text,
			login_version text,
			login_public_ip text,
			login_local_ip text,
			user_company text,
			user_department text,
			user_position text,
			user_login_time timestamp,
			created_at timestamp,
			user_family_flag boolean
		);`, identifier)
	_, err := conn.Exec(ctx, ddl)
	return err
}

type importResult struct {
	rowsCopied   int64
	rowsUpserted int64
	finishedAt   time.Time
}

// copyAndUpsertCSV streams a CSV file into a temporary table via COPY, then
// upserts its rows into schema.table, all within a single transaction.
func copyAndUpsertCSV(ctx context.Context, conn *pgx.Conn, path, schema, table string) (importResult, error) {
	res := importResult{}

	f, err := os.Open(path)
	if err != nil {
		return res, err
	}
	defer f.Close()

	reader := csv.NewReader(f)
	reader.FieldsPerRecord = -1

	header, err := reader.Read()
	if err != nil {
		return res, err
	}
	if len(header) != len(userProgramColumns) {
		return res, fmt.Errorf("unexpected column count in CSV: got %d, want %d", len(header), len(userProgramColumns))
	}

	tx, err := conn.Begin(ctx)
	if err != nil {
		return res, err
	}
	defer func() { _ = tx.Rollback(ctx) }()

	// ON COMMIT DROP keeps repeated imports on the same connection from
	// colliding with a leftover temp table of the same name.
	tempTable := fmt.Sprintf("%s_import_tmp", table)
	if _, err := tx.Exec(ctx, fmt.Sprintf(
		`CREATE TEMP TABLE %s (LIKE %s INCLUDING ALL) ON COMMIT DROP;`,
		quoteIdent(tempTable), pgx.Identifier{schema, table}.Sanitize())); err != nil {
		return res, err
	}

	source := &csvSource{reader: reader}
	copied, err := tx.CopyFrom(ctx, pgx.Identifier{tempTable}, userProgramColumns, source)
	if err != nil {
		return res, err
	}
	if copied == 0 {
		return res, errors.New("no rows were copied from CSV")
	}

	quotedColumns := quoteColumns(userProgramColumns)
	upsertSQL := fmt.Sprintf(`
		INSERT INTO %s (%s)
		SELECT %s FROM %s
		ON CONFLICT (id) DO UPDATE SET
			product_name = EXCLUDED.product_name,
			login_id = EXCLUDED.login_id,
			user_employee_id = EXCLUDED.user_employee_id,
			login_version = EXCLUDED.login_version,
			login_public_ip = EXCLUDED.login_public_ip,
			login_local_ip = EXCLUDED.login_local_ip,
			user_company = EXCLUDED.user_company,
			user_department = EXCLUDED.user_department,
			user_position = EXCLUDED.user_position,
			user_login_time = EXCLUDED.user_login_time,
			created_at = EXCLUDED.created_at,
			user_family_flag = EXCLUDED.user_family_flag;
	`, pgx.Identifier{schema, table}.Sanitize(), strings.Join(quotedColumns, ", "),
		strings.Join(quotedColumns, ", "), quoteIdent(tempTable))

	upsertRes, err := tx.Exec(ctx, upsertSQL)
	if err != nil {
		return res, err
	}
	if err := tx.Commit(ctx); err != nil {
		return res, err
	}

	res.rowsCopied = copied
	res.rowsUpserted = upsertRes.RowsAffected()
	res.finishedAt = time.Now()
	return res, nil
}

// csvSource adapts a csv.Reader to the pgx.CopyFromSource interface.
type csvSource struct {
	reader *csv.Reader
	record []string
	err    error
}

func (s *csvSource) Next() bool {
	if s.err != nil {
		return false
	}
	rec, err := s.reader.Read()
	if err != nil {
		if errors.Is(err, io.EOF) {
			return false
		}
		s.err = err
		return false
	}
	s.record = rec
	return true
}

func (s *csvSource) Values() ([]any, error) {
	if len(s.record) != len(userProgramColumns) {
		return nil, fmt.Errorf("unexpected record length: got %d, want %d", len(s.record), len(userProgramColumns))
	}
	id, err := strconv.ParseInt(s.record[0], 10, 64)
	if err != nil {
		return nil, fmt.Errorf("parse id: %w", err)
	}
	loginTime, err := parseTimestamp(s.record[10])
	if err != nil {
		return nil, fmt.Errorf("parse user_login_time: %w", err)
	}
	createdAt, err := parseTimestamp(s.record[11])
	if err != nil {
		return nil, fmt.Errorf("parse created_at: %w", err)
	}
	var familyFlag any
	if v := s.record[12]; v == "" {
		familyFlag = nil
	} else {
		switch v {
		case "1", "true", "TRUE", "t", "T":
			familyFlag = true
		case "0", "false", "FALSE", "f", "F":
			familyFlag = false
		default:
			parsed, err := strconv.ParseBool(v)
			if err != nil {
				return nil, fmt.Errorf("parse user_family_flag: %w", err)
			}
			familyFlag = parsed
		}
	}
	return []any{
		id,
		nullOrString(s.record[1]),
		nullOrString(s.record[2]),
		nullOrString(s.record[3]),
		nullOrString(s.record[4]),
		nullOrString(s.record[5]),
		nullOrString(s.record[6]),
		nullOrString(s.record[7]),
		nullOrString(s.record[8]),
		nullOrString(s.record[9]),
		loginTime,
		createdAt,
		familyFlag,
	}, nil
}

func (s *csvSource) Err() error { return s.err }

// parseTimestamp parses a timestamp in one of the supported layouts, interpreted in KST.
// Empty input maps to SQL NULL.
func parseTimestamp(raw string) (any, error) {
	if raw == "" {
		return nil, nil
	}
	for _, layout := range timeLayouts {
		if t, err := time.ParseInLocation(layout, raw, kstLocation); err == nil {
			return t, nil
		}
	}
	return nil, fmt.Errorf("unsupported timestamp format: %s", raw)
}

// nullOrString maps an empty CSV field to SQL NULL.
func nullOrString(val string) any {
	if val == "" {
		return nil
	}
	return val
}

// importSingle imports one CSV file into the replica table and writes an import
// log entry regardless of outcome.
func importSingle(ctx context.Context, conn *pgx.Conn, csvPath, schema, logDir string) error {
	startedAt := time.Now()
	res, err := copyAndUpsertCSV(ctx, conn, csvPath, schema, ReplicaTable)

	logStatus := "succeeded"
	logErrMsg := ""
	if err != nil {
		logStatus = "failed"
		logErrMsg = err.Error()
	}
	_ = writeImportLog(logDir, importLog{
		StartedAt:    startedAt,
		FinishedAt:   res.finishedAt,
		CSVPath:      csvPath,
		Status:       logStatus,
		RowsCopied:   res.rowsCopied,
		RowsUpserted: res.rowsUpserted,
		Error:        logErrMsg,
	})
	return err
}

// resolveCSVTargets returns the CSV files to import: the path itself when it is a file,
// or every *.csv directly under it (sorted by name) when it is a directory.
func resolveCSVTargets(path string) ([]string, error) {
	info, err := os.Stat(path)
	if err != nil {
		return nil, err
	}
	if info.IsDir() {
		entries, err := os.ReadDir(path)
		if err != nil {
			return nil, err
		}
		var files []string
		for _, e := range entries {
			if e.IsDir() {
				continue
			}
			if strings.HasSuffix(strings.ToLower(e.Name()), ".csv") {
				files = append(files, filepath.Join(path, e.Name()))
			}
		}
		slices.Sort(files)
		return files, nil
	}
	return []string{path}, nil
}

type importLog struct {
	StartedAt    time.Time
	FinishedAt   time.Time
	CSVPath      string
	Status       string
	RowsCopied   int64
	RowsUpserted int64
	Error        string
	LatestDate   time.Time
}

// writeImportLog writes a key=value log file for one import into logDir.
func writeImportLog(logDir string, entry importLog) error {
	if err := os.MkdirAll(logDir, 0o755); err != nil {
		return err
	}
	now := time.Now().In(kstLocation)
	if entry.StartedAt.IsZero() {
		entry.StartedAt = now
	}
	filename := fmt.Sprintf("user_program_import_%s.log", now.Format("20060102_150405"))
	path := filepath.Join(logDir, filename)

	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()

	start := entry.StartedAt.In(kstLocation).Format(time.RFC3339)
	finish := ""
	if !entry.FinishedAt.IsZero() {
		finish = entry.FinishedAt.In(kstLocation).Format(time.RFC3339)
	}
	lines := []string{
		fmt.Sprintf("status=%s", entry.Status),
		fmt.Sprintf("csv_path=%s", entry.CSVPath),
		fmt.Sprintf("started_at=%s", start),
		fmt.Sprintf("finished_at=%s", finish),
		fmt.Sprintf("rows_copied=%d", entry.RowsCopied),
		fmt.Sprintf("rows_upserted=%d", entry.RowsUpserted),
	}
	if entry.Error != "" {
		lines = append(lines, fmt.Sprintf("error=%s", entry.Error))
	}
	if !entry.LatestDate.IsZero() {
		lines = append(lines, fmt.Sprintf("latest_date=%s", entry.LatestDate.In(kstLocation).Format("2006-01-02")))
	}
	for _, line := range lines {
		if _, err := f.WriteString(line + "\n"); err != nil {
			return err
		}
	}
	return nil
}

// quoteIdent double-quotes a single SQL identifier, escaping embedded quotes.
func quoteIdent(s string) string {
	return `"` + strings.ReplaceAll(s, `"`, `""`) + `"`
}

func quoteColumns(cols []string) []string {
	out := make([]string, len(cols))
	for i, c := range cols {
		out[i] = quoteIdent(c)
	}
	return out
}

// LatestCreatedDate returns MAX(created_at) from schema.table truncated to a KST date,
// or the zero time when the table is empty.
func LatestCreatedDate(ctx context.Context, conn *pgx.Conn, schema, table string) (time.Time, error) {
	var ts sql.NullTime
	query := fmt.Sprintf("SELECT MAX(created_at) FROM %s", pgx.Identifier{schema, table}.Sanitize())
	if err := conn.QueryRow(ctx, query).Scan(&ts); err != nil {
		return time.Time{}, err
	}
	if !ts.Valid {
		return time.Time{}, nil
	}
	return truncateToKSTDate(ts.Time), nil
}

// truncateToKSTDate converts t to KST and drops the time-of-day component.
func truncateToKSTDate(t time.Time) time.Time {
	kst := t.In(kstLocation)
	return time.Date(kst.Year(), kst.Month(), kst.Day(), 0, 0, 0, 0, kstLocation)
}

// dateFromFilename extracts the first 8-digit date (YYYYMMDD) found in the file name.
func dateFromFilename(path string) (time.Time, error) {
	base := filepath.Base(path)
	re := regexp.MustCompile(`(\d{8})`)
	match := re.FindStringSubmatch(base)
	if len(match) < 2 {
		return time.Time{}, fmt.Errorf("no date in filename: %s", base)
	}
	return time.ParseInLocation("20060102", match[1], kstLocation)
}
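// Usage sketch (illustrative only, not part of this package's API): the
// connection string, directory names, and log handling below are assumptions
// made for the example; only the two exported entry points are real.
//
//	ctx := context.Background()
//	conn, err := pgx.Connect(ctx, "postgres://user:pass@localhost:5432/appdb")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer conn.Close(ctx)
//
//	// Initial load: create user_program_info_replica if needed and import
//	// every *.csv under ./seed, writing one log file per CSV into ./log.
//	// Empty schema/logDir fall back to "public" and "log".
//	if err := importer.EnsureUserProgramReplica(ctx, conn, "./seed", "", ""); err != nil {
//		log.Fatal(err)
//	}
//
//	// Incremental load: upsert any update CSVs that have arrived since.
//	if err := importer.ImportUserProgramUpdates(ctx, conn, "./updates", "public", "log"); err != nil {
//		log.Fatal(err)
//	}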