98 lines
2.3 KiB
Go
98 lines
2.3 KiB
Go
package shock
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"go.etcd.io/bbolt"
|
|
)
|
|
|
|
// Sequencer defines a way to assign a unique sequence ID to an object
|
|
type Sequencer interface {
|
|
AssignSeq(uint64)
|
|
}
|
|
|
|
// Put adds a new value to a bucket
|
|
func (d *DB) Put(bucket string, val Sequencer) error {
|
|
return d.Bolt.Update(func(tx *bbolt.Tx) error {
|
|
b := tx.Bucket([]byte(bucket))
|
|
|
|
seq, err := b.NextSequence()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
val.AssignSeq(seq)
|
|
|
|
serial, err := d.Serializer.Marshal(val)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return b.Put([]byte(fmt.Sprintf("%d", seq)), serial)
|
|
})
|
|
}
|
|
|
|
// PutWithKey adds a new value to the bucket with a defined key
|
|
func (d *DB) PutWithKey(bucket string, key, val interface{}) error {
|
|
return d.Bolt.Update(func(tx *bbolt.Tx) error {
|
|
b := tx.Bucket([]byte(bucket))
|
|
|
|
serial, err := d.Serializer.Marshal(val)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return b.Put([]byte(fmt.Sprintf("%v", key)), serial)
|
|
})
|
|
}
|
|
|
|
// Get returns a value from a bucket with the specified sequence ID
|
|
func (d *DB) Get(bucket string, id, val interface{}) error {
|
|
if err := d.Bolt.View(func(tx *bbolt.Tx) error {
|
|
serial := tx.Bucket([]byte(bucket)).Get([]byte(fmt.Sprintf("%v", id)))
|
|
return d.Serializer.Unmarshal(serial, val)
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Count returns the number of objects in a bucket
|
|
func (d *DB) Count(bucket string) (int, error) {
|
|
count := 0
|
|
if err := d.Bolt.View(func(tx *bbolt.Tx) error {
|
|
return tx.Bucket([]byte(bucket)).ForEach(func(_, _ []byte) error {
|
|
count++
|
|
return nil
|
|
})
|
|
}); err != nil {
|
|
return count, err
|
|
}
|
|
return count, nil
|
|
}
|
|
|
|
// EachFunc defines a way to interact with each object in a bucket while iterating
|
|
type EachFunc func(id, serial []byte) error
|
|
|
|
// ViewEach iterates over each object in a bucket in read-only mode
|
|
func (d *DB) ViewEach(bucket string, fn EachFunc) error {
|
|
return d.forEach(bucket, false, fn)
|
|
}
|
|
|
|
// UpdateEach iterates over each object in a bucket in write mode
|
|
func (d *DB) UpdateEach(bucket string, fn EachFunc) error {
|
|
return d.forEach(bucket, true, fn)
|
|
}
|
|
|
|
func (d *DB) forEach(bucket string, writable bool, fn EachFunc) error {
|
|
tx, err := d.Bolt.Begin(writable)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := tx.Bucket([]byte(bucket)).ForEach(fn); err != nil {
|
|
return err
|
|
}
|
|
|
|
if writable {
|
|
return tx.Commit()
|
|
}
|
|
return tx.Rollback()
|
|
}
|