Skip to content

xlisp/visualization-machine-learning

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

350 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Visualization Python Machine Learning, Deep Learning, Reinforcement Learning

First principle

  • Assume that a large amount of data is drawn from one underlying probability distribution, learn that distribution, and make predictions based on it: y = f(x)
  • Deep learning is like learning a functional relationship in reverse: you only know that the data follows some pattern, and you try to infer the original function that generated it (reverse engineering the function). For example, you can train a neural network to behave like a calculator.
  • The idea of high-dimensional space: code is mapped into a high-dimensional space, where a very fine-grained classification separates it. Search is likewise performed in that high-dimensional space. For example, code can be parsed with tree-sitter and the result used for training, so that the model learns its logical relationships. Most of NLP is a multi-class classification problem in high-dimensional space.
  • Collect the inputs x and outputs y around you as training data, and mine their mapping f(x) whenever you can. You can use GPT to generate data for your model-training needs, or write a crawler to collect the data you need.

init env

conda create -n emacspy python=3.11
conda activate emacspy
poetry install

least squares method

import numpy as np
import matplotlib.pyplot as plt

# Example data points
X = np.array([1, 2.2, 3, 4, 5])
y = np.array([2, 4, 6.3, 8, 11])

# Design matrix: prepend a column of ones so the first coefficient is the
# intercept (bias) and the second is the slope.
X_b = np.c_[np.ones((X.shape[0], 1)), X]

# Solve the least-squares problem min ||X_b @ theta - y||^2.
# np.linalg.lstsq gives the same answer as the Normal Equation
# inv(X_b.T @ X_b) @ X_b.T @ y for well-conditioned data, but it never forms
# an explicit inverse, so it stays accurate for ill-conditioned inputs and
# still returns a (minimum-norm) solution when X_b.T @ X_b is singular.
theta_best, *_ = np.linalg.lstsq(X_b, y, rcond=None)

# Print the parameters (intercept and slope)
print(f"Intercept: {theta_best[0]}")
print(f"Slope: {theta_best[1]}")

# Predict values using the fitted line
y_pred = X_b.dot(theta_best)

# Plot the data points and the best fit line
plt.scatter(X, y, color='blue', label='Data points')
plt.plot(X, y_pred, color='red', label='Best fit line')
plt.xlabel('X')
plt.ylabel('y')
plt.legend()
plt.show()

least squares method by neural network

import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt

# Fit a line with torch.optim.Adam and record the loss at every step so that the optimizer's progress can be plotted

# Define a simple linear model
class LinearModel(nn.Module):
    """Single-feature linear regressor built from one ``nn.Linear(1, 1)`` layer."""

    def __init__(self):
        super().__init__()
        # One weight and one bias: y = w * x + b.
        self.linear = nn.Linear(1, 1)

    def forward(self, x):
        """Apply the affine layer to ``x`` (last dimension must be 1)."""
        prediction = self.linear(x)
        return prediction

# Initialize the model, loss function, and optimizer
model = LinearModel()
criterion = nn.MSELoss()  # mean squared error between predictions and targets
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Generate some synthetic data (y = 2x + 1 with some noise)
# x_train is a (100, 1) column of evenly spaced points in [-1, 1];
# the noise is Gaussian, scaled by 0.2.
x_train = torch.linspace(-1, 1, 100).reshape(-1, 1)
y_train = 2 * x_train + 1 + 0.2 * torch.randn(x_train.size())

# List to store the loss values
# (one scalar per epoch; nothing in this snippet plots it yet)
loss_values = []

# Training loop: full-batch gradient steps on the whole data set
for epoch in range(1000):
    model.train()
    optimizer.zero_grad()  # clear gradients accumulated by the previous step
    outputs = model(x_train)
    loss = criterion(outputs, y_train)
    loss.backward()
    optimizer.step()
    loss_values.append(loss.item())

nonlinear fitting

import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt

# Step 1: Build n evenly spaced inputs in [1, 10] and noisy sine targets.
# x has shape (n, 1). The noise comes from torch.rand (uniform on [0, 1)),
# so after scaling it lies in [0, 0.5) and always shifts the targets upward.
n = 100
x = torch.linspace(1, 10, n).unsqueeze(1)
y = torch.sin(x) + torch.rand(n, 1) * 0.5

# Step 2: Define a simple neural network model for nonlinear fitting
class NonlinearModel(nn.Module):
    """Small MLP (1 -> 10 -> 10 -> 1) with ReLU activations for curve fitting."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(1, 10)
        self.fc2 = nn.Linear(10, 10)
        self.fc3 = nn.Linear(10, 1)

    def forward(self, x):
        """Run ``x`` through both hidden layers, then the linear output layer."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = torch.relu(layer(hidden))
        # No activation on the output: this is a regression head.
        return self.fc3(hidden)

model = NonlinearModel()

# Step 3: Define loss function and optimizer
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

# Step 4: Train the model (full-batch: every step uses all n points)
epochs = 1000
for epoch in range(epochs):
    model.train()

    # Forward pass
    outputs = model(x)
    loss = criterion(outputs, y)

    # Backward pass and optimization
    optimizer.zero_grad()  # reset gradients before backpropagating this step
    loss.backward()
    optimizer.step()

    # Report progress every 100 epochs
    if (epoch+1) % 100 == 0:
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')

# Step 5: Plot the original data and the fitted curve
model.eval()
with torch.no_grad():  # inference only, no autograd graph needed
    predicted = model(x).numpy()

plt.figure(figsize=(10, 5))
plt.plot(x.numpy(), y.numpy(), 'ro', label='Original data')
plt.plot(x.numpy(), predicted, 'b-', label='Fitted curve')
plt.legend()
plt.show()

polar coordinate classification

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

# Helper function to convert Cartesian to Polar coordinates
def cartesian_to_polar(x, y, z):
    """Convert Cartesian coordinates to spherical coordinates.

    Args:
        x, y, z: tensors of matching (broadcastable) shape.

    Returns:
        Tuple ``(r, theta, phi)`` where ``r`` is the Euclidean norm,
        ``theta = atan2(y, x)`` is the azimuth and ``phi = acos(z / r)`` is
        the polar angle measured from the +z axis. At the origin ``phi`` is
        defined as ``pi / 2`` instead of NaN.
    """
    r = torch.sqrt(x**2 + y**2 + z**2)
    theta = torch.atan2(y, x)
    # Guard the division: at the origin r == 0 and z / r would be 0 / 0 = NaN.
    safe_r = torch.clamp(r, min=torch.finfo(r.dtype).tiny)
    # Rounding can push |z / r| marginally above 1, where acos returns NaN.
    cos_phi = torch.clamp(z / safe_r, -1.0, 1.0)
    phi = torch.acos(cos_phi)
    return r, theta, phi

# Example data generation (replace with your actual data)
# NOTE: the labels are drawn independently of the coordinates, so this
# placeholder data contains no learnable relationship.
n_samples = 5000
x = torch.randn(n_samples)
y = torch.randn(n_samples)
z = torch.randn(n_samples)
labels = torch.randint(0, 4, (n_samples,))  # Four classes (0, 1, 2, 3)

# Convert to polar coordinates
r, theta, phi = cartesian_to_polar(x, y, z)

# Combine into a single tensor of shape (n_samples, 3): columns r, theta, phi
data = torch.stack((r, theta, phi), dim=1)

# Create a Dataset and DataLoader (mini-batches of 32, reshuffled each epoch)
dataset = TensorDataset(data, labels)
train_loader = DataLoader(dataset, batch_size=32, shuffle=True)

# Define a simple feedforward neural network
class PolarNet(nn.Module):
    """Feed-forward classifier: 3 inputs -> 64 -> 128 -> 4 class logits."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(3, 64)
        self.fc2 = nn.Linear(64, 128)
        self.fc3 = nn.Linear(128, 4)  # one logit per class

    def forward(self, x):
        """Return raw (un-normalised) class logits for a batch of 3-feature rows."""
        features = torch.relu(self.fc1(x))
        features = torch.relu(self.fc2(features))
        logits = self.fc3(features)
        return logits

# Initialize the model, loss function, and optimizer
model = PolarNet()
criterion = nn.CrossEntropyLoss()  # expects raw logits and integer class labels
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
for epoch in range(20):  # Number of epochs
    for inputs, targets in train_loader:
        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # `loss` here is the loss of the last mini-batch of the epoch, not an epoch average
    print(f'Epoch {epoch+1}/20, Loss: {loss.item()}')

# After training, evaluate the model on the entire dataset for visualization
with torch.no_grad():
    predicted_labels = model(data).argmax(dim=1)  # index of the largest logit per row

# Plotting the results in 3D
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')

# Convert polar back to Cartesian for plotting
x_cartesian = r * torch.sin(phi) * torch.cos(theta)
y_cartesian = r * torch.sin(phi) * torch.sin(theta)
z_cartesian = r * torch.cos(phi)

# Plot the 3D scatter plot, coloured by the predicted class
scatter = ax.scatter(x_cartesian, y_cartesian, z_cartesian, c=predicted_labels, cmap='viridis', marker='o')

# Add color bar and labels
plt.colorbar(scatter, ax=ax)
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
plt.title('3D Visualization of PolarNet Classifications')
plt.show()

mnist ocr

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# Hyperparameters
batch_size = 64
learning_rate = 0.01
epochs = 100
# Convert images to tensors, then normalize the single grayscale channel
# with mean 0.1307 and std 0.3081.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])
# Download (if needed) the MNIST train and test splits into ./data
train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST(root='./data', train=False, download=True, transform=transform)
# Shuffle only the training data; evaluation order does not matter
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

class Net(nn.Module):
    """Fully connected digit classifier: 784 -> 128 -> 64 -> 10 logits."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(28 * 28, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 10)

    def forward(self, x):
        """Flatten each 28x28 image and return the 10 raw class logits."""
        flat = x.view(-1, 28 * 28)
        hidden = flat
        for layer in (self.fc1, self.fc2):
            hidden = F.relu(layer(hidden))
        return self.fc3(hidden)

