Skip to content

Latest commit

 

History

History
329 lines (260 loc) · 11.8 KB

File metadata and controls

329 lines (260 loc) · 11.8 KB

Handling Compressed Data in Python

Slide 1: Understanding Compressed Data Formats in Python

The fundamental challenge with compressed data formats lies in their sequential access nature. Standard compression algorithms like gzip, bzip2, and xz create streams that must be read sequentially, making random access operations computationally expensive and time-consuming for large datasets.

import gzip
import bz2
import lzma
import time

def compare_compression_formats(data, iterations=1000):
    # Compress same data with different formats
    gzip_data = gzip.compress(data)
    bz2_data = bz2.compress(data)
    xz_data = lzma.compress(data)
    
    print(f"Original size: {len(data)} bytes")
    print(f"Gzip size: {len(gzip_data)} bytes")
    print(f"BZ2 size: {len(bz2_data)} bytes")
    print(f"XZ size: {len(xz_data)} bytes")

Slide 2: Basic Random Access Limitations

Standard library compression modules force sequential reading through the compressed stream until reaching the desired position. This limitation becomes particularly problematic when implementing machine learning algorithms that require frequent random access to training data.

def measure_standard_seek_time(compressed_file, positions):
    times = []
    with bz2.BZ2File(compressed_file, 'rb') as f:
        for pos in positions:
            start_time = time.time()
            f.seek(pos)
            data = f.read(100)  # Read small chunk
            times.append(time.time() - start_time)
    return sum(times) / len(times)

Slide 3: Introduction to indexed_bzip2

The indexed_bzip2 library revolutionizes random access in bzip2 compressed files by maintaining an index of block positions. This enables direct seeking to specific positions without sequential decompression, significantly improving performance for random access operations.

import indexed_bzip2
import numpy as np

def create_indexed_bz2_reader(filename):
    return indexed_bzip2.open(filename)

Slide 4: Building and Saving Index

Creating an index is a one-time operation that maps compressed block positions to uncompressed positions. This index can be serialized and saved for future use, eliminating the need to rebuild it on subsequent accesses.

def build_and_save_index(bz2_file):
    reader = indexed_bzip2.open(bz2_file)
    # Force index creation by seeking to end
    reader.seek(0, 2)
    
    # Save index for future use
    with open(f"{bz2_file}.idx", "wb") as f:
        reader.save_index(f)
    return reader

Slide 5: Implementing Efficient Random Access

Efficient random access implementation using indexed_bzip2 enables fast seeking within compressed files. This implementation demonstrates significant performance improvements compared to standard library approaches.

def random_access_benchmark(filename, n_seeks=1000):
    # Initialize readers
    standard_reader = bz2.BZ2File(filename, 'rb')
    indexed_reader = indexed_bzip2.open(filename)
    
    # Generate random positions
    file_size = indexed_reader.seek(0, 2)
    positions = np.random.randint(0, file_size, n_seeks)
    
    results = {'standard': [], 'indexed': []}
    
    # Benchmark standard access
    for pos in positions:
        start = time.time()
        standard_reader.seek(pos)
        standard_reader.read(100)
        results['standard'].append(time.time() - start)
    
    # Benchmark indexed access
    for pos in positions:
        start = time.time()
        indexed_reader.seek(pos)
        indexed_reader.read(100)
        results['indexed'].append(time.time() - start)
    
    return results

Slide 6: Working with XZ Compression

The python-xz library provides similar random access capabilities for XZ compressed files. While less popular than bzip2, XZ compression often achieves better compression ratios for certain types of data.

import xz

def xz_random_access_example(filename):
    with xz.open(filename) as f:
        # Get file size
        f.seek(0, 2)
        file_size = f.tell()
        
        # Perform random seeks
        positions = np.random.randint(0, file_size, 10)
        for pos in positions:
            f.seek(pos)
            data = f.read(100)
            print(f"Successfully read at position {pos}")

Slide 7: Data Streaming with Compressed Files

Implementation of a data streaming system that efficiently handles compressed files, particularly useful for machine learning applications where data needs to be fed in batches during training while maintaining memory efficiency.

class CompressedDataStream:
    def __init__(self, filename, batch_size=1024, compression='bz2'):
        self.filename = filename
        self.batch_size = batch_size
        self.compression = compression
        self.reader = self._get_reader()
        self.file_size = self._get_file_size()
        
    def _get_reader(self):
        if self.compression == 'bz2':
            return indexed_bzip2.open(self.filename)
        elif self.compression == 'xz':
            return xz.open(self.filename)
            
    def _get_file_size(self):
        self.reader.seek(0, 2)
        size = self.reader.tell()
        self.reader.seek(0)
        return size
        
    def get_batch(self):
        data = self.reader.read(self.batch_size)
        if not data:
            self.reader.seek(0)
            data = self.reader.read(self.batch_size)
        return data

Slide 8: Performance Optimization Techniques

