Skip to content

Improve connection pool concurrent access #1035

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 15, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,20 @@
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
Expand Down Expand Up @@ -61,7 +66,8 @@ public class ConnectionPoolImpl implements ConnectionPool
private final MetricsListener metricsListener;
private final boolean ownsEventLoopGroup;

private final ConcurrentMap<BoltServerAddress,ExtendedChannelPool> pools = new ConcurrentHashMap<>();
private final ReadWriteLock addressToPoolLock = new ReentrantReadWriteLock();
private final Map<BoltServerAddress,ExtendedChannelPool> addressToPool = new HashMap<>();
private final AtomicBoolean closed = new AtomicBoolean();
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
private final ConnectionFactory connectionFactory;
Expand Down Expand Up @@ -124,25 +130,32 @@ public CompletionStage<Connection> acquire( BoltServerAddress address )
@Override
public void retainAll( Set<BoltServerAddress> addressesToRetain )
{
for ( BoltServerAddress address : pools.keySet() )
executeWithLock( addressToPoolLock.writeLock(), () ->
{
if ( !addressesToRetain.contains( address ) )
Iterator<Map.Entry<BoltServerAddress,ExtendedChannelPool>> entryIterator = addressToPool.entrySet().iterator();
while ( entryIterator.hasNext() )
{
int activeChannels = nettyChannelTracker.inUseChannelCount( address );
if ( activeChannels == 0 )
Map.Entry<BoltServerAddress,ExtendedChannelPool> entry = entryIterator.next();
BoltServerAddress address = entry.getKey();
if ( !addressesToRetain.contains( address ) )
{
// address is not present in updated routing table and has no active connections
// it's now safe to terminate corresponding connection pool and forget about it
ExtendedChannelPool pool = pools.remove( address );
if ( pool != null )
int activeChannels = nettyChannelTracker.inUseChannelCount( address );
if ( activeChannels == 0 )
{
log.info( "Closing connection pool towards %s, it has no active connections " +
"and is not in the routing table registry.", address );
closePoolInBackground( address, pool );
// address is not present in updated routing table and has no active connections
// it's now safe to terminate corresponding connection pool and forget about it
ExtendedChannelPool pool = entry.getValue();
entryIterator.remove();
if ( pool != null )
{
log.info( "Closing connection pool towards %s, it has no active connections " +
"and is not in the routing table registry.", address );
closePoolInBackground( address, pool );
}
}
}
}
}
} );
}

@Override
Expand All @@ -163,35 +176,40 @@ public CompletionStage<Void> close()
if ( closed.compareAndSet( false, true ) )
{
nettyChannelTracker.prepareToCloseChannels();
CompletableFuture<Void> allPoolClosedFuture = closeAllPools();

// We can only shutdown event loop group when all netty pools are fully closed,
// otherwise the netty pools might missing threads (from event loop group) to execute clean ups.
allPoolClosedFuture.whenComplete( ( ignored, pollCloseError ) -> {
pools.clear();
if ( !ownsEventLoopGroup )
{
completeWithNullIfNoError( closeFuture, pollCloseError );
}
else
{
shutdownEventLoopGroup( pollCloseError );
}
} );
executeWithLockAsync( addressToPoolLock.writeLock(),
() ->
{
// We can only shutdown event loop group when all netty pools are fully closed,
// otherwise the netty pools might missing threads (from event loop group) to execute clean ups.
return closeAllPools().whenComplete(
( ignored, pollCloseError ) ->
{
addressToPool.clear();
if ( !ownsEventLoopGroup )
{
completeWithNullIfNoError( closeFuture, pollCloseError );
}
else
{
shutdownEventLoopGroup( pollCloseError );
}
} );
} );
}
return closeFuture;
}

@Override
public boolean isOpen( BoltServerAddress address )
{
return pools.containsKey( address );
return executeWithLock( addressToPoolLock.readLock(), () -> addressToPool.containsKey( address ) );
}

@Override
public String toString()
{
return "ConnectionPoolImpl{" + "pools=" + pools + '}';
return executeWithLock( addressToPoolLock.readLock(), () -> "ConnectionPoolImpl{" + "pools=" + addressToPool + '}' );
}

private void processAcquisitionError( ExtendedChannelPool pool, BoltServerAddress serverAddress, Throwable error )
Expand Down Expand Up @@ -237,15 +255,15 @@ private void assertNotClosed( BoltServerAddress address, Channel channel, Extend
{
pool.release( channel );
closePoolInBackground( address, pool );
pools.remove( address );
executeWithLock( addressToPoolLock.writeLock(), () -> addressToPool.remove( address ) );
assertNotClosed();
}
}

// for testing only
ExtendedChannelPool getPool( BoltServerAddress address )
{
return pools.get( address );
return executeWithLock( addressToPoolLock.readLock(), () -> addressToPool.get( address ) );
}

ExtendedChannelPool newPool( BoltServerAddress address )
Expand All @@ -256,12 +274,22 @@ ExtendedChannelPool newPool( BoltServerAddress address )

private ExtendedChannelPool getOrCreatePool( BoltServerAddress address )
{
return pools.computeIfAbsent( address, ignored -> {
ExtendedChannelPool pool = newPool( address );
// before the connection pool is added I can add the metrics for the pool.
metricsListener.putPoolMetrics( pool.id(), address, this );
return pool;
} );
ExtendedChannelPool existingPool = executeWithLock( addressToPoolLock.readLock(), () -> addressToPool.get( address ) );
return existingPool != null
? existingPool
: executeWithLock( addressToPoolLock.writeLock(),
() ->
{
ExtendedChannelPool pool = addressToPool.get( address );
if ( pool == null )
{
pool = newPool( address );
// before the connection pool is added I can add the metrics for the pool.
metricsListener.putPoolMetrics( pool.id(), address, this );
addressToPool.put( address, pool );
}
return pool;
} );
}

private CompletionStage<Void> closePool( ExtendedChannelPool pool )
Expand Down Expand Up @@ -303,12 +331,45 @@ private void shutdownEventLoopGroup( Throwable pollCloseError )
private CompletableFuture<Void> closeAllPools()
{
return CompletableFuture.allOf(
pools.entrySet().stream().map( entry -> {
BoltServerAddress address = entry.getKey();
ExtendedChannelPool pool = entry.getValue();
log.info( "Closing connection pool towards %s", address );
// Wait for all pools to be closed.
return closePool( pool ).toCompletableFuture();
} ).toArray( CompletableFuture[]::new ) );
addressToPool.entrySet().stream()
.map( entry ->
{
BoltServerAddress address = entry.getKey();
ExtendedChannelPool pool = entry.getValue();
log.info( "Closing connection pool towards %s", address );
// Wait for all pools to be closed.
return closePool( pool ).toCompletableFuture();
} )
.toArray( CompletableFuture[]::new ) );
}

private void executeWithLock( Lock lock, Runnable runnable )
{
executeWithLock( lock, () ->
{
runnable.run();
return null;
} );
}

private <T> T executeWithLock( Lock lock, Supplier<T> supplier )
{
lock.lock();
try
{
return supplier.get();
}
finally
{
lock.unlock();
}
}

private <T> void executeWithLockAsync( Lock lock, Supplier<CompletionStage<T>> stageSupplier )
{
lock.lock();
CompletableFuture.completedFuture( lock )
.thenCompose( ignored -> stageSupplier.get() )
.whenComplete( ( ignored, throwable ) -> lock.unlock() );
}
}