Skip to content

Latest commit

 

History

History
575 lines (447 loc) · 18.8 KB

File metadata and controls

575 lines (447 loc) · 18.8 KB

Efficient Fuzzy Duplicate Detection in Python

Slide 1: Understanding Fuzzy Duplicates

The concept of fuzzy duplicates extends beyond exact matches, encompassing records that are semantically identical but contain variations due to typos, formatting differences, or missing data. This fundamental understanding is crucial for developing efficient deduplication strategies.

# Example of fuzzy duplicates in a DataFrame
import pandas as pd

data = {
    'first_name': ['Daniel', 'Daniel', 'John'],
    'last_name': ['Lopez', None, 'Smith'],
    'address': ['719 Greene St.', '719 Greene Street', '123 Main St'],
    'number': ['1234567890', '1234-567-890', '9876543210']
}

df = pd.DataFrame(data)
print("Standard deduplication result:")
print(df.drop_duplicates().shape)  # Won't detect the fuzzy duplicates

Slide 2: Computational Complexity Analysis

Understanding the computational complexity helps grasp why naive approaches fail at scale. For n records, comparing each pair results in (n2)=n(n−1)2\binom{n}{2} = \frac{n(n-1)}{2}(2n​)=2n(n−1)​ comparisons, making it computationally infeasible for large datasets.

def calculate_naive_runtime(n_records, comparisons_per_second=10000):
    total_comparisons = (n_records * (n_records - 1)) / 2
    seconds = total_comparisons / comparisons_per_second
    years = seconds / (365 * 24 * 60 * 60)
    return total_comparisons, years

records = 1_000_000
comparisons, years = calculate_naive_runtime(records)
print(f"Total comparisons: {comparisons:,.0f}")
print(f"Estimated years: {years:.2f}")

Slide 3: Implementing Smart Bucketing

Bucketing strategy reduces comparisons by grouping potentially similar records together based on specific rules, dramatically improving computational efficiency while maintaining high accuracy in duplicate detection.

import numpy as np
from collections import defaultdict

def create_name_buckets(df):
    buckets = defaultdict(list)
    for idx, row in df.iterrows():
        if pd.notna(row['first_name']):
            # Create bucket key from first 3 letters
            key = row['first_name'][:3].lower()
            buckets[key].append(idx)
    return buckets

name_buckets = create_name_buckets(df)
print("Sample buckets:", dict(list(name_buckets.items())[:2]))

Slide 4: Token-based Address Bucketing

Address comparison requires a more sophisticated approach using token overlap. This implementation splits addresses into tokens and creates buckets based on common word occurrences, effectively grouping similar addresses together.

def create_address_buckets(df):
    buckets = defaultdict(list)
    
    for idx, row in df.iterrows():
        if pd.notna(row['address']):
            # Tokenize and clean address
            tokens = set(row['address'].lower().replace('.', ' ').split())
            
            # Create keys from token pairs
            for t1 in tokens:
                for t2 in tokens:
                    if t1 < t2:  # Avoid duplicate pairs
                        buckets[f"{t1}_{t2}"].append(idx)
                        
    return buckets

address_buckets = create_address_buckets(df)

Slide 5: String Similarity Metrics

Efficient string comparison requires appropriate similarity metrics. This implementation showcases common similarity measures including Levenshtein distance and Jaccard similarity for comparing text fields.

from difflib import SequenceMatcher

def calculate_similarity_metrics(str1, str2):
    if pd.isna(str1) or pd.isna(str2):
        return 0.0
        
    # Convert to lowercase and strip whitespace
    str1 = str1.lower().strip()
    str2 = str2.lower().strip()
    
    # Calculate similarity ratio
    return SequenceMatcher(None, str1, str2).ratio()

# Example usage
similarity = calculate_similarity_metrics(
    "719 Greene St.", 
    "719 Greene Street"
)
print(f"Similarity score: {similarity:.2f}")

Slide 6: Implementing Fuzzy Record Comparison

A robust record comparison function must handle multiple fields with different comparison strategies. This implementation combines field-specific similarity metrics and weights to produce an overall similarity score.

