You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
97 lines
2.3 KiB
97 lines
2.3 KiB
package shock |
|
|
|
import ( |
|
"fmt" |
|
|
|
"go.etcd.io/bbolt" |
|
) |
|
|
|
// Sequencer is implemented by values that can record a sequence ID assigned
// to them. Put calls AssignSeq with the bucket's next sequence number before
// the value is serialized.
type Sequencer interface {
	// AssignSeq receives the sequence ID assigned to the object.
	AssignSeq(seq uint64)
}
|
|
|
// Put adds a new value to a bucket |
|
func (d *DB) Put(bucket string, val Sequencer) error { |
|
return d.Bolt.Update(func(tx *bbolt.Tx) error { |
|
b := tx.Bucket([]byte(bucket)) |
|
|
|
seq, err := b.NextSequence() |
|
if err != nil { |
|
return err |
|
} |
|
val.AssignSeq(seq) |
|
|
|
serial, err := d.Serializer.Marshal(val) |
|
if err != nil { |
|
return err |
|
} |
|
return b.Put([]byte(fmt.Sprintf("%d", seq)), serial) |
|
}) |
|
} |
|
|
|
// PutWithKey adds a new value to the bucket with a defined key |
|
func (d *DB) PutWithKey(bucket string, key, val interface{}) error { |
|
return d.Bolt.Update(func(tx *bbolt.Tx) error { |
|
b := tx.Bucket([]byte(bucket)) |
|
|
|
serial, err := d.Serializer.Marshal(val) |
|
if err != nil { |
|
return err |
|
} |
|
return b.Put([]byte(fmt.Sprintf("%v", key)), serial) |
|
}) |
|
} |
|
|
|
// Get returns a value from a bucket with the specified sequence ID |
|
func (d *DB) Get(bucket string, id, val interface{}) error { |
|
if err := d.Bolt.View(func(tx *bbolt.Tx) error { |
|
serial := tx.Bucket([]byte(bucket)).Get([]byte(fmt.Sprintf("%v", id))) |
|
return d.Serializer.Unmarshal(serial, val) |
|
}); err != nil { |
|
return err |
|
} |
|
return nil |
|
} |
|
|
|
// Count returns the number of objects in a bucket |
|
func (d *DB) Count(bucket string) (int, error) { |
|
count := 0 |
|
if err := d.Bolt.View(func(tx *bbolt.Tx) error { |
|
return tx.Bucket([]byte(bucket)).ForEach(func(_, _ []byte) error { |
|
count++ |
|
return nil |
|
}) |
|
}); err != nil { |
|
return count, err |
|
} |
|
return count, nil |
|
} |
|
|
|
// EachFunc is the callback invoked for each entry while iterating a bucket.
// id is the entry's key and serial is its stored, still-serialized value.
// The callback is handed directly to the bucket's ForEach, so the error it
// returns is what the iteration reports back.
type EachFunc func(id, serial []byte) error
|
|
|
// ViewEach iterates over each object in a bucket in read-only mode |
|
func (d *DB) ViewEach(bucket string, fn EachFunc) error { |
|
return d.forEach(bucket, false, fn) |
|
} |
|
|
|
// UpdateEach iterates over each object in a bucket in write mode |
|
func (d *DB) UpdateEach(bucket string, fn EachFunc) error { |
|
return d.forEach(bucket, true, fn) |
|
} |
|
|
|
func (d *DB) forEach(bucket string, writable bool, fn EachFunc) error { |
|
tx, err := d.Bolt.Begin(writable) |
|
if err != nil { |
|
return err |
|
} |
|
if err := tx.Bucket([]byte(bucket)).ForEach(fn); err != nil { |
|
return err |
|
} |
|
|
|
if writable { |
|
return tx.Commit() |
|
} |
|
return tx.Rollback() |
|
}
|
|
|