diff --git a/obp-api/src/main/scala/code/api/util/http4s/RequestScopeConnection.scala b/obp-api/src/main/scala/code/api/util/http4s/RequestScopeConnection.scala index c8a87c3e14..b42b99ce82 100644 --- a/obp-api/src/main/scala/code/api/util/http4s/RequestScopeConnection.scala +++ b/obp-api/src/main/scala/code/api/util/http4s/RequestScopeConnection.scala @@ -39,7 +39,10 @@ import scala.concurrent.Future * 2. Wrap it in a non-closing proxy (commit/rollback/close are no-ops). * 3. Store the proxy in requestProxyLocal (IOLocal) only — currentProxy (TTL) is * NOT set here to avoid leaving compute threads dirty. - * 4. Run validateRequest + routes.run inside withRequestTransaction. + * 4. Run validateOnly (auth, roles, entity lookups) — outside the transaction, on + * auto-commit vendor connections. On Left: return error response, no transaction + * opened. On Right (GET/HEAD): run routes.run directly on auto-commit connections. + * On Right (POST/PUT/DELETE/PATCH): open the transaction and run routes.run inside it. * 5. Each IO.fromFuture call site uses RequestScopeConnection.fromFuture, which in * a single synchronous IO.defer block on compute thread T: * a. Sets currentProxy (TTL) on T. @@ -101,16 +104,11 @@ object RequestScopeConnection extends MdcLoggable { else method.invoke(real, args: _*) if (result == null || method.getReturnType == Void.TYPE) null else result } catch { - case e: java.lang.reflect.InvocationTargetException - if Option(e.getCause).exists(_.isInstanceOf[java.sql.SQLException]) => + case e: java.lang.reflect.InvocationTargetException => + val cause = Option(e.getCause).getOrElse(e) logger.error( - s"[RequestScopeProxy] method=${method.getName} failed on closed/returned connection. " + - s"This means the request-scoped proxy was handed to code that ran AFTER withRequestTransaction " + - s"committed and closed the underlying connection. " + - s"Likely cause: v7 path fell through to Http4sLiftWebBridge without a transaction scope — " + - s"currentProxy was still set on this thread from a previous fiber or was not cleared. " + - s"Cause: ${e.getCause.getMessage}", - e.getCause + s"[RequestScopeProxy] method=${method.getName} failed: ${cause.getClass.getName}: ${cause.getMessage}", + cause ) throw e } @@ -162,7 +160,7 @@ class RequestAwareConnectionManager(delegate: ConnectionManager) extends Connect if (proxy != null) { // Guard: if the underlying connection is already closed, the proxy is stale — it // was captured in a TtlRunnable submitted during a prior request and that request's - // withRequestTransaction has already committed and closed the real connection. + // withBusinessDBTransaction has already committed and closed the real connection. // Returning a stale proxy would throw "Connection is closed" inside the caller's // DB.use and, if that caller is inside authenticate, would be caught as Left(_) // and silently turned into a 401 response. @@ -184,7 +182,7 @@ class RequestAwareConnectionManager(delegate: ConnectionManager) extends Connect } /** - * If conn is our request proxy, skip release — it is managed by withRequestTransaction. + * If conn is our request proxy, skip release — it is managed by withBusinessDBTransaction. * Otherwise delegate to the original vendor (which does HikariCP ProxyConnection.close()). * * Reference equality is safe: one proxy instance per request, same object throughout. @@ -192,7 +190,7 @@ class RequestAwareConnectionManager(delegate: ConnectionManager) extends Connect override def releaseConnection(conn: Connection): Unit = { val proxy = RequestScopeConnection.currentProxy.get() if (proxy != null && (conn eq proxy.asInstanceOf[AnyRef])) { - // Skip release — this connection is managed by withRequestTransaction. + // Skip release — this connection is managed by withBusinessDBTransaction. } else { delegate.releaseConnection(conn) } diff --git a/obp-api/src/main/scala/code/api/util/http4s/ResourceDocMiddleware.scala b/obp-api/src/main/scala/code/api/util/http4s/ResourceDocMiddleware.scala index 990579b56a..9bafe7db75 100644 --- a/obp-api/src/main/scala/code/api/util/http4s/ResourceDocMiddleware.scala +++ b/obp-api/src/main/scala/code/api/util/http4s/ResourceDocMiddleware.scala @@ -99,10 +99,25 @@ object ResourceDocMiddleware extends MdcLoggable { case Some(resourceDoc) => val ccWithDoc = ResourceDocMatcher.attachToCallContext(cc, resourceDoc) val pathParams = ResourceDocMatcher.extractPathParams(req.uri.path, resourceDoc) - // Wrap in a request-scoped transaction, then run full validation chain - OptionT(withRequestTransaction( - validateRequest(req, resourceDoc, pathParams, ccWithDoc, routes) - ).map(Option(_))) + // Validate first (read-only, outside any transaction), then run business logic. + // GET/HEAD are safe methods — no writes, no transaction needed; they run on + // auto-commit vendor connections (same as validation). All other methods + // (POST/PUT/DELETE/PATCH) wrap routes.run in withBusinessDBTransaction. + OptionT( + validateOnly(req, resourceDoc, pathParams, ccWithDoc).flatMap { + case Left(errorResponse) => + IO.pure(Option(errorResponse)) + case Right(enrichedReq) => + val routeIO = + routes.run(enrichedReq) + .map(ensureJsonContentType) + .getOrElseF(IO.pure(ensureJsonContentType(Response[IO](org.http4s.Status.NotFound)))) + val executed = + if (req.method == Method.GET || req.method == Method.HEAD) routeIO + else withBusinessDBTransaction(routeIO) + executed.map(Option(_)) + } + ) case None => // No matching ResourceDoc: fallback to original route (NO transaction scope opened). @@ -117,61 +132,69 @@ object ResourceDocMiddleware extends MdcLoggable { } /** - * Wraps an IO[Response[IO]] in a request-scoped DB transaction. + * Wraps the business-logic IO in a request-scoped DB transaction. + * + * Called only for mutating methods (POST/PUT/DELETE/PATCH) after validateOnly succeeds. + * GET/HEAD bypass this entirely and run on auto-commit vendor connections, avoiding + * a pool borrow + empty-commit overhead on every read request. * - * Borrows a Connection from HikariCP, wraps it in a non-closing proxy (so Lift's - * internal DB.use lifecycle cannot commit or return it to the pool prematurely), - * and stores it in requestProxyLocal (IOLocal — fiber-local source of truth). + * Borrows a Connection from HikariCP via Resource.make so close() is guaranteed + * even if commit/rollback throws or the fiber is cancelled. The proxy prevents + * Lift's internal DB.use lifecycle from committing or returning the connection + * prematurely. * * currentProxy (TTL) is NOT set here. Every DB call goes through * RequestScopeConnection.fromFuture, which atomically sets + submits + clears the * TTL within a single IO.defer block on the compute thread, so the thread is never * left dirty after the fromFuture call returns. * - * On success: commits and closes the real connection. - * On exception: rolls back and closes the real connection. + * On success: commits, then Resource finalizer closes. + * On error/cancellation: rolls back (errors swallowed to preserve original cause), + * then Resource finalizer closes. * * Metric writes (IO.blocking in recordMetric) run on the blocking pool where * currentProxy is not set — they get their own pool connection and commit * independently, matching v6 behaviour. */ - private def withRequestTransaction(io: IO[Response[IO]]): IO[Response[IO]] = { - for { - realConn <- IO.blocking(APIUtil.vendor.HikariDatasource.ds.getConnection()) - proxy = RequestScopeConnection.makeProxy(realConn) - _ <- RequestScopeConnection.requestProxyLocal.set(Some(proxy)) - // Note: currentProxy (TTL) is NOT set here. Every DB call goes through - // RequestScopeConnection.fromFuture, which atomically sets + submits + clears - // the TTL within a single IO.defer block on the compute thread. Setting it - // here would leave the compute thread's TTL dirty if guaranteeCase runs on a - // different thread. - result <- io.guaranteeCase { + private def withBusinessDBTransaction(io: IO[Response[IO]]): IO[Response[IO]] = + Resource.make( + IO.blocking(APIUtil.vendor.HikariDatasource.ds.getConnection()) + )(conn => + IO.blocking { try { conn.close() } catch { case _: Exception => () } } + ).use { realConn => + val proxy = RequestScopeConnection.makeProxy(realConn) + for { + _ <- RequestScopeConnection.requestProxyLocal.set(Some(proxy)) + // Note: currentProxy (TTL) is NOT set here. Every DB call goes through + // RequestScopeConnection.fromFuture, which atomically sets + submits + clears + // the TTL within a single IO.defer block on the compute thread. + result <- io.guaranteeCase { case Outcome.Succeeded(_) => RequestScopeConnection.requestProxyLocal.set(None) *> - IO.blocking { try { realConn.commit() } finally { realConn.close() } } + IO.blocking { realConn.commit() } case _ => RequestScopeConnection.requestProxyLocal.set(None) *> - IO.blocking { try { realConn.rollback() } finally { realConn.close() } } + IO.blocking { try { realConn.rollback() } catch { case _: Exception => () } } } - } yield result - } + } yield result + } /** - * Executes the full validation chain for the request. - * Returns either an error Response or enriched request routed to the handler. + * Runs the full validation chain (auth → roles → bank → account → view → counterparty) + * and returns either an error Response or an enriched Request ready for the handler. + * + * All steps are read-only and execute outside any DB transaction, so no locks are + * held during validation. The caller opens a transaction only after this returns Right. */ - private def validateRequest( - req: Request[IO], - resourceDoc: ResourceDoc, - pathParams: Map[String, String], - cc: CallContext, - routes: HttpRoutes[IO] - ): IO[Response[IO]] = { + private def validateOnly( + req: Request[IO], + resourceDoc: ResourceDoc, + pathParams: Map[String, String], + cc: CallContext + ): IO[Either[Response[IO], Request[IO]]] = { - // Initial context with just CallContext val initialContext = ValidationContext(callContext = cc) - // Compose all validation steps using EitherT val result: Validation[ValidationContext] = for { context <- authenticate(req, resourceDoc, initialContext) context <- authorizeRoles(resourceDoc, pathParams, context) @@ -181,12 +204,11 @@ object ResourceDocMiddleware extends MdcLoggable { context <- validateCounterparty(pathParams, context) } yield context - // Convert Validation result to Response - result.value.flatMap { - case Left(errorResponse) => IO.pure(ensureJsonContentType(errorResponse)) // Ensure all error responses are JSON + result.value.map { + case Left(errorResponse) => + Left(ensureJsonContentType(errorResponse)) case Right(validCtx) => - // Enrich request with validated CallContext - val enrichedReq = req.withAttribute( + Right(req.withAttribute( Http4sRequestAttributes.callContextKey, validCtx.callContext.copy( bank = validCtx.bank, @@ -194,10 +216,7 @@ object ResourceDocMiddleware extends MdcLoggable { view = validCtx.view, counterparty = validCtx.counterparty ) - ) - routes.run(enrichedReq) - .map(ensureJsonContentType) // Ensure routed response has JSON content type - .getOrElseF(IO.pure(ensureJsonContentType(Response[IO](org.http4s.Status.NotFound)))) + )) } } @@ -207,8 +226,8 @@ object ResourceDocMiddleware extends MdcLoggable { logger.debug(s"[ResourceDocMiddleware] needsAuthentication for ${resourceDoc.partialFunctionName}: $needsAuth") val io = - if (needsAuth) RequestScopeConnection.fromFuture(APIUtil.authenticatedAccess(ctx.callContext)) - else RequestScopeConnection.fromFuture(APIUtil.anonymousAccess(ctx.callContext)) + if (needsAuth) IO.fromFuture(IO(APIUtil.authenticatedAccess(ctx.callContext))) + else IO.fromFuture(IO(APIUtil.anonymousAccess(ctx.callContext))) EitherT( io.attempt.flatMap { @@ -266,7 +285,7 @@ object ResourceDocMiddleware extends MdcLoggable { pathParams.get("BANK_ID") match { case Some(bankId) => EitherT( - RequestScopeConnection.fromFuture(NewStyle.function.getBank(BankId(bankId), Some(ctx.callContext))) + IO.fromFuture(IO(NewStyle.function.getBank(BankId(bankId), Some(ctx.callContext)))) .attempt.flatMap { case Right((bank, Some(updatedCC))) => IO.pure(Right(ctx.copy(bank = Some(bank), callContext = updatedCC))) case Right((bank, None)) => IO.pure(Right(ctx.copy(bank = Some(bank)))) @@ -284,7 +303,7 @@ object ResourceDocMiddleware extends MdcLoggable { (pathParams.get("BANK_ID"), pathParams.get("ACCOUNT_ID")) match { case (Some(bankId), Some(accountId)) => EitherT( - RequestScopeConnection.fromFuture(NewStyle.function.getBankAccount(BankId(bankId), AccountId(accountId), Some(ctx.callContext))) + IO.fromFuture(IO(NewStyle.function.getBankAccount(BankId(bankId), AccountId(accountId), Some(ctx.callContext)))) .attempt.flatMap { case Right((acc, Some(updatedCC))) => IO.pure(Right(ctx.copy(account = Some(acc), callContext = updatedCC))) case Right((acc, None)) => IO.pure(Right(ctx.copy(account = Some(acc)))) @@ -302,7 +321,7 @@ object ResourceDocMiddleware extends MdcLoggable { (pathParams.get("BANK_ID"), pathParams.get("ACCOUNT_ID"), pathParams.get("VIEW_ID")) match { case (Some(bankId), Some(accountId), Some(viewId)) => EitherT( - RequestScopeConnection.fromFuture(ViewNewStyle.checkViewAccessAndReturnView(ViewId(viewId), BankIdAccountId(BankId(bankId), AccountId(accountId)), ctx.user.toOption, Some(ctx.callContext))) + IO.fromFuture(IO(ViewNewStyle.checkViewAccessAndReturnView(ViewId(viewId), BankIdAccountId(BankId(bankId), AccountId(accountId)), ctx.user.toOption, Some(ctx.callContext)))) .attempt.flatMap { case Right(view) => IO.pure(Right(ctx.copy(view = Some(view)))) case Left(e: APIFailureNewStyle) => ErrorResponseConverter.createErrorResponse(e.failCode, e.failMsg, ctx.callContext).map(Left(_)) @@ -319,7 +338,7 @@ object ResourceDocMiddleware extends MdcLoggable { (pathParams.get("BANK_ID"), pathParams.get("ACCOUNT_ID"), pathParams.get("COUNTERPARTY_ID")) match { case (Some(bankId), Some(accountId), Some(counterpartyId)) => EitherT( - RequestScopeConnection.fromFuture(NewStyle.function.getCounterpartyTrait(BankId(bankId), AccountId(accountId), counterpartyId, Some(ctx.callContext))) + IO.fromFuture(IO(NewStyle.function.getCounterpartyTrait(BankId(bankId), AccountId(accountId), counterpartyId, Some(ctx.callContext)))) .attempt.flatMap { case Right((cp, Some(updatedCC))) => IO.pure(Right(ctx.copy(counterparty = Some(cp), callContext = updatedCC))) case Right((cp, None)) => IO.pure(Right(ctx.copy(counterparty = Some(cp)))) diff --git a/obp-api/src/main/scala/code/api/v7_0_0/Http4s700.scala b/obp-api/src/main/scala/code/api/v7_0_0/Http4s700.scala index 3d401d10da..9ee114cbaf 100644 --- a/obp-api/src/main/scala/code/api/v7_0_0/Http4s700.scala +++ b/obp-api/src/main/scala/code/api/v7_0_0/Http4s700.scala @@ -11,7 +11,7 @@ import code.api.util.{APIUtil, ApiRole, ApiVersionUtils, CallContext, CustomJson import code.api.util.ApiRole.{canCreateEntitlementAtAnyBank, canCreateEntitlementAtOneBank, canDeleteEntitlementAtAnyBank, canGetAnyUser, canGetCacheConfig, canGetCacheInfo, canGetCacheNamespaces, canGetCardsForBank, canGetConnectorHealth, canGetCustomersAtOneBank, canGetDatabasePoolInfo, canGetMigrations} import code.api.util.ApiTag._ import code.api.util.ErrorMessages._ -import code.api.util.http4s.{ErrorResponseConverter, Http4sRequestAttributes, ResourceDocMiddleware} +import code.api.util.http4s.{ErrorResponseConverter, Http4sRequestAttributes, RequestScopeConnection, ResourceDocMiddleware} import code.api.util.http4s.Http4sRequestAttributes.{EndpointHelpers, RequestOps} import code.api.util.newstyle.ViewNewStyle import code.api.v1_3_0.JSONFactory1_3_0 @@ -1091,6 +1091,40 @@ object Http4s700 { // ── End Phase 1 batch 3 ────────────────────────────────────────────────── + // ── Test-only rollback endpoint ─────────────────────────────────────────── + // Enabled only in Lift test mode (Props.testMode == true, i.e. -Drun.mode=test). + // Props.testMode is set from the JVM system property before any props file loads, + // so it is reliably available at object-initialization time unlike file-based props. + // POST /obp/v7.0.0/test/rollback-check: writes one entitlement to DB via + // RequestScopeConnection.fromFuture, then raises IO.raiseError so the middleware + // hits Outcome.Errored → rollback. Used by Http4s700TransactionTest to verify + // that data written inside a failed request is never committed. + if (net.liftweb.util.Props.testMode) { + val testRollbackEndpoint: HttpRoutes[IO] = HttpRoutes.of[IO] { + case req @ POST -> `prefixPath` / "test" / "rollback-check" => + val cc = req.callContext + cc.user.toOption match { + case Some(user) => + RequestScopeConnection.fromFuture( + Future(Entitlement.entitlement.vend.addEntitlement("", user.userId, "TestRollbackSentinel")) + ).flatMap(_ => IO.raiseError[Response[IO]](new RuntimeException("[test] intentional rollback"))) + case None => + IO.pure(Response[IO](Status.Unauthorized)) + } + } + resourceDocs += ResourceDoc( + null, + implementedInApiVersion, + "testRollbackEndpoint", + "POST", "/test/rollback-check", "Test rollback", "Test-only: write then throw to verify rollback", + EmptyBody, EmptyBody, + List($AuthenticatedUserIsRequired, UnknownError), + Nil, + None, + http4sPartialFunction = Some(testRollbackEndpoint) + ) + } + // All routes combined (without middleware - for direct use). // // Routes are sorted automatically by URL template specificity (segment count, diff --git a/obp-api/src/test/scala/code/api/v7_0_0/Http4s700TransactionTest.scala b/obp-api/src/test/scala/code/api/v7_0_0/Http4s700TransactionTest.scala index 54ef857c92..f20b44eb96 100644 --- a/obp-api/src/test/scala/code/api/v7_0_0/Http4s700TransactionTest.scala +++ b/obp-api/src/test/scala/code/api/v7_0_0/Http4s700TransactionTest.scala @@ -19,7 +19,7 @@ import scala.concurrent.duration._ * Integration tests for the v7 request-scoped transaction feature. * * Each HTTP request handled by the http4s stack runs inside - * `ResourceDocMiddleware.withRequestTransaction`, which: + * `ResourceDocMiddleware.withBusinessDBTransaction`, which: * - Borrows one real JDBC connection from HikariCP * - Wraps it in a non-closing proxy so Lift Mapper cannot commit early * - Commits on Outcome.Succeeded (HTTP 2xx or error response) @@ -248,4 +248,31 @@ class Http4s700TransactionTest extends ServerSetupWithTestData { banksStatus shouldBe 200 } } + + // ── Rollback on uncaught exception ─────────────────────────────────────── + + feature("v7 transaction — rollback on uncaught exception") { + + scenario("Uncaught IO exception triggers rollback — write is not committed", Http4s700TransactionTag) { + Given("No TestRollbackSentinel entitlement exists for resourceUser1 before the request") + val before = Entitlement.entitlement.vend.getEntitlementsByUserId(resourceUser1.userId) + .map(_.filter(_.roleName == "TestRollbackSentinel")) + .openOr(Nil) + before shouldBe empty + + When("POST /obp/v7.0.0/test/rollback-check raises an uncaught IO error after writing") + val headers = Map("DirectLogin" -> s"token=${token1.value}") + val (status, _, _) = makeHttpRequestWithBody( + "POST", "/obp/v7.0.0/test/rollback-check", "{}", headers) + + Then("The server returns 500 (IO error propagated through the stack)") + status shouldBe 500 + + And("The TestRollbackSentinel row is NOT in the DB — the transaction was rolled back") + val after = Entitlement.entitlement.vend.getEntitlementsByUserId(resourceUser1.userId) + .map(_.filter(_.roleName == "TestRollbackSentinel")) + .openOr(Nil) + after shouldBe empty + } + } }