model = Net()
criterion = nn.CrossEntropyLoss()  # default reduction: mean over the batch
optimizer = optim.SGD(model.parameters(), lr=learning_rate)

# Train with mini-batch SGD, logging the current batch loss every 100 batches.
for epoch in range(epochs):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 100 == 0:
            print(f'Epoch: {epoch+1}/{epochs} [Batch: {batch_idx*len(data)}/{len(train_loader.dataset)}] Loss: {loss.item():.6f}')

# Evaluate on the held-out test split.
model.eval()
test_loss = 0
correct = 0

with torch.no_grad():
    for data, target in test_loader:
        output = model(data)
        # criterion returns the MEAN loss of the batch. Weight it by the batch
        # size so the running total is a per-sample sum; dividing by the
        # dataset size below then gives the true average loss (also correct
        # when the last batch is smaller than batch_size).
        test_loss += criterion(output, target).item() * data.size(0)
        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()

test_loss /= len(test_loader.dataset)
accuracy = 100. * correct / len(test_loader.dataset)
print(f'Test set: Average loss: {test_loss:.4f}, Accuracy: {correct}/{len(test_loader.dataset)} ({accuracy:.2f}%)')
# Persist only the learned parameters (not the module object itself).
torch.save(model.state_dict(), "mnist_model.pth")

use mnist

# Rebuild the architecture; the checkpoint holds parameters only, so the
# class definition must match the one used for training.
model = Net()
### 3. Load the Trained Model Weights
# NOTE(review): torch.load unpickles the file; only load checkpoints you trust.
model.load_state_dict(torch.load("mnist_model.pth"))
model.eval()  # Set the model to evaluation mode

### 4. Prepare the Handwritten Input Image
#You need to preprocess the handwritten image to match the format of the MNIST dataset (28x28 pixels, grayscale).
def preprocess_image(image_path):
    """Load an image file and convert it to an MNIST-style input batch.

    The image is converted to grayscale, resized to 28x28, turned into a
    tensor and normalized with mean 0.1307 / std 0.3081 (the same values the
    training transform uses).

    Args:
        image_path: path of the image file to open.

    Returns:
        The transformed image with a leading batch dimension added.

    NOTE(review): ``Image`` is presumably ``PIL.Image``; it is not imported in
    the visible snippet — confirm ``from PIL import Image`` exists.
    """
    transform = transforms.Compose([
        transforms.Grayscale(),  # Ensure the image is grayscale
        transforms.Resize((28, 28)),  # Resize to 28x28 pixels
        transforms.ToTensor(),  # Convert to tensor
        transforms.Normalize((0.1307,), (0.3081,))  # Normalize with the same mean and std as MNIST
    ])
    # Open inside a context manager so the underlying file handle is closed
    # as soon as the pixels have been converted to a tensor.
    with Image.open(image_path) as image:
        batch = transform(image).unsqueeze(0)  # Add batch dimension
    return batch

### 5. Perform Inference
def recognize_digit(image_path):
    """Return the class index the module-level ``model`` predicts for the image at ``image_path``."""
    batch = preprocess_image(image_path)
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        logits = model(batch)
    best = logits.argmax(dim=1, keepdim=True)
    return best.item()

# Example usage
# Placeholder path: replace it with the path of a real image file before running.
image_path = 'path_to_your_handwritten_digit_image3.png'
predicted_digit = recognize_digit(image_path)
print(f'Predicted Digit: {predicted_digit}')

calculator neural network

import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np

# Define the neural network architecture
class CalculatorNN(nn.Module):
    """MLP regressor (3 -> 128 -> 64 -> 1) that maps two operands plus an operation code to a result."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(3, 128)  # Input: 2 numbers + operation
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 1)   # Output: the result

    def forward(self, x):
        """Return the predicted result for each 3-feature input row."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = torch.relu(layer(hidden))
        return self.fc3(hidden)

model = CalculatorNN()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
# NOTE(review): X_train and y_train are not defined anywhere in this snippet;
# the data-generation step must be supplied before this runs.
# NOTE(review): the trained weights are never saved here, although the usage
# code loads 'calculator_model.pth' — confirm where the checkpoint is written.
num_epochs = 50000  # 5000 epochs left the loss too large
for epoch in range(num_epochs):
    model.train()
    # Forward pass
    predictions = model(X_train)
    loss = criterion(predictions, y_train)
    # Backward pass and optimization
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    # Log every 10th epoch
    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

# ---- use
# Rebuild the network and restore previously saved weights for inference.
model = CalculatorNN()
model.load_state_dict(torch.load('calculator_model.pth'))
model.eval()


# Perform the prediction
with torch.no_grad():
    # Prepare the input (32 * 3)
    # Row layout: [first operand, second operand, operation code]
    input_data = torch.tensor([[32.0, 3.0, 2]], dtype=torch.float32)  # 2 corresponds to multiplication
    prediction = model(input_data)
    print(f'Prediction for 32 * 3: {prediction.item():.4f}')

Data cleaning

## Split one large log file into separate files at every match of a pattern
def split_log_file(input_file, split_pattern, output_pattern):
    """Split a log file into one file per occurrence of ``split_pattern``.

    Each segment starts at a match and runs up to the next match (or the end
    of the file). Text before the first match is not written anywhere.

    Args:
        input_file: path of the log file to split.
        split_pattern: regular expression marking the start of a segment;
            its first capture group supplies the segment's number.
        output_pattern: ``str.format`` template containing ``{number}``,
            used to build each output file name.

    Raises:
        IndexError: if ``split_pattern`` has no capture group.
    """
    with open(input_file, 'r') as file:
        log_content = file.read()
    pattern = re.compile(split_pattern)
    matches = list(pattern.finditer(log_content))
    # A segment ends where the next one begins; the last one ends at EOF.
    segment_ends = [m.start() for m in matches[1:]] + [len(log_content)]
    for match, end in zip(matches, segment_ends):
        # Reuse the match found in the full text. Re-running the pattern on
        # the sliced segment would lose the surrounding context, so patterns
        # using look-behind or anchors could fail and drop the segment.
        number = match.group(1)
        output_file = output_pattern.format(number=number)
        with open(output_file, 'w') as file:
            file.write(log_content[match.start():end])
        print(f"Segment saved as {output_file}")

## Move the log files that match any of the given patterns into another directory
def move_patterns_logs(destination_path, patterns):
    """Move every ``*.log`` file in the current directory that matches a pattern.

    A file is moved when at least one of its lines matches at least one of
    ``patterns`` (``re.search`` semantics).

    Args:
        destination_path: directory (or target path) passed to ``shutil.move``.
        patterns: iterable of regular expressions (strings or compiled).
    """
    current_directory = os.getcwd()
    compiled = [re.compile(pattern) for pattern in patterns]
    for log_file in glob.glob("*.log"):
        with open(log_file, 'r') as file:
            # Iterate lines in the OUTER loop: a file object can only be read
            # once, so looping over patterns first would test only the first
            # pattern against the file's contents.
            matched = any(rx.search(line) for line in file for rx in compiled)
        # Move only after the file is closed, and keep scanning the remaining
        # files instead of stopping after the first hit.
        if matched:
            shutil.move(os.path.join(current_directory, log_file), destination_path)

## Print a log file with the lines containing unwanted keywords filtered out (useful before visualization)
def filter_log_file(log_file_path, exclude_keywords):
    """Print every line of the file that contains none of ``exclude_keywords``.

    Args:
        log_file_path: path of the log file to read.
        exclude_keywords: substrings; a line containing any of them is dropped.
    """
    with open(log_file_path, "r") as handle:
        entries = list(handle)

    def is_kept(entry):
        # Keep the line only when no excluded keyword occurs in it.
        return not any(keyword in entry for keyword in exclude_keywords)

    # Lines keep their own newlines, so join and print without adding one.
    print("".join(filter(is_kept, entries)), end="")

SVM

