admin管理员组

文章数量:1431391

I need to wait for replies to a UDP broadcast with a timeout, so I tried ReadAsync with a cancellation token and a Threading.Timer to signal cancellation. There are two other computers that reply to the broadcast. If I use just Read, I see that the replies arrive 0.05 s and 0.27 s after transmission. When using ReadAsync and cancelling at time T after the broadcast, the replies are reported at T and T-0.22 s, for T = 0.5, 1 and 1.5 s. In other words, the reply from Comp1 is always reported as arriving at the time of cancellation, and the reply from Comp2 is always reported about 220 ms earlier.

It is very strange that the time at which a reply is reported depends on the timeout. Any idea why, and how to get the replies as soon as possible regardless of the timeout?

I am using .NET 8 Windows application, just a form with a button:

/// <summary>
/// Button handler: broadcasts "hPC" from a fresh <c>udp2</c> bound to <c>local</c>
/// and waits <c>waittime_sec</c> seconds for replies.
/// </summary>
private async void button_Click(object sender, EventArgs e) {
    udp2 u = new udp2(local);
    try {
        List<udp2.Reply> rs = await u.Broadcast_as("hPC",TimeSpan.FromSeconds(waittime_sec));
    }
    finally {
        // Release the bound socket; otherwise every click leaks a socket that is
        // still bound to the same local endpoint as the next one.
        u.Shutdown();
    }
}
/// <summary>
/// Sends an ASCII command as a UDP broadcast to port 4321 and collects the
/// datagrams that come back on the same socket until a timeout expires.
/// </summary>
public class udp2
{
    /// <summary>One datagram received in answer to a broadcast.</summary>
    public readonly struct Reply {
        /// <summary>Endpoint the datagram was received from.</summary>
        public readonly EndPoint From;
        /// <summary>Local time (DateTime.Now) taken right after the receive call returned.</summary>
        public readonly DateTime When;
        /// <summary>Datagram payload decoded as ASCII.</summary>
        public readonly string ReplyString;
        public Reply(EndPoint rrom, DateTime when, string reply) {
            From = rrom;
            When = when;
            ReplyString = reply;
        }
    }

    private readonly Socket m_socket;
    // NOTE(review): nothing in this class adds to m_replies, so Replies stays empty;
    // the Broadcast_* methods return their own lists instead.
    private readonly List<Reply> m_replies;
    public IList<Reply> Replies => m_replies;

    /// <summary>Creates a broadcast-enabled IPv4 UDP socket bound to <paramref name="local"/>.</summary>
    public udp2(IPEndPoint local) {
        m_socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
        m_socket.Bind(local);
        m_socket.EnableBroadcast = true;
        m_replies = new List<Reply>();
    }

    /// <summary>Shuts the socket down (when bound) and disposes it.</summary>
    public void Shutdown() {
        try {
            if (m_socket.IsBound) m_socket.Shutdown(SocketShutdown.Both);
        }
        finally {
            // Dispose even when Shutdown throws, so the handle is never leaked.
            m_socket.Dispose();
        }
    }

    /// <summary>
    /// Broadcasts <paramref name="command"/> and asynchronously collects replies until
    /// <paramref name="timeout"/> has elapsed.
    /// </summary>
    /// <param name="command">Text sent as ASCII bytes to 255.255.255.255:4321.</param>
    /// <param name="timeout">How long to keep receiving after the send.</param>
    /// <returns>Every non-empty datagram received before the timeout, in arrival order.</returns>
    public async Task<List<Reply>> Broadcast_as(string command, TimeSpan timeout) {
        List<Reply> replies = [];
        byte[] dg = Encoding.ASCII.GetBytes(command);
        IPEndPoint dst = new(IPAddress.Broadcast, 4321);
        byte[] rq = new byte[1024];
        EndPoint any = new IPEndPoint(IPAddress.Any, 0);

        m_socket.SendTo(dg,dst);
        Stopwatch sw = Stopwatch.StartNew();
        // The token source cancels itself after the timeout and is disposed on exit;
        // this replaces the separate Timer + wrapper object that was never disposed.
        using CancellationTokenSource cts = new CancellationTokenSource(timeout);
        try {
            while (true) {
                // ConfigureAwait(false): resume on a pool thread so the timestamp below
                // does not wait for the caller's (UI) message loop.
                SocketReceiveFromResult rres = await m_socket.ReceiveFromAsync(rq, any, cts.Token).ConfigureAwait(false);
                if (rres.ReceivedBytes>0) {
                    replies.Add(new Reply(rres.RemoteEndPoint,DateTime.Now,Encoding.ASCII.GetString(rq,0,rres.ReceivedBytes)));
                }
            }
        }
        catch (OperationCanceledException) { sw.Stop(); } // timeout reached: normal exit
        Debug.WriteLine("cancelled after "+sw.ElapsedMilliseconds+" ms");
        return replies;
    }