Advanced techniques for optimizing compressed data access, including implementing a caching mechanism to store frequently accessed data segments and parallel processing capabilities for improved throughput.

from functools import lru_cache
import multiprocessing as mp

class OptimizedCompressedReader:
    def __init__(self, filename, cache_size=128):
        self.reader = indexed_bzip2.open(filename)
        self.cache_size = cache_size
        self.pool = mp.Pool(processes=mp.cpu_count())
        
    @lru_cache(maxsize=128)
    def cached_read(self, position, size):
        self.reader.seek(position)
        return self.reader.read(size)
        
    def parallel_read(self, positions, chunk_size):
        read_args = [(pos, chunk_size) for pos in positions]
        return self.pool.starmap(self.cached_read, read_args)

Slide 9: Real-world Example: Machine Learning Dataset Handler

Implementation of a practical dataset handler for machine learning applications, demonstrating how to efficiently manage large compressed datasets during training while maintaining random access capabilities.

import numpy as np
from sklearn.preprocessing import StandardScaler

class MLCompressedDataHandler:
    def __init__(self, compressed_file, batch_size=32, feature_dim=10):
        self.reader = indexed_bzip2.open(compressed_file)
        self.batch_size = batch_size
        self.feature_dim = feature_dim
        self.scaler = StandardScaler()
        self._build_index()
        
    def _build_index(self):
        self.data_points = []
        while True:
            pos = self.reader.tell()
            data = self.reader.readline()
            if not data:
                break
            self.data_points.append(pos)
            
    def get_random_batch(self):
        indices = np.random.choice(len(self.data_points), 
                                 size=self.batch_size)
        batch_data = []
        for idx in indices:
            self.reader.seek(self.data_points[idx])
            data = self.reader.readline()
            batch_data.append(np.fromstring(data, sep=','))
        return np.array(batch_data)

Slide 10: Results Analysis for Random Access Performance

A comprehensive comparison of random access performance between standard compression libraries and indexed alternatives, featuring benchmark results and statistical analysis.

def performance_analysis():
    # Test data preparation
    data_size = 1_000_000
    test_data = b"x" * data_size
    compressed_file = "test.bz2"
    with bz2.open(compressed_file, "wb") as f:
        f.write(test_data)
    
    # Benchmark results
    standard_times = []
    indexed_times = []
    
    # Standard bz2
    with bz2.BZ2File(compressed_file, 'rb') as f:
        for _ in range(10):
            start = time.time()
            f.seek(data_size // 2)
            standard_times.append(time.time() - start)
    
    # indexed_bzip2
    with indexed_bzip2.open(compressed_file) as f:
        for _ in range(10):
            start = time.time()
            f.seek(data_size // 2)
            indexed_times.append(time.time() - start)
    
    return {
        "standard_mean": np.mean(standard_times),
        "indexed_mean": np.mean(indexed_times),
        "speedup_factor": np.mean(standard_times) / np.mean(indexed_times)
    }

Slide 11: Memory Management for Large Compressed Files

Advanced memory management techniques when working with large compressed files, implementing a sliding window approach to maintain optimal memory usage while processing large datasets sequentially or randomly.

class MemoryEfficientReader:
    def __init__(self, filename, window_size_mb=100):
        self.filename = filename
        self.window_size = window_size_mb * 1024 * 1024
        self.reader = indexed_bzip2.open(filename)
        self.window_cache = {}
        
    def read_window(self, start_pos):
        window_key = start_pos // self.window_size
        if window_key not in self.window_cache:
            # Clear cache if too large
            if len(self.window_cache) > 3:
                self.window_cache.clear()
            
            self.reader.seek(start_pos)
            self.window_cache[window_key] = self.reader.read(self.window_size)
            
        window_offset = start_pos % self.window_size
        return self.window_cache[window_key][window_offset:]

Slide 12: Real-world Example: Time Series Data Processing

Implementation of a time series data processor that efficiently handles compressed data files while maintaining temporal relationships and enabling random access to specific time periods.

class TimeSeriesCompressedProcessor:
    def __init__(self, compressed_file):
        self.reader = indexed_bzip2.open(compressed_file)
        self.timestamp_index = {}
        self._build_timestamp_index()
        
    def _build_timestamp_index(self):
        current_pos = 0
        while True:
            pos = self.reader.tell()
            line = self.reader.readline()
            if not line:
                break
            timestamp = float(line.split(b',')[0])
            self.timestamp_index[timestamp] = pos
            
    def get_time_range(self, start_time, end_time):
        # Binary search for closest timestamps
        timestamps = sorted(self.timestamp_index.keys())
        start_idx = np.searchsorted(timestamps, start_time)
        end_idx = np.searchsorted(timestamps, end_time)
        
        data = []
        for timestamp in timestamps[start_idx:end_idx]:
            self.reader.seek(self.timestamp_index[timestamp])
            data.append(self.reader.readline())
            
        return data

Slide 13: Additional Resources