Files
geoip-rest/internal/importer/user_program_info.go
2025-12-09 19:29:34 +09:00

508 lines
12 KiB
Go

package importer
import (
"context"
"database/sql"
"encoding/csv"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"regexp"
"slices"
"strconv"
"strings"
"time"
"github.com/jackc/pgx/v5"
)
const (
// defaultSchema is used when the caller passes an empty schema name.
defaultSchema = "public"
// replicaTable is the fixed name of the replica table this importer maintains.
replicaTable = "user_program_info_replica"
)
var (
// kstLocation is Asia/Seoul, falling back to a fixed UTC+9 zone when the
// tzdata lookup fails (e.g. minimal containers without zoneinfo files).
kstLocation = func() *time.Location {
loc, err := time.LoadLocation("Asia/Seoul")
if err != nil {
return time.FixedZone("KST", 9*60*60)
}
return loc
}()
// userProgramColumns is the CSV column order. It must stay in sync with the
// replica table DDL in createReplicaTable and with the index-based parsing
// in csvSource.Values.
userProgramColumns = []string{
"id",
"product_name",
"login_id",
"user_employee_id",
"login_version",
"login_public_ip",
"login_local_ip",
"user_company",
"user_department",
"user_position",
"user_login_time",
"created_at",
"user_family_flag",
}
// timeLayouts lists the accepted timestamp formats, tried in order by
// parseTimestamp (with and without fractional seconds).
timeLayouts = []string{
"2006-01-02 15:04:05.000",
"2006-01-02 15:04:05",
}
)
// EnsureUserProgramReplica guarantees that the replica table exists and then
// loads one or more CSV files into it.
//
// csvPath may name a single CSV file or a directory; for a directory every
// *.csv inside it (non-recursive) is processed in lexicographic name order.
// A log entry is written under logDir for each processed file. Processing
// stops at the first failing file and its error is returned.
func EnsureUserProgramReplica(ctx context.Context, conn *pgx.Conn, csvPath, schema, logDir string) error {
	if schema == "" {
		schema = defaultSchema
	}
	if logDir == "" {
		logDir = "log"
	}
	if err := createReplicaTable(ctx, conn, schema, replicaTable); err != nil {
		return err
	}
	targets, err := resolveCSVTargets(csvPath)
	if err != nil {
		return err
	}
	if len(targets) == 0 {
		return fmt.Errorf("no csv files found at %s", csvPath)
	}
	for _, target := range targets {
		if err := importSingle(ctx, conn, target, schema, logDir); err != nil {
			return err
		}
	}
	return nil
}
// ImportUserProgramUpdates imports all CSV files under updateDir
// (non-recursive) into the existing replica table.
//
// A file is imported only when the YYYYMMDD date in its name is strictly
// newer than the most recent created_at date already present in the table;
// older or undatable files are skipped with a log entry. A failing import
// stops the sequence and returns its error.
func ImportUserProgramUpdates(ctx context.Context, conn *pgx.Conn, updateDir, schema, logDir string) error {
	if updateDir == "" {
		return nil
	}
	candidates, err := resolveCSVTargets(updateDir)
	if err != nil {
		return err
	}
	if len(candidates) == 0 {
		return nil
	}
	newest, err := latestCreatedDate(ctx, conn, schema, replicaTable)
	if err != nil {
		return err
	}
	// skip records a "skipped" entry; log-write failures are best-effort.
	skip := func(file, reason string) {
		_ = writeImportLog(logDir, importLog{
			StartedAt:  time.Now(),
			CSVPath:    file,
			Status:     "skipped",
			Error:      reason,
			LatestDate: newest,
		})
	}
	for _, file := range candidates {
		fileDate, err := dateFromFilename(file)
		if err != nil {
			skip(file, fmt.Sprintf("cannot parse date from filename: %v", err))
			continue
		}
		if !fileDate.After(newest) {
			skip(file, fmt.Sprintf("file date %s not after latest date %s", fileDate.Format("2006-01-02"), newest.Format("2006-01-02")))
			continue
		}
		if err := importSingle(ctx, conn, file, schema, logDir); err != nil {
			return err
		}
		newest = fileDate
	}
	return nil
}
// tableExists reports whether schema.table is present according to
// information_schema.tables.
func tableExists(ctx context.Context, conn *pgx.Conn, schema, table string) (bool, error) {
	const existsQuery = `
SELECT EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_schema = $1 AND table_name = $2
);`
	var found bool
	err := conn.QueryRow(ctx, existsQuery, schema, table).Scan(&found)
	if err != nil {
		return false, err
	}
	return found, nil
}
// createReplicaTable creates the replica table if it does not already exist.
// The column list mirrors userProgramColumns.
func createReplicaTable(ctx context.Context, conn *pgx.Conn, schema, table string) error {
	target := pgx.Identifier{schema, table}.Sanitize()
	ddl := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id bigint PRIMARY KEY,
product_name text,
login_id text,
user_employee_id text,
login_version text,
login_public_ip text,
login_local_ip text,
user_company text,
user_department text,
user_position text,
user_login_time timestamp,
created_at timestamp,
user_family_flag boolean
);`, target)
	if _, err := conn.Exec(ctx, ddl); err != nil {
		return err
	}
	return nil
}
// importResult summarizes a successful copyAndUpsertCSV run.
type importResult struct {
// rowsCopied is the number of rows streamed into the staging table via COPY.
rowsCopied int64
// rowsUpserted is the number of rows inserted or updated in the target table.
rowsUpserted int64
// finishedAt records when the transaction committed.
finishedAt time.Time
}
// copyAndUpsertCSV loads one CSV file into schema.table.
//
// The file is streamed via COPY into a transaction-scoped staging table and
// then merged into the target with INSERT ... ON CONFLICT (id) DO UPDATE, so
// re-importing the same file is idempotent. The whole operation runs inside a
// single transaction; on any error the transaction is rolled back and the
// zero importResult is returned alongside the error.
func copyAndUpsertCSV(ctx context.Context, conn *pgx.Conn, path, schema, table string) (importResult, error) {
	res := importResult{}
	f, err := os.Open(path)
	if err != nil {
		return res, err
	}
	defer f.Close()
	reader := csv.NewReader(f)
	// Accept any per-record field count here; csvSource.Values validates each
	// record length itself so a bad row yields a row-specific error.
	reader.FieldsPerRecord = -1
	header, err := reader.Read()
	if err != nil {
		return res, err
	}
	if len(header) != len(userProgramColumns) {
		return res, fmt.Errorf("unexpected column count in CSV: got %d, want %d", len(header), len(userProgramColumns))
	}
	tx, err := conn.Begin(ctx)
	if err != nil {
		return res, err
	}
	// Rollback after a successful Commit is a harmless no-op.
	defer func() {
		_ = tx.Rollback(ctx)
	}()
	tempTable := fmt.Sprintf("%s_import_tmp", table)
	// ON COMMIT DROP is required: without it the temp table survives the
	// committed transaction for the rest of the session, so importing a
	// second file on the same connection would fail with
	// "relation ... already exists".
	if _, err := tx.Exec(ctx, fmt.Sprintf(`CREATE TEMP TABLE %s (LIKE %s INCLUDING ALL) ON COMMIT DROP;`, quoteIdent(tempTable), pgx.Identifier{schema, table}.Sanitize())); err != nil {
		return res, err
	}
	source := &csvSource{
		reader: reader,
	}
	copied, err := tx.CopyFrom(ctx, pgx.Identifier{tempTable}, userProgramColumns, source)
	if err != nil {
		return res, err
	}
	if copied == 0 {
		return res, errors.New("no rows were copied from CSV")
	}
	quotedColumns := quoteColumns(userProgramColumns)
	upsertSQL := fmt.Sprintf(`
