Files
test-mcp/clone_db.py
2026-06-19 16:26:09 +09:00

46 lines
1.8 KiB
Python

import pymysql
import os
def clone_database(source_db='PM_proto', target_db='PM_proto_test'):
    """Copy every base table of ``source_db`` into ``target_db`` on one MySQL server.

    The target database is created when missing. For each base table in the
    source, any same-named table in the target is dropped and then rebuilt
    with ``CREATE TABLE ... LIKE`` followed by ``INSERT ... SELECT *``.
    Views are skipped because ``CREATE TABLE ... LIKE`` only works on base
    tables.

    Connection settings are read from the ``DB_HOST``, ``DB_USER`` and
    ``DB_PASSWORD`` environment variables.

    Args:
        source_db: Name of the database to read tables from.
        target_db: Name of the database to (re)create the tables in.

    Returns:
        None. Progress and any database error are reported with ``print``;
        errors raised while connecting or cloning are caught, not propagated.

    Raises:
        ValueError: If ``source_db`` and ``target_db`` name the same database,
            since the drop step would then destroy the source tables.
    """
    # Compare case-insensitively: on servers with case-insensitive database
    # names, "pm_proto" and "PM_proto" are the same database.
    if str(source_db).casefold() == str(target_db).casefold():
        raise ValueError("source_db and target_db must be different databases")

    def _quote(identifier):
        # Backtick-quote an identifier (doubling embedded backticks) so that
        # reserved words, dashes or spaces in a name cannot break the statement.
        return '`' + str(identifier).replace('`', '``') + '`'

    source = _quote(source_db)
    target = _quote(target_db)

    # Sentinel so the cleanup in ``finally`` works even when connect() fails.
    connection = None
    try:
        connection = pymysql.connect(
            host=os.getenv('DB_HOST', 'localhost'),
            user=os.getenv('DB_USER', 'root'),
            # NOTE(review): a real-looking password is hard-coded as the
            # fallback; prefer requiring DB_PASSWORD and rotating this value.
            password=os.getenv('DB_PASSWORD', '45278434'),
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor
        )
        with connection.cursor() as cursor:
            # 1. Create the target database.
            cursor.execute(f"CREATE DATABASE IF NOT EXISTS {target}")
            print(f"Database {target_db} created or already exists.")

            # 2. List the source tables. SHOW FULL TABLES adds a Table_type
            #    column, which lets us leave views out of the result.
            cursor.execute(
                f"SHOW FULL TABLES FROM {source} WHERE Table_type = 'BASE TABLE'"
            )
            tables = cursor.fetchall()

            for table_row in tables:
                # The table name is the first column of each result row.
                table_name = next(iter(table_row.values()))
                table = _quote(table_name)

                # 3. Drop the existing copy in the target database, if any.
                cursor.execute(f"DROP TABLE IF EXISTS {target}.{table}")

                # 4. Clone schema and data.
                # CREATE TABLE ... LIKE copies columns and indexes but no rows
                # (and no foreign key definitions); CREATE TABLE ... AS SELECT
                # copies rows but no indexes. So use LIKE first, then
                # INSERT INTO ... SELECT *.
                cursor.execute(f"CREATE TABLE {target}.{table} LIKE {source}.{table}")
                cursor.execute(f"INSERT INTO {target}.{table} SELECT * FROM {source}.{table}")
                print(f"Table {table_name} cloned.")

            connection.commit()
        print("Database cloning completed successfully.")
    except Exception as e:
        # Top-level boundary of the script: report the failure instead of
        # raising, matching the function's print-based reporting.
        print(f"Error during database cloning: {e}")
    finally:
        if connection is not None:
            connection.close()
# Script entry point: run the clone only when this file is executed directly,
# not when it is imported as a module.
if "__main__" == __name__:
    clone_database()