def compare_records(record1, record2, weights=None):
    if weights is None:
        weights = {
            'first_name': 0.3,
            'last_name': 0.3,
            'address': 0.25,
            'number': 0.15
        }
    
    similarities = {}
    for field in weights:
        similarities[field] = calculate_similarity_metrics(
            str(record1[field]),
            str(record2[field])
        )
    
    # Calculate weighted similarity
    total_similarity = sum(
        similarities[field] * weight 
        for field, weight in weights.items()
    )
    
    return total_similarity, similarities

# Example usage
record1 = df.iloc[0]
record2 = df.iloc[1]
total_sim, field_sims = compare_records(record1, record2)
print(f"Overall similarity: {total_sim:.2f}")
print("Field similarities:", field_sims)

Slide 7: Optimized Bucket Processing

The bucket processing implementation needs to be efficient to handle large datasets. This implementation uses parallel processing and efficient data structures to compare records within buckets.

from concurrent.futures import ProcessPoolExecutor
import itertools

def process_bucket(bucket_indices, df, similarity_threshold=0.85):
    potential_duplicates = []
    
    # Compare all pairs in the bucket
    for idx1, idx2 in itertools.combinations(bucket_indices, 2):
        record1 = df.iloc[idx1]
        record2 = df.iloc[idx2]
        
        similarity, _ = compare_records(record1, record2)
        
        if similarity >= similarity_threshold:
            potential_duplicates.append((idx1, idx2, similarity))
            
    return potential_duplicates

def parallel_bucket_processing(buckets, df, n_workers=4):
    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        futures = []
        for bucket_indices in buckets.values():
            if len(bucket_indices) > 1:  # Only process buckets with multiple records
                futures.append(
                    executor.submit(process_bucket, bucket_indices, df)
                )
                
        all_duplicates = []
        for future in futures:
            all_duplicates.extend(future.result())
            
    return all_duplicates

Slide 8: Phone Number Normalization

Standardizing phone numbers is crucial for accurate comparison. This implementation handles various phone number formats and creates a normalized representation for comparison.

import re

def normalize_phone_number(phone):
    if pd.isna(phone):
        return None
        
    # Remove all non-digit characters
    digits = re.sub(r'\D', '', str(phone))
    
    # Check if we have a valid number of digits
    if len(digits) == 10:
        return digits
    elif len(digits) == 11 and digits.startswith('1'):
        return digits[1:]
    else:
        return None

def compare_phone_numbers(phone1, phone2):
    norm1 = normalize_phone_number(phone1)
    norm2 = normalize_phone_number(phone2)
    
    if norm1 is None or norm2 is None:
        return 0.0
    
    return 1.0 if norm1 == norm2 else 0.0

# Example
phones = ['1234567890', '123-456-7890', '(123) 456-7890']
normalized = [normalize_phone_number(p) for p in phones]
print("Normalized numbers:", normalized)

Slide 9: Address Standardization

Address standardization is essential for accurate matching. This implementation handles common variations in address formats and creates a standardized representation for comparison.

import usaddress

def standardize_address(address):
    if pd.isna(address):
        return None
        
    try:
        # Parse address using usaddress library
        parsed = usaddress.tag(address)[0]
        
        # Create standardized components
        components = {
            'number': parsed.get('AddressNumber', ''),
            'street': parsed.get('StreetName', ''),
            'suffix': parsed.get('StreetNamePostType', ''),
            'unit': parsed.get('OccupancyIdentifier', '')
        }
        
        # Combine into standardized format
        return ' '.join(filter(None, [
            components['number'],
            components['street'],
            components['suffix'],
            components['unit']
        ])).lower()
        
    except Exception:
        return address.lower()

# Example usage
addresses = [
    "719 Greene St.",
    "719 Greene Street",
    "719 GREENE STREET"
]
standardized = [standardize_address(addr) for addr in addresses]
print("Standardized addresses:", standardized)

Slide 10: Efficient Data Structures for Large-Scale Processing

Implementation of specialized data structures for handling large datasets efficiently. Using memory-mapped files and chunked processing allows handling datasets that exceed available RAM.

import numpy as np
import pandas as pd
from pathlib import Path