INSERT INTO %s (%s)
SELECT %s FROM %s
ON CONFLICT (id) DO UPDATE SET
product_name = EXCLUDED.product_name,
login_id = EXCLUDED.login_id,
user_employee_id = EXCLUDED.user_employee_id,
login_version = EXCLUDED.login_version,
login_public_ip = EXCLUDED.login_public_ip,
login_local_ip = EXCLUDED.login_local_ip,
user_company = EXCLUDED.user_company,
user_department = EXCLUDED.user_department,
user_position = EXCLUDED.user_position,
user_login_time = EXCLUDED.user_login_time,
created_at = EXCLUDED.created_at,
user_family_flag = EXCLUDED.user_family_flag;
`, pgx.Identifier{schema, table}.Sanitize(), strings.Join(quotedColumns, ", "), strings.Join(quotedColumns, ", "), quoteIdent(tempTable))
	upsertRes, err := tx.Exec(ctx, upsertSQL)
	if err != nil {
		return res, err
	}
	if err := tx.Commit(ctx); err != nil {
		return res, err
	}
	res.rowsCopied = copied
	res.rowsUpserted = upsertRes.RowsAffected()
	res.finishedAt = time.Now()
	return res, nil
}
// csvSource adapts a csv.Reader to the row-source contract used by
// tx.CopyFrom (Next/Values/Err).
type csvSource struct {
// reader holds the remaining CSV rows; the header row was already consumed.
reader *csv.Reader
// record is the most recent row returned by Next.
record []string
// err is the first read error encountered; it is sticky and reported via Err.
err error
}
// Next advances to the following CSV record. It returns false at EOF or when
// a read error occurs; a non-EOF error is retained and surfaced via Err.
func (s *csvSource) Next() bool {
	if s.err != nil {
		return false
	}
	record, readErr := s.reader.Read()
	switch {
	case readErr == nil:
		s.record = record
		return true
	case errors.Is(readErr, io.EOF):
		return false
	default:
		s.err = readErr
		return false
	}
}
// Values converts the current CSV record into column values in the order of
// userProgramColumns.
//
// Empty strings map to SQL NULL for the text, timestamp and boolean columns;
// the id column must always parse as a base-10 integer.
func (s *csvSource) Values() ([]any, error) {
	if len(s.record) != len(userProgramColumns) {
		return nil, fmt.Errorf("unexpected record length: got %d, want %d", len(s.record), len(userProgramColumns))
	}
	id, err := strconv.ParseInt(s.record[0], 10, 64)
	if err != nil {
		return nil, fmt.Errorf("parse id: %w", err)
	}
	loginTime, err := parseTimestamp(s.record[10])
	if err != nil {
		return nil, fmt.Errorf("parse user_login_time: %w", err)
	}
	createdAt, err := parseTimestamp(s.record[11])
	if err != nil {
		return nil, fmt.Errorf("parse created_at: %w", err)
	}
	// strconv.ParseBool already accepts 1/0, t/f, T/F, true/false, TRUE/FALSE
	// (plus True/False), so the previous explicit switch over those literals
	// was redundant.
	var familyFlag any
	if v := s.record[12]; v != "" {
		parsed, err := strconv.ParseBool(v)
		if err != nil {
			return nil, fmt.Errorf("parse user_family_flag: %w", err)
		}
		familyFlag = parsed
	}
	return []any{
		id,
		nullOrString(s.record[1]),
		nullOrString(s.record[2]),
		nullOrString(s.record[3]),
		nullOrString(s.record[4]),
		nullOrString(s.record[5]),
		nullOrString(s.record[6]),
		nullOrString(s.record[7]),
		nullOrString(s.record[8]),
		nullOrString(s.record[9]),
		loginTime,
		createdAt,
		familyFlag,
	}, nil
}
// Err returns the first read error encountered by Next, or nil.
func (s *csvSource) Err() error {
return s.err
}
// parseTimestamp converts a CSV timestamp string into a time.Time in KST.
// The empty string maps to nil (SQL NULL). Each layout in timeLayouts is
// tried in order; if none matches an error is returned.
func parseTimestamp(raw string) (any, error) {
	if raw == "" {
		return nil, nil
	}
	for _, layout := range timeLayouts {
		parsed, err := time.ParseInLocation(layout, raw, kstLocation)
		if err != nil {
			continue
		}
		return parsed, nil
	}
	return nil, fmt.Errorf("unsupported timestamp format: %s", raw)
}
// nullOrString maps the empty string to nil (SQL NULL) and passes any other
// value through unchanged.
func nullOrString(val string) any {
	if val != "" {
		return val
	}
	return nil
}
// importSingle imports one CSV file into the replica table and always writes
// a log entry, whether the import succeeded or failed. The import error, if
// any, is returned unchanged.
func importSingle(ctx context.Context, conn *pgx.Conn, csvPath, schema, logDir string) error {
	begin := time.Now()
	res, importErr := copyAndUpsertCSV(ctx, conn, csvPath, schema, replicaTable)
	entry := importLog{
		StartedAt:    begin,
		FinishedAt:   res.finishedAt,
		CSVPath:      csvPath,
		Status:       "succeeded",
		RowsCopied:   res.rowsCopied,
		RowsUpserted: res.rowsUpserted,
	}
	if importErr != nil {
		entry.Status = "failed"
		entry.Error = importErr.Error()
	}
	// Logging is best-effort; the import result takes precedence.
	_ = writeImportLog(logDir, entry)
	return importErr
}
// resolveCSVTargets expands path into the list of CSV files to import.
// A directory yields every non-directory entry whose name ends in ".csv"
// (case-insensitive), sorted by name; a plain file is returned as-is without
// any extension check.
func resolveCSVTargets(path string) ([]string, error) {
	info, err := os.Stat(path)
	if err != nil {
		return nil, err
	}
	if !info.IsDir() {
		return []string{path}, nil
	}
	entries, err := os.ReadDir(path)
	if err != nil {
		return nil, err
	}
	var files []string
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		name := entry.Name()
		if !strings.HasSuffix(strings.ToLower(name), ".csv") {
			continue
		}
		files = append(files, filepath.Join(path, name))
	}
	slices.Sort(files)
	return files, nil
}
// importLog captures one import attempt for the plain-text log file.
type importLog struct {
	StartedAt    time.Time // when processing of the file began
	FinishedAt   time.Time // when the import finished; zero if it never did
	CSVPath      string    // file that was processed or skipped
	Status       string    // "succeeded", "failed" or "skipped"
	RowsCopied   int64     // rows streamed into the staging table
	RowsUpserted int64     // rows inserted/updated in the target table
	Error        string    // error detail; empty on success
	LatestDate   time.Time // newest created_at date known before this file
}

// writeImportLog writes entry as a small key=value text file under logDir,
// creating the directory when needed. Timestamps are rendered in KST.
//
// The file name is based on the current time at one-second resolution; when
// a file with that name already exists a numeric suffix is appended, so two
// entries written within the same second no longer overwrite each other
// (the previous os.Create call truncated the earlier entry).
func writeImportLog(logDir string, entry importLog) error {
	if err := os.MkdirAll(logDir, 0o755); err != nil {
		return err
	}
	now := time.Now().In(kstLocation)
	if entry.StartedAt.IsZero() {
		entry.StartedAt = now
	}
	base := fmt.Sprintf("user_program_import_%s", now.Format("20060102_150405"))
	var f *os.File
	for i := 0; ; i++ {
		name := base + ".log"
		if i > 0 {
			name = fmt.Sprintf("%s_%d.log", base, i)
		}
		var err error
		// O_EXCL makes creation fail instead of truncating an existing log.
		f, err = os.OpenFile(filepath.Join(logDir, name), os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o644)
		if err == nil {
			break
		}
		if !os.IsExist(err) {
			return err
		}
	}
	defer f.Close()
	start := entry.StartedAt.In(kstLocation).Format(time.RFC3339)
	finish := ""
	if !entry.FinishedAt.IsZero() {
		finish = entry.FinishedAt.In(kstLocation).Format(time.RFC3339)
	}
	lines := []string{
		fmt.Sprintf("status=%s", entry.Status),
		fmt.Sprintf("csv_path=%s", entry.CSVPath),
		fmt.Sprintf("started_at=%s", start),
		fmt.Sprintf("finished_at=%s", finish),
		fmt.Sprintf("rows_copied=%d", entry.RowsCopied),
		fmt.Sprintf("rows_upserted=%d", entry.RowsUpserted),
	}
	if entry.Error != "" {
		lines = append(lines, fmt.Sprintf("error=%s", entry.Error))
	}
	if !entry.LatestDate.IsZero() {
		lines = append(lines, fmt.Sprintf("latest_date=%s", entry.LatestDate.In(kstLocation).Format("2006-01-02")))
	}
	for _, line := range lines {
		if _, err := f.WriteString(line + "\n"); err != nil {
			return err
		}
	}
	return nil
}
// quoteIdent double-quotes a single SQL identifier, escaping embedded double
// quotes by doubling them.
func quoteIdent(s string) string {
	escaped := strings.ReplaceAll(s, `"`, `""`)
	return `"` + escaped + `"`
}

