Skip to content

Commit eb4e376

Browse files
committed
add a unit test for the LambdaHTTPServer Pool
1 parent 3ce0a87 commit eb4e376

File tree

3 files changed

+143
-3
lines changed

3 files changed

+143
-3
lines changed

Package@swift-6.0.swift

+6-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,12 @@ let package = Package(
5656
.byName(name: "AWSLambdaRuntime"),
5757
.product(name: "NIOTestUtils", package: "swift-nio"),
5858
.product(name: "NIOFoundationCompat", package: "swift-nio"),
59-
]
59+
],
60+
swiftSettings: [
61+
.define("FoundationJSONSupport"),
62+
.define("ServiceLifecycleSupport"),
63+
.define("LocalServerSupport"),
64+
]
6065
),
6166
// for perf testing
6267
.executableTarget(

Sources/AWSLambdaRuntime/Lambda+LocalServer.swift

+2-2
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ extension Lambda {
7575
/// 1. POST /invoke - the client posts the event to the lambda function
7676
///
7777
/// This server passes the data received from /invoke POST request to the lambda function (GET /next) and then forwards the response back to the client.
78-
private struct LambdaHTTPServer {
78+
internal struct LambdaHTTPServer {
7979
private let invocationEndpoint: String
8080

8181
private let invocationPool = Pool<LocalServerInvocation>()
@@ -425,7 +425,7 @@ private struct LambdaHTTPServer {
425425
/// A shared data structure to store the current invocation or response requests and the continuation objects.
426426
/// This data structure is shared between instances of the HTTPHandler
427427
/// (one instance to serve requests from the Lambda function and one instance to serve requests from the client invoking the lambda function).
428-
private final class Pool<T>: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable {
428+
internal final class Pool<T>: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable {
429429
typealias Element = T
430430

431431
enum State: ~Copyable {
+135
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import Testing
2+
@testable import AWSLambdaRuntime
3+
4+
struct PoolTests {
5+
6+
@Test
7+
func testBasicPushAndIteration() async throws {
8+
let pool = LambdaHTTPServer.Pool<String>()
9+
10+
// Push values
11+
await pool.push("first")
12+
await pool.push("second")
13+
14+
// Iterate and verify order
15+
var values = [String]()
16+
for try await value in pool {
17+
values.append(value)
18+
if values.count == 2 { break }
19+
}
20+
21+
#expect(values == ["first", "second"])
22+
}
23+
24+
@Test
25+
func testCancellation() async throws {
26+
let pool = LambdaHTTPServer.Pool<String>()
27+
28+
// Create a task that will be cancelled
29+
let task = Task {
30+
for try await _ in pool {
31+
Issue.record("Should not receive any values after cancellation")
32+
}
33+
}
34+
35+
// Cancel the task immediately
36+
task.cancel()
37+
38+
// This should complete without receiving any values
39+
try await task.value
40+
}
41+
42+
@Test
43+
func testConcurrentPushAndIteration() async throws {
44+
let pool = LambdaHTTPServer.Pool<Int>()
45+
let iterations = 1000
46+
var receivedValues = Set<Int>()
47+
48+
// Start consumer task first
49+
let consumer = Task {
50+
var count = 0
51+
for try await value in pool {
52+
receivedValues.insert(value)
53+
count += 1
54+
if count >= iterations { break }
55+
}
56+
}
57+
58+
// Create multiple producer tasks
59+
try await withThrowingTaskGroup(of: Void.self) { group in
60+
for i in 0..<iterations {
61+
group.addTask {
62+
await pool.push(i)
63+
}
64+
}
65+
try await group.waitForAll()
66+
}
67+
68+
// Wait for consumer to complete
69+
try await consumer.value
70+
71+
// Verify all values were received exactly once
72+
#expect(receivedValues.count == iterations)
73+
#expect(Set(0..<iterations) == receivedValues)
74+
}
75+
76+
@Test
77+
func testPushToWaitingConsumer() async throws {
78+
let pool = LambdaHTTPServer.Pool<String>()
79+
let expectedValue = "test value"
80+
81+
// Start a consumer that will wait for a value
82+
let consumer = Task {
83+
for try await value in pool {
84+
#expect(value == expectedValue)
85+
break
86+
}
87+
}
88+
89+
// Give consumer time to start waiting
90+
try await Task.sleep(nanoseconds: 100_000_000) // 0.1 seconds
91+
92+
// Push a value
93+
await pool.push(expectedValue)
94+
95+
// Wait for consumer to complete
96+
try await consumer.value
97+
}
98+
99+
@Test
100+
func testStressTest() async throws {
101+
let pool = LambdaHTTPServer.Pool<Int>()
102+
let producerCount = 10
103+
let messagesPerProducer = 1000
104+
var receivedValues = [Int]()
105+
106+
// Start consumer
107+
let consumer = Task {
108+
var count = 0
109+
for try await value in pool {
110+
receivedValues.append(value)
111+
count += 1
112+
if count >= producerCount * messagesPerProducer { break }
113+
}
114+
}
115+
116+
// Create multiple producers
117+
try await withThrowingTaskGroup(of: Void.self) { group in
118+
for p in 0..<producerCount {
119+
group.addTask {
120+
for i in 0..<messagesPerProducer {
121+
await pool.push(p * messagesPerProducer + i)
122+
}
123+
}
124+
}
125+
try await group.waitForAll()
126+
}
127+
128+
// Wait for consumer to complete
129+
try await consumer.value
130+
131+
// Verify we received all values
132+
#expect(receivedValues.count == producerCount * messagesPerProducer)
133+
#expect(Set(receivedValues).count == producerCount * messagesPerProducer)
134+
}
135+
}

0 commit comments

Comments
 (0)