import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.datasets import make_classification
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
# Synthetic binary classification problem with 3 informative features
X, y = make_classification(n_samples=100, n_features=3, n_informative=3, n_redundant=0, n_classes=2, random_state=42)
# Hold out 30% of the samples for testing
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# Fit the scaler on the training split only, then apply the same scaling to the test split
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
# Linear kernel: the plotting code below reads model.coef_ and model.intercept_
model = SVC(kernel='linear')
model.fit(X_train, y_train)
def plot_svm_decision_boundary_3d(model, X, y):
    """Plot 3-feature samples, the linear SVM decision plane and the support vectors.

    Args:
        model: fitted classifier exposing ``coef_``, ``intercept_`` and
            ``support_vectors_`` (as the linear-kernel ``SVC`` used here does).
        X: array of shape (n_samples, 3) with the points to draw.
        y: class label per row of ``X``; used to colour the points.
    """
    fig = plt.figure(figsize=(10, 8))
    ax = fig.add_subplot(111, projection='3d')
    # Plot the training points
    scatter = ax.scatter(X[:, 0], X[:, 1], X[:, 2], c=y, s=30, cmap=plt.cm.coolwarm)
    # Span the plane over the y/z extent of the plotted data
    ylim = ax.get_ylim()
    zlim = ax.get_zlim()
    yy = np.linspace(ylim[0], ylim[1], 20)
    zz = np.linspace(zlim[0], zlim[1], 20)
    YY, ZZ = np.meshgrid(yy, zz)
    # Decision plane: w0*x + w1*y + w2*z + b = 0. Solving for x gives
    # x = -(w1*y + w2*z + b) / w0, i.e. y pairs with w1 and the divisor is w0.
    w = model.coef_[0]
    b = model.intercept_[0]
    XX = -(w[1] * YY + w[2] * ZZ + b) / w[0]
    # Plot the decision surface
    ax.plot_surface(XX, YY, ZZ, color='gray', alpha=0.3, rstride=100, cstride=100)
    # Highlight support vectors
    ax.scatter(model.support_vectors_[:, 0], model.support_vectors_[:, 1], model.support_vectors_[:, 2],
               s=100, facecolors='none', edgecolors='k', linewidth=1.5, label='Support Vectors')
    ax.set_title('SVM Decision Boundary in 3D')
    ax.set_xlabel('Feature 1')
    ax.set_ylabel('Feature 2')
    ax.set_zlabel('Feature 3')
    # Add color legend
    legend1 = ax.legend(*scatter.legend_elements(), loc="best", title="Classes")
    ax.add_artist(legend1)
    plt.show()
plot_svm_decision_boundary_3d(model, X_train, y_train)

kmeans

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans

def cluster_error_messages(error_messages, num_clusters=5):
    """Group error messages by K-Means cluster over their TF-IDF vectors.

    Args:
        error_messages: indexable collection of message strings.
        num_clusters: number of K-Means clusters to fit.

    Returns:
        Dict mapping each cluster label to the list of messages assigned to
        it, in their original order.
    """
    features = TfidfVectorizer(stop_words='english').fit_transform(error_messages)

    model = KMeans(n_clusters=num_clusters, random_state=0)
    model.fit(features)

    clustered_errors = {}
    for index, cluster_id in enumerate(model.labels_):
        # Create the bucket on first sight of a label, then append to it.
        clustered_errors.setdefault(cluster_id, []).append(error_messages[index])
    return clustered_errors

Decision Tree Classifier

import matplotlib.pyplot as plt
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier, plot_tree
from sklearn import metrics
# Load the Iris data set
iris = load_iris()
X = iris.data  # Features
y = iris.target  # Labels
# Hold out 30% of the samples for testing (fixed seed for a repeatable split)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# Fit a decision tree with default settings (no random_state is passed)
clf = DecisionTreeClassifier()
clf.fit(X_train, y_train)
# Score the tree on the held-out split
y_pred = clf.predict(X_test)
accuracy = metrics.accuracy_score(y_test, y_pred)
print(f"Accuracy: {accuracy * 100:.2f}%")
# Draw the fitted tree, labelling nodes with feature and class names
plt.figure(figsize=(12,8))
plot_tree(clf, feature_names=iris.feature_names, class_names=iris.target_names, filled=True)
plt.show()

Reinforcement Learning (DQN)

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import random

