-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbuffer_replace.go
More file actions
243 lines (227 loc) · 9.34 KB
/
Copy pathbuffer_replace.go
File metadata and controls
243 lines (227 loc) · 9.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
// Bulk prepare-then-commit pipeline used by /warm and /rebuild.
//
// The shape is: build a complete replacement state OFF the buffer
// (validate every item, assign seqs, compute size), then commit it
// atomically under b.mu in a single state-swap. This separation is
// what lets multi-scope /warm be all-or-nothing — every scope is
// validated and built before any is committed; if any scope fails
// validation the existing state is untouched.
//
// Two commit variants exist:
//
// - commitReplacement: stand-alone commit; computes its byte delta
// against the buffer's current b.bytes under the lock. Used by
// replaceAll (single-scope test path).
//
// - commitReplacementPreReserved: batch-aware commit used by
// Store.replaceScopes. The batch has already CAS-reserved its net
// delta against totalBytes; this variant reconciles drift caused
// by concurrent writes between snapshot and commit, but does NOT
// re-add the delta itself.
//
// The drift-handling math in commitReplacementPreReserved is
// correctness-sensitive — concurrent writes between snapshot and
// commit must reconcile against the batch's pre-reserved delta.
// Re-run TestStore_ReplaceScopes_RaceVsWipe and
// TestStore_ReplaceScopes_RaceVsRebuild under stress after changes.
package scopecache
import (
"errors"
"time"
)
// scopeReplacement holds a fully built scope state ready to be atomically
// swapped into a scopeBuffer. Separating "prepare" from "commit" lets callers
// like /warm and /rebuild validate every scope up-front and only mutate state
// once they know all scopes will succeed.
//
// Values are produced by buildReplacementState and consumed by
// commitReplacement / commitReplacementPreReserved, which copy items, byID
// and lastSeq onto the buffer and hand storage to adoptStorageLocked.
type scopeReplacement struct {
// items lists the built items in input order; each pointer refers to a
// slot of storage. It is a non-nil empty slice for an empty replacement.
items []*Item
// byID indexes the items that carry a non-empty ID. Items with an empty
// ID are not present in the map.
byID map[string]*Item
// lastSeq is the Seq stamped on the final item. Seqs are assigned from 1
// in input order, so it equals len(items); it is 0 when there are no items.
lastSeq uint64
// storage is the single backing block the items pointers point into
// (&storage[i]). One alloc for the whole batch instead of one per
// item; the committing buffer adopts it as its sole Item chunk.
storage []Item
}
// buildReplacementState converts a caller-supplied item list into the
// internal state a scope buffer can adopt atomically.
//
// Capacity is NOT checked here: callers are expected to have already
// enforced the per-scope cap, and an over-long input simply yields an
// over-full state. The check lives in the Store layer so one place owns it.
//
// canonicalScope is the shard-map key the buffer will hold this state
// under; every built item.Scope is set to it so all items share one
// scope-string allocation instead of N fresh per-request copies.
//
// Each built item is a copy of its input with cache-owned fields restamped:
//
//   - Seq is regenerated from 1, in input order. It is a cache-local cursor
//     that is not stable across /warm or /rebuild; regenerating it keeps the
//     buffer's seq values monotonic and dense wherever the input came from.
//   - Ts is set to a single time.Now() reading shared by the whole batch.
//     The cache cannot attest to when an item originally came into being,
//     only to when it received this batch.
//   - rawBytes is filled via precomputeRawBytes when the input left it nil
//     (internal callers and tests may bypass the validator that normally
//     populates it).
//
// The input slice is never modified. For empty input the result has
// non-nil, empty items and byID, a zero lastSeq and nil storage.
//
// An error is returned, together with a zero scopeReplacement, when two
// items share the same non-empty ID.
func buildReplacementState(canonicalScope string, items []Item) (scopeReplacement, error) {
	if len(items) == 0 {
		return scopeReplacement{
			items: []*Item{},
			byID:  make(map[string]*Item),
		}, nil
	}

	// Pre-count id'd items so byID gets an exact size hint up front. byID
	// then doubles as the duplicate detector during the build: finding the
	// key already present on insert IS a duplicate, so no separate "seen"
	// set is needed.
	nonEmptyIDs := 0
	for i := range items {
		if items[i].ID != "" {
			nonEmptyIDs++
		}
	}

	built := make([]*Item, 0, len(items))
	byID := make(map[string]*Item, nonEmptyIDs)
	// Single backing block for the whole batch. It is allocated at full
	// length and never appended to, so &storage[i] stays valid.
	storage := make([]Item, len(items))

	nowUs := time.Now().UnixMicro()

	var lastSeq uint64
	for i := range items {
		// Copy the input straight into its final slot and stamp the
		// cache-owned fields through the pointer. Indexing (rather than
		// ranging by value and copying through a temporary) means each
		// Item is copied exactly once.
		p := &storage[i]
		*p = items[i]

		lastSeq++
		p.Seq = lastSeq
		p.Ts = nowUs
		// Canonicalise scope onto the shared map-key string.
		p.Scope = canonicalScope
		if p.rawBytes == nil {
			p.rawBytes = precomputeRawBytes(p.Payload)
		}

		// The same slot pointer is shared by built and byID. No per-item
		// seq index is built here.
		built = append(built, p)
		if p.ID == "" {
			continue
		}
		if _, dup := byID[p.ID]; dup {
			return scopeReplacement{}, errors.New("duplicate 'id' value within scope: '" + p.ID + "'")
		}
		byID[p.ID] = p
	}

	return scopeReplacement{
		items:   built,
		byID:    byID,
		lastSeq: lastSeq,
		storage: storage,
	}, nil
}
// sumItemBytes adds up approxItemSize over every item in the slice and
// returns the total (0 for an empty or nil slice). Batch operations use it
// to compute a plan's newBytes before committing.
func sumItemBytes(items []*Item) int64 {
	var total int64
	for _, it := range items {
		total += approxItemSize(*it)
	}
	return total
}
// commitReplacement installs r as the scope's entire state inside a single
// b.mu critical section and settles the store-wide counters against what
// the buffer holds at that moment.
//
// Both deltas are measured from live values read under the lock (b.bytes
// for totalBytes, len(b.items) for totalItems), not from anything the
// caller sampled earlier. A concurrent /append that completed between the
// caller's pre-check and this commit has already bumped the store counters
// for an item this swap discards; measuring against the live values
// cancels that contribution, so no drift parameter is needed here
// (contrast commitReplacementPreReserved, where the byte delta was
// reserved up front).
//
// r must have been validated and built by buildReplacementState; newBytes
// is the value recorded as the buffer's new b.bytes. The commit returns no
// error, which is what lets the prepare-then-commit pipeline give /warm
// and /rebuild their all-or-nothing semantics.
func (b *scopeBuffer) commitReplacement(r scopeReplacement, newBytes int64) {
	b.mu.Lock()
	defer b.mu.Unlock()

	stamp := time.Now().UnixMicro()

	// Compute both deltas before any buffer field is overwritten.
	byteDelta := newBytes - b.bytes
	itemDelta := int64(len(r.items)) - int64(len(b.items))

	b.store.totalBytes.Add(byteDelta)
	b.store.totalItems.Add(itemDelta)
	b.store.bumpLastWriteTS(stamp)

	// State swap.
	b.bytes, b.items, b.byID, b.lastSeq = newBytes, r.items, r.byID, r.lastSeq
	b.bumpLastWriteTSLocked(stamp)

	// The batch's backing block becomes the buffer's sole chunk; the old
	// chunks, holding the replaced Items, are left for the GC.
	b.adoptStorageLocked(r.storage)
}
// commitReplacementPreReserved is the batch-aware commit used by
// Store.replaceScopes. By the time it runs, the caller has already
// reserved (newBytes - oldSnapshot) bytes against the store counter via
// reserveBytes, so this commit must NOT add that delta again. Its only
// byte-counter job is to undo drift: changes to this scope's b.bytes made
// by concurrent writers between the snapshot and the commit. That keeps
// the store-wide byte cap strict across batch replacements.
//
// oldSnapshot is b.bytes as read under RLock during the batch's cap check.
// With drift = b.bytes - oldSnapshot (read under the write lock):
//
//   - drift > 0: a concurrent /append grew b.bytes by X and did
//     totalBytes.Add(+X). Add(-X) releases it; the appended item is
//     discarded by the replacement anyway.
//   - drift < 0: a concurrent /delete shrank b.bytes by Y and did
//     totalBytes.Add(-Y). Add(-drift) is positive and restores it, so the
//     scope's net contribution to totalBytes is exactly
//     (newBytes - oldSnapshot).
//   - drift == 0: nothing happened in the window; the counter is untouched.
//
// totalItems has no pre-reservation, so it is settled the same way as in
// commitReplacement: against len(b.items) read under the lock.
func (b *scopeBuffer) commitReplacementPreReserved(r scopeReplacement, newBytes int64, oldSnapshot int64) {
	b.mu.Lock()
	defer b.mu.Unlock()

	stamp := time.Now().UnixMicro()

	if drift := b.bytes - oldSnapshot; drift != 0 {
		b.store.totalBytes.Add(-drift)
	}

	// Measured before b.items is overwritten below, so a concurrent
	// /append's +1 is cancelled along with its discarded item.
	itemDelta := int64(len(r.items)) - int64(len(b.items))
	b.store.totalItems.Add(itemDelta)
	b.store.bumpLastWriteTS(stamp)

	// State swap.
	b.bytes, b.items, b.byID, b.lastSeq = newBytes, r.items, r.byID, r.lastSeq
	b.bumpLastWriteTSLocked(stamp)

	// The batch's backing block becomes the buffer's sole chunk; the old
	// chunks, holding the replaced Items, are left for the GC.
	b.adoptStorageLocked(r.storage)
}
// replaceAll swaps the buffer's whole contents for items and returns a
// by-value copy of the buffer's items read back after the commit.
//
// It returns a *ScopeFullError, without touching the buffer, when
// itemCapExceeded rejects len(items). Any error from buildReplacementState
// is returned as-is, also before anything is committed.
//
// NOTE(review): the returned copy is taken under a fresh read lock acquired
// after commitReplacement has released the write lock, so a write landing
// in between would presumably be reflected in the result — confirm this is
// acceptable for callers.
func (b *scopeBuffer) replaceAll(items []Item) ([]Item, error) {
	n := len(items)
	if b.itemCapExceeded(n) {
		return nil, &ScopeFullError{Count: n, Cap: b.store.defaultMaxItems}
	}

	r, err := buildReplacementState(b.scope, items)
	if err != nil {
		return nil, err
	}
	b.commitReplacement(r, sumItemBytes(r.items))

	b.mu.RLock()
	defer b.mu.RUnlock()

	out := make([]Item, 0, len(b.items))
	for _, p := range b.items {
		out = append(out, *p)
	}
	return out, nil
}