Add Set and UniqueQueue implementations #1
6
.gitignore
vendored
6
.gitignore
vendored
@ -1,3 +1,7 @@
|
||||
queue/
|
||||
queue_pop/
|
||||
queue_push/
|
||||
queue_push/
|
||||
uniquequeue/
|
||||
uniquequeue_pop/
|
||||
uniquequeue_push/
|
||||
set/
|
||||
|
34
README.md
34
README.md
@ -25,4 +25,36 @@ data, err = queue.LPop()
|
||||
queue.LHandle(func(dt []byte) error{
|
||||
return nil
|
||||
})
|
||||
```
|
||||
```
|
||||
|
||||
You can now create a Set from a leveldb:
|
||||
|
||||
```Go
|
||||
set, err := levelqueue.OpenSet("./set")
|
||||
|
||||
added, err:= set.Add([]byte("member1"))
|
||||
|
||||
has, err := set.Has([]byte("member1"))
|
||||
|
||||
members, err := set.Members()
|
||||
|
||||
removed, err := set.Remove([]byte("member1"))
|
||||
```
|
||||
|
||||
And you can create a UniqueQueue from a leveldb:
|
||||
|
||||
```Go
|
||||
queue, err := levelqueue.OpenUnique("./queue")
|
||||
|
||||
err := queue.RPush([]byte("member1"))
|
||||
|
||||
err = queue.LPush([]byte("member1"))
|
||||
// Will return ErrAlreadyInQueue
|
||||
|
||||
// and so on.
|
||||
```
|
||||
|
||||
## Creating Queues, UniqueQueues and Sets from already open DB
|
||||
|
||||
If you have an already open DB you can create these from this using the
|
||||
`NewQueue`, `NewUniqueQueue` and `NewSet` functions.
|
4
error.go
4
error.go
@ -7,6 +7,8 @@ package levelqueue
|
||||
import "errors"
|
||||
|
||||
var (
|
||||
// ErrNotFound means no element in queue
|
||||
// ErrNotFound means no elements in queue
|
||||
ErrNotFound = errors.New("no key found")
|
||||
|
||||
ErrAlreadyInQueue = errors.New("value already in queue")
|
||||
)
|
||||
|
97
queue.go
97
queue.go
@ -12,37 +12,62 @@ import (
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
)
|
||||
|
||||
const (
|
||||
lowKeyStr = "low"
|
||||
highKeyStr = "high"
|
||||
)
|
||||
|
||||
// Queue defines a queue struct
|
||||
type Queue struct {
|
||||
db *leveldb.DB
|
||||
highLock sync.Mutex
|
||||
lowLock sync.Mutex
|
||||
low int64
|
||||
high int64
|
||||
db *leveldb.DB
|
||||
highLock sync.Mutex
|
||||
lowLock sync.Mutex
|
||||
low int64
|
||||
high int64
|
||||
lowKey []byte
|
||||
highKey []byte
|
||||
prefix []byte
|
||||
closeUnderlyingDB bool
|
||||
}
|
||||
|
||||
// Open opens a queue object or create it if not exist
|
||||
// Open opens a queue from the db path or creates a
|
||||
// queue if it doesn't exist.
|
||||
// The keys will not be prefixed by default
|
||||
func Open(dataDir string) (*Queue, error) {
|
||||
db, err := leveldb.OpenFile(dataDir, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewQueue(db, []byte{}, true)
|
||||
}
|
||||
|
||||
// NewQueue creates a queue from a db. The keys will be prefixed with prefix
|
||||
// and at close the db will be closed as per closeUnderlyingDB
|
||||
func NewQueue(db *leveldb.DB, prefix []byte, closeUnderlyingDB bool) (*Queue, error) {
|
||||
var err error
|
||||
|
||||
var queue = &Queue{
|
||||
db: db,
|
||||
db: db,
|
||||
closeUnderlyingDB: closeUnderlyingDB,
|
||||
}
|
||||
queue.low, err = queue.readID(lowKey)
|
||||
|
||||
queue.prefix = make([]byte, len(prefix))
|
||||
copy(queue.prefix, prefix)
|
||||
queue.lowKey = withPrefix(prefix, []byte(lowKeyStr))
|
||||
queue.highKey = withPrefix(prefix, []byte(highKeyStr))
|
||||
|
||||
queue.low, err = queue.readID(queue.lowKey)
|
||||
if err == leveldb.ErrNotFound {
|
||||
queue.low = 1
|
||||
err = db.Put(lowKey, id2bytes(1), nil)
|
||||
err = db.Put(queue.lowKey, id2bytes(1), nil)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
queue.high, err = queue.readID(highKey)
|
||||
queue.high, err = queue.readID(queue.highKey)
|
||||
if err == leveldb.ErrNotFound {
|
||||
err = db.Put(highKey, id2bytes(0), nil)
|
||||
err = db.Put(queue.highKey, id2bytes(0), nil)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -59,15 +84,10 @@ func (queue *Queue) readID(key []byte) (int64, error) {
|
||||
return bytes2id(bs)
|
||||
}
|
||||
|
||||
var (
|
||||
lowKey = []byte("low")
|
||||
highKey = []byte("high")
|
||||
)
|
||||
|
||||
func (queue *Queue) highincrement() (int64, error) {
|
||||
id := queue.high + 1
|
||||
queue.high = id
|
||||
err := queue.db.Put(highKey, id2bytes(queue.high), nil)
|
||||
err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil)
|
||||
if err != nil {
|
||||
queue.high = queue.high - 1
|
||||
return 0, err
|
||||
@ -77,7 +97,7 @@ func (queue *Queue) highincrement() (int64, error) {
|
||||
|
||||
func (queue *Queue) highdecrement() (int64, error) {
|
||||
queue.high = queue.high - 1
|
||||
err := queue.db.Put(highKey, id2bytes(queue.high), nil)
|
||||
err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil)
|
||||
if err != nil {
|
||||
queue.high = queue.high + 1
|
||||
return 0, err
|
||||
@ -87,7 +107,7 @@ func (queue *Queue) highdecrement() (int64, error) {
|
||||
|
||||
func (queue *Queue) lowincrement() (int64, error) {
|
||||
queue.low = queue.low + 1
|
||||
err := queue.db.Put(lowKey, id2bytes(queue.low), nil)
|
||||
err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil)
|
||||
if err != nil {
|
||||
queue.low = queue.low - 1
|
||||
return 0, err
|
||||
@ -97,7 +117,7 @@ func (queue *Queue) lowincrement() (int64, error) {
|
||||
|
||||
func (queue *Queue) lowdecrement() (int64, error) {
|
||||
queue.low = queue.low - 1
|
||||
err := queue.db.Put(lowKey, id2bytes(queue.low), nil)
|
||||
err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil)
|
||||
if err != nil {
|
||||
queue.low = queue.low + 1
|
||||
return 0, err
|
||||
@ -125,6 +145,17 @@ func bytes2id(b []byte) (int64, error) {
|
||||
return binary.ReadVarint(bytes.NewReader(b))
|
||||
}
|
||||
|
||||
func withPrefix(prefix []byte, value []byte) []byte {
|
||||
if len(prefix) == 0 {
|
||||
return value
|
||||
}
|
||||
prefixed := make([]byte, len(prefix)+1+len(value))
|
||||
copy(prefixed[0:len(prefix)], prefix)
|
||||
prefixed[len(prefix)] = '-'
|
||||
copy(prefixed[len(prefix)+1:], value)
|
||||
return prefixed
|
||||
}
|
||||
|
||||
// RPush pushes a data from right of queue
|
||||
func (queue *Queue) RPush(data []byte) error {
|
||||
queue.highLock.Lock()
|
||||
@ -133,7 +164,7 @@ func (queue *Queue) RPush(data []byte) error {
|
||||
queue.highLock.Unlock()
|
||||
return err
|
||||
}
|
||||
err = queue.db.Put(id2bytes(id), data, nil)
|
||||
err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil)
|
||||
queue.highLock.Unlock()
|
||||
return err
|
||||
}
|
||||
@ -146,7 +177,7 @@ func (queue *Queue) LPush(data []byte) error {
|
||||
queue.lowLock.Unlock()
|
||||
return err
|
||||
}
|
||||
err = queue.db.Put(id2bytes(id), data, nil)
|
||||
err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil)
|
||||
queue.lowLock.Unlock()
|
||||
return err
|
||||
}
|
||||
@ -157,7 +188,7 @@ func (queue *Queue) RPop() ([]byte, error) {
|
||||
defer queue.highLock.Unlock()
|
||||
currentID := queue.high
|
||||
|
||||
res, err := queue.db.Get(id2bytes(currentID), nil)
|
||||
res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
|
||||
if err != nil {
|
||||
if err == leveldb.ErrNotFound {
|
||||
return nil, ErrNotFound
|
||||
@ -170,7 +201,7 @@ func (queue *Queue) RPop() ([]byte, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = queue.db.Delete(id2bytes(currentID), nil)
|
||||
err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -183,7 +214,7 @@ func (queue *Queue) RHandle(h func([]byte) error) error {
|
||||
defer queue.highLock.Unlock()
|
||||
currentID := queue.high
|
||||
|
||||
res, err := queue.db.Get(id2bytes(currentID), nil)
|
||||
res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
|
||||
if err != nil {
|
||||
if err == leveldb.ErrNotFound {
|
||||
return ErrNotFound
|
||||
@ -200,7 +231,7 @@ func (queue *Queue) RHandle(h func([]byte) error) error {
|
||||
return err
|
||||
}
|
||||
|
||||
return queue.db.Delete(id2bytes(currentID), nil)
|
||||
return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
|
||||
}
|
||||
|
||||
// LPop pop a data from left of queue
|
||||
@ -209,7 +240,7 @@ func (queue *Queue) LPop() ([]byte, error) {
|
||||
defer queue.lowLock.Unlock()
|
||||
currentID := queue.low
|
||||
|
||||
res, err := queue.db.Get(id2bytes(currentID), nil)
|
||||
res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
|
||||
if err != nil {
|
||||
if err == leveldb.ErrNotFound {
|
||||
return nil, ErrNotFound
|
||||
@ -222,7 +253,7 @@ func (queue *Queue) LPop() ([]byte, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = queue.db.Delete(id2bytes(currentID), nil)
|
||||
err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -235,7 +266,7 @@ func (queue *Queue) LHandle(h func([]byte) error) error {
|
||||
defer queue.lowLock.Unlock()
|
||||
currentID := queue.low
|
||||
|
||||
res, err := queue.db.Get(id2bytes(currentID), nil)
|
||||
res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
|
||||
if err != nil {
|
||||
if err == leveldb.ErrNotFound {
|
||||
return ErrNotFound
|
||||
@ -252,11 +283,15 @@ func (queue *Queue) LHandle(h func([]byte) error) error {
|
||||
return err
|
||||
}
|
||||
|
||||
return queue.db.Delete(id2bytes(currentID), nil)
|
||||
return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
|
||||
}
|
||||
|
||||
// Close closes the queue
|
||||
// Close closes the queue (and the underlying db is set to closeUnderlyingDB)
|
||||
func (queue *Queue) Close() error {
|
||||
if !queue.closeUnderlyingDB {
|
||||
queue.db = nil
|
||||
return nil
|
||||
}
|
||||
err := queue.db.Close()
|
||||
queue.db = nil
|
||||
return err
|
||||
|
110
set.go
Normal file
110
set.go
Normal file
@ -0,0 +1,110 @@
|
||||
// Copyright 2020 Andrew Thornton. All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package levelqueue
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/util"
|
||||
)
|
||||
|
||||
const (
|
||||
setPrefixStr = "set"
|
||||
)
|
||||
|
||||
// Set defines a set struct
|
||||
type Set struct {
|
||||
db *leveldb.DB
|
||||
closeUnderlyingDB bool
|
||||
lock sync.Mutex
|
||||
prefix []byte
|
||||
}
|
||||
|
||||
// OpenSet opens a set from the db path or creates a set if it doesn't exist.
|
||||
// The keys will be prefixed with "set-" by default
|
||||
func OpenSet(dataDir string) (*Set, error) {
|
||||
db, err := leveldb.OpenFile(dataDir, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewSet(db, []byte(setPrefixStr), true)
|
||||
}
|
||||
|
||||
// NewSet creates a set from a db. The keys will be prefixed with prefix
|
||||
// and at close the db will be closed as per closeUnderlyingDB
|
||||
func NewSet(db *leveldb.DB, prefix []byte, closeUnderlyingDB bool) (*Set, error) {
|
||||
set := &Set{
|
||||
db: db,
|
||||
closeUnderlyingDB: closeUnderlyingDB,
|
||||
}
|
||||
set.prefix = make([]byte, len(prefix))
|
||||
copy(set.prefix, prefix)
|
||||
|
||||
return set, nil
|
||||
}
|
||||
|
||||
// Add adds a member string to a key set, returns true if the member was not already present
|
||||
func (set *Set) Add(value []byte) (bool, error) {
|
||||
set.lock.Lock()
|
||||
defer set.lock.Unlock()
|
||||
setKey := withPrefix(set.prefix, value)
|
||||
has, err := set.db.Has(setKey, nil)
|
||||
if err != nil || has {
|
||||
return !has, err
|
||||
}
|
||||
return !has, set.db.Put(setKey, []byte(""), nil)
|
||||
}
|
||||
|
||||
// Members returns the current members of the set
|
||||
func (set *Set) Members() ([][]byte, error) {
|
||||
set.lock.Lock()
|
||||
defer set.lock.Unlock()
|
||||
var members [][]byte
|
||||
prefix := withPrefix(set.prefix, []byte{})
|
||||
iter := set.db.NewIterator(util.BytesPrefix(prefix), nil)
|
||||
for iter.Next() {
|
||||
slice := iter.Key()[len(prefix):]
|
||||
value := make([]byte, len(slice))
|
||||
copy(value, slice)
|
||||
members = append(members, value)
|
||||
}
|
||||
iter.Release()
|
||||
return members, iter.Error()
|
||||
}
|
||||
|
||||
// Has returns if the member is in the set
|
||||
func (set *Set) Has(value []byte) (bool, error) {
|
||||
set.lock.Lock()
|
||||
defer set.lock.Unlock()
|
||||
setKey := withPrefix(set.prefix, value)
|
||||
|
||||
return set.db.Has(setKey, nil)
|
||||
}
|
||||
|
||||
// Remove removes a member from the set, returns true if the member was present
|
||||
func (set *Set) Remove(value []byte) (bool, error) {
|
||||
set.lock.Lock()
|
||||
defer set.lock.Unlock()
|
||||
setKey := withPrefix(set.prefix, value)
|
||||
|
||||
has, err := set.db.Has(setKey, nil)
|
||||
if err != nil || !has {
|
||||
return has, err
|
||||
}
|
||||
|
||||
return has, set.db.Delete(setKey, nil)
|
||||
}
|
||||
|
||||
// Close closes the set (and the underlying db if set to closeUnderlyingDB)
|
||||
func (set *Set) Close() error {
|
||||
if !set.closeUnderlyingDB {
|
||||
set.db = nil
|
||||
return nil
|
||||
}
|
||||
err := set.db.Close()
|
||||
set.db = nil
|
||||
return err
|
||||
}
|
71
set_test.go
Normal file
71
set_test.go
Normal file
@ -0,0 +1,71 @@
|
||||
// Copyright 2020 Andrew Thornton. All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
package levelqueue
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestSet(t *testing.T) {
|
||||
const dbDir = "./set"
|
||||
os.RemoveAll(dbDir)
|
||||
|
||||
set, err := OpenSet(dbDir)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// SAdd
|
||||
added, err := set.Add([]byte("test1"))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, true, added)
|
||||
|
||||
added, err = set.Add([]byte("test2"))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, true, added)
|
||||
|
||||
added, err = set.Add([]byte("test1"))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, false, added)
|
||||
|
||||
added, err = set.Add([]byte("test3"))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, true, added)
|
||||
|
||||
added, err = set.Add([]byte("test5"))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, true, added)
|
||||
|
||||
// SRem
|
||||
removed, err := set.Remove([]byte("test1"))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, true, removed)
|
||||
|
||||
removed, err = set.Remove([]byte("test4"))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, false, removed)
|
||||
|
||||
removed, err = set.Remove([]byte("test3"))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, true, removed)
|
||||
|
||||
// SIsMember
|
||||
isMember, err := set.Has([]byte("test2"))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, true, isMember)
|
||||
|
||||
isMember, err = set.Has([]byte("test1"))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, false, isMember)
|
||||
|
||||
// SMembers
|
||||
members, err := set.Members()
|
||||
assert.NoError(t, err)
|
||||
assert.Contains(t, members, []byte("test2"))
|
||||
assert.Contains(t, members, []byte("test5"))
|
||||
assert.NotContains(t, members, []byte("test1"))
|
||||
assert.NotContains(t, members, []byte("test3"))
|
||||
assert.NotContains(t, members, []byte("test4"))
|
||||
}
|
184
uniquequeue.go
Normal file
184
uniquequeue.go
Normal file
@ -0,0 +1,184 @@
|
||||
// Copyright 2020 Andrew Thornton. All rights reserved.
|
||||
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package levelqueue
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
)
|
||||
|
||||
const (
|
||||
uniqueQueuePrefixStr = "unique"
|
||||
)
|
||||
|
||||
// UniqueQueue defines an unique queue struct
|
||||
type UniqueQueue struct {
|
||||
q *Queue
|
||||
set *Set
|
||||
db *leveldb.DB
|
||||
closeUnderlyingDB bool
|
||||
}
|
||||
|
||||
// OpenUnique opens an unique queue from the db path or creates a set if it doesn't exist.
|
||||
// The keys in the queue portion will not be prefixed, and the set keys will be prefixed with "set-"
|
||||
func OpenUnique(dataDir string) (*UniqueQueue, error) {
|
||||
db, err := leveldb.OpenFile(dataDir, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewUniqueQueue(db, []byte{}, []byte(uniqueQueuePrefixStr), true)
|
||||
}
|
||||
|
||||
// NewUniqueQueue creates a new unique queue from a db.
|
||||
// The queue keys will be prefixed with queuePrefix and the set keys with setPrefix
|
||||
// and at close the db will be closed as per closeUnderlyingDB
|
||||
func NewUniqueQueue(db *leveldb.DB, queuePrefix []byte, setPrefix []byte, closeUnderlyingDB bool) (*UniqueQueue, error) {
|
||||
internal, err := NewQueue(db, queuePrefix, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
set, err := NewSet(db, setPrefix, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
queue := &UniqueQueue{
|
||||
q: internal,
|
||||
set: set,
|
||||
db: db,
|
||||
closeUnderlyingDB: closeUnderlyingDB,
|
||||
}
|
||||
|
||||
return queue, err
|
||||
}
|
||||
|
||||
// LPush pushes data to the left of the queue
|
||||
func (queue *UniqueQueue) LPush(data []byte) error {
|
||||
return queue.LPushFunc(data, nil)
|
||||
}
|
||||
|
||||
// LPushFunc pushes data to the left of the queue and calls the callback if it is added
|
||||
func (queue *UniqueQueue) LPushFunc(data []byte, fn func() error) error {
|
||||
added, err := queue.set.Add(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !added {
|
||||
return ErrAlreadyInQueue
|
||||
}
|
||||
|
||||
if fn != nil {
|
||||
err = fn()
|
||||
if err != nil {
|
||||
_, remErr := queue.set.Remove(data)
|
||||
if remErr != nil {
|
||||
return fmt.Errorf("%v & %v", err, remErr)
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return queue.q.LPush(data)
|
||||
}
|
||||
|
||||
// RPush pushes data to the right of the queue
|
||||
func (queue *UniqueQueue) RPush(data []byte) error {
|
||||
return queue.RPushFunc(data, nil)
|
||||
}
|
||||
|
||||
// RPushFunc pushes data to the right of the queue and calls the callback if is added
|
||||
func (queue *UniqueQueue) RPushFunc(data []byte, fn func() error) error {
|
||||
added, err := queue.set.Add(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !added {
|
||||
return ErrAlreadyInQueue
|
||||
}
|
||||
|
||||
if fn != nil {
|
||||
err = fn()
|
||||
if err != nil {
|
||||
_, remErr := queue.set.Remove(data)
|
||||
if remErr != nil {
|
||||
return fmt.Errorf("%v & %v", err, remErr)
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return queue.q.RPush(data)
|
||||
}
|
||||
|
||||
// RPop pop data from the right of the queue
|
||||
func (queue *UniqueQueue) RPop() ([]byte, error) {
|
||||
popped, err := queue.q.RPop()
|
||||
if err != nil {
|
||||
return popped, err
|
||||
}
|
||||
_, err = queue.set.Remove(popped)
|
||||
|
||||
return popped, err
|
||||
}
|
||||
|
||||
// RHandle receives a user callback function to handle the right element of the queue, if the function returns nil, then delete the element, otherwise keep the element.
|
||||
func (queue *UniqueQueue) RHandle(h func([]byte) error) error {
|
||||
return queue.q.RHandle(func(data []byte) error {
|
||||
err := h(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = queue.set.Remove(data)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
// LPop pops data from left of the queue
|
||||
func (queue *UniqueQueue) LPop() ([]byte, error) {
|
||||
popped, err := queue.q.LPop()
|
||||
if err != nil {
|
||||
return popped, err
|
||||
}
|
||||
_, err = queue.set.Remove(popped)
|
||||
|
||||
return popped, err
|
||||
}
|
||||
|
||||
// LHandle receives a user callback function to handle the left element of the queue, if the function returns nil, then delete the element, otherwise keep the element.
|
||||
func (queue *UniqueQueue) LHandle(h func([]byte) error) error {
|
||||
return queue.q.LHandle(func(data []byte) error {
|
||||
err := h(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = queue.set.Remove(data)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
// Has checks whether the data is already in the queue
|
||||
func (queue *UniqueQueue) Has(data []byte) (bool, error) {
|
||||
return queue.set.Has(data)
|
||||
}
|
||||
|
||||
// Len returns the length of the queue
|
||||
func (queue *UniqueQueue) Len() int64 {
|
||||
queue.set.lock.Lock()
|
||||
defer queue.set.lock.Unlock()
|
||||
return queue.q.Len()
|
||||
}
|
||||
|
||||
// Close closes the queue (and the underlying DB if set to closeUnderlyingDB)
|
||||
func (queue *UniqueQueue) Close() error {
|
||||
_ = queue.q.Close()
|
||||
_ = queue.set.Close()
|
||||
if !queue.closeUnderlyingDB {
|
||||
queue.db = nil
|
||||
return nil
|
||||
}
|
||||
err := queue.db.Close()
|
||||
queue.db = nil
|
||||
return err
|
||||
}
|
294
uniquequeue_test.go
Normal file
294
uniquequeue_test.go
Normal file
@ -0,0 +1,294 @@
|
||||
// Copyright 2020 Andrew Thornton. All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package levelqueue
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestUniqueQueue(t *testing.T) {
|
||||
const dbDir = "./uniquequeue"
|
||||
os.RemoveAll(dbDir)
|
||||
|
||||
queue, err := OpenUnique(dbDir)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = queue.RPush([]byte("test"))
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.EqualValues(t, 1, queue.Len())
|
||||
|
||||
err = queue.RPush([]byte("test"))
|
||||
assert.Error(t, err)
|
||||
|
||||
assert.EqualValues(t, 1, queue.Len())
|
||||
|
||||
err = queue.LPush([]byte("test"))
|
||||
assert.Error(t, err)
|
||||
|
||||
assert.EqualValues(t, 1, queue.Len())
|
||||
|
||||
data, err := queue.LPop()
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, "test", string(data))
|
||||
|
||||
// should be empty
|
||||
data, err = queue.LPop()
|
||||
assert.Error(t, err)
|
||||
assert.EqualValues(t, []byte(nil), data)
|
||||
assert.EqualValues(t, ErrNotFound, err)
|
||||
|
||||
assert.EqualValues(t, 0, queue.Len())
|
||||
|
||||
err = queue.LPush([]byte("test2"))
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = queue.LPush([]byte("test2"))
|
||||
assert.Error(t, err)
|
||||
|
||||
assert.EqualValues(t, 1, queue.Len())
|
||||
|
||||
data, err = queue.LPop()
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, "test2", string(data))
|
||||
|
||||
assert.EqualValues(t, 0, queue.Len())
|
||||
|
||||
data, err = queue.LPop()
|
||||
assert.Error(t, err)
|
||||
assert.EqualValues(t, []byte(nil), data)
|
||||
assert.EqualValues(t, ErrNotFound, err)
|
||||
|
||||
data, err = queue.RPop()
|
||||
assert.Error(t, err)
|
||||
assert.EqualValues(t, []byte(nil), data)
|
||||
assert.EqualValues(t, ErrNotFound, err)
|
||||
|
||||
err = queue.Close()
|
||||
assert.NoError(t, err)
|
||||
|
||||
queue, err = OpenUnique(dbDir)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = queue.RPush([]byte("test3"))
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, 1, queue.Len())
|
||||
|
||||
has, err := queue.Has([]byte("test3"))
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, has)
|
||||
|
||||
has, err = queue.Has([]byte("test4"))
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, has)
|
||||
|
||||
err = queue.LPush([]byte("test4"))
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.EqualValues(t, 2, queue.Len())
|
||||
|
||||
data, err = queue.RPop()
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, "test3", string(data))
|
||||
|
||||
data, err = queue.RPop()
|
||||
assert.NoError(t, err)
|
||||
assert.EqualValues(t, "test4", string(data))
|
||||
|
||||
has, err = queue.Has([]byte("test3"))
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, has)
|
||||
|
||||
has, err = queue.Has([]byte("test4"))
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, has)
|
||||
}
|
||||
|
||||
func TestGoroutines_uniquequeue(t *testing.T) {
|
||||
const dbDir = "./uniquequeue"
|
||||
|
||||
os.RemoveAll(dbDir)
|
||||
queue, err := OpenUnique(dbDir)
|
||||
assert.NoError(t, err)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
err := queue.RPush(append([]byte("test"), []byte(strconv.Itoa(i))...))
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
var w sync.WaitGroup
|
||||
for i := 0; i < 10; i++ {
|
||||
w.Add(1)
|
||||
go func(i int) {
|
||||
if i%2 == 0 {
|
||||
err := queue.RPush(append([]byte("concurrent"), []byte(strconv.Itoa(i))...))
|
||||
assert.NoError(t, err)
|
||||
} else {
|
||||
_, err := queue.RPop()
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
w.Done()
|
||||
}(i)
|
||||
}
|
||||
w.Wait()
|
||||
}
|
||||
|
||||
func TestGoroutines2_uniquequeue(t *testing.T) {
|
||||
const dbDir = "./uniquequeue"
|
||||
|
||||
os.RemoveAll(dbDir)
|
||||
queue, err := OpenUnique(dbDir)
|
||||
assert.NoError(t, err)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
err := queue.RPush(append([]byte("test"), []byte(strconv.Itoa(i))...))
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
var w sync.WaitGroup
|
||||
for i := 0; i < 10; i++ {
|
||||
w.Add(1)
|
||||
go func(i int) {
|
||||
if i%2 == 0 {
|
||||
err := queue.LPush(append([]byte("conc"), []byte(strconv.Itoa(i))...))
|
||||
assert.NoError(t, err)
|
||||
} else {
|
||||
_, err := queue.RPop()
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
w.Done()
|
||||
}(i)
|
||||
}
|
||||
w.Wait()
|
||||
}
|
||||
|
||||
func TestGoroutines3_uniquequeue(t *testing.T) {
|
||||
const dbDir = "./uniquequeue"
|
||||
|
||||
os.RemoveAll(dbDir)
|
||||
queue, err := OpenUnique(dbDir)
|
||||
assert.NoError(t, err)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
err := queue.RPush(append([]byte("test"), []byte(strconv.Itoa(i))...))
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
var w sync.WaitGroup
|
||||
for i := 0; i < 10; i++ {
|
||||
w.Add(1)
|
||||
go func(i int) {
|
||||
if i%2 == 0 {
|
||||
err := queue.LPush(append([]byte("conc"), []byte(strconv.Itoa(i))...))
|
||||
assert.NoError(t, err)
|
||||
} else {
|
||||
_, err := queue.LPop()
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
w.Done()
|
||||
}(i)
|
||||
}
|
||||
w.Wait()
|
||||
}
|
||||
|
||||
func TestHandle_uniquequeue(t *testing.T) {
|
||||
const dbDir = "./uniquequeue"
|
||||
|
||||
os.RemoveAll(dbDir)
|
||||
queue, err := OpenUnique(dbDir)
|
||||
assert.NoError(t, err)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
err := queue.RPush(append([]byte("test"), []byte(strconv.Itoa(i))...))
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
var w sync.WaitGroup
|
||||
for i := 0; i < 10; i++ {
|
||||
w.Add(1)
|
||||
go func(i int) {
|
||||
if i%2 == 0 {
|
||||
err := queue.RPush(append([]byte("conc"), []byte(strconv.Itoa(i))...))
|
||||
assert.NoError(t, err)
|
||||
} else {
|
||||
err := queue.RHandle(func(data []byte) error {
|
||||
return nil
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
w.Done()
|
||||
}(i)
|
||||
}
|
||||
w.Wait()
|
||||
}
|
||||
|
||||
func TestHandle2_uniquequeue(t *testing.T) {
|
||||
const dbDir = "./uniquequeue"
|
||||
|
||||
os.RemoveAll(dbDir)
|
||||
queue, err := OpenUnique(dbDir)
|
||||
assert.NoError(t, err)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
err := queue.RPush(append([]byte("test"), []byte(strconv.Itoa(i))...))
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
assert.EqualValues(t, 10, queue.Len())
|
||||
|
||||
var w sync.WaitGroup
|
||||
for i := 0; i < 10; i++ {
|
||||
w.Add(1)
|
||||
go func(i int) {
|
||||
if i%2 == 0 {
|
||||
err := queue.LPush(append([]byte("conc"), []byte(strconv.Itoa(i))...))
|
||||
assert.NoError(t, err)
|
||||
} else {
|
||||
err := queue.LHandle(func(data []byte) error {
|
||||
return nil
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
w.Done()
|
||||
}(i)
|
||||
}
|
||||
w.Wait()
|
||||
}
|
||||
|
||||
func BenchmarkPush_uniquequeue(b *testing.B) {
|
||||
const dbDir = "./uniquequeue_push"
|
||||
|
||||
os.RemoveAll(dbDir)
|
||||
queue, err := OpenUnique(dbDir)
|
||||
assert.NoError(b, err)
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
err = queue.RPush(append([]byte("test"), []byte(strconv.Itoa(i))...))
|
||||
assert.NoError(b, err)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkPop_uniquequeue(b *testing.B) {
|
||||
const dbDir = "./uniquequeue_pop"
|
||||
|
||||
os.RemoveAll(dbDir)
|
||||
queue, err := Open(dbDir)
|
||||
assert.NoError(b, err)
|
||||
b.StopTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
err = queue.RPush(append([]byte("test"), []byte(strconv.Itoa(i))...))
|
||||
assert.NoError(b, err)
|
||||
}
|
||||
b.StartTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err = queue.RPop()
|
||||
assert.NoError(b, err)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user
How about rename it to
unique_queue.go
?