Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions crates/gitlawb-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod ipfs_pin;
mod operator;
mod p2p;
mod pinata;
mod rate_limit;
mod server;
mod state;
mod sync;
Expand Down Expand Up @@ -147,6 +148,8 @@ async fn main() -> Result<()> {
let repo_store =
git::repo_store::RepoStore::new(config.repos_dir.clone(), tigris, db.pool().clone());

let rate_limiter = rate_limit::RateLimiter::new(10, std::time::Duration::from_secs(3600));

let state = AppState {
config: Arc::new(config.clone()),
db,
Expand All @@ -159,8 +162,20 @@ async fn main() -> Result<()> {
graphql_schema,
machine_id,
repo_store,
rate_limiter,
};

// Periodic cleanup of expired rate limit entries
{
let rl = state.rate_limiter.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(std::time::Duration::from_secs(300)).await;
rl.cleanup().await;
}
});
}

let router = server::build_router(state.clone());
let listener = TcpListener::bind(config.bind_addr())
.await
Expand Down
129 changes: 129 additions & 0 deletions crates/gitlawb-node/src/rate_limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use axum::extract::Request;
use axum::http::StatusCode;
use axum::middleware::Next;
use axum::response::{IntoResponse, Response};
use tokio::sync::Mutex;

use crate::auth::AuthenticatedDid;

#[derive(Clone)]
struct Window {
timestamps: Vec<Instant>,
}

#[derive(Clone)]
pub struct RateLimiter {
state: Arc<Mutex<HashMap<String, Window>>>,
max_requests: usize,
window: Duration,
}

impl RateLimiter {
pub fn new(max_requests: usize, window: Duration) -> Self {
Self {
state: Arc::new(Mutex::new(HashMap::new())),
max_requests,
window,
}
}

async fn check(&self, key: &str) -> bool {
let now = Instant::now();
let mut state = self.state.lock().await;
let window = state.entry(key.to_string()).or_insert_with(|| Window {
timestamps: Vec::new(),
});
window
.timestamps
.retain(|t| now.duration_since(*t) < self.window);
if window.timestamps.len() >= self.max_requests {
return false;
}
window.timestamps.push(now);
true
}

pub async fn cleanup(&self) {
let now = Instant::now();
let mut state = self.state.lock().await;
state.retain(|_, w| {
w.timestamps
.retain(|t| now.duration_since(*t) < self.window);
!w.timestamps.is_empty()
});
}
}

pub async fn rate_limit_by_did(request: Request, next: Next) -> Response {
let limiter = request.extensions().get::<RateLimiter>().cloned();

let did = request
.extensions()
.get::<AuthenticatedDid>()
.map(|a| a.0.clone());

if let (Some(limiter), Some(did)) = (limiter, did) {
if !limiter.check(&did).await {
return (
StatusCode::TOO_MANY_REQUESTS,
[("retry-after", "60")],
"rate limit exceeded — try again later",
)
.into_response();
}
}

next.run(request).await
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn allows_within_limit() {
let limiter = RateLimiter::new(3, Duration::from_secs(60));
assert!(limiter.check("did:key:test1").await);
assert!(limiter.check("did:key:test1").await);
assert!(limiter.check("did:key:test1").await);
}

#[tokio::test]
async fn blocks_over_limit() {
let limiter = RateLimiter::new(2, Duration::from_secs(60));
assert!(limiter.check("did:key:test2").await);
assert!(limiter.check("did:key:test2").await);
assert!(!limiter.check("did:key:test2").await);
}

#[tokio::test]
async fn separate_keys_independent() {
let limiter = RateLimiter::new(1, Duration::from_secs(60));
assert!(limiter.check("did:key:alice").await);
assert!(limiter.check("did:key:bob").await);
assert!(!limiter.check("did:key:alice").await);
}

#[tokio::test]
async fn window_expires() {
let limiter = RateLimiter::new(1, Duration::from_millis(50));
assert!(limiter.check("did:key:test3").await);
assert!(!limiter.check("did:key:test3").await);
tokio::time::sleep(Duration::from_millis(60)).await;
assert!(limiter.check("did:key:test3").await);
}

#[tokio::test]
async fn cleanup_removes_expired() {
let limiter = RateLimiter::new(1, Duration::from_millis(50));
limiter.check("did:key:stale").await;
tokio::time::sleep(Duration::from_millis(60)).await;
limiter.cleanup().await;
let state = limiter.state.lock().await;
assert!(state.is_empty());
}
}
25 changes: 17 additions & 8 deletions crates/gitlawb-node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::api::{
pulls, register, replicas, repos, resolve, stars, tasks, webhooks,
};
use crate::auth;
use crate::rate_limit;
use crate::state::AppState;