    /// <summary>
    /// Broadcasts <paramref name="command"/> and collects replies with blocking receives.
    /// The loop ends when a receive throws.
    /// </summary>
    /// <remarks>
    /// NOTE(review): Closocket and t_clo are not defined in the visible part of this
    /// class; presumably the timer callback closes the socket after the timeout so the
    /// blocked ReceiveFrom throws - confirm against their definitions.
    /// </remarks>
    public List<Reply> Broadcast_syn(string command, TimeSpan timeout) {
        List<Reply> replies = new List<Reply>();
        byte[] dg = Encoding.ASCII.GetBytes(command);
        IPEndPoint dst = new IPEndPoint(IPAddress.Broadcast,4321);

        m_socket.SendTo(dg,dst);
        byte[] rq = new byte[1024];
        Closocket x = new Closocket(m_socket);
        bool cancelled = false;
        using (Timer t = new Timer(t_clo,x,(int)timeout.TotalMilliseconds,System.Threading.Timeout.Infinite)) {
            do {
                // ReceiveFrom overwrites this with the sender's endpoint.
                EndPoint epremote = new IPEndPoint(IPAddress.Any,0);
                try {
                    int recount = m_socket.ReceiveFrom(rq,ref epremote);

                    if (recount>0) {
                        replies.Add(new Reply(epremote,DateTime.Now,Encoding.ASCII.GetString(rq,0,recount)));
                    }
                }
                catch (OperationCanceledException) { cancelled = true; }
                catch (Exception ex) { Debug.WriteLine(ex.Message); cancelled = true;}

            } while (!cancelled);
        }

        return replies;
    }
}

I need to wait for replies to a UDP broadcast with a timeout, so I tried ReadAsync with a cancellation token and a Threading.Timer to signal cancellation. There are two other computers that reply to the broadcast. If I use just Read, I see that the replies arrive 0.05 s and 0.27 s after transmission. When using ReadAsync and cancelling at time T after the broadcast, the replies are reported at T and T-0.22 s, for T = 0.5, 1 and 1.5 s. In other words, the reply from Comp1 is always reported as arriving at the time of cancellation, and the reply from Comp2 is always reported about 220 ms earlier.

It is very strange that the time at which a reply is reported depends on the timeout. Any idea why, and how to get the replies as soon as possible regardless of the timeout?

I am using .NET 8 Windows application, just a form with a button:

/// <summary>
/// Button handler: broadcasts "hPC" from a fresh <c>udp2</c> bound to <c>local</c>
/// and waits <c>waittime_sec</c> seconds for replies.
/// </summary>
private async void button_Click(object sender, EventArgs e) {
    udp2 u = new udp2(local);
    try {
        List<udp2.Reply> rs = await u.Broadcast_as("hPC",TimeSpan.FromSeconds(waittime_sec));
    }
    finally {
        // Release the bound socket; otherwise every click leaks a socket that is
        // still bound to the same local endpoint as the next one.
        u.Shutdown();
    }
}
/// <summary>
/// Sends an ASCII command as a UDP broadcast to port 4321 and collects the
/// datagrams that come back on the same socket until a timeout expires.
/// </summary>
public class udp2
{
    /// <summary>One datagram received in answer to a broadcast.</summary>
    public readonly struct Reply {
        /// <summary>Endpoint the datagram was received from.</summary>
        public readonly EndPoint From;
        /// <summary>Local time (DateTime.Now) taken right after the receive call returned.</summary>
        public readonly DateTime When;
        /// <summary>Datagram payload decoded as ASCII.</summary>
        public readonly string ReplyString;
        public Reply(EndPoint rrom, DateTime when, string reply) {
            From = rrom;
            When = when;
            ReplyString = reply;
        }
    }

