-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbuffer_write.go
More file actions
264 lines (235 loc) · 8.82 KB
/
Copy pathbuffer_write.go
File metadata and controls
264 lines (235 loc) · 8.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
// Single-item write paths on *scopeBuffer:
//
// - appendItem — insert a fresh item; rejects on dup id, capacity, or byte cap
// - upsertByID — insert-or-replace by id; replace-whole-item on hit
// - updateByID — modify payload at an existing id
// - updateBySeq — same, addressed by seq
//
// All four take b.mu exclusively, check b.detached first, and route
// their byte-budget reservation through s.reserveBytes. Shared
// helpers (precomputeRawBytes, indexBySeqLocked,
// reservePayloadDeltaLocked, replaceItemFieldsLocked) live in
// buffer_locked.go. insertNewItemLocked at the bottom is the local
// helper that collapses the fresh-insert pipeline shared by
// appendItem and upsertByID's miss-branch.
//
// Every successful write stores a fresh microsecond Ts under b.mu
// before storing or replacing — Ts contract lives on the Item type
// in types.go.
package scopecache
import (
"encoding/json"
"errors"
"fmt"
"time"
)
func (b *scopeBuffer) appendItem(item Item) (Item, error) {
b.mu.Lock()
defer b.mu.Unlock()
if b.detached {
return Item{}, &ScopeDetachedError{}
}
if b.itemCapExceeded(len(b.items) + 1) {
return Item{}, &ScopeFullError{Count: len(b.items), Cap: b.store.defaultMaxItems}
}
if item.ID != "" {
if _, exists := b.byID[item.ID]; exists {
return Item{}, errors.New("an item with this 'id' already exists in the scope")
}
}
return b.insertNewItemLocked(item, time.Now().UnixMicro())
}
// upsertByID replaces the payload of the item with this id if it exists,
// or appends a new item with this id if it does not. Both paths run under a
// single scope write-lock so concurrent upserts cannot race between the
// existence check and the mutation. Seq is preserved on replace (stable
// cursor for consumers) and freshly assigned on create (matches /append).
// Returns the final item and whether a new item was created.
func (b *scopeBuffer) upsertByID(item Item) (Item, bool, error) {
// Hoist the clock read out of the lock-held path: time.Now() is a
// vDSO syscall (~20-30 ns) that under contention shows up sizeably
// in lockSlow / futex profiles. Cost on the miss-but-no-insert
// path (e.g. ScopeFullError) is one stray ts read; bumpLastWriteTSLocked
// makes the racing-hoister case safe.
nowUs := time.Now().UnixMicro()
b.mu.Lock()
defer b.mu.Unlock()
if b.detached {
return Item{}, false, &ScopeDetachedError{}
}
if existing, exists := b.byID[item.ID]; exists {
// validateUpsertItem fills item.rawBytes for string
// payloads; recompute defensively for internal callers and
// tests that built an Item without going through the
// validator.
newRaw := item.rawBytes
if newRaw == nil {
newRaw = precomputeRawBytes(item.Payload)
}
delta, err := b.reservePayloadDeltaLocked(
payloadAndRawBytes(existing),
int64(len(item.Payload)+len(newRaw)),
)
if err != nil {
return Item{}, false, err
}
// /upsert is whole-item replacement: refresh ts to "now" so the
// stored ts always reflects when the current content arrived.
b.replaceItemFieldsLocked(existing, item.Payload, nowUs, newRaw, delta)
return *existing, false, nil
}
if b.itemCapExceeded(len(b.items) + 1) {
return Item{}, false, &ScopeFullError{Count: len(b.items), Cap: b.store.defaultMaxItems}
}
// Reuse the replace branch's nowUs so create-vs-replace is
// indistinguishable in Ts to observers.
inserted, err := b.insertNewItemLocked(item, nowUs)
if err != nil {
return Item{}, false, err
}
return inserted, true, nil
}
// updateByID mutates the item at (scope, id). Payload is always overwritten;
// ts is refreshed to time.Now().UnixMicro() — every write that touches an
// item refreshes ts to "when did the cache write this content."
//
// preRaw is the validator's precomputed rawBytes for the new payload.
// Pass nil from internal callers / tests that bypass the validator; the
// helper falls back to precomputeRawBytes(payload) in that case.
func (b *scopeBuffer) updateByID(id string, payload json.RawMessage, preRaw []byte) (int, error) {
// Clock-syscall hoisted out of b.mu — see upsertByID for the rationale.
nowUs := time.Now().UnixMicro()
b.mu.Lock()
defer b.mu.Unlock()
if b.detached {
return 0, &ScopeDetachedError{}
}
existing, ok := b.byID[id]
if !ok {
return 0, nil
}
// Scope/id are unchanged on /update, so the byte delta reduces to
// new vs old payload-bytes via payloadAndRawBytes.
newRaw := preRaw
if newRaw == nil {
newRaw = precomputeRawBytes(payload)
}
delta, err := b.reservePayloadDeltaLocked(
payloadAndRawBytes(existing),
int64(len(payload)+len(newRaw)),
)
if err != nil {
return 0, err
}
b.replaceItemFieldsLocked(existing, payload, nowUs, newRaw, delta)
return 1, nil
}
// preRaw mirrors updateByID: the validator's rawBytes for the new
// payload, or nil to recompute.
func (b *scopeBuffer) updateBySeq(seq uint64, payload json.RawMessage, preRaw []byte) (int, error) {
// Clock-syscall hoisted out of b.mu — see upsertByID for the rationale.
nowUs := time.Now().UnixMicro()
b.mu.Lock()
defer b.mu.Unlock()
if b.detached {
return 0, &ScopeDetachedError{}
}
i, ok := b.indexBySeqLocked(seq)
if !ok {
return 0, nil
}
existing := b.items[i]
newRaw := preRaw
if newRaw == nil {
newRaw = precomputeRawBytes(payload)
}
// Per-item cap re-check on the fully-materialised post-update
// item. The validator's checkItemSize ran on the request body,
// where ID is empty for seq-based updates — so its measurement
// undercounts by len(existing.ID). Without this re-check a
// long-id scope can bypass MaxItemBytes by addressing the item
// via seq. updateByID needs no re-check: its validator path sees
// the stored id (the request *is* the address), so request-side
// and stored-side approxItemSize agree by construction.
maxItemBytes := b.store.maxItemBytes
candidate := Item{
Scope: existing.Scope,
ID: existing.ID,
Payload: payload,
rawBytes: newRaw,
}
if size := approxItemSize(candidate); size > maxItemBytes {
return 0, fmt.Errorf("%w: the item's approximate size (%d bytes) exceeds the maximum of %d bytes",
ErrInvalidInput, size, maxItemBytes)
}
delta, err := b.reservePayloadDeltaLocked(
payloadAndRawBytes(existing),
int64(len(payload)+len(newRaw)),
)
if err != nil {
return 0, err
}
b.replaceItemFieldsLocked(existing, payload, nowUs, newRaw, delta)
return 1, nil
}
// insertNewItemLocked is the shared fresh-insert pipeline used by
// appendItem and upsertByID's miss-branch. Pipeline order is
// intentional and must stay coherent across both paths:
// ts-stamp → rawBytes precompute → size → store-byte reservation
// → seq assignment → b.items append → b.byID sync (when ID != "")
// → b.bytes update.
//
// PRECONDITIONS — caller responsibilities, not re-checked:
// - holds b.mu (write lock)
// - b.detached == false
// - len(b.items) < b.store.defaultMaxItems
// - approxItemSize(item) <= b.store.maxItemBytes
// - duplicate-ID ruled out (when item.ID != "")
// - client-supplied Seq/Ts already rejected at the validator
//
// nowUs is caller-supplied so /upsert keeps create- and replace-
// paths on identical Ts (observers cannot infer create-vs-replace
// from timestamp drift). /append computes its own at the call site.
//
// Returns *StoreFullError on cap reservation failure; scope state is
// untouched in that case (no Seq increment, no b.items mutation, no
// b.bytes increment), so the caller returns without rollback.
func (b *scopeBuffer) insertNewItemLocked(item Item, nowUs int64) (Item, error) {
item.Ts = nowUs
// Canonicalise: every stored item shares the buffer's scope string
// (the same Go string the shard map already retains) instead of the
// fresh per-request allocation the caller delivered. See scope field
// comment in buffer.go.
item.Scope = b.scope
// validator's checkItemSize normally fills rawBytes already;
// the recompute is a defensive fallback for internal callers /
// tests that built an Item without going through the validator.
if item.rawBytes == nil {
item.rawBytes = precomputeRawBytes(item.Payload)
}
size := approxItemSize(item)
ok, current, max := b.store.reserveBytes(size)
if !ok {
return Item{}, &StoreFullError{StoreBytes: current, AddedBytes: size, Cap: max}
}
b.lastSeq++
item.Seq = b.lastSeq
// One chunk-allocated *Item, shared by both indexes — items and
// byID hold the same pointer, so later in-place mutations need no
// re-sync. Seq lookups go through indexBySeqLocked (binary search
// on items, which is monotonic by seq). See the chunk allocator
// in buffer.go for the storage + lifecycle contract.
stored := b.allocItemLocked(item)
b.items = append(b.items, stored)
if item.ID != "" {
if b.byID == nil {
b.byID = make(map[string]*Item)
}
b.byID[item.ID] = stored
}
b.bytes += size
b.store.totalItems.Add(1)
b.store.bumpLastWriteTS(nowUs)
b.bumpLastWriteTSLocked(nowUs)
return item, nil
}