Andrew Thornton
4dd4891972
Improve documentation; separate out set, uniquequeue and queue; add Has; move to use unique queue; add SIsMember; update README; add redis-like set commands.
295 lines
6.0 KiB
Go
295 lines
6.0 KiB
Go
// Copyright 2020 Andrew Thornton. All rights reserved.
|
|
// Use of this source code is governed by a MIT-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package levelqueue
|
|
|
|
import (
|
|
"os"
|
|
"strconv"
|
|
"sync"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
)
|
|
|
|
func TestUniqueQueue(t *testing.T) {
|
|
const dbDir = "./uniquequeue"
|
|
os.RemoveAll(dbDir)
|
|
|
|
queue, err := OpenUnique(dbDir)
|
|
assert.NoError(t, err)
|
|
|
|
err = queue.RPush([]byte("test"))
|
|
assert.NoError(t, err)
|
|
|
|
assert.EqualValues(t, 1, queue.Len())
|
|
|
|
err = queue.RPush([]byte("test"))
|
|
assert.Error(t, err)
|
|
|
|
assert.EqualValues(t, 1, queue.Len())
|
|
|
|
err = queue.LPush([]byte("test"))
|
|
assert.Error(t, err)
|
|
|
|
assert.EqualValues(t, 1, queue.Len())
|
|
|
|
data, err := queue.LPop()
|
|
assert.NoError(t, err)
|
|
assert.EqualValues(t, "test", string(data))
|
|
|
|
// should be empty
|
|
data, err = queue.LPop()
|
|
assert.Error(t, err)
|
|
assert.EqualValues(t, []byte(nil), data)
|
|
assert.EqualValues(t, ErrNotFound, err)
|
|
|
|
assert.EqualValues(t, 0, queue.Len())
|
|
|
|
err = queue.LPush([]byte("test2"))
|
|
assert.NoError(t, err)
|
|
|
|
err = queue.LPush([]byte("test2"))
|
|
assert.Error(t, err)
|
|
|
|
assert.EqualValues(t, 1, queue.Len())
|
|
|
|
data, err = queue.LPop()
|
|
assert.NoError(t, err)
|
|
assert.EqualValues(t, "test2", string(data))
|
|
|
|
assert.EqualValues(t, 0, queue.Len())
|
|
|
|
data, err = queue.LPop()
|
|
assert.Error(t, err)
|
|
assert.EqualValues(t, []byte(nil), data)
|
|
assert.EqualValues(t, ErrNotFound, err)
|
|
|
|
data, err = queue.RPop()
|
|
assert.Error(t, err)
|
|
assert.EqualValues(t, []byte(nil), data)
|
|
assert.EqualValues(t, ErrNotFound, err)
|
|
|
|
err = queue.Close()
|
|
assert.NoError(t, err)
|
|
|
|
queue, err = OpenUnique(dbDir)
|
|
assert.NoError(t, err)
|
|
|
|
err = queue.RPush([]byte("test3"))
|
|
assert.NoError(t, err)
|
|
assert.EqualValues(t, 1, queue.Len())
|
|
|
|
has, err := queue.Has([]byte("test3"))
|
|
assert.NoError(t, err)
|
|
assert.True(t, has)
|
|
|
|
has, err = queue.Has([]byte("test4"))
|
|
assert.NoError(t, err)
|
|
assert.False(t, has)
|
|
|
|
err = queue.LPush([]byte("test4"))
|
|
assert.NoError(t, err)
|
|
|
|
assert.EqualValues(t, 2, queue.Len())
|
|
|
|
data, err = queue.RPop()
|
|
assert.NoError(t, err)
|
|
assert.EqualValues(t, "test3", string(data))
|
|
|
|
data, err = queue.RPop()
|
|
assert.NoError(t, err)
|
|
assert.EqualValues(t, "test4", string(data))
|
|
|
|
has, err = queue.Has([]byte("test3"))
|
|
assert.NoError(t, err)
|
|
assert.False(t, has)
|
|
|
|
has, err = queue.Has([]byte("test4"))
|
|
assert.NoError(t, err)
|
|
assert.False(t, has)
|
|
}
|
|
|
|
func TestGoroutines_uniquequeue(t *testing.T) {
|
|
const dbDir = "./uniquequeue"
|
|
|
|
os.RemoveAll(dbDir)
|
|
queue, err := OpenUnique(dbDir)
|
|
assert.NoError(t, err)
|
|
|
|
for i := 0; i < 10; i++ {
|
|
err := queue.RPush(append([]byte("test"), []byte(strconv.Itoa(i))...))
|
|
assert.NoError(t, err)
|
|
}
|
|
|
|
var w sync.WaitGroup
|
|
for i := 0; i < 10; i++ {
|
|
w.Add(1)
|
|
go func(i int) {
|
|
if i%2 == 0 {
|
|
err := queue.RPush(append([]byte("concurrent"), []byte(strconv.Itoa(i))...))
|
|
assert.NoError(t, err)
|
|
} else {
|
|
_, err := queue.RPop()
|
|
assert.NoError(t, err)
|
|
}
|
|
w.Done()
|
|
}(i)
|
|
}
|
|
w.Wait()
|
|
}
|
|
|
|
func TestGoroutines2_uniquequeue(t *testing.T) {
|
|
const dbDir = "./uniquequeue"
|
|
|
|
os.RemoveAll(dbDir)
|
|
queue, err := OpenUnique(dbDir)
|
|
assert.NoError(t, err)
|
|
|
|
for i := 0; i < 10; i++ {
|
|
err := queue.RPush(append([]byte("test"), []byte(strconv.Itoa(i))...))
|
|
assert.NoError(t, err)
|
|
}
|
|
|
|
var w sync.WaitGroup
|
|
for i := 0; i < 10; i++ {
|
|
w.Add(1)
|
|
go func(i int) {
|
|
if i%2 == 0 {
|
|
err := queue.LPush(append([]byte("conc"), []byte(strconv.Itoa(i))...))
|
|
assert.NoError(t, err)
|
|
} else {
|
|
_, err := queue.RPop()
|
|
assert.NoError(t, err)
|
|
}
|
|
w.Done()
|
|
}(i)
|
|
}
|
|
w.Wait()
|
|
}
|
|
|
|
func TestGoroutines3_uniquequeue(t *testing.T) {
|
|
const dbDir = "./uniquequeue"
|
|
|
|
os.RemoveAll(dbDir)
|
|
queue, err := OpenUnique(dbDir)
|
|
assert.NoError(t, err)
|
|
|
|
for i := 0; i < 10; i++ {
|
|
err := queue.RPush(append([]byte("test"), []byte(strconv.Itoa(i))...))
|
|
assert.NoError(t, err)
|
|
}
|
|
|
|
var w sync.WaitGroup
|
|
for i := 0; i < 10; i++ {
|
|
w.Add(1)
|
|
go func(i int) {
|
|
if i%2 == 0 {
|
|
err := queue.LPush(append([]byte("conc"), []byte(strconv.Itoa(i))...))
|
|
assert.NoError(t, err)
|
|
} else {
|
|
_, err := queue.LPop()
|
|
assert.NoError(t, err)
|
|
}
|
|
w.Done()
|
|
}(i)
|
|
}
|
|
w.Wait()
|
|
}
|
|
|
|
func TestHandle_uniquequeue(t *testing.T) {
|
|
const dbDir = "./uniquequeue"
|
|
|
|
os.RemoveAll(dbDir)
|
|
queue, err := OpenUnique(dbDir)
|
|
assert.NoError(t, err)
|
|
|
|
for i := 0; i < 10; i++ {
|
|
err := queue.RPush(append([]byte("test"), []byte(strconv.Itoa(i))...))
|
|
assert.NoError(t, err)
|
|
}
|
|
|
|
var w sync.WaitGroup
|
|
for i := 0; i < 10; i++ {
|
|
w.Add(1)
|
|
go func(i int) {
|
|
if i%2 == 0 {
|
|
err := queue.RPush(append([]byte("conc"), []byte(strconv.Itoa(i))...))
|
|
assert.NoError(t, err)
|
|
} else {
|
|
err := queue.RHandle(func(data []byte) error {
|
|
return nil
|
|
})
|
|
assert.NoError(t, err)
|
|
}
|
|
w.Done()
|
|
}(i)
|
|
}
|
|
w.Wait()
|
|
}
|
|
|
|
func TestHandle2_uniquequeue(t *testing.T) {
|
|
const dbDir = "./uniquequeue"
|
|
|
|
os.RemoveAll(dbDir)
|
|
queue, err := OpenUnique(dbDir)
|
|
assert.NoError(t, err)
|
|
|
|
for i := 0; i < 10; i++ {
|
|
err := queue.RPush(append([]byte("test"), []byte(strconv.Itoa(i))...))
|
|
assert.NoError(t, err)
|
|
}
|
|
assert.EqualValues(t, 10, queue.Len())
|
|
|
|
var w sync.WaitGroup
|
|
for i := 0; i < 10; i++ {
|
|
w.Add(1)
|
|
go func(i int) {
|
|
if i%2 == 0 {
|
|
err := queue.LPush(append([]byte("conc"), []byte(strconv.Itoa(i))...))
|
|
assert.NoError(t, err)
|
|
} else {
|
|
err := queue.LHandle(func(data []byte) error {
|
|
return nil
|
|
})
|
|
assert.NoError(t, err)
|
|
}
|
|
w.Done()
|
|
}(i)
|
|
}
|
|
w.Wait()
|
|
}
|
|
|
|
func BenchmarkPush_uniquequeue(b *testing.B) {
|
|
const dbDir = "./uniquequeue_push"
|
|
|
|
os.RemoveAll(dbDir)
|
|
queue, err := OpenUnique(dbDir)
|
|
assert.NoError(b, err)
|
|
|
|
for i := 0; i < b.N; i++ {
|
|
err = queue.RPush(append([]byte("test"), []byte(strconv.Itoa(i))...))
|
|
assert.NoError(b, err)
|
|
}
|
|
}
|
|
|
|
func BenchmarkPop_uniquequeue(b *testing.B) {
|
|
const dbDir = "./uniquequeue_pop"
|
|
|
|
os.RemoveAll(dbDir)
|
|
queue, err := Open(dbDir)
|
|
assert.NoError(b, err)
|
|
b.StopTimer()
|
|
for i := 0; i < b.N; i++ {
|
|
err = queue.RPush(append([]byte("test"), []byte(strconv.Itoa(i))...))
|
|
assert.NoError(b, err)
|
|
}
|
|
b.StartTimer()
|
|
|
|
for i := 0; i < b.N; i++ {
|
|
_, err = queue.RPop()
|
|
assert.NoError(b, err)
|
|
}
|
|
}
|