    private readonly Socket m_socket;
    // NOTE(review): nothing in this class adds to m_replies, so Replies stays empty;
    // the Broadcast_* methods return their own lists instead.
    private readonly List<Reply> m_replies;
    public IList<Reply> Replies => m_replies;

    /// <summary>Creates a broadcast-enabled IPv4 UDP socket bound to <paramref name="local"/>.</summary>
    public udp2(IPEndPoint local) {
        m_socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
        m_socket.Bind(local);
        m_socket.EnableBroadcast = true;
        m_replies = new List<Reply>();
    }

    /// <summary>Shuts the socket down (when bound) and disposes it.</summary>
    public void Shutdown() {
        try {
            if (m_socket.IsBound) m_socket.Shutdown(SocketShutdown.Both);
        }
        finally {
            // Dispose even when Shutdown throws, so the handle is never leaked.
            m_socket.Dispose();
        }
    }

    /// <summary>
    /// Broadcasts <paramref name="command"/> and asynchronously collects replies until
    /// <paramref name="timeout"/> has elapsed.
    /// </summary>
    /// <param name="command">Text sent as ASCII bytes to 255.255.255.255:4321.</param>
    /// <param name="timeout">How long to keep receiving after the send.</param>
    /// <returns>Every non-empty datagram received before the timeout, in arrival order.</returns>
    public async Task<List<Reply>> Broadcast_as(string command, TimeSpan timeout) {
        List<Reply> replies = [];
        byte[] dg = Encoding.ASCII.GetBytes(command);
        IPEndPoint dst = new(IPAddress.Broadcast, 4321);
        byte[] rq = new byte[1024];
        EndPoint any = new IPEndPoint(IPAddress.Any, 0);

        m_socket.SendTo(dg,dst);
        Stopwatch sw = Stopwatch.StartNew();
        // The token source cancels itself after the timeout and is disposed on exit;
        // this replaces the separate Timer + wrapper object that was never disposed.
        using CancellationTokenSource cts = new CancellationTokenSource(timeout);
        try {
            while (true) {
                // ConfigureAwait(false): resume on a pool thread so the timestamp below
                // does not wait for the caller's (UI) message loop.
                SocketReceiveFromResult rres = await m_socket.ReceiveFromAsync(rq, any, cts.Token).ConfigureAwait(false);
                if (rres.ReceivedBytes>0) {
                    replies.Add(new Reply(rres.RemoteEndPoint,DateTime.Now,Encoding.ASCII.GetString(rq,0,rres.ReceivedBytes)));
                }
            }
        }
        catch (OperationCanceledException) { sw.Stop(); } // timeout reached: normal exit
        Debug.WriteLine("cancelled after "+sw.ElapsedMilliseconds+" ms");
        return replies;
    }

