Skip to content

Routing context in URI #346

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.security.GeneralSecurityException;

import org.neo4j.driver.internal.cluster.LoadBalancer;
import org.neo4j.driver.internal.cluster.RoutingContext;
import org.neo4j.driver.internal.cluster.RoutingSettings;
import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.net.SocketConnector;
Expand Down Expand Up @@ -54,14 +55,14 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
RetrySettings retrySettings, Config config )
{
BoltServerAddress address = BoltServerAddress.from( uri );
RoutingSettings newRoutingSettings = routingSettings.withRoutingContext( new RoutingContext( uri ) );
SecurityPlan securityPlan = createSecurityPlan( address, config );
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config );
RetryLogic retryLogic = createRetryLogic( retrySettings, config.logging() );

try
{
return createDriver( address, uri.getScheme(), connectionPool, config, routingSettings, securityPlan,
retryLogic );
return createDriver( uri, address, connectionPool, config, newRoutingSettings, securityPlan, retryLogic );
}
catch ( Throwable driverError )
{
Expand All @@ -78,12 +79,15 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
}
}

private Driver createDriver( BoltServerAddress address, String scheme, ConnectionPool connectionPool,
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic )
private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool,
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan,
RetryLogic retryLogic )
{
switch ( scheme.toLowerCase() )
String scheme = uri.getScheme().toLowerCase();
switch ( scheme )
{
case "bolt":
assertNoRoutingContext( uri, routingSettings );
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic );
case "bolt+routing":
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic );
Expand Down Expand Up @@ -260,4 +264,14 @@ private static SecurityPlan createSecurityPlanImpl( BoltServerAddress address, C
return insecure();
}
}

private static void assertNoRoutingContext( URI uri, RoutingSettings routingSettings )
{
RoutingContext routingContext = routingSettings.routingContext();
if ( routingContext.isDefined() )
{
throw new IllegalArgumentException(
"Routing parameters are not supported with scheme 'bolt'. Given URI: '" + uri + "'" );
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ private static Rediscovery createRediscovery( BoltServerAddress initialRouter, R
Clock clock, Logger log )
{
ClusterCompositionProvider clusterComposition =
new GetServersProcedureClusterCompositionProvider( clock, log, settings );
new RoutingProcedureClusterCompositionProvider( clock, log, settings );
return new Rediscovery( initialRouter, settings, clock, log, clusterComposition, new DnsResolver( log ) );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public ClusterComposition lookupClusterComposition( ConnectionPool connections,
{
int failures = 0;

for ( long start = clock.millis(), delay = 0; ; delay = Math.max( settings.retryTimeoutDelay, delay * 2 ) )
for ( long start = clock.millis(), delay = 0; ; delay = Math.max( settings.retryTimeoutDelay(), delay * 2 ) )
{
long waitTime = start + delay - clock.millis();
sleep( waitTime );
Expand All @@ -71,7 +71,7 @@ public ClusterComposition lookupClusterComposition( ConnectionPool connections,
return composition;
}

if ( ++failures >= settings.maxRoutingFailures )
if ( ++failures >= settings.maxRoutingFailures() )
{
throw new ServiceUnavailableException( NO_ROUTERS_AVAILABLE );
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.cluster;

import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;

public class RoutingContext
{
public static final RoutingContext EMPTY = new RoutingContext();

private final Map<String,String> context;

private RoutingContext()
{
this.context = emptyMap();
}

public RoutingContext( URI uri )
{
this.context = unmodifiableMap( parseParameters( uri ) );
}

public boolean isDefined()
{
return !context.isEmpty();
}

public Map<String,String> asMap()
{
return context;
}

private static Map<String,String> parseParameters( URI uri )
{
String query = uri.getQuery();

if ( query == null || query.isEmpty() )
{
return emptyMap();
}

Map<String,String> parameters = new HashMap<>();
String[] pairs = query.split( "&" );
for ( String pair : pairs )
{
String[] keyValue = pair.split( "=" );
if ( keyValue.length != 2 )
{
throw new IllegalArgumentException(
"Invalid parameters: '" + pair + "' in URI '" + uri + "'" );
}

String key = keyValue[0];
String value = keyValue[1];
String previousValue = parameters.put( key, value );

if ( previousValue != null )
{
throw new IllegalArgumentException(
"Duplicated query parameters with key '" + key + "' in URI '" + uri + "'" );
}
}
return parameters;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,28 @@
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Statement;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.exceptions.ProtocolException;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.driver.v1.exceptions.value.ValueException;

import static java.lang.String.format;

public class GetServersProcedureClusterCompositionProvider implements ClusterCompositionProvider
public class RoutingProcedureClusterCompositionProvider implements ClusterCompositionProvider
{

private final String PROTOCOL_ERROR_MESSAGE = "Failed to parse `%s' result received from server due to ";
private static final String PROTOCOL_ERROR_MESSAGE = "Failed to parse '%s' result received from server due to ";

private final Clock clock;
private final Logger log;
private final GetServersProcedureRunner getServersRunner;
private final RoutingProcedureRunner getServersRunner;

public GetServersProcedureClusterCompositionProvider( Clock clock, Logger log, RoutingSettings settings )
public RoutingProcedureClusterCompositionProvider( Clock clock, Logger log, RoutingSettings settings )
{
this( clock, log, new GetServersProcedureRunner( settings.routingParameters ) );
this( clock, log, new RoutingProcedureRunner( settings.routingContext() ) );
}

GetServersProcedureClusterCompositionProvider( Clock clock, Logger log, GetServersProcedureRunner getServersRunner )
RoutingProcedureClusterCompositionProvider( Clock clock, Logger log, RoutingProcedureRunner getServersRunner )
{
this.clock = clock;
this.log = log;
Expand All @@ -67,7 +67,7 @@ public ClusterCompositionResponse getClusterComposition( Connection connection )
return new ClusterCompositionResponse.Failure( new ServiceUnavailableException( format(
"Failed to run '%s' on server. " +
"Please make sure that there is a Neo4j 3.1+ causal cluster up running.",
getServersRunner.procedureCalled() ), e
invokedProcedureString() ), e
) );
}

Expand All @@ -78,9 +78,8 @@ public ClusterCompositionResponse getClusterComposition( Connection connection )
if ( records.size() != 1 )
{
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
PROTOCOL_ERROR_MESSAGE +
"records received '%s' is too few or too many.", getServersRunner.procedureCalled(),
records.size() ) ) );
PROTOCOL_ERROR_MESSAGE + "records received '%s' is too few or too many.",
invokedProcedureString(), records.size() ) ) );
}

// failed to parse the record
Expand All @@ -92,19 +91,25 @@ public ClusterCompositionResponse getClusterComposition( Connection connection )
catch ( ValueException e )
{
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
PROTOCOL_ERROR_MESSAGE +
"unparsable record received.", getServersRunner.procedureCalled() ), e ) );
PROTOCOL_ERROR_MESSAGE + "unparsable record received.",
invokedProcedureString() ), e ) );
}