class LargeScaleDeduplicator:
    def __init__(self, chunk_size=10000):
        self.chunk_size = chunk_size
        self.temp_dir = Path('temp_dedup')
        self.temp_dir.mkdir(exist_ok=True)
        
    def process_large_file(self, filepath):
        # Create memory-mapped array for similarity matrix
        reader = pd.read_csv(filepath, chunksize=self.chunk_size)
        total_chunks = sum(1 for _ in pd.read_csv(filepath, chunksize=self.chunk_size))
        
        similarity_matrix = np.memmap(
            self.temp_dir / 'similarity_matrix.npy',
            dtype='float32',
            mode='w+',
            shape=(total_chunks, total_chunks)
        )
        
        # Process chunks
        for i, chunk1 in enumerate(reader):
            for j, chunk2 in enumerate(reader):
                if j >= i:  # Process upper triangle only
                    sim = self._compute_chunk_similarity(chunk1, chunk2)
                    similarity_matrix[i, j] = sim
                    
        return similarity_matrix
        
    def _compute_chunk_similarity(self, chunk1, chunk2):
        return np.mean([
            compare_records(r1, r2)[0]
            for _, r1 in chunk1.iterrows()
            for _, r2 in chunk2.iterrows()
        ])

# Example usage
deduplicator = LargeScaleDeduplicator()

Slide 11: Machine Learning-Based Duplicate Detection

Implementing a supervised learning approach for duplicate detection using feature engineering and gradient boosting for improved accuracy in complex scenarios.

import lightgbm as lgb
from sklearn.model_selection import train_test_split

class MLDuplicateDetector:
    def __init__(self):
        self.model = lgb.LGBMClassifier(
            n_estimators=100,
            num_leaves=31,
            learning_rate=0.05
        )
        
    def create_features(self, record1, record2):
        # Generate features for record pair
        total_sim, field_sims = compare_records(record1, record2)
        
        features = {
            'total_similarity': total_sim,
            **field_sims,
            'name_length_diff': abs(
                len(str(record1['first_name'])) - 
                len(str(record2['first_name']))
            ),
            'address_length_diff': abs(
                len(str(record1['address'])) - 
                len(str(record2['address']))
            )
        }
        return features
        
    def train(self, training_pairs, labels):
        # Convert pairs to feature matrix
        X = pd.DataFrame([
            self.create_features(r1, r2)
            for r1, r2 in training_pairs
        ])
        
        # Split and train
        X_train, X_val, y_train, y_val = train_test_split(
            X, labels, test_size=0.2
        )
        
        self.model.fit(
            X_train, y_train,
            eval_set=[(X_val, y_val)],
            early_stopping_rounds=10
        )
        
    def predict(self, record1, record2):
        features = self.create_features(record1, record2)
        return self.model.predict_proba(
            pd.DataFrame([features])
        )[0, 1]

Slide 12: Results Analysis and Visualization

Implementation of comprehensive results analysis and visualization tools to evaluate the effectiveness of the deduplication process and identify potential improvements.

import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import precision_recall_curve

def analyze_results(true_duplicates, predicted_duplicates, df):
    results = {
        'precision': [],
        'recall': [],
        'thresholds': np.linspace(0, 1, 100)
    }
    
    for threshold in results['thresholds']:
        # Filter predictions by threshold
        filtered_preds = {
            (i, j) for i, j, score in predicted_duplicates
            if score >= threshold
        }
        
        # Calculate metrics
        true_positives = len(
            filtered_preds.intersection(true_duplicates)
        )
        precision = true_positives / len(filtered_preds)
        recall = true_positives / len(true_duplicates)
        
        results['precision'].append(precision)
        results['recall'].append(recall)
    
    # Plot results
    plt.figure(figsize=(10, 6))
    plt.plot(results['recall'], results['precision'])
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision-Recall Curve for Duplicate Detection')
    plt.grid(True)
    return results

Slide 13: Production-Ready Implementation

Complete implementation of the fuzzy deduplication system with proper error handling, logging, and performance monitoring for production deployment.

import logging
from typing import Dict, List, Tuple
import time
from dataclasses import dataclass

@dataclass
class DeduplicationConfig:
    similarity_threshold: float = 0.85
    chunk_size: int = 10000
    n_workers: int = 4
    bucket_size_limit: int = 1000

