all repos

rss-tools @ master

Get RSS feeds from sources that (I need and) don't provide one

rss-tools/vendor/go.etcd.io/bbolt/bucket.go (view raw)

Oleksandr Smirnov Oleksandr Smirnov
olexsmir@gmail.com
we're vendoring now, 7 days ago
1
package bbolt
2
3
import (
4
	"bytes"
5
	"fmt"
6
	"unsafe"
7
8
	"go.etcd.io/bbolt/errors"
9
	"go.etcd.io/bbolt/internal/common"
10
)
11
12
// Size limits checked by Bucket.Put.
const (
	// MaxKeySize is the maximum length of a key, in bytes.
	MaxKeySize = 1 << 15

	// MaxValueSize is the maximum length of a value, in bytes.
	MaxValueSize = 1<<31 - 2
)

// Lower and upper bounds for a bucket fill percentage.
// NOTE(review): these are not referenced in this file; presumably they are
// applied where nodes are split — confirm against the node code.
const (
	minFillPercent = 0.1
	maxFillPercent = 1.0
)

// DefaultFillPercent is the percentage that split pages are filled.
// It is the value assigned to Bucket.FillPercent for new buckets and can be
// overridden per bucket by setting that field.
const DefaultFillPercent = 0.5
28
29
// Bucket represents a collection of key/value pairs inside the database.
30
type Bucket struct {
31
	*common.InBucket
32
	tx       *Tx                   // the associated transaction
33
	buckets  map[string]*Bucket    // subbucket cache
34
	page     *common.Page          // inline page reference
35
	rootNode *node                 // materialized node for the root page.
36
	nodes    map[common.Pgid]*node // node cache
37
38
	// Sets the threshold for filling nodes when they split. By default,
39
	// the bucket will fill to 50% but it can be useful to increase this
40
	// amount if you know that your write workloads are mostly append-only.
41
	//
42
	// This is non-persisted across transactions so it must be set in every Tx.
43
	FillPercent float64
44
}
45
46
// newBucket returns a new bucket associated with a transaction.
47
func newBucket(tx *Tx) Bucket {
48
	var b = Bucket{tx: tx, FillPercent: DefaultFillPercent}
49
	if tx.writable {
50
		b.buckets = make(map[string]*Bucket)
51
		b.nodes = make(map[common.Pgid]*node)
52
	}
53
	return b
54
}
55
56
// Tx returns the tx of the bucket.
57
func (b *Bucket) Tx() *Tx {
58
	return b.tx
59
}
60
61
// Root returns the root of the bucket.
62
func (b *Bucket) Root() common.Pgid {
63
	return b.RootPage()
64
}
65
66
// Writable returns whether the bucket is writable.
67
func (b *Bucket) Writable() bool {
68
	return b.tx.writable
69
}
70
71
// Cursor creates a cursor associated with the bucket.
72
// The cursor is only valid as long as the transaction is open.
73
// Do not use a cursor after the transaction is closed.
74
func (b *Bucket) Cursor() *Cursor {
75
	// Update transaction statistics.
76
	b.tx.stats.IncCursorCount(1)
77
78
	// Allocate and return a cursor.
79
	return &Cursor{
80
		bucket: b,
81
		stack:  make([]elemRef, 0),
82
	}
83
}
84
85
// Bucket retrieves a nested bucket by name.
86
// Returns nil if the bucket does not exist.
87
// The bucket instance is only valid for the lifetime of the transaction.
88
func (b *Bucket) Bucket(name []byte) *Bucket {
89
	if b.buckets != nil {
90
		if child := b.buckets[string(name)]; child != nil {
91
			return child
92
		}
93
	}
94
95
	// Move cursor to key.
96
	c := b.Cursor()
97
	k, v, flags := c.seek(name)
98
99
	// Return nil if the key doesn't exist or it is not a bucket.
100
	if !bytes.Equal(name, k) || (flags&common.BucketLeafFlag) == 0 {
101
		return nil
102
	}
103
104
	// Otherwise create a bucket and cache it.
105
	var child = b.openBucket(v)
106
	if b.buckets != nil {
107
		b.buckets[string(name)] = child
108
	}
109
110
	return child
111
}
112
113
// Helper method that re-interprets a sub-bucket value
114
// from a parent into a Bucket
115
func (b *Bucket) openBucket(value []byte) *Bucket {
116
	var child = newBucket(b.tx)
117
118
	// Unaligned access requires a copy to be made.
119
	const unalignedMask = unsafe.Alignof(struct {
120
		common.InBucket
121
		common.Page
122
	}{}) - 1
123
	unaligned := uintptr(unsafe.Pointer(&value[0]))&unalignedMask != 0
124
	if unaligned {
125
		value = cloneBytes(value)
126
	}
127
128
	// If this is a writable transaction then we need to copy the bucket entry.
129
	// Read-only transactions can point directly at the mmap entry.
130
	if b.tx.writable && !unaligned {
131
		child.InBucket = &common.InBucket{}
132
		*child.InBucket = *(*common.InBucket)(unsafe.Pointer(&value[0]))
133
	} else {
134
		child.InBucket = (*common.InBucket)(unsafe.Pointer(&value[0]))
135
	}
136
137
	// Save a reference to the inline page if the bucket is inline.
138
	if child.RootPage() == 0 {
139
		child.page = (*common.Page)(unsafe.Pointer(&value[common.BucketHeaderSize]))
140
	}
141
142
	return &child
143
}
144
145
// CreateBucket creates a new bucket at the given key and returns the new bucket.
146
// Returns an error if the key already exists, if the bucket name is blank, or if the bucket name is too long.
147
// The bucket instance is only valid for the lifetime of the transaction.
148
func (b *Bucket) CreateBucket(key []byte) (rb *Bucket, err error) {
149
	if lg := b.tx.db.Logger(); lg != discardLogger {
150
		lg.Debugf("Creating bucket %q", key)
151
		defer func() {
152
			if err != nil {
153
				lg.Errorf("Creating bucket %q failed: %v", key, err)
154
			} else {
155
				lg.Debugf("Creating bucket %q successfully", key)
156
			}
157
		}()
158
	}
159
	if b.tx.db == nil {
160
		return nil, errors.ErrTxClosed
161
	} else if !b.tx.writable {
162
		return nil, errors.ErrTxNotWritable
163
	} else if len(key) == 0 {
164
		return nil, errors.ErrBucketNameRequired
165
	}
166
167
	// Insert into node.
168
	// Tip: Use a new variable `newKey` instead of reusing the existing `key` to prevent
169
	// it from being marked as leaking, and accordingly cannot be allocated on stack.
170
	newKey := cloneBytes(key)
171
172
	// Move cursor to correct position.
173
	c := b.Cursor()
174
	k, _, flags := c.seek(newKey)
175
176
	// Return an error if there is an existing key.
177
	if bytes.Equal(newKey, k) {
178
		if (flags & common.BucketLeafFlag) != 0 {
179
			return nil, errors.ErrBucketExists
180
		}
181
		return nil, errors.ErrIncompatibleValue
182
	}
183
184
	// Create empty, inline bucket.
185
	var bucket = Bucket{
186
		InBucket:    &common.InBucket{},
187
		rootNode:    &node{isLeaf: true},
188
		FillPercent: DefaultFillPercent,
189
	}
190
	var value = bucket.write()
191
192
	c.node().put(newKey, newKey, value, 0, common.BucketLeafFlag)
193
194
	// Since subbuckets are not allowed on inline buckets, we need to
195
	// dereference the inline page, if it exists. This will cause the bucket
196
	// to be treated as a regular, non-inline bucket for the rest of the tx.
197
	b.page = nil
198
199
	return b.Bucket(newKey), nil
200
}
201
202
// CreateBucketIfNotExists creates a new bucket if it doesn't already exist and returns a reference to it.
203
// Returns an error if the bucket name is blank, or if the bucket name is too long.
204
// The bucket instance is only valid for the lifetime of the transaction.
205
func (b *Bucket) CreateBucketIfNotExists(key []byte) (rb *Bucket, err error) {
206
	if lg := b.tx.db.Logger(); lg != discardLogger {
207
		lg.Debugf("Creating bucket if not exist %q", key)
208
		defer func() {
209
			if err != nil {
210
				lg.Errorf("Creating bucket if not exist %q failed: %v", key, err)
211
			} else {
212
				lg.Debugf("Creating bucket if not exist %q successfully", key)
213
			}
214
		}()
215
	}
216
217
	if b.tx.db == nil {
218
		return nil, errors.ErrTxClosed
219
	} else if !b.tx.writable {
220
		return nil, errors.ErrTxNotWritable
221
	} else if len(key) == 0 {
222
		return nil, errors.ErrBucketNameRequired
223
	}
224
225
	// Insert into node.
226
	// Tip: Use a new variable `newKey` instead of reusing the existing `key` to prevent
227
	// it from being marked as leaking, and accordingly cannot be allocated on stack.
228
	newKey := cloneBytes(key)
229
230
	if b.buckets != nil {
231
		if child := b.buckets[string(newKey)]; child != nil {
232
			return child, nil
233
		}
234
	}
235
236
	// Move cursor to correct position.
237
	c := b.Cursor()
238
	k, v, flags := c.seek(newKey)
239
240
	// Return an error if there is an existing non-bucket key.
241
	if bytes.Equal(newKey, k) {
242
		if (flags & common.BucketLeafFlag) != 0 {
243
			var child = b.openBucket(v)
244
			if b.buckets != nil {
245
				b.buckets[string(newKey)] = child
246
			}
247
248
			return child, nil
249
		}
250
		return nil, errors.ErrIncompatibleValue
251
	}
252
253
	// Create empty, inline bucket.
254
	var bucket = Bucket{
255
		InBucket:    &common.InBucket{},
256
		rootNode:    &node{isLeaf: true},
257
		FillPercent: DefaultFillPercent,
258
	}
259
	var value = bucket.write()
260
261
	c.node().put(newKey, newKey, value, 0, common.BucketLeafFlag)
262
263
	// Since subbuckets are not allowed on inline buckets, we need to
264
	// dereference the inline page, if it exists. This will cause the bucket
265
	// to be treated as a regular, non-inline bucket for the rest of the tx.
266
	b.page = nil
267
268
	return b.Bucket(newKey), nil
269
}
270
271
// DeleteBucket deletes a bucket at the given key.
272
// Returns an error if the bucket does not exist, or if the key represents a non-bucket value.
273
func (b *Bucket) DeleteBucket(key []byte) (err error) {
274
	if lg := b.tx.db.Logger(); lg != discardLogger {
275
		lg.Debugf("Deleting bucket %q", key)
276
		defer func() {
277
			if err != nil {
278
				lg.Errorf("Deleting bucket %q failed: %v", key, err)
279
			} else {
280
				lg.Debugf("Deleting bucket %q successfully", key)
281
			}
282
		}()
283
	}
284
285
	if b.tx.db == nil {
286
		return errors.ErrTxClosed
287
	} else if !b.Writable() {
288
		return errors.ErrTxNotWritable
289
	}
290
291
	newKey := cloneBytes(key)
292
293
	// Move cursor to correct position.
294
	c := b.Cursor()
295
	k, _, flags := c.seek(newKey)
296
297
	// Return an error if bucket doesn't exist or is not a bucket.
298
	if !bytes.Equal(newKey, k) {
299
		return errors.ErrBucketNotFound
300
	} else if (flags & common.BucketLeafFlag) == 0 {
301
		return errors.ErrIncompatibleValue
302
	}
303
304
	// Recursively delete all child buckets.
305
	child := b.Bucket(newKey)
306
	err = child.ForEachBucket(func(k []byte) error {
307
		if err := child.DeleteBucket(k); err != nil {
308
			return fmt.Errorf("delete bucket: %s", err)
309
		}
310
		return nil
311
	})
312
	if err != nil {
313
		return err
314
	}
315
316
	// Remove cached copy.
317
	delete(b.buckets, string(newKey))
318
319
	// Release all bucket pages to freelist.
320
	child.nodes = nil
321
	child.rootNode = nil
322
	child.free()
323
324
	// Delete the node if we have a matching key.
325
	c.node().del(newKey)
326
327
	return nil
328
}
329
330
// MoveBucket moves a sub-bucket from the source bucket to the destination bucket.
331
// Returns an error if
332
//  1. the sub-bucket cannot be found in the source bucket;
333
//  2. or the key already exists in the destination bucket;
334
//  3. or the key represents a non-bucket value;
335
//  4. the source and destination buckets are the same.
336
func (b *Bucket) MoveBucket(key []byte, dstBucket *Bucket) (err error) {
337
	lg := b.tx.db.Logger()
338
	if lg != discardLogger {
339
		lg.Debugf("Moving bucket %q", key)
340
		defer func() {
341
			if err != nil {
342
				lg.Errorf("Moving bucket %q failed: %v", key, err)
343
			} else {
344
				lg.Debugf("Moving bucket %q successfully", key)
345
			}
346
		}()
347
	}
348
349
	if b.tx.db == nil || dstBucket.tx.db == nil {
350
		return errors.ErrTxClosed
351
	} else if !b.Writable() || !dstBucket.Writable() {
352
		return errors.ErrTxNotWritable
353
	}
354
355
	if b.tx.db.Path() != dstBucket.tx.db.Path() || b.tx != dstBucket.tx {
356
		lg.Errorf("The source and target buckets are not in the same db file, source bucket in %s and target bucket in %s", b.tx.db.Path(), dstBucket.tx.db.Path())
357
		return errors.ErrDifferentDB
358
	}
359
360
	newKey := cloneBytes(key)
361
362
	// Move cursor to correct position.
363
	c := b.Cursor()
364
	k, v, flags := c.seek(newKey)
365
366
	// Return an error if bucket doesn't exist or is not a bucket.
367
	if !bytes.Equal(newKey, k) {
368
		return errors.ErrBucketNotFound
369
	} else if (flags & common.BucketLeafFlag) == 0 {
370
		lg.Errorf("An incompatible key %s exists in the source bucket", newKey)
371
		return errors.ErrIncompatibleValue
372
	}
373
374
	// Do nothing (return true directly) if the source bucket and the
375
	// destination bucket are actually the same bucket.
376
	if b == dstBucket || (b.RootPage() == dstBucket.RootPage() && b.RootPage() != 0) {
377
		lg.Errorf("The source bucket (%s) and the target bucket (%s) are the same bucket", b, dstBucket)
378
		return errors.ErrSameBuckets
379
	}
380
381
	// check whether the key already exists in the destination bucket
382
	curDst := dstBucket.Cursor()
383
	k, _, flags = curDst.seek(newKey)
384
385
	// Return an error if there is an existing key in the destination bucket.
386
	if bytes.Equal(newKey, k) {
387
		if (flags & common.BucketLeafFlag) != 0 {
388
			return errors.ErrBucketExists
389
		}
390
		lg.Errorf("An incompatible key %s exists in the target bucket", newKey)
391
		return errors.ErrIncompatibleValue
392
	}
393
394
	// remove the sub-bucket from the source bucket
395
	delete(b.buckets, string(newKey))
396
	c.node().del(newKey)
397
398
	// add te sub-bucket to the destination bucket
399
	newValue := cloneBytes(v)
400
	curDst.node().put(newKey, newKey, newValue, 0, common.BucketLeafFlag)
401
402
	return nil
403
}
404
405
// Inspect returns the structure of the bucket.
406
func (b *Bucket) Inspect() BucketStructure {
407
	return b.recursivelyInspect([]byte("root"))
408
}
409
410
// recursivelyInspect builds the BucketStructure for this bucket under the
// given name: plain keys are counted in KeyN and each nested bucket is
// inspected in turn and appended to Children.
func (b *Bucket) recursivelyInspect(name []byte) BucketStructure {
	result := BucketStructure{Name: string(name)}

	cur := b.Cursor()
	k, _, flags := cur.first()
	for k != nil {
		if flags&common.BucketLeafFlag == 0 {
			// Plain key/value pair.
			result.KeyN++
		} else {
			// Nested bucket: descend into it.
			nested := b.Bucket(k)
			result.Children = append(result.Children, nested.recursivelyInspect(k))
		}
		k, _, flags = cur.next()
	}

	return result
}
428
429
// Get retrieves the value for a key in the bucket.
430
// Returns a nil value if the key does not exist or if the key is a nested bucket.
431
// The returned value is only valid for the life of the transaction.
432
// The returned memory is owned by bbolt and must never be modified; writing to this memory might corrupt the database.
433
func (b *Bucket) Get(key []byte) []byte {
434
	k, v, flags := b.Cursor().seek(key)
435
436
	// Return nil if this is a bucket.
437
	if (flags & common.BucketLeafFlag) != 0 {
438
		return nil
439
	}
440
441
	// If our target node isn't the same key as what's passed in then return nil.
442
	if !bytes.Equal(key, k) {
443
		return nil
444
	}
445
	return v
446
}
447
448
// Put sets the value for a key in the bucket.
449
// If the key exist then its previous value will be overwritten.
450
// Supplied value must remain valid for the life of the transaction.
451
// Returns an error if the bucket was created from a read-only transaction, if the key is blank, if the key is too large, or if the value is too large.
452
func (b *Bucket) Put(key []byte, value []byte) (err error) {
453
	if lg := b.tx.db.Logger(); lg != discardLogger {
454
		lg.Debugf("Putting key %q", key)
455
		defer func() {
456
			if err != nil {
457
				lg.Errorf("Putting key %q failed: %v", key, err)
458
			} else {
459
				lg.Debugf("Putting key %q successfully", key)
460
			}
461
		}()
462
	}
463
	if b.tx.db == nil {
464
		return errors.ErrTxClosed
465
	} else if !b.Writable() {
466
		return errors.ErrTxNotWritable
467
	} else if len(key) == 0 {
468
		return errors.ErrKeyRequired
469
	} else if len(key) > MaxKeySize {
470
		return errors.ErrKeyTooLarge
471
	} else if int64(len(value)) > MaxValueSize {
472
		return errors.ErrValueTooLarge
473
	}
474
475
	// Insert into node.
476
	// Tip: Use a new variable `newKey` instead of reusing the existing `key` to prevent
477
	// it from being marked as leaking, and accordingly cannot be allocated on stack.
478
	newKey := cloneBytes(key)
479
480
	// Move cursor to correct position.
481
	c := b.Cursor()
482
	k, _, flags := c.seek(newKey)
483
484
	// Return an error if there is an existing key with a bucket value.
485
	if bytes.Equal(newKey, k) && (flags&common.BucketLeafFlag) != 0 {
486
		return errors.ErrIncompatibleValue
487
	}
488
489
	// gofail: var beforeBucketPut struct{}
490
491
	c.node().put(newKey, newKey, value, 0, 0)
492
493
	return nil
494
}
495
496
// Delete removes a key from the bucket.
497
// If the key does not exist then nothing is done and a nil error is returned.
498
// Returns an error if the bucket was created from a read-only transaction.
499
func (b *Bucket) Delete(key []byte) (err error) {
500
	if lg := b.tx.db.Logger(); lg != discardLogger {
501
		lg.Debugf("Deleting key %q", key)
502
		defer func() {
503
			if err != nil {
504
				lg.Errorf("Deleting key %q failed: %v", key, err)
505
			} else {
506
				lg.Debugf("Deleting key %q successfully", key)
507
			}
508
		}()
509
	}
510
511
	if b.tx.db == nil {
512
		return errors.ErrTxClosed
513
	} else if !b.Writable() {
514
		return errors.ErrTxNotWritable
515
	}
516
517
	// Move cursor to correct position.
518
	c := b.Cursor()
519
	k, _, flags := c.seek(key)
520
521
	// Return nil if the key doesn't exist.
522
	if !bytes.Equal(key, k) {
523
		return nil
524
	}
525
526
	// Return an error if there is already existing bucket value.
527
	if (flags & common.BucketLeafFlag) != 0 {
528
		return errors.ErrIncompatibleValue
529
	}
530
531
	// Delete the node if we have a matching key.
532
	c.node().del(key)
533
534
	return nil
535
}
536
537
// Sequence returns the current integer for the bucket without incrementing it.
538
func (b *Bucket) Sequence() uint64 {
539
	return b.InSequence()
540
}
541
542
// SetSequence updates the sequence number for the bucket.
543
func (b *Bucket) SetSequence(v uint64) error {
544
	if b.tx.db == nil {
545
		return errors.ErrTxClosed
546
	} else if !b.Writable() {
547
		return errors.ErrTxNotWritable
548
	}
549
550
	// Materialize the root node if it hasn't been already so that the
551
	// bucket will be saved during commit.
552
	if b.rootNode == nil {
553
		_ = b.node(b.RootPage(), nil)
554
	}
555
556
	// Set the sequence.
557
	b.SetInSequence(v)
558
	return nil
559
}
560
561
// NextSequence returns an autoincrementing integer for the bucket.
562
func (b *Bucket) NextSequence() (uint64, error) {
563
	if b.tx.db == nil {
564
		return 0, errors.ErrTxClosed
565
	} else if !b.Writable() {
566
		return 0, errors.ErrTxNotWritable
567
	}
568
569
	// Materialize the root node if it hasn't been already so that the
570
	// bucket will be saved during commit.
571
	if b.rootNode == nil {
572
		_ = b.node(b.RootPage(), nil)
573
	}
574
575
	// Increment and return the sequence.
576
	b.IncSequence()
577
	return b.Sequence(), nil
578
}
579
580
// ForEach executes a function for each key/value pair in a bucket.
581
// Because ForEach uses a Cursor, the iteration over keys is in lexicographical order.
582
// If the provided function returns an error then the iteration is stopped and
583
// the error is returned to the caller. The provided function must not modify
584
// the bucket; this will result in undefined behavior.
585
func (b *Bucket) ForEach(fn func(k, v []byte) error) error {
586
	if b.tx.db == nil {
587
		return errors.ErrTxClosed
588
	}
589
	c := b.Cursor()
590
	for k, v := c.First(); k != nil; k, v = c.Next() {
591
		if err := fn(k, v); err != nil {
592
			return err
593
		}
594
	}
595
	return nil
596
}
597
598
// ForEachBucket calls fn with the key of each nested bucket found directly in
// this bucket; plain key/value pairs are skipped.
// The first error returned by fn stops the iteration and is returned to the
// caller. It returns ErrTxClosed if the transaction is closed.
func (b *Bucket) ForEachBucket(fn func(k []byte) error) error {
	if b.tx.db == nil {
		return errors.ErrTxClosed
	}

	cur := b.Cursor()
	k, _, flags := cur.first()
	for k != nil {
		// Only entries carrying the bucket flag are reported.
		if flags&common.BucketLeafFlag != 0 {
			if err := fn(k); err != nil {
				return err
			}
		}
		k, _, flags = cur.next()
	}
	return nil
}
612
613
// Stats returns stats on a bucket.
614
func (b *Bucket) Stats() BucketStats {
615
	var s, subStats BucketStats
616
	pageSize := b.tx.db.pageSize
617
	s.BucketN += 1
618
	if b.RootPage() == 0 {
619
		s.InlineBucketN += 1
620
	}
621
	b.forEachPage(func(p *common.Page, depth int, pgstack []common.Pgid) {
622
		if p.IsLeafPage() {
623
			s.KeyN += int(p.Count())
624
625
			// used totals the used bytes for the page
626
			used := common.PageHeaderSize
627
628
			if p.Count() != 0 {
629
				// If page has any elements, add all element headers.
630
				used += common.LeafPageElementSize * uintptr(p.Count()-1)
631
632
				// Add all element key, value sizes.
633
				// The computation takes advantage of the fact that the position
634
				// of the last element's key/value equals to the total of the sizes
635
				// of all previous elements' keys and values.
636
				// It also includes the last element's header.
637
				lastElement := p.LeafPageElement(p.Count() - 1)
638
				used += uintptr(lastElement.Pos() + lastElement.Ksize() + lastElement.Vsize())
639
			}
640
641
			if b.RootPage() == 0 {
642
				// For inlined bucket just update the inline stats
643
				s.InlineBucketInuse += int(used)
644
			} else {
645
				// For non-inlined bucket update all the leaf stats
646
				s.LeafPageN++
647
				s.LeafInuse += int(used)
648
				s.LeafOverflowN += int(p.Overflow())
649
650
				// Collect stats from sub-buckets.
651
				// Do that by iterating over all element headers
652
				// looking for the ones with the bucketLeafFlag.
653
				for i := uint16(0); i < p.Count(); i++ {
654
					e := p.LeafPageElement(i)
655
					if (e.Flags() & common.BucketLeafFlag) != 0 {
656
						// For any bucket element, open the element value
657
						// and recursively call Stats on the contained bucket.
658
						subStats.Add(b.openBucket(e.Value()).Stats())
659
					}
660
				}
661
			}
662
		} else if p.IsBranchPage() {
663
			s.BranchPageN++
664
			lastElement := p.BranchPageElement(p.Count() - 1)
665
666
			// used totals the used bytes for the page
667
			// Add header and all element headers.
668
			used := common.PageHeaderSize + (common.BranchPageElementSize * uintptr(p.Count()-1))
669
670
			// Add size of all keys and values.
671
			// Again, use the fact that last element's position equals to
672
			// the total of key, value sizes of all previous elements.
673
			used += uintptr(lastElement.Pos() + lastElement.Ksize())
674
			s.BranchInuse += int(used)
675
			s.BranchOverflowN += int(p.Overflow())
676
		}
677
678
		// Keep track of maximum page depth.
679
		if depth+1 > s.Depth {
680
			s.Depth = depth + 1
681
		}
682
	})
683
684
	// Alloc stats can be computed from page counts and pageSize.
685
	s.BranchAlloc = (s.BranchPageN + s.BranchOverflowN) * pageSize
686
	s.LeafAlloc = (s.LeafPageN + s.LeafOverflowN) * pageSize
687
688
	// Add the max depth of sub-buckets to get total nested depth.
689
	s.Depth += subStats.Depth
690
	// Add the stats for all sub-buckets
691
	s.Add(subStats)
692
	return s
693
}
694
695
// forEachPage iterates over every page in a bucket, including inline pages.
696
func (b *Bucket) forEachPage(fn func(*common.Page, int, []common.Pgid)) {
697
	// If we have an inline page then just use that.
698
	if b.page != nil {
699
		fn(b.page, 0, []common.Pgid{b.RootPage()})
700
		return
701
	}
702
703
	// Otherwise traverse the page hierarchy.
704
	b.tx.forEachPage(b.RootPage(), fn)
705
}
706
707
// forEachPageNode iterates over every page (or node) in a bucket.
708
// This also includes inline pages.
709
func (b *Bucket) forEachPageNode(fn func(*common.Page, *node, int)) {
710
	// If we have an inline page or root node then just use that.
711
	if b.page != nil {
712
		fn(b.page, nil, 0)
713
		return
714
	}
715
	b._forEachPageNode(b.RootPage(), 0, fn)
716
}
717
718
// _forEachPageNode calls fn for the page or node identified by pgId and then
// recurses into its children, passing depth+1 for each level below.
func (b *Bucket) _forEachPageNode(pgId common.Pgid, depth int, fn func(*common.Page, *node, int)) {
	p, n := b.pageNode(pgId)

	// Visit this page/node before descending.
	fn(p, n, depth)

	if p == nil {
		// A materialized node: follow its inodes unless it is a leaf.
		if n.isLeaf {
			return
		}
		for _, inode := range n.inodes {
			b._forEachPageNode(inode.Pgid(), depth+1, fn)
		}
		return
	}

	// A raw page: only branch pages have children to follow.
	if !p.IsBranchPage() {
		return
	}
	for i := 0; i < int(p.Count()); i++ {
		elem := p.BranchPageElement(uint16(i))
		b._forEachPageNode(elem.Pgid(), depth+1, fn)
	}
}
740
741
// spill writes all the nodes for this bucket to dirty pages.
742
func (b *Bucket) spill() error {
743
	// Spill all child buckets first.
744
	for name, child := range b.buckets {
745
		// If the child bucket is small enough and it has no child buckets then
746
		// write it inline into the parent bucket's page. Otherwise spill it
747
		// like a normal bucket and make the parent value a pointer to the page.
748
		var value []byte
749
		if child.inlineable() {
750
			child.free()
751
			value = child.write()
752
		} else {
753
			if err := child.spill(); err != nil {
754
				return err
755
			}
756
757
			// Update the child bucket header in this bucket.
758
			value = make([]byte, unsafe.Sizeof(common.InBucket{}))
759
			var bucket = (*common.InBucket)(unsafe.Pointer(&value[0]))
760
			*bucket = *child.InBucket
761
		}
762
763
		// Skip writing the bucket if there are no materialized nodes.
764
		if child.rootNode == nil {
765
			continue
766
		}
767
768
		// Update parent node.
769
		var c = b.Cursor()
770
		k, _, flags := c.seek([]byte(name))
771
		if !bytes.Equal([]byte(name), k) {
772
			panic(fmt.Sprintf("misplaced bucket header: %x -> %x", []byte(name), k))
773
		}
774
		if flags&common.BucketLeafFlag == 0 {
775
			panic(fmt.Sprintf("unexpected bucket header flag: %x", flags))
776
		}
777
		c.node().put([]byte(name), []byte(name), value, 0, common.BucketLeafFlag)
778
	}
779
780
	// Ignore if there's not a materialized root node.
781
	if b.rootNode == nil {
782
		return nil
783
	}
784
785
	// Spill nodes.
786
	if err := b.rootNode.spill(); err != nil {
787
		return err
788
	}
789
	b.rootNode = b.rootNode.root()
790
791
	// Update the root node for this bucket.
792
	if b.rootNode.pgid >= b.tx.meta.Pgid() {
793
		panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", b.rootNode.pgid, b.tx.meta.Pgid()))
794
	}
795
	b.SetRootPage(b.rootNode.pgid)
796
797
	return nil
798
}
799
800
// inlineable returns true if a bucket is small enough to be written inline
801
// and if it contains no subbuckets. Otherwise, returns false.
802
func (b *Bucket) inlineable() bool {
803
	var n = b.rootNode
804
805
	// Bucket must only contain a single leaf node.
806
	if n == nil || !n.isLeaf {
807
		return false
808
	}
809
810
	// Bucket is not inlineable if it contains subbuckets or if it goes beyond
811
	// our threshold for inline bucket size.
812
	var size = common.PageHeaderSize
813
	for _, inode := range n.inodes {
814
		size += common.LeafPageElementSize + uintptr(len(inode.Key())) + uintptr(len(inode.Value()))
815
816
		if inode.Flags()&common.BucketLeafFlag != 0 {
817
			return false
818
		} else if size > b.maxInlineBucketSize() {
819
			return false
820
		}
821
	}
822
823
	return true
824
}
825
826
// Returns the maximum total size of a bucket to make it a candidate for inlining.
827
func (b *Bucket) maxInlineBucketSize() uintptr {
828
	return uintptr(b.tx.db.pageSize / 4)
829
}
830
831
// write allocates and writes a bucket to a byte slice.
832
func (b *Bucket) write() []byte {
833
	// Allocate the appropriate size.
834
	var n = b.rootNode
835
	var value = make([]byte, common.BucketHeaderSize+n.size())
836
837
	// Write a bucket header.
838
	var bucket = (*common.InBucket)(unsafe.Pointer(&value[0]))
839
	*bucket = *b.InBucket
840
841
	// Convert byte slice to a fake page and write the root node.
842
	var p = (*common.Page)(unsafe.Pointer(&value[common.BucketHeaderSize]))
843
	n.write(p)
844
845
	return value
846
}
847
848
// rebalance attempts to balance all nodes.
849
func (b *Bucket) rebalance() {
850
	for _, n := range b.nodes {
851
		n.rebalance()
852
	}
853
	for _, child := range b.buckets {
854
		child.rebalance()
855
	}
856
}
857
858
// node creates a node from a page and associates it with a given parent.
859
func (b *Bucket) node(pgId common.Pgid, parent *node) *node {
860
	common.Assert(b.nodes != nil, "nodes map expected")
861
862
	// Retrieve node if it's already been created.
863
	if n := b.nodes[pgId]; n != nil {
864
		return n
865
	}
866
867
	// Otherwise create a node and cache it.
868
	n := &node{bucket: b, parent: parent}
869
	if parent == nil {
870
		b.rootNode = n
871
	} else {
872
		parent.children = append(parent.children, n)
873
	}
874
875
	// Use the inline page if this is an inline bucket.
876
	var p = b.page
877
	if p == nil {
878
		p = b.tx.page(pgId)
879
	} else {
880
		// if p isn't nil, then it's an inline bucket.
881
		// The pgId must be 0 in this case.
882
		common.Verify(func() {
883
			common.Assert(pgId == 0, "The page ID (%d) isn't 0 for an inline bucket", pgId)
884
		})
885
	}
886
887
	// Read the page into the node and cache it.
888
	n.read(p)
889
	b.nodes[pgId] = n
890
891
	// Update statistics.
892
	b.tx.stats.IncNodeCount(1)
893
894
	return n
895
}
896
897
// free recursively frees all pages in the bucket.
898
func (b *Bucket) free() {
899
	if b.RootPage() == 0 {
900
		return
901
	}
902
903
	var tx = b.tx
904
	b.forEachPageNode(func(p *common.Page, n *node, _ int) {
905
		if p != nil {
906
			tx.db.freelist.Free(tx.meta.Txid(), p)
907
		} else {
908
			n.free()
909
		}
910
	})
911
	b.SetRootPage(0)
912
}
913
914
// dereference removes all references to the old mmap.
915
func (b *Bucket) dereference() {
916
	if b.rootNode != nil {
917
		b.rootNode.root().dereference()
918
	}
919
920
	for _, child := range b.buckets {
921
		child.dereference()
922
	}
923
}
924
925
// pageNode returns the in-memory node, if it exists.
926
// Otherwise, returns the underlying page.
927
func (b *Bucket) pageNode(id common.Pgid) (*common.Page, *node) {
928
	// Inline buckets have a fake page embedded in their value so treat them
929
	// differently. We'll return the rootNode (if available) or the fake page.
930
	if b.RootPage() == 0 {
931
		if id != 0 {
932
			panic(fmt.Sprintf("inline bucket non-zero page access(2): %d != 0", id))
933
		}
934
		if b.rootNode != nil {
935
			return nil, b.rootNode
936
		}
937
		return b.page, nil
938
	}
939
940
	// Check the node cache for non-inline buckets.
941
	if b.nodes != nil {
942
		if n := b.nodes[id]; n != nil {
943
			return nil, n
944
		}
945
	}
946
947
	// Finally lookup the page from the transaction if no node is materialized.
948
	return b.tx.page(id), nil
949
}
950
951
// BucketStats records statistics about resources used by a bucket.
type BucketStats struct {
	// Page count statistics.
	BranchPageN     int // number of logical branch pages
	BranchOverflowN int // number of physical branch overflow pages
	LeafPageN       int // number of logical leaf pages
	LeafOverflowN   int // number of physical leaf overflow pages

	// Tree statistics.
	KeyN  int // number of keys/value pairs
	Depth int // number of levels in B+tree

	// Page size utilization.
	BranchAlloc int // bytes allocated for physical branch pages
	BranchInuse int // bytes actually used for branch data
	LeafAlloc   int // bytes allocated for physical leaf pages
	LeafInuse   int // bytes actually used for leaf data

	// Bucket statistics.
	BucketN           int // total number of buckets including the top bucket
	InlineBucketN     int // total number of inlined buckets
	InlineBucketInuse int // bytes used for inlined buckets (also accounted for in LeafInuse)
}

