Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,32 +44,36 @@ import java.util.concurrent.TimeUnit
* [VirtualThreadAsyncHttpClient] for the `close()` path so shutdown semantics are unchanged.
*/
internal class MdcAwareExecutor(private val delegate: ExecutorService) : ExecutorService by delegate {
private fun MdcSnapshot.wrap(command: Runnable): Runnable = Runnable { withMdc { command.run() } }

private fun <T> MdcSnapshot.wrap(task: Callable<T>): Callable<T> = Callable { withMdc { task.call() } }

override fun execute(command: Runnable) {
val snapshot = MdcSnapshot.capture()
delegate.execute { snapshot.withMdc { command.run() } }
delegate.execute(snapshot.wrap(command))
}

override fun <T : Any?> submit(task: Callable<T>): Future<T> {
val snapshot = MdcSnapshot.capture()
return delegate.submit(Callable { snapshot.withMdc { task.call() } })
return delegate.submit(snapshot.wrap(task))
}

override fun submit(task: Runnable): Future<*> {
val snapshot = MdcSnapshot.capture()
return delegate.submit { snapshot.withMdc { task.run() } }
return delegate.submit(snapshot.wrap(task))
}

override fun <T : Any?> submit(
task: Runnable,
result: T,
): Future<T> {
val snapshot = MdcSnapshot.capture()
return delegate.submit({ snapshot.withMdc { task.run() } }, result)
return delegate.submit(snapshot.wrap(task), result)
}

override fun <T : Any?> invokeAll(tasks: MutableCollection<out Callable<T>>): MutableList<Future<T>> {
val snapshot = MdcSnapshot.capture()
return delegate.invokeAll(tasks.map { task -> Callable { snapshot.withMdc { task.call() } } })
return delegate.invokeAll(tasks.map { task -> snapshot.wrap(task) })
}

override fun <T : Any?> invokeAll(
Expand All @@ -78,12 +82,12 @@ internal class MdcAwareExecutor(private val delegate: ExecutorService) : Executo
unit: TimeUnit,
): MutableList<Future<T>> {
val snapshot = MdcSnapshot.capture()
return delegate.invokeAll(tasks.map { task -> Callable { snapshot.withMdc { task.call() } } }, timeout, unit)
return delegate.invokeAll(tasks.map { task -> snapshot.wrap(task) }, timeout, unit)
}

override fun <T : Any?> invokeAny(tasks: MutableCollection<out Callable<T>>): T {
val snapshot = MdcSnapshot.capture()
return delegate.invokeAny(tasks.map { task -> Callable { snapshot.withMdc { task.call() } } })
return delegate.invokeAny(tasks.map { task -> snapshot.wrap(task) })
}

override fun <T : Any?> invokeAny(
Expand All @@ -92,6 +96,6 @@ internal class MdcAwareExecutor(private val delegate: ExecutorService) : Executo
unit: TimeUnit,
): T {
val snapshot = MdcSnapshot.capture()
return delegate.invokeAny(tasks.map { task -> Callable { snapshot.withMdc { task.call() } } }, timeout, unit)
return delegate.invokeAny(tasks.map { task -> snapshot.wrap(task) }, timeout, unit)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public class Configuration internal constructor(
// ISO-8601 path: `PT5S`, `P1D`, etc. Reject negative durations (e.g. `PT-5S`) for the
// same reason the shorthand path does below — downstream consumers (Clock.sleep,
// Futures.delay) assume a non-negative duration and throw on a negative one.
if (Character.toUpperCase(raw[0]) == 'P') {
if (raw[0].uppercaseChar() == 'P') {
return try {
val d = Duration.parse(raw)
if (d.isNegative) null else d
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,15 +290,15 @@ public object AuthChallengeParser {
}
}

private val TOKEN_PUNCTUATION: Set<Char> = "!#$%&'*+-.^_`|~".toSet()

private val TOKEN68_PUNCTUATION: Set<Char> = "-._~+/".toSet()

/** RFC 7230 token char: ALPHA / DIGIT / one of the punctuation set. */
private fun isTokenChar(c: Char): Boolean =
(c in 'a'..'z') || (c in 'A'..'Z') || (c in '0'..'9') ||
c == '!' || c == '#' || c == '$' || c == '%' || c == '&' ||
c == '\'' || c == '*' || c == '+' || c == '-' || c == '.' ||
c == '^' || c == '_' || c == '`' || c == '|' || c == '~'
(c in 'a'..'z') || (c in 'A'..'Z') || (c in '0'..'9') || c in TOKEN_PUNCTUATION

/** RFC 7235 token68 char (excluding the trailing "=" pad, handled separately). */
private fun isToken68Char(c: Char): Boolean =
(c in 'a'..'z') || (c in 'A'..'Z') || (c in '0'..'9') ||
c == '-' || c == '.' || c == '_' || c == '~' || c == '+' || c == '/'
(c in 'a'..'z') || (c in 'A'..'Z') || (c in '0'..'9') || c in TOKEN68_PUNCTUATION
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,35 +96,18 @@ public class DigestChallengeHandler
* - it carries a `realm` and `nonce`.
*
* Ordering: we scan [preferredAlgorithms] first, then within that scan the
* challenges. This ensures SHA-256 wins over MD5 even when MD5 appears first
* candidates. This ensures SHA-256 wins over MD5 even when MD5 appears first
* in the header.
*
* Each `continue` skips a challenge that fails a specific validation gate
* (scheme, realm, nonce, qop, algorithm). Collapsing to a single composite
* predicate would obscure which validation rejected the candidate when
* debugging Digest interop.
* Per-challenge validation is delegated to [toCandidate]; this scan only applies
* the algorithm-priority ordering, independent of the order challenges arrived in.
*/
@Suppress("LoopWithTooManyJumpStatements")
private fun pickChallenge(
challenges: List<AuthenticateChallenge>,
): Pair<AuthenticateChallenge, DigestAlgorithm>? {
// Find all challenges that match Digest with a satisfiable qop/realm/nonce
// — we'll filter by algorithm preference below.
val candidates = ArrayList<Pair<AuthenticateChallenge, DigestAlgorithm>>(challenges.size)
for (challenge in challenges) {
if (!challenge.scheme.equals("Digest", ignoreCase = true)) continue
if (challenge.parameters["realm"] == null) continue
if (challenge.parameters["nonce"] == null) continue
if (!qopSupportsAuth(challenge.parameters["qop"])) continue
val algorithmName = challenge.parameters["algorithm"]
val algorithm =
if (algorithmName == null) {
DigestAlgorithm.MD5 // RFC 7616 §3.3: MD5 is the default when omitted.
} else {
DigestAlgorithm.fromString(algorithmName) ?: continue
}
candidates.add(challenge to algorithm)
}
val candidates = challenges.mapNotNull(::toCandidate)
// Pick the candidate whose algorithm appears earliest in our preference list.
// Returns null when no candidate matches any preferred algorithm.
for (preferred in preferredAlgorithms) {
Expand All @@ -133,6 +116,30 @@ public class DigestChallengeHandler
return null
}

/**
* Maps a single challenge to a satisfiable (challenge, algorithm) candidate, or null
* when it is not usable: it must be a `Digest` challenge carrying `realm`, `nonce`, and
* a supported `qop`. Its algorithm is then resolved — an absent `algorithm` parameter
* defaults to MD5 (RFC 7616 §3.3), and an unsupported one is declined.
*/
private fun toCandidate(challenge: AuthenticateChallenge): Pair<AuthenticateChallenge, DigestAlgorithm>? {
val isInvalidChallenge =
!challenge.scheme.equals("Digest", ignoreCase = true) ||
challenge.parameters["realm"] == null ||
challenge.parameters["nonce"] == null ||
!qopSupportsAuth(challenge.parameters["qop"])

if (isInvalidChallenge) {
return null
}

val algorithmName = challenge.parameters["algorithm"] ?: return challenge to DigestAlgorithm.MD5

return DigestAlgorithm.fromString(algorithmName)?.let {
challenge to it
}
}

/**
* Builds the `Authorization: Digest ...` header value for a single challenge.
* Follows RFC 7616 §3.4.6 — HA1, HA2, response — plus the standard parameter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,16 @@ public data class MediaType private constructor(
if (value.isNotEmpty() && value.all(::isTokenChar)) {
return value
}
val sb = StringBuilder(value.length + 2)
sb.append('"')
value.forEach { ch ->
if (ch == '\\' || ch == '"') {
sb.append('\\')
return buildString(value.length + 2) {
append('"')
value.forEach { ch ->
if (ch == '\\' || ch == '"') {
append('\\')
}
append(ch)
}
sb.append(ch)
append('"')
}
sb.append('"')
return sb.toString()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public data class DispatchContext(
* counter to it for the actual key.
*/
private fun deriveCallKey(instrumentationContext: InstrumentationContext): String =
instrumentationContext.traceId.value + ":" + instrumentationContext.spanId.value
"${instrumentationContext.traceId.value}:${instrumentationContext.spanId.value}"

/**
* A dispatch context with a no-op instrumentation context; used when tracing is
Expand All @@ -81,6 +81,6 @@ public data class DispatchContext(
* [DispatchContext]), so every link in the chain is collision-safe by default.
*/
internal fun mintCallKey(instrumentationContext: InstrumentationContext): String =
deriveCallKey(instrumentationContext) + ":" + mintCounter.incrementAndGet()
"${deriveCallKey(instrumentationContext)}:${mintCounter.incrementAndGet()}"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,7 @@ public class AsyncHttpPipelineBuilder(private val httpClient: AsyncHttpClient) {
/** Builds an immutable [AsyncHttpPipeline]. */
public fun build(): AsyncHttpPipeline {
val ordered = steps.flatten()
val array = arrayOfNulls<AsyncHttpStep>(ordered.size)
for ((i, s) in ordered.withIndex()) array[i] = s
@Suppress("UNCHECKED_CAST")
return AsyncHttpPipeline(httpClient, array as Array<AsyncHttpStep>)
return AsyncHttpPipeline(httpClient, Array(ordered.size) { ordered[it] })
}

public companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,10 @@ public class HttpPipelineBuilder(private val httpClient: HttpClient) {
/**
* Builds an immutable [HttpPipeline] in stage order. [Stage.SEND] is reserved for the
* transport and is skipped.
*
* `arrayOfNulls<HttpStep>` then fill — Kotlin's `List.toTypedArray<T>()` is erased to
* `Array<Any?>` at runtime which fails the `Array<HttpStep>` cast.
*/
public fun build(): HttpPipeline {
val ordered = steps.flatten()
val array = arrayOfNulls<HttpStep>(ordered.size)
for ((i, s) in ordered.withIndex()) array[i] = s
@Suppress("UNCHECKED_CAST")
return HttpPipeline(httpClient, array as Array<HttpStep>)
return HttpPipeline(httpClient, Array(ordered.size) { ordered[it] })
}

public companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,11 +258,9 @@ public open class DefaultRedirectStep
// underlying store may return names in mixed case (`Content-Type`), so lower-case
// before the prefix test. Iterate a snapshot of the keys to avoid concurrent
// modification while mutating the builder.
val toRemove = ArrayList<String>()
for (name in headers.names()) {
if (name.lowercase(Locale.US).startsWith("content-")) toRemove.add(name)
}
for (name in toRemove) builder.remove(name)
headers.names()
.filter { it.lowercase(Locale.US).startsWith("content-") }
.forEach { builder.remove(it) }
return builder.build()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@ public class LoggableResponseBody
* otherwise the delegate's reported length (the true length), since the capture is just
* a bounded prefix.
*/
override fun contentLength(): Long =
if (fullyCaptured) captured?.size ?: delegate.contentLength() else delegate.contentLength()
override fun contentLength(): Long = (if (fullyCaptured) captured?.size else null) ?: delegate.contentLength()

/**
* Returns a view of the captured body. Drains (up to the cap) on first call. If the drain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,25 @@ public fun Span.makeCurrentWithLoggingContext(): TracingScope {
MDC.put(MDC_SPAN_ID, spanId)
return TracingScope {
try {
if (prevTraceId == null) MDC.remove(MDC_TRACE_ID) else MDC.put(MDC_TRACE_ID, prevTraceId)
if (prevSpanId == null) MDC.remove(MDC_SPAN_ID) else MDC.put(MDC_SPAN_ID, prevSpanId)
restoreMdc(MDC_TRACE_ID, prevTraceId)
restoreMdc(MDC_SPAN_ID, prevSpanId)
} finally {
inner.close()
}
}
}

/**
* Restores a single MDC [key] to its [previous] value: removes the key when it was previously
* unset, otherwise re-puts the captured value.
*/
private fun restoreMdc(
key: String,
previous: String?,
) {
if (previous == null) MDC.remove(key) else MDC.put(key, previous)
}

/** SLF4J MDC key for the W3C trace id. Lowercase-dotted to match the SDK's field-naming convention. */
internal const val MDC_TRACE_ID: String = "trace.id"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,7 @@ public object UrlRedactor {

private fun lowercaseAllowList(allowed: Set<String>): Set<String> {
if (allowed.isEmpty()) return emptySet()
val lower = HashSet<String>(allowed.size)
for (name in allowed) lower.add(name.lowercase())
return lower
return allowed.mapTo(HashSet(allowed.size)) { it.lowercase() }
}

private fun rebuild(
Expand Down
14 changes: 7 additions & 7 deletions sdk-core/src/main/kotlin/org/dexpace/sdk/core/io/Io.kt
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ public object Io {
public fun installProvider(provider: IoProvider) {
lock.withLock {
val existing = installed
if (existing != null && existing !== provider) {
throw IllegalStateException(
"An IoProvider (${existing::class.qualifiedName ?: existing::class}) is " +
"already installed; refusing to overwrite with a different provider " +
"(${provider::class.qualifiedName ?: provider::class}). " +
"Use withProvider { ... } from org.dexpace.sdk.core.testing for scoped overrides.",
)
check(existing == null || existing === provider) {
val existingName = existing!!::class.qualifiedName ?: existing::class
val providerName = provider::class.qualifiedName ?: provider::class
"An IoProvider ($existingName) is " +
"already installed; refusing to overwrite with a different provider " +
"($providerName). " +
"Use withProvider { ... } from org.dexpace.sdk.core.testing for scoped overrides."
}
installed = provider
}
Expand Down
24 changes: 16 additions & 8 deletions sdk-core/src/main/kotlin/org/dexpace/sdk/core/io/TeeSink.kt
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,22 @@ internal class TeeSink(
source: Buffer,
byteCount: Long,
) {
val allowed = (tapLimit - mirrored).coerceAtLeast(0L)
if (allowed == 0L) return
val copy = if (byteCount < allowed) byteCount else allowed
val copy = tapAllowance(byteCount)
if (copy == 0L) return
source.copyTo(tap, 0, copy)
mirrored += copy
}

/**
* Computes how many of [requested] bytes may still be mirrored into [tap]: the smaller of
* [requested] and the remaining [tapLimit] budget, clamped to never go negative. The actual
* copy and [mirrored] advancement stay at each call site.
*/
private fun tapAllowance(requested: Long): Long {
val remaining = (tapLimit - mirrored).coerceAtLeast(0L)
return if (requested < remaining) requested else remaining
}

@Throws(IOException::class)
override fun flush() {
primary.flush()
Expand Down Expand Up @@ -199,11 +208,10 @@ internal class TeeSink(
// Tap first (within the budget), primary second (see single-byte overload): a
// primary-side failure leaves the failing chunk captured in the tap. The FULL
// chunk is always forwarded to the primary so the wire body is never truncated.
val allowed = (tapLimit - mirrored).coerceAtLeast(0L)
if (allowed > 0L) {
val copy = if (len.toLong() < allowed) len else allowed.toInt()
tapStream.write(b, off, copy)
mirrored += copy.toLong()
val copy = tapAllowance(len.toLong())
if (copy > 0L) {
tapStream.write(b, off, copy.toInt())
mirrored += copy
}
primaryStream.write(b, off, len)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,12 @@ public class LinkHeaderPaginationStrategy<T>
* header values) and returns the URL of the first link-value whose `rel` parameter
* contains the token `next`, or `null` if no such link-value exists.
*/
private fun extractNextUrl(header: String): String? {
for (entry in splitLinkValues(header)) {
val parsed = parseLinkValue(entry) ?: continue
val rels = parsed.second
if (rels.any { it.equals("next", ignoreCase = true) }) {
return parsed.first
}
}
return null
}
private fun extractNextUrl(header: String): String? =
splitLinkValues(header)
.asSequence()
.mapNotNull { parseLinkValue(it) }
.firstOrNull { it.second.any { rel -> rel.equals("next", ignoreCase = true) } }
?.first

/**
* Splits an RFC 5988 `Link` header into individual link-values. Commas inside
Expand Down
Loading
Loading