
Rust Implementation: Financial Data Processing with Parallel Computing

Overview

This Rust implementation demonstrates high-performance financial data processing using parallel computing techniques. It calculates 100+ quantitative features for each row in a large financial dataset (3.9M+ records).

Code Breakdown

1. Imports and Dependencies

use csv::Reader;           // CSV parsing (csv crate)
use rayon::prelude::*;     // Parallel iterators (rayon crate)
use std::time::Instant;    // Wall-clock timing
use std::sync::Arc;        // Atomically reference-counted pointer for sharing data across threads
// num_cpus::get() is called by its full path below, so it needs no `use` line.
// The crossbeam import is dropped: the final version does not use it (it only produced an unused-import warning).

2. Data Structure Definition

#[derive(Debug, Clone)]    // Derive macros for debugging and cloning
struct Candlestick {
    timestamp: String,     // Time of the candlestick bar
    open: f64,            // Opening price
    high: f64,            // Highest price during the period
    low: f64,             // Lowest price during the period
    close: f64,           // Closing price
    volume: f64,          // Trading volume
}
  • #[derive(Debug, Clone)]: derive macros that automatically implement the Debug and Clone traits
  • Debug allows printing the struct with {:?} for debugging
  • Clone allows copying the struct. It is required by data.to_vec() in the feature function. Rayon itself only needs shared, read-only access to the slice.

3. CSV Reading Function

fn read_csv(filename: &str) -> Result<Vec<Candlestick>, Box<dyn std::error::Error>> {
    let mut reader = Reader::from_path(filename)?;  // Create CSV reader
    let mut data = Vec::new();                      // Initialize empty vector
    
    for result in reader.records() {                // Iterate through CSV records
        let record = result?;                       // Handle potential parsing errors
        if record.len() >= 6 {                      // Ensure record has minimum columns
            let candle = Candlestick {              // Create Candlestick struct
                timestamp: record[0].to_string(),   // Parse timestamp
                open: record[1].parse::<f64>()?,    // Parse open price as f64
                high: record[2].parse::<f64>()?,    // Parse high price as f64
                low: record[3].parse::<f64>()?,     // Parse low price as f64
                close: record[4].parse::<f64>()?,   // Parse close price as f64
                volume: record[5].parse::<f64>()?,  // Parse volume as f64
            };
            data.push(candle);                      // Add to data vector
        }
    }
    Ok(data)                                        // Return successful result
}
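  • Uses the csv crate for CSV parsing. Reader::from_path treats the first row as a header by default.
  • The reader is not flexible by default, so a row whose field count differs from the header produces an error rather than being skipped.
  • Returns a Result, so I/O and parse errors are reported to the caller instead of panicking.
  • The ? operator propagates errors automatically, converting them into Box<dyn std::error::Error>.
  • Vec::new() starts empty and push() grows the buffer with amortized reallocation. The row count is not known up front, so nothing is pre-allocated here.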
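The sketch below shows the parse-and-propagate pattern without the csv crate, using only the standard library. It assumes a plain comma-separated line with no header, quoting or embedded commas; the csv crate handles those cases. `parse_candle` is a hypothetical helper and is not part of the original code.

```rust
use std::error::Error;

#[derive(Debug, Clone, PartialEq)]
struct Candlestick {
    timestamp: String,
    open: f64,
    high: f64,
    low: f64,
    close: f64,
    volume: f64,
}

/// Hypothetical helper: parse one "timestamp,open,high,low,close,volume" line.
/// Assumes no quoting or embedded commas (the csv crate handles those).
fn parse_candle(line: &str) -> Result<Candlestick, Box<dyn Error>> {
    let fields: Vec<&str> = line.trim().split(',').collect();
    if fields.len() < 6 {
        // String converts into Box<dyn Error> via From
        return Err(format!("expected 6 fields, found {}", fields.len()).into());
    }
    // Each `?` converts a ParseFloatError into Box<dyn Error> and returns early
    Ok(Candlestick {
        timestamp: fields[0].to_string(),
        open: fields[1].trim().parse::<f64>()?,
        high: fields[2].trim().parse::<f64>()?,
        low: fields[3].trim().parse::<f64>()?,
        close: fields[4].trim().parse::<f64>()?,
        volume: fields[5].trim().parse::<f64>()?,
    })
}
```

A line such as `"2024-01-02 00:00,141.0,141.5,140.8,141.2,1200"` parses into a Candlestick. A non-numeric price or a short row returns an `Err` instead of panicking.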

4. Parallel Feature Calculation Function

fn calculate_quantitative_features_parallel(data: &[Candlestick]) -> Vec<Vec<f64>> {
    let data = Arc::new(data.to_vec());             // Copy into an Arc for sharing across threads
    // At least 1: chunks(0) panics when there are fewer rows than CPU cores
    let chunk_size = (data.len() / num_cpus::get()).max(1);
    let chunks: Vec<_> = data.chunks(chunk_size).collect();  // Split data into chunks

    // Process each chunk in parallel; enumerate() keeps the chunk index so that
    // each row can recover its position in the full dataset
    let results: Vec<Vec<Vec<f64>>> = chunks.into_par_iter().enumerate().map(|(chunk_idx, chunk)| {
        let mut features_chunk = Vec::with_capacity(chunk.len());  // Pre-allocate memory

        for (local_idx, candle) in chunk.iter().enumerate() {
            // Row index in the full dataset, used by the rolling-window features below
            let actual_global_idx = chunk_idx * chunk_size + local_idx;
            let mut features = vec![0.0; 101];  // Initialize 101 features for this row

            // Basic price features (0-4)
            features[0] = candle.close;           // Close price
            features[1] = candle.open;            // Open price
            features[2] = candle.high;            // High price
            features[3] = candle.low;             // Low price
            features[4] = candle.volume;          // Volume
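  • Arc::new(data.to_vec()): copies the input slice into a reference-counted Vec that all threads can share. Copying 3.9M+ rows is not strictly needed, because rayon's closures can borrow data: &[Candlestick] directly. A shared slice of Sync data is safe to read from many threads.
  • num_cpus::get(): returns the number of available logical CPUs, used here to size the chunks
  • chunks(): divides the data into contiguous slices (the last one may be shorter)
  • into_par_iter(): converts the Vec of chunks into a rayon parallel iterator
  • Vec::with_capacity(): pre-allocates each chunk's output to avoid repeated reallocation
  • Each task only reads the shared input and writes to its own output vector, so no locking is needed and data races cannot occur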
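The chunk-and-reassemble pattern can also be sketched with only the standard library. This version swaps rayon for `std::thread::scope` (stable since Rust 1.63) so it runs without crates. `features_for_row` is a hypothetical stand-in for the 101-feature computation and returns just two values.

The sketch highlights two details the pattern depends on:

  • a chunk size of at least 1
  • recovering each row's global index from its chunk index and local position

```rust
use std::thread;

/// Hypothetical per-row feature function. It may read earlier rows from the
/// shared `closes` slice because the whole slice is read-only.
fn features_for_row(closes: &[f64], idx: usize) -> Vec<f64> {
    let prev = if idx > 0 { closes[idx - 1] } else { closes[idx] };
    vec![closes[idx], closes[idx] - prev]
}

/// Split rows into roughly one chunk per worker, process the chunks on scoped
/// threads, and concatenate the results in the original row order.
fn features_parallel(closes: &[f64], workers: usize) -> Vec<Vec<f64>> {
    // max(1) guards against chunks(0), which panics when rows < workers
    let chunk_size = (closes.len() / workers.max(1)).max(1);
    thread::scope(|s| {
        let handles: Vec<_> = closes
            .chunks(chunk_size)
            .enumerate()
            .map(|(chunk_idx, chunk)| {
                s.spawn(move || {
                    (0..chunk.len())
                        // Global index = chunk start + position inside the chunk
                        .map(|local_idx| features_for_row(closes, chunk_idx * chunk_size + local_idx))
                        .collect::<Vec<Vec<f64>>>()
                })
            })
            .collect();
        // Joining in spawn order preserves the original row order
        handles
            .into_iter()
            .flat_map(|h| h.join().expect("worker thread panicked"))
            .collect()
    })
}
```

The output is identical to a serial loop over `0..closes.len()`. That equivalence is what makes the row-wise work safe to split.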

5. Mathematical Feature Calculations

            // Return calculations (5-7)
            features[5] = if candle.open != 0.0 { (candle.close - candle.open) / candle.open } else { 0.0 }; // Return
            features[6] = if candle.open != 0.0 { (candle.high - candle.low) / candle.open } else { 0.0 };   // True range
            features[7] = if candle.high != candle.low {
                (candle.close - candle.low) / (candle.high - candle.low)
            } else { 0.5 }; // Stochastic
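  • Conditional checks avoid dividing by zero. With f64 this would produce infinity or NaN rather than a crash.
  • These are common single-bar indicators:
    • Return: fractional change from open to close (multiply by 100 for a percentage)
    • Range: the bar's high-low range as a fraction of the open, a simple volatility measure. The code labels it "true range", but the textbook True Range also takes the previous bar's close into account.
    • Stochastic: position of the close within the bar's high-low range (0 = at the low, 1 = at the high), defaulting to 0.5 when the bar has no range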
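The three single-bar formulas can be isolated into a small pure function, which makes them easy to check by hand. This is a sketch that follows the expressions in the code above. The name `bar_features` is hypothetical.

```rust
/// Single-bar features following the formulas above: fractional return,
/// high-low range relative to the open, and the close's position in the range.
fn bar_features(open: f64, high: f64, low: f64, close: f64) -> [f64; 3] {
    let ret = if open != 0.0 { (close - open) / open } else { 0.0 };
    let range = if open != 0.0 { (high - low) / open } else { 0.0 };
    // Neutral 0.5 when the bar has no range (high == low)
    let stochastic = if high != low { (close - low) / (high - low) } else { 0.5 };
    [ret, range, stochastic]
}
```

Worked example: a bar with open 100, high 110, low 90 and close 105 has a return of 5/100 = 0.05 and a range of 20/100 = 0.2. Its stochastic value is 15/20 = 0.75, meaning the close sits three quarters of the way up the bar.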

6. Moving Average Calculations

            // Simple moving averages and volatilities for different periods (8-22)
            if actual_global_idx >= 4 {
                // A 5-period window covers rows idx-4 ..= idx (five closes, including the current one)
                let start_idx = actual_global_idx - 4;
                let recent_prices: Vec<f64> = data[start_idx..=actual_global_idx].iter().map(|c| c.close).collect();
                features[8] = recent_prices.iter().sum::<f64>() / recent_prices.len() as f64; // 5-period SMA
                let mean = features[8];
                let variance = recent_prices.iter().map(|p| (p - mean).powi(2)).sum::<f64>() / recent_prices.len() as f64;
                features[9] = variance.sqrt(); // 5-period volatility (population standard deviation)
                // ... the remaining periods (features 10-22) follow the same pattern
            }
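  • Each row copies its window into a new Vec and sums it from scratch, so the cost grows with the window length. A true sliding window would instead update a running sum as it moves forward.
  • Volatility is the population standard deviation of the window's closes (variance divided by the number of prices).
  • The index guard keeps start_idx from going below zero, which would overflow a usize.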
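One way to avoid the per-row `recent_prices` allocation is to borrow each window directly from a slice of closes. The sketch below assumes the closes have already been extracted into a `Vec<f64>`, as `main` does. `rolling_mean_std` is a hypothetical name. The sketch still recomputes every window, so its cost remains O(n × period).

```rust
/// Rolling mean and population standard deviation over `period` closes.
/// slice::windows borrows each window, so no per-window Vec is allocated.
/// Output i corresponds to the window ending at row i + period - 1.
fn rolling_mean_std(closes: &[f64], period: usize) -> Vec<(f64, f64)> {
    // windows(0) panics, so reject a zero period explicitly
    assert!(period > 0, "period must be positive");
    closes
        .windows(period)
        .map(|w| {
            let n = w.len() as f64;
            let mean = w.iter().sum::<f64>() / n;
            let var = w.iter().map(|p| (p - mean).powi(2)).sum::<f64>() / n;
            (mean, var.sqrt())
        })
        .collect()
}
```

For closes 1 through 6 with a period of 5, this yields two windows with means 3 and 4. Both have a population standard deviation of √2, because the squared deviations sum to 10 over 5 prices.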

7. Parallel Moving Average Calculation

fn calculate_multiple_moving_averages_parallel(data: &[f64], periods: &[usize]) -> std::collections::HashMap<String, Vec<f64>> {
    periods
        .par_iter()  // Parallel iteration over periods
        .map(|&period| {  // Calculate each period in parallel
            let ma_values = calculate_moving_average_serial(data, period);
            (format!("MA_{}", period), ma_values)  // Format key and return values
        })
        .collect()  // Collect into HashMap
}
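  • par_iter(): parallel iteration over the periods slice
  • Each period is computed independently on its own rayon task by calculate_moving_average_serial (not shown in this excerpt)
  • format!: builds a unique key such as "MA_200" for each period
  • collect(): gathers the (key, values) pairs into a HashMap, which rayon supports as a parallel collection target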
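`calculate_moving_average_serial` is not shown in the original. The following is a hypothetical sketch of what it might look like. It uses a running sum, a true sliding window that costs O(n) per period.

Two choices in the sketch are assumptions rather than anything the original specifies:

  • positions before the first full window are filled with `f64::NAN`
  • the fan-out over periods uses `std::thread::scope` instead of rayon, so the example needs no crates

```rust
use std::collections::HashMap;
use std::thread;

/// Hypothetical serial SMA: keeps a running sum, adding the newest close and
/// subtracting the one that leaves the window. Positions before the first
/// full window are NaN (an assumption; the original helper is not shown).
fn calculate_moving_average_serial(data: &[f64], period: usize) -> Vec<f64> {
    let mut out = vec![f64::NAN; data.len()];
    if period == 0 || period > data.len() {
        return out;
    }
    let mut sum: f64 = data[..period].iter().sum();
    out[period - 1] = sum / period as f64;
    for i in period..data.len() {
        sum += data[i] - data[i - period];
        out[i] = sum / period as f64;
    }
    out
}

/// One scoped thread per period, with results keyed as "MA_<period>".
fn moving_averages_by_period(data: &[f64], periods: &[usize]) -> HashMap<String, Vec<f64>> {
    thread::scope(|s| {
        let handles: Vec<_> = periods
            .iter()
            .map(|&period| {
                s.spawn(move || (format!("MA_{}", period), calculate_moving_average_serial(data, period)))
            })
            .collect();
        handles.into_iter().map(|h| h.join().expect("worker thread panicked")).collect()
    })
}
```

A running sum accumulates a little floating-point error over millions of rows. If that matters, the sum can be recomputed from the window periodically.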

8. Main Function Structure

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let start_time = Instant::now();  // Start timing
    
    println!("Reading CSV file...");
    let data = read_csv("USDJPY2.csv")?;  // Load data with error handling
    println!("Loaded {} records.", data.len());
    
    // Calculate features in parallel
    println!("Calculating 100+ quantitative features for each row using parallel processing...");
    let features = calculate_quantitative_features_parallel(&data);
    
    // Extract close prices for moving averages
    let closes: Vec<f64> = data.iter().map(|c| c.close).collect();
    
    // Calculate multiple moving averages in parallel
    println!("Calculating moving averages for periods 200-220 using parallel processing...");
    let ma_periods: Vec<usize> = (200..=220).collect(); // 200 to 220 inclusive
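    let all_mas = calculate_multiple_moving_averages_parallel(&closes, &ma_periods);

    // Report result sizes so `features` and `all_mas` are used (avoids unused-variable warnings)
    println!("Computed {} feature rows and {} moving-average series.", features.len(), all_mas.len());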
    
    let duration = start_time.elapsed();  // End timing
    println!("Total execution time: {:?}", duration);
    
    Ok(())  // Return success
}
  • Returns Result, so errors from read_csv propagate out of main via ?
  • Measures total wall-clock time with Instant
  • Uses parallel processing for both the feature calculation and the moving averages
  • Extracts the close prices once into a contiguous Vec<f64>, so the moving-average pass reads a compact array
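The single total in `main` does not show which phase dominates. The hypothetical helper `timed` below measures each phase separately with `Instant` and returns the phase's result together with its duration. It is a sketch, not part of the original code.

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper: run one phase, print its wall-clock time, and return
/// both the phase's result and the measured duration.
fn timed<T>(label: &str, f: impl FnOnce() -> T) -> (T, Duration) {
    let start = Instant::now();
    let value = f();
    let elapsed = start.elapsed();
    println!("{label}: {elapsed:?}");
    (value, elapsed)
}
```

In `main`, one might write `let (data, _) = timed("read_csv", || read_csv("USDJPY2.csv"));` followed by `let data = data?;`. The other two phases can be wrapped the same way.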

Key Rust Concepts Demonstrated

1. Memory Safety Without Garbage Collection

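  • The ownership system prevents use-after-free, double frees and data races. Leaking memory is still possible (for example with reference-count cycles), but it is not considered unsafe.
  • Borrowing rules ensure safe access to data across threads
  • Safe Rust has no null references (Option is used instead), and out-of-bounds indexing panics instead of overflowing a buffer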

2. Zero-Cost Abstractions

  • High-level constructs compile to efficient machine code
  • Iterator chains are optimized by the compiler
  • Generic functions are monomorphized for performance

3. Fearless Concurrency

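  • Thread safety is enforced by the type system through the Send and Sync traits
  • Data races are prevented at compile time. Higher-level race conditions, such as ordering bugs, are still possible.
  • Easy parallelization with the rayon crate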

4. Performance Optimizations

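  • Pre-allocation of output vectors with Vec::with_capacity()
  • Iterator-based loops that the compiler can optimize well
  • Few allocations in the hot path, although the per-row vec![0.0; 101] and per-window recent_prices vectors still allocate
  • Contiguous chunks of data, which keep memory access cache-friendly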

Why This Approach Was Taken

1. Parallel Processing Strategy

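  • Feature computation is "embarrassingly parallel". Each row's features depend only on read-only input (the current candle plus preceding closes for rolling windows), so rows can be computed independently.
  • rayon's parallel iterators spread work across CPU cores using work stealing
  • Splitting the data into one chunk per core keeps per-task overhead low. rayon would also split a plain par_iter() over rows adaptively.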

2. Memory Layout Considerations

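  • Processing contiguous chunks improves cache locality
  • Pre-allocating vectors avoids repeated reallocation as they grow
  • f64 provides roughly 15-16 significant decimal digits, which is ample for price statistics. Binary floating point cannot, however, represent most decimal prices exactly.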
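Because f64 only approximates decimal prices, comparisons with `==` can surprise. For example, `0.1 + 0.2` evaluates to `0.30000000000000004`, not `0.3`. A minimal illustration follows. `approx_eq` is a hypothetical helper, and the relative tolerance is an arbitrary choice for the example.

```rust
/// Compare two f64 values with a relative tolerance instead of `==`.
fn approx_eq(a: f64, b: f64, rel_tol: f64) -> bool {
    (a - b).abs() <= rel_tol * a.abs().max(b.abs())
}
```

This rounding is harmless for averages and volatilities. Exact accounting of money is usually done with integer units or a decimal type instead.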

3. Error Handling

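  • The Result type makes failure explicit in every function signature
  • The ? operator simplifies error propagation
  • Malformed data produces an Err instead of a crash. Note that a single unparseable row stops the whole load, because read_csv returns on the first error.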
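If losing a whole load to one bad row is undesirable, a lenient variant can skip and count malformed rows. The hypothetical `parse_lenient` below sketches this with the standard library only. It assumes comma-separated lines without a header or quoting, and keeps only the five numeric columns.

```rust
/// Hypothetical lenient loader: parses "timestamp,open,high,low,close,volume"
/// lines, keeps rows that parse, and counts the ones that do not, instead of
/// aborting on the first error like read_csv does.
fn parse_lenient<'a>(lines: impl Iterator<Item = &'a str>) -> (Vec<[f64; 5]>, usize) {
    let mut rows = Vec::new();
    let mut skipped = 0;
    for line in lines {
        let fields: Vec<&str> = line.split(',').collect();
        // Parse the five numeric columns; collect() stops at the first failure
        let parsed: Result<Vec<f64>, _> =
            fields.iter().skip(1).take(5).map(|f| f.trim().parse::<f64>()).collect();
        match parsed {
            Ok(v) if v.len() == 5 => rows.push([v[0], v[1], v[2], v[3], v[4]]),
            _ => skipped += 1, // Unparseable value or too few columns
        }
    }
    (rows, skipped)
}
```

Reporting the skipped count keeps the data loss visible instead of silent.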

4. Performance vs. Safety Balance

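  • Rust provides memory safety and high performance at the same time
  • Zero-cost abstractions such as iterators and generics compile to code comparable to hand-written loops
  • Compile-time guarantees remove the need for many runtime checks. Slice indexing is still bounds-checked unless the compiler can prove the index is in range.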

This implementation demonstrates Rust's strengths in systems programming: memory safety, performance, and easy parallelization. Together they make it well suited to batch feature computation over large financial datasets.