// Add merges other into s. Every counter is summed, except Depth, which
// becomes the larger of the two depths.
func (s *BucketStats) Add(other BucketStats) {
	// Page counts.
	s.BranchPageN += other.BranchPageN
	s.BranchOverflowN += other.BranchOverflowN
	s.LeafPageN += other.LeafPageN
	s.LeafOverflowN += other.LeafOverflowN

	// Tree statistics: keys add up, depth keeps the maximum.
	s.KeyN += other.KeyN
	if other.Depth > s.Depth {
		s.Depth = other.Depth
	}

	// Page size utilization.
	s.BranchAlloc += other.BranchAlloc
	s.BranchInuse += other.BranchInuse
	s.LeafAlloc += other.LeafAlloc
	s.LeafInuse += other.LeafInuse

	// Bucket statistics.
	s.BucketN += other.BucketN
	s.InlineBucketN += other.InlineBucketN
	s.InlineBucketInuse += other.InlineBucketInuse
}
993
994
// cloneBytes returns a copy of v backed by its own, newly allocated array.
// The result is never nil, even when v is nil or empty.
func cloneBytes(v []byte) []byte {
	return append(make([]byte, 0, len(v)), v...)
}
1000
1001
// BucketStructure describes a bucket and, recursively, the buckets nested
// inside it. It is the result type of Bucket.Inspect.
type BucketStructure struct {
	// Name is the name of the bucket.
	Name string `json:"name"`
	// KeyN is the number of key/value pairs, not counting nested buckets.
	KeyN int `json:"keyN"`
	// Children holds the nested buckets.
	Children []BucketStructure `json:"buckets,omitempty"`
}