Finding local C# servers on a Java client using UDP and reactive extensions

Posted on

Problem

This program uses UDP broadcast to find app servers on the local network. When a server receives a client broadcast, it sends a port (integer) to the client which will later be used to create a TCP connection.


The Server portion of my code is written in C#. The UdpPoller method returns an IObservable<IPEndPoint> which is used to report any activity on the broadcast port.

class ClientListener
{
    public static IObservable<IPEndPoint> UdpPoller(int listenPort)
    {
        return Observable.Create(
            (IObserver<IPEndPoint> subscriber) =>
            {
                UdpClient listener = null;
                try
                {
                    listener = new UdpClient(listenPort);
                    for (;;)
                    {
                        var endpoint = new IPEndPoint(IPAddress.Any, listenPort);
                        listener.Receive(ref endpoint);
                        Console.WriteLine("Received broadcast from {0}", endpoint.ToString());
                        subscriber.OnNext(endpoint);
                    }
                }
                catch (SocketException e)
                {
                    Console.WriteLine(e.Message);
                }
                finally
                {
                    if (listener != null)
                    {
                        listener.Close();
                    }
                }
                return Disposable.Empty;
            });
    }

The PortReporter method returns an Action<IPEndPoint> which is used to send the port info to a client.

    public static Action<IPEndPoint> PortReporter(int serverPort)
    {
        return endpoint =>
        {
            var sender = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);

            try
            {
                var sizeBytes = BitConverter.GetBytes(serverPort);
                // Java needs reverse order. 
                // Int format will be consistent as Java style
                Array.Reverse(sizeBytes);
                sender.SendTo(sizeBytes, new IPEndPoint(endpoint.Address, endpoint.Port));
            }
            catch (FormatException e)
            {
                Console.WriteLine(e.Message);
            }
            catch (SocketException e)
            {
                Console.WriteLine(e.Message);
            }
            finally
            {
                sender.Close();
            }
        };
    }
}

Here is an example:

var report = ClientListener.UdpPoller(11000)
    .SubscribeOn(NewThreadScheduler.Default)
    .Subscribe(ClientListener.PortReporter(11001));

The client portion of my code is in Java. The find method is used to ping all the servers on a local network and listen for that servers port. When it receives the port number it will emit a Pair<InetAddress, Integer>. The end goal is that a user will receive a list of all the servers and choose which one they want to connect to.

public class AppServer {
    public static Observable<Pair<InetAddress, Integer>> find(
            final InetAddress broadcastAddress,
            final int broadcastPort
    ) {
        return Observable.create(
                subscriber -> {
                    try (final DatagramSocket socket = new DatagramSocket()) {

                        final byte[] data = "Client".getBytes("UTF-8");
                        final DatagramPacket packet = new DatagramPacket(
                                data, data.length, broadcastAddress, broadcastPort);
                        socket.send(packet);
                        for (;;) {
                            socket.setSoTimeout(1000);
                            packet.setData(new byte[4]);
                            socket.receive(packet);
                            subscriber.onNext(new Pair<>(
                                    packet.getAddress(),
                                    ByteBuffer.wrap(packet.getData()).getInt()));
                        }
                    } catch (final IOException ignored) {
                        // Timeout reached
                    } finally {
                        subscriber.onCompleted();
                    }
                });
    } 
}

Here is an example:

AppServer.find(InetAddress.getByName("255.255.255.255"), 11000)
    .subscribe(System.out::println, System.err::println, ()-> System.out.println("Done"));

Feedback I’m most interested in:

  • To know if I am using Rx correctly, or how I can better use the library.
  • How to better handle the endless looping (found in UdpPoller when listening to broadcasts, and in find when parsing incoming packets and relying on the timeout IOException).
  • How to better handle errors (in UdpPoller I’m not sure if it’s better to let the error propagate or just end the stream).

Solution

return Disposable.Empty;

By returning Disposable.Empty, client code has no way to stop listening on that port.

Consider instead a combination of Observable.Using, Observable.FromAsync, and Observable.Repeat:

public static IObservable<IPEndPoint> UdpPoller(int listenPort)
{
    IObservable<UdpReceiveResult> receiveResults = Observable.Using(
        () => new UdpClient(listenPort),
        client => Observable.FromAsync(client.ReceiveAsync).Repeat());

    return receiveResults.Select(receiveResult => receiveResult.RemoteEndPoint);
}

This way, disposing of the subscription will dispose of the UdpClient.

Leave a Reply

Your email address will not be published. Required fields are marked *