// the cluster result is not a legal reply
if ( !cluster.hasRoutersAndReaders() )
{
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
PROTOCOL_ERROR_MESSAGE +
"no router or reader found in response.", getServersRunner.procedureCalled() ) ) );
PROTOCOL_ERROR_MESSAGE + "no router or reader found in response.",
invokedProcedureString() ) ) );
}

// all good
return new ClusterCompositionResponse.Success( cluster );
}

private String invokedProcedureString()
{
Statement statement = getServersRunner.invokedProcedure();
return statement.text() + " " + statement.parameters();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.neo4j.driver.internal.cluster;

import java.util.List;
import java.util.Map;

import org.neo4j.driver.internal.NetworkSession;
import org.neo4j.driver.internal.spi.Connection;
Expand All @@ -32,42 +31,42 @@
import static org.neo4j.driver.internal.util.ServerVersion.version;
import static org.neo4j.driver.v1.Values.parameters;

public class GetServersProcedureRunner
public class RoutingProcedureRunner
{
static final String GET_SERVERS = "dbms.cluster.routing.getServers";
static final String GET_ROUTING_TABLE_PARAM = "context";
static final String GET_ROUTING_TABLE = "dbms.cluster.routing.getRoutingTable({" + GET_ROUTING_TABLE_PARAM + "})";

private final Map<String, String> routingContext;
private Statement procedureCalled;
private final RoutingContext context;
private Statement invokedProcedure;

public GetServersProcedureRunner( Map<String, String> context )
public RoutingProcedureRunner( RoutingContext context )
{
this.routingContext = context;
this.context = context;
}

public List<Record> run( Connection connection )
{
if( version( connection.server().version() ).greaterThanOrEqual( v3_2_0 ) )
{
procedureCalled = new Statement( "CALL " + GET_ROUTING_TABLE,
parameters(GET_ROUTING_TABLE_PARAM, routingContext ) );
invokedProcedure = new Statement( "CALL " + GET_ROUTING_TABLE,
parameters( GET_ROUTING_TABLE_PARAM, context.asMap() ) );
}
else
{
procedureCalled = new Statement("CALL " + GET_SERVERS );
invokedProcedure = new Statement( "CALL " + GET_SERVERS );
}

return runProcedure( connection, procedureCalled );
return runProcedure( connection, invokedProcedure );
}

List<Record> runProcedure( Connection connection, Statement procedure )
{
return NetworkSession.run( connection, procedure, NO_OP ).list();
}

Statement procedureCalled()
Statement invokedProcedure()
{
return procedureCalled;
return invokedProcedure;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,41 @@
*/
package org.neo4j.driver.internal.cluster;

import java.util.Map;

public class RoutingSettings
{
final int maxRoutingFailures;
final long retryTimeoutDelay;
final Map<String, String> routingParameters;
private final int maxRoutingFailures;
private final long retryTimeoutDelay;
private final RoutingContext routingContext;

public RoutingSettings( int maxRoutingFailures, long retryTimeoutDelay )
{
this( maxRoutingFailures, retryTimeoutDelay, RoutingContext.EMPTY );
}

public RoutingSettings( int maxRoutingFailures, long retryTimeoutDelay, Map<String, String> routingParameters )
public RoutingSettings( int maxRoutingFailures, long retryTimeoutDelay, RoutingContext routingContext )
{
this.maxRoutingFailures = maxRoutingFailures;
this.retryTimeoutDelay = retryTimeoutDelay;
this.routingParameters = routingParameters;
this.routingContext = routingContext;
}

public RoutingSettings withRoutingContext( RoutingContext newRoutingContext )
{
return new RoutingSettings( maxRoutingFailures, retryTimeoutDelay, newRoutingContext );
}

public int maxRoutingFailures()
{
return maxRoutingFailures;
}

public long retryTimeoutDelay()
{
return retryTimeoutDelay;
}

public RoutingContext routingContext()
{
return routingContext;
}
}
Loading