shock/store.go
jolheiser de4e822f7d
Refactor to using Init funcs and add gob serializer
Signed-off-by: jolheiser <john.olheiser@gmail.com>
2020-08-03 16:11:20 -05:00

98 lines
2.3 KiB
Go

package shock
import (
"fmt"
"go.etcd.io/bbolt"
)
// Sequencer is implemented by values that can record the unique sequence
// ID they are stored under.
type Sequencer interface {
	// AssignSeq hands the value its sequence ID.
	AssignSeq(seq uint64)
}
// Put stores val in bucket under the bucket's next sequence number.
//
// The sequence number is taken from the bucket, handed to val through
// AssignSeq before val is serialized, and its decimal string form becomes
// the key. All of this runs inside a single bbolt Update call.
//
// It returns an error wrapping bbolt.ErrBucketNotFound when bucket does not
// exist, or the error from sequence generation, serialization, or the write.
func (d *DB) Put(bucket string, val Sequencer) error {
	return d.Bolt.Update(func(tx *bbolt.Tx) error {
		b := tx.Bucket([]byte(bucket))
		if b == nil {
			// A missing bucket yields nil; using it would dereference a nil pointer.
			return fmt.Errorf("bucket %q: %w", bucket, bbolt.ErrBucketNotFound)
		}
		seq, err := b.NextSequence()
		if err != nil {
			return err
		}
		// Assign before marshaling so the serialized form carries the ID.
		val.AssignSeq(seq)
		serial, err := d.Serializer.Marshal(val)
		if err != nil {
			return err
		}
		return b.Put([]byte(fmt.Sprintf("%d", seq)), serial)
	})
}
// PutWithKey stores val in bucket under a caller-chosen key.
//
// The key bytes are the default fmt "%v" rendering of key, and val is
// encoded with the DB's Serializer. The write runs inside a bbolt Update
// call.
//
// It returns an error wrapping bbolt.ErrBucketNotFound when bucket does not
// exist, or the error from serialization or the write.
func (d *DB) PutWithKey(bucket string, key, val interface{}) error {
	return d.Bolt.Update(func(tx *bbolt.Tx) error {
		b := tx.Bucket([]byte(bucket))
		if b == nil {
			// A missing bucket yields nil; using it would dereference a nil pointer.
			return fmt.Errorf("bucket %q: %w", bucket, bbolt.ErrBucketNotFound)
		}
		serial, err := d.Serializer.Marshal(val)
		if err != nil {
			return err
		}
		return b.Put([]byte(fmt.Sprintf("%v", key)), serial)
	})
}
// Get looks up the entry stored in bucket under id and decodes it into val.
//
// The key bytes are the default fmt "%v" rendering of id. val is passed
// straight to the Serializer's Unmarshal. The lookup runs inside a bbolt
// View call.
//
// It returns an error wrapping bbolt.ErrBucketNotFound when bucket does not
// exist, or the error from Unmarshal. When nothing is stored under id the
// serializer receives a nil slice, so the result in that case depends on the
// Serializer implementation.
func (d *DB) Get(bucket string, id, val interface{}) error {
	return d.Bolt.View(func(tx *bbolt.Tx) error {
		b := tx.Bucket([]byte(bucket))
		if b == nil {
			// A missing bucket yields nil; using it would dereference a nil pointer.
			return fmt.Errorf("bucket %q: %w", bucket, bbolt.ErrBucketNotFound)
		}
		serial := b.Get([]byte(fmt.Sprintf("%v", id)))
		return d.Serializer.Unmarshal(serial, val)
	})
}
// Count reports how many entries bucket holds, counting every key visited
// by the bucket's ForEach inside a bbolt View call.
//
// It returns 0 and an error wrapping bbolt.ErrBucketNotFound when bucket
// does not exist, or 0 and the iteration error if ForEach fails.
func (d *DB) Count(bucket string) (int, error) {
	count := 0
	err := d.Bolt.View(func(tx *bbolt.Tx) error {
		b := tx.Bucket([]byte(bucket))
		if b == nil {
			// A missing bucket yields nil; using it would dereference a nil pointer.
			return fmt.Errorf("bucket %q: %w", bucket, bbolt.ErrBucketNotFound)
		}
		return b.ForEach(func(_, _ []byte) error {
			count++
			return nil
		})
	})
	if err != nil {
		// Do not hand back a partial tally alongside an error.
		return 0, err
	}
	return count, nil
}
// EachFunc is the callback invoked once per entry during bucket iteration.
// It receives the entry's key as id and its stored bytes as serial; the
// error it returns is handed back to the iterating call.
type EachFunc func(id []byte, serial []byte) error
// ViewEach calls fn for every entry in bucket, delegating to forEach with
// a non-writable transaction requested.
func (d *DB) ViewEach(bucket string, fn EachFunc) error {
	const writable = false
	return d.forEach(bucket, writable, fn)
}
// UpdateEach calls fn for every entry in bucket, delegating to forEach
// with a writable transaction requested.
func (d *DB) UpdateEach(bucket string, fn EachFunc) error {
	const writable = true
	return d.forEach(bucket, writable, fn)
}
// forEach runs fn over every entry of bucket inside a single transaction.
//
// When writable is true the work runs through d.Bolt.Update, otherwise
// through d.Bolt.View. Using the managed transaction calls rather than a
// manual Begin means the transaction is always closed, including when fn or
// the iteration returns an error. An unclosed writable transaction would
// otherwise block later writers.
//
// It returns an error wrapping bbolt.ErrBucketNotFound when bucket does not
// exist, the first error returned by fn or the iteration, or the error from
// finishing the transaction.
func (d *DB) forEach(bucket string, writable bool, fn EachFunc) error {
	iterate := func(tx *bbolt.Tx) error {
		b := tx.Bucket([]byte(bucket))
		if b == nil {
			// A missing bucket yields nil; using it would dereference a nil pointer.
			return fmt.Errorf("bucket %q: %w", bucket, bbolt.ErrBucketNotFound)
		}
		return b.ForEach(fn)
	}
	if writable {
		return d.Bolt.Update(iterate)
	}
	return d.Bolt.View(iterate)
}