support search after style pagination (#1182)

* support search after style pagination

Deep pagination typically becomes quite expensive when using
the size+from mechanism on distributed indexes, because each
child index has to build a large result set and return it
over the network for merging at the coordinating node.

Alternatively, the search after approach lets the client
maintain state, in this case the sort key of the last
result on the current page. By running the same search
again, but this time providing that last hit's sort key as
the new request's search_after key, we allow for
significantly more efficient pagination.

TotalHits and Facets are unaffected by using this parameter,
because all hits are still seen by the collector.
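
As a sketch of the client-side flow this enables (idx and the
"name" sort field are illustrative assumptions, not part of this
change):

```go
package example

import (
	"fmt"
	"log"

	"github.com/blevesearch/bleve"
)

// pageThrough walks an index page by page using search_after,
// carrying the last hit's sort key forward between requests.
func pageThrough(idx bleve.Index) {
	req := bleve.NewSearchRequest(bleve.NewMatchAllQuery())
	req.Size = 10
	req.SortBy([]string{"name"})

	for {
		res, err := idx.Search(req)
		if err != nil {
			log.Fatal(err)
		}
		if len(res.Hits) == 0 {
			break // past the last page
		}
		for _, hit := range res.Hits {
			fmt.Println(hit.ID)
		}
		// the last hit's sort key becomes the next request's
		// search_after key
		req.SearchAfter = res.Hits[len(res.Hits)-1].Sort
	}
}
```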

* fix incorrectly named method

* add support for SearchBefore

Adds a capability to go one page backwards in deep pagination
of search results, similar to going forwards using SearchAfter.
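
A matching sketch for stepping back (previousPage is an
illustrative helper; firstHit is assumed to be the first hit of
the page currently displayed):

```go
package example

import (
	"github.com/blevesearch/bleve"
	"github.com/blevesearch/bleve/search"
)

// previousPage re-runs a search one page back: the first hit of
// the current page supplies the search_before key.
func previousPage(idx bleve.Index, firstHit *search.DocumentMatch) (*bleve.SearchResult, error) {
	req := bleve.NewSearchRequest(bleve.NewMatchAllQuery())
	req.Size = 10
	req.SortBy([]string{"name"})
	req.SearchBefore = firstHit.Sort // only hits strictly before this key
	return idx.Search(req)
}
```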

* reset request back to original

* refactor integration tests to allow for testing aliases

added integration tests for search_before and search_after
which go through index aliases

* fix typo

* fix incorrect type for method receiver
tags/v0.8.0
Marty Schoch, 5 months ago
commit 7ad4bbc2ad
13 changed files with 415 additions and 80 deletions

  1. index_alias_impl.go (+22 -24)
  2. index_impl.go (+56 -8)
  3. search.go (+35 -0)
  4. search/collector/topn.go (+29 -0)
  5. search/sort.go (+29 -0)
  6. test/integration_test.go (+100 -47)
  7. test/tests/alias/datasets/shard0/a.json (+3 -0)
  8. test/tests/alias/datasets/shard0/c.json (+3 -0)
  9. test/tests/alias/datasets/shard1/b.json (+3 -0)
  10. test/tests/alias/datasets/shard1/d.json (+3 -0)
  11. test/tests/alias/mapping.json (+3 -0)
  12. test/tests/alias/searches.json (+76 -0)
  13. test/tests/sort/searches.json (+53 -1)

index_alias_impl.go (+22 -24)

@@ -434,6 +434,8 @@ func createChildSearchRequest(req *SearchRequest) *SearchRequest {
Sort: req.Sort.Copy(),
IncludeLocations: req.IncludeLocations,
Score: req.Score,
SearchAfter: req.SearchAfter,
SearchBefore: req.SearchBefore,
}
return &rv
}
@@ -451,6 +453,14 @@ func MultiSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*Se
searchStart := time.Now()
asyncResults := make(chan *asyncSearchResult, len(indexes))

var reverseQueryExecution bool
if req.SearchBefore != nil {
reverseQueryExecution = true
req.Sort.Reverse()
req.SearchAfter = req.SearchBefore
req.SearchBefore = nil
}

// run search on each index in separate go routine
var waitGroup sync.WaitGroup

@@ -503,7 +513,7 @@ func MultiSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*Se

// sort all hits with the requested order
if len(req.Sort) > 0 {
sorter := newMultiSearchHitSorter(req.Sort, sr.Hits)
sorter := newSearchHitSorter(req.Sort, sr.Hits)
sort.Sort(sorter)
}

@@ -524,6 +534,17 @@ func MultiSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*Se
sr.Facets.Fixup(name, fr.Size)
}

if reverseQueryExecution {
// reverse the sort back to the original
req.Sort.Reverse()
// resort using the original order
mhs := newSearchHitSorter(req.Sort, sr.Hits)
sort.Sort(mhs)
// reset request
req.SearchBefore = req.SearchAfter
req.SearchAfter = nil
}

// fix up original request
sr.Request = req
searchDuration := time.Since(searchStart)
@@ -581,26 +602,3 @@ func (f *indexAliasImplFieldDict) Close() error {
defer f.index.mutex.RUnlock()
return f.fieldDict.Close()
}

type multiSearchHitSorter struct {
hits search.DocumentMatchCollection
sort search.SortOrder
cachedScoring []bool
cachedDesc []bool
}

func newMultiSearchHitSorter(sort search.SortOrder, hits search.DocumentMatchCollection) *multiSearchHitSorter {
return &multiSearchHitSorter{
sort: sort,
hits: hits,
cachedScoring: sort.CacheIsScore(),
cachedDesc: sort.CacheDescending(),
}
}

func (m *multiSearchHitSorter) Len() int { return len(m.hits) }
func (m *multiSearchHitSorter) Swap(i, j int) { m.hits[i], m.hits[j] = m.hits[j], m.hits[i] }
func (m *multiSearchHitSorter) Less(i, j int) bool {
c := m.sort.Compare(m.cachedScoring, m.cachedDesc, m.hits[i], m.hits[j])
return c < 0
}
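
The SearchBefore path in MultiSearch above piggybacks on
SearchAfter. A condensed sketch of that transformation
(executeSearchBefore and run are illustrative names, not part of
the diff; the real code also re-sorts the collected hits into the
original order):

```go
package example

import "github.com/blevesearch/bleve"

// executeSearchBefore outlines the reverse-execution trick:
// reverse every sort clause, run the query as a search-after
// query, then restore the request to its original form.
func executeSearchBefore(req *bleve.SearchRequest,
	run func(*bleve.SearchRequest) (*bleve.SearchResult, error)) (*bleve.SearchResult, error) {

	req.Sort.Reverse()                 // flip asc/desc on every clause
	req.SearchAfter = req.SearchBefore // the before-key becomes the after-key
	req.SearchBefore = nil

	res, err := run(req)

	req.Sort.Reverse() // back to the original order
	// (the diff also re-sorts the hits here using newSearchHitSorter)
	req.SearchBefore = req.SearchAfter
	req.SearchAfter = nil
	return res, err
}
```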

index_impl.go (+56 -8)

@@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"os"
"sort"
"sync"
"sync/atomic"
"time"
@@ -442,7 +443,20 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
return nil, ErrorIndexClosed
}

collector := collector.NewTopNCollector(req.Size, req.From, req.Sort)
var reverseQueryExecution bool
if req.SearchBefore != nil {
reverseQueryExecution = true
req.Sort.Reverse()
req.SearchAfter = req.SearchBefore
req.SearchBefore = nil
}

var coll *collector.TopNCollector
if req.SearchAfter != nil {
coll = collector.NewTopNCollectorAfter(req.Size, req.Sort, req.SearchAfter)
} else {
coll = collector.NewTopNCollector(req.Size, req.From, req.Sort)
}

// open a reader for this search
indexReader, err := i.i.Reader()
@@ -494,10 +508,10 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
facetsBuilder.Add(facetName, facetBuilder)
}
}
collector.SetFacetsBuilder(facetsBuilder)
coll.SetFacetsBuilder(facetsBuilder)
}

memNeeded := memNeededForSearch(req, searcher, collector)
memNeeded := memNeededForSearch(req, searcher, coll)
if cb := ctx.Value(SearchQueryStartCallbackKey); cb != nil {
if cbF, ok := cb.(SearchQueryStartCallbackFn); ok {
err = cbF(memNeeded)
@@ -515,12 +529,12 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
}
}

err = collector.Collect(ctx, searcher, indexReader)
err = coll.Collect(ctx, searcher, indexReader)
if err != nil {
return nil, err
}

hits := collector.Results()
hits := coll.Results()

var highlighter highlight.Highlighter

@@ -560,6 +574,17 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
logger.Printf("slow search took %s - %v", searchDuration, req)
}

if reverseQueryExecution {
// reverse the sort back to the original
req.Sort.Reverse()
// resort using the original order
mhs := newSearchHitSorter(req.Sort, hits)
sort.Sort(mhs)
// reset request
req.SearchBefore = req.SearchAfter
req.SearchAfter = nil
}

return &SearchResult{
Status: &SearchStatus{
Total: 1,
@@ -567,10 +592,10 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
},
Request: req,
Hits: hits,
Total: collector.Total(),
MaxScore: collector.MaxScore(),
Total: coll.Total(),
MaxScore: coll.MaxScore(),
Took: searchDuration,
Facets: collector.FacetResults(),
Facets: coll.FacetResults(),
}, nil
}

@@ -865,3 +890,26 @@ func deDuplicate(fields []string) []string {
}
return ret
}

type searchHitSorter struct {
hits search.DocumentMatchCollection
sort search.SortOrder
cachedScoring []bool
cachedDesc []bool
}

func newSearchHitSorter(sort search.SortOrder, hits search.DocumentMatchCollection) *searchHitSorter {
return &searchHitSorter{
sort: sort,
hits: hits,
cachedScoring: sort.CacheIsScore(),
cachedDesc: sort.CacheDescending(),
}
}

func (m *searchHitSorter) Len() int { return len(m.hits) }
func (m *searchHitSorter) Swap(i, j int) { m.hits[i], m.hits[j] = m.hits[j], m.hits[i] }
func (m *searchHitSorter) Less(i, j int) bool {
c := m.sort.Compare(m.cachedScoring, m.cachedDesc, m.hits[i], m.hits[j])
return c < 0
}

search.go (+35 -0)

@@ -262,6 +262,8 @@ func (h *HighlightRequest) AddField(field string) {
// result score explanations.
// Sort describes the desired order for the results to be returned.
// Score controls the kind of scoring performed
// SearchAfter supports deep paging by providing a minimum sort key
// SearchBefore supports deep paging by providing a maximum sort key
//
// A special field named "*" can be used to return all fields.
type SearchRequest struct {
@@ -275,6 +277,8 @@ type SearchRequest struct {
Sort search.SortOrder `json:"sort"`
IncludeLocations bool `json:"includeLocations"`
Score string `json:"score,omitempty"`
SearchAfter []string `json:"search_after"`
SearchBefore []string `json:"search_before"`
}

func (r *SearchRequest) Validate() error {
@@ -285,6 +289,27 @@ func (r *SearchRequest) Validate() error {
}
}

if r.SearchAfter != nil && r.SearchBefore != nil {
return fmt.Errorf("cannot use search after and search before together")
}

if r.SearchAfter != nil {
if r.From != 0 {
return fmt.Errorf("cannot use search after with from !=0")
}
if len(r.SearchAfter) != len(r.Sort) {
return fmt.Errorf("search after must have same size as sort order")
}
}
if r.SearchBefore != nil {
if r.From != 0 {
return fmt.Errorf("cannot use search before with from !=0")
}
if len(r.SearchBefore) != len(r.Sort) {
return fmt.Errorf("search before must have same size as sort order")
}
}

return r.Facets.Validate()
}
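
For instance, a request whose search_after key does not line up
with its sort order fails validation (a minimal sketch; the query
and field names are illustrative):

```go
package main

import (
	"fmt"

	"github.com/blevesearch/bleve"
)

func main() {
	req := bleve.NewSearchRequest(bleve.NewMatchAllQuery())
	req.SortBy([]string{"name", "_id"})
	req.SearchAfter = []string{"marty"} // one key, but two sort clauses
	if err := req.Validate(); err != nil {
		fmt.Println(err) // "search after must have same size as sort order"
	}
}
```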

@@ -311,6 +336,12 @@ func (r *SearchRequest) SortByCustom(order search.SortOrder) {
r.Sort = order
}

// SetSearchAfter sets the request to skip over hits with a sort
// value less than or equal to the provided search after key
func (r *SearchRequest) SetSearchAfter(after []string) {
r.SearchAfter = after
}

// UnmarshalJSON deserializes a JSON representation of
// a SearchRequest
func (r *SearchRequest) UnmarshalJSON(input []byte) error {
@@ -325,6 +356,8 @@ func (r *SearchRequest) UnmarshalJSON(input []byte) error {
Sort []json.RawMessage `json:"sort"`
IncludeLocations bool `json:"includeLocations"`
Score string `json:"score"`
SearchAfter []string `json:"search_after"`
SearchBefore []string `json:"search_before"`
}

err := json.Unmarshal(input, &temp)
@@ -352,6 +385,8 @@ func (r *SearchRequest) UnmarshalJSON(input []byte) error {
r.Facets = temp.Facets
r.IncludeLocations = temp.IncludeLocations
r.Score = temp.Score
r.SearchAfter = temp.SearchAfter
r.SearchBefore = temp.SearchBefore
r.Query, err = query.ParseQuery(temp.Q)
if err != nil {
return err


search/collector/topn.go (+29 -0)

@@ -69,6 +69,7 @@ type TopNCollector struct {
lowestMatchOutsideResults *search.DocumentMatch
updateFieldVisitor index.DocumentFieldTermVisitor
dvReader index.DocValueReader
searchAfter *search.DocumentMatch
}

// CheckDoneEvery controls how frequently we check the context deadline
@@ -78,6 +79,21 @@ const CheckDoneEvery = uint64(1024)
// skipping over the first 'skip' hits
// ordering hits by the provided sort order
func NewTopNCollector(size int, skip int, sort search.SortOrder) *TopNCollector {
return newTopNCollector(size, skip, sort)
}

// NewTopNCollectorAfter builds a collector to find the top 'size' hits
// ordered by the provided sort order, skipping over any hits whose
// sort key is less than or equal to the provided 'after' key
func NewTopNCollectorAfter(size int, sort search.SortOrder, after []string) *TopNCollector {
rv := newTopNCollector(size, 0, sort)
rv.searchAfter = &search.DocumentMatch{
Sort: after,
}
return rv
}

func newTopNCollector(size int, skip int, sort search.SortOrder) *TopNCollector {
hc := &TopNCollector{size: size, skip: skip, sort: sort}

// pre-allocate space on the store to avoid reslicing
@@ -266,6 +282,19 @@ func MakeTopNDocumentMatchHandler(
if d == nil {
return nil
}

// support search after based pagination,
// if this hit is <= the search after sort key
// we should skip it
if hc.searchAfter != nil {
// exact sort order matches use hit number to break the tie,
// but we want to allow for exact match, so we pretend the
// hit numbers are equal
hc.searchAfter.HitNumber = d.HitNumber
if hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, d, hc.searchAfter) <= 0 {
return nil
}
}

// optimization, we track lowest sorting hit already removed from heap
// with this one comparison, we can avoid all heap operations if
// this hit would have been added and then immediately removed
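
To make the boundary rule concrete, a toy stand-in for
hc.sort.Compare, with sort ["name"] ascending and search_after
["b"] (strings.Compare plays the role of the real sort
comparison):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	after := "b"
	for _, name := range []string{"a", "b", "c", "d"} {
		// hits sorting at or before the search_after key are skipped,
		// which is why the collector treats <= 0 as "do not collect"
		if strings.Compare(name, after) <= 0 {
			continue
		}
		fmt.Println(name) // prints c, then d
	}
}
```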


search/sort.go (+29 -0)

@@ -38,6 +38,8 @@ type SearchSort interface {
RequiresScoring() bool
RequiresFields() []string

Reverse()

Copy() SearchSort
}

@@ -293,6 +295,12 @@ func (so SortOrder) CacheDescending() []bool {
return rv
}

func (so SortOrder) Reverse() {
for _, soi := range so {
soi.Reverse()
}
}

// SortFieldType lets you control some internal sort behavior
// normally leaving this to the zero-value of SortFieldAuto is fine
type SortFieldType int
@@ -492,6 +500,15 @@ func (s *SortField) Copy() SearchSort {
return &rv
}

func (s *SortField) Reverse() {
s.Desc = !s.Desc
if s.Missing == SortFieldMissingFirst {
s.Missing = SortFieldMissingLast
} else {
s.Missing = SortFieldMissingFirst
}
}

// SortDocID will sort results by the document identifier
type SortDocID struct {
Desc bool
@@ -533,6 +550,10 @@ func (s *SortDocID) Copy() SearchSort {
return &rv
}

func (s *SortDocID) Reverse() {
s.Desc = !s.Desc
}

// SortScore will sort results by the document match score
type SortScore struct {
Desc bool
@@ -574,6 +595,10 @@ func (s *SortScore) Copy() SearchSort {
return &rv
}

func (s *SortScore) Reverse() {
s.Desc = !s.Desc
}

var maxDistance = string(numeric.MustNewPrefixCodedInt64(math.MaxInt64, 0))

// NewSortGeoDistance creates SearchSort instance for sorting documents by
@@ -705,6 +730,10 @@ func (s *SortGeoDistance) Copy() SearchSort {
return &rv
}

func (s *SortGeoDistance) Reverse() {
s.Desc = !s.Desc
}

type BytesSlice [][]byte

func (p BytesSlice) Len() int { return len(p) }
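
A small sketch of what Reverse does to a single sort clause; note
that Missing flips along with Desc, so documents without the
field keep their position relative to the rest of the reversed
run:

```go
package main

import (
	"fmt"

	"github.com/blevesearch/bleve/search"
)

func main() {
	s := &search.SortField{
		Field:   "name",
		Desc:    false,
		Missing: search.SortFieldMissingLast,
	}
	s.Reverse()
	// Desc is now true, and Missing is now SortFieldMissingFirst
	fmt.Println(s.Desc, s.Missing == search.SortFieldMissingFirst)
}
```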


test/integration_test.go (+100 -47)

@@ -17,6 +17,7 @@ package test
import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"math"
"os"
@@ -87,53 +88,29 @@ func runTestDir(t *testing.T, dir, datasetName string) {
return
}

// open new index
if !*keepIndex {
defer func() {
err := os.RemoveAll("test.bleve")
if err != nil {
t.Fatal(err)
}
}()
}
index, err := bleve.New("test.bleve", &mapping)
if err != nil {
t.Errorf("error creating new index: %v", err)
return
}
// set a custom index name
index.SetName(datasetName)
defer func() {
err := index.Close()
if err != nil {
t.Fatal(err)
}
}()
var index bleve.Index
var cleanup func()

// index data
fis, err := ioutil.ReadDir(dir + string(filepath.Separator) + "data")
if err != nil {
t.Errorf("error reading data dir: %v", err)
return
}
for _, fi := range fis {
fileBytes, err := ioutil.ReadFile(dir + string(filepath.Separator) + "data" + string(filepath.Separator) + fi.Name())
// if there is a dir named 'data' open single index
_, err = os.Stat(dir + string(filepath.Separator) + "data")
if !os.IsNotExist(err) {

index, cleanup, err = loadDataSet(t, datasetName, mapping, dir+string(filepath.Separator)+"data")
if err != nil {
t.Errorf("error reading data file: %v", err)
t.Errorf("error loading dataset: %v", err)
return
}
var fileDoc interface{}
err = json.Unmarshal(fileBytes, &fileDoc)
if err != nil {
t.Errorf("error parsing data file as json: %v", err)
}
filename := fi.Name()
ext := filepath.Ext(filename)
id := filename[0 : len(filename)-len(ext)]
err = index.Index(id, fileDoc)
if err != nil {
t.Errorf("error indexing data: %v", err)
return
defer cleanup()
} else {
// if there is a dir named 'datasets' build alias over each index
_, err = os.Stat(dir + string(filepath.Separator) + "datasets")
if !os.IsNotExist(err) {
index, cleanup, err = loadDataSets(t, datasetName, mapping, dir+string(filepath.Separator)+"datasets")
if err != nil {
t.Errorf("error loading dataset: %v", err)
return
}
defer cleanup()
}
}

@@ -165,6 +142,7 @@ func runTestDir(t *testing.T, dir, datasetName string) {
if len(res.Hits) != len(search.Result.Hits) {
t.Errorf("test error - %s", search.Comment)
t.Errorf("test %d - expected hits len: %d got %d", testNum, len(search.Result.Hits), len(res.Hits))
t.Errorf("got hits: %v", res.Hits)
continue
}
for hi, hit := range search.Result.Hits {
@@ -202,12 +180,87 @@ func runTestDir(t *testing.T, dir, datasetName string) {
t.Errorf("test %d - expected facets: %#v got %#v", testNum, search.Result.Facets, res.Facets)
}
}
// check that custom index name is in results
for _, hit := range res.Hits {
if hit.Index != datasetName {
t.Fatalf("expected name: %s, got: %s", datasetName, hit.Index)
if _, ok := index.(bleve.IndexAlias); !ok {
// check that custom index name is in results
for _, hit := range res.Hits {
if hit.Index != datasetName {
t.Fatalf("expected name: %s, got: %s", datasetName, hit.Index)
}
}
}
}
}
}

func loadDataSet(t *testing.T, datasetName string, mapping mapping.IndexMappingImpl, path string) (bleve.Index, func(), error) {
idxPath := fmt.Sprintf("test-%s.bleve", datasetName)
index, err := bleve.New(idxPath, &mapping)
if err != nil {
return nil, nil, fmt.Errorf("error creating new index: %v", err)
}
// set a custom index name
index.SetName(datasetName)

// index data
fis, err := ioutil.ReadDir(path)
if err != nil {
return nil, nil, fmt.Errorf("error reading data dir: %v", err)
}
for _, fi := range fis {
fileBytes, err := ioutil.ReadFile(path + string(filepath.Separator) + fi.Name())
if err != nil {
return nil, nil, fmt.Errorf("error reading data file: %v", err)
}
var fileDoc interface{}
err = json.Unmarshal(fileBytes, &fileDoc)
if err != nil {
return nil, nil, fmt.Errorf("error parsing data file as json: %v", err)
}
filename := fi.Name()
ext := filepath.Ext(filename)
id := filename[0 : len(filename)-len(ext)]
err = index.Index(id, fileDoc)
if err != nil {
return nil, nil, fmt.Errorf("error indexing data: %v", err)
}
}
cleanup := func() {
err := index.Close()
if err != nil {
t.Fatalf("error closing index: %v", err)
}
if !*keepIndex {
err := os.RemoveAll(idxPath)
if err != nil {
t.Fatalf("error removing index: %v", err)
}
}
}
return index, cleanup, nil
}

func loadDataSets(t *testing.T, datasetName string, mapping mapping.IndexMappingImpl, path string) (bleve.Index, func(), error) {
fis, err := ioutil.ReadDir(path)
if err != nil {
return nil, nil, fmt.Errorf("error reading datasets dir: %v", err)
}
var cleanups []func()
alias := bleve.NewIndexAlias()
for _, fi := range fis {
idx, idxCleanup, err := loadDataSet(t, fi.Name(), mapping, path+string(filepath.Separator)+fi.Name())
if err != nil {
return nil, nil, fmt.Errorf("error loading dataset: %v", err)
}
cleanups = append(cleanups, idxCleanup)
alias.Add(idx)
}
alias.SetName(datasetName)

cleanupAll := func() {
for _, cleanup := range cleanups {
cleanup()
}
}

return alias, cleanupAll, nil
}

test/tests/alias/datasets/shard0/a.json (+3 -0)

@@ -0,0 +1,3 @@
{
"name": "a"
}

test/tests/alias/datasets/shard0/c.json (+3 -0)

@@ -0,0 +1,3 @@
{
"name": "c"
}

test/tests/alias/datasets/shard1/b.json (+3 -0)

@@ -0,0 +1,3 @@
{
"name": "b"
}

test/tests/alias/datasets/shard1/d.json (+3 -0)

@@ -0,0 +1,3 @@
{
"name": "d"
}

test/tests/alias/mapping.json (+3 -0)

@@ -0,0 +1,3 @@
{
"default_analyzer": "keyword"
}

test/tests/alias/searches.json (+76 -0)

@@ -0,0 +1,76 @@
[
{
"comment": "match all across shards",
"search": {
"from": 0,
"size": 10,
"sort": ["-_score", "_id"],
"query": {
"match_all": {}
}
},
"result": {
"total_hits": 4,
"hits": [
{
"id": "a"
},
{
"id": "b"
},
{
"id": "c"
},
{
"id": "d"
}
]
}
},
{
"comment": "search after b (page 2 when size=2)",
"search": {
"from": 0,
"size": 2,
"sort": ["name"],
"search_after": ["b"],
"query": {
"match_all": {}
}
},
"result": {
"total_hits": 4,
"hits": [
{
"id": "c"
},
{
"id": "d"
}
]
}
},
{
"comment": "search before c (page 1 when size=2)",
"search": {
"from": 0,
"size": 2,
"sort": ["name"],
"search_before": ["c"],
"query": {
"match_all": {}
}
},
"result": {
"total_hits": 4,
"hits": [
{
"id": "a"
},
{
"id": "b"
}
]
}
}
]

test/tests/sort/searches.json (+53 -1)

@@ -406,5 +406,57 @@
}
]
}
}
},
{
"comment": "sort by name, ascending, after marty",
"search": {
"from": 0,
"size": 10,
"query": {
"match_all":{}
},
"sort": ["name"],
"search_after": ["marty"]
},
"result": {
"total_hits": 6,
"hits": [
{
"id": "e"
},
{
"id": "b"
},
{
"id": "d"
}
]
}
},
{
"comment": "sort by name, ascending, before nancy",
"search": {
"from": 0,
"size": 10,
"query": {
"match_all":{}
},
"sort": ["name"],
"search_before": ["nancy"]
},
"result": {
"total_hits": 6,
"hits": [
{
"id": "c"
},
{
"id": "f"
},
{
"id": "a"
}
]
}
}
]
