all repos

rss-tools @ a5ac527

Get RSS feeds from sources that (I need and) don't provide one.

rss-tools/vendor/go.etcd.io/bbolt/internal/freelist/shared.go (view raw)

Oleksandr Smirnov Oleksandr Smirnov
olexsmir@gmail.com
we're vendoring now, 7 days ago
1
package freelist
2
3
import (
4
	"fmt"
5
	"math"
6
	"sort"
7
	"unsafe"
8
9
	"go.etcd.io/bbolt/internal/common"
10
)
11
12
type txPending struct {
13
	ids              []common.Pgid
14
	alloctx          []common.Txid // txids allocating the ids
15
	lastReleaseBegin common.Txid   // beginning txid of last matching releaseRange
16
}
17
18
type shared struct {
19
	Interface
20
21
	readonlyTXIDs []common.Txid               // all readonly transaction IDs.
22
	allocs        map[common.Pgid]common.Txid // mapping of Txid that allocated a pgid.
23
	cache         map[common.Pgid]struct{}    // fast lookup of all free and pending page ids.
24
	pending       map[common.Txid]*txPending  // mapping of soon-to-be free page ids by tx.
25
}
26
27
func newShared() *shared {
28
	return &shared{
29
		pending: make(map[common.Txid]*txPending),
30
		allocs:  make(map[common.Pgid]common.Txid),
31
		cache:   make(map[common.Pgid]struct{}),
32
	}
33
}
34
35
// pendingPageIds returns the pending map itself (not a copy), keyed by the
// txid that freed the pages.
func (t *shared) pendingPageIds() map[common.Txid]*txPending {
	pending := t.pending
	return pending
}
38
39
// PendingCount returns the number of page ids held in the pending lists of
// all transactions.
func (t *shared) PendingCount() int {
	total := 0
	for txid := range t.pending {
		total += len(t.pending[txid].ids)
	}
	return total
}
46
47
// Count returns the sum of FreeCount and PendingCount.
func (t *shared) Count() int {
	free, pending := t.FreeCount(), t.PendingCount()
	return free + pending
}
50
51
// Freed reports whether pgId is present in the cache of free and pending
// page ids.
func (t *shared) Freed(pgId common.Pgid) bool {
	if _, found := t.cache[pgId]; found {
		return true
	}
	return false
}
55
56
// Free records page p and all its overflow pages as pending frees of
// transaction txid, and adds each of them to the cache.
//
// The txid recorded in allocs for p (0 when there is none) is stored next to
// every freed id, and p's entry is removed from allocs.
//
// Free panics when p is page 0 or 1, or when any of the pages is already in
// the cache. The check passed to common.Verify panics when p was allocated by
// txid itself.
func (t *shared) Free(txid common.Txid, p *common.Page) {
	first := p.Id()
	if first <= 1 {
		panic(fmt.Sprintf("cannot free page 0 or 1: %d", first))
	}

	// Find or create the pending list of this transaction.
	pend := t.pending[txid]
	if pend == nil {
		pend = new(txPending)
		t.pending[txid] = pend
	}

	// A missing allocs entry yields the zero Txid.
	allocator := t.allocs[first]
	common.Verify(func() {
		if allocator == txid {
			panic(fmt.Sprintf("free: freed page (%d) was allocated by the same transaction (%d)", first, txid))
		}
	})
	// Deleting an absent key is a no-op, so no presence check is needed.
	delete(t.allocs, first)

	// Walk the page itself followed by its overflow pages.
	last := first + common.Pgid(p.Overflow())
	for id := first; id <= last; id++ {
		// A page that is already free or pending must not be freed again.
		if _, dup := t.cache[id]; dup {
			panic(fmt.Sprintf("page %d already freed", id))
		}
		pend.ids = append(pend.ids, id)
		pend.alloctx = append(pend.alloctx, allocator)
		t.cache[id] = struct{}{}
	}
}
88
89
// Rollback undoes the pending frees of transaction txid and forgets every
// allocation recorded for it.
//
// Each pending page is removed from the cache; when an allocating txid was
// recorded for it, the page is put back into allocs under that txid. Rollback
// panics if a pending page was recorded as allocated by txid itself. When
// txid has no pending entry the method returns without touching allocs.
func (t *shared) Rollback(txid common.Txid) {
	pend := t.pending[txid]
	if pend == nil {
		return
	}

	for idx := range pend.ids {
		pgid := pend.ids[idx]
		delete(t.cache, pgid)

		switch allocator := pend.alloctx[idx]; allocator {
		case 0:
			// No allocating txid was recorded; nothing to restore.
		case txid:
			// A writing TXN should never free a page which was allocated by itself.
			panic(fmt.Sprintf("rollback: freed page (%d) was allocated by the same transaction (%d)", pgid, txid))
		default:
			// Pending free aborted; restore page back to alloc list.
			t.allocs[pgid] = allocator
		}
	}

	// Drop the transaction's pending list.
	delete(t.pending, txid)

	// Drop every page that this transaction allocated.
	for pgid, owner := range t.allocs {
		if owner != txid {
			continue
		}
		delete(t.allocs, pgid)
	}
}
119
120
// AddReadonlyTXID appends tid to the list of readonly transaction IDs.
// Duplicates are not filtered.
func (t *shared) AddReadonlyTXID(tid common.Txid) {
	readers := append(t.readonlyTXIDs, tid)
	t.readonlyTXIDs = readers
}
123
124
// RemoveReadonlyTXID removes the first occurrence of tid from the list of
// readonly transaction IDs. The vacated slot is filled with the last element,
// so the order of the list is not preserved. An unknown tid is ignored.
func (t *shared) RemoveReadonlyTXID(tid common.Txid) {
	ids := t.readonlyTXIDs
	for pos, id := range ids {
		if id != tid {
			continue
		}
		tail := len(ids) - 1
		ids[pos] = ids[tail]
		t.readonlyTXIDs = ids[:tail]
		return
	}
}
134
135
type txIDx []common.Txid
136
137
func (t txIDx) Len() int           { return len(t) }
138
func (t txIDx) Swap(i, j int)      { t[i], t[j] = t[j], t[i] }
139
func (t txIDx) Less(i, j int) bool { return t[i] < t[j] }
140
141
// ReleasePendingPages releases the pending pages that no open readonly
// transaction can still need.
//
// It sorts readonlyTXIDs in place, releases everything pending from
// transactions older than the earliest reader, and then calls releaseRange
// for every extent of txids lying strictly between two consecutive readers
// and for the extent after the last reader.
func (t *shared) ReleasePendingPages() {
	maxTxid := common.Txid(math.MaxUint64)

	// Free all pending pages prior to the earliest open transaction.
	sort.Sort(txIDx(t.readonlyTXIDs))
	minid := maxTxid
	if len(t.readonlyTXIDs) > 0 {
		minid = t.readonlyTXIDs[0]
	}
	if minid > 0 {
		t.release(minid - 1)
	}

	// Release unused txid extents. Any page both allocated and freed in an
	// extent is safe to release.
	for _, tid := range t.readonlyTXIDs {
		// Txid arithmetic is unsigned: for tid == 0 the expression tid-1 would
		// wrap to the maximum txid and turn the empty extent before the reader
		// into the full range. There is nothing below txid 0 to release.
		if tid > 0 {
			t.releaseRange(minid, tid-1)
		}
		// Likewise tid+1 would wrap to 0 for the maximum txid. The list is
		// sorted, so every remaining reader sits at the maximum as well and no
		// extent exists above it.
		if tid == maxTxid {
			return
		}
		minid = tid + 1
	}
	t.releaseRange(minid, maxTxid)
}
159
160
// release hands the pending pages of every transaction whose id is at most
// txid to mergeSpans and removes those transactions from the pending map.
// The pages stay in the cache because they are still free.
func (t *shared) release(txid common.Txid) {
	freed := common.Pgids{}
	for tid, txp := range t.pending {
		if tid > txid {
			continue
		}
		// Move transaction's pending pages to the available freelist.
		freed = append(freed, txp.ids...)
		delete(t.pending, tid)
	}
	t.mergeSpans(freed)
}
172
173
// releaseRange releases the pending pages that were both freed and allocated
// by transactions within the inclusive txid range [begin, end]. The collected
// pages are passed to mergeSpans; a pending entry left without pages is
// deleted. An empty range (begin > end) does nothing.
func (t *shared) releaseRange(begin, end common.Txid) {
	if begin > end {
		return
	}

	within := func(id common.Txid) bool {
		return id >= begin && id <= end
	}

	freed := make(common.Pgids, 0)
	for tid, txp := range t.pending {
		if !within(tid) {
			continue
		}
		// Don't recompute freed pages if ranges haven't updated.
		if txp.lastReleaseBegin == begin {
			continue
		}

		// Swap-remove each matching page. The index only advances when the
		// current slot is kept, because a removal moves the last element into
		// the slot and that element still has to be examined.
		for i := 0; i < len(txp.ids); {
			if !within(txp.alloctx[i]) {
				i++
				continue
			}
			freed = append(freed, txp.ids[i])

			lastID := len(txp.ids) - 1
			txp.ids[i] = txp.ids[lastID]
			txp.ids = txp.ids[:lastID]

			lastTx := len(txp.alloctx) - 1
			txp.alloctx[i] = txp.alloctx[lastTx]
			txp.alloctx = txp.alloctx[:lastTx]
		}

		txp.lastReleaseBegin = begin
		if len(txp.ids) == 0 {
			delete(t.pending, tid)
		}
	}
	t.mergeSpans(freed)
}
204
205
// Copyall copies a list of all free ids and all pending ids in one sorted list.
206
// f.count returns the minimum length required for dst.
207
func (t *shared) Copyall(dst []common.Pgid) {
208
	m := make(common.Pgids, 0, t.PendingCount())
209
	for _, txp := range t.pendingPageIds() {
210
		m = append(m, txp.ids...)
211
	}
212
	sort.Sort(m)
213
	common.Mergepgids(dst, t.freePageIds(), m)
214
}
215
216
// Reload reads the freelist from page p and then runs NoSyncReload over the
// resulting free page ids, which drops any id that is also pending.
func (t *shared) Reload(p *common.Page) {
	t.Read(p)
	free := t.freePageIds()
	t.NoSyncReload(free)
}
220
221
// NoSyncReload re-initializes the freelist via Init with the ids from pgIds
// that are not in any transaction's pending list. Order is preserved.
func (t *shared) NoSyncReload(pgIds common.Pgids) {
	// Collect every pending page id into a set.
	pendingSet := make(map[common.Pgid]struct{})
	for _, txp := range t.pending {
		for _, id := range txp.ids {
			pendingSet[id] = struct{}{}
		}
	}

	// Keep only the ids that are not pending. The slice is deliberately
	// non-nil even when nothing is kept.
	available := []common.Pgid{}
	for _, id := range pgIds {
		if _, isPending := pendingSet[id]; isPending {
			continue
		}
		available = append(available, id)
	}

	t.Init(available)
}
241
242
// reindex rebuilds the free cache based on available and pending free lists.
243
func (t *shared) reindex() {
244
	free := t.freePageIds()
245
	pending := t.pendingPageIds()
246
	t.cache = make(map[common.Pgid]struct{}, len(free))
247
	for _, id := range free {
248
		t.cache[id] = struct{}{}
249
	}
250
	for _, txp := range pending {
251
		for _, pendingID := range txp.ids {
252
			t.cache[pendingID] = struct{}{}
253
		}
254
	}
255
}
256
257
// Read initializes the freelist via Init from the page ids stored in freelist
// page p. The ids are copied and sorted first, so the page itself is not
// modified. Read panics if p is not a freelist page.
func (t *shared) Read(p *common.Page) {
	if !p.IsFreelistPage() {
		panic(fmt.Sprintf("invalid freelist page: %d, page type is %s", p.Id(), p.Typ()))
	}

	pageIDs := p.FreelistPageIds()
	if len(pageIDs) == 0 {
		t.Init([]common.Pgid{})
		return
	}

	// Work on a private copy so sorting does not touch the freelist page.
	sorted := make([]common.Pgid, len(pageIDs))
	copy(sorted, pageIDs)
	sort.Sort(common.Pgids(sorted))

	t.Init(sorted)
}
277
278
// EstimatedWritePageSize returns the number of bytes taken by the page header
// plus one Pgid per counted page id. From 0xFFFF ids upward one extra Pgid is
// included, because the first element is then used to store the count (see
// Write).
func (t *shared) EstimatedWritePageSize() int {
	slots := t.Count()
	if slots >= 0xFFFF {
		slots++
	}
	idSize := int(unsafe.Sizeof(common.Pgid(0)))
	return int(common.PageHeaderSize) + idSize*slots
}
286
287
func (t *shared) Write(p *common.Page) {
288
	// Combine the old free pgids and pgids waiting on an open transaction.
289
290
	// Update the header flag.
291
	p.SetFlags(common.FreelistPageFlag)
292
293
	// The page.count can only hold up to 64k elements so if we overflow that
294
	// number then we handle it by putting the size in the first element.
295
	l := t.Count()
296
	if l == 0 {
297
		p.SetCount(uint16(l))
298
	} else if l < 0xFFFF {
299
		p.SetCount(uint16(l))
300
		data := common.UnsafeAdd(unsafe.Pointer(p), unsafe.Sizeof(*p))
301
		ids := unsafe.Slice((*common.Pgid)(data), l)
302
		t.Copyall(ids)
303
	} else {
304
		p.SetCount(0xFFFF)
305
		data := common.UnsafeAdd(unsafe.Pointer(p), unsafe.Sizeof(*p))
306
		ids := unsafe.Slice((*common.Pgid)(data), l+1)
307
		ids[0] = common.Pgid(l)
308
		t.Copyall(ids[1:])
309
	}
310
}