46 lines
1.8 KiB
Python
46 lines
1.8 KiB
Python
import pymysql
|
|
import os
|
|
|
|
def clone_database():
|
|
try:
|
|
connection = pymysql.connect(
|
|
host=os.getenv('DB_HOST', 'localhost'),
|
|
user=os.getenv('DB_USER', 'root'),
|
|
password=os.getenv('DB_PASSWORD', '45278434'),
|
|
charset='utf8mb4',
|
|
cursorclass=pymysql.cursors.DictCursor
|
|
)
|
|
|
|
with connection.cursor() as cursor:
|
|
# 1. Create test database
|
|
cursor.execute("CREATE DATABASE IF NOT EXISTS PM_proto_test")
|
|
print("Database PM_proto_test created or already exists.")
|
|
|
|
# 2. Get all tables from source database
|
|
cursor.execute("SHOW TABLES FROM PM_proto")
|
|
tables = cursor.fetchall()
|
|
|
|
for table_row in tables:
|
|
table_name = list(table_row.values())[0]
|
|
|
|
# 3. Drop existing table in test DB if exists
|
|
cursor.execute(f"DROP TABLE IF EXISTS PM_proto_test.{table_name}")
|
|
|
|
# 4. Clone schema and data
|
|
# Note: CREATE TABLE ... LIKE doesn't copy data, and CREATE TABLE ... AS SELECT doesn't copy indexes.
|
|
# So we use LIKE first, then INSERT INTO ... SELECT *
|
|
cursor.execute(f"CREATE TABLE PM_proto_test.{table_name} LIKE PM_proto.{table_name}")
|
|
cursor.execute(f"INSERT INTO PM_proto_test.{table_name} SELECT * FROM PM_proto.{table_name}")
|
|
print(f"Table {table_name} cloned.")
|
|
|
|
connection.commit()
|
|
print("Database cloning completed successfully.")
|
|
except Exception as e:
|
|
print(f"Error during database cloning: {e}")
|
|
finally:
|
|
if 'connection' in locals():
|
|
connection.close()
|
|
|
|
if __name__ == "__main__":
|
|
clone_database()
|