package importer
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/csv"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"regexp"
|
|
"slices"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
)
|
|
|
|
const (
	// defaultSchema is used when the caller passes an empty schema name.
	defaultSchema = "public"
	// ReplicaTable is the destination table name for user program CSV imports.
	ReplicaTable = "user_program_info_replica"
)
|
|
|
|
var (
	// kstLocation resolves Asia/Seoul, falling back to a fixed UTC+9 zone when
	// the tzdata lookup fails (e.g. in minimal container images).
	kstLocation = func() *time.Location {
		loc, err := time.LoadLocation("Asia/Seoul")
		if err != nil {
			return time.FixedZone("KST", 9*60*60)
		}
		return loc
	}()

	// userProgramColumns lists the replica table columns in the exact order the
	// CSV files provide them; csvSource.Values indexes records by this order.
	userProgramColumns = []string{
		"id",
		"product_name",
		"login_id",
		"user_employee_id",
		"login_version",
		"login_public_ip",
		"login_local_ip",
		"user_company",
		"user_department",
		"user_position",
		"user_login_time",
		"created_at",
		"user_family_flag",
	}

	// timeLayouts are the accepted timestamp formats, tried in order by
	// parseTimestamp until one succeeds.
	timeLayouts = []string{
		"2006-01-02 15:04:05.000",
		"2006-01-02 15:04:05",
		time.RFC3339,
		"2006-01-02T15:04:05.000Z07:00",
	}
)
|
|
|
|
// EnsureUserProgramReplica ensures the target table exists, then imports one or more CSVs.
|
|
// csvPath can point to a single file or a directory (all *.csv will be processed in name order).
|
|
// Logs are written to logDir for every processed file.
|
|
func EnsureUserProgramReplica(ctx context.Context, conn *pgx.Conn, csvPath, schema, logDir string) error {
|
|
if schema == "" {
|
|
schema = defaultSchema
|
|
}
|
|
if logDir == "" {
|
|
logDir = "log"
|
|
}
|
|
|
|
if err := ensureSchema(ctx, conn, schema); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := createReplicaTable(ctx, conn, schema, ReplicaTable); err != nil {
|
|
return err
|
|
}
|
|
|
|
files, err := resolveCSVTargets(csvPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(files) == 0 {
|
|
return fmt.Errorf("no csv files found at %s", csvPath)
|
|
}
|
|
|
|
for _, file := range files {
|
|
if err := importSingle(ctx, conn, file, schema, logDir); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ImportUserProgramUpdates imports all CSV files under updateDir (non-recursive) into an existing replica table.
|
|
// Each file is processed independently; failure stops the sequence and logs the error.
|
|
func ImportUserProgramUpdates(ctx context.Context, conn *pgx.Conn, updateDir, schema, logDir string) error {
|
|
if updateDir == "" {
|
|
return nil
|
|
}
|
|
files, err := resolveCSVTargets(updateDir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(files) == 0 {
|
|
return nil
|
|
}
|
|
|
|
for _, file := range files {
|
|
if err := importSingle(ctx, conn, file, schema, logDir); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func tableExists(ctx context.Context, conn *pgx.Conn, schema, table string) (bool, error) {
|
|
const q = `
|
|
SELECT EXISTS (
|
|
SELECT 1
|
|
FROM information_schema.tables
|
|
WHERE table_schema = $1 AND table_name = $2
|
|
);`
|
|
|
|
var exists bool
|
|
if err := conn.QueryRow(ctx, q, schema, table).Scan(&exists); err != nil {
|
|
return false, err
|
|
}
|
|
return exists, nil
|
|
}
|
|
|
|
func createReplicaTable(ctx context.Context, conn *pgx.Conn, schema, table string) error {
|
|
identifier := pgx.Identifier{schema, table}.Sanitize()
|
|
ddl := fmt.Sprintf(`
|
|
CREATE TABLE IF NOT EXISTS %s (
|
|
id bigint PRIMARY KEY,
|
|
product_name text,
|
|
login_id text,
|
|
user_employee_id text,
|
|
login_version text,
|
|
login_public_ip text,
|
|
login_local_ip text,
|
|
user_company text,
|
|
user_department text,
|
|
user_position text,
|
|
user_login_time timestamp,
|
|
created_at timestamp,
|
|
user_family_flag boolean
|
|
);`, identifier)
|
|
|
|
_, err := conn.Exec(ctx, ddl)
|
|
return err
|
|
}
|
|
|
|
func ensureSchema(ctx context.Context, conn *pgx.Conn, schema string) error {
|
|
if schema == "" {
|
|
return nil
|
|
}
|
|
_, err := conn.Exec(ctx, fmt.Sprintf(`CREATE SCHEMA IF NOT EXISTS %s`, pgx.Identifier{schema}.Sanitize()))
|
|
return err
|
|
}
|
|
|
|
// importResult captures the outcome of a single CSV import transaction.
type importResult struct {
	rowsCopied   int64     // rows loaded into the temp table via COPY
	rowsUpserted int64     // rows affected by the ON CONFLICT upsert
	finishedAt   time.Time // commit completion time; zero if the import failed
}
|
|
|
|
// copyAndUpsertCSV loads one CSV file into schema.table inside a single
// transaction: it COPYs all data rows into a session temp table cloned from
// the target, then upserts from the temp table into the target keyed on id.
// On any error the transaction rolls back and the zero-valued result is
// returned alongside the error.
func copyAndUpsertCSV(ctx context.Context, conn *pgx.Conn, path, schema, table string) (importResult, error) {
	res := importResult{}

	f, err := os.Open(path)
	if err != nil {
		return res, err
	}
	defer f.Close()

	reader := csv.NewReader(f)
	// Disable the reader's per-record field-count check; csvSource.Values
	// validates each record's length against userProgramColumns instead.
	reader.FieldsPerRecord = -1

	// The first record is the header; only its width is validated.
	// NOTE(review): header names are not compared to userProgramColumns, so a
	// CSV with the right column count but a different order would be imported
	// silently — confirm upstream guarantees the column order.
	header, err := reader.Read()
	if err != nil {
		return res, err
	}
	if len(header) != len(userProgramColumns) {
		return res, fmt.Errorf("unexpected column count in CSV: got %d, want %d", len(header), len(userProgramColumns))
	}

	tx, err := conn.Begin(ctx)
	if err != nil {
		return res, err
	}
	// Rollback after a successful Commit is a harmless no-op.
	defer func() {
		_ = tx.Rollback(ctx)
	}()

	// Per-call temp table name; ON COMMIT DROP ties its lifetime to this tx.
	tempTable := fmt.Sprintf("%s_import_tmp_%d", table, time.Now().UnixNano())

	if _, err := tx.Exec(ctx, fmt.Sprintf(`CREATE TEMP TABLE %s (LIKE %s INCLUDING ALL) ON COMMIT DROP;`, quoteIdent(tempTable), pgx.Identifier{schema, table}.Sanitize())); err != nil {
		return res, err
	}

	source := &csvSource{
		reader: reader,
	}

	// Stream the remaining CSV rows into the temp table via COPY.
	copied, err := tx.CopyFrom(ctx, pgx.Identifier{tempTable}, userProgramColumns, source)
	if err != nil {
		return res, err
	}
	if copied == 0 {
		return res, errors.New("no rows were copied from CSV")
	}

	// Upsert from the temp table into the live table, keyed on id; every
	// non-key column is overwritten from the incoming row.
	quotedColumns := quoteColumns(userProgramColumns)
	upsertSQL := fmt.Sprintf(`
	INSERT INTO %s (%s)
	SELECT %s FROM %s
	ON CONFLICT (id) DO UPDATE SET
		product_name = EXCLUDED.product_name,
		login_id = EXCLUDED.login_id,
		user_employee_id = EXCLUDED.user_employee_id,
		login_version = EXCLUDED.login_version,
		login_public_ip = EXCLUDED.login_public_ip,
		login_local_ip = EXCLUDED.login_local_ip,
		user_company = EXCLUDED.user_company,
		user_department = EXCLUDED.user_department,
		user_position = EXCLUDED.user_position,
		user_login_time = EXCLUDED.user_login_time,
		created_at = EXCLUDED.created_at,
		user_family_flag = EXCLUDED.user_family_flag;
	`, pgx.Identifier{schema, table}.Sanitize(), strings.Join(quotedColumns, ", "), strings.Join(quotedColumns, ", "), quoteIdent(tempTable))

	upsertRes, err := tx.Exec(ctx, upsertSQL)
	if err != nil {
		return res, err
	}

	if err := tx.Commit(ctx); err != nil {
		return res, err
	}

	// Populate the result only after the commit succeeds.
	res.rowsCopied = copied
	res.rowsUpserted = upsertRes.RowsAffected()
	res.finishedAt = time.Now()
	return res, nil
}
|
|
|
|
// csvSource adapts a csv.Reader to the pgx.CopyFromSource interface used by
// CopyFrom.
type csvSource struct {
	reader *csv.Reader
	record []string // most recently read record; valid after Next returns true
	err    error    // first non-EOF read error; surfaced via Err
}
|
|
|
|
func (s *csvSource) Next() bool {
|
|
if s.err != nil {
|
|
return false
|
|
}
|
|
|
|
rec, err := s.reader.Read()
|
|
if err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
return false
|
|
}
|
|
s.err = err
|
|
return false
|
|
}
|
|
|
|
s.record = rec
|
|
return true
|
|
}
|
|
|
|
func (s *csvSource) Values() ([]any, error) {
|
|
if len(s.record) != len(userProgramColumns) {
|
|
return nil, fmt.Errorf("unexpected record length: got %d, want %d", len(s.record), len(userProgramColumns))
|
|
}
|
|
|
|
id, err := strconv.ParseInt(s.record[0], 10, 64)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse id: %w", err)
|
|
}
|
|
|
|
loginTime, err := parseTimestamp(s.record[10])
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse user_login_time: %w", err)
|
|
}
|
|
|
|
createdAt, err := parseTimestamp(s.record[11])
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse created_at: %w", err)
|
|
}
|
|
|
|
var familyFlag any
|
|
if v := s.record[12]; v == "" {
|
|
familyFlag = nil
|
|
} else {
|
|
switch v {
|
|
case "1", "true", "TRUE", "t", "T":
|
|
familyFlag = true
|
|
case "0", "false", "FALSE", "f", "F":
|
|
familyFlag = false
|
|
default:
|
|
parsed, err := strconv.ParseBool(v)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse user_family_flag: %w", err)
|
|
}
|
|
familyFlag = parsed
|
|
}
|
|
}
|
|
|
|
return []any{
|
|
id,
|
|
nullOrString(s.record[1]),
|
|
nullOrString(s.record[2]),
|
|
nullOrString(s.record[3]),
|
|
nullOrString(s.record[4]),
|
|
nullOrString(s.record[5]),
|
|
nullOrString(s.record[6]),
|
|
nullOrString(s.record[7]),
|
|
nullOrString(s.record[8]),
|
|
nullOrString(s.record[9]),
|
|
loginTime,
|
|
createdAt,
|
|
familyFlag,
|
|
}, nil
|
|
}
|
|
|
|
// Err returns the first non-EOF error encountered by Next, fulfilling the
// pgx.CopyFromSource contract.
func (s *csvSource) Err() error {
	return s.err
}
|
|
|
|
func parseTimestamp(raw string) (any, error) {
|
|
if raw == "" {
|
|
return nil, nil
|
|
}
|
|
for _, layout := range timeLayouts {
|
|
if t, err := time.ParseInLocation(layout, raw, kstLocation); err == nil {
|
|
return t, nil
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("unsupported timestamp format: %s", raw)
|
|
}
|
|
|
|
// nullOrString maps an empty CSV field to SQL NULL (nil) and returns any
// other value unchanged.
func nullOrString(val string) any {
	if val != "" {
		return val
	}
	return nil
}
|
|
|
|
func importSingle(ctx context.Context, conn *pgx.Conn, csvPath, schema, logDir string) error {
|
|
startedAt := time.Now()
|
|
|
|
res, err := copyAndUpsertCSV(ctx, conn, csvPath, schema, ReplicaTable)
|
|
logStatus := "succeeded"
|
|
logErrMsg := ""
|
|
if err != nil {
|
|
logStatus = "failed"
|
|
logErrMsg = err.Error()
|
|
}
|
|
|
|
_ = writeImportLog(logDir, importLog{
|
|
StartedAt: startedAt,
|
|
FinishedAt: res.finishedAt,
|
|
CSVPath: csvPath,
|
|
Status: logStatus,
|
|
RowsCopied: res.rowsCopied,
|
|
RowsUpserted: res.rowsUpserted,
|
|
Error: logErrMsg,
|
|
})
|
|
|
|
return err
|
|
}
|
|
|
|
// resolveCSVTargets expands path into the list of CSV files to import: a
// regular file is returned as-is, while a directory yields its *.csv entries
// (non-recursive, case-insensitive extension) sorted by name.
func resolveCSVTargets(path string) ([]string, error) {
	info, err := os.Stat(path)
	if err != nil {
		return nil, err
	}
	if !info.IsDir() {
		return []string{path}, nil
	}

	entries, err := os.ReadDir(path)
	if err != nil {
		return nil, err
	}

	var targets []string
	for _, entry := range entries {
		name := entry.Name()
		if entry.IsDir() || !strings.HasSuffix(strings.ToLower(name), ".csv") {
			continue
		}
		targets = append(targets, filepath.Join(path, name))
	}
	slices.Sort(targets)
	return targets, nil
}
|
|
|
|
// importLog is the per-file record persisted by writeImportLog.
type importLog struct {
	StartedAt    time.Time // import start; defaulted to "now" if zero
	FinishedAt   time.Time // commit time; zero when the import failed
	CSVPath      string    // the CSV file this entry describes
	Status       string    // "succeeded" or "failed"
	RowsCopied   int64     // rows COPYed into the temp table
	RowsUpserted int64     // rows affected by the final upsert
	Error        string    // error text when Status is "failed"
	LatestDate   time.Time // optional latest created_at date; omitted if zero
}
|
|
|
|
func writeImportLog(logDir string, entry importLog) error {
|
|
if err := os.MkdirAll(logDir, 0o755); err != nil {
|
|
return err
|
|
}
|
|
|
|
now := time.Now().In(kstLocation)
|
|
if entry.StartedAt.IsZero() {
|
|
entry.StartedAt = now
|
|
}
|
|
filename := fmt.Sprintf("user_program_import_%s.log", now.Format("20060102_150405"))
|
|
path := filepath.Join(logDir, filename)
|
|
|
|
f, err := os.Create(path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer f.Close()
|
|
|
|
start := entry.StartedAt.In(kstLocation).Format(time.RFC3339)
|
|
finish := ""
|
|
if !entry.FinishedAt.IsZero() {
|
|
finish = entry.FinishedAt.In(kstLocation).Format(time.RFC3339)
|
|
}
|
|
|
|
lines := []string{
|
|
fmt.Sprintf("status=%s", entry.Status),
|
|
fmt.Sprintf("csv_path=%s", entry.CSVPath),
|
|
fmt.Sprintf("started_at=%s", start),
|
|
fmt.Sprintf("finished_at=%s", finish),
|
|
fmt.Sprintf("rows_copied=%d", entry.RowsCopied),
|
|
fmt.Sprintf("rows_upserted=%d", entry.RowsUpserted),
|
|
}
|
|
if entry.Error != "" {
|
|
lines = append(lines, fmt.Sprintf("error=%s", entry.Error))
|
|
}
|
|
if !entry.LatestDate.IsZero() {
|
|
lines = append(lines, fmt.Sprintf("latest_date=%s", entry.LatestDate.In(kstLocation).Format("2006-01-02")))
|
|
}
|
|
|
|
for _, line := range lines {
|
|
if _, err := f.WriteString(line + "\n"); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// quoteIdent wraps s in double quotes, doubling any embedded quote character
// per SQL identifier quoting rules.
func quoteIdent(s string) string {
	escaped := strings.ReplaceAll(s, `"`, `""`)
	return `"` + escaped + `"`
}
|
|
|
|
func quoteColumns(cols []string) []string {
|
|
out := make([]string, len(cols))
|
|
for i, c := range cols {
|
|
out[i] = quoteIdent(c)
|
|
}
|
|
return out
|
|
}
|
|
|
|
func LatestCreatedDate(ctx context.Context, conn *pgx.Conn, schema, table string) (time.Time, error) {
|
|
var ts sql.NullTime
|
|
query := fmt.Sprintf("SELECT MAX(created_at) FROM %s", pgx.Identifier{schema, table}.Sanitize())
|
|
if err := conn.QueryRow(ctx, query).Scan(&ts); err != nil {
|
|
return time.Time{}, err
|
|
}
|
|
if !ts.Valid {
|
|
return time.Time{}, nil
|
|
}
|
|
return truncateToKSTDate(ts.Time), nil
|
|
}
|
|
|
|
func truncateToKSTDate(t time.Time) time.Time {
|
|
kst := t.In(kstLocation)
|
|
return time.Date(kst.Year(), kst.Month(), kst.Day(), 0, 0, 0, 0, kstLocation)
|
|
}
|
|
|
|
func dateFromFilename(path string) (time.Time, error) {
|
|
base := filepath.Base(path)
|
|
re := regexp.MustCompile(`(\d{8})`)
|
|
match := re.FindStringSubmatch(base)
|
|
if len(match) < 2 {
|
|
return time.Time{}, fmt.Errorf("no date in filename: %s", base)
|
|
}
|
|
return time.ParseInLocation("20060102", match[1], kstLocation)
|
|
}
|