diff --git a/.gitignore b/.gitignore index 59a8bde..ab1fe76 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,7 @@ queue/ queue_pop/ -queue_push/ \ No newline at end of file +queue_push/ +uniquequeue/ +uniquequeue_pop/ +uniquequeue_push/ +set/ diff --git a/README.md b/README.md index 80a0853..21db280 100644 --- a/README.md +++ b/README.md @@ -25,4 +25,36 @@ data, err = queue.LPop() queue.LHandle(func(dt []byte) error{ return nil }) -``` \ No newline at end of file +``` + +You can now create a Set from a leveldb: + +```Go +set, err := levelqueue.OpenSet("./set") + +added, err:= set.Add([]byte("member1")) + +has, err := set.Has([]byte("member1")) + +members, err := set.Members() + +removed, err := set.Remove([]byte("member1")) +``` + +And you can create a UniqueQueue from a leveldb: + +```Go +queue, err := levelqueue.OpenUnique("./queue") + +err := queue.RPush([]byte("member1")) + +err = queue.LPush([]byte("member1")) +// Will return ErrAlreadyInQueue + +// and so on. +``` + +## Creating Queues, UniqueQueues and Sets from already open DB + +If you have an already open DB you can create these from this using the +`NewQueue`, `NewUniqueQueue` and `NewSet` functions. \ No newline at end of file diff --git a/error.go b/error.go index d639c5d..648c185 100644 --- a/error.go +++ b/error.go @@ -7,6 +7,8 @@ package levelqueue import "errors" var ( - // ErrNotFound means no element in queue + // ErrNotFound means no elements in queue ErrNotFound = errors.New("no key found") + + ErrAlreadyInQueue = errors.New("value already in queue") ) diff --git a/queue.go b/queue.go index af624db..20ed901 100644 --- a/queue.go +++ b/queue.go @@ -12,37 +12,62 @@ import ( "github.com/syndtr/goleveldb/leveldb" ) +const ( + lowKeyStr = "low" + highKeyStr = "high" +) + // Queue defines a queue struct type Queue struct { - db *leveldb.DB - highLock sync.Mutex - lowLock sync.Mutex - low int64 - high int64 + db *leveldb.DB + highLock sync.Mutex + lowLock sync.Mutex + low int64 + high int64 + lowKey []byte + highKey []byte + prefix []byte + closeUnderlyingDB bool } -// Open opens a queue object or create it if not exist +// Open opens a queue from the db path or creates a +// queue if it doesn't exist. +// The keys will not be prefixed by default func Open(dataDir string) (*Queue, error) { db, err := leveldb.OpenFile(dataDir, nil) if err != nil { return nil, err } + return NewQueue(db, []byte{}, true) +} + +// NewQueue creates a queue from a db. The keys will be prefixed with prefix +// and at close the db will be closed as per closeUnderlyingDB +func NewQueue(db *leveldb.DB, prefix []byte, closeUnderlyingDB bool) (*Queue, error) { + var err error var queue = &Queue{ - db: db, + db: db, + closeUnderlyingDB: closeUnderlyingDB, } - queue.low, err = queue.readID(lowKey) + + queue.prefix = make([]byte, len(prefix)) + copy(queue.prefix, prefix) + queue.lowKey = withPrefix(prefix, []byte(lowKeyStr)) + queue.highKey = withPrefix(prefix, []byte(highKeyStr)) + + queue.low, err = queue.readID(queue.lowKey) if err == leveldb.ErrNotFound { queue.low = 1 - err = db.Put(lowKey, id2bytes(1), nil) + err = db.Put(queue.lowKey, id2bytes(1), nil) } if err != nil { return nil, err } - queue.high, err = queue.readID(highKey) + queue.high, err = queue.readID(queue.highKey) if err == leveldb.ErrNotFound { - err = db.Put(highKey, id2bytes(0), nil) + err = db.Put(queue.highKey, id2bytes(0), nil) } if err != nil { return nil, err @@ -59,15 +84,10 @@ func (queue *Queue) readID(key []byte) (int64, error) { return bytes2id(bs) } -var ( - lowKey = []byte("low") - highKey = []byte("high") -) - func (queue *Queue) highincrement() (int64, error) { id := queue.high + 1 queue.high = id - err := queue.db.Put(highKey, id2bytes(queue.high), nil) + err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil) if err != nil { queue.high = queue.high - 1 return 0, err @@ -77,7 +97,7 @@ func (queue *Queue) highincrement() (int64, error) { func (queue *Queue) highdecrement() (int64, error) { queue.high = queue.high - 1 - err := queue.db.Put(highKey, id2bytes(queue.high), nil) + err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil) if err != nil { queue.high = queue.high + 1 return 0, err @@ -87,7 +107,7 @@ func (queue *Queue) highdecrement() (int64, error) { func (queue *Queue) lowincrement() (int64, error) { queue.low = queue.low + 1 - err := queue.db.Put(lowKey, id2bytes(queue.low), nil) + err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil) if err != nil { queue.low = queue.low - 1 return 0, err @@ -97,7 +117,7 @@ func (queue *Queue) lowincrement() (int64, error) { func (queue *Queue) lowdecrement() (int64, error) { queue.low = queue.low - 1 - err := queue.db.Put(lowKey, id2bytes(queue.low), nil) + err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil) if err != nil { queue.low = queue.low + 1 return 0, err @@ -125,6 +145,17 @@ func bytes2id(b []byte) (int64, error) { return binary.ReadVarint(bytes.NewReader(b)) } +func withPrefix(prefix []byte, value []byte) []byte { + if len(prefix) == 0 { + return value + } + prefixed := make([]byte, len(prefix)+1+len(value)) + copy(prefixed[0:len(prefix)], prefix) + prefixed[len(prefix)] = '-' + copy(prefixed[len(prefix)+1:], value) + return prefixed +} + // RPush pushes a data from right of queue func (queue *Queue) RPush(data []byte) error { queue.highLock.Lock() @@ -133,7 +164,7 @@ func (queue *Queue) RPush(data []byte) error { queue.highLock.Unlock() return err } - err = queue.db.Put(id2bytes(id), data, nil) + err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil) queue.highLock.Unlock() return err } @@ -146,7 +177,7 @@ func (queue *Queue) LPush(data []byte) error { queue.lowLock.Unlock() return err } - err = queue.db.Put(id2bytes(id), data, nil) + err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil) queue.lowLock.Unlock() return err } @@ -157,7 +188,7 @@ func (queue *Queue) RPop() ([]byte, error) { defer queue.highLock.Unlock() currentID := queue.high - res, err := queue.db.Get(id2bytes(currentID), nil) + res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { if err == leveldb.ErrNotFound { return nil, ErrNotFound @@ -170,7 +201,7 @@ func (queue *Queue) RPop() ([]byte, error) { return nil, err } - err = queue.db.Delete(id2bytes(currentID), nil) + err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { return nil, err } @@ -183,7 +214,7 @@ func (queue *Queue) RHandle(h func([]byte) error) error { defer queue.highLock.Unlock() currentID := queue.high - res, err := queue.db.Get(id2bytes(currentID), nil) + res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { if err == leveldb.ErrNotFound { return ErrNotFound @@ -200,7 +231,7 @@ func (queue *Queue) RHandle(h func([]byte) error) error { return err } - return queue.db.Delete(id2bytes(currentID), nil) + return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil) } // LPop pop a data from left of queue @@ -209,7 +240,7 @@ func (queue *Queue) LPop() ([]byte, error) { defer queue.lowLock.Unlock() currentID := queue.low - res, err := queue.db.Get(id2bytes(currentID), nil) + res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { if err == leveldb.ErrNotFound { return nil, ErrNotFound @@ -222,7 +253,7 @@ func (queue *Queue) LPop() ([]byte, error) { return nil, err } - err = queue.db.Delete(id2bytes(currentID), nil) + err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { return nil, err } @@ -235,7 +266,7 @@ func (queue *Queue) LHandle(h func([]byte) error) error { defer queue.lowLock.Unlock() currentID := queue.low - res, err := queue.db.Get(id2bytes(currentID), nil) + res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil) if err != nil { if err == leveldb.ErrNotFound { return ErrNotFound @@ -252,11 +283,15 @@ func (queue *Queue) LHandle(h func([]byte) error) error { return err } - return queue.db.Delete(id2bytes(currentID), nil) + return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil) } -// Close closes the queue +// Close closes the queue (and the underlying db is set to closeUnderlyingDB) func (queue *Queue) Close() error { + if !queue.closeUnderlyingDB { + queue.db = nil + return nil + } err := queue.db.Close() queue.db = nil return err diff --git a/set.go b/set.go new file mode 100644 index 0000000..88f4e9b --- /dev/null +++ b/set.go @@ -0,0 +1,110 @@ +// Copyright 2020 Andrew Thornton. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package levelqueue + +import ( + "sync" + + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/util" +) + +const ( + setPrefixStr = "set" +) + +// Set defines a set struct +type Set struct { + db *leveldb.DB + closeUnderlyingDB bool + lock sync.Mutex + prefix []byte +} + +// OpenSet opens a set from the db path or creates a set if it doesn't exist. +// The keys will be prefixed with "set-" by default +func OpenSet(dataDir string) (*Set, error) { + db, err := leveldb.OpenFile(dataDir, nil) + if err != nil { + return nil, err + } + return NewSet(db, []byte(setPrefixStr), true) +} + +// NewSet creates a set from a db. The keys will be prefixed with prefix +// and at close the db will be closed as per closeUnderlyingDB +func NewSet(db *leveldb.DB, prefix []byte, closeUnderlyingDB bool) (*Set, error) { + set := &Set{ + db: db, + closeUnderlyingDB: closeUnderlyingDB, + } + set.prefix = make([]byte, len(prefix)) + copy(set.prefix, prefix) + + return set, nil +} + +// Add adds a member string to a key set, returns true if the member was not already present +func (set *Set) Add(value []byte) (bool, error) { + set.lock.Lock() + defer set.lock.Unlock() + setKey := withPrefix(set.prefix, value) + has, err := set.db.Has(setKey, nil) + if err != nil || has { + return !has, err + } + return !has, set.db.Put(setKey, []byte(""), nil) +} + +// Members returns the current members of the set +func (set *Set) Members() ([][]byte, error) { + set.lock.Lock() + defer set.lock.Unlock() + var members [][]byte + prefix := withPrefix(set.prefix, []byte{}) + iter := set.db.NewIterator(util.BytesPrefix(prefix), nil) + for iter.Next() { + slice := iter.Key()[len(prefix):] + value := make([]byte, len(slice)) + copy(value, slice) + members = append(members, value) + } + iter.Release() + return members, iter.Error() +} + +// Has returns if the member is in the set +func (set *Set) Has(value []byte) (bool, error) { + set.lock.Lock() + defer set.lock.Unlock() + setKey := withPrefix(set.prefix, value) + + return set.db.Has(setKey, nil) +} + +// Remove removes a member from the set, returns true if the member was present +func (set *Set) Remove(value []byte) (bool, error) { + set.lock.Lock() + defer set.lock.Unlock() + setKey := withPrefix(set.prefix, value) + + has, err := set.db.Has(setKey, nil) + if err != nil || !has { + return has, err + } + + return has, set.db.Delete(setKey, nil) +} + +// Close closes the set (and the underlying db if set to closeUnderlyingDB) +func (set *Set) Close() error { + if !set.closeUnderlyingDB { + set.db = nil + return nil + } + err := set.db.Close() + set.db = nil + return err +} diff --git a/set_test.go b/set_test.go new file mode 100644 index 0000000..66f9289 --- /dev/null +++ b/set_test.go @@ -0,0 +1,71 @@ +// Copyright 2020 Andrew Thornton. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package levelqueue + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSet(t *testing.T) { + const dbDir = "./set" + os.RemoveAll(dbDir) + + set, err := OpenSet(dbDir) + assert.NoError(t, err) + + // SAdd + added, err := set.Add([]byte("test1")) + assert.NoError(t, err) + assert.EqualValues(t, true, added) + + added, err = set.Add([]byte("test2")) + assert.NoError(t, err) + assert.EqualValues(t, true, added) + + added, err = set.Add([]byte("test1")) + assert.NoError(t, err) + assert.EqualValues(t, false, added) + + added, err = set.Add([]byte("test3")) + assert.NoError(t, err) + assert.EqualValues(t, true, added) + + added, err = set.Add([]byte("test5")) + assert.NoError(t, err) + assert.EqualValues(t, true, added) + + // SRem + removed, err := set.Remove([]byte("test1")) + assert.NoError(t, err) + assert.EqualValues(t, true, removed) + + removed, err = set.Remove([]byte("test4")) + assert.NoError(t, err) + assert.EqualValues(t, false, removed) + + removed, err = set.Remove([]byte("test3")) + assert.NoError(t, err) + assert.EqualValues(t, true, removed) + + // SIsMember + isMember, err := set.Has([]byte("test2")) + assert.NoError(t, err) + assert.EqualValues(t, true, isMember) + + isMember, err = set.Has([]byte("test1")) + assert.NoError(t, err) + assert.EqualValues(t, false, isMember) + + // SMembers + members, err := set.Members() + assert.NoError(t, err) + assert.Contains(t, members, []byte("test2")) + assert.Contains(t, members, []byte("test5")) + assert.NotContains(t, members, []byte("test1")) + assert.NotContains(t, members, []byte("test3")) + assert.NotContains(t, members, []byte("test4")) +} diff --git a/uniquequeue.go b/uniquequeue.go new file mode 100644 index 0000000..8d2676b --- /dev/null +++ b/uniquequeue.go @@ -0,0 +1,184 @@ +// Copyright 2020 Andrew Thornton. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package levelqueue + +import ( + "fmt" + + "github.com/syndtr/goleveldb/leveldb" +) + +const ( + uniqueQueuePrefixStr = "unique" +) + +// UniqueQueue defines an unique queue struct +type UniqueQueue struct { + q *Queue + set *Set + db *leveldb.DB + closeUnderlyingDB bool +} + +// OpenUnique opens an unique queue from the db path or creates a set if it doesn't exist. +// The keys in the queue portion will not be prefixed, and the set keys will be prefixed with "set-" +func OpenUnique(dataDir string) (*UniqueQueue, error) { + db, err := leveldb.OpenFile(dataDir, nil) + if err != nil { + return nil, err + } + return NewUniqueQueue(db, []byte{}, []byte(uniqueQueuePrefixStr), true) +} + +// NewUniqueQueue creates a new unique queue from a db. +// The queue keys will be prefixed with queuePrefix and the set keys with setPrefix +// and at close the db will be closed as per closeUnderlyingDB +func NewUniqueQueue(db *leveldb.DB, queuePrefix []byte, setPrefix []byte, closeUnderlyingDB bool) (*UniqueQueue, error) { + internal, err := NewQueue(db, queuePrefix, false) + if err != nil { + return nil, err + } + set, err := NewSet(db, setPrefix, false) + if err != nil { + return nil, err + } + queue := &UniqueQueue{ + q: internal, + set: set, + db: db, + closeUnderlyingDB: closeUnderlyingDB, + } + + return queue, err +} + +// LPush pushes data to the left of the queue +func (queue *UniqueQueue) LPush(data []byte) error { + return queue.LPushFunc(data, nil) +} + +// LPushFunc pushes data to the left of the queue and calls the callback if it is added +func (queue *UniqueQueue) LPushFunc(data []byte, fn func() error) error { + added, err := queue.set.Add(data) + if err != nil { + return err + } + if !added { + return ErrAlreadyInQueue + } + + if fn != nil { + err = fn() + if err != nil { + _, remErr := queue.set.Remove(data) + if remErr != nil { + return fmt.Errorf("%v & %v", err, remErr) + } + return err + } + } + + return queue.q.LPush(data) +} + +// RPush pushes data to the right of the queue +func (queue *UniqueQueue) RPush(data []byte) error { + return queue.RPushFunc(data, nil) +} + +// RPushFunc pushes data to the right of the queue and calls the callback if is added +func (queue *UniqueQueue) RPushFunc(data []byte, fn func() error) error { + added, err := queue.set.Add(data) + if err != nil { + return err + } + if !added { + return ErrAlreadyInQueue + } + + if fn != nil { + err = fn() + if err != nil { + _, remErr := queue.set.Remove(data) + if remErr != nil { + return fmt.Errorf("%v & %v", err, remErr) + } + return err + } + } + + return queue.q.RPush(data) +} + +// RPop pop data from the right of the queue +func (queue *UniqueQueue) RPop() ([]byte, error) { + popped, err := queue.q.RPop() + if err != nil { + return popped, err + } + _, err = queue.set.Remove(popped) + + return popped, err +} + +// RHandle receives a user callback function to handle the right element of the queue, if the function returns nil, then delete the element, otherwise keep the element. +func (queue *UniqueQueue) RHandle(h func([]byte) error) error { + return queue.q.RHandle(func(data []byte) error { + err := h(data) + if err != nil { + return err + } + _, err = queue.set.Remove(data) + return err + }) +} + +// LPop pops data from left of the queue +func (queue *UniqueQueue) LPop() ([]byte, error) { + popped, err := queue.q.LPop() + if err != nil { + return popped, err + } + _, err = queue.set.Remove(popped) + + return popped, err +} + +// LHandle receives a user callback function to handle the left element of the queue, if the function returns nil, then delete the element, otherwise keep the element. +func (queue *UniqueQueue) LHandle(h func([]byte) error) error { + return queue.q.LHandle(func(data []byte) error { + err := h(data) + if err != nil { + return err + } + _, err = queue.set.Remove(data) + return err + }) +} + +// Has checks whether the data is already in the queue +func (queue *UniqueQueue) Has(data []byte) (bool, error) { + return queue.set.Has(data) +} + +// Len returns the length of the queue +func (queue *UniqueQueue) Len() int64 { + queue.set.lock.Lock() + defer queue.set.lock.Unlock() + return queue.q.Len() +} + +// Close closes the queue (and the underlying DB if set to closeUnderlyingDB) +func (queue *UniqueQueue) Close() error { + _ = queue.q.Close() + _ = queue.set.Close() + if !queue.closeUnderlyingDB { + queue.db = nil + return nil + } + err := queue.db.Close() + queue.db = nil + return err +} diff --git a/uniquequeue_test.go b/uniquequeue_test.go new file mode 100644 index 0000000..710e6be --- /dev/null +++ b/uniquequeue_test.go @@ -0,0 +1,294 @@ +// Copyright 2020 Andrew Thornton. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package levelqueue + +import ( + "os" + "strconv" + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestUniqueQueue(t *testing.T) { + const dbDir = "./uniquequeue" + os.RemoveAll(dbDir) + + queue, err := OpenUnique(dbDir) + assert.NoError(t, err) + + err = queue.RPush([]byte("test")) + assert.NoError(t, err) + + assert.EqualValues(t, 1, queue.Len()) + + err = queue.RPush([]byte("test")) + assert.Error(t, err) + + assert.EqualValues(t, 1, queue.Len()) + + err = queue.LPush([]byte("test")) + assert.Error(t, err) + + assert.EqualValues(t, 1, queue.Len()) + + data, err := queue.LPop() + assert.NoError(t, err) + assert.EqualValues(t, "test", string(data)) + + // should be empty + data, err = queue.LPop() + assert.Error(t, err) + assert.EqualValues(t, []byte(nil), data) + assert.EqualValues(t, ErrNotFound, err) + + assert.EqualValues(t, 0, queue.Len()) + + err = queue.LPush([]byte("test2")) + assert.NoError(t, err) + + err = queue.LPush([]byte("test2")) + assert.Error(t, err) + + assert.EqualValues(t, 1, queue.Len()) + + data, err = queue.LPop() + assert.NoError(t, err) + assert.EqualValues(t, "test2", string(data)) + + assert.EqualValues(t, 0, queue.Len()) + + data, err = queue.LPop() + assert.Error(t, err) + assert.EqualValues(t, []byte(nil), data) + assert.EqualValues(t, ErrNotFound, err) + + data, err = queue.RPop() + assert.Error(t, err) + assert.EqualValues(t, []byte(nil), data) + assert.EqualValues(t, ErrNotFound, err) + + err = queue.Close() + assert.NoError(t, err) + + queue, err = OpenUnique(dbDir) + assert.NoError(t, err) + + err = queue.RPush([]byte("test3")) + assert.NoError(t, err) + assert.EqualValues(t, 1, queue.Len()) + + has, err := queue.Has([]byte("test3")) + assert.NoError(t, err) + assert.True(t, has) + + has, err = queue.Has([]byte("test4")) + assert.NoError(t, err) + assert.False(t, has) + + err = queue.LPush([]byte("test4")) + assert.NoError(t, err) + + assert.EqualValues(t, 2, queue.Len()) + + data, err = queue.RPop() + assert.NoError(t, err) + assert.EqualValues(t, "test3", string(data)) + + data, err = queue.RPop() + assert.NoError(t, err) + assert.EqualValues(t, "test4", string(data)) + + has, err = queue.Has([]byte("test3")) + assert.NoError(t, err) + assert.False(t, has) + + has, err = queue.Has([]byte("test4")) + assert.NoError(t, err) + assert.False(t, has) +} + +func TestGoroutines_uniquequeue(t *testing.T) { + const dbDir = "./uniquequeue" + + os.RemoveAll(dbDir) + queue, err := OpenUnique(dbDir) + assert.NoError(t, err) + + for i := 0; i < 10; i++ { + err := queue.RPush(append([]byte("test"), []byte(strconv.Itoa(i))...)) + assert.NoError(t, err) + } + + var w sync.WaitGroup + for i := 0; i < 10; i++ { + w.Add(1) + go func(i int) { + if i%2 == 0 { + err := queue.RPush(append([]byte("concurrent"), []byte(strconv.Itoa(i))...)) + assert.NoError(t, err) + } else { + _, err := queue.RPop() + assert.NoError(t, err) + } + w.Done() + }(i) + } + w.Wait() +} + +func TestGoroutines2_uniquequeue(t *testing.T) { + const dbDir = "./uniquequeue" + + os.RemoveAll(dbDir) + queue, err := OpenUnique(dbDir) + assert.NoError(t, err) + + for i := 0; i < 10; i++ { + err := queue.RPush(append([]byte("test"), []byte(strconv.Itoa(i))...)) + assert.NoError(t, err) + } + + var w sync.WaitGroup + for i := 0; i < 10; i++ { + w.Add(1) + go func(i int) { + if i%2 == 0 { + err := queue.LPush(append([]byte("conc"), []byte(strconv.Itoa(i))...)) + assert.NoError(t, err) + } else { + _, err := queue.RPop() + assert.NoError(t, err) + } + w.Done() + }(i) + } + w.Wait() +} + +func TestGoroutines3_uniquequeue(t *testing.T) { + const dbDir = "./uniquequeue" + + os.RemoveAll(dbDir) + queue, err := OpenUnique(dbDir) + assert.NoError(t, err) + + for i := 0; i < 10; i++ { + err := queue.RPush(append([]byte("test"), []byte(strconv.Itoa(i))...)) + assert.NoError(t, err) + } + + var w sync.WaitGroup + for i := 0; i < 10; i++ { + w.Add(1) + go func(i int) { + if i%2 == 0 { + err := queue.LPush(append([]byte("conc"), []byte(strconv.Itoa(i))...)) + assert.NoError(t, err) + } else { + _, err := queue.LPop() + assert.NoError(t, err) + } + w.Done() + }(i) + } + w.Wait() +} + +func TestHandle_uniquequeue(t *testing.T) { + const dbDir = "./uniquequeue" + + os.RemoveAll(dbDir) + queue, err := OpenUnique(dbDir) + assert.NoError(t, err) + + for i := 0; i < 10; i++ { + err := queue.RPush(append([]byte("test"), []byte(strconv.Itoa(i))...)) + assert.NoError(t, err) + } + + var w sync.WaitGroup + for i := 0; i < 10; i++ { + w.Add(1) + go func(i int) { + if i%2 == 0 { + err := queue.RPush(append([]byte("conc"), []byte(strconv.Itoa(i))...)) + assert.NoError(t, err) + } else { + err := queue.RHandle(func(data []byte) error { + return nil + }) + assert.NoError(t, err) + } + w.Done() + }(i) + } + w.Wait() +} + +func TestHandle2_uniquequeue(t *testing.T) { + const dbDir = "./uniquequeue" + + os.RemoveAll(dbDir) + queue, err := OpenUnique(dbDir) + assert.NoError(t, err) + + for i := 0; i < 10; i++ { + err := queue.RPush(append([]byte("test"), []byte(strconv.Itoa(i))...)) + assert.NoError(t, err) + } + assert.EqualValues(t, 10, queue.Len()) + + var w sync.WaitGroup + for i := 0; i < 10; i++ { + w.Add(1) + go func(i int) { + if i%2 == 0 { + err := queue.LPush(append([]byte("conc"), []byte(strconv.Itoa(i))...)) + assert.NoError(t, err) + } else { + err := queue.LHandle(func(data []byte) error { + return nil + }) + assert.NoError(t, err) + } + w.Done() + }(i) + } + w.Wait() +} + +func BenchmarkPush_uniquequeue(b *testing.B) { + const dbDir = "./uniquequeue_push" + + os.RemoveAll(dbDir) + queue, err := OpenUnique(dbDir) + assert.NoError(b, err) + + for i := 0; i < b.N; i++ { + err = queue.RPush(append([]byte("test"), []byte(strconv.Itoa(i))...)) + assert.NoError(b, err) + } +} + +func BenchmarkPop_uniquequeue(b *testing.B) { + const dbDir = "./uniquequeue_pop" + + os.RemoveAll(dbDir) + queue, err := Open(dbDir) + assert.NoError(b, err) + b.StopTimer() + for i := 0; i < b.N; i++ { + err = queue.RPush(append([]byte("test"), []byte(strconv.Itoa(i))...)) + assert.NoError(b, err) + } + b.StartTimer() + + for i := 0; i < b.N; i++ { + _, err = queue.RPop() + assert.NoError(b, err) + } +}