async fn graphql_handler(State(state): State<AppState>, req: GraphQLRequest) -> GraphQLResponse {
Expand Down Expand Up @@ -50,11 +51,23 @@ pub fn build_router(state: AppState) -> Router {
.route("/api/v1/tasks", get(tasks::list_tasks))
.route("/api/v1/tasks/{id}", get(tasks::get_task));

// ── Write routes — require HTTP Signature ──────────────────────────────
let write_routes = Router::new()
// ── Rate-limited creation routes — require HTTP Signature + per-DID throttle
let limiter = state.rate_limiter.clone();
let creation_routes = Router::new()
.route("/api/v1/repos", post(repos::create_repo))
.route("/api/register", post(register::register))
.route("/api/v1/repos/{owner}/{repo}/fork", post(repos::fork_repo))
.route(
"/api/v1/repos/{owner}/{repo}/issues",
post(issues::create_issue),
)
.route("/api/v1/repos/{owner}/{repo}/pulls", post(pulls::create_pr))
.layer(middleware::from_fn(rate_limit::rate_limit_by_did))
.layer(axum::Extension(limiter))
.layer(middleware::from_fn(auth::require_signature));

// ── Write routes — require HTTP Signature (no rate limit) ─────────────
let write_routes = Router::new()
.route(
"/api/v1/repos/{owner}/{repo}/pulls/{number}/merge",
post(pulls::merge_pr),
Expand Down Expand Up @@ -103,7 +116,6 @@ pub fn build_router(state: AppState) -> Router {
"/api/v1/repos/{owner}/{repo}/replicas",
axum::routing::delete(replicas::unregister_replica),
)
.route("/api/v1/repos/{owner}/{repo}/fork", post(repos::fork_repo))
.route(
"/api/v1/repos/{owner}/{repo}/labels",
post(labels::add_label),
Expand Down Expand Up @@ -175,12 +187,8 @@ pub fn build_router(state: AppState) -> Router {
get(bounties::agent_bounty_stats),
);

// ── Issue routes ──────────────────────────────────────────────────────
// ── Issue routes (write — require HTTP Signature, no rate limit) ─────
let issue_write_routes = Router::new()
.route(
"/api/v1/repos/{owner}/{repo}/issues",
post(issues::create_issue),
)
.route(
"/api/v1/repos/{owner}/{repo}/issues/{id}/close",
post(issues::close_issue),
Expand Down Expand Up @@ -316,6 +324,7 @@ pub fn build_router(state: AppState) -> Router {
.merge(task_read_routes)
.merge(bounty_write_routes)
.merge(bounty_read_routes)
.merge(creation_routes)
.merge(write_routes)
.merge(git_write_routes)
.merge(git_read_routes)
Expand Down
3 changes: 3 additions & 0 deletions crates/gitlawb-node/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::config::Config;
use crate::db::Db;
use crate::git::repo_store::RepoStore;
use crate::p2p::P2pHandle;
use crate::rate_limit::RateLimiter;

#[derive(Clone, Debug)]
pub struct RefUpdateBroadcast {
Expand Down Expand Up @@ -48,4 +49,6 @@ pub struct AppState {
pub machine_id: Option<String>,
/// Centralized repo storage: local disk cache + optional Tigris backend
pub repo_store: RepoStore,
/// Per-DID rate limiter for creation endpoints (repos, issues, PRs)
pub rate_limiter: RateLimiter,
}
Loading