    /// <summary>
    /// Broadcasts <paramref name="command"/> and collects replies with blocking receives.
    /// The loop ends when a receive throws.
    /// </summary>
    /// <remarks>
    /// NOTE(review): Closocket and t_clo are not defined in the visible part of this
    /// class; presumably the timer callback closes the socket after the timeout so the
    /// blocked ReceiveFrom throws - confirm against their definitions.
    /// </remarks>
    public List<Reply> Broadcast_syn(string command, TimeSpan timeout) {
        List<Reply> replies = new List<Reply>();
        byte[] dg = Encoding.ASCII.GetBytes(command);
        IPEndPoint dst = new IPEndPoint(IPAddress.Broadcast,4321);

        m_socket.SendTo(dg,dst);
        byte[] rq = new byte[1024];
        Closocket x = new Closocket(m_socket);
        bool cancelled = false;
        using (Timer t = new Timer(t_clo,x,(int)timeout.TotalMilliseconds,System.Threading.Timeout.Infinite)) {
            do {
                // ReceiveFrom overwrites this with the sender's endpoint.
                EndPoint epremote = new IPEndPoint(IPAddress.Any,0);
                try {
                    int recount = m_socket.ReceiveFrom(rq,ref epremote);

                    if (recount>0) {
                        replies.Add(new Reply(epremote,DateTime.Now,Encoding.ASCII.GetString(rq,0,recount)));
                    }
                }
                catch (OperationCanceledException) { cancelled = true; }
                catch (Exception ex) { Debug.WriteLine(ex.Message); cancelled = true;}

            } while (!cancelled);
        }

        return replies;
    }
}
Share Improve this question asked Nov 19, 2024 at 6:28 vssvss 211 silver badge3 bronze badges 5
  • Can you try making the socket non-blocking using m_socket.Blocking = false ? Also using a cancellation token tied to a timer can cause the receive operation to wait until the cancellation finishes – Abcd Commented Nov 19, 2024 at 6:51
  • The names of your variables are terrible. Makes it difficult to read. – Magnus Commented Nov 19, 2024 at 7:48
  • Why use a Timer and not a CancellationToken? And why use a raw socket and not UdpClient which handles most of the issues for you. – Charlieface Commented Nov 19, 2024 at 10:33
  • A blocking socket is used because it seems more efficient than a cycle of "if Available then Read; Sleep". The Timer is used to signal cancellation via the CancellationToken when the timeout is reached. – vss Commented Nov 19, 2024 at 17:23
  • I am experimenting with raw socket, rather than UdpClient, because UdpClient does not have ReceiveAsync with cancellation token. – vss Commented Nov 19, 2024 at 17:41
Add a comment  | 

1 Answer 1

Reset to default 1

I am struggling to follow your code, but using the following I wasn't able to reproduce the problem and found that using a CancellationToken works perfectly to stop the ReceiveAsync.

Server Code:

/// <summary>
/// Sends one numbered, timestamped message and waits up to 500 ms for a reply
/// whose text parses to the same message number.
/// </summary>
/// <param name="udpClient">Client used for both the send and the receives.</param>
/// <param name="messageCount">Sequence number sent, and expected back in the reply.</param>
/// <param name="cancellationToken">Cancels the whole exchange (e.g. on shutdown).</param>
static async Task SendAndReceiveAsync(UdpClient udpClient, int messageCount, CancellationToken cancellationToken)
{
    const int timeout = 500; // Milliseconds to wait for the matching UDP response
    try
    {
        string message = $"{messageCount:D4} at {DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}";
        var memory = new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes(message));

        await udpClient.SendAsync(memory, cancellationToken);
        var stopWatch = Stopwatch.StartNew();
        try
        {
            using var insideCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            insideCancellationTokenSource.CancelAfter(timeout);

            while (true) // Fast read to discard delayed replies to earlier messages
            {
                var udpResult = await udpClient.ReceiveAsync(insideCancellationTokenSource.Token);
                // TryParse: a stray or malformed datagram is skipped like any other
                // non-matching reply instead of throwing FormatException.
                if (int.TryParse(Encoding.UTF8.GetString(udpResult.Buffer), out int repliedId)
                    && repliedId == messageCount)
                {
                    Console.WriteLine($"{messageCount:D4} - Response Received - {stopWatch.ElapsedMilliseconds,3} ms");
                    break;
                }
            }
        }
        // Only report a timeout when the caller's token did not cancel the receive.
        catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
        {
            Console.WriteLine($"{messageCount:D4} - Timed out...");
        }
    }
    catch (OperationCanceledException) { } // caller-requested cancellation: exit quietly
}

/// <summary>
/// Once per second starts a send/receive exchange with 192.168.1.218:54321 and
/// fails if the previous exchange has not finished by the next tick.
/// </summary>
/// <param name="cancellationToken">Stops the loop; cancellation ends the method without an exception.</param>
/// <exception cref="InvalidOperationException">An exchange was still running after one interval.</exception>
static async Task UdpHeartbeatServerSender(CancellationToken cancellationToken)
{
    const int sendInterval = 1000; // Milliseconds between UDP messages

    try
    {
        using var udpClient = new UdpClient();
        udpClient.Connect(new IPEndPoint(IPAddress.Parse("192.168.1.218"), 54321));
        udpClient.EnableBroadcast = true;
        int messageCount = 0;
        while (true)
        {
            // Take the number here rather than incrementing the captured variable
            // inside the lambda, which runs later on a pool thread.
            int currentMessage = messageCount++;
            var subTask = Task.Run(() => SendAndReceiveAsync(udpClient, currentMessage, cancellationToken),cancellationToken);
            await Task.Delay(sendInterval, cancellationToken);
            if (!subTask.IsCompleted)
                throw new InvalidOperationException("Send/Receive has not completed!");
            // Observe the finished task so a fault in the exchange is not silently lost.
            await subTask;
        }
    }
    catch (OperationCanceledException) { }
}

