diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2279c8f..f365e55 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,12 +15,7 @@ jobs: fail-fast: false matrix: dart: [3.6, 3.12] - package: [cli_tools, config, serverpod_logging] - exclude: - - package: serverpod_logging - include: - - package: serverpod_logging - dart: 3.10.3 + package: [cli_tools, config, isolated_object, serverpod_logging] runs-on: ubuntu-latest defaults: run: @@ -40,7 +35,7 @@ jobs: fail-fast: false matrix: dart: [3.6, 3.12] - package: [cli_tools, config, serverpod_logging] + package: [cli_tools, config, isolated_object, serverpod_logging] platform: [ubuntu-latest] include: - package: cli_tools diff --git a/.github/workflows/publish-isolated_object.yaml b/.github/workflows/publish-isolated_object.yaml new file mode 100644 index 0000000..d55bb09 --- /dev/null +++ b/.github/workflows/publish-isolated_object.yaml @@ -0,0 +1,16 @@ +name: Publish isolated_object package + +on: + push: + tags: + # Matches tags like isolated_object-v1.2.3 and isolated_object-v1.2.3-pre.1 + - 'isolated_object-v[0-9]+.[0-9]+.[0-9]+' + - 'isolated_object-v[0-9]+.[0-9]+.[0-9]+-*' + +jobs: + publish: + permissions: + id-token: write + uses: dart-lang/setup-dart/.github/workflows/publish.yml@v1 + with: + working-directory: packages/isolated_object diff --git a/packages/isolated_object/CHANGELOG.md b/packages/isolated_object/CHANGELOG.md new file mode 100644 index 0000000..a0712a7 --- /dev/null +++ b/packages/isolated_object/CHANGELOG.md @@ -0,0 +1,3 @@ +## 0.1.0 + +- Initial version. diff --git a/packages/isolated_object/LICENSE b/packages/isolated_object/LICENSE new file mode 100644 index 0000000..dcdffe5 --- /dev/null +++ b/packages/isolated_object/LICENSE @@ -0,0 +1,28 @@ +BSD 3-Clause License + +Copyright (c) 2024, Serverpod + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/packages/isolated_object/README.md b/packages/isolated_object/README.md new file mode 100644 index 0000000..f3cfd18 --- /dev/null +++ b/packages/isolated_object/README.md @@ -0,0 +1,24 @@ +# isolated_object + +Wraps an object in a dedicated isolate and forwards method calls to it, so +timer- or event-loop-driven work (e.g. progress spinners) keeps animating even +when the calling isolate's event loop is blocked by heavy synchronous work. + +Dependency-free. + +## Usage + +```dart +import 'package:isolated_object/isolated_object.dart'; + +final counter = IsolatedObject(() => Counter()); + +await counter.evaluate((c) => c.increment()); +final value = await counter.evaluate((c) => c.value); + +await counter.close(); +``` + +The factory runs inside the child isolate; each `evaluate` forwards a closure +to that isolate and returns the result. `close()` shuts the isolate down, +failing any in-flight calls rather than letting them hang. diff --git a/packages/isolated_object/analysis_options.yaml b/packages/isolated_object/analysis_options.yaml new file mode 100644 index 0000000..c249a90 --- /dev/null +++ b/packages/isolated_object/analysis_options.yaml @@ -0,0 +1,14 @@ +include: package:serverpod_lints/cli.yaml + +analyzer: + language: + strict-raw-types: false + errors: + inference_failure_on_instance_creation: ignore + inference_failure_on_function_invocation: ignore + inference_failure_on_untyped_parameter: ignore + prefer_final_parameters: ignore + +linter: + rules: + prefer_relative_imports: true diff --git a/packages/isolated_object/lib/isolated_object.dart b/packages/isolated_object/lib/isolated_object.dart new file mode 100644 index 0000000..0636d61 --- /dev/null +++ b/packages/isolated_object/lib/isolated_object.dart @@ -0,0 +1,4 @@ +/// Wraps an object in a dedicated isolate, forwarding method calls to it. +library; + +export 'src/isolated_object.dart'; diff --git a/packages/isolated_object/lib/src/isolated_object.dart b/packages/isolated_object/lib/src/isolated_object.dart new file mode 100644 index 0000000..429677b --- /dev/null +++ b/packages/isolated_object/lib/src/isolated_object.dart @@ -0,0 +1,156 @@ +import 'dart:async'; +import 'dart:isolate'; + +/// Synchronous or asynchronous producer of a [T]. Used by [IsolatedObject] +/// to create the wrapped object inside the child isolate. +typedef Factory = FutureOr Function(); + +typedef _Action = ({int id, dynamic Function(T) function}); +typedef _Response = ({int id, dynamic result}); +typedef _Inflight = Map; +typedef _Setup = (SendPort, ReceivePort, _Inflight); + +/// Wraps an object of type [T] in a dedicated isolate, allowing method calls +/// to be forwarded via [evaluate]. +class IsolatedObject { + final Future<_Setup> _connected; + + /// Spawns a dedicated isolate, constructs a [T] there via [create], and + /// returns immediately. Subsequent [evaluate] calls forward work to that + /// isolate. + /// + /// If [keepIsolateAlive] is true (the default), the parent isolate stays + /// alive as long as this object is open, matching the usual service-style + /// lifecycle. Set to false for fire-and-forget isolates that shouldn't + /// block process shutdown on their own. + IsolatedObject(Factory create, {bool keepIsolateAlive = true}) + : _connected = _connect(create, keepIsolateAlive); + + static Future<_Setup> _connect( + Factory create, + bool keepIsolateAlive, + ) async { + final parentPort = RawReceivePort()..keepIsolateAlive = keepIsolateAlive; + final setupDone = Completer<_Setup>(); + + parentPort.handler = (dynamic message) async { + if (message case final RemoteError e) { + parentPort.close(); + setupDone.completeError(e, e.stackTrace); + } else { + final toChild = message as SendPort; + final fromChild = ReceivePort.fromRawReceivePort(parentPort); + final inflight = _Inflight(); + setupDone.complete((toChild, fromChild, inflight)); + } + }; + + try { + await _spawn(create, parentPort.sendPort); + } catch (_) { + parentPort.close(); + rethrow; + } + + final result = await setupDone.future; + final (toChild, fromChild, inflight) = result; + + fromChild.listen( + (dynamic message) async { + if (message case final _Response response) { + final completer = inflight.remove(response.id); + assert(completer != null, 'PROTOCOL BUG. No such ID ${response.id}'); + if (completer == null) return; + switch (response.result) { + case final RemoteError e: + completer.completeError(e, e.stackTrace); + default: + completer.complete(await response.result); + } + } + }, + onDone: () { + // ReceivePort closed. Fail any pending requests to avoid hangs. + for (final c in inflight.values) { + if (!c.isCompleted) { + c.completeError(StateError('IsolatedObject<$T> channel closed')); + } + } + inflight.clear(); + }, + ); + + return result; + } + + static Future _spawn(Factory create, SendPort toParent) { + return Isolate.spawn((SendPort toParent) async { + final childPort = ReceivePort(); + final T isolatedObject; + try { + isolatedObject = await create(); + toParent.send(childPort.sendPort); + } catch (e, st) { + toParent.send(RemoteError('$e', '$st')); + return; + } + + await for (final message in childPort) { + if (message == null) { + childPort.close(); + break; + } else if (message case final _Action action) { + try { + final result = await action.function(isolatedObject); + toParent.send((id: action.id, result: result)); + } catch (e, st) { + toParent.send((id: action.id, result: RemoteError('$e', '$st'))); + } + } + } + }, toParent); + } + + int _nextId = 0; + bool _isClosed = false; + + /// Whether [close] has been called. Once true, further [evaluate] calls + /// throw a [StateError]. + bool get isClosed => _isClosed; + + /// Evaluates [function] on the isolated object and returns the result. + Future evaluate(FutureOr Function(T) function) async { + if (_isClosed) { + throw StateError('IsolatedObject<$T> is closed'); + } + + final (toChild, _, inflight) = await _connected; + + final id = _nextId++; + final completer = Completer(); + inflight[id] = completer; + + toChild.send((id: id, function: function)); + return completer.future; + } + + /// Shuts down the isolate. Idempotent. Any outstanding [evaluate] calls are + /// failed with a [StateError] rather than left to hang. + Future close() async { + if (_isClosed) return; + _isClosed = true; + + final (toChild, fromChild, inflight) = await _connected; + + // Fail any outstanding requests. + for (final c in inflight.values) { + if (!c.isCompleted) { + c.completeError(StateError('$runtimeType is closed')); + } + } + inflight.clear(); + + toChild.send(null); + fromChild.close(); + } +} diff --git a/packages/isolated_object/pubspec.yaml b/packages/isolated_object/pubspec.yaml new file mode 100644 index 0000000..ee8a1b5 --- /dev/null +++ b/packages/isolated_object/pubspec.yaml @@ -0,0 +1,18 @@ +name: isolated_object +description: >- + Wraps an object in a dedicated isolate and forwards method calls to it, so + timer- or event-loop-driven work (e.g. progress spinners) keeps running even + when the calling isolate's event loop is blocked. Dependency-free. +version: 0.1.0 +repository: https://github.com/serverpod/serverpod +homepage: https://serverpod.dev +issue_tracker: https://github.com/serverpod/serverpod/issues + +environment: + sdk: ^3.6.0 + +resolution: workspace + +dev_dependencies: + serverpod_lints: '>=2.7.0' + test: ^1.25.6 diff --git a/packages/isolated_object/test/close_test.dart b/packages/isolated_object/test/close_test.dart new file mode 100644 index 0000000..3c567d6 --- /dev/null +++ b/packages/isolated_object/test/close_test.dart @@ -0,0 +1,55 @@ +// Ported from relic's isolated_object_close_test.dart. The pending-operation +// message assertion is relaxed to `contains('closed')` so it holds for the +// canonical (serverpod) teardown, which fails in-flight calls with +// " is closed" rather than relic's "channel closed". +import 'dart:async'; + +import 'package:isolated_object/isolated_object.dart'; +import 'package:test/test.dart'; + +void main() { + test( + 'Given an IsolatedObject, ' + 'when close is called multiple times, ' + 'then it handles it gracefully', () async { + final isolated = IsolatedObject<_Counter>(() => _Counter(0)); + + await isolated.close(); + await isolated.close(); // Second close should not throw. + + expect(isolated.isClosed, isTrue); + }); + + test( + 'Given an IsolatedObject with pending operations, ' + 'when it is closed, ' + 'then pending operations fail with a closed error', () async { + final isolated = IsolatedObject<_Counter>(() => _Counter(0)); + + final pendingOperation = isolated.evaluate((counter) async { + await Future.delayed(const Duration(seconds: 10)); + return counter.value; + }); + + // Give the operation time to register as inflight. + await Future.delayed(const Duration(milliseconds: 50)); + + await isolated.close(); + + await expectLater( + pendingOperation, + throwsA( + isA().having( + (e) => e.message, + 'message', + contains('closed'), + ), + ), + ); + }); +} + +class _Counter { + int value; + _Counter(this.value); +} diff --git a/packages/isolated_object/test/create_test.dart b/packages/isolated_object/test/create_test.dart new file mode 100644 index 0000000..e830ef9 --- /dev/null +++ b/packages/isolated_object/test/create_test.dart @@ -0,0 +1,55 @@ +// Ported from relic's isolated_object_create_test.dart. +import 'dart:async'; + +import 'package:isolated_object/isolated_object.dart'; +import 'package:test/test.dart'; + +void main() { + test( + 'Given a synchronous factory, ' + 'when IsolatedObject is created, ' + 'then it successfully initializes', () async { + final isolated = IsolatedObject<_Counter>(() => _Counter(0)); + + final result = await isolated.evaluate((counter) => counter.value); + expect(result, 0); + + await isolated.close(); + }); + + test( + 'Given an async factory, ' + 'when IsolatedObject is created, ' + 'then it successfully initializes', () async { + final isolated = IsolatedObject<_Counter>(() async { + await Future.delayed(const Duration(milliseconds: 10)); + return _Counter(42); + }); + + final result = await isolated.evaluate((counter) => counter.value); + expect(result, 42); + + await isolated.close(); + }); + + test( + 'Given a factory that throws, ' + 'when IsolatedObject is created, ' + 'then subsequent operations fail gracefully', () async { + final isolated = IsolatedObject<_Counter>(() { + throw Exception('Factory failed'); + }); + + await expectLater( + isolated.evaluate((counter) => counter.value), + throwsA(anything), + ); + }); +} + +class _Counter { + int value; + _Counter(this.value); + + void increment() => value++; +} diff --git a/packages/isolated_object/test/evaluate_test.dart b/packages/isolated_object/test/evaluate_test.dart new file mode 100644 index 0000000..cfa8424 --- /dev/null +++ b/packages/isolated_object/test/evaluate_test.dart @@ -0,0 +1,255 @@ +// Ported from relic's isolated_object_evaluate_test.dart. +import 'dart:async'; +import 'dart:isolate'; + +import 'package:isolated_object/isolated_object.dart'; +import 'package:test/test.dart'; + +void main() { + group('evaluate', () { + late IsolatedObject<_Counter> isolated; + + setUp(() { + isolated = IsolatedObject<_Counter>(() => _Counter(0)); + }); + + tearDown(() async { + await isolated.close(); + }); + + test( + 'Given an IsolatedObject, ' + 'when evaluate is called with a getter, ' + 'then it returns the correct value', () async { + final result = await isolated.evaluate((counter) => counter.value); + expect(result, 0); + }); + + test( + 'Given an IsolatedObject, ' + 'when evaluate is called with a mutation, ' + 'then the state persists across evaluations', () async { + await isolated.evaluate((counter) { + counter.increment(); + return null; + }); + + final result = await isolated.evaluate((counter) => counter.value); + expect(result, 1); + }); + + test( + 'Given an IsolatedObject, ' + 'when evaluate is called multiple times sequentially, ' + 'then all operations execute correctly', () async { + await isolated.evaluate((counter) => counter.increment()); + await isolated.evaluate((counter) => counter.increment()); + await isolated.evaluate((counter) => counter.increment()); + + final result = await isolated.evaluate((counter) => counter.value); + expect(result, 3); + }); + + test( + 'Given an IsolatedObject, ' + 'when evaluate is called with multiple concurrent operations, ' + 'then all operations complete successfully', () async { + final futures = List.generate( + 10, + (i) => isolated.evaluate((counter) => counter.increment()), + ); + + await futures.wait; + + final result = await isolated.evaluate((counter) => counter.value); + expect(result, 10); + }); + + test( + 'Given an IsolatedObject, ' + 'when evaluate is called with different return types, ' + 'then it returns the correct types', () async { + final intResult = await isolated.evaluate( + (counter) => counter.value, + ); + expect(intResult, isA()); + expect(intResult, 0); + + final stringResult = await isolated.evaluate( + (counter) => 'Value: ${counter.value}', + ); + expect(stringResult, isA()); + expect(stringResult, 'Value: 0'); + + final boolResult = await isolated.evaluate( + (counter) => counter.value > 0, + ); + expect(boolResult, isA()); + expect(boolResult, false); + }); + + test( + 'Given an IsolatedObject, ' + 'when evaluate is called with async function, ' + 'then it awaits the result correctly', () async { + await isolated.evaluate((counter) async { + await Future.delayed(const Duration(milliseconds: 10)); + counter.increment(); + }); + + final result = await isolated.evaluate((counter) => counter.value); + expect(result, 1); + }); + + test( + 'Given an IsolatedObject, ' + 'when evaluate is called with a void function that throws, ' + 'then it propagates the error', () async { + await expectLater( + isolated.evaluate((counter) { + throw Exception('Test error'); + }), + throwsA(isA()), + ); + }); + }); + + group('error handling', () { + test( + 'Given an IsolatedObject, ' + 'when a function throws in the isolate, ' + 'then RemoteError contains error information', () async { + final isolated = IsolatedObject<_Counter>(() => _Counter(0)); + + try { + await isolated.evaluate((counter) { + throw StateError('Custom error message'); + }); + } catch (e) { + expect(e, isA()); + expect(e.toString(), contains('Custom error message')); + } finally { + await isolated.close(); + } + }); + + test( + 'Given an IsolatedObject, ' + 'when an error occurs, ' + 'then subsequent operations still work', () async { + final isolated = IsolatedObject<_Counter>(() => _Counter(5)); + + try { + await isolated.evaluate((counter) { + throw Exception('First error'); + }); + } catch (_) { + // Expected. + } + + final result = await isolated.evaluate((counter) => counter.value); + expect(result, 5); + + await isolated.close(); + }); + }); + + group('concurrent operations', () { + test( + 'Given an IsolatedObject, ' + 'when many operations are queued concurrently, ' + 'then all complete with unique IDs', () async { + final isolated = IsolatedObject<_Counter>(() => _Counter(0)); + + final futures = >[]; + for (var i = 0; i < 100; i++) { + futures.add( + isolated.evaluate((counter) { + counter.increment(); + return counter.value; + }), + ); + } + + final results = await futures.wait; + expect(results.length, 100); + + final finalValue = await isolated.evaluate((counter) => counter.value); + expect(finalValue, 100); + + await isolated.close(); + }); + + test( + 'Given an IsolatedObject, ' + 'when concurrent operations include some that throw, ' + 'then successful operations still complete', () async { + final isolated = IsolatedObject<_Counter>(() => _Counter(0)); + + const ops = 10; + try { + await [ + for (var i = 0; i < ops; i++) + i % 2 == 0 + ? isolated.evaluate((counter) => counter.increment()) + : isolated.evaluate((_) => throw Exception('Error $i')), + ].wait; + } on ParallelWaitError> catch (e) { + expect(e.errors.nonNulls.length, ops / 2); + expect(e.values.nonNulls.length, ops / 2); + } + + final result = await isolated.evaluate((counter) => counter.value); + expect(result, ops / 2); + + await isolated.close(); + }); + }); + + group('complex objects', () { + test( + 'Given an IsolatedObject with a complex state object, ' + 'when operations modify nested state, ' + 'then changes persist correctly', () async { + final isolated = IsolatedObject<_ComplexObject>(() => _ComplexObject()); + + await isolated.evaluate((obj) { + obj.data['key1'] = 'value1'; + obj.data['key2'] = 'value2'; + }); + + final result = await isolated.evaluate((obj) => obj.data); + expect(result, {'key1': 'value1', 'key2': 'value2'}); + + await isolated.close(); + }); + + test( + 'Given an IsolatedObject with state, ' + 'when evaluating functions that return lists, ' + 'then lists are correctly transferred', () async { + final isolated = IsolatedObject<_ComplexObject>(() => _ComplexObject()); + + await isolated.evaluate((obj) { + obj.items.addAll([1, 2, 3, 4, 5]); + }); + + final result = await isolated.evaluate((obj) => obj.items); + expect(result, [1, 2, 3, 4, 5]); + + await isolated.close(); + }); + }); +} + +class _Counter { + int value; + _Counter(this.value); + + void increment() => value++; +} + +class _ComplexObject { + final Map data = {}; + final List items = []; +} diff --git a/packages/isolated_object/test/lifecycle_test.dart b/packages/isolated_object/test/lifecycle_test.dart new file mode 100644 index 0000000..890f8f6 --- /dev/null +++ b/packages/isolated_object/test/lifecycle_test.dart @@ -0,0 +1,49 @@ +// New tests covering serverpod's superset semantics that relic's suite (built +// against the leaner copy) does not exercise: the isClosed guard, use-after- +// close, and the keepIsolateAlive flag. +import 'package:isolated_object/isolated_object.dart'; +import 'package:test/test.dart'; + +void main() { + test( + 'Given an open IsolatedObject, ' + 'then isClosed is false until close is called', () async { + final isolated = IsolatedObject<_Counter>(() => _Counter(0)); + + expect(isolated.isClosed, isFalse); + await isolated.close(); + expect(isolated.isClosed, isTrue); + }); + + test( + 'Given a closed IsolatedObject, ' + 'when evaluate is called, ' + 'then it throws a StateError', () async { + final isolated = IsolatedObject<_Counter>(() => _Counter(0)); + await isolated.close(); + + expect( + () => isolated.evaluate((counter) => counter.value), + throwsA(isA()), + ); + }); + + test( + 'Given keepIsolateAlive: false, ' + 'when the object is used, ' + 'then it still evaluates correctly', () async { + final isolated = IsolatedObject<_Counter>( + () => _Counter(7), + keepIsolateAlive: false, + ); + + expect(await isolated.evaluate((counter) => counter.value), 7); + + await isolated.close(); + }); +} + +class _Counter { + int value; + _Counter(this.value); +} diff --git a/packages/serverpod_logging/pubspec.yaml b/packages/serverpod_logging/pubspec.yaml index 09044f7..0c5a294 100644 --- a/packages/serverpod_logging/pubspec.yaml +++ b/packages/serverpod_logging/pubspec.yaml @@ -11,6 +11,7 @@ environment: resolution: workspace dependencies: + isolated_object: ^0.1.0 meta: ^1.15.0 dev_dependencies: diff --git a/pubspec.yaml b/pubspec.yaml index 8d13910..6b4b966 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -9,6 +9,7 @@ workspace: - packages/config - packages/cli_tools - packages/serverpod_logging + - packages/isolated_object topics: - cli