diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 33e6c90..1bede48 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -25,28 +25,27 @@ jobs: test: # runs-on: self-hosted runs-on: ubuntu-latest - env: - # define Java options for both official sbt and sbt-extras - JAVA_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8 - JVM_OPTS: -Xms2048M -Xmx2048M -Xss6M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8 steps: - name: Env run: | echo "JFROG_USER=${{ secrets.JFROG_USER }}" >> $GITHUB_ENV echo "JFROG_PASSWORD=${{ secrets.JFROG_PASSWORD }}" >> $GITHUB_ENV echo "GITHUB_TOKEN=${{ secrets.GITHUB_TOKEN }}" >> $GITHUB_ENV + echo "CODECOV_TOKEN=${{ secrets.CODECOV_TOKEN }}" >> $GITHUB_ENV - name: Checkout uses: actions/checkout@v3 - - name: Set up JDK 8 + - name: Set up JDK 17 uses: actions/setup-java@v3 with: - java-version: '8' + java-version: '17' distribution: 'temurin' # cache: 'sbt' - name: Setup sbt launcher uses: sbt/setup-sbt@v1 + - name: Cross Compile + run: SBT_OPTS="-Xss4M -Xms1g -Xmx4g -Dfile.encoding=UTF-8" sbt + compile - name: Run tests & Coverage Report - run: sbt coverage test coverageReport + run: SBT_OPTS="-Xss4M -Xms1g -Xmx4g -Dfile.encoding=UTF-8" sbt coverage test coverageReport - name: Upload coverage to Codecov uses: codecov/codecov-action@v3 with: @@ -60,10 +59,10 @@ jobs: steps: - name: Checkout uses: actions/checkout@v3 - - name: Set up JDK 8 + - name: Set up JDK 17 uses: actions/setup-java@v3 with: - java-version: '8' + java-version: '17' distribution: 'temurin' # cache: 'sbt' - name: Setup sbt launcher diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 8eb8cbb..1d9309c 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -32,18 +32,21 @@ jobs: echo "JFROG_USER=${{ secrets.JFROG_USER }}" >> $GITHUB_ENV echo "JFROG_PASSWORD=${{ secrets.JFROG_PASSWORD }}" >> $GITHUB_ENV echo "GITHUB_TOKEN=${{ secrets.GITHUB_TOKEN }}" >> $GITHUB_ENV + echo "CODECOV_TOKEN=${{ secrets.CODECOV_TOKEN }}" >> $GITHUB_ENV - name: Checkout uses: actions/checkout@v3 - - name: Set up JDK 8 + - name: Set up JDK 17 uses: actions/setup-java@v3 with: - java-version: '8' + java-version: '17' distribution: 'temurin' # cache: 'sbt' - name: Setup sbt launcher uses: sbt/setup-sbt@v1 + - name: Cross Compile + run: SBT_OPTS="-Xss4M -Xms1g -Xmx4g -Dfile.encoding=UTF-8" sbt '+ compile' - name: Run tests & Coverage Report - run: sbt coverage test coverageReport coverageAggregate + run: SBT_OPTS="-Xss4M -Xms1g -Xmx4g -Dfile.encoding=UTF-8" sbt coverage test coverageReport coverageAggregate - name: Upload coverage to Codecov uses: codecov/codecov-action@v3 with: @@ -59,10 +62,10 @@ jobs: steps: - name: Checkout uses: actions/checkout@v3 - - name: Set up JDK 8 + - name: Set up JDK 17 uses: actions/setup-java@v3 with: - java-version: '8' + java-version: '17' distribution: 'temurin' # cache: 'sbt' - name: Setup sbt launcher diff --git a/build.sbt b/build.sbt index 7cc1791..36cb024 100644 --- a/build.sbt +++ b/build.sbt @@ -6,25 +6,24 @@ import app.softnetwork.* lazy val scala212 = "2.12.20" lazy val scala213 = "2.13.16" -lazy val javacCompilerVersion = "1.8" +lazy val javacCompilerVersion = "17" lazy val scalacCompilerOptions = Seq( "-deprecation", - "-feature", - s"-target:jvm-$javacCompilerVersion" + "-feature" ) ThisBuild / organization := "app.softnetwork" name := "generic-persistence-api" -ThisBuild / version := "0.8.0" +ThisBuild / version := "0.8-SNAPSHOT" lazy val moduleSettings = Seq( crossScalaVersions := Seq(scala212, scala213), scalacOptions ++= { CrossVersion.partialVersion(scalaVersion.value) match { case Some((2, 12)) => scalacCompilerOptions :+ "-Ypartial-unification" - case Some((2, 13)) => scalacCompilerOptions + case Some((2, 13)) => scalacCompilerOptions :+ s"-release:$javacCompilerVersion" case _ => Seq.empty } } @@ -55,6 +54,24 @@ ThisBuild / dependencyOverrides ++= Seq( "org.lmdbjava" % "lmdbjava" % "0.9.1" exclude("org.slf4j", "slf4j-api"), ) +ThisBuild / javaOptions ++= Seq( + "--add-opens=java.base/java.util=ALL-UNNAMED", + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED", + "--add-opens=java.base/java.lang=ALL-UNNAMED", + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED", + "--add-opens=java.base/java.math=ALL-UNNAMED", + "--add-opens=java.base/java.io=ALL-UNNAMED", + "--add-opens=java.base/java.net=ALL-UNNAMED", + "--add-opens=java.base/java.nio=ALL-UNNAMED", + "--add-opens=java.base/java.text=ALL-UNNAMED", + "--add-opens=java.base/java.time=ALL-UNNAMED", + "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" +) + +ThisBuild / Test / fork := true + +Test / javaOptions ++= javaOptions.value + Test / parallelExecution := false lazy val common = project.in(file("common")) diff --git a/common/testkit/src/main/scala/app/softnetwork/concurrent/scalatest/CompletionTestKit.scala b/common/testkit/src/main/scala/app/softnetwork/concurrent/scalatest/CompletionTestKit.scala index 8ee9dc8..e1c966c 100644 --- a/common/testkit/src/main/scala/app/softnetwork/concurrent/scalatest/CompletionTestKit.scala +++ b/common/testkit/src/main/scala/app/softnetwork/concurrent/scalatest/CompletionTestKit.scala @@ -12,9 +12,9 @@ import scala.util.{Failure, Success, Try} import scala.language.reflectiveCalls /** Created by smanciot on 12/04/2021. - */ + */ trait CompletionTestKit extends Completion with Assertions { - _: {def log: Logger} => + _: { def log: Logger } => implicit class AwaitAssertion[T](future: Future[T])(implicit atMost: Duration = defaultTimeout) { def assert(fun: T => Assertion): Assertion = diff --git a/core/src/main/scala/app/softnetwork/persistence/model/package.scala b/core/src/main/scala/app/softnetwork/persistence/model/package.scala index 5fab2b1..0f558dc 100644 --- a/core/src/main/scala/app/softnetwork/persistence/model/package.scala +++ b/core/src/main/scala/app/softnetwork/persistence/model/package.scala @@ -45,7 +45,7 @@ package object model { acc + char } } - def $: String = toSnakeCase + def $ : String = toSnakeCase } case class StateWrapper[T <: State]( diff --git a/core/src/main/scala/app/softnetwork/persistence/package.scala b/core/src/main/scala/app/softnetwork/persistence/package.scala index ecc5db2..f0514c8 100644 --- a/core/src/main/scala/app/softnetwork/persistence/package.scala +++ b/core/src/main/scala/app/softnetwork/persistence/package.scala @@ -8,7 +8,7 @@ import java.time.Instant import scala.language.implicitConversions /** Created by smanciot on 13/04/2020. - */ + */ package object persistence { trait ManifestWrapper[T] { @@ -20,7 +20,7 @@ package object persistence { def generateUUID(key: Option[String] = None): String = key match { case Some(clearText) => sha256(clearText) - case _ => UUID.randomUUID().toString + case _ => UUID.randomUUID().toString } def now(): Date = Date.from(Instant.now()) @@ -30,8 +30,8 @@ package object persistence { } /** Used for akka and elastic persistence ids, one per targeted environment (development, - * production, ...) - */ + * production, ...) + */ val version: String = sys.env.getOrElse("VERSION", PersistenceCoreBuildInfo.version) val environment: String = sys.env.getOrElse( diff --git a/core/src/main/scala/app/softnetwork/persistence/query/ExternalPersistenceProvider.scala b/core/src/main/scala/app/softnetwork/persistence/query/ExternalPersistenceProvider.scala index f7df4c1..c28b864 100644 --- a/core/src/main/scala/app/softnetwork/persistence/query/ExternalPersistenceProvider.scala +++ b/core/src/main/scala/app/softnetwork/persistence/query/ExternalPersistenceProvider.scala @@ -58,6 +58,16 @@ trait ExternalPersistenceProvider[T <: Timestamped] { _: ManifestWrapper[T] => */ def deleteDocument(uuid: String): Boolean = false + /** Hard-deletes the underlying document referenced by its uuid from the external system. Use for + * GDPR compliance or permanent data removal. + * + * @param uuid + * - the uuid of the document to destroy + * @return + * whether the operation is successful or not + */ + def destroy(uuid: String): Boolean = false + /** Load the document referenced by its uuid * * @param uuid diff --git a/core/src/main/scala/app/softnetwork/persistence/query/JsonProvider.scala b/core/src/main/scala/app/softnetwork/persistence/query/JsonProvider.scala index 1b1792d..4fde30d 100644 --- a/core/src/main/scala/app/softnetwork/persistence/query/JsonProvider.scala +++ b/core/src/main/scala/app/softnetwork/persistence/query/JsonProvider.scala @@ -1,7 +1,12 @@ package app.softnetwork.persistence.query import app.softnetwork.persistence.ManifestWrapper -import app.softnetwork.persistence.model.{CamelCaseString, StateWrapper, StateWrappertReader, Timestamped} +import app.softnetwork.persistence.model.{ + CamelCaseString, + StateWrapper, + StateWrappertReader, + Timestamped +} import app.softnetwork.serialization.{commonFormats, serialization, updateCaseClass} import com.typesafe.config.{Config, ConfigFactory} import org.json4s.Formats @@ -167,10 +172,9 @@ trait JsonProvider[T <: Timestamped] extends ExternalPersistenceProvider[T] { } match { case Success(value) if value.uuid != uuid => // do nothing case Success(value) if value.uuid == uuid && value.state.isDefined || value.deleted => - if(value.deleted){ + if (value.deleted) { lastMatchingLine = None - } - else{ + } else { lastMatchingLine = value.state // return the state } case _ => @@ -184,19 +188,18 @@ trait JsonProvider[T <: Timestamped] extends ExternalPersistenceProvider[T] { case Some(d) => d.toString.toBoolean case _ => false } - if(deleted){ + if (deleted) { lastMatchingLine = None - } - else{ + } else { parsed.get("state") match { case Some(updated: Map[String, Any]) => lastMatchingLine match { case Some(l) - if updated - .get("lastUpdated") - .map(lu => Instant.parse(lu.toString)) - .getOrElse(Instant.MIN) - .isAfter(l.lastUpdated) => // update the state + if updated + .get("lastUpdated") + .map(lu => Instant.parse(lu.toString)) + .getOrElse(Instant.MIN) + .isAfter(l.lastUpdated) => // update the state Try(updateCaseClass(l, updated)) match { case Success(updated: T) => lastMatchingLine = Some(updated) diff --git a/core/testkit/src/main/scala/app/softnetwork/persistence/person/query/PersonToJsonProcessorStream.scala b/core/testkit/src/main/scala/app/softnetwork/persistence/person/query/PersonToJsonProcessorStream.scala index 4215188..a2e9abf 100644 --- a/core/testkit/src/main/scala/app/softnetwork/persistence/person/query/PersonToJsonProcessorStream.scala +++ b/core/testkit/src/main/scala/app/softnetwork/persistence/person/query/PersonToJsonProcessorStream.scala @@ -1,12 +1,16 @@ package app.softnetwork.persistence.person.query import app.softnetwork.persistence.person.model.Person -import app.softnetwork.persistence.query.{InMemoryJournalProvider, InMemoryOffsetProvider, JsonProvider} +import app.softnetwork.persistence.query.{ + InMemoryJournalProvider, + InMemoryOffsetProvider, + JsonProvider +} import java.nio.file.{Files, Paths} trait PersonToJsonProcessorStream - extends PersonToExternalProcessorStream + extends PersonToExternalProcessorStream with InMemoryJournalProvider with InMemoryOffsetProvider with JsonProvider[Person] { diff --git a/core/testkit/src/main/scala/app/softnetwork/persistence/scalatest/PersistenceTestKit.scala b/core/testkit/src/main/scala/app/softnetwork/persistence/scalatest/PersistenceTestKit.scala index 1d0d5f5..b5b1d4f 100644 --- a/core/testkit/src/main/scala/app/softnetwork/persistence/scalatest/PersistenceTestKit.scala +++ b/core/testkit/src/main/scala/app/softnetwork/persistence/scalatest/PersistenceTestKit.scala @@ -23,9 +23,9 @@ import scala.language.implicitConversions import scala.reflect.ClassTag /** Created by smanciot on 04/01/2020. - */ + */ trait PersistenceTestKit - extends PersistenceGuardian + extends PersistenceGuardian with BeforeAndAfterAll with Eventually with CompletionTestKit @@ -61,8 +61,8 @@ trait PersistenceTestKit } /** @return - * roles associated with this node - */ + * roles associated with this node + */ def roles: Seq[String] = Seq.empty final lazy val akka: String = @@ -145,8 +145,8 @@ trait PersistenceTestKit |""".stripMargin + additionalConfig /** @return - * additional configuration - */ + * additional configuration + */ def additionalConfig: String = "" lazy val akkaConfig: Config = ConfigFactory.parseString(akka) @@ -160,7 +160,7 @@ trait PersistenceTestKit def typedSystem(): ActorSystem[Nothing] = system /** `PatienceConfig` from [[_root_.akka.actor.testkit.typed.TestKitSettings#DefaultTimeout]] - */ + */ implicit val patience: PatienceConfig = PatienceConfig(Settings.DefaultTimeout, Span(100, org.scalatest.time.Millis)) @@ -175,7 +175,7 @@ trait PersistenceTestKit } /** init and join cluster - */ + */ final def initAndJoinCluster(): Unit = { testKit.spawn(setup(), "guardian") // let the nodes join and become Up diff --git a/jdbc/build.sbt b/jdbc/build.sbt index 9ca9227..4fad035 100644 --- a/jdbc/build.sbt +++ b/jdbc/build.sbt @@ -10,7 +10,12 @@ val akkaPersistenceJdbc = Seq( "com.typesafe.slick" %% "slick" % Versions.slick, "com.typesafe.slick" %% "slick-hikaricp" % Versions.slick, "org.postgresql" % "postgresql" % Versions.postgresql, - "com.mysql" % "mysql-connector-j" % Versions.mysql + "com.mysql" % "mysql-connector-j" % Versions.mysql, + "org.flywaydb" % "flyway-core" % Versions.flyway, + "org.flywaydb" % "flyway-database-postgresql" % Versions.flyway, + // H2 uses the HSQLDB Flyway plugin — test-scoped because H2 is only used in tests. + // If H2 is needed outside tests, move this to Compile scope or add it in the consuming project. + "org.flywaydb" % "flyway-database-hsqldb" % Versions.flyway % Test ) libraryDependencies ++= akkaPersistenceJdbc diff --git a/jdbc/src/main/resources/db/migration/README.md b/jdbc/src/main/resources/db/migration/README.md new file mode 100644 index 0000000..5f30da5 --- /dev/null +++ b/jdbc/src/main/resources/db/migration/README.md @@ -0,0 +1,28 @@ +# Flyway Migrations + +## Convention + +Each entity type that uses `ColumnMappedJdbcStateProvider` has its own +migration folder: `db/migration/{tablename}/` + +## Naming + +`V{version}__{description}.sql` (double underscore) + +Examples: + +- `V1__create_api_keys.sql` +- `V2__add_revoked_at_column.sql` + +## Database Compatibility + +- Production: PostgreSQL 16+ +- Tests: H2 (use PostgreSQL-compatible subset) +- Avoid: PostgreSQL-specific syntax not supported by H2 + (e.g., `GENERATED ALWAYS AS IDENTITY`, `jsonb`) + +## Flyway Metadata Tables + +Each migration folder gets its own Flyway history table: +`flyway_schema_history_{tablename}`. This prevents conflicts when +multiple entities use Flyway in the same database. diff --git a/jdbc/src/main/scala/app/softnetwork/persistence/jdbc/db/SlickDatabase.scala b/jdbc/src/main/scala/app/softnetwork/persistence/jdbc/db/SlickDatabase.scala index 7956fd3..93f55f0 100644 --- a/jdbc/src/main/scala/app/softnetwork/persistence/jdbc/db/SlickDatabase.scala +++ b/jdbc/src/main/scala/app/softnetwork/persistence/jdbc/db/SlickDatabase.scala @@ -20,6 +20,20 @@ trait SlickDatabase extends ClasspathResources { def config: Config + /** Provides a DataSource for use by Flyway migrations. Extracted from the underlying HikariCP + * connection pool. Override this method if using a non-HikariCP connection pool. + */ + lazy val dataSource: javax.sql.DataSource = { + db.source match { + case hikari: slick.jdbc.hikaricp.HikariCPJdbcDataSource => hikari.ds + case other => + throw new IllegalStateException( + s"Expected HikariCP data source for Flyway, got ${other.getClass.getName}. " + + "Configure slick-hikaricp or override the dataSource method." + ) + } + } + lazy val slickProfile: String = config.getString("slick.profile") lazy val db: Database = { diff --git a/jdbc/src/main/scala/app/softnetwork/persistence/jdbc/query/ColumnMappedJdbcStateProvider.scala b/jdbc/src/main/scala/app/softnetwork/persistence/jdbc/query/ColumnMappedJdbcStateProvider.scala new file mode 100644 index 0000000..320d6db --- /dev/null +++ b/jdbc/src/main/scala/app/softnetwork/persistence/jdbc/query/ColumnMappedJdbcStateProvider.scala @@ -0,0 +1,322 @@ +package app.softnetwork.persistence.jdbc.query + +import app.softnetwork.concurrent.Completion +import app.softnetwork.persistence._ +import app.softnetwork.persistence.jdbc.db.SlickDatabase +import app.softnetwork.persistence.jdbc.schema.FlywayMigration +import app.softnetwork.persistence.model.Timestamped +import app.softnetwork.persistence.query.ExternalPersistenceProvider +import app.softnetwork.serialization.{commonFormats, serialization} +import org.json4s.Formats +import slick.jdbc.{GetResult, JdbcProfile} + +import scala.concurrent.ExecutionContext +import scala.reflect.ClassTag +import scala.util.{Failure, Success, Try} + +/** A state provider that maps entity fields to individual typed table columns, enabling indexed + * lookups and SQL queries on specific fields. + * + * Unlike `JdbcStateProvider` which stores the entire state as a JSON blob, this provider requires + * the implementor to define: + * - A Slick `Table` class with typed columns (`TableType`) + * - `toRow(T)` / `fromRow(Row)` conversions + * - An implicit `GetResult[RowType]` for `searchDocuments` raw SQL + * + * Schema DDL is managed by Flyway migrations (not Slick DDL auto-generation). + * + * @tparam T + * the entity state type + */ +trait ColumnMappedJdbcStateProvider[T <: Timestamped] + extends ExternalPersistenceProvider[T] + with SlickDatabase + with FlywayMigration + with Completion { + _: ManifestWrapper[T] with JdbcProfile => + + import api._ + + // --- Abstract type members --- + + /** The row type used by the Slick Table definition. Typically a tuple matching the table columns. + */ + type RowType + + /** The concrete Slick Table class. */ + type TableType <: Table[RowType] + + // --- Abstract members --- + + /** The Slick TableQuery for this entity's table. */ + def tableQuery: TableQuery[TableType] + + /** Convert an entity state to a table row. */ + def toRow(entity: T, deleted: Boolean = false): RowType + + /** Convert a table row to an entity state. Return None if the row represents a + * deleted/soft-deleted entity. + */ + def fromRow(row: RowType): Option[T] + + /** Return the uuid column from the Table definition, used for filtering. */ + def rowUuidColumn(row: TableType): Rep[String] + + /** Implicit `GetResult` for deserializing raw SQL result rows in `searchDocuments`. Must match + * the column order of the table. + */ + implicit def getResult: GetResult[RowType] + + // --- Concrete members --- + + implicit def formats: Formats = commonFormats + + /** The table name, used for raw SQL search queries and logging. Derived from the entity type + * name, converted to snake_case. + */ + lazy val tableName: String = + manifestWrapper.wrapped.runtimeClass.getSimpleName + .replaceAll("([A-Z])", "_$1") + .toLowerCase + .stripPrefix("_") + + def excludedFields: Set[String] = Set("__serializedSizeMemoized") + + /** Optional schema/dataset prefix for qualified table name. */ + def dataset: Option[String] = { + if (config.hasPath("jdbc-external-processor.dataset")) { + val d = config.getString("jdbc-external-processor.dataset") + if (d.nonEmpty) Some(d) else None + } else { + None + } + } + + /** Defaults to `tableName` so that Flyway migration folder and table name stay consistent. + * Override only when they intentionally differ. + */ + override def migrationFolder: String = tableName + + override def migrationSchema: Option[String] = dataset + + private[this] lazy val tableFullName: String = dataset match { + case Some(s) => s"$s.$tableName" + case _ => tableName + } + + /** Column list for `searchDocuments` raw SQL. Override with explicit column names (e.g., `"uuid, + * last_updated, name, email, status, deleted"`) when adding Flyway migrations that change column + * order, to keep `GetResult` aligned. + */ + protected def selectColumns: String = "*" + + implicit def executionContext: ExecutionContext + + // --- CRUD operations --- + + override final def createDocument(document: T)(implicit t: ClassTag[T]): Boolean = { + val row = toRow(document) + val action = (tableQuery += row).map(_ > 0) + db.run(action).complete() match { + case Success(value) => + log.debug(s"Insert $tableFullName ${document.uuid} -> $value") + value + case Failure(f) => + log.error(f.getMessage, f) + false + } + } + + override final def updateDocument(document: T, upsert: Boolean = true)(implicit + t: ClassTag[T] + ): Boolean = { + if (upsert) { + loadDocument(document.uuid) match { + case Some(_) => doUpdate(document) + case None => createDocument(document) + } + } else { + doUpdate(document) + } + } + + private def doUpdate(document: T): Boolean = { + val row = toRow(document) + val action = tableQuery + .filter(r => rowUuidColumn(r) === document.uuid) + .update(row) + .map(_ > 0) + db.run(action).complete() match { + case Success(value) => + log.debug(s"Update $tableFullName ${document.uuid} -> $value") + value + case Failure(f) => + log.error(f.getMessage, f) + false + } + } + + override final def upsertDocument(uuid: String, data: String): Boolean = { + implicit val manifest: Manifest[T] = manifestWrapper.wrapped + loadDocument(uuid) match { + case Some(existing) => + var state = serialization.read[Map[String, Any]]( + serialization.write(existing) + ) + val updatedState = serialization.read[Map[String, Any]](data) + for ((key, value) <- updatedState) { + if (!excludedFields.contains(key)) { + state = state + (key -> value) + } + } + Try( + serialization.read[T](serialization.write(state))(formats, manifest) + ) match { + case Success(updated) => updateDocument(updated) + case Failure(e) => + log.error(s"Failed to upsert $uuid with data $data", e) + false + } + case None => + Try( + serialization.read[T](data)(formats, manifestWrapper.wrapped) + ) match { + case Success(entity) => createDocument(entity) + case Failure(e) => + log.error(s"Failed to create $uuid from data $data", e) + false + } + } + } + + /** Soft-deletes the document by setting deleted=true via `toRow(entity, deleted=true)`. + * + * Returns `true` if the entity was found and soft-deleted, or if the entity was already absent + * (idempotent — safe for `State2ExternalProcessorStream` which treats `false` as a failure + * requiring retry). + * + * The load-then-update is intentionally tolerant of concurrent removal: if another + * thread/process deletes the row between `loadDocument` and the UPDATE, the update affects 0 + * rows but we still return `true` because the end state (entity gone) is what the caller wanted. + */ + override final def deleteDocument(uuid: String): Boolean = { + loadDocument(uuid) match { + case Some(entity) => + val row = toRow(entity, deleted = true) + val action = tableQuery + .filter(r => rowUuidColumn(r) === uuid) + .update(row) + db.run(action).complete() match { + case Success(_) => + // Whether count > 0 or == 0, the entity is effectively deleted. + // count == 0 means another thread/process removed it between + // loadDocument and this update — same desired end state. + log.debug(s"Delete $tableFullName $uuid -> true") + true + case Failure(f) => + log.error(f.getMessage, f) + false + } + case None => + log.debug(s"Delete $tableFullName $uuid -> already absent, no-op") + true + } + } + + /** Hard-deletes the document by physically removing the row. Use for GDPR compliance or permanent + * data removal. + * + * Note: this method is not part of the `ExternalPersistenceProvider` interface and must be + * called on the concrete provider type. For pipeline-driven hard deletes, downcast the provider + * or use a dedicated command handler. + */ + override def destroy(uuid: String): Boolean = { + val action = tableQuery.filter(r => rowUuidColumn(r) === uuid).delete.map(_ > 0) + db.run(action).complete() match { + case Success(value) => + log.debug(s"Destroy $tableFullName $uuid -> $value") + value + case Failure(f) => + log.error(f.getMessage, f) + false + } + } + + override final def loadDocument( + uuid: String + )(implicit m: Manifest[T], formats: Formats): Option[T] = { + val action = tableQuery + .filter(r => rowUuidColumn(r) === uuid) + .result + .headOption + db.run(action).complete() match { + case Success(Some(row)) => fromRow(row) + case Success(None) => + log.debug(s"Load $tableFullName $uuid -> None") + None + case Failure(f) => + log.error(f.getMessage, f) + None + } + } + + /** Search documents using a SQL WHERE clause against typed columns. + * + * Unlike `JdbcStateProvider.search` which queries the JSON `state` column, this queries real + * typed columns — enabling indexed lookups. + * + * **Security:** The `query` string is interpolated directly into SQL (same design as + * `JdbcStateProvider`). This is safe when queries come from internal code (event processors). + * NEVER pass user-supplied input. For external input, use type-safe Slick finder methods. + */ + override final def searchDocuments( + query: String + )(implicit m: Manifest[T], formats: Formats): List[T] = { + val action = sql"""SELECT #$selectColumns FROM #$tableFullName WHERE #$query""" + .as[RowType] + db.run(action).complete() match { + case Success(rows) => + log.debug( + s"Search $tableFullName with $query -> ${rows.size} result(s)" + ) + rows.flatMap(fromRow).toList + case Failure(f) => + log.error(f.getMessage, f) + Nil + } + } + + /** Validates that a string is a safe SQL identifier (letters, digits, underscores, dots). */ + private def validateIdentifier(id: String): Unit = + require( + id.matches("[a-zA-Z_][a-zA-Z0-9_.]*"), + s"Invalid SQL identifier: '$id'" + ) + + /** Initialize the table via Flyway migration. + * + * @throws org.flywaydb.core.api.FlywayException + * if migration fails (fail-fast: schema must be correct before app starts) + */ + def initTable(): Unit = { + dataset.foreach { d => + validateIdentifier(d) + log.info(s"Setting up dataset $d") + withStatement { stmt => + Try( + stmt.executeUpdate(s"CREATE SCHEMA IF NOT EXISTS $d") + ) match { + case Success(_) => + log.debug(s"Dataset $d is ready") + case Failure(e: java.sql.SQLSyntaxErrorException) => + // Suppress known syntax errors (e.g., if the database doesn't support schemas) + log.warn(s"Could not create schema $d, it may not be supported by the database", e) + case Failure(other) => + log.error(s"Error while creating schema $d", other) + throw other + } + } + } + migrate() + } +} diff --git a/jdbc/src/main/scala/app/softnetwork/persistence/jdbc/query/JdbcStateProvider.scala b/jdbc/src/main/scala/app/softnetwork/persistence/jdbc/query/JdbcStateProvider.scala index 3002a0c..905ceca 100644 --- a/jdbc/src/main/scala/app/softnetwork/persistence/jdbc/query/JdbcStateProvider.scala +++ b/jdbc/src/main/scala/app/softnetwork/persistence/jdbc/query/JdbcStateProvider.scala @@ -299,7 +299,7 @@ trait JdbcStateProvider[T <: Timestamped] * @return * whether the operation is successful or not */ - def destroy(uuid: String): Boolean = { + override def destroy(uuid: String): Boolean = { val action = states.filter(_.uuid === uuid).delete.map(_ > 0) db.run(action).complete() match { case Success(value) => diff --git a/jdbc/src/main/scala/app/softnetwork/persistence/jdbc/schema/FlywayMigration.scala b/jdbc/src/main/scala/app/softnetwork/persistence/jdbc/schema/FlywayMigration.scala new file mode 100644 index 0000000..59733b7 --- /dev/null +++ b/jdbc/src/main/scala/app/softnetwork/persistence/jdbc/schema/FlywayMigration.scala @@ -0,0 +1,75 @@ +package app.softnetwork.persistence.jdbc.schema + +import app.softnetwork.persistence.jdbc.db.SlickDatabase +import org.flywaydb.core.Flyway +import org.flywaydb.core.api.MigrationVersion +import org.slf4j.{Logger, LoggerFactory} + +import scala.util.{Failure, Success, Try} + +/** Runs Flyway migrations at startup for a specific table/entity. + * + * Migration files are loaded from classpath location `db/migration/{migrationFolder}/` following + * Flyway naming convention: `V{version}__{description}.sql` + * + * Flyway metadata table is namespaced per entity to avoid conflicts: + * `flyway_schema_history_{migrationFolder}` + */ +trait FlywayMigration { _: SlickDatabase => + + protected lazy val migrationLogger: Logger = + LoggerFactory.getLogger(getClass.getName) + + /** The folder name under `db/migration/` containing migration scripts. Typically the table name + * (e.g., "api_keys", "organizations"). + */ + def migrationFolder: String + + /** Optional schema/dataset for the migration target. When set, Flyway uses this as the default + * schema. + */ + def migrationSchema: Option[String] = None + + /** Whether to baseline an existing database on first Flyway run. Set to `false` for strict V1 + * validation on fresh Flyway adoption. Default `true` for backward compatibility with pre-Flyway + * databases. + */ + def baselineOnMigrate: Boolean = true + + /** Runs pending Flyway migrations. Safe to call multiple times — already-applied migrations are + * skipped. + * + * @throws org.flywaydb.core.api.FlywayException + * if migration fails (fail-fast: schema must be correct before app starts) + */ + def migrate(): Unit = { + Try { + val builder = Flyway + .configure() + .dataSource(dataSource) + .locations(s"db/migration/$migrationFolder") + .table(s"flyway_schema_history_$migrationFolder") + .baselineOnMigrate(baselineOnMigrate) + .baselineVersion(MigrationVersion.fromVersion("0")) + + val configured = migrationSchema match { + case Some(schema) => builder.defaultSchema(schema) + case None => builder + } + + configured.load().migrate() + } match { + case Success(result) => + migrationLogger.info( + s"Flyway migration for '$migrationFolder': " + + s"${result.migrationsExecuted} migration(s) applied" + ) + case Failure(f) => + migrationLogger.error( + s"Flyway migration for '$migrationFolder' failed: ${f.getMessage}", + f + ) + throw f + } + } +} diff --git a/jdbc/testkit/src/test/resources/db/migration/test_entity/V1__create_test_entity.sql b/jdbc/testkit/src/test/resources/db/migration/test_entity/V1__create_test_entity.sql new file mode 100644 index 0000000..b5b43ea --- /dev/null +++ b/jdbc/testkit/src/test/resources/db/migration/test_entity/V1__create_test_entity.sql @@ -0,0 +1,12 @@ +CREATE TABLE IF NOT EXISTS test_entity ( + uuid VARCHAR(255) NOT NULL PRIMARY KEY, + created_date TIMESTAMP WITH TIME ZONE NOT NULL, + last_updated TIMESTAMP WITH TIME ZONE NOT NULL, + name VARCHAR(255) NOT NULL, + email VARCHAR(255) NOT NULL, + status VARCHAR(50) NOT NULL DEFAULT 'active', + deleted BOOLEAN NOT NULL DEFAULT FALSE +); + +CREATE UNIQUE INDEX IF NOT EXISTS idx_test_entity_email + ON test_entity (email); diff --git a/jdbc/testkit/src/test/scala/app/softnetwork/persistence/jdbc/query/ColumnMappedJdbcStateProviderSpec.scala b/jdbc/testkit/src/test/scala/app/softnetwork/persistence/jdbc/query/ColumnMappedJdbcStateProviderSpec.scala new file mode 100644 index 0000000..2629a55 --- /dev/null +++ b/jdbc/testkit/src/test/scala/app/softnetwork/persistence/jdbc/query/ColumnMappedJdbcStateProviderSpec.scala @@ -0,0 +1,129 @@ +package app.softnetwork.persistence.jdbc.query + +import app.softnetwork.persistence.jdbc.scalatest.{H2TestKit, JdbcPersistenceTestKit} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import org.scalatest.BeforeAndAfterAll +import slick.jdbc.H2Profile + +import java.time.Instant +import java.time.temporal.ChronoUnit +import scala.concurrent.ExecutionContext + +class ColumnMappedJdbcStateProviderSpec + extends AnyFlatSpec + with Matchers + with BeforeAndAfterAll + with TestEntityProvider + with H2Profile + with H2TestKit { + + override implicit def executionContext: ExecutionContext = classicSystem.dispatcher + + override def beforeAll(): Unit = { + super.beforeAll() + initTable() + } + + private val now = Instant.now().truncatedTo(ChronoUnit.MICROS) + private val alice = + TestEntity("id-1", now, now, "Alice", "alice@acme.com", "active") + private val bob = + TestEntity("id-2", now, now, "Bob", "bob@acme.com", "active") + private val charlie = + TestEntity("id-3", now, now, "Charlie", "charlie@acme.com", "suspended") + + "initTable" should "run Flyway migration and be idempotent" in { + noException should be thrownBy initTable() + } + + "createDocument" should "insert a row with typed columns" in { + createDocument(alice) shouldBe true + createDocument(bob) shouldBe true + createDocument(charlie) shouldBe true + } + + "loadDocument" should "retrieve and deserialize via fromRow" in { + loadDocument("id-1") shouldBe Some(alice) + } + + it should "return None for non-existent uuid" in { + loadDocument("non-existent") shouldBe None + } + + "updateDocument" should "modify typed columns" in { + val updated = alice.copy(name = "Alice Updated", lastUpdated = Instant.now()) + updateDocument(updated) shouldBe true + loadDocument("id-1").map(_.name) shouldBe Some("Alice Updated") + } + + it should "upsert when entity does not exist" in { + val newEntity = TestEntity("id-4", now, now, "Diana", "diana@acme.com", "active") + updateDocument(newEntity, upsert = true) shouldBe true + loadDocument("id-4").map(_.name) shouldBe Some("Diana") + } + + "searchDocuments" should "return entities matching SQL WHERE on typed columns" in { + val results = searchDocuments("status = 'active'") + results.map(_.uuid) should contain theSameElementsAs List("id-1", "id-2", "id-4") + } + + it should "support indexed column lookup" in { + val results = searchDocuments("email = 'bob@acme.com'") + results.map(_.uuid) shouldBe List("id-2") + } + + it should "support compound WHERE clauses" in { + val results = searchDocuments("status = 'active' AND name LIKE 'B%'") + results.map(_.name) shouldBe List("Bob") + } + + it should "return empty list for no matches" in { + searchDocuments("status = 'nonexistent'") shouldBe Nil + } + + "findByEmail (type-safe Slick finder)" should "return the correct entity" in { + findByEmail("bob@acme.com").map(_.uuid) shouldBe Some("id-2") + } + + it should "return None for unknown email" in { + findByEmail("unknown@acme.com") shouldBe None + } + + "findByStatus (type-safe Slick finder)" should "return matching entities" in { + findByStatus("suspended").map(_.uuid) shouldBe List("id-3") + } + + "deleteDocument" should "soft-delete (loadDocument returns None)" in { + deleteDocument("id-3") shouldBe true + loadDocument("id-3") shouldBe None + } + + it should "return true for already-absent entity (idempotent)" in { + deleteDocument("id-3") shouldBe true + } + + it should "return true for non-existent uuid (idempotent)" in { + deleteDocument("never-existed") shouldBe true + } + + "findByEmail" should "return None for soft-deleted entities" in { + findByEmail("charlie@acme.com") shouldBe None + } + + "destroy" should "hard-delete the row physically" in { + val temp = TestEntity("id-temp", now, now, "Temp", "temp@acme.com", "active") + createDocument(temp) shouldBe true + loadDocument("id-temp") shouldBe Some(temp) + + destroy("id-temp") shouldBe true + loadDocument("id-temp") shouldBe None + + // Verify it's really gone (not just soft-deleted) + searchDocuments("uuid = 'id-temp'") shouldBe Nil + } + + it should "return false for non-existent uuid" in { + destroy("never-existed") shouldBe false + } +} diff --git a/jdbc/testkit/src/test/scala/app/softnetwork/persistence/jdbc/query/TestEntity.scala b/jdbc/testkit/src/test/scala/app/softnetwork/persistence/jdbc/query/TestEntity.scala new file mode 100644 index 0000000..d9cd20e --- /dev/null +++ b/jdbc/testkit/src/test/scala/app/softnetwork/persistence/jdbc/query/TestEntity.scala @@ -0,0 +1,16 @@ +package app.softnetwork.persistence.jdbc.query + +import app.softnetwork.persistence.model.Timestamped + +import java.time.Instant + +/** Test entity mimicking a licensing domain object. */ +case class TestEntity( + uuid: String, + createdDate: Instant, + lastUpdated: Instant, + name: String, + email: String, + status: String, + deleted: Boolean = false +) extends Timestamped diff --git a/jdbc/testkit/src/test/scala/app/softnetwork/persistence/jdbc/query/TestEntityProvider.scala b/jdbc/testkit/src/test/scala/app/softnetwork/persistence/jdbc/query/TestEntityProvider.scala new file mode 100644 index 0000000..b74f8ea --- /dev/null +++ b/jdbc/testkit/src/test/scala/app/softnetwork/persistence/jdbc/query/TestEntityProvider.scala @@ -0,0 +1,100 @@ +package app.softnetwork.persistence.jdbc.query + +import app.softnetwork.persistence.ManifestWrapper +import slick.jdbc.{GetResult, JdbcProfile} + +import java.time.Instant +import scala.util.Success + +/** Test implementation of ColumnMappedJdbcStateProvider, database-agnostic. The concrete test spec + * mixes in the desired JdbcProfile (H2Profile, PostgresProfile, etc.). + */ +trait TestEntityProvider + extends ColumnMappedJdbcStateProvider[TestEntity] + with ManifestWrapper[TestEntity] { + _: JdbcProfile => + + override val manifestWrapper: ManifestW = ManifestW() + + override type RowType = + ( + String, + Instant, + Instant, + String, + String, + String, + Boolean + ) // uuid, createdDate, lastUpdated, ... + override type TableType = TestEntities + + import api._ + + class TestEntities(tag: Tag) extends Table[RowType](tag, dataset, tableName) { + def uuid = column[String]("uuid", O.PrimaryKey) + def createdDate = column[Instant]("created_date") + def lastUpdated = column[Instant]("last_updated") + def name = column[String]("name") + def email = column[String]("email") + def status = column[String]("status") + def deleted = column[Boolean]("deleted") + def * = (uuid, createdDate, lastUpdated, name, email, status, deleted) + } + + override def tableQuery: TableQuery[TestEntities] = + TableQuery[TestEntities] + + override def toRow( + entity: TestEntity, + deleted: Boolean + ): RowType = + ( + entity.uuid, + entity.createdDate, + entity.lastUpdated, + entity.name, + entity.email, + entity.status, + deleted || entity.deleted + ) + + override def fromRow(row: RowType): Option[TestEntity] = row match { + case (_, _, _, _, _, _, true) => None // soft-deleted + case (uuid, createdDate, lastUpdated, name, email, status, deleted) => + Some(TestEntity(uuid, createdDate, lastUpdated, name, email, status, deleted)) + } + + override def rowUuidColumn(row: TestEntities): Rep[String] = row.uuid + + override implicit def getResult: GetResult[RowType] = + GetResult { r => + ( + r.nextString(), + r.nextTimestamp().toInstant, + r.nextTimestamp().toInstant, + r.nextString(), + r.nextString(), + r.nextString(), + r.nextBoolean() + ) + } + + // --- Type-safe finder methods --- + + def findByEmail(email: String): Option[TestEntity] = { + val action = + tableQuery.filter(r => r.email === email && r.deleted === false).result.headOption + db.run(action).complete() match { + case Success(Some(row)) => fromRow(row) + case _ => None + } + } + + def findByStatus(status: String): List[TestEntity] = { + val action = tableQuery.filter(r => r.status === status && r.deleted === false).result + db.run(action).complete() match { + case Success(rows) => rows.flatMap(fromRow).toList + case _ => Nil + } + } +} diff --git a/project/Versions.scala b/project/Versions.scala index dfeebef..b97fd86 100644 --- a/project/Versions.scala +++ b/project/Versions.scala @@ -52,5 +52,7 @@ object Versions { val akkaPersistenceCassandra = "1.0.6" + val flyway = "11.8.1" + val testContainers = "1.20.6" } diff --git a/server/testkit/src/main/scala/akka/http/scaladsl/testkit/PersistenceScalatestRouteTest.scala b/server/testkit/src/main/scala/akka/http/scaladsl/testkit/PersistenceScalatestRouteTest.scala index d1247b0..6b1953b 100644 --- a/server/testkit/src/main/scala/akka/http/scaladsl/testkit/PersistenceScalatestRouteTest.scala +++ b/server/testkit/src/main/scala/akka/http/scaladsl/testkit/PersistenceScalatestRouteTest.scala @@ -2,7 +2,7 @@ package akka.http.scaladsl.testkit import akka.actor.ActorSystem import akka.http.scaladsl.model.HttpHeader -import akka.http.scaladsl.model.headers.{Cookie, HttpCookiePair, RawHeader, `Set-Cookie`} +import akka.http.scaladsl.model.headers.{`Set-Cookie`, Cookie, HttpCookiePair, RawHeader} import akka.http.scaladsl.server.directives.RouteDirectives import akka.http.scaladsl.server.{ExceptionHandler, Route} import akka.stream.{Materializer, SystemMaterializer} @@ -19,9 +19,9 @@ import org.scalatest.Suite import scala.concurrent.ExecutionContextExecutor /** Created by smanciot on 24/04/2020. - */ + */ trait PersistenceScalatestRouteTest - extends ApiServer + extends ApiServer with ServerTestKit with PersistenceTestKit with PersistenceRouteTest @@ -74,7 +74,7 @@ trait PersistenceScalatestRouteTest @deprecated("this method has been replaced by findHeader and will be removed", since = "0.3.1.1") def findCookie(name: String): HttpHeader => Option[HttpCookiePair] = { case Cookie(cookies) => cookies.find(_.name == name) - case _ => None + case _ => None } def extractHeaders(headers: Seq[HttpHeader]): Seq[HttpHeader] = { @@ -108,15 +108,15 @@ trait PersistenceScalatestRouteTest } def headerValue(name: String): HttpHeader => Option[String] = { - case Cookie(cookies) => cookies.find(_.name == name).map(_.value) + case Cookie(cookies) => cookies.find(_.name == name).map(_.value) case r: RawHeader if r.name == name => Some(r.value) - case _ => None + case _ => None } def findHeader(name: String): HttpHeader => Option[HttpHeader] = { case c: Cookie if c.cookies.exists(_.name == name) => Some(c) - case other if other.name() == name => Some(other) - case _ => None + case other if other.name() == name => Some(other) + case _ => None } def existHeader(name: String): HttpHeader => Boolean = header => @@ -124,7 +124,7 @@ trait PersistenceScalatestRouteTest } trait InMemoryPersistenceScalatestRouteTest - extends PersistenceScalatestRouteTest + extends PersistenceScalatestRouteTest with InMemoryPersistenceTestKit { _: Suite with ApiRoutes => } @@ -133,7 +133,7 @@ import akka.http.scaladsl.Http import akka.http.scaladsl.client.RequestBuilding import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart import akka.http.scaladsl.model._ -import akka.http.scaladsl.model.headers.{Host, Upgrade, `Sec-WebSocket-Protocol`} +import akka.http.scaladsl.model.headers.{`Sec-WebSocket-Protocol`, Host, Upgrade} import akka.http.scaladsl.server._ import akka.http.scaladsl.settings.ParserSettings import akka.http.scaladsl.settings.RoutingSettings @@ -151,7 +151,11 @@ import scala.concurrent.{Await, ExecutionContext, Future} import scala.reflect.ClassTag import scala.util.DynamicVariable -trait PersistenceRouteTest extends RequestBuilding with WSTestRequestBuilding with RouteTestResultComponent with MarshallingTestUtils { +trait PersistenceRouteTest + extends RequestBuilding + with WSTestRequestBuilding + with RouteTestResultComponent + with MarshallingTestUtils { this: TestFrameworkInterface => /** Override to supply a custom ActorSystem */ @@ -184,9 +188,11 @@ trait PersistenceRouteTest extends RequestBuilding with WSTestRequestBuilding wi if (dynRR.value ne null) dynRR.value else sys.error("This value is only available inside of a `check` construct!") - def check[T](body: => T): RouteTestResult => T = result => dynRR.withValue(result.awaitResult)(body) + def check[T](body: => T): RouteTestResult => T = result => + dynRR.withValue(result.awaitResult)(body) - private def responseSafe = if (dynRR.value ne null) dynRR.value.response else "" + private def responseSafe = + if (dynRR.value ne null) dynRR.value.response else "" def handled: Boolean = result.handled @@ -200,16 +206,26 @@ trait PersistenceRouteTest extends RequestBuilding with WSTestRequestBuilding wi def chunksStream: Source[ChunkStreamPart, Any] = result.chunksStream - def entityAs[T: FromEntityUnmarshaller : ClassTag](implicit timeout: Duration = 1.second): T = { - def msg(e: Throwable) = s"Could not unmarshal entity to type '${implicitly[ClassTag[T]]}' for `entityAs` assertion: $e\n\nResponse was: $responseSafe" + def entityAs[T: FromEntityUnmarshaller: ClassTag](implicit timeout: Duration = 1.second): T = { + def msg(e: Throwable) = + s"Could not unmarshal entity to type '${implicitly[ClassTag[T]]}' for `entityAs` assertion: $e\n\nResponse was: $responseSafe" - Await.result(Unmarshal(responseEntity).to[T].fast.recover[T] { case error => failTest(msg(error)) }, timeout) + Await.result( + Unmarshal(responseEntity).to[T].fast.recover[T] { case error => failTest(msg(error)) }, + timeout + ) } - def responseAs[T: FromResponseUnmarshaller : ClassTag](implicit timeout: Duration = 1.second): T = { - def msg(e: Throwable) = s"Could not unmarshal response to type '${implicitly[ClassTag[T]]}' for `responseAs` assertion: $e\n\nResponse was: $responseSafe" + def responseAs[T: FromResponseUnmarshaller: ClassTag](implicit + timeout: Duration = 1.second + ): T = { + def msg(e: Throwable) = + s"Could not unmarshal response to type '${implicitly[ClassTag[T]]}' for `responseAs` assertion: $e\n\nResponse was: $responseSafe" - Await.result(Unmarshal(response).to[T].fast.recover[T] { case error => failTest(msg(error)) }, timeout) + Await.result( + Unmarshal(response).to[T].fast.recover[T] { case error => failTest(msg(error)) }, + timeout + ) } def contentType: ContentType = rawResponse.entity.contentType @@ -218,11 +234,13 @@ trait PersistenceRouteTest extends RequestBuilding with WSTestRequestBuilding wi def charsetOption: Option[HttpCharset] = contentType.charsetOption - def charset: HttpCharset = charsetOption getOrElse sys.error("Binary entity does not have charset") + def charset: HttpCharset = + charsetOption getOrElse sys.error("Binary entity does not have charset") def headers: immutable.Seq[HttpHeader] = rawResponse.headers - def header[T >: Null <: HttpHeader : ClassTag]: Option[T] = rawResponse.header[T](implicitly[ClassTag[T]]) + def header[T >: Null <: HttpHeader: ClassTag]: Option[T] = + rawResponse.header[T](implicitly[ClassTag[T]]) def header(name: String): Option[HttpHeader] = rawResponse.headers.find(_.is(name.toLowerCase)) @@ -230,28 +248,28 @@ trait PersistenceRouteTest extends RequestBuilding with WSTestRequestBuilding wi def closingExtension: String = chunks.lastOption match { case Some(HttpEntity.LastChunk(extension, _)) => extension - case _ => "" + case _ => "" } def trailer: immutable.Seq[HttpHeader] = chunks.lastOption match { case Some(HttpEntity.LastChunk(_, trailer)) => trailer - case _ => Nil + case _ => Nil } def rejections: immutable.Seq[Rejection] = result.rejections def rejection: Rejection = { val r = rejections - if (r.size == 1) r.head else failTest("Expected a single rejection but got %s (%s)".format(r.size, r)) + if (r.size == 1) r.head + else failTest("Expected a single rejection but got %s (%s)".format(r.size, r)) } def isWebSocketUpgrade: Boolean = status == StatusCodes.SwitchingProtocols && header[Upgrade].exists(_.hasWebSocket) - /** - * Asserts that the received response is a WebSocket upgrade response and the extracts - * the chosen subprotocol and passes it to the handler. - */ + /** Asserts that the received response is a WebSocket upgrade response and the extracts the chosen + * subprotocol and passes it to the handler. + */ def expectWebSocketUpgradeWithProtocol(body: String => Unit): Unit = { if (!isWebSocketUpgrade) failTest("Response was no WebSocket Upgrade response") header[`Sec-WebSocket-Protocol`] match { @@ -260,29 +278,27 @@ trait PersistenceRouteTest extends RequestBuilding with WSTestRequestBuilding wi } } - /** - * A dummy that can be used as `~> runRoute` to run the route but without blocking for the result. - * The result of the pipeline is the result that can later be checked with `check`. See the - * "separate running route from checking" example from ScalatestRouteTestSpec.scala. - */ + /** A dummy that can be used as `~> runRoute` to run the route but without blocking for the + * result. The result of the pipeline is the result that can later be checked with `check`. See + * the "separate running route from checking" example from ScalatestRouteTestSpec.scala. + */ def runRoute: RouteTestResult => RouteTestResult = ConstantFun.scalaIdentityFunction // there is already an implicit class WithTransformation in scope (inherited from akka.http.scaladsl.testkit.TransformerPipelineSupport) // however, this one takes precedence implicit class WithTransformation2(request: HttpRequest) { - /** - * Apply request to given routes for further inspection in `check { }` block. - */ + + /** Apply request to given routes for further inspection in `check { }` block. + */ def ~>[A, B](f: A => B)(implicit ta: TildeArrow[A, B]): ta.Out = ta(request, f) - /** - * Evaluate request against routes run in server mode for further - * inspection in `check { }` block. - * - * Compared to [[~>]], the given routes are run in a fully fledged - * server, which allows more types of directives to be tested at the - * cost of additional overhead related with server setup. - */ + /** Evaluate request against routes run in server mode for further inspection in `check { }` + * block. + * + * Compared to [[~>]], the given routes are run in a fully fledged server, which allows more + * types of directives to be tested at the cost of additional overhead related with server + * setup. + */ def ~!>[A, B](f: A => B)(implicit tba: TildeBangArrow[A, B]): tba.Out = tba(request, f) } @@ -295,7 +311,8 @@ trait PersistenceRouteTest extends RequestBuilding with WSTestRequestBuilding wi case class DefaultHostInfo(host: Host, securedConnection: Boolean) object DefaultHostInfo { - implicit def defaultHost: DefaultHostInfo = DefaultHostInfo(Host("example.com"), securedConnection = false) + implicit def defaultHost: DefaultHostInfo = + DefaultHostInfo(Host("example.com"), securedConnection = false) } object TildeArrow { @@ -305,14 +322,19 @@ trait PersistenceRouteTest extends RequestBuilding with WSTestRequestBuilding wi def apply(request: HttpRequest, f: HttpRequest => HttpRequest) = f(request) } - implicit def injectIntoRoute(implicit timeout: RouteTestTimeout, defaultHostInfo: DefaultHostInfo): TildeArrow[RequestContext, Future[RouteResult]] {type Out = RouteTestResult} = + implicit def injectIntoRoute(implicit + timeout: RouteTestTimeout, + defaultHostInfo: DefaultHostInfo + ): TildeArrow[RequestContext, Future[RouteResult]] { type Out = RouteTestResult } = new TildeArrow[RequestContext, Future[RouteResult]] { type Out = RouteTestResult def apply(request: HttpRequest, route: Route): Out = { if (request.method == HttpMethods.HEAD && ServerSettings(system).transparentHeadRequests) - failTest("`akka.http.server.transparent-head-requests = on` not supported in PersistenceRouteTest using `~>`. Use `~!>` instead " + - "for a full-stack test, e.g. `req ~!> route ~> check {...}`") + failTest( + "`akka.http.server.transparent-head-requests = on` not supported in PersistenceRouteTest using `~>`. Use `~!>` instead " + + "for a full-stack test, e.g. `req ~!> route ~> check {...}`" + ) implicit val executionContext: ExecutionContext = system.classicSystem.dispatcher val routingSettings = RoutingSettings(system) @@ -322,9 +344,15 @@ trait PersistenceRouteTest extends RequestBuilding with WSTestRequestBuilding wi val effectiveRequest = request.withEffectiveUri( securedConnection = defaultHostInfo.securedConnection, - defaultHostHeader = defaultHostInfo.host) + defaultHostHeader = defaultHostInfo.host + ) val parserSettings = ParserSettings.forServer(system) - val ctx = new RequestContextImpl(effectiveRequest, routingLog.requestLog(effectiveRequest), routingSettings, parserSettings) + val ctx = new RequestContextImpl( + effectiveRequest, + routingLog.requestLog(effectiveRequest), + routingSettings, + parserSettings + ) val sealedExceptionHandler = ExceptionHandler.seal(testExceptionHandler) @@ -344,7 +372,10 @@ trait PersistenceRouteTest extends RequestBuilding with WSTestRequestBuilding wi } object TildeBangArrow { - implicit def injectIntoRoute(implicit timeout: RouteTestTimeout, serverSettings: ServerSettings): TildeBangArrow[RequestContext, Future[RouteResult]] {type Out = RouteTestResult} = + implicit def injectIntoRoute(implicit + timeout: RouteTestTimeout, + serverSettings: ServerSettings + ): TildeBangArrow[RequestContext, Future[RouteResult]] { type Out = RouteTestResult } = new TildeBangArrow[RequestContext, Future[RouteResult]] { type Out = RouteTestResult @@ -360,10 +391,15 @@ trait PersistenceRouteTest extends RequestBuilding with WSTestRequestBuilding wi } private[http] object PersistenceRouteTest { - def runRouteClientServer(request: HttpRequest, route: Route, serverSettings: ServerSettings)(implicit system: ActorSystem): Future[HttpResponse] = { + def runRouteClientServer(request: HttpRequest, route: Route, serverSettings: ServerSettings)( + implicit system: ActorSystem + ): Future[HttpResponse] = { import system.dispatcher for { - binding <- Http().newServerAt("127.0.0.1", 0).withSettings(settings = serverSettings).bind(route) + binding <- Http() + .newServerAt("127.0.0.1", 0) + .withSettings(settings = serverSettings) + .bind(route) port = binding.localAddress.getPort targetUri = request.uri.withHost("127.0.0.1").withPort(port).withScheme("http") diff --git a/session/testkit/src/main/scala/app/softnetwork/session/scalatest/RefreshableSessionTestKit.scala b/session/testkit/src/main/scala/app/softnetwork/session/scalatest/RefreshableSessionTestKit.scala index 26362a9..e816fe7 100644 --- a/session/testkit/src/main/scala/app/softnetwork/session/scalatest/RefreshableSessionTestKit.scala +++ b/session/testkit/src/main/scala/app/softnetwork/session/scalatest/RefreshableSessionTestKit.scala @@ -19,7 +19,8 @@ trait RefreshableSessionTestKit[T <: SessionData with SessionDataDecorator[T]] value match { case Some(value) => refreshable.refreshTokenManager - .sessionFromValue(value).complete() match { + .sessionFromValue(value) + .complete() match { case Success(value) => value match { case _ @SessionResult.CreatedFromToken(session) => Some(session)