149
149
import java .util .concurrent .atomic .AtomicBoolean ;
150
150
import java .util .concurrent .atomic .AtomicInteger ;
151
151
import java .util .concurrent .atomic .AtomicLong ;
152
+ import java .util .function .BiFunction ;
153
+ import java .util .function .Function ;
152
154
import java .util .function .LongSupplier ;
153
155
import java .util .function .Supplier ;
154
156
@@ -555,16 +557,17 @@ public void executeQueryPhase(ShardSearchRequest request, SearchShardTask task,
555
557
// check if we can shortcut the query phase entirely.
556
558
if (orig .canReturnNullResponseIfMatchNoDocs ()) {
557
559
assert orig .scroll () == null ;
558
- final CanMatchShardResponse canMatchResp ;
559
- try {
560
- ShardSearchRequest clone = new ShardSearchRequest (orig );
561
- canMatchResp = canMatch (clone , false );
562
- } catch (Exception exc ) {
563
- l .onFailure (exc );
564
- return ;
565
- }
560
+ ShardSearchRequest clone = new ShardSearchRequest (orig );
561
+ CanMatchContext canMatchContext = new CanMatchContext (
562
+ clone ,
563
+ indicesService ::indexServiceSafe ,
564
+ this ::findReaderContext ,
565
+ defaultKeepAlive ,
566
+ maxKeepAlive
567
+ );
568
+ CanMatchShardResponse canMatchResp = canMatch (canMatchContext , false );
566
569
if (canMatchResp .canMatch () == false ) {
567
- l .onResponse (QuerySearchResult .nullInstance ());
570
+ listener .onResponse (QuerySearchResult .nullInstance ());
568
571
return ;
569
572
}
570
573
}
@@ -1212,25 +1215,37 @@ public void freeAllScrollContexts() {
1212
1215
}
1213
1216
1214
1217
private long getKeepAlive (ShardSearchRequest request ) {
1218
+ return getKeepAlive (request , defaultKeepAlive , maxKeepAlive );
1219
+ }
1220
+
1221
+ private static long getKeepAlive (ShardSearchRequest request , long defaultKeepAlive , long maxKeepAlive ) {
1215
1222
if (request .scroll () != null ) {
1216
- return getScrollKeepAlive (request .scroll ());
1223
+ return getScrollKeepAlive (request .scroll (), defaultKeepAlive , maxKeepAlive );
1217
1224
} else if (request .keepAlive () != null ) {
1218
- checkKeepAliveLimit (request .keepAlive ().millis ());
1225
+ checkKeepAliveLimit (request .keepAlive ().millis (), maxKeepAlive );
1219
1226
return request .keepAlive ().getMillis ();
1220
1227
} else {
1221
1228
return request .readerId () == null ? defaultKeepAlive : -1 ;
1222
1229
}
1223
1230
}
1224
1231
1225
1232
private long getScrollKeepAlive (Scroll scroll ) {
1233
+ return getScrollKeepAlive (scroll , defaultKeepAlive , maxKeepAlive );
1234
+ }
1235
+
1236
+ private static long getScrollKeepAlive (Scroll scroll , long defaultKeepAlive , long maxKeepAlive ) {
1226
1237
if (scroll != null && scroll .keepAlive () != null ) {
1227
- checkKeepAliveLimit (scroll .keepAlive ().millis ());
1238
+ checkKeepAliveLimit (scroll .keepAlive ().millis (), maxKeepAlive );
1228
1239
return scroll .keepAlive ().getMillis ();
1229
1240
}
1230
1241
return defaultKeepAlive ;
1231
1242
}
1232
1243
1233
1244
private void checkKeepAliveLimit (long keepAlive ) {
1245
+ checkKeepAliveLimit (keepAlive , maxKeepAlive );
1246
+ }
1247
+
1248
+ private static void checkKeepAliveLimit (long keepAlive , long maxKeepAlive ) {
1234
1249
if (keepAlive > maxKeepAlive ) {
1235
1250
throw new IllegalArgumentException (
1236
1251
"Keep alive for request ("
@@ -1689,6 +1704,7 @@ public void canMatch(CanMatchNodeRequest request, ActionListener<CanMatchNodeRes
1689
1704
final List <CanMatchNodeResponse .ResponseOrFailure > responses = new ArrayList <>(shardLevelRequests .size ());
1690
1705
for (var shardLevelRequest : shardLevelRequests ) {
1691
1706
try {
1707
+ // TODO remove the exception handling as it's now in canMatch itself
1692
1708
responses .add (new CanMatchNodeResponse .ResponseOrFailure (canMatch (request .createShardSearchRequest (shardLevelRequest ))));
1693
1709
} catch (Exception e ) {
1694
1710
responses .add (new CanMatchNodeResponse .ResponseOrFailure (e ));
@@ -1700,82 +1716,145 @@ public void canMatch(CanMatchNodeRequest request, ActionListener<CanMatchNodeRes
1700
1716
/**
1701
1717
* This method uses a lightweight searcher without wrapping (i.e., not open a full reader on frozen indices) to rewrite the query
1702
1718
* to check if the query can match any documents. This method can have false positives while if it returns {@code false} the query
1703
- * won't match any documents on the current shard.
1719
+ * won't match any documents on the current shard. Exceptions are handled within the method, and never re-thrown.
1704
1720
*/
1705
- public CanMatchShardResponse canMatch (ShardSearchRequest request ) throws IOException {
1706
- return canMatch (request , true );
1721
+ public CanMatchShardResponse canMatch (ShardSearchRequest request ) {
1722
+ CanMatchContext canMatchContext = new CanMatchContext (
1723
+ request ,
1724
+ indicesService ::indexServiceSafe ,
1725
+ this ::findReaderContext ,
1726
+ defaultKeepAlive ,
1727
+ maxKeepAlive
1728
+ );
1729
+ return canMatch (canMatchContext , true );
1707
1730
}
1708
1731
1709
- private CanMatchShardResponse canMatch (ShardSearchRequest request , boolean checkRefreshPending ) throws IOException {
1710
- assert request .searchType () == SearchType .QUERY_THEN_FETCH : "unexpected search type: " + request .searchType ();
1732
+ static class CanMatchContext {
1733
+ private final ShardSearchRequest request ;
1734
+ private final Function <Index , IndexService > indexServiceLookup ;
1735
+ private final BiFunction <ShardSearchContextId , TransportRequest , ReaderContext > findReaderContext ;
1736
+ private final long defaultKeepAlive ;
1737
+ private final long maxKeepAlive ;
1738
+
1739
+ private IndexService indexService ;
1740
+
1741
+ CanMatchContext (
1742
+ ShardSearchRequest request ,
1743
+ Function <Index , IndexService > indexServiceLookup ,
1744
+ BiFunction <ShardSearchContextId , TransportRequest , ReaderContext > findReaderContext ,
1745
+ long defaultKeepAlive ,
1746
+ long maxKeepAlive
1747
+ ) {
1748
+ this .request = request ;
1749
+ this .indexServiceLookup = indexServiceLookup ;
1750
+ this .findReaderContext = findReaderContext ;
1751
+ this .defaultKeepAlive = defaultKeepAlive ;
1752
+ this .maxKeepAlive = maxKeepAlive ;
1753
+ }
1754
+
1755
+ long getKeepAlive () {
1756
+ return SearchService .getKeepAlive (request , defaultKeepAlive , maxKeepAlive );
1757
+ }
1758
+
1759
+ ReaderContext findReaderContext () {
1760
+ return findReaderContext .apply (request .readerId (), request );
1761
+ }
1762
+
1763
+ QueryRewriteContext getQueryRewriteContext (IndexService indexService ) {
1764
+ return indexService .newQueryRewriteContext (request ::nowInMillis , request .getRuntimeMappings (), request .getClusterAlias ());
1765
+ }
1766
+
1767
+ SearchExecutionContext getSearchExecutionContext (Engine .Searcher searcher ) {
1768
+ return getIndexService ().newSearchExecutionContext (
1769
+ request .shardId ().id (),
1770
+ 0 ,
1771
+ searcher ,
1772
+ request ::nowInMillis ,
1773
+ request .getClusterAlias (),
1774
+ request .getRuntimeMappings ()
1775
+ );
1776
+ }
1777
+
1778
+ IndexShard getShard () {
1779
+ return getIndexService ().getShard (request .shardId ().getId ());
1780
+ }
1781
+
1782
+ IndexService getIndexService () {
1783
+ if (this .indexService == null ) {
1784
+ this .indexService = indexServiceLookup .apply (request .shardId ().getIndex ());
1785
+ }
1786
+ return this .indexService ;
1787
+ }
1788
+ }
1789
+
1790
+ static CanMatchShardResponse canMatch (CanMatchContext canMatchContext , boolean checkRefreshPending ) {
1791
+ assert canMatchContext .request .searchType () == SearchType .QUERY_THEN_FETCH
1792
+ : "unexpected search type: " + canMatchContext .request .searchType ();
1711
1793
Releasable releasable = null ;
1712
1794
try {
1713
1795
IndexService indexService ;
1714
1796
final boolean hasRefreshPending ;
1715
1797
final Engine .Searcher canMatchSearcher ;
1716
- if (request .readerId () != null ) {
1798
+ if (canMatchContext . request .readerId () != null ) {
1717
1799
hasRefreshPending = false ;
1718
1800
ReaderContext readerContext ;
1719
1801
Engine .Searcher searcher ;
1720
1802
try {
1721
- readerContext = findReaderContext (request . readerId (), request );
1722
- releasable = readerContext .markAsUsed (getKeepAlive (request ));
1803
+ readerContext = canMatchContext . findReaderContext ();
1804
+ releasable = readerContext .markAsUsed (canMatchContext . getKeepAlive ());
1723
1805
indexService = readerContext .indexService ();
1724
- if (canMatchAfterRewrite (request , indexService ) == false ) {
1806
+ QueryRewriteContext queryRewriteContext = canMatchContext .getQueryRewriteContext (indexService );
1807
+ if (queryStillMatchesAfterRewrite (canMatchContext .request , queryRewriteContext ) == false ) {
1725
1808
return new CanMatchShardResponse (false , null );
1726
1809
}
1727
1810
searcher = readerContext .acquireSearcher (Engine .CAN_MATCH_SEARCH_SOURCE );
1728
1811
} catch (SearchContextMissingException e ) {
1729
- final String searcherId = request .readerId ().getSearcherId ();
1812
+ final String searcherId = canMatchContext . request .readerId ().getSearcherId ();
1730
1813
if (searcherId == null ) {
1731
- throw e ;
1814
+ return new CanMatchShardResponse ( true , null ) ;
1732
1815
}
1733
- indexService = indicesService .indexServiceSafe (request .shardId ().getIndex ());
1734
- if (canMatchAfterRewrite (request , indexService ) == false ) {
1816
+ if (queryStillMatchesAfterRewrite (
1817
+ canMatchContext .request ,
1818
+ canMatchContext .getQueryRewriteContext (canMatchContext .getIndexService ())
1819
+ ) == false ) {
1735
1820
return new CanMatchShardResponse (false , null );
1736
1821
}
1737
- IndexShard indexShard = indexService .getShard (request .shardId ().getId ());
1738
- final Engine .SearcherSupplier searcherSupplier = indexShard .acquireSearcherSupplier ();
1822
+ final Engine .SearcherSupplier searcherSupplier = canMatchContext .getShard ().acquireSearcherSupplier ();
1739
1823
if (searcherId .equals (searcherSupplier .getSearcherId ()) == false ) {
1740
1824
searcherSupplier .close ();
1741
- throw e ;
1825
+ return new CanMatchShardResponse ( true , null ) ;
1742
1826
}
1743
1827
releasable = searcherSupplier ;
1744
1828
searcher = searcherSupplier .acquireSearcher (Engine .CAN_MATCH_SEARCH_SOURCE );
1745
1829
}
1746
1830
canMatchSearcher = searcher ;
1747
1831
} else {
1748
- indexService = indicesService .indexServiceSafe (request .shardId ().getIndex ());
1749
- if (canMatchAfterRewrite (request , indexService ) == false ) {
1832
+ if (queryStillMatchesAfterRewrite (
1833
+ canMatchContext .request ,
1834
+ canMatchContext .getQueryRewriteContext (canMatchContext .getIndexService ())
1835
+ ) == false ) {
1750
1836
return new CanMatchShardResponse (false , null );
1751
1837
}
1752
- IndexShard indexShard = indexService .getShard (request .shardId ().getId ());
1753
- boolean needsWaitForRefresh = request .waitForCheckpoint () != UNASSIGNED_SEQ_NO ;
1838
+ boolean needsWaitForRefresh = canMatchContext .request .waitForCheckpoint () != UNASSIGNED_SEQ_NO ;
1754
1839
// If this request wait_for_refresh behavior, it is safest to assume a refresh is pending. Theoretically,
1755
1840
// this can be improved in the future by manually checking that the requested checkpoint has already been refresh.
1756
1841
// However, this will request modifying the engine to surface that information.
1842
+ IndexShard indexShard = canMatchContext .getShard ();
1757
1843
hasRefreshPending = needsWaitForRefresh || (indexShard .hasRefreshPending () && checkRefreshPending );
1758
1844
canMatchSearcher = indexShard .acquireSearcher (Engine .CAN_MATCH_SEARCH_SOURCE );
1759
1845
}
1760
1846
try (canMatchSearcher ) {
1761
- SearchExecutionContext context = indexService .newSearchExecutionContext (
1762
- request .shardId ().id (),
1763
- 0 ,
1764
- canMatchSearcher ,
1765
- request ::nowInMillis ,
1766
- request .getClusterAlias (),
1767
- request .getRuntimeMappings ()
1768
- );
1769
- final boolean canMatch = queryStillMatchesAfterRewrite (request , context );
1770
- final MinAndMax <?> minMax ;
1847
+ SearchExecutionContext context = canMatchContext .getSearchExecutionContext (canMatchSearcher );
1848
+ final boolean canMatch = queryStillMatchesAfterRewrite (canMatchContext .request , context );
1771
1849
if (canMatch || hasRefreshPending ) {
1772
- FieldSortBuilder sortBuilder = FieldSortBuilder .getPrimaryFieldSortOrNull (request .source ());
1773
- minMax = sortBuilder != null ? FieldSortBuilder .getMinMaxOrNull (context , sortBuilder ) : null ;
1774
- } else {
1775
- minMax = null ;
1850
+ FieldSortBuilder sortBuilder = FieldSortBuilder .getPrimaryFieldSortOrNull (canMatchContext .request .source ());
1851
+ final MinAndMax <?> minMax = sortBuilder != null ? FieldSortBuilder .getMinMaxOrNull (context , sortBuilder ) : null ;
1852
+ return new CanMatchShardResponse (true , minMax );
1776
1853
}
1777
- return new CanMatchShardResponse (canMatch || hasRefreshPending , minMax );
1854
+ return new CanMatchShardResponse (false , null );
1778
1855
}
1856
+ } catch (Exception e ) {
1857
+ return new CanMatchShardResponse (true , null );
1779
1858
} finally {
1780
1859
Releasables .close (releasable );
1781
1860
}
@@ -1788,15 +1867,6 @@ private CanMatchShardResponse canMatch(ShardSearchRequest request, boolean check
1788
1867
* {@link MatchNoneQueryBuilder}. This allows us to avoid extra work for example making the shard search active and waiting for
1789
1868
* refreshes.
1790
1869
*/
1791
- private static boolean canMatchAfterRewrite (final ShardSearchRequest request , final IndexService indexService ) throws IOException {
1792
- final QueryRewriteContext queryRewriteContext = indexService .newQueryRewriteContext (
1793
- request ::nowInMillis ,
1794
- request .getRuntimeMappings (),
1795
- request .getClusterAlias ()
1796
- );
1797
- return queryStillMatchesAfterRewrite (request , queryRewriteContext );
1798
- }
1799
-
1800
1870
@ SuppressWarnings ("unchecked" )
1801
1871
public static boolean queryStillMatchesAfterRewrite (ShardSearchRequest request , QueryRewriteContext context ) throws IOException {
1802
1872
Rewriteable .rewrite (request .getRewriteable (), context , false );
0 commit comments