all repos

rss-tools @ a5ac52722b131734c74504b6e6f4d9900536cac7

Get RSS feeds from sources that (I need and) don't provide one

rss-tools/vendor/go.etcd.io/bbolt/tx.go (view raw)

Oleksandr Smirnov Oleksandr Smirnov
olexsmir@gmail.com
we're vendoring now, 7 days ago
1
package bbolt
2
3
import (
4
	"errors"
5
	"fmt"
6
	"io"
7
	"os"
8
	"runtime"
9
	"sort"
10
	"strings"
11
	"sync/atomic"
12
	"time"
13
	"unsafe"
14
15
	berrors "go.etcd.io/bbolt/errors"
16
	"go.etcd.io/bbolt/internal/common"
17
)
18
19
// Tx represents a read-only or read/write transaction on the database.
20
// Read-only transactions can be used for retrieving values for keys and creating cursors.
21
// Read/write transactions can create and remove buckets and create and remove keys.
22
//
23
// IMPORTANT: You must commit or rollback transactions when you are done with
24
// them. Pages can not be reclaimed by the writer until no more transactions
25
// are using them. A long running read transaction can cause the database to
26
// quickly grow.
27
type Tx struct {
28
	writable       bool
29
	managed        bool
30
	db             *DB
31
	meta           *common.Meta
32
	root           Bucket
33
	pages          map[common.Pgid]*common.Page
34
	stats          TxStats
35
	commitHandlers []func()
36
37
	// WriteFlag specifies the flag for write-related methods like WriteTo().
38
	// Tx opens the database file with the specified flag to copy the data.
39
	//
40
	// By default, the flag is unset, which works well for mostly in-memory
41
	// workloads. For databases that are much larger than available RAM,
42
	// set the flag to syscall.O_DIRECT to avoid trashing the page cache.
43
	WriteFlag int
44
}
45
46
// init initializes the transaction.
47
func (tx *Tx) init(db *DB) {
48
	tx.db = db
49
	tx.pages = nil
50
51
	// Copy the meta page since it can be changed by the writer.
52
	tx.meta = &common.Meta{}
53
	db.meta().Copy(tx.meta)
54
55
	// Copy over the root bucket.
56
	tx.root = newBucket(tx)
57
	tx.root.InBucket = &common.InBucket{}
58
	*tx.root.InBucket = *(tx.meta.RootBucket())
59
60
	// Increment the transaction id and add a page cache for writable transactions.
61
	if tx.writable {
62
		tx.pages = make(map[common.Pgid]*common.Page)
63
		tx.meta.IncTxid()
64
	}
65
}
66
67
// ID returns the transaction id.
68
func (tx *Tx) ID() int {
69
	if tx == nil || tx.meta == nil {
70
		return -1
71
	}
72
	return int(tx.meta.Txid())
73
}
74
75
// DB returns a reference to the database that created the transaction.
76
func (tx *Tx) DB() *DB {
77
	return tx.db
78
}
79
80
// Size returns current database size in bytes as seen by this transaction.
81
func (tx *Tx) Size() int64 {
82
	return int64(tx.meta.Pgid()) * int64(tx.db.pageSize)
83
}
84
85
// Writable returns whether the transaction can perform write operations.
86
func (tx *Tx) Writable() bool {
87
	return tx.writable
88
}
89
90
// Cursor creates a cursor associated with the root bucket.
91
// All items in the cursor will return a nil value because all root bucket keys point to buckets.
92
// The cursor is only valid as long as the transaction is open.
93
// Do not use a cursor after the transaction is closed.
94
func (tx *Tx) Cursor() *Cursor {
95
	return tx.root.Cursor()
96
}
97
98
// Stats retrieves a copy of the current transaction statistics.
99
func (tx *Tx) Stats() TxStats {
100
	return tx.stats
101
}
102
103
// Inspect returns the structure of the database.
104
func (tx *Tx) Inspect() BucketStructure {
105
	return tx.root.Inspect()
106
}
107
108
// Bucket retrieves a bucket by name.
109
// Returns nil if the bucket does not exist.
110
// The bucket instance is only valid for the lifetime of the transaction.
111
func (tx *Tx) Bucket(name []byte) *Bucket {
112
	return tx.root.Bucket(name)
113
}
114
115
// CreateBucket creates a new bucket.
116
// Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long.
117
// The bucket instance is only valid for the lifetime of the transaction.
118
func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) {
119
	return tx.root.CreateBucket(name)
120
}
121
122
// CreateBucketIfNotExists creates a new bucket if it doesn't already exist.
123
// Returns an error if the bucket name is blank, or if the bucket name is too long.
124
// The bucket instance is only valid for the lifetime of the transaction.
125
func (tx *Tx) CreateBucketIfNotExists(name []byte) (*Bucket, error) {
126
	return tx.root.CreateBucketIfNotExists(name)
127
}
128
129
// DeleteBucket deletes a bucket.
130
// Returns an error if the bucket cannot be found or if the key represents a non-bucket value.
131
func (tx *Tx) DeleteBucket(name []byte) error {
132
	return tx.root.DeleteBucket(name)
133
}
134
135
// MoveBucket moves a sub-bucket from the source bucket to the destination bucket.
136
// Returns an error if
137
//  1. the sub-bucket cannot be found in the source bucket;
138
//  2. or the key already exists in the destination bucket;
139
//  3. the key represents a non-bucket value.
140
//
141
// If src is nil, it means moving a top level bucket into the target bucket.
142
// If dst is nil, it means converting the child bucket into a top level bucket.
143
func (tx *Tx) MoveBucket(child []byte, src *Bucket, dst *Bucket) error {
144
	if src == nil {
145
		src = &tx.root
146
	}
147
	if dst == nil {
148
		dst = &tx.root
149
	}
150
	return src.MoveBucket(child, dst)
151
}
152
153
// ForEach executes a function for each bucket in the root.
154
// If the provided function returns an error then the iteration is stopped and
155
// the error is returned to the caller.
156
func (tx *Tx) ForEach(fn func(name []byte, b *Bucket) error) error {
157
	return tx.root.ForEach(func(k, v []byte) error {
158
		return fn(k, tx.root.Bucket(k))
159
	})
160
}
161
162
// OnCommit adds a handler function to be executed after the transaction successfully commits.
163
func (tx *Tx) OnCommit(fn func()) {
164
	tx.commitHandlers = append(tx.commitHandlers, fn)
165
}
166
167
// Commit writes all changes to disk, updates the meta page and closes the transaction.
168
// Returns an error if a disk write error occurs, or if Commit is
169
// called on a read-only transaction.
170
func (tx *Tx) Commit() (err error) {
171
	txId := tx.ID()
172
	lg := tx.db.Logger()
173
	if lg != discardLogger {
174
		lg.Debugf("Committing transaction %d", txId)
175
		defer func() {
176
			if err != nil {
177
				lg.Errorf("Committing transaction failed: %v", err)
178
			} else {
179
				lg.Debugf("Committing transaction %d successfully", txId)
180
			}
181
		}()
182
	}
183
184
	common.Assert(!tx.managed, "managed tx commit not allowed")
185
	if tx.db == nil {
186
		return berrors.ErrTxClosed
187
	} else if !tx.writable {
188
		return berrors.ErrTxNotWritable
189
	}
190
191
	// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
192
193
	// Rebalance nodes which have had deletions.
194
	var startTime = time.Now()
195
	tx.root.rebalance()
196
	if tx.stats.GetRebalance() > 0 {
197
		tx.stats.IncRebalanceTime(time.Since(startTime))
198
	}
199
200
	opgid := tx.meta.Pgid()
201
202
	// spill data onto dirty pages.
203
	startTime = time.Now()
204
	if err = tx.root.spill(); err != nil {
205
		lg.Errorf("spilling data onto dirty pages failed: %v", err)
206
		tx.rollback()
207
		return err
208
	}
209
	tx.stats.IncSpillTime(time.Since(startTime))
210
211
	// Free the old root bucket.
212
	tx.meta.RootBucket().SetRootPage(tx.root.RootPage())
213
214
	// Free the old freelist because commit writes out a fresh freelist.
215
	if tx.meta.Freelist() != common.PgidNoFreelist {
216
		tx.db.freelist.Free(tx.meta.Txid(), tx.db.page(tx.meta.Freelist()))
217
	}
218
219
	if !tx.db.NoFreelistSync {
220
		err = tx.commitFreelist()
221
		if err != nil {
222
			lg.Errorf("committing freelist failed: %v", err)
223
			return err
224
		}
225
	} else {
226
		tx.meta.SetFreelist(common.PgidNoFreelist)
227
	}
228
229
	// If the high water mark has moved up then attempt to grow the database.
230
	if tx.meta.Pgid() > opgid {
231
		_ = errors.New("")
232
		// gofail: var lackOfDiskSpace string
233
		// tx.rollback()
234
		// return errors.New(lackOfDiskSpace)
235
		if err = tx.db.grow(int(tx.meta.Pgid()+1) * tx.db.pageSize); err != nil {
236
			lg.Errorf("growing db size failed, pgid: %d, pagesize: %d, error: %v", tx.meta.Pgid(), tx.db.pageSize, err)
237
			tx.rollback()
238
			return err
239
		}
240
	}
241
242
	// Write dirty pages to disk.
243
	startTime = time.Now()
244
	if err = tx.write(); err != nil {
245
		lg.Errorf("writing data failed: %v", err)
246
		tx.rollback()
247
		return err
248
	}
249
250
	// If strict mode is enabled then perform a consistency check.
251
	if tx.db.StrictMode {
252
		ch := tx.Check()
253
		var errs []string
254
		for {
255
			chkErr, ok := <-ch
256
			if !ok {
257
				break
258
			}
259
			errs = append(errs, chkErr.Error())
260
		}
261
		if len(errs) > 0 {
262
			panic("check fail: " + strings.Join(errs, "\n"))
263
		}
264
	}
265
266
	// Write meta to disk.
267
	if err = tx.writeMeta(); err != nil {
268
		lg.Errorf("writeMeta failed: %v", err)
269
		tx.rollback()
270
		return err
271
	}
272
	tx.stats.IncWriteTime(time.Since(startTime))
273
274
	// Finalize the transaction.
275
	tx.close()
276
277
	// Execute commit handlers now that the locks have been removed.
278
	for _, fn := range tx.commitHandlers {
279
		fn()
280
	}
281
282
	return nil
283
}
284
285
// commitFreelist allocates pages for a fresh freelist, writes the freelist
// into them and records the new freelist page id in the meta page.
// If the allocation fails, the transaction is rolled back and the error is
// returned.
func (tx *Tx) commitFreelist() error {
	// Round up by one page: overestimating the freelist size is harmless,
	// underestimating it would be bad.
	pageCount := tx.db.freelist.EstimatedWritePageSize()/tx.db.pageSize + 1

	flPage, err := tx.allocate(pageCount)
	if err != nil {
		tx.rollback()
		return err
	}

	tx.db.freelist.Write(flPage)
	tx.meta.SetFreelist(flPage.Id())
	return nil
}
299
300
// Rollback closes the transaction and ignores all previous updates. Read-only
301
// transactions must be rolled back and not committed.
302
func (tx *Tx) Rollback() error {
303
	common.Assert(!tx.managed, "managed tx rollback not allowed")
304
	if tx.db == nil {
305
		return berrors.ErrTxClosed
306
	}
307
	tx.nonPhysicalRollback()
308
	return nil
309
}
310
311
// nonPhysicalRollback is called when user calls Rollback directly, in this case we do not need to reload the free pages from disk.
312
func (tx *Tx) nonPhysicalRollback() {
313
	if tx.db == nil {
314
		return
315
	}
316
	if tx.writable {
317
		tx.db.freelist.Rollback(tx.meta.Txid())
318
	}
319
	tx.close()
320
}
321
322
// rollback needs to reload the free pages from disk in case some system error happens like fsync error.
323
func (tx *Tx) rollback() {
324
	if tx.db == nil {
325
		return
326
	}
327
	if tx.writable {
328
		tx.db.freelist.Rollback(tx.meta.Txid())
329
		// When mmap fails, the `data`, `dataref` and `datasz` may be reset to
330
		// zero values, and there is no way to reload free page IDs in this case.
331
		if tx.db.data != nil {
332
			if !tx.db.hasSyncedFreelist() {
333
				// Reconstruct free page list by scanning the DB to get the whole free page list.
334
				// Note: scanning the whole db is heavy if your db size is large in NoSyncFreeList mode.
335
				tx.db.freelist.NoSyncReload(tx.db.freepages())
336
			} else {
337
				// Read free page list from freelist page.
338
				tx.db.freelist.Reload(tx.db.page(tx.db.meta().Freelist()))
339
			}
340
		}
341
	}
342
	tx.close()
343
}
344
345
// close detaches the transaction from its database and clears its state.
// A writable transaction releases the writer lock and merges its statistics
// into the database statistics; a read-only transaction is removed from the
// database's set of open transactions. It is a no-op on a closed transaction.
func (tx *Tx) close() {
	db := tx.db
	if db == nil {
		return
	}

	if !tx.writable {
		db.removeTx(tx)
	} else {
		// Sample the freelist before the writer lock is released.
		freeN := db.freelist.FreeCount()
		pendingN := db.freelist.PendingCount()
		inuse := db.freelist.EstimatedWritePageSize()

		// Remove transaction ref & writer lock.
		db.rwtx = nil
		db.rwlock.Unlock()

		// Merge statistics.
		db.statlock.Lock()
		db.stats.FreePageN = freeN
		db.stats.PendingPageN = pendingN
		db.stats.FreeAlloc = (freeN + pendingN) * db.pageSize
		db.stats.FreelistInuse = inuse
		db.stats.TxStats.add(&tx.stats)
		db.statlock.Unlock()
	}

	// Clear all references.
	tx.db, tx.meta, tx.pages = nil, nil, nil
	tx.root = Bucket{tx: tx}
}
377
378
// Copy writes the entire database to a writer.
379
// This function exists for backwards compatibility.
380
//
381
// Deprecated: Use WriteTo() instead.
382
func (tx *Tx) Copy(w io.Writer) error {
383
	_, err := tx.WriteTo(w)
384
	return err
385
}
386
387
// WriteTo writes the entire database to a writer.
388
// If err == nil then exactly tx.Size() bytes will be written into the writer.
389
func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) {
390
	var f *os.File
391
	// There is a risk that between the time a read-only transaction
392
	// is created and the time the file is actually opened, the
393
	// underlying db file at tx.db.path may have been replaced
394
	// (e.g. via rename). In that case, opening the file again would
395
	// unexpectedly point to a different file, rather than the one
396
	// the transaction was based on.
397
	//
398
	// To overcome this, we reuse the already opened file handle when
399
	// WritFlag not set. When the WriteFlag is set, we reopen the file
400
	// but verify that it still refers to the same underlying file
401
	// (by device and inode). If it does not, we fall back to
402
	// reusing the existing already opened file handle.
403
	if tx.WriteFlag != 0 {
404
		// Attempt to open reader with WriteFlag
405
		f, err = tx.db.openFile(tx.db.path, os.O_RDONLY|tx.WriteFlag, 0)
406
		if err != nil {
407
			return 0, err
408
		}
409
410
		if ok, err := sameFile(tx.db.file, f); !ok {
411
			lg := tx.db.Logger()
412
			if cerr := f.Close(); cerr != nil {
413
				lg.Errorf("failed to close the file (%s): %v", tx.db.path, cerr)
414
			}
415
			lg.Warningf("The underlying file has changed, so reuse the already opened file (%s): %v", tx.db.path, err)
416
			f = tx.db.file
417
		} else {
418
			defer func() {
419
				if cerr := f.Close(); err == nil {
420
					err = cerr
421
				}
422
			}()
423
		}
424
	} else {
425
		f = tx.db.file
426
	}
427
428
	// Generate a meta page. We use the same page data for both meta pages.
429
	buf := make([]byte, tx.db.pageSize)
430
	page := (*common.Page)(unsafe.Pointer(&buf[0]))
431
	page.SetFlags(common.MetaPageFlag)
432
	*page.Meta() = *tx.meta
433
434
	// Write meta 0.
435
	page.SetId(0)
436
	page.Meta().SetChecksum(page.Meta().Sum64())
437
	nn, err := w.Write(buf)
438
	n += int64(nn)
439
	if err != nil {
440
		return n, fmt.Errorf("meta 0 copy: %s", err)
441
	}
442
443
	// Write meta 1 with a lower transaction id.
444
	page.SetId(1)
445
	page.Meta().DecTxid()
446
	page.Meta().SetChecksum(page.Meta().Sum64())
447
	nn, err = w.Write(buf)
448
	n += int64(nn)
449
	if err != nil {
450
		return n, fmt.Errorf("meta 1 copy: %s", err)
451
	}
452
453
	// Copy data pages using a SectionReader to avoid affecting f's offset.
454
	dataOffset := int64(tx.db.pageSize * 2)
455
	dataSize := tx.Size() - dataOffset
456
	sr := io.NewSectionReader(f, dataOffset, dataSize)
457
458
	// Copy data pages.
459
	wn, err := io.CopyN(w, sr, dataSize)
460
	n += wn
461
	if err != nil {
462
		return n, err
463
	}
464
465
	return n, nil
466
}
467
468
// sameFile reports whether f1 and f2 refer to the same underlying file,
// as decided by os.SameFile on their Stat results. If either file cannot be
// stat'ed it returns false together with an error naming the failing file.
func sameFile(f1, f2 *os.File) (bool, error) {
	operands := [2]struct {
		file    *os.File
		ordinal string
	}{
		{f1, "first"},
		{f2, "second"},
	}

	var infos [2]os.FileInfo
	for i, op := range operands {
		fi, err := op.file.Stat()
		if err != nil {
			return false, fmt.Errorf("failed to get fileInfo of the %s file (%s): %w", op.ordinal, op.file.Name(), err)
		}
		infos[i] = fi
	}

	return os.SameFile(infos[0], infos[1]), nil
}
480
481
// CopyFile copies the entire database to file at the given path.
482
// A reader transaction is maintained during the copy so it is safe to continue
483
// using the database while a copy is in progress.
484
func (tx *Tx) CopyFile(path string, mode os.FileMode) error {
485
	f, err := tx.db.openFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, mode)
