Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 165 additions & 12 deletions src/reconcile/provisioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,17 @@ async fn load_live_policies(
return Ok(BTreeMap::new());
}

client
let mut policies = client
.list_canned_policies()
.await
.map_err(|error| format!("failed to list RustFS canned policies: {error}"))
.map_err(|error| format!("failed to list RustFS canned policies: {error}"))?;

for (name, document) in &mut policies {
*document = normalize_policy_document(document)
.map_err(|error| format!("failed to normalize live RustFS policy '{name}': {error}"))?;
}

Ok(policies)
}

async fn reconcile_policies(
Expand Down Expand Up @@ -454,6 +461,13 @@ async fn reconcile_policy(
}
}
}
Some(_) if live_hash == desired_hash => run.item(
previous,
&policy.name,
ProvisioningItemState::Ready,
Reason::ProvisioningConfigured,
"RustFS policy matches spec",
),
Some(_) => run.item(
previous,
&policy.name,
Expand Down Expand Up @@ -487,7 +501,10 @@ async fn reconcile_policy(

item.desired_hash = Some(desired_hash);
if item.last_applied_hash.is_none() && item.state == ProvisioningItemState::Ready.as_str() {
item.last_applied_hash = item.desired_hash.clone();
item.last_applied_hash = live_policies
.get(&policy.name)
.map(|live_document| hash_document(live_document))
.or_else(|| item.desired_hash.clone());
}
if item.last_applied_hash.is_none() {
item.last_applied_hash = previous.and_then(|item| item.last_applied_hash.clone());
Expand Down Expand Up @@ -547,7 +564,7 @@ async fn load_policy_document(
)
})?;

canonical_json_document(raw).map_err(|message| (Reason::PolicyApplyFailed, message))
normalize_policy_document(raw).map_err(|message| (Reason::PolicyApplyFailed, message))
}

async fn apply_policy(
Expand All @@ -565,7 +582,7 @@ async fn apply_policy(
.get_canned_policy(name)
.await
.map_err(|error| format!("failed to read RustFS policy '{name}' after apply: {error}"))?;
let live_document = canonical_json_document(&live_document)?;
let live_document = normalize_policy_document(&live_document)?;
let live_hash = hash_document(&live_document);
live_policies.insert(name.to_string(), live_document);
Ok(live_hash)
Expand Down Expand Up @@ -1031,11 +1048,99 @@ fn is_ipv4_address_like(value: &str) -> bool {
}) && parts.next().is_none()
}

fn canonical_json_document(document: &str) -> Result<String, String> {
fn normalize_policy_document(document: &str) -> Result<String, String> {
let value = serde_json::from_str::<Value>(document)
.map_err(|error| format!("policy document must be valid JSON: {error}"))?;
serde_json::to_string(&value)
.map_err(|error| format!("failed to canonicalize policy document: {error}"))
let normalized = normalize_policy_value(value);
serde_json::to_string(&normalized)
.map_err(|error| format!("failed to normalize policy document: {error}"))
}

fn normalize_policy_value(value: Value) -> Value {
let Some(object) = value.as_object() else {
return value;
};

if !object.contains_key("Statement") {
return value;
}

let mut normalized = serde_json::Map::new();
if let Some(version) = object.get("Version") {
normalized.insert("Version".to_string(), version.clone());
}
if let Some(statements) = object.get("Statement").and_then(Value::as_array) {
let mut normalized_statements = statements
.iter()
.map(normalize_policy_statement)
.collect::<Vec<_>>();
normalized_statements.sort_by_key(statement_sort_key);
normalized.insert("Statement".to_string(), Value::Array(normalized_statements));
}

Value::Object(normalized)
}

fn normalize_policy_statement(statement: &Value) -> Value {
let Some(object) = statement.as_object() else {
return statement.clone();
};

let mut normalized = serde_json::Map::new();
if let Some(effect) = object.get("Effect") {
normalized.insert("Effect".to_string(), effect.clone());
}
if let Some(action) = object.get("Action") {
normalized.insert(
"Action".to_string(),
normalize_string_or_string_array(action),
);
}
if let Some(resource) = object.get("Resource") {
normalized.insert(
"Resource".to_string(),
normalize_string_or_string_array(resource),
);
}
if let Some(sid) = object
.get("Sid")
.and_then(Value::as_str)
.filter(|sid| !sid.is_empty())
{
normalized.insert("Sid".to_string(), Value::String(sid.to_string()));
}
if let Some(condition) = object
.get("Condition")
.filter(|condition| is_non_empty_json_object(condition))
{
normalized.insert("Condition".to_string(), condition.clone());
}

Value::Object(normalized)
}

fn normalize_string_or_string_array(value: &Value) -> Value {
match value {
Value::String(action) => Value::String(action.clone()),
Value::Array(items) => {
let mut normalized = items.clone();
normalized.sort_by(|left, right| {
left.as_str()
.unwrap_or_default()
.cmp(right.as_str().unwrap_or_default())
});
Value::Array(normalized)
}
_ => value.clone(),
}
}

fn is_non_empty_json_object(value: &Value) -> bool {
value.as_object().is_some_and(|object| !object.is_empty())
}

fn statement_sort_key(statement: &Value) -> String {
normalize_policy_statement(statement).to_string()
}

fn hash_document(document: &str) -> String {
Expand Down Expand Up @@ -1121,16 +1226,64 @@ mod tests {

#[test]
fn policy_document_hash_uses_compact_json() {
let canonical = canonical_json_document(
let normalized = normalize_policy_document(
r#"{
"Version": "2012-10-17",
"Statement": []
}"#,
)
.expect("policy should canonicalize");
.expect("policy should normalize");

assert_eq!(normalized, r#"{"Statement":[],"Version":"2012-10-17"}"#);
assert!(hash_document(&normalized).starts_with("sha256:"));
}

#[test]
fn rustfs_server_policy_matches_configmap_spec() {
let spec = r#"{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["s3:ListBucket"],
"Resource": ["arn:aws:s3:::rfsd01-data"]
},
{
"Effect": "Allow",
"Action": ["s3:GetObject", "s3:DeleteObject", "s3:PutObject"],
"Resource": ["arn:aws:s3:::rfsd01-data/*"]
}
]
}"#;
let server = r#"{
"ID": "",
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Action": ["s3:ListBucket"],
"Resource": ["arn:aws:s3:::rfsd01-data"],
"Condition": {}
},
{
"Sid": "",
"Effect": "Allow",
"Action": ["s3:PutObject", "s3:DeleteObject", "s3:GetObject"],
"Resource": ["arn:aws:s3:::rfsd01-data/*"],
"Condition": {}
}
]
}"#;

let spec_normalized = normalize_policy_document(spec).expect("spec should normalize");
let server_normalized = normalize_policy_document(server).expect("server should normalize");

assert_eq!(canonical, r#"{"Statement":[],"Version":"2012-10-17"}"#);
assert!(hash_document(&canonical).starts_with("sha256:"));
assert_eq!(spec_normalized, server_normalized);
assert_eq!(
hash_document(&spec_normalized),
hash_document(&server_normalized)
);
}

#[test]
Expand Down