/// <summary>
/// Entry point: runs the heartbeat sender in the background until ENTER is pressed,
/// then cancels it and waits for it to finish.
/// </summary>
static async Task Main(string[] args)
{
    using var shutdownSource = new CancellationTokenSource();
    try
    {
        Task heartbeat = Task.Run(() => UdpHeartbeatServerSender(shutdownSource.Token));

        Console.WriteLine("Press ENTER to exit");
        Console.ReadLine();

        shutdownSource.Cancel();
        await heartbeat;
    }
    catch (OperationCanceledException)
    {
        // Cancellation is the expected way out; every other exception propagates.
    }
}

Client Code

using System;
using System.Net;
using System.Net.Sockets;
using System.Text;

// Artificial delay, in milliseconds, applied before each reply; changed in
// steps of 100 by the key loop and read through Interlocked operations.
int responseDelay = 0;
// Socket used for both receiving and replying; assigned in ListenAndSend.
UdpClient udpClient;
// Waits timeDelay milliseconds, then sends messageID back to remoteEndPoint
// as UTF-8 and logs it.
async Task Respond(string messageID, int timeDelay, IPEndPoint remoteEndPoint)
{
    try
    {
        // Use the delay captured by the caller, clamped at zero: Task.Delay throws
        // for negative values other than -1, and -1 would wait forever.
        await Task.Delay(Math.Max(0, timeDelay));
        var buffer = Encoding.UTF8.GetBytes(messageID);
        await udpClient.SendAsync(buffer, buffer.Length, remoteEndPoint);
        Console.WriteLine($"S: {messageID}");
    }
    catch (OperationCanceledException) { }
}
// Listens on UDP port 54321 forever; for each datagram, logs it and schedules a
// reply carrying its first four bytes (the message id) back to the sender.
async Task ListenAndSend()
{
    try
    {
        udpClient = new UdpClient(54321);
        while (true)
        {
            var udpResult = await udpClient.ReceiveAsync();
            Console.WriteLine("R: "+ Encoding.UTF8.GetString(udpResult.Buffer));
            // A datagram shorter than the 4-byte id would make the slice below throw
            // and end this fire-and-forget loop, so skip it.
            if (udpResult.Buffer.Length < 4)
                continue;
            var messageId = Encoding.UTF8.GetString(udpResult.Buffer[0..4]);

            _ = Task.Run(() => Respond(messageId, Interlocked.CompareExchange(ref responseDelay, 0, 0), udpResult.RemoteEndPoint));
        }
    }
    catch (OperationCanceledException) { }

}


// Client entry point: start the listener in the background, then let the user
// raise or lower the reply delay in 100 ms steps from the keyboard.
Console.WriteLine("Press");
Console.WriteLine("     Space to increase timer");
Console.WriteLine("     z to decrease timer");

_ = Task.Run(() => ListenAndSend());

while (true)
{
    var key = Console.ReadKey();
    if (key.KeyChar == ' ')
        Interlocked.Add(ref responseDelay,100);
    if (key.KeyChar == 'z')
    {
        // Never go below zero: a negative delay makes Task.Delay throw in the responder.
        // This loop is the only writer, so read-then-exchange cannot lose an update.
        int current = Interlocked.CompareExchange(ref responseDelay, 0, 0);
        Interlocked.Exchange(ref responseDelay, Math.Max(0, current - 100));
    }

    Console.WriteLine($"Delay is now: {Interlocked.CompareExchange(ref responseDelay, 0, 0)}");
}

本文标签: cNET 8 socket ReadAsync why the last reply is reported exactly at the time of cancellationStack Overflow