486
	if err != nil {
487
		return err
488
	}
489
490
	_, err = tx.WriteTo(f)
491
	if err != nil {
492
		_ = f.Close()
493
		return err
494
	}
495
	return f.Close()
496
}
497
498
// allocate returns a contiguous block of memory starting at a given page.
499
func (tx *Tx) allocate(count int) (*common.Page, error) {
500
	lg := tx.db.Logger()
501
	p, err := tx.db.allocate(tx.meta.Txid(), count)
502
	if err != nil {
503
		lg.Errorf("allocating failed, txid: %d, count: %d, error: %v", tx.meta.Txid(), count, err)
504
		return nil, err
505
	}
506
507
	// Save to our page cache.
508
	tx.pages[p.Id()] = p
509
510
	// Update statistics.
511
	tx.stats.IncPageCount(int64(count))
512
	tx.stats.IncPageAlloc(int64(count * tx.db.pageSize))
513
514
	return p, nil
515
}
516
517
// write writes any dirty pages to disk.
518
func (tx *Tx) write() error {
519
	// Sort pages by id.
520
	lg := tx.db.Logger()
521
	pages := make(common.Pages, 0, len(tx.pages))
522
	for _, p := range tx.pages {
523
		pages = append(pages, p)
524
	}
525
	// Clear out page cache early.
526
	tx.pages = make(map[common.Pgid]*common.Page)
527
	sort.Sort(pages)
528
529
	// Write pages to disk in order.
530
	for _, p := range pages {
531
		rem := (uint64(p.Overflow()) + 1) * uint64(tx.db.pageSize)
532
		offset := int64(p.Id()) * int64(tx.db.pageSize)
533
		var written uintptr
534
535
		// Write out page in "max allocation" sized chunks.
536
		for {
537
			sz := rem
538
			if sz > common.MaxAllocSize-1 {
539
				sz = common.MaxAllocSize - 1
540
			}
541
			buf := common.UnsafeByteSlice(unsafe.Pointer(p), written, 0, int(sz))
542
543
			if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
544
				lg.Errorf("writeAt failed, offset: %d: %w", offset, err)
545
				return err
546
			}
547
548
			// Update statistics.
549
			tx.stats.IncWrite(1)
550
551
			// Exit inner for loop if we've written all the chunks.
552
			rem -= sz
553
			if rem == 0 {
554
				break
555
			}
556
557
			// Otherwise move offset forward and move pointer to next chunk.
558
			offset += int64(sz)
559
			written += uintptr(sz)
560
		}
561
	}
