-
Can I force Lettuce to reconnect to a node when a command sent to a Redis Cluster fails with a RedisCommandTimeoutException?

We have a problem where the old master does not respond for 10 seconds after a FAILOVER is issued to its replica. TCP packets carrying new requests are still acknowledged during those 10 seconds. Since the connection is clearly not dead, Lettuce keeps sending new commands to the old master. Eventually it receives all the MOVED responses at once, but by then it is too late for us.

For our specific problem, it would be better if Lettuce reconnected to the node on command timeout, as the bug only seems to affect a single TCP socket. A command sent on a new socket gets an immediate MOVED response, allowing Lettuce to continue on the new master. I guess this could be tricky to get right, since all in-flight requests will time out at different times and we probably do not want to reconnect once per timeout.

Of course, we are also trying to get the underlying problem in Redis resolved (see #2572), but a workaround like this would still be useful until that is fixed.

I have checked the wiki, GitHub issues and GitHub Discussions and found #2082, which is similar, but in that case the TCP packets are not acknowledged, which led to a different solution. I also tried setting an absurdly low periodic topology refresh of a few hundred milliseconds, but that does not seem to help. That might be a bug, but I have not looked into it yet.
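One way to avoid reconnecting once per in-flight timeout is to tag every command with the "generation" of the connection it was written on, and to let only the first timeout of the current generation trigger a reconnect. The sketch below is plain Java with no Lettuce dependency; `GenerationGate` and its methods are hypothetical names used for illustration, not part of the Lettuce API. Stragglers from a connection that has already been replaced carry an old generation number and are simply ignored.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical helper (not a Lettuce API): collapses the burst of timeouts
 * coming from one stalled connection into a single reconnect request.
 */
public class GenerationGate {

    // Incremented every time a reconnect is requested
    private final AtomicLong generation = new AtomicLong();

    /** Generation to remember on a command at the moment it is written. */
    public long current() {
        return generation.get();
    }

    /**
     * Called when a command times out. Returns true exactly once per generation,
     * in which case the caller should close the socket; every later timeout that
     * carries the same (now outdated) generation returns false.
     */
    public boolean onTimeout(long commandGeneration) {
        return generation.compareAndSet(commandGeneration, commandGeneration + 1);
    }

    public static void main(String[] args) {
        GenerationGate gate = new GenerationGate();
        long g0 = gate.current();
        // Three in-flight commands written on generation 0 all time out
        System.out.println(gate.onTimeout(g0)); // true: trigger a single reconnect
        System.out.println(gate.onTimeout(g0)); // false: already handled
        System.out.println(gate.onTimeout(g0)); // false
        // A command written on the new connection times out as well
        long g1 = gate.current();
        System.out.println(gate.onTimeout(g1)); // true: the new connection stalled too
    }
}
```

The compare-and-set makes the "first timeout wins" decision thread-safe without a lock, which matters because timeouts fire from timer threads, not from the thread that wrote the command.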
-
Hey @e-ts, this is a tricky question. In your scenario you know the command timed out because the Redis instance delays its responses during failover, but that is a very specific failover scenario. In practice a command can time out for many different reasons (network delay, server load, etc.), and in many of those cases the correct approach is to retry the command against the same instance on the existing connection, without reconnecting. Reconnecting is a slow process, and reconnecting on every timeout could drastically degrade the performance of the driver in many of those cases.

You mentioned #2082; did you get a chance to try setting a custom TCP_USER_TIMEOUT, added in #2499?

```java
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.SocketOptions;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.codec.StringCodec;
import java.time.Duration;

RedisClient redisClient = RedisClient.create(RedisURI.Builder
        .redis("redis.io", 12000)
        .build());

SocketOptions socketOptions = SocketOptions.builder()
        // Close the connection if sent data stays unacknowledged for more than 3 seconds
        .tcpUserTimeout(SocketOptions.TcpUserTimeoutOptions.builder()
                .enable(true)
                .tcpUserTimeout(Duration.ofSeconds(3))
                .build())
        // Probe idle connections: first probe after 5 s, then every 5 s, give up after 3 probes
        .keepAlive(SocketOptions.KeepAliveOptions.builder()
                .interval(Duration.ofSeconds(5))
                .idle(Duration.ofSeconds(5))
                .count(3)
                .enable()
                .build())
        .build();

redisClient.setOptions(ClientOptions.builder().socketOptions(socketOptions).build());
RedisCommands<String, String> redis = redisClient.connect(new StringCodec()).sync();
```

If you go that way, keep in mind that, as mentioned above, this timeout may also fire in other (valid) scenarios that do not require a reconnect, so be mindful of the value you set. From your description, though, it should resolve your issue as long as your deployment guarantees that the server always responds within a couple of seconds. It is also generally a good idea to configure TCP keepalive as well. All of these values are highly dependent on your application and deployment.
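As a rough sanity check on the values above, assuming Linux-style keepalive semantics (the first probe is sent after `idle`, then up to `count` probes spaced `interval` apart), keepalive on its own detects a peer that stops acknowledging after about idle + count × interval = 5 s + 3 × 5 s = 20 s, while TCP_USER_TIMEOUT closes the connection once sent data has stayed unacknowledged for 3 s. Both mechanisms only kick in when acknowledgements stop. The helper below is a hypothetical illustration of that arithmetic, not a Lettuce API.

```java
import java.time.Duration;

public class KeepAliveMath {

    /**
     * Approximate time until keepalive alone declares a silent peer dead:
     * wait {@code idle}, then send {@code count} unanswered probes {@code interval} apart.
     * Hypothetical helper; assumes Linux-style TCP_KEEPIDLE / TCP_KEEPINTVL / TCP_KEEPCNT behavior.
     */
    public static Duration deadPeerDetection(Duration idle, Duration interval, int count) {
        return idle.plus(interval.multipliedBy(count));
    }

    public static void main(String[] args) {
        Duration detection = deadPeerDetection(Duration.ofSeconds(5), Duration.ofSeconds(5), 3);
        System.out.println(detection.getSeconds()); // 20
    }
}
```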
-
Closing this in favour of #2909
-
Revisiting the discussion here to provide a solution where the application can force the driver to reconnect. The example below monitors command timeouts and forces a reconnect once they reach a certain threshold.

Note: This is the second solution provided in this topic. The first one did not work with multiple channels; this one should, and is generally improved.

```java
package acme.lettuce;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.event.EventBus;
import io.lettuce.core.protocol.CompleteableCommand;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.NettyCustomizer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;

public class ForceReconnectDemo {

    // Placeholder endpoint: replace the host, port and password with your own
    public static final RedisURI LOCAL = RedisURI.Builder.redis("redis-xxxxx.cxxx.east-us-mz.azure.redns.redis-cloud.com",
            xxxxx).withPassword("xxxxx").build();

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ForceReconnectDemo.class);

    public static void main(String[] args) {
        ForceReconnectDemo demo = new ForceReconnectDemo();
        demo.start();
    }

    private void start() {
        try {
            // Custom channel handler that can be used to forcefully close the Netty channel
            // Assumes autoReconnect() is enabled (the default), otherwise the channel would not reconnect
            // All the settings below need to be fine-tuned to your environment
            // Tested against a free-tier environment deployed in https://app.redislabs.com/
            // Reconnect if ...
            ReconnectChannelHandler reconnectHandler = new ReconnectChannelHandler(
                    Duration.ofSeconds(5),  // ... during the last 5 seconds ...
                    10,                     // ... there have been more than 10 timeouts ...
                    Duration.ofSeconds(5)); // ... and at least 5 seconds have passed since the last (re)connect

            ClientResources resources = ClientResources.builder().nettyCustomizer(new NettyCustomizer() {
                @Override
                public void afterChannelInitialized(Channel channel) {
                    channel.pipeline().addLast(reconnectHandler);
                }
            }).build();

            // ALL LOGIC IN THIS METHOD BELOW THIS LINE IS USED FOR DEMONSTRATION PURPOSES ONLY
            RedisClient redisClient = RedisClient.create(resources, LOCAL);
            redisClient.setOptions(timeoutOptions());

            EventBus eventBus = redisClient.getResources().eventBus();
            eventBus.get().subscribe(e -> logger.info("EVENT: " + e));

            int attempts = 0;
            String key = "key:" + UUID.randomUUID().getLeastSignificantBits();
            RedisAsyncCommands<String, String> commands = redisClient.connect().async();

            while (++attempts < 1000) {
                // NOT REQUIRED - used to track overall progress of the demo
                if (attempts % 100 == 0) {
                    logger.info("{}% ", attempts / 10);
                }

                // Call a command that will time out
                try {
                    commands.incr(key).get();
                } catch (ExecutionException e) {
                    if (!(e.getCause() instanceof RedisCommandTimeoutException)) {
                        throw e;
                    }
                }
            }
        } catch (Exception e) {
            logger.warn("Exception while writing to Cache: " + e.getLocalizedMessage(), e);
        }
    }

    private ClientOptions timeoutOptions() {
        // Aggressive 100 ms command timeout so that timeouts are easy to provoke in the demo
        TimeoutOptions timeoutOpts = TimeoutOptions.builder().timeoutCommands().fixedTimeout(Duration.ofMillis(100)).build();
        return ClientOptions.builder().timeoutOptions(timeoutOpts).build();
    }

    /**
     * The ChannelDuplexHandler that hooks into the driver's regular channel pipeline.
     * It is sharable because the same instance is added to every channel.
     */
    @ChannelHandler.Sharable
    private static class ReconnectChannelHandler extends ChannelDuplexHandler {

        private final Duration slidingWindow;
        private final int maxTimeouts;
        private final Duration quietPeriod;
        // One timeout tracker per channel (each channel has its own handler context)
        private final Map<ChannelHandlerContext, ChannelAwareTimeoutHandler> handlers = new ConcurrentHashMap<>();

        public ReconnectChannelHandler(Duration window, int maxTimeouts, Duration quietPeriod) {
            this.slidingWindow = window;
            this.maxTimeouts = maxTimeouts;
            this.quietPeriod = quietPeriod;
        }

        /**
         * Called when the channel becomes active.
         * This is where we create the timeout handler that tracks the timeouts on this channel.
         *
         * @param ctx the channel handler context
         * @see #channelInactive(ChannelHandlerContext)
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            this.handlers.put(ctx, new ChannelAwareTimeoutHandler(ctx, slidingWindow, maxTimeouts, quietPeriod));
            super.channelActive(ctx);
        }

        /**
         * Called when the channel becomes inactive (for example after a forced reconnect).
         * Removes the timeout handler so that closed channels are not leaked.
         *
         * @param ctx the channel handler context
         * @see #channelActive(ChannelHandlerContext)
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            this.handlers.remove(ctx);
            super.channelInactive(ctx);
        }

        /**
         * Called when a message is written to the channel.
         * This is where we attach a completion callback to each command, allowing us to track timeouts.
         *
         * @param ctx     the channel handler context
         * @param msg     the message being written (a single command or a batch of commands)
         * @param promise the promise to notify once the write completes
         */
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            if (msg instanceof Collection) {
                // The driver may also write a batch of commands as a single message
                for (Object command : (Collection<?>) msg) {
                    trackTimeout(ctx, command);
                }
            } else {
                trackTimeout(ctx, msg);
            }
            ctx.write(msg, promise);
        }

        private void trackTimeout(ChannelHandlerContext ctx, Object msg) {
            if (msg instanceof CompleteableCommand) {
                ((CompleteableCommand<?>) msg).onComplete((result, error) -> {
                    if (error instanceof RedisCommandTimeoutException) {
                        // The channel may already be closed when a straggling command times out
                        ChannelAwareTimeoutHandler handler = this.handlers.get(ctx);
                        if (handler != null) {
                            handler.timeoutDetected();
                        }
                    }
                });
            }
        }
    }

    /**
     * Tracks the timeouts on a single channel.
     * It uses a sliding window to count timeouts and a quiet period to avoid reconnecting too often.
     */
    private static class ChannelAwareTimeoutHandler {

        private final Duration slidingWindow;
        private final int maxTimeouts;
        private final Duration quietPeriod;
        private Instant lastReconnect;
        private final ChannelHandlerContext context;
        private final Queue<Instant> timeouts = new LinkedList<>();

        public ChannelAwareTimeoutHandler(ChannelHandlerContext ctx, Duration window, int maxTimeouts, Duration quietPeriod) {
            this.slidingWindow = window;
            this.maxTimeouts = maxTimeouts;
            this.quietPeriod = quietPeriod;
            this.lastReconnect = Instant.now();
            this.context = ctx;
        }

        // Naive synchronization: timeout callbacks may fire concurrently from different threads
        public synchronized void timeoutDetected() {
            Instant now = Instant.now();
            if (isInsideQuietPeriod(now)) {
                logger.info("Waiting until quiet period completes before reconnecting");
                return;
            }

            this.cleanupExpiredEvents(now);
            timeouts.add(now);

            if (timeouts.size() > maxTimeouts) {
                logger.info("Timeout threshold exceeded, reconnecting...");
                reconnect();
            } else {
                logger.info("Timeout detected, threshold to reconnect not yet met.");
            }
        }

        private boolean isInsideQuietPeriod(Instant now) {
            return lastReconnect.plus(quietPeriod).isAfter(now);
        }

        // Drops the timeouts that are older than the sliding window
        private void cleanupExpiredEvents(Instant now) {
            Instant cutoff = now.minus(slidingWindow);
            while (!timeouts.isEmpty() && timeouts.peek().isBefore(cutoff)) {
                timeouts.poll();
            }
        }

        // Closing the channel makes the driver's auto-reconnect open a new one
        private void reconnect() {
            lastReconnect = Instant.now();
            context.channel().close();
            this.timeouts.clear();
        }
    }
}
```
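To tune the thresholds without a live cluster, the decision logic of `ChannelAwareTimeoutHandler` can be pulled out into a plain class that receives the current time as a parameter instead of calling `Instant.now()`. This is a minimal sketch that mirrors the handler above (timeouts during the quiet period are ignored; a reconnect fires once more than `maxTimeouts` timeouts fall inside the window); `ReconnectPolicy` and its methods are hypothetical names, and it has no Lettuce or Netty dependency.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical, clock-injectable version of the sliding-window reconnect decision. */
public class ReconnectPolicy {

    private final Duration window;
    private final int maxTimeouts;
    private final Duration quietPeriod;
    private final Deque<Instant> timeouts = new ArrayDeque<>();
    private Instant lastReconnect;

    public ReconnectPolicy(Duration window, int maxTimeouts, Duration quietPeriod, Instant connectedAt) {
        this.window = window;
        this.maxTimeouts = maxTimeouts;
        this.quietPeriod = quietPeriod;
        this.lastReconnect = connectedAt;
    }

    /** Records a timeout observed at {@code now}; returns true if the caller should reconnect. */
    public synchronized boolean onTimeout(Instant now) {
        if (lastReconnect.plus(quietPeriod).isAfter(now)) {
            return false; // still inside the quiet period after the last (re)connect
        }
        Instant cutoff = now.minus(window);
        while (!timeouts.isEmpty() && timeouts.peekFirst().isBefore(cutoff)) {
            timeouts.pollFirst(); // drop timeouts that slid out of the window
        }
        timeouts.addLast(now);
        if (timeouts.size() > maxTimeouts) {
            timeouts.clear();
            lastReconnect = now;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2024-01-01T00:00:00Z");
        ReconnectPolicy policy = new ReconnectPolicy(Duration.ofSeconds(5), 10, Duration.ofSeconds(5), t0);
        // 11 timeouts, 100 ms apart, starting after the initial quiet period has ended
        int reconnects = 0;
        for (int i = 0; i < 11; i++) {
            if (policy.onTimeout(t0.plusSeconds(6).plusMillis(100L * i))) {
                reconnects++;
            }
        }
        System.out.println(reconnects); // 1
    }
}
```

With the demo's settings, a steady rate of one timeout per second never reaches the threshold (at most six timeouts fit in a 5-second window), while a burst of eleven timeouts within the window triggers exactly one reconnect, followed by another quiet period.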
My bad, I missed the fact that the packets are being acknowledged. You are right, TCP_USER_TIMEOUT is useless in this case.