class ProductionDuplicateDetector:
    def __init__(self, config: DeduplicationConfig):
        self.config = config
        self.logger = self._setup_logging()
        self.stats = {
            'processed_records': 0,
            'found_duplicates': 0,
            'processing_time': 0
        }
        
    def _setup_logging(self):
        logger = logging.getLogger('deduplication')
        logger.setLevel(logging.INFO)
        handler = logging.FileHandler('dedup.log')
        handler.setFormatter(
            logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        )
        logger.addHandler(handler)
        return logger
        
    def process_dataset(self, df: pd.DataFrame) -> List[Tuple[int, int, float]]:
        start_time = time.time()
        self.logger.info(f"Starting deduplication for {len(df)} records")
        
        try:
            # Create buckets
            buckets = self._create_smart_buckets(df)
            self.logger.info(f"Created {len(buckets)} buckets")
            
            # Process buckets in parallel
            duplicates = parallel_bucket_processing(
                buckets, df, self.config.n_workers
            )
            
            # Update stats
            self.stats['processed_records'] = len(df)
            self.stats['found_duplicates'] = len(duplicates)
            self.stats['processing_time'] = time.time() - start_time
            
            self.logger.info(
                f"Found {len(duplicates)} duplicate pairs in "
                f"{self.stats['processing_time']:.2f} seconds"
            )
            
            return duplicates
            
        except Exception as e:
            self.logger.error(f"Error during deduplication: {str(e)}")
            raise
            
    def _create_smart_buckets(self, df: pd.DataFrame) -> Dict[str, List[int]]:
        buckets = defaultdict(list)
        
        for idx, row in df.iterrows():
            bucket_key = self._generate_bucket_key(row)
            buckets[bucket_key].append(idx)
            
        # Filter oversized buckets
        filtered_buckets = {
            k: v for k, v in buckets.items()
            if len(v) <= self.config.bucket_size_limit
        }
        
        return filtered_buckets
        
    def get_performance_metrics(self) -> Dict[str, float]:
        return {
            'records_per_second': (
                self.stats['processed_records'] / 
                self.stats['processing_time']
            ),
            'duplicate_rate': (
                self.stats['found_duplicates'] / 
                self.stats['processed_records']
            ),
            'total_time': self.stats['processing_time']
        }

# Example usage
config = DeduplicationConfig()
detector = ProductionDuplicateDetector(config)
duplicates = detector.process_dataset(df)
metrics = detector.get_performance_metrics()
print("Performance metrics:", metrics)

Slide 14: Real-World Application Case Study

Implementation of a complete deduplication pipeline for a real customer database with 1.5 million records, showcasing the practical application of the optimized approach.

class CustomerDatabaseDeduplication:
    def __init__(self):
        self.preprocessor = DataPreprocessor()
        self.detector = ProductionDuplicateDetector(DeduplicationConfig())
        self.validator = DuplicateValidator()
        
    def preprocess_customer_data(self, df: pd.DataFrame) -> pd.DataFrame:
        return self.preprocessor.process(df)
        
    def find_duplicates(self, df: pd.DataFrame) -> pd.DataFrame:
        # Track memory usage
        initial_memory = df.memory_usage().sum() / 1024**2
        
        # Preprocess data
        clean_df = self.preprocess_customer_data(df)
        
        # Find potential duplicates
        duplicates = self.detector.process_dataset(clean_df)
        
        # Validate and format results
        validated_duplicates = self.validator.validate(
            duplicates, clean_df
        )
        
        # Create results DataFrame
        results = pd.DataFrame(validated_duplicates)
        
        # Log memory usage
        final_memory = results.memory_usage().sum() / 1024**2
        print(f"Memory usage: {initial_memory:.2f}MB -> {final_memory:.2f}MB")
        
        return results

# Example with timing
import time

start_time = time.time()
deduplicator = CustomerDatabaseDeduplication()
results = deduplicator.find_duplicates(large_customer_df)
processing_time = time.time() - start_time

print(f"Processed {len(large_customer_df)} records in {processing_time:.2f} seconds")
print(f"Found {len(results)} duplicate pairs")

Slide 15: Additional Resources