# Define a simple fully connected neural network
class DQN(nn.Module):
    """Q-network: maps a state vector to one Q-value per action (input -> 128 -> 128 -> output)."""

    def __init__(self, input_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, output_dim)

    def forward(self, x):
        """Return the Q-value estimates for every action."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = F.relu(layer(hidden))
        # Linear head: Q-values are unbounded, so no activation here.
        return self.fc3(hidden)

# ### 3. **Initialize the environment and model:**

import gymnasium as gym
import torch

# NOTE(review): newer Gymnasium releases may register this environment under
# a different version suffix — confirm the id against the installed version.
env = gym.make("LunarLander-v2", render_mode="human")
state_dim = env.observation_space.shape[0]  # length of the observation vector
action_dim = env.action_space.n  # number of discrete actions

# Create the DQN model
model = DQN(input_dim=state_dim, output_dim=action_dim)

# ### 4. **Define the training loop:**
# In this section, we'll define how the agent interacts with the environment, how rewards are collected, and how the model is updated.

# Parameters
learning_rate = 0.001
gamma = 0.99  # Discount factor
epsilon = 1.0  # Exploration rate
epsilon_decay = 0.995  # multiplicative decay applied once per episode
epsilon_min = 0.01  # lower bound for the exploration rate
episodes = 500

# Optimizer
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Function to choose action (using epsilon-greedy policy)
def choose_action(state, epsilon):
    """Pick an action with an epsilon-greedy policy over the module-level ``model``.

    Args:
        state: the current observation.
        epsilon: probability threshold for taking a uniformly random action.

    Returns:
        The index of the chosen action.
    """
    explore = np.random.rand() <= epsilon
    if explore:
        return np.random.choice(action_dim)  # Random action
    # Exploit: add a batch dimension and take the action with the largest Q-value.
    with torch.no_grad():
        batch = torch.FloatTensor(state).unsqueeze(0)
        greedy = torch.argmax(model(batch))
    return greedy.item()

# Function to train the model
def train_model(memory, batch_size=64):
    """Run one Q-learning update on a random mini-batch drawn from ``memory``.

    Uses the module-level ``model``, ``optimizer`` and ``gamma``. Does nothing
    until ``memory`` holds at least ``batch_size`` transitions.

    Args:
        memory: sequence of ``(state, action, reward, next_state, done)`` tuples.
        batch_size: number of transitions sampled per update.
    """
    if len(memory) < batch_size:
        return

    # Randomly sample a batch from memory
    batch = random.sample(memory, batch_size)

    # Extract states, actions, rewards, next_states, and dones from the batch
    states, actions, rewards, next_states, dones = zip(*batch)

    # Convert them to tensors. Stack the observations into one array first:
    # building a tensor directly from a tuple of arrays is very slow.
    states = torch.as_tensor(np.asarray(states), dtype=torch.float32)
    actions = torch.LongTensor(actions)
    rewards = torch.FloatTensor(rewards)
    next_states = torch.as_tensor(np.asarray(next_states), dtype=torch.float32)
    dones = torch.FloatTensor(dones)

    # Q(s, a) for the actions that were actually taken
    q_values = model(states).gather(1, actions.unsqueeze(1)).squeeze(1)

    # The TD target must be treated as a constant: without no_grad the loss
    # would also back-propagate through the bootstrap term max_a' Q(s', a').
    with torch.no_grad():
        next_q_values = model(next_states).max(1)[0]

    # Terminal transitions (done == 1) get no bootstrap term
    q_targets = rewards + (1 - dones) * gamma * next_q_values

    # Compute the loss
    loss = F.mse_loss(q_values, q_targets)

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

# Main loop
# Replay memory: a plain list, so it grows by one transition per step for the whole run
memory = []

for episode in range(episodes):
    state = env.reset()[0]  # reset() returns a tuple; the observation is its first item
    total_reward = 0

    for t in range(1000):  # hard cap on steps per episode
        action = choose_action(state, epsilon)
        next_state, reward, done, truncated, _ = env.step(action)
        # Only `done` is stored, so a truncated (not terminated) step still bootstraps
        memory.append((state, action, reward, next_state, done))

        # One gradient update per environment step (a no-op until memory holds a full batch)
        train_model(memory)

        state = next_state
        total_reward += reward
        if done or truncated:
            break

    # Decay exploration once per episode, never below epsilon_min
    epsilon = max(epsilon_min, epsilon * epsilon_decay)
    print(f"Episode {episode + 1}, Total Reward: {total_reward}")

env.close()

Flappy bird dqn

import gymnasium as gym
import numpy as np
import pygame
from gymnasium import spaces
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque
import time

import pygame
import numpy as np
from gymnasium import spaces

from flappy_bird_cl3_pass_env_to_nn_3 import FlappyBirdEnv

class DQN(nn.Module):
    """Q-network: input -> 64 -> 64 -> one Q-value per action, with ReLU between layers."""

    def __init__(self, input_size, n_actions):
        super().__init__()
        # Keep the layers in a single Sequential named `fc` so parameter
        # names stay `fc.0.*`, `fc.2.*`, `fc.4.*`.
        layers = [
            nn.Linear(input_size, 64),
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Linear(64, n_actions),
        ]
        self.fc = nn.Sequential(*layers)

    def forward(self, x):
        """Return the Q-value estimates for every action."""
        q_values = self.fc(x)
        return q_values

class DQNAgent:
    """Epsilon-greedy DQN agent with a bounded replay buffer and one Q-network.

    The agent stores transitions in a ``deque`` (at most 10000), samples
    mini-batches of ``batch_size`` from it and regresses Q(s, a) onto the
    one-step TD target with an MSE loss.
    """

    def __init__(self, env, learning_rate=1e-3, gamma=0.99, epsilon_start=1.0, epsilon_final=0.01, epsilon_decay=0.995):
        """Build the Q-network and optimizer for ``env``.

        Args:
            env: environment exposing ``action_space.n`` and ``get_state()``.
            learning_rate: Adam learning rate.
            gamma: discount factor used in the TD target.
            epsilon_start: initial exploration probability.
            epsilon_final: lower bound for the exploration probability.
            epsilon_decay: multiplicative decay applied by ``update_epsilon``.
        """
        self.env = env
        self.n_actions = env.action_space.n
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.epsilon = epsilon_start
        self.epsilon_final = epsilon_final
        self.epsilon_decay = epsilon_decay

        # Oldest transitions are discarded automatically once the buffer is full.
        self.memory = deque(maxlen=10000)
        self.batch_size = 64

        # The network input size is taken from the length of the env's state vector.
        state_size = len(env.get_state())
        self.model = DQN(state_size, self.n_actions).to(self.device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        self.criterion = nn.MSELoss()

        self.gamma = gamma

    def get_action(self, state):
        """Return a random action with probability ``epsilon``, else the greedy one."""
        if random.random() < self.epsilon:
            return random.randint(0, self.n_actions - 1)

        with torch.no_grad():
            state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
            q_values = self.model(state)
            return torch.argmax(q_values).item()

    def update_epsilon(self):
        """Decay ``epsilon`` multiplicatively, never below ``epsilon_final``."""
        self.epsilon = max(self.epsilon_final, self.epsilon * self.epsilon_decay)

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay buffer."""
        self.memory.append((state, action, reward, next_state, done))

    def train(self):
        """Run one gradient step on a random mini-batch; no-op until the buffer holds a full batch."""
        if len(self.memory) < self.batch_size:
            return

        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        # Stack the observations into one array before creating the tensor:
        # building a tensor from a tuple of arrays is very slow.
        states = torch.as_tensor(np.asarray(states), dtype=torch.float32).to(self.device)
        actions = torch.LongTensor(actions).to(self.device)
        rewards = torch.FloatTensor(rewards).to(self.device)
        next_states = torch.as_tensor(np.asarray(next_states), dtype=torch.float32).to(self.device)
        dones = torch.FloatTensor(dones).to(self.device)

        # squeeze(1) removes only the gathered column. A bare squeeze() would
        # also drop the batch dimension when batch_size == 1 and make the
        # prediction and target shapes disagree.
        current_q_values = self.model(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        # The TD target is a constant for the gradient step.
        with torch.no_grad():
            next_q_values = self.model(next_states).max(1)[0]
        target_q_values = rewards + (1 - dones) * self.gamma * next_q_values

        loss = self.criterion(current_q_values, target_q_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

def train_dqn(env, episodes=2000, max_steps=1000, render_interval=10):
    """Train a fresh ``DQNAgent`` on ``env`` and return it with the per-episode scores.

    Args:
        env: environment providing ``reset``, ``step`` and ``render``.
        episodes: number of episodes to run.
        max_steps: cap on steps per episode.
        render_interval: render (and pump pygame events) on every episode
            whose index is a multiple of this value.

    Returns:
        Tuple ``(agent, scores)`` where ``scores`` holds each episode's summed reward.
    """
    agent = DQNAgent(env)
    scores = []

    for episode in range(episodes):
        observation = env.reset()
        episode_score = 0

        for _step in range(max_steps):
            if episode % render_interval == 0:
                env.render()

            action = agent.get_action(observation)
            next_observation, reward, done, _, _ = env.step(action)
            agent.remember(observation, action, reward, next_observation, done)
            # One learning update per environment step.
            agent.train()

            observation = next_observation
            episode_score += reward

            if done:
                break

            # Keep the window responsive on rendered episodes.
            if episode % render_interval == 0:
                pygame.event.pump()

        agent.update_epsilon()
        scores.append(episode_score)

        if episode % 10 == 0:
            print(f"Episode: {episode}, Score: {episode_score}, Epsilon: {agent.epsilon:.2f}")

    return agent, scores

if __name__ == "__main__":
    # Train on the Flappy Bird environment, rendering every 50th episode
    env = FlappyBirdEnv()
    agent, scores = train_dqn(env, episodes=6000, render_interval=50)

    # Test the trained agent
    # NOTE: get_action is still epsilon-greedy here, so the agent keeps taking
    # random actions with probability agent.epsilon.
    state = env.reset()
    done = False
    score = 0

    while not done:
        env.render()
        action = agent.get_action(state)
        next_state, reward, done, _, _ = env.step(action)
        state = next_state
        score += reward

        # Let the user end the demo by closing the window
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                done = True

        pygame.event.pump()
        time.sleep(0.03)  # slow the loop down so the playback is watchable

    print(f"Final Score: {score}")
    env.close()

GRPO (minimal example)

grpo_minimal.py is the tiniest possible GRPO (Group Relative Policy Optimization) training loop, written to mirror the pattern used by the companion project MathGPT/scripts/train_rl.py, which runs GRPO on GSM8K math problems with an LLM.

Instead of an LLM, the toy runs a contextual bandit: 4 "prompts", vocabulary of 8 tokens, and a reward of 1 when the sampled token matches that prompt's secret target. The policy is just a [num_prompts, vocab] logit table — the smallest thing that still has a policy gradient.

The recipe, straight from MathGPT's train_rl.py:

  1. For each prompt, sample G rollouts from π(a|prompt) — the "group".
  2. Score each rollout with a reward function (1.0 / 0.0).
  3. Advantage = reward − group mean (advantages = rewards - rewards.mean() in train_rl.py:158).
  4. Policy-gradient loss: −E[ log π(a|s) · A ] (train_rl.py:254-258).
  5. Gradient step; repeat.
def grpo_step():
    """Run one GRPO update: sample a group per prompt, centre rewards per group, take a policy-gradient step."""
    # Each prompt index is repeated NUM_SAMPLES times, so rollouts of the same
    # prompt are contiguous.
    prompts = torch.arange(NUM_PROMPTS).repeat_interleave(NUM_SAMPLES)
    dist = torch.distributions.Categorical(logits=policy(prompts))
    actions = dist.sample()
    logp = dist.log_prob(actions)

    rewards = reward_fn(prompts, actions)

    # Group-relative advantage: subtract each prompt's own mean reward.
    grouped = rewards.view(NUM_PROMPTS, NUM_SAMPLES)
    group_mean = grouped.mean(dim=1, keepdim=True)
    advantages = (grouped - group_mean).view(-1)

    # Advantages are treated as constants in the policy-gradient loss.
    loss = -(logp * advantages.detach()).mean()
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

The script saves a step-by-step decomposition of the pipeline:

file what it shows
grpo_01_task.png prompt → correct-token mapping (the reward landscape)
grpo_02_policy_init.png π(a|prompt) before training — uniform
grpo_03_rollouts.png one batch of group rollouts, colored by reward
grpo_04_advantages.png reward vs. reward − group mean (the variance-reducing baseline)
grpo_05_training_curves.png mean reward, pg loss, policy entropy over steps
grpo_06_policy_final.png learned π(a|prompt) — mass on the stars
grpo_06b_policy_evolution.png snapshots of π migrating toward the correct token
grpo_overview.png all of the above on one figure

Run it:

python grpo/grpo_minimal.py

DQN vs GRPO — same RL core, different assumptions

The DQN examples above (rl_games/rl_gym_dqn_lunar.py, rl_games/flappy_bird_app/...) and the GRPO example (grpo_minimal.py, mirroring MathGPT/scripts/train_rl.py) are both reinforcement learning. They look very different because the environments they target are very different (Atari/Gym games vs. an LLM answering math), but the underlying loop is the same.

The same part (the RL essence)

Both methods do exactly this:

  1. Interact with something — a Gym env for DQN, the LLM's own generator for GRPO — to collect (situation, action, reward) data.
  2. Assign credit: decide, for each action we took, how good it actually was.
  3. Nudge the neural network so that actions that turned out well become more likely next time.
  4. Repeat until the expected reward stops going up.

Both are solving max_θ E_{τ~π_θ} [ R(τ) ] — the same objective. Both need exploration, both use a neural net as a function approximator, both need some form of variance reduction because the reward signal is noisy, both suffer from sample inefficiency and credit assignment.

The different part

The interesting question is where each algorithm makes different structural choices, and why — the environment shapes the algorithm.

dimension DQN (game playing, this project's rl_games/) GRPO (reasoning / QA, grpo_minimal.py and MathGPT)
Family Value-based (Q-learning) Policy-gradient (REINFORCE with baseline)
What the NN outputs Q(s, a) — an action-value estimate π(a|s) — a probability distribution directly
How the policy is obtained argmax_a Q(s,a) + ε-greedy noise Sample from the softmax — the net is the policy
Update rule Bellman/TD: Q(s,a) ← r + γ max_a' Q(s',a') PG: θ ← θ + η · ∇log π(a|s) · A
Bootstraps from own estimate? Yes — TD target uses Q itself (unstable, needs a target network) No — uses Monte-Carlo reward directly
On- or off-policy Off-policy (old transitions in replay buffer are still valid) On-policy (must resample every update — old rollouts become stale)
Replay / data reuse Experience replay buffer, shuffle-sampled minibatches Fresh rollouts each step, discarded after one update
Exploration mechanism Externally injected: ε-greedy, noise, decaying schedules Built-in: stochastic sampling from π, optionally a temperature
Variance reduction Target network, Double-DQN, Huber loss, clipped rewards Group-relative baseline: subtract mean reward of the N rollouts from the same prompt
Episode shape Long sequence of (s_t, a_t, r_t, s_{t+1}) tuples, many decisions per episode One prompt → one full generation; the "episode" is a single sequence
Reward density Dense — a reward at (almost) every environment step Sparse — one scalar for the whole generated answer (correct / wrong)
Discount γ Yes, γ ∈ [0.9, 0.999], crucial for long horizons Usually γ = 1 (terminal-only reward, so discounting changes nothing)
State / context Physics vector or pixel frame; Markovian Token prefix (prompt); state = everything generated so far
Action space Small & discrete — 2 in CartPole, 4 in LunarLander, 2 in Flappy Bird Huge — one vocabulary-sized categorical per token (tens of thousands)
"Rollouts per decision" 1 transition per env step, batched later from the buffer G rollouts per prompt (the Group in GRPO, --num-samples=16 in MathGPT)
Where the baseline comes from A slowly-updated target network provides a stable Q-target The mean reward of the group — no extra network needed
What breaks it Q overestimation, non-stationary targets, replay staleness Reward collapse (entropy → 0), stale rollouts if you reuse them
Typical environment Gym / ALE / physics simulators A base/SFT language model generating its own rollouts
Files in this repo rl_games/rl_gym_dqn_lunar.py, rl_games/flappy_bird_app/*, drive_torch_dqn_nocnn.py grpo_minimal.py (and MathGPT/scripts/train_rl.py)

One-line summary

DQN learns how good each action is in a state, then acts greedily — it works because games give dense per-step feedback and you can replay old experience. GRPO skips the value function entirely: it generates a whole answer, looks at its reward, compares it to the reward of its siblings from the same prompt, and pushes the policy toward the better siblings — it works because a language model can cheaply generate a whole group of rollouts for the same prompt, and the group mean is a free, unbiased baseline.

SGD

import torch
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from matplotlib.animation import FuncAnimation

# Random 3D surface (loss function)
def loss_function(x, y):
    """Toy loss surface: a sin/cos ripple superimposed on a shallow quadratic bowl.

    Works element-wise, so ``x`` and ``y`` may be scalars or equally shaped tensors.
    """
    ripple = torch.sin(x) * torch.cos(y)
    bowl = 0.1 * (x**2 + y**2)
    return ripple + bowl

# Generate a meshgrid for plotting the surface.
# indexing='ij' is the layout torch.meshgrid produces when no indexing is given;
# passing it explicitly keeps X/Y unchanged and avoids the deprecation warning.
x = torch.linspace(-5, 5, 100)
y = torch.linspace(-5, 5, 100)
X, Y = torch.meshgrid(x, y, indexing='ij')
Z = loss_function(X, Y).detach().numpy()

# Initialize figure and 3D axis for animation
fig = plt.figure(figsize=(10, 7))
ax = fig.add_subplot(111, projection='3d')
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
ax.set_title('SGD Optimization Path on 3D Surface')

# Plot the static 3D surface (semi-transparent so the path stays visible)
ax.plot_surface(X.numpy(), Y.numpy(), Z, cmap='viridis', alpha=0.7)

# SGD starting point: the single 2-element tensor being optimised
start_point = torch.tensor([4.0, 4.0], requires_grad=True)

# Hyperparameters
learning_rate = 0.1
optimizer = torch.optim.SGD([start_point], lr=learning_rate)

# Number of steps and animation frames; one (x, y, z) row of `path` per step
steps = 10
path = np.zeros((steps, 3))

# Empty line artist that the animation callback fills with the path so far
point_plot, = ax.plot([], [], [], color='r', marker='o', markersize=5)

# Function to update the frame during animation
def update(i):
    """Run one SGD step and redraw the optimisation path up to frame ``i``.

    Args:
        i: Frame index supplied by ``FuncAnimation``; also the row of ``path``
            that this call fills in.

    Returns:
        A one-element tuple holding the updated line artist (needed for blitting).
    """
    optimizer.zero_grad()

    # Calculate the loss (z value) at the current position
    loss = loss_function(start_point[0], start_point[1])

    # Record the position the loss was evaluated at *before* stepping, so the
    # stored (x, y, z) triple is a point that actually lies on the surface.
    x_now, y_now = start_point.detach().tolist()
    path[i] = (x_now, y_now, loss.item())

    # Backpropagation to compute gradients, then move the point
    loss.backward()
    optimizer.step()

    # Update point on the surface: 2D data first, then the z coordinates
    point_plot.set_data(path[:i+1, 0], path[:i+1, 1])
    point_plot.set_3d_properties(path[:i+1, 2])

    return point_plot,

# Animate SGD for `steps` frames.
# - init_func: without one, FuncAnimation draws its initial frame by calling
#   update() with the first frame, which would take an extra optimizer step.
# - repeat=False: looping would restart the frame index at 0 while the optimizer
#   keeps stepping, overwriting the recorded path.
ani = FuncAnimation(
    fig,
    update,
    frames=steps,
    init_func=lambda: (point_plot,),
    interval=500,
    blit=True,
    repeat=False,
)

# Show the animation
plt.show()

CNN with Attention

import torch.nn as nn
import torch.nn.functional as F

class Attention(nn.Module):
    """Channel attention: re-weights feature maps using globally pooled features.

    The input is average-pooled over its spatial axes, passed through a 1x1
    convolution and normalised with a softmax across channels; the resulting
    per-channel weights multiply the input. ``out_channels`` must broadcast
    against the input's channel count (normally ``out_channels == in_channels``).
    """

    def __init__(self, in_channels, out_channels):
        super(Attention, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)
        # Normalise over the channel axis. The pooled map has shape
        # (N, C, 1, 1); a softmax over the last axis (size 1) always yields 1.0,
        # which would turn the whole layer into an identity.
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return ``x`` scaled channel-wise by the attention weights.

        Args:
            x: Feature tensor of shape (N, C, H, W).
        """
        # Global feature extraction: one value per channel
        global_features = torch.mean(x, dim=(2, 3), keepdim=True)
        attention_map = self.softmax(self.conv(global_features))
        return x * attention_map

class CNNWithAttention(nn.Module):
    """Two conv/pool stages, an attention layer, then a two-layer classifier head.

    ``fc1`` expects 64 * 8 * 8 features. The convolutions preserve spatial size
    (3x3, padding 1) and each pooling halves it, so inputs are expected to be
    3-channel 32x32 images; the head produces 10 logits.
    """

    def __init__(self):
        super(CNNWithAttention, self).__init__()
        # Convolutional layers
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)

        # Attention layer
        self.attention = Attention(64, 64)

        # Fully connected layers
        self.fc1 = nn.Linear(64 * 8 * 8, 512)
        self.fc2 = nn.Linear(512, 10)

    def forward(self, x):
        """Return class logits of shape (N, 10) for a batch of images ``x``."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))

        # Attention mechanism
        x = self.attention(x)

        # Flatten per sample. Keeping the batch axis explicit (instead of
        # view(-1, 64 * 8 * 8)) means a wrongly sized input fails loudly in fc1
        # rather than being silently reshaped into a different batch size.
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# --
# Initialize the model, loss function, and optimizer
# optim.Adam is used below, but this snippet's own import lines only bring in
# torch.nn and torch.nn.functional, so import it here.
import torch.optim as optim

num_epochs = 5
model = CNNWithAttention()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# NOTE(review): `trainloader` is not defined in this snippet; it is presumably a
# DataLoader yielding (images, labels) batches of 3x32x32 images - confirm.

# Training loop
for epoch in range(num_epochs):
    running_loss = 0.0
    for inputs, labels in trainloader:
        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Mean loss over the batches of this epoch
    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / len(trainloader)}")

LSTM generator

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader


class Vocab:
    """Holds the two lookup tables of a vocabulary, stored exactly as given."""

    def __init__(self, stoi, itos):
        # stoi: string -> index table, itos: index -> string table
        self.stoi, self.itos = stoi, itos

# Training text (an excerpt about the history of AI)
corpus = """
The history of artificial intelligence (AI) began in antiquity, with myths, stories and rumors of artificial beings endowed with intelligence or consciousness by master craftsmen.
... ...
"""

# Flatten the text onto one line: every newline becomes a single space
corpus = " ".join(corpus.split("\n"))

# Whitespace tokenization; nltk or spacy would give better tokens
tokens = corpus.split()

from collections import Counter

# Unique tokens, numbered in order of first appearance
token_counts = Counter(tokens)
vocab_itos = dict(enumerate(token_counts))
vocab_stoi = {token: idx for idx, token in vocab_itos.items()}

# Bundle both lookup tables
vocab = Vocab(stoi=vocab_stoi, itos=vocab_itos)

class RNNModel(nn.Module):
    """Embedding -> multi-layer LSTM -> linear projection onto the vocabulary."""

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers):
        super().__init__()
        self.num_layers, self.hidden_size = num_layers, hidden_size
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.rnn = nn.LSTM(
            input_size=embed_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden):
        """Return per-position vocabulary logits and the updated LSTM state."""
        recurrent_out, hidden = self.rnn(self.embedding(x), hidden)
        return self.fc(recurrent_out), hidden

    def init_hidden(self, batch_size):
        """Return zeroed ``(h_0, c_0)`` states sized for ``batch_size`` sequences."""
        # new_zeros inherits dtype and device from an existing parameter
        reference = next(self.parameters()).data
        state_shape = (self.num_layers, batch_size, self.hidden_size)
        return (reference.new_zeros(state_shape),
                reference.new_zeros(state_shape))

class TextDataset(Dataset):
    """Sliding-window next-token dataset over a whitespace-tokenized text.

    Item ``idx`` is a pair ``(x, y)`` of ``sequence_length`` token ids where
    ``y`` is ``x`` shifted one token to the right.
    """

    def __init__(self, text, vocab, sequence_length):
        self.vocab = vocab
        self.sequence_length = sequence_length
        self.data = self.tokenize_and_encode(text)

    def tokenize_and_encode(self, text):
        """Split ``text`` on whitespace and map tokens to ids, dropping unknown tokens."""
        stoi = self.vocab.stoi
        return [stoi[token] for token in text.split() if token in stoi]

    def __len__(self):
        # Clamp at zero: a text shorter than one window would otherwise give a
        # negative length, which makes len() raise ValueError.
        return max(0, len(self.data) - self.sequence_length)

    def __getitem__(self, idx):
        # Slicing never fails, so without this check plain iteration over the
        # dataset would never terminate (it relies on IndexError to stop).
        if not 0 <= idx < len(self):
            raise IndexError(f"index {idx} out of range for dataset of length {len(self)}")
        x = self.data[idx:idx + self.sequence_length]
        y = self.data[idx + 1:idx + 1 + self.sequence_length]
        return torch.tensor(x, dtype=torch.long), torch.tensor(y, dtype=torch.long)

# Define sequence length and batch size
sequence_length = 10  # Can be tuned
batch_size = 100

# Create the dataset and dataloader
dataset = TextDataset(corpus, vocab, sequence_length)
train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Define model, loss function, and optimizer
vocab_size = len(vocab.stoi)
embed_size = 50  # Adjust as needed
hidden_size = 100  # Adjust as needed
num_layers = 2
num_epochs = 100  # Adjust based on performance
learning_rate = 0.001

model = RNNModel(vocab_size, embed_size, hidden_size, num_layers)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(num_epochs):
    loss = None  # stays None when the loader yields no batches
    for inputs, targets in train_loader:
        # The final batch can be smaller than `batch_size`. Use a separate name
        # so the hyperparameter above is not overwritten by the loop.
        current_batch_size = inputs.size(0)
        # Fresh zero state for every batch: windows are shuffled, so there is
        # no state to carry over between batches.
        hidden = model.init_hidden(current_batch_size)

        outputs, hidden = model(inputs, hidden)
        # Flatten (batch, seq, vocab) logits and (batch, seq) targets for the loss
        loss = criterion(outputs.view(-1, vocab_size), targets.view(-1))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Reports the loss of the last batch of the epoch
    if loss is not None:
        print(f'Epoch {epoch+1}, Loss: {loss.item()}')

torch.save(model.state_dict(), 'rnn_model_ai.pth')
def generate_text(model, start_text, max_length=100):
    """Sample ``max_length`` tokens from ``model``, starting from ``start_text``.

    Args:
        model: Model exposing ``init_hidden(batch_size)`` and returning
            ``(logits, hidden)`` when called with ``(token_ids, hidden)``.
        start_text: A single token that must exist in the module-level ``vocab``.
        max_length: Number of tokens to sample after the start token.

    Returns:
        The start token and the sampled tokens joined by single spaces.

    Raises:
        KeyError: If ``start_text`` is not in the vocabulary.
    """
    if start_text not in vocab.stoi:
        raise KeyError(f"start_text {start_text!r} is not in the vocabulary")

    model.eval()
    hidden = model.init_hidden(1)  # Start with batch size 1
    token_ids = torch.tensor([[vocab.stoi[start_text]]])  # shape (1, 1)
    result = [start_text]
    # Sampling needs no gradients; no_grad avoids building an autograd graph
    # that would grow with every generated token.
    with torch.no_grad():
        for _ in range(max_length):
            output, hidden = model(token_ids, hidden)
            prob = nn.functional.softmax(output[0, -1], dim=0)
            next_word = torch.multinomial(prob, 1).item()
            result.append(vocab.itos[next_word])  # Convert back to word using vocab
            token_ids = torch.tensor([[next_word]])  # Feed the sampled word back in
    return ' '.join(result)
# NOTE(review): tokens come from a whitespace split, so the visible part of the
# corpus only contains "(AI)"; confirm that a bare "AI" token exists.
start_text = 'AI'  # The starting word
generated_text = generate_text(model, start_text, max_length=100)
print(generated_text)

Seq2seq number translator

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
import matplotlib.pyplot as plt

import random

class NumeralTranslationDataset:
    """Generates (digit characters, English word characters) pairs for 0-99."""

    def __init__(self):
        # Lookup table keyed by the decimal string. It covers 0-19, every
        # multiple of ten and the x1-x5 numbers; anything else is composed
        # from a tens word and a units word.
        self.num_to_words = {
            '0': 'zero', '1': 'one', '2': 'two', '3': 'three', '4': 'four', 
            '5': 'five', '6': 'six', '7': 'seven', '8': 'eight', '9': 'nine',
            '10': 'ten', '11': 'eleven', '12': 'twelve', '13': 'thirteen', 
            '14': 'fourteen', '15': 'fifteen', '16': 'sixteen', 
            '17': 'seventeen', '18': 'eighteen', '19': 'nineteen',
            '20': 'twenty', '21': 'twenty one', '22': 'twenty two', 
            '23': 'twenty three', '24': 'twenty four', '25': 'twenty five',
            '30': 'thirty', '31': 'thirty one', '32': 'thirty two', 
            '33': 'thirty three', '34': 'thirty four', '35': 'thirty five',
            '40': 'forty', '41': 'forty one', '42': 'forty two', 
            '43': 'forty three', '44': 'forty four', '45': 'forty five',
            '50': 'fifty', '51': 'fifty one', '52': 'fifty two', 
            '53': 'fifty three', '54': 'fifty four', '55': 'fifty five',
            '60': 'sixty', '61': 'sixty one', '62': 'sixty two', 
            '63': 'sixty three', '64': 'sixty four', '65': 'sixty five',
            '70': 'seventy', '71': 'seventy one', '72': 'seventy two', 
            '73': 'seventy three', '74': 'seventy four', '75': 'seventy five',
            '80': 'eighty', '81': 'eighty one', '82': 'eighty two', 
            '83': 'eighty three', '84': 'eighty four', '85': 'eighty five',
            '90': 'ninety', '91': 'ninety one', '92': 'ninety two', 
            '93': 'ninety three', '94': 'ninety four', '95': 'ninety five'
        }

    def _number_to_words(self, num):
        """Return the English words for an integer in the range 0-99."""
        num_str = str(num)
        # The table is keyed by strings, so the lookup must use the string form;
        # testing the int itself never matches and mislabels 10-19.
        if num_str in self.num_to_words:
            return self.num_to_words[num_str]
        # Only numbers above 20 with a non-zero units digit reach this point.
        tens, units = divmod(num, 10)
        return f"{self.num_to_words[str(tens * 10)]} {self.num_to_words[str(units)]}"

    def generate_training_data(self, num_examples=1000):
        """Generate random training data for number translation.

        Args:
            num_examples: Number of (input, target) pairs to draw.

        Returns:
            Two parallel lists: the digit characters of each sampled number
            and the characters of its English spelling.
        """
        input_sequences = []
        target_sequences = []

        for _ in range(num_examples):
            num = random.randint(0, 99)  # Uniformly pick a number from 0 to 99
            input_sequences.append(list(str(num)))
            target_sequences.append(list(self._number_to_words(num)))

        return input_sequences, target_sequences

class Encoder(nn.Module):
    """Embeds source token ids and encodes them with a 2-layer batch-first GRU."""

    def __init__(self, input_size, embedding_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=input_size, embedding_dim=embedding_dim)
        self.gru = nn.GRU(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=2,
            batch_first=True,
            dropout=0.2,
        )

    def forward(self, x):
        """Return ``(outputs, hidden)`` exactly as produced by the GRU."""
        return self.gru(self.embedding(x))

class Attention(nn.Module):
    """Additive attention scoring each encoder position against a decoder state."""

    def __init__(self, hidden_dim):
        super().__init__()
        self.attn = nn.Linear(hidden_dim * 2, hidden_dim)
        self.v = nn.Parameter(torch.rand(hidden_dim))

    def forward(self, hidden, encoder_outputs):
        """Return attention weights of shape (batch, src_len); each row sums to 1.

        Args:
            hidden: Decoder state, shape (batch, hidden_dim).
            encoder_outputs: Encoder states, shape (batch, src_len, hidden_dim).
        """
        src_len = encoder_outputs.size(1)

        # Copy the decoder state once per source position
        tiled_hidden = hidden.unsqueeze(1).repeat(1, src_len, 1)

        # Score every (decoder state, encoder state) pair
        combined = torch.cat((tiled_hidden, encoder_outputs), dim=-1)
        energy = torch.tanh(self.attn(combined))
        scores = (self.v * energy).sum(dim=-1)

        # Normalise over the source positions
        return torch.softmax(scores, dim=1)

class Decoder(nn.Module):
    """Single-step attention decoder: embedding + context -> 2-layer GRU -> logits."""

    def __init__(self, output_size, embedding_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(output_size, embedding_dim)
        self.attention = Attention(hidden_dim)
        self.gru = nn.GRU(
            embedding_dim + hidden_dim,
            hidden_dim,
            batch_first=True,
            num_layers=2,
            dropout=0.2,
        )
        self.fc_out = nn.Linear(hidden_dim, output_size)

    def forward(self, input, hidden, encoder_outputs):
        """Decode one step.

        Args:
            input: Previous token ids, shape (batch, 1).
            hidden: GRU state; its last layer (``hidden[-1]``) queries the attention.
            encoder_outputs: Encoder states, shape (batch, src_len, hidden_dim).

        Returns:
            ``(prediction, hidden, a)``: vocabulary logits for this step, the
            updated GRU state and the attention weights over source positions.
        """
        token_embedding = self.embedding(input).squeeze(1)

        # Attention weights from the top GRU layer's state
        a = self.attention(hidden[-1], encoder_outputs)

        # Weighted sum of encoder states -> one context vector per sample
        context = torch.bmm(a.unsqueeze(1), encoder_outputs).squeeze(1)

        # A length-1 sequence made of [embedding ; context]
        step_input = torch.cat((token_embedding, context), dim=1).unsqueeze(1)
        output, hidden = self.gru(step_input, hidden)

        return self.fc_out(output.squeeze(1)), hidden, a

class Seq2SeqTranslator(nn.Module):
    """Encoder/decoder pair that decodes one target position at a time."""

    def __init__(self, input_size, output_size, embedding_dim, hidden_dim):
        super(Seq2SeqTranslator, self).__init__()
        self.encoder = Encoder(input_size, embedding_dim, hidden_dim)
        self.decoder = Decoder(output_size, embedding_dim, hidden_dim)

    def forward(self, input_seq, target_seq, teacher_forcing_ratio=0.5):
        """Return logits of shape (batch, target_len, target_vocab_size).

        Args:
            input_seq: Source token ids, shape (batch, src_len).
            target_seq: Target token ids, shape (batch, target_len). Its length
                fixes the number of decoding steps; its values are only read
                on steps where teacher forcing is applied.
            teacher_forcing_ratio: Probability, per step, of feeding the
                ground-truth token instead of the model's own prediction.
                Pass 0 at inference time.
        """
        batch_size = input_seq.size(0)
        target_len = target_seq.size(1)
        target_vocab_size = self.decoder.fc_out.out_features
        # Allocate work tensors on the input's device so the module also runs
        # when model and data live on an accelerator.
        device = input_seq.device

        # Tensor to store decoder outputs
        outputs = torch.zeros(batch_size, target_len, target_vocab_size, device=device)

        # Encoder
        encoder_outputs, hidden = self.encoder(input_seq)

        # First decoder input: token index 0 for every sample
        decoder_input = torch.zeros(batch_size, 1, dtype=torch.long, device=device)

        # Decode
        for t in range(target_len):
            decoder_output, hidden, _ = self.decoder(decoder_input, hidden, encoder_outputs)
            outputs[:, t] = decoder_output

            # Teacher forcing: one random draw per step
            teacher_force = random.random() < teacher_forcing_ratio
            if teacher_force:
                decoder_input = target_seq[:, t:t+1]
            else:
                decoder_input = decoder_output.argmax(1).unsqueeze(1)

        return outputs

class NumeralTranslator:
    """Trains and runs a character-level seq2seq model that spells out numbers."""

    def __init__(self, input_chars, output_chars):
        """Build vocabularies, model, loss and optimizer.

        Args:
            input_chars: Characters that may appear in the input (digits).
            output_chars: Characters that may appear in the output.
        """
        # Create dataset
        self.dataset = NumeralTranslationDataset()

        # Create character to index mappings
        self.input_char_to_idx = {char: i for i, char in enumerate(input_chars)}
        self.input_idx_to_char = {i: char for char, i in self.input_char_to_idx.items()}

        self.output_char_to_idx = {char: i for i, char in enumerate(output_chars)}
        self.output_idx_to_char = {i: char for char, i in self.output_char_to_idx.items()}

        # Hyperparameters
        self.embedding_dim = 128
        self.hidden_dim = 256

        # Initialize model
        self.model = Seq2SeqTranslator(
            input_size=len(input_chars),
            output_size=len(output_chars),
            embedding_dim=self.embedding_dim,
            hidden_dim=self.hidden_dim
        )

        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)

    def prepare_sequence(self, seq, char_to_idx):
        """Convert sequence of characters to tensor of indices.

        Characters missing from ``char_to_idx`` are mapped to index 0.
        """
        return torch.tensor([char_to_idx.get(char, 0) for char in seq], dtype=torch.long)

    def pad_sequences(self, sequences, pad_token):
        """Pad sequences to equal length and stack them into a LongTensor."""
        # Convert sequences to lists if they are tensors
        sequences = [seq.tolist() if torch.is_tensor(seq) else seq for seq in sequences]

        max_len = max(len(seq) for seq in sequences)
        padded = [seq + [pad_token] * (max_len - len(seq)) for seq in sequences]
        return torch.tensor(padded, dtype=torch.long)

    def train(self, epochs=300, batch_size=32):
        """Train the translation model.

        Args:
            epochs: Number of passes over the generated training set.
            batch_size: Number of examples per optimisation step.

        Returns:
            List with the mean batch loss of every epoch.
        """
        # Generate training data
        input_sequences, target_sequences = self.dataset.generate_training_data()

        # Both lists already hold per-example character lists. Do not wrap the
        # inputs in str(): that would turn ['1', '3'] into the characters of
        # "['1', '3']" and train the model on brackets, quotes and commas.
        input_chars = [list(seq) for seq in input_sequences]
        target_chars = [list(seq) for seq in target_sequences]

        # Get character sets for input and output
        input_chars_set = sorted(set(''.join([''.join(seq) for seq in input_chars])))
        output_chars_set = sorted(set(''.join([''.join(seq) for seq in target_chars])))

        print("Input characters:", input_chars_set)
        print("Output characters:", output_chars_set)

        # translate() switches the model to eval mode; make sure dropout is
        # active again while training.
        self.model.train()

        # Training loop
        epoch_losses = []
        for epoch in range(epochs):
            total_loss = 0
            num_batches = 0

            # Shuffle data
            combined = list(zip(input_chars, target_chars))
            random.shuffle(combined)
            input_chars, target_chars = zip(*combined)

            for i in range(0, len(input_chars), batch_size):
                batch_input = input_chars[i:i+batch_size]
                batch_target = target_chars[i:i+batch_size]

                # Prepare input sequences
                input_seqs = self.pad_sequences(
                    [self.prepare_sequence(seq, self.input_char_to_idx) for seq in batch_input],
                    pad_token=0
                )

                # Prepare target sequences
                target_seqs = self.pad_sequences(
                    [self.prepare_sequence(seq, self.output_char_to_idx) for seq in batch_target],
                    pad_token=0
                )

                # Zero gradients
                self.optimizer.zero_grad()

                # Forward pass
                outputs = self.model(input_seqs, target_seqs)

                # Compute loss over all positions of all sequences
                loss = self.criterion(
                    outputs.view(-1, outputs.size(-1)),
                    target_seqs.view(-1)
                )

                # Backward pass
                loss.backward()
                self.optimizer.step()

                total_loss += loss.item()
                num_batches += 1

            # Average over the batches actually run. A floor division of the
            # dataset size ignores the final partial batch and is zero when
            # the dataset is smaller than one batch.
            avg_loss = total_loss / max(num_batches, 1)
            epoch_losses.append(avg_loss)

            # Print progress
            if epoch % 10 == 0:
                print(f'Epoch {epoch}, Loss: {avg_loss:.4f}')

        # Visualize training loss
        self.plot_training_loss(epoch_losses)

        return epoch_losses

    def translate(self, input_number, max_output_length=16):
        """Translate a single number to words.

        Args:
            input_number: The number to translate (anything ``str()`` turns
                into digit characters).
            max_output_length: Number of characters to decode. The longest
                spellings for 0-99 ("seventy three", ...) need 13.

        Returns:
            The decoded characters joined into a string, stripped of
            surrounding whitespace.
        """
        # Prepare input sequence
        input_seq = self.prepare_sequence(list(str(input_number)), self.input_char_to_idx)
        input_seq = input_seq.unsqueeze(0)  # Add batch dimension

        # The target only fixes the number of decoding steps
        dummy_target = torch.zeros(1, max_output_length, dtype=torch.long)

        # Inference must be deterministic: switch dropout off and never feed
        # the dummy target back in (teacher forcing disabled).
        was_training = self.model.training
        self.model.eval()
        try:
            with torch.no_grad():
                outputs = self.model(input_seq, dummy_target, teacher_forcing_ratio=0.0)
        finally:
            self.model.train(was_training)

        # Get the most likely output character for every position
        predicted_indices = outputs.argmax(dim=-1)

        # Convert indices back to characters
        predicted_chars = []
        for i in range(predicted_indices.size(1)):
            char = self.output_idx_to_char[predicted_indices[0, i].item()]
            if char != '<pad>':  # Skip padding
                predicted_chars.append(char)

        # Join characters to form a word
        return ''.join(predicted_chars).strip()

    def save_model(self, filepath='numeral_translator.pth'):
        """Save model weights and both character-to-index maps to ``filepath``."""
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'input_char_to_idx': self.input_char_to_idx,
            'output_char_to_idx': self.output_char_to_idx
        }, filepath)
        print(f"Model saved to {filepath}")

    def load_model(self, filepath='numeral_translator.pth'):
        """Load model weights and character maps saved by ``save_model``."""
        checkpoint = torch.load(filepath)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.input_char_to_idx = checkpoint['input_char_to_idx']
        self.output_char_to_idx = checkpoint['output_char_to_idx']
        # Rebuild the inverse maps so decoding matches the loaded vocabulary
        self.input_idx_to_char = {i: char for char, i in self.input_char_to_idx.items()}
        self.output_idx_to_char = {i: char for char, i in self.output_char_to_idx.items()}
        print(f"Model loaded from {filepath}")

    def plot_training_loss(self, losses):
        """Plot ``losses`` per epoch and save the figure as training_loss.png."""
        plt.figure(figsize=(10, 5))
        plt.plot(losses, label='Training Loss')
        plt.title('Training Loss Over Epochs')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.tight_layout()
        plt.savefig('training_loss.png')
        plt.close()

def main():
    """Train the numeral translator, save it, print samples, then run a prompt loop."""
    # Define input and output character sets
    input_chars = list('0123456789')
    output_chars = list(' abcdefghijklmnopqrstuvwxyz') + ['<pad>']

    # Initialize translator
    translator = NumeralTranslator(input_chars, output_chars)

    # Train the model
    print("Training model...")
    translator.train(epochs=300, batch_size=32)

    # Save the trained model
    translator.save_model()

    # Test the model with some examples
    test_numbers = ['0', '5', '13', '25', '42', '67', '89', '99']
    print("\nTesting translations:")
    for number in test_numbers:
        translation = translator.translate(number)
        print(f"{number} -> {translation}")

    # Interactive mode
    print("\nEnter a number (0-99) to translate or 'q' to quit:")
    while True:
        try:
            user_input = input("> ")
        except EOFError:
            # stdin closed (piped input or Ctrl-D): leave the loop quietly
            break
        if user_input.strip().lower() == 'q':
            break
        try:
            number = int(user_input)
        except ValueError:
            print("Invalid input. Please enter a valid number or 'q' to quit")
            continue
        if 0 <= number <= 99:
            # Translate the canonical digits, not the raw text: int() also
            # accepts forms such as " 7" or "07" whose extra characters the
            # model was never trained on.
            translation = translator.translate(str(number))
            print(f"Translation: {translation}")
        else:
            print("Please enter a number between 0 and 99")

if __name__ == "__main__":
    main()

Transformer generator

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import math
class Vocab:
    """Holds the two lookup tables of a vocabulary, stored exactly as given."""

    def __init__(self, stoi, itos):
        # stoi: string -> index table, itos: index -> string table
        self.stoi, self.itos = stoi, itos

# Training text (an excerpt about the history of AI)
corpus = """
The history of artificial intelligence (AI) began in antiquity, with myths, stories and rumors of artificial beings endowed with intelligence or consciousness by master craftsmen.
...
"""
# Flatten onto one line (newline -> space), then tokenize on whitespace
corpus = " ".join(corpus.split("\n"))
tokens = corpus.split()
from collections import Counter
# Unique tokens, numbered in order of first appearance
token_counts = Counter(tokens)
vocab_itos = dict(enumerate(token_counts))
vocab_stoi = {token: idx for idx, token in vocab_itos.items()}
vocab = Vocab(stoi=vocab_stoi, itos=vocab_itos)

class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position encodings along dimension 1 of the input.

    Args:
        embed_size: Size of the last (feature) dimension; may be odd.
        max_len: Largest sequence length (size of dim 1) that can be encoded.
    """

    def __init__(self, embed_size, max_len=5000):
        super(PositionalEncoding, self).__init__()
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, embed_size, 2).float() * (-math.log(10000.0) / embed_size))
        encoding = torch.zeros(max_len, embed_size)
        encoding[:, 0::2] = torch.sin(position * div_term)
        # With an odd embed_size there is one cosine column fewer than sine
        # columns, so trim div_term to match.
        encoding[:, 1::2] = torch.cos(position * div_term[:embed_size // 2])
        # Non-persistent buffer: follows .to()/.cuda() together with the
        # module, but stays out of state_dict so existing checkpoints load.
        self.register_buffer('encoding', encoding.unsqueeze(0), persistent=False)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        Args:
            x: Tensor of shape (batch, seq_len, embed_size).
        """
        return x + self.encoding[:, :x.size(1), :].to(x.device)

class TransformerModel(nn.Module):
    """Token-level language model built on ``nn.TransformerEncoder``.

    Inputs are sequence-first: ``src`` has shape (seq_len, batch), which is the
    layout the encoder layers expect when ``batch_first`` is left at its default.
    """

    def __init__(self, vocab_size, embed_size, num_heads, hidden_size, num_layers, dropout=0.1):
        super(TransformerModel, self).__init__()
        self.embed_size = embed_size
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.pos_encoder = PositionalEncoding(embed_size)
        encoder_layers = nn.TransformerEncoderLayer(embed_size, num_heads, hidden_size, dropout)
        self.transformer = nn.TransformerEncoder(encoder_layers, num_layers)
        self.fc = nn.Linear(embed_size, vocab_size)

    def forward(self, src, src_mask=None):
        """Return vocabulary logits of shape (seq_len, batch, vocab_size).

        Args:
            src: Token ids, shape (seq_len, batch).
            src_mask: Optional (seq_len, seq_len) attention mask. When omitted
                a causal mask is used, so position t cannot attend to later
                positions. The model is trained on next-token targets, and
                without the mask those targets would be visible in the input.
        """
        if src_mask is None:
            seq_len = src.size(0)
            # -inf above the diagonal blocks attention to future positions
            src_mask = torch.triu(
                torch.full((seq_len, seq_len), float('-inf'), device=src.device),
                diagonal=1,
            )
        # Scale by sqrt(embed_size). src still holds token ids here, so its
        # last dimension is the batch size, not the embedding width.
        src = self.embedding(src) * math.sqrt(self.embed_size)
        # pos_encoder adds its table along dimension 1, so present the
        # sequence axis there and restore the sequence-first layout after.
        src = self.pos_encoder(src.transpose(0, 1)).transpose(0, 1)
        output = self.transformer(src, src_mask)
        output = self.fc(output)
        return output

class TextDataset(Dataset):
    """Sliding-window next-token dataset over a whitespace-tokenized text.

    Item ``idx`` is a pair ``(x, y)`` of ``sequence_length`` token ids where
    ``y`` is ``x`` shifted one token to the right.
    """

    def __init__(self, text, vocab, sequence_length):
        self.vocab = vocab
        self.sequence_length = sequence_length
        self.data = self.tokenize_and_encode(text)

    def tokenize_and_encode(self, text):
        """Split ``text`` on whitespace and map tokens to ids, dropping unknown tokens."""
        stoi = self.vocab.stoi
        return [stoi[token] for token in text.split() if token in stoi]

    def __len__(self):
        # Clamp at zero: a text shorter than one window would otherwise give a
        # negative length, which makes len() raise ValueError.
        return max(0, len(self.data) - self.sequence_length)

    def __getitem__(self, idx):
        # Slicing never fails, so without this check plain iteration over the
        # dataset would never terminate (it relies on IndexError to stop).
        if not 0 <= idx < len(self):
            raise IndexError(f"index {idx} out of range for dataset of length {len(self)}")
        x = self.data[idx:idx + self.sequence_length]
        y = self.data[idx + 1:idx + 1 + self.sequence_length]
        return torch.tensor(x, dtype=torch.long), torch.tensor(y, dtype=torch.long)

# --- Hyperparameters ---
sequence_length = 10
batch_size = 100
embed_size = 50  # Can be tuned
num_heads = 2  # Number of attention heads
hidden_size = 100  # Hidden layer size in feedforward network
# NOTE(review): 88 encoder layers is very deep for a 50-dimensional toy model;
# confirm this is intended and not a typo.
num_layers = 88  # Number of Transformer layers
dropout = 0.1
num_epochs = 100  # Adjust based on performance
learning_rate = 0.001

# --- Data ---
dataset = TextDataset(corpus, vocab, sequence_length)
train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
vocab_size = len(vocab.stoi)

# --- Model, loss and optimizer ---
model = TransformerModel(vocab_size, embed_size, num_heads, hidden_size, num_layers, dropout)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# --- Training ---
for epoch in range(num_epochs):
    for inputs, targets in train_loader:
        # The loader yields (batch_size, sequence_length); the model wants
        # (sequence_length, batch_size), so transpose both tensors.
        inputs, targets = inputs.t(), targets.t()
        outputs = model(inputs)

        # reshape() rather than view(): the transposed targets are not contiguous
        loss = criterion(outputs.reshape(-1, vocab_size), targets.reshape(-1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    # Loss of the last batch of the epoch
    print(f'Epoch {epoch+1}, Loss: {loss.item()}')

torch.save(model.state_dict(), 'transformer_model_ai.pth')

def generate_text(model, start_text, max_length=100):
    """Sample ``max_length`` tokens from ``model``, starting from ``start_text``.

    The whole sequence generated so far is fed back to the model on every step.

    Args:
        model: Model mapping token ids of shape (seq_len, 1) to logits of
            shape (seq_len, 1, vocab_size).
        start_text: A single token that must exist in the module-level ``vocab``.
        max_length: Number of tokens to sample after the start token.

    Returns:
        The start token and the sampled tokens joined by single spaces.

    Raises:
        KeyError: If ``start_text`` is not in the vocabulary.
    """
    if start_text not in vocab.stoi:
        raise KeyError(f"start_text {start_text!r} is not in the vocabulary")

    model.eval()
    # Shape (seq_len=1, batch=1); sequence-first like the training batches
    token_ids = torch.tensor([[vocab.stoi[start_text]]])
    result = [start_text]
    # Sampling needs no gradients; no_grad avoids building an autograd graph
    # for every forward pass over the growing sequence.
    with torch.no_grad():
        for _ in range(max_length):
            output = model(token_ids)
            # Distribution over the vocabulary at the last position
            prob = nn.functional.softmax(output[-1, 0], dim=0)
            next_word = torch.multinomial(prob, 1).item()
            result.append(vocab.itos[next_word])
            token_ids = torch.cat([token_ids, torch.tensor([[next_word]])], dim=0)
    return ' '.join(result)
# NOTE(review): tokens come from a whitespace split, so the visible part of the
# corpus only contains "(AI)"; confirm that a bare "AI" token exists.
start_text = 'AI'
generated_text = generate_text(model, start_text, max_length=100)
print(generated_text)

About

Visualization Python Machine Learning, Deep Learning, Reinforcement Learning

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors