Skip to content

Tcp Keepalive for Linux (NET8.0) #1120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/NetMQ.Tests/ClientServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public async void AsyncWithCancellationToken()
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await server.ReceiveStringAsync(source.Token));
}

#if NETCOREAPP3_1
#if NETSTANDARD2_1_OR_GREATER

[Fact(Timeout = 120)]
public async void AsyncEnumerableCanceled()
Expand Down
4 changes: 2 additions & 2 deletions src/NetMQ.Tests/SocketOptionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ public void GetAndSetAllProperties()
socket.Options.TcpKeepalive = true;
Assert.True(socket.Options.TcpKeepalive);

// socket.Options.TcpKeepaliveCnt = 100;
// Assert.Equal(100, socket.Options.TcpKeepaliveCnt);
socket.Options.TcpKeepaliveCnt = 100;
Assert.Equal(100, socket.Options.TcpKeepaliveCnt);

socket.Options.TcpKeepaliveIdle = TimeSpan.FromMilliseconds(100);
Assert.Equal(TimeSpan.FromMilliseconds(100), socket.Options.TcpKeepaliveIdle);
Expand Down
2 changes: 1 addition & 1 deletion src/NetMQ/Annotations.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Licensed to the .NET Foundation under one or more agreements. The .NET Foundation licenses this file to you under the MIT license. See the LICENSE.md file in the project root for more information.

#if !NETSTANDARD2_1
#if !NET

namespace System.Diagnostics.CodeAnalysis
{
Expand Down
2 changes: 1 addition & 1 deletion src/NetMQ/AsyncReceiveExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#if NETSTANDARD2_0 || NETSTANDARD2_1 || NET47
#if NET || NET47

using System;
using System.Collections.Generic;
Expand Down
2 changes: 1 addition & 1 deletion src/NetMQ/Core/Mechanisms/CurveClientMechanism.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ PullMsgResult ProduceInitiate(ref Msg msg)

VouchNoncePrefix.CopyTo(vouchNonce);
using var rng = RandomNumberGenerator.Create();
#if NETSTANDARD2_1
#if NET
rng.GetBytes(vouchNonce.Slice(8));
#else
byte[] temp = new byte[16];
Expand Down
4 changes: 2 additions & 2 deletions src/NetMQ/Core/Mechanisms/CurveServerMechanism.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ PullMsgResult ProduceWelcome(ref Msg msg)
// 8-byte prefix plus 16-byte random nonce
CookieNoncePrefix.CopyTo(cookieNonce);
using var rng = RandomNumberGenerator.Create();
#if NETSTANDARD2_1
#if NET
rng.GetBytes(cookieNonce.Slice(8));
#else
byte[] temp = new byte[16];
Expand All @@ -184,7 +184,7 @@ PullMsgResult ProduceWelcome(ref Msg msg)
// Create full nonce for encryption
// 8-byte prefix plus 16-byte random nonce
WelcomeNoncePrefix.CopyTo(welcomeNonce);
#if NETSTANDARD2_1
#if NET
rng.GetBytes(welcomeNonce.Slice(8));
#else
rng.GetBytes(temp);
Expand Down
7 changes: 7 additions & 0 deletions src/NetMQ/Core/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,10 @@ public void SetSocketOption(ZmqSocketOption option, object? optionValue)
TcpKeepalive = tcpKeepalive;
break;

case ZmqSocketOption.TcpKeepaliveCnt:
TcpKeepaliveCnt = Get<int>();
break;

case ZmqSocketOption.DelayAttachOnConnect:
DelayAttachOnConnect = Get<bool>();
break;
Expand Down Expand Up @@ -615,6 +619,9 @@ public void SetSocketOption(ZmqSocketOption option, object? optionValue)
case ZmqSocketOption.TcpKeepalive:
return TcpKeepalive;

case ZmqSocketOption.TcpKeepaliveCnt:
return TcpKeepaliveCnt;

case ZmqSocketOption.DelayAttachOnConnect:
return DelayAttachOnConnect;

Expand Down
2 changes: 1 addition & 1 deletion src/NetMQ/Core/Transports/Pgm/PgmSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ internal void Init()
Debug.WriteLine(xMsg);

// If running on Microsoft Windows, suggest to the developer that he may need to install MSMQ in order to get PGM socket support.
#if NETSTANDARD1_1_OR_GREATER
#if NET
bool isWindows = RuntimeInformation.IsOSPlatform(OSPlatform.Windows);
#else
bool isWindows = true;
Expand Down
18 changes: 15 additions & 3 deletions src/NetMQ/Core/Transports/Tcp/TcpConnector.cs
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ private void StartConnecting()
// TerminatingException can occur in above call to EventConnectDelayed via
// MonitorEvent.Write if corresponding PairSocket has been sent Term command
catch (TerminatingException)
{}
{ }
}

/// <summary>
Expand Down Expand Up @@ -236,9 +236,12 @@ public void OutCompleted(SocketError socketError, int bytesTransferred)
m_ioObject.RemoveSocket(m_s);
m_handleValid = false;

try {
try
{
m_s.NoDelay = true;
} catch (ArgumentException) {
}
catch (ArgumentException)
{
// OSX sometime fail while the socket is still connecting
}

Expand All @@ -248,6 +251,14 @@ public void OutCompleted(SocketError socketError, int bytesTransferred)
// Set the TCP keep-alive option values to the underlying socket.
m_s.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, m_options.TcpKeepalive);

#if NETCOREAPP3_1_OR_GREATER
if (m_options.TcpKeepaliveIdle != -1)
m_s.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, m_options.TcpKeepaliveIdle / 1000);
if (m_options.TcpKeepaliveIntvl != -1)
m_s.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, m_options.TcpKeepaliveIntvl / 1000);
if (m_options.TcpKeepaliveCnt != -1)
m_s.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount, m_options.TcpKeepaliveCnt);
#else
if (m_options.TcpKeepaliveIdle != -1 && m_options.TcpKeepaliveIntvl != -1)
{
// Write the TCP keep-alive options to a byte-array, to feed to the IOControl method..
Expand All @@ -261,6 +272,7 @@ public void OutCompleted(SocketError socketError, int bytesTransferred)

m_s.IOControl(IOControlCode.KeepAliveValues, (byte[])bytes, null);
}
#endif
}

// Create the engine object for this connection.
Expand Down
111 changes: 61 additions & 50 deletions src/NetMQ/Core/Transports/Tcp/TcpListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ You should have received a copy of the GNU Lesser General Public License
using System;
using System.Diagnostics;
using System.Net.Sockets;
#if NETSTANDARD2_0 || NETSTANDARD2_1
#if NET
using System.Runtime.InteropServices;
#endif
using AsyncIO;
Expand All @@ -45,12 +45,12 @@ internal class TcpListener : Own, IProactorEvents
/// </summary>
private AsyncSocket? m_handle;

/*
/// <summary>
/// socket being accepted
/// </summary>
private AsyncSocket m_acceptedSocket;
*/
/*
/// <summary>
/// socket being accepted
/// </summary>
private AsyncSocket m_acceptedSocket;
*/

/// <summary>
/// Socket the listener belongs to.
Expand Down Expand Up @@ -107,7 +107,7 @@ protected override void ProcessPlug()
protected override void ProcessTerm(int linger)
{
Assumes.NotNull(m_handle);

m_ioObject.SetHandler(this);
m_ioObject.RemoveSocket(m_handle);
Close();
Expand Down Expand Up @@ -141,7 +141,7 @@ public virtual void SetAddress(string addr)
}
}

#if NETSTANDARD2_0 || NETSTANDARD2_1
#if NET
// This command is failing on linux
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
m_handle.ExclusiveAddressUse = false;
Expand Down Expand Up @@ -194,68 +194,79 @@ public void InCompleted(SocketError socketError, int bytesTransferred)
switch (socketError)
{
case SocketError.Success:
{
// TODO: check TcpFilters
var acceptedSocket = m_handle.GetAcceptedSocket();
{
// TODO: check TcpFilters
var acceptedSocket = m_handle.GetAcceptedSocket();

acceptedSocket.NoDelay = true;

if (m_options.TcpKeepalive != -1)
{
acceptedSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, m_options.TcpKeepalive);

if (m_options.TcpKeepaliveIdle != -1 && m_options.TcpKeepaliveIntvl != -1)
if (m_options.TcpKeepalive != -1)
{
var bytes = new ByteArraySegment(new byte[12]);
acceptedSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, m_options.TcpKeepalive);
#if NETCOREAPP3_1_OR_GREATER
if (m_options.TcpKeepaliveIdle != -1)
acceptedSocket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, m_options.TcpKeepaliveIdle / 1000);
if (m_options.TcpKeepaliveIntvl != -1)
acceptedSocket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, m_options.TcpKeepaliveIntvl / 1000);
if (m_options.TcpKeepaliveCnt != -1)
acceptedSocket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount, m_options.TcpKeepaliveCnt);
#else