// quoteColumns applies quoteIdent to every column name and returns the
// quoted list in the same order.
func quoteColumns(cols []string) []string {
	quoted := make([]string, 0, len(cols))
	for _, col := range cols {
		quoted = append(quoted, quoteIdent(col))
	}
	return quoted
}
// latestCreatedDate returns MAX(created_at) from schema.table, truncated to
// a KST calendar date. It returns the zero time when the table has no rows
// (MAX is NULL).
func latestCreatedDate(ctx context.Context, conn *pgx.Conn, schema, table string) (time.Time, error) {
	query := fmt.Sprintf("SELECT MAX(created_at) FROM %s", pgx.Identifier{schema, table}.Sanitize())
	var maxCreated sql.NullTime
	if err := conn.QueryRow(ctx, query).Scan(&maxCreated); err != nil {
		return time.Time{}, err
	}
	if !maxCreated.Valid {
		return time.Time{}, nil
	}
	return truncateToKSTDate(maxCreated.Time), nil
}
// truncateToKSTDate converts t to KST and drops the time-of-day portion,
// yielding midnight of that calendar day in KST.
func truncateToKSTDate(t time.Time) time.Time {
	local := t.In(kstLocation)
	year, month, day := local.Date()
	return time.Date(year, month, day, 0, 0, 0, 0, kstLocation)
}
// dateFilenamePattern matches the first 8-digit run (YYYYMMDD) in a file
// name. Compiled once at package scope instead of on every call, per the
// standard regexp idiom.
var dateFilenamePattern = regexp.MustCompile(`(\d{8})`)

// dateFromFilename extracts a YYYYMMDD date from the base name of path and
// interprets it in KST. It returns an error when no 8-digit run is present
// or when the digits do not form a valid calendar date.
func dateFromFilename(path string) (time.Time, error) {
	base := filepath.Base(path)
	match := dateFilenamePattern.FindStringSubmatch(base)
	if len(match) < 2 {
		return time.Time{}, fmt.Errorf("no date in filename: %s", base)
	}
	return time.ParseInLocation("20060102", match[1], kstLocation)
}