Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 61 additions & 23 deletions dotCMS/src/main/java/com/dotcms/cost/RequestCostApiImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@
import com.dotcms.auth.providers.jwt.services.JsonWebTokenAuthCredentialProcessorImpl;
import com.dotcms.cdi.CDIUtils;
import com.dotcms.cost.RequestPrices.Price;
import com.dotcms.enterprise.cluster.ClusterFactory;
import com.dotmarketing.util.Config;
import com.dotmarketing.util.ConfigUtils;
import com.dotmarketing.util.Logger;
import com.dotmarketing.util.UtilMethods;
import com.liferay.portal.model.User;
import com.liferay.portal.util.PortalUtil;
import io.vavr.control.Try;
import java.lang.reflect.Method;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -43,6 +49,7 @@ public class RequestCostApiImpl implements RequestCostApi {
private double requestCostDenominator = 1.0d;

private final LeakyTokenBucket bucket = CDIUtils.getBeanThrows(LeakyTokenBucket.class);
private final RequestCostPublisher publisher = CDIUtils.getBeanThrows(RequestCostPublisher.class);

public RequestCostApiImpl() {
enableForTests = Optional.empty();
Expand All @@ -56,7 +63,10 @@ public RequestCostApiImpl(Boolean enable) {
@PostConstruct
public void init() {
this.requestCostTimeWindowSeconds = Config.getIntProperty("REQUEST_COST_TIME_WINDOW_SECONDS", 60);
this.requestCostDenominator = Config.getFloatProperty("REQUEST_COST_DENOMINATOR", 1.0f);
// Clamp to >= 1.0 so a misconfigured 0 doesn't produce Infinity/NaN in the snapshot —
// those serialize as JSON-invalid literals and break strict parsers on the collector side.
this.requestCostDenominator = Math.max(1.0d,
Config.getFloatProperty("REQUEST_COST_DENOMINATOR", 1.0f));

this.scheduler = Executors.newSingleThreadScheduledExecutor(
r -> {
Expand All @@ -71,42 +81,70 @@ public void init() {
}

private volatile boolean skipZeroRequests = false;

private static String nullSafe(final String value) {
return UtilMethods.isSet(value) ? value : "unknown";
}

private void logRequestCost() {
try {
if (!isAccountingEnabled()) {
return;
}

long totalRequestsForDuration = this.requestCountForWindow.sumThenReset();
double totalCostForDuration = this.requestCostForWindow.sumThenReset() / getRequestCostDenominator();
double costPerRequestForDuration = totalRequestsForDuration == 0
// The four counter reads below are not atomic relative to each other. Increments
// landing between them are counted in the next window's snapshot but already in
// the lifetime totals — Σ(window) can briefly trail lifetime by a few requests.
// Intentional: observational telemetry, atomic snapshot would need a lock.
final long totalRequestsForDuration = this.requestCountForWindow.sumThenReset();
final double totalCostForDuration = this.requestCostForWindow.sumThenReset() / getRequestCostDenominator();
final double costPerRequestForDuration = totalRequestsForDuration == 0
? 0
: totalCostForDuration / totalRequestsForDuration;

if (totalRequestsForDuration == 0 && skipZeroRequests) {
return;
}
skipZeroRequests = totalRequestsForDuration == 0;

long totalRequestsTotal = requestCountTotal.longValue();
double totalCostTotal = requestCostTotal.longValue() / getRequestCostDenominator();
double costPerRequestTotal = totalRequestsTotal == 0
final long totalRequestsTotal = requestCountTotal.longValue();
final double totalCostTotal = requestCostTotal.longValue() / getRequestCostDenominator();
final double costPerRequestTotal = totalRequestsTotal == 0
? 0
: totalCostTotal / totalRequestsTotal;

// The log line is throttled on consecutive idle windows so dev consoles stay quiet.
// The publisher is NOT throttled — telemetry must emit a point every tick so an idle
// cluster and a downed cluster are distinguishable on the receiving side.
final boolean idleWindow = totalRequestsForDuration == 0;
final boolean suppressLog = idleWindow && skipZeroRequests;
skipZeroRequests = idleWindow;

if (!suppressLog) {
Logger.info("REQUEST TOKEN MONITOR >",
String.format(
"Last %ds: Reqs: %d, Tokens: %.2f, Avg Tokens: %.2f | Totals: Reqs: %d, Tokens: %.2f, Avg Tokens: %.2f",
requestCostTimeWindowSeconds,
totalRequestsForDuration,
totalCostForDuration,
costPerRequestForDuration,
totalRequestsTotal,
totalCostTotal,
costPerRequestTotal));
}

Logger.info("REQUEST COST MONITOR >",
String.format(
"Last %ds: Reqs: %d, Cost: %.2f, Avg Cost: %.2f | Totals: Reqs: %d, Cost: %.2f, Avg Cost: %.2f",
requestCostTimeWindowSeconds,
totalRequestsForDuration,
totalCostForDuration,
costPerRequestForDuration,
totalRequestsTotal,
totalCostTotal,
costPerRequestTotal));
if (publisher.isEnabled()) {
publisher.publish(new RequestCostSnapshot(
// Try.getOrElse only fires on throw — also coalesce null returns since
// these lookups can transiently return null during early startup.
nullSafe(Try.of(ClusterFactory::getClusterId).getOrNull()),
nullSafe(Try.of(ConfigUtils::getServerId).getOrNull()),
Instant.now().truncatedTo(ChronoUnit.SECONDS).toString(),
requestCostTimeWindowSeconds,
totalRequestsForDuration,
totalCostForDuration,
costPerRequestForDuration,
totalRequestsTotal,
totalCostTotal,
costPerRequestTotal));
}
} catch (Exception e) {
Logger.warnAndDebug(this.getClass(), "Error logging request cost:" + e.getMessage(), e);
Logger.warnAndDebug(this.getClass(), "Error logging request tokens:" + e.getMessage(), e);
}
}

Expand Down
172 changes: 172 additions & 0 deletions dotCMS/src/main/java/com/dotcms/cost/RequestCostPublisher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package com.dotcms.cost;

import com.dotcms.concurrent.DotConcurrentFactory;
import com.dotcms.http.CircuitBreakerUrl;
import com.dotmarketing.util.Config;
import com.dotmarketing.util.Logger;
import com.dotmarketing.util.UtilMethods;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.enterprise.context.ApplicationScoped;

/**
* Ships {@link RequestCostSnapshot} payloads to an external REST endpoint on each tick of
* {@link RequestCostApiImpl#logRequestCost()}.
*
* <p>Activates implicitly when both {@code REQUEST_COST_PUSH_URL} and
* {@code REQUEST_COST_PUSH_TOKEN} are set. Failures are rate-limited warnings and the snapshot
* is dropped — this is observational telemetry, not durable accounting.</p>
*
* <p>Config keys:
* <ul>
* <li>{@code REQUEST_COST_PUSH_URL} (presence with token activates the publisher)</li>
* <li>{@code REQUEST_COST_PUSH_TOKEN} (bearer token; presence with url activates the publisher)</li>
* <li>{@code REQUEST_COST_PUSH_TIMEOUT_MS} (default 5000)</li>
* </ul>
* </p>
*/
@ApplicationScoped
public class RequestCostPublisher {

private static final ObjectMapper MAPPER = new ObjectMapper();
private static final int FAIL_LOG_INTERVAL_MS = 10 * 60 * 1000;
Comment thread
wezell marked this conversation as resolved.
private final AtomicBoolean httpSchemeWarned = new AtomicBoolean(false);

public boolean isEnabled() {
// Use the sanitized token in the gate so a whitespace-or-CRLF-only token
// doesn't activate the publisher and cause an unauthenticated POST.
return UtilMethods.isSet(getUrl()) && UtilMethods.isSet(sanitizeHeaderValue(getToken()));
}

private String getUrl() {
return Config.getStringProperty("REQUEST_COST_PUSH_URL", null);
}

private String getToken() {
return Config.getStringProperty("REQUEST_COST_PUSH_TOKEN", null);
}

private long getTimeoutMs() {
return Config.getLongProperty("REQUEST_COST_PUSH_TIMEOUT_MS", 5_000L);
}

/**
* Submits the snapshot to {@link DotConcurrentFactory}'s default submitter so the HTTP POST
* never blocks the request-cost monitor scheduler. Returns immediately. Transport errors are
* logged at most once every 10 minutes and the snapshot is dropped.
*/
public void publish(final RequestCostSnapshot snapshot) {
if (!isEnabled()) {
return;
}
DotConcurrentFactory.getInstance().getSubmitter().submit(() -> post(snapshot));
}

private void post(final RequestCostSnapshot snapshot) {
final String url = getUrl();
final String token = sanitizeHeaderValue(getToken());
// Re-check both pieces — config may have been cleared between submit and execute, and
// posting without an Authorization header is a worse failure mode than not posting at all.
if (!UtilMethods.isSet(url) || !UtilMethods.isSet(token)) {
return;
}
warnOncePlainHttp(url);
try {
// CircuitBreakerUrl sniffs the rawData and applies Content-Type automatically when
// the payload starts with '{', so we don't set it explicitly (would risk a duplicate
// header on some Apache HttpClient versions).
final Map<String, String> headers = new HashMap<>();
headers.put("Authorization", "Bearer " + token);
Comment thread
wezell marked this conversation as resolved.

final CircuitBreakerUrl call = CircuitBreakerUrl.builder()
.setMethod(CircuitBreakerUrl.Method.POST)
.setUrl(url)
.setHeaders(headers)
.setRawData(MAPPER.writeValueAsString(snapshot))
.setTimeout(getTimeoutMs())
.setThrowWhenError(false)
.build();

call.doString();
if (!call.isProcessed()) {
Logger.warnEvery(this.getClass(),
"REQUEST_COST_PUSH_FAIL_TRANSPORT",
"Request cost push to " + sanitizeUrlForLog(url) + " did not complete (circuit open or transport error)",
FAIL_LOG_INTERVAL_MS);
return;
}
final int response = call.response();
if (!CircuitBreakerUrl.isSuccessResponse(response)) {
Logger.warnEvery(this.getClass(),
"REQUEST_COST_PUSH_FAIL_HTTP",
"Request cost push to " + sanitizeUrlForLog(url) + " returned HTTP " + response,
FAIL_LOG_INTERVAL_MS);
}
Comment thread
wezell marked this conversation as resolved.
} catch (final Exception e) {
Logger.warnEvery(this.getClass(),
"REQUEST_COST_PUSH_ERR_EXCEPTION",
"Request cost push to " + sanitizeUrlForLog(url) + " failed: " + e.getMessage(),
FAIL_LOG_INTERVAL_MS);
Comment thread
wezell marked this conversation as resolved.
Logger.debug(this.getClass(),
"Request cost push to " + sanitizeUrlForLog(url) + " failed with exception",
e);
}
}

/**
* Warns once (per process lifetime) if the configured URL uses the plain {@code http://}
* scheme — the bearer token would otherwise traverse the wire in cleartext. Stays a warning
* rather than a refusal so a misconfiguration doesn't silently drop telemetry.
*/
private void warnOncePlainHttp(final String url) {
if (httpSchemeWarned.get()) {
return;
}
try {
final String scheme = URI.create(url).getScheme();
if (scheme != null && scheme.equalsIgnoreCase("http")
Comment thread
wezell marked this conversation as resolved.
&& httpSchemeWarned.compareAndSet(false, true)) {
Logger.warn(this.getClass(),
"REQUEST_COST_PUSH_URL uses plain http:// — bearer token will be sent "
+ "in cleartext. Use https:// for any non-loopback destination.");
}
} catch (final Exception ignored) {
// sanitizer is best-effort
}
}

/**
* Strips CR/LF and surrounding whitespace from a header value. A misconfigured
* {@code REQUEST_COST_PUSH_TOKEN} containing CRLF would otherwise enable HTTP header
* injection — low-risk since only operators set this, but cheap to harden.
*/
static String sanitizeHeaderValue(final String value) {
return value == null ? null : value.replace("\r", "").replace("\n", "").trim();
}

/**
* Strips RFC-3986 userinfo (the {@code user:pass@} segment) from a URL before logging so a
* misconfigured {@code REQUEST_COST_PUSH_URL=https://user:secret@host/...} doesn't leak the
* credential into every failure log line.
*/
static String sanitizeUrlForLog(final String url) {
if (!UtilMethods.isSet(url)) {
return url;
}
try {
final URI uri = URI.create(url);
if (uri.getUserInfo() == null) {
return url;
}
final URI safe = new URI(
uri.getScheme(), null, uri.getHost(), uri.getPort(),
uri.getPath(), uri.getQuery(), uri.getFragment());
return safe.toString();
} catch (final Exception ignored) {
return "<unparseable-url>";
}
}
}
49 changes: 49 additions & 0 deletions dotCMS/src/main/java/com/dotcms/cost/RequestCostSnapshot.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.dotcms.cost;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;

/**
* Immutable payload shipped to the external request-cost (a.k.a. request token) collection
* endpoint on each scheduled tick. One snapshot = one point in the time series.
*/
@JsonAutoDetect(
fieldVisibility = Visibility.PUBLIC_ONLY,
getterVisibility = Visibility.NONE,
isGetterVisibility = Visibility.NONE)
public final class RequestCostSnapshot {

Comment thread
wezell marked this conversation as resolved.
public final String clusterId;
public final String serverId;
public final String timestamp;
public final int windowSeconds;
public final long windowRequests;
public final double windowTokens;
public final double windowAvgTokensPerRequest;
public final long lifetimeRequests;
public final double lifetimeTokens;
public final double lifetimeAvgTokensPerRequest;

public RequestCostSnapshot(
final String clusterId,
final String serverId,
final String timestamp,
final int windowSeconds,
final long windowRequests,
final double windowTokens,
final double windowAvgTokensPerRequest,
final long lifetimeRequests,
final double lifetimeTokens,
final double lifetimeAvgTokensPerRequest) {
this.clusterId = clusterId;
this.serverId = serverId;
this.timestamp = timestamp;
this.windowSeconds = windowSeconds;
this.windowRequests = windowRequests;
this.windowTokens = windowTokens;
this.windowAvgTokensPerRequest = windowAvgTokensPerRequest;
this.lifetimeRequests = lifetimeRequests;
this.lifetimeTokens = lifetimeTokens;
this.lifetimeAvgTokensPerRequest = lifetimeAvgTokensPerRequest;
}
}
Loading
Loading