DB 삽입위치 변경, 업데이트 로직 개선
This commit is contained in:
@@ -2,6 +2,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
@@ -12,18 +13,18 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
defaultCSVPath = "./initial_data/user_program_info_init_20251208.csv"
|
defaultCSVPath = "/initial_data/user_program_info_init_20251208.csv"
|
||||||
defaultUpdateDir = "./update_data"
|
defaultUpdateDir = "/update_data"
|
||||||
defaultTimeout = 10 * time.Minute
|
defaultTimeout = 10 * time.Minute
|
||||||
defaultSchema = "public"
|
defaultSchema = "public"
|
||||||
defaultLogDir = "./log"
|
defaultLogDir = "/log"
|
||||||
targetTableName = "user_program_info_replica"
|
targetTableName = "user_program_info_replica"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
dbURL := os.Getenv("DATABASE_URL")
|
dbURL, err := databaseURL()
|
||||||
if dbURL == "" {
|
if err != nil {
|
||||||
log.Fatal("DATABASE_URL is required")
|
log.Fatalf("database config: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
csvPath := env("USER_PROGRAM_INFO_CSV", defaultCSVPath)
|
csvPath := env("USER_PROGRAM_INFO_CSV", defaultCSVPath)
|
||||||
@@ -57,3 +58,18 @@ func env(key, fallback string) string {
|
|||||||
}
|
}
|
||||||
return fallback
|
return fallback
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func databaseURL() (string, error) {
|
||||||
|
if url := os.Getenv("DATABASE_URL"); url != "" {
|
||||||
|
return url, nil
|
||||||
|
}
|
||||||
|
user := os.Getenv("POSTGRES_USER")
|
||||||
|
pass := os.Getenv("POSTGRES_PASSWORD")
|
||||||
|
host := env("POSTGRES_HOST", "localhost")
|
||||||
|
port := env("POSTGRES_PORT", "5432")
|
||||||
|
db := os.Getenv("POSTGRES_DB")
|
||||||
|
if user == "" || pass == "" || db == "" {
|
||||||
|
return "", fmt.Errorf("DATABASE_URL or POSTGRES_{USER,PASSWORD,DB} is required")
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", user, pass, host, port, db), nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
@@ -14,9 +15,9 @@ const defaultTimeout = 30 * time.Minute
|
|||||||
func main() {
|
func main() {
|
||||||
logger := log.New(os.Stdout, "[sync] ", log.LstdFlags)
|
logger := log.New(os.Stdout, "[sync] ", log.LstdFlags)
|
||||||
|
|
||||||
dbURL := os.Getenv("DATABASE_URL")
|
dbURL, err := databaseURL()
|
||||||
if dbURL == "" {
|
if err != nil {
|
||||||
logger.Fatal("DATABASE_URL is required")
|
logger.Fatalf("database config: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
mysqlCfg, err := userprogram.NewMySQLConfigFromEnv()
|
mysqlCfg, err := userprogram.NewMySQLConfigFromEnv()
|
||||||
@@ -43,3 +44,24 @@ func main() {
|
|||||||
logger.Fatalf("sync failed: %v", err)
|
logger.Fatalf("sync failed: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func databaseURL() (string, error) {
|
||||||
|
if url := os.Getenv("DATABASE_URL"); url != "" {
|
||||||
|
return url, nil
|
||||||
|
}
|
||||||
|
user := os.Getenv("POSTGRES_USER")
|
||||||
|
pass := os.Getenv("POSTGRES_PASSWORD")
|
||||||
|
host := os.Getenv("POSTGRES_HOST")
|
||||||
|
if host == "" {
|
||||||
|
host = "localhost"
|
||||||
|
}
|
||||||
|
port := os.Getenv("POSTGRES_PORT")
|
||||||
|
if port == "" {
|
||||||
|
port = "5432"
|
||||||
|
}
|
||||||
|
db := os.Getenv("POSTGRES_DB")
|
||||||
|
if user == "" || pass == "" || db == "" {
|
||||||
|
return "", fmt.Errorf("DATABASE_URL or POSTGRES_{USER,PASSWORD,DB} is required")
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", user, pass, host, port, db), nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -177,9 +177,9 @@ func copyAndUpsertCSV(ctx context.Context, conn *pgx.Conn, path, schema, table s
|
|||||||
_ = tx.Rollback(ctx)
|
_ = tx.Rollback(ctx)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
tempTable := fmt.Sprintf("%s_import_tmp", table)
|
tempTable := fmt.Sprintf("%s_import_tmp_%d", table, time.Now().UnixNano())
|
||||||
|
|
||||||
if _, err := tx.Exec(ctx, fmt.Sprintf(`CREATE TEMP TABLE %s (LIKE %s INCLUDING ALL);`, quoteIdent(tempTable), pgx.Identifier{schema, table}.Sanitize())); err != nil {
|
if _, err := tx.Exec(ctx, fmt.Sprintf(`CREATE TEMP TABLE %s (LIKE %s INCLUDING ALL) ON COMMIT DROP;`, quoteIdent(tempTable), pgx.Identifier{schema, table}.Sanitize())); err != nil {
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user