562
563
	// Ignore file sync if flag is set on DB.
564
	if !tx.db.NoSync || common.IgnoreNoSync {
565
		// gofail: var beforeSyncDataPages struct{}
566
		if err := fdatasync(tx.db); err != nil {
567
			lg.Errorf("[GOOS: %s, GOARCH: %s] fdatasync failed: %w", runtime.GOOS, runtime.GOARCH, err)
568
			return err
569
		}
570
	}
571
572
	// Put small pages back to page pool.
573
	for _, p := range pages {
574
		// Ignore page sizes over 1 page.
575
		// These are allocated using make() instead of the page pool.
576
		if int(p.Overflow()) != 0 {
577
			continue
578
		}
579
580
		buf := common.UnsafeByteSlice(unsafe.Pointer(p), 0, 0, tx.db.pageSize)
581
582
		// See https://go.googlesource.com/go/+/f03c9202c43e0abb130669852082117ca50aa9b1
583
		for i := range buf {
584
			buf[i] = 0
585
		}
586
		tx.db.pagePool.Put(buf) //nolint:staticcheck
587
	}
588
589
	return nil
590
}
591
592
// writeMeta writes the meta to the disk.
593
func (tx *Tx) writeMeta() error {
594
	// gofail: var beforeWriteMetaError string
595
	// return errors.New(beforeWriteMetaError)
596
597
	// Create a temporary buffer for the meta page.
598
	lg := tx.db.Logger()
599
	buf := make([]byte, tx.db.pageSize)
600
	p := tx.db.pageInBuffer(buf, 0)
601
	tx.meta.Write(p)
602
603
	// Write the meta page to file.
604
	tx.db.metalock.Lock()
605
	if _, err := tx.db.ops.writeAt(buf, int64(p.Id())*int64(tx.db.pageSize)); err != nil {
606
		tx.db.metalock.Unlock()
607
		lg.Errorf("writeAt failed, pgid: %d, pageSize: %d, error: %v", p.Id(), tx.db.pageSize, err)
608
		return err
609
	}
610
	tx.db.metalock.Unlock()
611
	if !tx.db.NoSync || common.IgnoreNoSync {
612
		// gofail: var beforeSyncMetaPage struct{}
613
		if err := fdatasync(tx.db); err != nil {
614
			lg.Errorf("[GOOS: %s, GOARCH: %s] fdatasync failed: %w", runtime.GOOS, runtime.GOARCH, err)
615
			return err
616
		}
617
	}
618
619
	// Update statistics.
620
	tx.stats.IncWrite(1)
621
622
	return nil
623
}
624
625
// page returns a reference to the page with a given id.
626
// If page has been written to then a temporary buffered page is returned.
627
func (tx *Tx) page(id common.Pgid) *common.Page {
628
	// Check the dirty pages first.
629
	if tx.pages != nil {
630
		if p, ok := tx.pages[id]; ok {
631
			p.FastCheck(id)
632
			return p
633
		}
634
	}
635
636
	// Otherwise return directly from the mmap.
637
	p := tx.db.page(id)
638
	p.FastCheck(id)
639
	return p
640
}
641
642
// forEachPage iterates over every page within a given page and executes a function.
643
func (tx *Tx) forEachPage(pgidnum common.Pgid, fn func(*common.Page, int, []common.Pgid)) {
644
	stack := make([]common.Pgid, 10)
645
	stack[0] = pgidnum
646
	tx.forEachPageInternal(stack[:1], fn)
647
}
648
649
// forEachPageInternal visits the page on top of pgidstack, calls fn for it,
// and then descends into each child when the page is a branch page.
func (tx *Tx) forEachPageInternal(pgidstack []common.Pgid, fn func(*common.Page, int, []common.Pgid)) {
	depth := len(pgidstack) - 1
	p := tx.page(pgidstack[depth])

	// Execute function.
	fn(p, depth, pgidstack)

	// Only branch pages have children to recurse into.
	if !p.IsBranchPage() {
		return
	}
	for i := 0; i < int(p.Count()); i++ {
		child := p.BranchPageElement(uint16(i)).Pgid()
		tx.forEachPageInternal(append(pgidstack, child), fn)
	}
}
663
664
// Page returns page information for a given page number.
665
// This is only safe for concurrent use when used by a writable transaction.
666
func (tx *Tx) Page(id int) (*common.PageInfo, error) {
667
	if tx.db == nil {
668
		return nil, berrors.ErrTxClosed
669
	} else if common.Pgid(id) >= tx.meta.Pgid() {
670
		return nil, nil
671
	}
672
673
	if tx.db.freelist == nil {
674
		return nil, berrors.ErrFreePagesNotLoaded
675
	}
676
677
	// Build the page info.
678
	p := tx.db.page(common.Pgid(id))
679
	info := &common.PageInfo{
680
		ID:            id,
681
		Count:         int(p.Count()),
682
		OverflowCount: int(p.Overflow()),
683
	}
684
685
	// Determine the type (or if it's free).
686
	if tx.db.freelist.Freed(common.Pgid(id)) {
687
		info.Type = "free"
688
	} else {
689
		info.Type = p.Typ()
690
	}
691
692
	return info, nil
693
}
694
695
// TxStats holds counters and timings for the actions performed by a
// transaction. Every field is marked deprecated for direct use in favour of
// its Get*/Inc* accessor methods, which read and update it atomically.
type TxStats struct {
	// Page statistics.

	// PageCount is the number of page allocations.
	// DEPRECATED: Use GetPageCount() or IncPageCount()
	PageCount int64
	// PageAlloc is the total number of bytes allocated.
	// DEPRECATED: Use GetPageAlloc() or IncPageAlloc()
	PageAlloc int64

	// Cursor statistics.

	// CursorCount is the number of cursors created.
	// DEPRECATED: Use GetCursorCount() or IncCursorCount()
	CursorCount int64

	// Node statistics.

	// NodeCount is the number of node allocations.
	// DEPRECATED: Use GetNodeCount() or IncNodeCount()
	NodeCount int64
	// NodeDeref is the number of node dereferences.
	// DEPRECATED: Use GetNodeDeref() or IncNodeDeref()
	NodeDeref int64

	// Rebalance statistics.

	// Rebalance is the number of node rebalances.
	// DEPRECATED: Use GetRebalance() or IncRebalance()
	Rebalance int64
	// RebalanceTime is the total time spent rebalancing.
	// DEPRECATED: Use GetRebalanceTime() or IncRebalanceTime()
	RebalanceTime time.Duration

	// Split/Spill statistics.

	// Split is the number of nodes split.
	// DEPRECATED: Use GetSplit() or IncSplit()
	Split int64
	// Spill is the number of nodes spilled.
	// DEPRECATED: Use GetSpill() or IncSpill()
	Spill int64
	// SpillTime is the total time spent spilling.
	// DEPRECATED: Use GetSpillTime() or IncSpillTime()
	SpillTime time.Duration

	// Write statistics.

	// Write is the number of writes performed.
	// DEPRECATED: Use GetWrite() or IncWrite()
	Write int64
	// WriteTime is the total time spent writing to disk.
	// DEPRECATED: Use GetWriteTime() or IncWriteTime()
	WriteTime time.Duration
}
739
740
// add accumulates every counter and timing of other into s, reading and
// updating each field through its atomic accessor.
func (s *TxStats) add(other *TxStats) {
	src := other

	// Pages and cursors.
	s.IncPageCount(src.GetPageCount())
	s.IncPageAlloc(src.GetPageAlloc())
	s.IncCursorCount(src.GetCursorCount())

	// Nodes.
	s.IncNodeCount(src.GetNodeCount())
	s.IncNodeDeref(src.GetNodeDeref())

	// Rebalancing.
	s.IncRebalance(src.GetRebalance())
	s.IncRebalanceTime(src.GetRebalanceTime())

	// Splitting and spilling.
	s.IncSplit(src.GetSplit())
	s.IncSpill(src.GetSpill())
	s.IncSpillTime(src.GetSpillTime())

	// Writes.
	s.IncWrite(src.GetWrite())
	s.IncWriteTime(src.GetWriteTime())
}
754
755
// Sub calculates and returns the difference between two sets of transaction stats.
756
// This is useful when obtaining stats at two different points and time and
757
// you need the performance counters that occurred within that time span.
758
func (s *TxStats) Sub(other *TxStats) TxStats {
759
	var diff TxStats
760
	diff.PageCount = s.GetPageCount() - other.GetPageCount()
761
	diff.PageAlloc = s.GetPageAlloc() - other.GetPageAlloc()
762
	diff.CursorCount = s.GetCursorCount() - other.GetCursorCount()
763
	diff.NodeCount = s.GetNodeCount() - other.GetNodeCount()
764
	diff.NodeDeref = s.GetNodeDeref() - other.GetNodeDeref()
765
	diff.Rebalance = s.GetRebalance() - other.GetRebalance()
766
	diff.RebalanceTime = s.GetRebalanceTime() - other.GetRebalanceTime()
767
	diff.Split = s.GetSplit() - other.GetSplit()
768
	diff.Spill = s.GetSpill() - other.GetSpill()
769
	diff.SpillTime = s.GetSpillTime() - other.GetSpillTime()
770
	diff.Write = s.GetWrite() - other.GetWrite()
771
	diff.WriteTime = s.GetWriteTime() - other.GetWriteTime()
772
	return diff
773
}
774
775
// GetPageCount returns PageCount atomically.
776
func (s *TxStats) GetPageCount() int64 {
777
	return atomic.LoadInt64(&s.PageCount)
778
}
779
780
// IncPageCount increases PageCount atomically and returns the new value.
781
func (s *TxStats) IncPageCount(delta int64) int64 {
782
	return atomic.AddInt64(&s.PageCount, delta)
783
}
784
785
// GetPageAlloc returns PageAlloc atomically.
786
func (s *TxStats) GetPageAlloc() int64 {
787
	return atomic.LoadInt64(&s.PageAlloc)
788
}
789
790
// IncPageAlloc increases PageAlloc atomically and returns the new value.
791
func (s *TxStats) IncPageAlloc(delta int64) int64 {
792
	return atomic.AddInt64(&s.PageAlloc, delta)
793
}
794
795
// GetCursorCount returns CursorCount atomically.
796
func (s *TxStats) GetCursorCount() int64 {
797
	return atomic.LoadInt64(&s.CursorCount)
798
}
799
800
// IncCursorCount increases CursorCount atomically and return the new value.
801
func (s *TxStats) IncCursorCount(delta int64) int64 {
802
	return atomic.AddInt64(&s.CursorCount, delta)
803
}
804
805
// GetNodeCount returns NodeCount atomically.
806
func (s *TxStats) GetNodeCount() int64 {
807
	return atomic.LoadInt64(&s.NodeCount)
808
}
809
810
// IncNodeCount increases NodeCount atomically and returns the new value.
811
func (s *TxStats) IncNodeCount(delta int64) int64 {
812
	return atomic.AddInt64(&s.NodeCount, delta)
813
}
814
815
// GetNodeDeref returns NodeDeref atomically.
816
func (s *TxStats) GetNodeDeref() int64 {
817
	return atomic.LoadInt64(&s.NodeDeref)
818
}
819
820
// IncNodeDeref increases NodeDeref atomically and returns the new value.
821
func (s *TxStats) IncNodeDeref(delta int64) int64 {
822
	return atomic.AddInt64(&s.NodeDeref, delta)
823
}
824
825
// GetRebalance returns Rebalance atomically.
826
func (s *TxStats) GetRebalance() int64 {
827
	return atomic.LoadInt64(&s.Rebalance)
828
}
829
830
// IncRebalance increases Rebalance atomically and returns the new value.
831
func (s *TxStats) IncRebalance(delta int64) int64 {
832
	return atomic.AddInt64(&s.Rebalance, delta)
833
}
834
835
// GetRebalanceTime returns RebalanceTime atomically.
836
func (s *TxStats) GetRebalanceTime() time.Duration {
837
	return atomicLoadDuration(&s.RebalanceTime)
838
}
839
840
// IncRebalanceTime increases RebalanceTime atomically and returns the new value.
841
func (s *TxStats) IncRebalanceTime(delta time.Duration) time.Duration {
842
	return atomicAddDuration(&s.RebalanceTime, delta)
843
}
844
845
// GetSplit returns Split atomically.
846
func (s *TxStats) GetSplit() int64 {
847
	return atomic.LoadInt64(&s.Split)
848
}
849
850
// IncSplit increases Split atomically and returns the new value.
851
func (s *TxStats) IncSplit(delta int64) int64 {
852
	return atomic.AddInt64(&s.Split, delta)
853
}
854
855
// GetSpill returns Spill atomically.
856
func (s *TxStats) GetSpill() int64 {
857
	return atomic.LoadInt64(&s.Spill)
858
}
859
860
// IncSpill increases Spill atomically and returns the new value.
861
func (s *TxStats) IncSpill(delta int64) int64 {
862
	return atomic.AddInt64(&s.Spill, delta)
863
}
864
865
// GetSpillTime returns SpillTime atomically.
866
func (s *TxStats) GetSpillTime() time.Duration {
867
	return atomicLoadDuration(&s.SpillTime)
868
}
869
870
// IncSpillTime increases SpillTime atomically and returns the new value.
871
func (s *TxStats) IncSpillTime(delta time.Duration) time.Duration {
872
	return atomicAddDuration(&s.SpillTime, delta)
873
}
874
875
// GetWrite returns Write atomically.
876
func (s *TxStats) GetWrite() int64 {
877
	return atomic.LoadInt64(&s.Write)
878
}
879
880
// IncWrite increases Write atomically and returns the new value.
881
func (s *TxStats) IncWrite(delta int64) int64 {
882
	return atomic.AddInt64(&s.Write, delta)
883
}
884
885
// GetWriteTime returns WriteTime atomically.
886
func (s *TxStats) GetWriteTime() time.Duration {
887
	return atomicLoadDuration(&s.WriteTime)
888
}
889
890
// IncWriteTime increases WriteTime atomically and returns the new value.
891
func (s *TxStats) IncWriteTime(delta time.Duration) time.Duration {
892
	return atomicAddDuration(&s.WriteTime, delta)
893
}
894
895
// atomicAddDuration atomically adds du to the duration stored at ptr and
// returns the resulting value. The duration is accessed as the int64 it is
// defined as, so that sync/atomic can operate on it.
func atomicAddDuration(ptr *time.Duration, du time.Duration) time.Duration {
	raw := (*int64)(unsafe.Pointer(ptr))
	sum := atomic.AddInt64(raw, int64(du))
	return time.Duration(sum)
}
898
899
// atomicLoadDuration atomically reads the duration stored at ptr. The
// duration is accessed as the int64 it is defined as, so that sync/atomic
// can operate on it.
func atomicLoadDuration(ptr *time.Duration) time.Duration {
	raw := (*int64)(unsafe.Pointer(ptr))
	nanos := atomic.LoadInt64(raw)
	return time.Duration(nanos)
}