levelqueue/uniquequeue.go
Andrew Thornton f020868cc2
All checks were successful
continuous-integration/drone/push Build is passing
Prevent NPE if pushes/gets occur after the db is already closed (#6)
Reviewed-on: #6
Reviewed-by: Lunny Xiao <xiaolunwen@gmail.com>
Co-authored-by: Andrew Thornton <art27@cantab.net>
Co-committed-by: Andrew Thornton <art27@cantab.net>
2022-07-29 13:47:28 +08:00

194 lines
4.7 KiB
Go

// Copyright 2020 Andrew Thornton. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package levelqueue
import (
"fmt"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
)
// uniqueQueuePrefixStr is the key prefix OpenUnique hands to NewUniqueQueue
// as the set prefix.
const uniqueQueuePrefixStr = "unique"
// UniqueQueue defines a unique queue struct: a Queue paired with a Set that
// records which values have been pushed and not yet popped, so that a value
// already recorded in the set is rejected on push.
type UniqueQueue struct {
// q holds the queued data in order.
q *Queue
// set records the data that has been pushed and not yet popped or handled.
set *Set
// db is the leveldb handle given to NewUniqueQueue; it is set to nil by Close.
db *leveldb.DB
// closeUnderlyingDB reports whether Close should also close db.
closeUnderlyingDB bool
}
// OpenUnique opens a unique queue from the db path or creates it if it doesn't exist.
// If opening fails because the db is reported as corrupted, a recovery of the
// db files is attempted before giving up.
//
// The keys in the queue portion will not be prefixed, and the set keys will be
// prefixed with "unique". The returned queue is configured to close the db it
// opened when Close is called on it.
//
// On any failure a nil queue and the error are returned, and no db handle is
// left open.
func OpenUnique(dataDir string) (*UniqueQueue, error) {
	db, err := leveldb.OpenFile(dataDir, nil)
	if err != nil {
		if !errors.IsCorrupted(err) {
			return nil, err
		}
		db, err = leveldb.RecoverFile(dataDir, nil)
		if err != nil {
			return nil, err
		}
	}

	queue, err := NewUniqueQueue(db, []byte{}, []byte(uniqueQueuePrefixStr), true)
	if err != nil {
		// No queue took ownership of db, so it has to be closed here or the
		// handle leaks. The construction error is the one worth reporting,
		// so the result of this best-effort Close is deliberately dropped.
		_ = db.Close()
		return nil, err
	}
	return queue, nil
}
// NewUniqueQueue builds a unique queue on top of an already opened db.
// Queue keys are prefixed with queuePrefix and set keys with setPrefix.
// closeUnderlyingDB controls whether Close on the returned queue also closes db.
//
// The inner queue and set are both created with false as their last argument
// (presumably their own closeUnderlyingDB flag - confirm against NewQueue and
// NewSet), leaving the decision to close db with the UniqueQueue itself.
func NewUniqueQueue(db *leveldb.DB, queuePrefix []byte, setPrefix []byte, closeUnderlyingDB bool) (*UniqueQueue, error) {
	inner, err := NewQueue(db, queuePrefix, false)
	if err != nil {
		return nil, err
	}

	members, err := NewSet(db, setPrefix, false)
	if err != nil {
		return nil, err
	}

	return &UniqueQueue{
		q:                 inner,
		set:               members,
		db:                db,
		closeUnderlyingDB: closeUnderlyingDB,
	}, nil
}
// LPush pushes data to the left of the queue.
// It is LPushFunc with no callback.
func (queue *UniqueQueue) LPush(data []byte) error {
	var noCallback func() error
	return queue.LPushFunc(data, noCallback)
}
// LPushFunc pushes data to the left of the queue and calls the callback if it is added.
//
// data is first added to the uniqueness set. If it is already present,
// ErrAlreadyInQueue is returned and fn is not called. Otherwise fn (when
// non-nil) is called, and if it succeeds data is pushed onto the queue.
//
// If fn returns an error, or the push onto the queue fails, data is removed
// from the set again so that a later push of the same data is not rejected,
// and that error is returned. If the removal fails as well, the returned
// error reports both failures and wraps the original one.
func (queue *UniqueQueue) LPushFunc(data []byte, fn func() error) error {
	added, err := queue.set.Add(data)
	if err != nil {
		return err
	}
	if !added {
		return ErrAlreadyInQueue
	}

	if fn != nil {
		err = fn()
	}
	if err == nil {
		err = queue.q.LPush(data)
	}
	if err == nil {
		return nil
	}

	// Roll back the set entry: a failed callback or a failed push is treated
	// as data not being queued, and leaving it in the set would make every
	// future push of the same data fail with ErrAlreadyInQueue.
	if _, remErr := queue.set.Remove(data); remErr != nil {
		return fmt.Errorf("%w & %v", err, remErr)
	}
	return err
}
// RPush pushes data to the right of the queue.
// It is RPushFunc with no callback.
func (queue *UniqueQueue) RPush(data []byte) error {
	var noCallback func() error
	return queue.RPushFunc(data, noCallback)
}
// RPushFunc pushes data to the right of the queue and calls the callback if it is added.
//
// data is first added to the uniqueness set. If it is already present,
// ErrAlreadyInQueue is returned and fn is not called. Otherwise fn (when
// non-nil) is called, and if it succeeds data is pushed onto the queue.
//
// If fn returns an error, or the push onto the queue fails, data is removed
// from the set again so that a later push of the same data is not rejected,
// and that error is returned. If the removal fails as well, the returned
// error reports both failures and wraps the original one.
func (queue *UniqueQueue) RPushFunc(data []byte, fn func() error) error {
	added, err := queue.set.Add(data)
	if err != nil {
		return err
	}
	if !added {
		return ErrAlreadyInQueue
	}

	if fn != nil {
		err = fn()
	}
	if err == nil {
		err = queue.q.RPush(data)
	}
	if err == nil {
		return nil
	}

	// Roll back the set entry: a failed callback or a failed push is treated
	// as data not being queued, and leaving it in the set would make every
	// future push of the same data fail with ErrAlreadyInQueue.
	if _, remErr := queue.set.Remove(data); remErr != nil {
		return fmt.Errorf("%w & %v", err, remErr)
	}
	return err
}
// RPop pops data from the right of the queue and then removes it from the
// uniqueness set. If the pop itself fails, the set is left untouched. If the
// set removal fails, the popped data is still returned together with that error.
func (queue *UniqueQueue) RPop() ([]byte, error) {
	data, err := queue.q.RPop()
	if err == nil {
		_, err = queue.set.Remove(data)
	}
	return data, err
}
// RHandle hands the right element of the queue to the user callback h. If h
// returns nil the element is deleted, otherwise it is kept.
//
// After h succeeds the element is also removed from the uniqueness set; an
// error from that removal is passed back to the inner queue's RHandle in
// place of h's nil result.
func (queue *UniqueQueue) RHandle(h func([]byte) error) error {
	handleThenForget := func(item []byte) error {
		if err := h(item); err != nil {
			return err
		}
		_, err := queue.set.Remove(item)
		return err
	}
	return queue.q.RHandle(handleThenForget)
}
// LPop pops data from the left of the queue and then removes it from the
// uniqueness set. If the pop itself fails, the set is left untouched. If the
// set removal fails, the popped data is still returned together with that error.
func (queue *UniqueQueue) LPop() ([]byte, error) {
	data, err := queue.q.LPop()
	if err == nil {
		_, err = queue.set.Remove(data)
	}
	return data, err
}
// LHandle hands the left element of the queue to the user callback h. If h
// returns nil the element is deleted, otherwise it is kept.
//
// After h succeeds the element is also removed from the uniqueness set; an
// error from that removal is passed back to the inner queue's LHandle in
// place of h's nil result.
func (queue *UniqueQueue) LHandle(h func([]byte) error) error {
	handleThenForget := func(item []byte) error {
		if err := h(item); err != nil {
			return err
		}
		_, err := queue.set.Remove(item)
		return err
	}
	return queue.q.LHandle(handleThenForget)
}
// Has reports whether data is already in the queue, answered by looking it
// up in the uniqueness set.
func (queue *UniqueQueue) Has(data []byte) (bool, error) {
	present, err := queue.set.Has(data)
	return present, err
}
// Len returns the length of the queue.
// The set's lock is held while the inner queue's length is read.
func (queue *UniqueQueue) Len() int64 {
	queue.set.lock.Lock()
	defer queue.set.lock.Unlock()

	length := queue.q.Len()
	return length
}
// Close closes the queue (and the underlying DB if set to closeUnderlyingDB).
//
// The inner queue and set are closed first and their results are discarded.
// Then, under the set's lock, the db is closed if closeUnderlyingDB is set and
// the db reference is still present. The reference is cleared in every case,
// and only the error from closing the db is returned.
func (queue *UniqueQueue) Close() error {
	_ = queue.q.Close()
	_ = queue.set.Close()

	queue.set.lock.Lock()
	defer queue.set.lock.Unlock()
	// Registered after the unlock, so it runs before it: the reference is
	// cleared while the lock is still held, after any db.Close() result
	// has been captured as the return value.
	defer func() { queue.db = nil }()

	if queue.closeUnderlyingDB && queue.db != nil {
		return queue.db.Close()
	}
	return nil
}