undefined
Slide 1: Introduction to Database Sharding
Database sharding is a technique used to horizontally partition data across multiple databases. This approach is particularly useful when dealing with large-scale applications that have outgrown a single database instance. In this presentation, we'll explore how to split and distribute a monolithic database using Python, with a focus on practical implementation.
Copyimport mysql.connector
from mysql.connector import errorcode
def connect_to_database(config):
try:
cnx = mysql.connector.connect(**config)
print(f"Connected to {config['database']}")
return cnx
except mysql.connector.Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
print("Invalid credentials")
elif err.errno == errorcode.ER_BAD_DB_ERROR:
print("Database does not exist")
else:
print(err)
return None
# Example usage
config = {
'user': 'your_username',
'password': 'your_password',
'host': 'localhost',
'database': 'your_database'
}
connection = connect_to_database(config)
if connection:
connection.close()Slide 2: Understanding the Need for Sharding
As applications grow, a single database can become a bottleneck. Sharding addresses this by distributing data across multiple servers, improving performance and scalability. Let's visualize the growth of data and its impact on query time.
Copyimport matplotlib.pyplot as plt
import numpy as np
def plot_data_growth():
data_size = np.arange(1, 11)
query_time_single_db = data_size ** 2
query_time_sharded = np.log(data_size)
plt.figure(figsize=(10, 6))
plt.plot(data_size, query_time_single_db, label='Single Database')
plt.plot(data_size, query_time_sharded, label='Sharded Database')
plt.xlabel('Data Size (TB)')
plt.ylabel('Query Time (ms)')
plt.title('Query Time vs Data Size')
plt.legend()
plt.show()
plot_data_growth()Slide 3: Choosing a Sharding Key
The sharding key is crucial for distributing data efficiently. It should be chosen based on how the data is frequently accessed. Let's implement a simple sharding key selection function.
Copydef select_sharding_key(table_schema):
candidate_keys = []
for column, properties in table_schema.items():
if properties['unique'] and properties['distribution'] == 'uniform':
candidate_keys.append(column)
if not candidate_keys:
return None
return max(candidate_keys, key=lambda k: table_schema[k]['query_frequency'])
# Example usage
user_table = {
'id': {'unique': True, 'distribution': 'uniform', 'query_frequency': 0.8},
'email': {'unique': True, 'distribution': 'uniform', 'query_frequency': 0.6},
'username': {'unique': True, 'distribution': 'skewed', 'query_frequency': 0.4}
}
sharding_key = select_sharding_key(user_table)
print(f"Selected sharding key: {sharding_key}")Slide 4: Implementing a Sharding Function
A sharding function determines which shard should store a particular piece of data. Here's a simple implementation using the modulo operation.
Copydef shard_data(data, sharding_key, num_shards):
shards = [[] for _ in range(num_shards)]
for item in data:
shard_index = hash(item[sharding_key]) % num_shards
shards[shard_index].append(item)
return shards
# Example usage
users = [
{'id': 1, 'name': 'Alice', 'email': 'alice@example.com'},
{'id': 2, 'name': 'Bob', 'email': 'bob@example.com'},
{'id': 3, 'name': 'Charlie', 'email': 'charlie@example.com'},
{'id': 4, 'name': 'David', 'email': 'david@example.com'}
]
sharded_users = shard_data(users, 'id', 2)
for i, shard in enumerate(sharded_users):
print(f"Shard {i}: {shard}")Slide 5: Setting Up Shard Databases
To implement sharding, we need to create multiple databases to store our sharded data. Let's write a function to set up these shard databases.
Copyimport mysql.connector
def setup_shard_databases(config, num_shards):
try:
cnx = mysql.connector.connect(
host=config['host'],
user=config['user'],
password=config['password']
)
cursor = cnx.cursor()
for i in range(num_shards):
db_name = f"{config['database']}_shard_{i}"
cursor.execute(f"CREATE DATABASE IF NOT EXISTS {db_name}")
print(f"Created database: {db_name}")
cursor.close()
cnx.close()
print("Shard databases setup complete")
except mysql.connector.Error as err:
print(f"Error: {err}")
# Example usage
config = {
'host': 'localhost',
'user': 'your_username',
'password': 'your_password',
'database': 'your_database'
}
setup_shard_databases(config, 3)Slide 6: Migrating Data to Shards
Once we have our shard databases set up, we need to migrate the existing data from the monolithic database to the shards. Here's a function to handle this process.
Copyimport mysql.connector
def migrate_data_to_shards(source_config, shard_configs, sharding_key, table_name):
try:
# Connect to source database
source_cnx = mysql.connector.connect(**source_config)
source_cursor = source_cnx.cursor(dictionary=True)
# Fetch all data from source table
source_cursor.execute(f"SELECT * FROM {table_name}")
data = source_cursor.fetchall()
# Shard the data
sharded_data = shard_data(data, sharding_key, len(shard_configs))
# Migrate data to each shard
for i, shard_data in enumerate(sharded_data):
shard_cnx = mysql.connector.connect(**shard_configs[i])
shard_cursor = shard_cnx.cursor()
for item in shard_data:
columns = ', '.join(item.keys())
placeholders = ', '.join(['%s'] * len(item))
query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
shard_cursor.execute(query, tuple(item.values()))
shard_cnx.commit()
shard_cursor.close()
shard_cnx.close()
print(f"Data migrated to shard {i}")
source_cursor.close()
source_cnx.close()
print("Data migration complete")
except mysql.connector.Error as err:
print(f"Error: {err}")
# Example usage
source_config = {
'host': 'localhost',
'user': 'your_username',
'password': 'your_password',
'database': 'your_database'
}
shard_configs = [
{'host': 'localhost', 'user': 'your_username', 'password': 'your_password', 'database': 'your_database_shard_0'},
{'host': 'localhost', 'user': 'your_username', 'password': 'your_password', 'database': 'your_database_shard_1'},
{'host': 'localhost', 'user': 'your_username', 'password': 'your_password', 'database': 'your_database_shard_2'}
]
migrate_data_to_shards(source_config, shard_configs, 'id', 'users')Slide 7: Implementing a Shard-Aware Query Router
To make our sharded database system transparent to the application, we need a query router that can direct queries to the appropriate shard.
Copyimport mysql.connector
class ShardRouter:
def __init__(self, shard_configs, sharding_key):
self.shard_configs = shard_configs
self.sharding_key = sharding_key
self.connections = [mysql.connector.connect(**config) for config in shard_configs]
def execute_query(self, query, params=None):
if self.sharding_key in params:
shard_index = hash(params[self.sharding_key]) % len(self.shard_configs)
connection = self.connections[shard_index]
cursor = connection.cursor(dictionary=True)
cursor.execute(query, params)
result = cursor.fetchall()
cursor.close()
return result
else:
results = []
for connection in self.connections:
cursor = connection.cursor(dictionary=True)
cursor.execute(query, params)
results.extend(cursor.fetchall())
cursor.close()
return results
def close(self):
for connection in self.connections:
connection.close()
# Example usage
router = ShardRouter(shard_configs, 'id')
result = router.execute_query("SELECT * FROM users WHERE id = %(id)s", {'id': 1})
print(result)
router.close()Slide 8: Handling Cross-Shard Queries
Cross-shard queries can be challenging in a sharded database system. Let's implement a function to handle queries that need to aggregate data from multiple shards.
Copydef cross_shard_query(shard_router, query, aggregate_function):
results = shard_router.execute_query(query)
if aggregate_function == 'COUNT':
return len(results)
elif aggregate_function == 'SUM':
return sum(result['value'] for result in results)
elif aggregate_function == 'AVG':
total = sum(result['value'] for result in results)
return total / len(results) if results else 0
else:
return results
# Example usage
router = ShardRouter(shard_configs, 'id')
total_users = cross_shard_query(router, "SELECT COUNT(*) as value FROM users", 'SUM')
print(f"Total users across all shards: {total_users}")
router.close()Slide 9: Maintaining Data Consistency Across Shards
Ensuring data consistency across shards is crucial. Let's implement a simple two-phase commit protocol for cross-shard transactions.
Copyimport mysql.connector
def two_phase_commit(shard_configs, queries):
connections = []
try:
# Phase 1: Prepare
for config in shard_configs:
cnx = mysql.connector.connect(**config)
cnx.start_transaction()
cursor = cnx.cursor()
cursor.execute(queries[shard_configs.index(config)])
connections.append((cnx, cursor))
# Phase 2: Commit
for cnx, _ in connections:
cnx.commit()
print("Transaction committed successfully")
except mysql.connector.Error as err:
# Rollback if any error occurs
for cnx, _ in connections:
cnx.rollback()
print(f"Transaction failed: {err}")
finally:
# Close all connections
for cnx, cursor in connections:
cursor.close()
cnx.close()
# Example usage
queries = [
"UPDATE users SET status = 'active' WHERE id = 1",
"INSERT INTO logs (user_id, action) VALUES (1, 'status_change')",
"UPDATE user_stats SET active_count = active_count + 1"
]
two_phase_commit(shard_configs, queries)Slide 10: Implementing Read Replicas
Read replicas can help distribute the read load and improve performance. Let's implement a function to set up and manage read replicas.
Copyimport mysql.connector
from mysql.connector import errorcode
def setup_read_replica(master_config, replica_config):
try:
# Connect to master
master_cnx = mysql.connector.connect(**master_config)
master_cursor = master_cnx.cursor()
# Get binary log file and position
master_cursor.execute("SHOW MASTER STATUS")
master_status = master_cursor.fetchone()
log_file, log_pos = master_status[0], master_status[1]
# Connect to replica
replica_cnx = mysql.connector.connect(**replica_config)
replica_cursor = replica_cnx.cursor()
# Configure replica
replica_cursor.execute("STOP SLAVE")
replica_cursor.execute(f"CHANGE MASTER TO MASTER_HOST='{master_config['host']}', "
f"MASTER_USER='{master_config['user']}', "
f"MASTER_PASSWORD='{master_config['password']}', "
f"MASTER_LOG_FILE='{log_file}', "
f"MASTER_LOG_POS={log_pos}")
replica_cursor.execute("START SLAVE")
print("Read replica setup complete")
except mysql.connector.Error as err:
print(f"Error: {err}")
finally:
if 'master_cursor' in locals():
master_cursor.close()
if 'master_cnx' in locals():
master_cnx.close()
if 'replica_cursor' in locals():
replica_cursor.close()
if 'replica_cnx' in locals():
replica_cnx.close()
# Example usage
master_config = {
'host': 'master_host',
'user': 'replication_user',
'password': 'replication_password',
'database': 'your_database'
}
replica_config = {
'host': 'replica_host',
'user': 'replica_user',
'password': 'replica_password',
'database': 'your_database'
}
setup_read_replica(master_config, replica_config)Slide 11: Monitoring Shard Performance
Monitoring the performance of each shard is essential for maintaining a healthy sharded database system. Let's create a simple monitoring function.
Copyimport mysql.connector
import time
def monitor_shard_performance(shard_configs, query):
results = []
for config in shard_configs:
try:
cnx = mysql.connector.connect(**config)
cursor = cnx.cursor()
start_time = time.time()
cursor.execute(query)
end_time = time.time()
execution_time = end_time - start_time
row_count = cursor.rowcount
results.append({
'shard': config['database'],
'execution_time': execution_time,
'row_count': row_count
})
cursor.close()
cnx.close()
except mysql.connector.Error as err:
print(f"Error monitoring shard {config['database']}: {err}")
return results
# Example usage
query = "SELECT COUNT(*) FROM users"
performance_results = monitor_shard_performance(shard_configs, query)
for result in performance_results:
print(f"Shard: {result['shard']}")
print(f"Execution Time: {result['execution_time']:.4f} seconds")
print(f"Row Count: {result['row_count']}")
print("---")Slide 12: Real-Life Example: E-commerce Product Catalog
Let's consider an e-commerce platform with millions of products. We'll shard the product catalog based on product categories.
CopyimportSlide 12: Real-Life Example: E-commerce Product Catalog
Let's consider an e-commerce platform with millions of products. We'll shard the product catalog based on product categories.
Copyimport mysql.connector
def setup_product_shards(categories, base_config):
shard_configs = []
for category in categories:
shard_config = base_config.copy()
shard_config['database'] = f"products_{category.lower().replace(' ', '_')}"
cnx = mysql.connector.connect(**base_config)
cursor = cnx.cursor()
cursor.execute(f"CREATE DATABASE IF NOT EXISTS {shard_config['database']}")
cursor.execute(f"USE {shard_config['database']}")
cursor.execute("""
CREATE TABLE IF NOT EXISTS products (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255),
price DECIMAL(10, 2),
description TEXT
)
""")
cursor.close()
cnx.close()
shard_configs.append(shard_config)
return shard_configs
# Example usage
base_config = {
'host': 'localhost',
'user': 'your_username',
'password': 'your_password'
}
categories = ['Electronics', 'Clothing', 'Home & Garden', 'Books']
product_shards = setup_product_shards(categories, base_config)
print("Product catalog shards created:")
for shard in product_shards:
print(f"- {shard['database']}")Slide 13: Real-Life Example: Social Media Post Storage
Consider a social media platform that needs to store and retrieve user posts efficiently. We'll shard the posts based on user ID.
Copyimport hashlib
class PostShardManager:
def __init__(self, shard_configs):
self.shard_configs = shard_configs
self.num_shards = len(shard_configs)
def get_shard_for_user(self, user_id):
shard_index = int(hashlib.md5(str(user_id).encode()).hexdigest(), 16) % self.num_shards
return self.shard_configs[shard_index]
def store_post(self, user_id, post_content):
shard_config = self.get_shard_for_user(user_id)
cnx = mysql.connector.connect(**shard_config)
cursor = cnx.cursor()
query = "INSERT INTO posts (user_id, content) VALUES (%s, %s)"
cursor.execute(query, (user_id, post_content))
cnx.commit()
cursor.close()
cnx.close()
def get_user_posts(self, user_id):
shard_config = self.get_shard_for_user(user_id)
cnx = mysql.connector.connect(**shard_config)
cursor = cnx.cursor(dictionary=True)
query = "SELECT * FROM posts WHERE user_id = %s ORDER BY created_at DESC"
cursor.execute(query, (user_id,))
posts = cursor.fetchall()
cursor.close()
cnx.close()
return posts
# Example usage
shard_configs = [
{'host': 'shard1.example.com', 'user': 'user', 'password': 'pass', 'database': 'posts_shard_1'},
{'host': 'shard2.example.com', 'user': 'user', 'password': 'pass', 'database': 'posts_shard_2'},
]
post_manager = PostShardManager(shard_configs)
# Store a post
post_manager.store_post(user_id=12345, post_content="Hello, world!")
# Retrieve posts
user_posts = post_manager.get_user_posts(user_id=12345)
print(f"Posts for user 12345: {user_posts}")Slide 14: Handling Shard Rebalancing
As data grows, you may need to rebalance shards. Here's a simple approach to rebalance data across shards.
Copyimport mysql.connector
def rebalance_shards(source_config, target_config, table_name, chunk_size=1000):
source_cnx = mysql.connector.connect(**source_config)
source_cursor = source_cnx.cursor(dictionary=True)
target_cnx = mysql.connector.connect(**target_config)
target_cursor = target_cnx.cursor()
# Get the total number of rows
source_cursor.execute(f"SELECT COUNT(*) as count FROM {table_name}")
total_rows = source_cursor.fetchone()['count']
for offset in range(0, total_rows, chunk_size):
# Fetch a chunk of data from the source shard
source_cursor.execute(f"SELECT * FROM {table_name} LIMIT {offset}, {chunk_size}")
rows = source_cursor.fetchall()
if not rows:
break
# Insert the chunk into the target shard
for row in rows:
columns = ', '.join(row.keys())
placeholders = ', '.join(['%s'] * len(row))
query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
target_cursor.execute(query, tuple(row.values()))
target_cnx.commit()
print(f"Transferred {len(rows)} rows. Progress: {min(offset + chunk_size, total_rows)}/{total_rows}")
source_cursor.close()
source_cnx.close()
target_cursor.close()
target_cnx.close()
print("Rebalancing complete")
# Example usage
source_config = {
'host': 'source_host',
'user': 'user',
'password': 'password',
'database': 'source_shard'
}
target_config = {
'host': 'target_host',
'user': 'user',
'password': 'password',
'database': 'target_shard'
}
rebalance_shards(source_config, target_config, 'users', chunk_size=5000)Slide 15: Additional Resources
For those interested in delving deeper into database sharding and distributed systems, here are some valuable resources:
- "Designing Data-Intensive Applications" by Martin Kleppmann This book provides a comprehensive overview of distributed systems and database architectures, including sharding.
- "Building Scalable Systems" by Martin L. Abbott and Michael T. Fisher Offers practical advice on designing and implementing large-scale distributed systems.
- "Dynamo: Amazon's Highly Available Key-value Store" (DeCandia et al., 2007) ArXiv link: https://arxiv.org/abs/0712.1815 This paper describes Amazon's Dynamo, a pioneering distributed key-value store that influenced many modern NoSQL databases.
- "Spanner: Google's Globally-Distributed Database" (Corbett et al., 2012) ArXiv link: https://arxiv.org/abs/1207.4284 Discusses Google's globally distributed database system, which provides strong consistency across multiple data centers.
These resources will provide you with a deeper understanding of the principles and challenges involved in implementing sharded and distributed database systems.
undefined