Endianness endian = BitConverter.IsLittleEndian ? Endianness.Little : Endianness.Big;
if (m_options.TcpKeepaliveIdle != -1 && m_options.TcpKeepaliveIntvl != -1)
{
var bytes = new ByteArraySegment(new byte[12]);

bytes.PutInteger(endian, m_options.TcpKeepalive, 0);
bytes.PutInteger(endian, m_options.TcpKeepaliveIdle, 4);
bytes.PutInteger(endian, m_options.TcpKeepaliveIntvl, 8);
Endianness endian = BitConverter.IsLittleEndian ? Endianness.Little : Endianness.Big;

bytes.PutInteger(endian, m_options.TcpKeepalive, 0);
bytes.PutInteger(endian, m_options.TcpKeepaliveIdle, 4);
bytes.PutInteger(endian, m_options.TcpKeepaliveIntvl, 8);

acceptedSocket.IOControl(IOControlCode.KeepAliveValues, (byte[])bytes, null);
}
#endif

acceptedSocket.IOControl(IOControlCode.KeepAliveValues, (byte[])bytes, null);
}
}

// Create the engine object for this connection.
var engine = new StreamEngine(acceptedSocket, m_options, m_endpoint);
// Create the engine object for this connection.
var engine = new StreamEngine(acceptedSocket, m_options, m_endpoint);

// Choose I/O thread to run connector in. Given that we are already
// running in an I/O thread, there must be at least one available.
IOThread? ioThread = ChooseIOThread(m_options.Affinity);
// Choose I/O thread to run connector in. Given that we are already
// running in an I/O thread, there must be at least one available.
IOThread? ioThread = ChooseIOThread(m_options.Affinity);

Assumes.NotNull(ioThread);
Assumes.NotNull(ioThread);

// Create and launch a session object.
// TODO: send null in address parameter, is unneeded in this case
SessionBase session = SessionBase.Create(ioThread, false, m_socket, m_options, new Address(m_handle.LocalEndPoint));
session.IncSeqnum();
LaunchChild(session);
// Create and launch a session object.
// TODO: send null in address parameter, is unneeded in this case
SessionBase session = SessionBase.Create(ioThread, false, m_socket, m_options, new Address(m_handle.LocalEndPoint));
session.IncSeqnum();
LaunchChild(session);

SendAttach(session, engine, false);
SendAttach(session, engine, false);

m_socket.EventAccepted(m_endpoint, acceptedSocket);
m_socket.EventAccepted(m_endpoint, acceptedSocket);

Accept();
break;
}
Accept();
break;

}
case SocketError.ConnectionReset:
case SocketError.NoBufferSpaceAvailable:
case SocketError.TooManyOpenSockets:
{
m_socket.EventAcceptFailed(m_endpoint, socketError.ToErrorCode());
{
m_socket.EventAcceptFailed(m_endpoint, socketError.ToErrorCode());

Accept();
break;
}
Accept();
break;
}
default:
{
NetMQException exception = NetMQException.Create(socketError);
{
NetMQException exception = NetMQException.Create(socketError);

m_socket.EventAcceptFailed(m_endpoint, exception.ErrorCode);
throw exception;
}
m_socket.EventAcceptFailed(m_endpoint, exception.ErrorCode);
throw exception;
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/NetMQ/Core/Utils/OpCode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public static bool Open()
string val = Environment.GetEnvironmentVariable("NETQM_SUPPRESS_RDTSC");
if (!string.IsNullOrEmpty(val))
return false;
#if NETSTANDARD1_1_OR_GREATER || NET471_OR_GREATER
#if NET
if (RuntimeInformation.ProcessArchitecture != Architecture.X86 &&
RuntimeInformation.ProcessArchitecture != Architecture.X64)
{
Expand Down
2 changes: 1 addition & 1 deletion src/NetMQ/Core/Utils/SpanUtility.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ internal static class SpanUtility
{
public static string ToAscii(Span<byte> bytes)
{
#if NETSTANDARD2_1
#if NET
return Encoding.ASCII.GetString(bytes);
#else
return Encoding.ASCII.GetString(bytes.ToArray());
Expand Down
Loading