“Two way” HTTP Client

Posted on

Problem

In one of my applications I make requests to a server. However, that server may at some point start making requests to my application as well. The server also allows polling, this means that I am able to make a request to a URL on the server and it only returns after some time if it does not have any content available at the moment (the polling does not guarantee that there will be content).

So I implemented the class TwoWayHttpClient that is able to abstract all of this for me. It makes use of a HttpClient, to make requests to the server. It creates a HttpListener if needed, and also supports the polling.

This class provides the method RequestAndWaitForReply that makes a request to the server and waits for an expected message from server (that can arrive several minutes after). Remember that that message can arrive both via the listener, or via polling.

The class also provides the method WaitForMessage in the case the server sends the message without me having to make any request.

BaseIncomingMessage provides an abstraction to the kind of the response that is sent by the server. Classes that derive from it can override IsExpectedMessageImpl to determinate if the server sent the expected message.
One thing that I don’t like a lot is that this class is used in two different circumstances. The first is when ever I receive a request from the server and put it on the queue. The second is when I create an instance of it to be able to compare to the received message.

FilterQueue is already up to review on my other post.

public class TwoWayHttpClient : IDisposable
{
    private readonly HttpClientHandler _handler;
    private readonly HttpClient _client;

    protected HttpListener _listener;
    protected readonly FilterQueue<IIncomingMessage> _queue = new FilterQueue<IIncomingMessage>();

    public string ListeningAddress { get; private set; }

    private TwoWayHttpClient()
    {
        _handler = new HttpClientHandler()
        {
            CookieContainer = new CookieContainer()
        };
        _client = new HttpClient(_handler);
    }

    protected TwoWayHttpClient(string server)
        : this()
    {
        _client.BaseAddress = new Uri("http://" + server);
        _client.Timeout = TimeSpan.FromMinutes(10);
    }

    public void StartListener(int listenerPort, string baseAddress = "/")
    {
        if (_listener != null)
        {
            return;
        }

        //the listening address contaning local ip adress, so there aren't any issues about who the server needs to send the request to
        ListeningAddress = "http://" + GetLocalIPAddress() + ":" + listenerPort + baseAddress;

        _listener = new HttpListener();
        //Add the prefix for the listener on the specified port, the + sign is important so it can recieve requests
        _listener.Prefixes.Add("http://+:" + listenerPort + baseAddress);
        _listener.Start();

        //starts listening to requests
        Task.Run(async () =>
        {
            try
            {
                while (_listener.IsListening)
                {
                    var context = await _listener.GetContextAsync();

                    //for every request recieved put it on a queue.
                    _queue.Add(GetIncommingMessageFromContext(context));

                    //answer to the server with 200. This is important, so the server can keep sending other requests
                    using (context.Response) { }
                }
            }
            catch (ObjectDisposedException)
            {
                //listener was disposed, ignore it
            }
        });
    }

    CancellationTokenSource _cancelSource = new CancellationTokenSource();
    public void InitiatePolling(string url)
    {
        _cancelSource = new CancellationTokenSource();
        var token = _cancelSource.Token;
        Task.Run(async () =>
        {
            while (true)
            {
                _queue.WaitForWaiters();
                token.ThrowIfCancellationRequested();
                var response = await Request(url);
                if (!IsValidResponse(response))
                {
                    throw new InvalidOperationException(response.ToString());
                }
                if (response.StatusCode != HttpStatusCode.NoContent)
                {
                    _queue.Add(GetIncommingMessageFromResponse(response));
                }
            }
        }, token);
    }

    protected virtual IIncomingMessage GetIncommingMessageFromResponse(HttpResponseMessage response)
    {
        return new BaseIncomingMessage(response);
    }

    protected virtual IIncomingMessage GetIncommingMessageFromContext(HttpListenerContext context)
    {
        return new BaseIncomingMessage(context);
    }

    private static string GetLocalIPAddress()
    {
        var host = Dns.GetHostEntry(Dns.GetHostName());
        foreach (var ip in host.AddressList)
        {
            if (ip.AddressFamily == AddressFamily.InterNetwork)
            {
                return ip.ToString();
            }
        }
        throw new Exception("Local IP Address Not Found!");
    }

    private Task<string> GetQueryStringFromDictionary(IDictionary<string, object> parameters)
    {
        var values = parameters.Where(p => p.Value != null)
            .Select(p => new KeyValuePair<string, string>(p.Key, p.Value.ToString()));

        using (var query = new FormUrlEncodedContent(values))
        {
            return query.ReadAsStringAsync();
        }
    }

    private Cookie GetSetCookie(HttpResponseMessage response)
    {
        IEnumerable<string> authValues;
        if (!response.Headers.TryGetValues("Set-Cookie", out authValues))
        {
            return null;
        }

        var cookieValue = authValues.FirstOrDefault();
        var idxEquals = cookieValue.IndexOf('=');

        var name = cookieValue.Substring(0, idxEquals);
        var value = cookieValue.Substring(idxEquals + 1, cookieValue.IndexOf(';') - idxEquals - 1);

        return new Cookie(name, value)
        {
            Domain = _client.BaseAddress.Host
        };
    }

    public async Task<HttpResponseMessage> Request(string url, IDictionary<string, object> parameters = null, string name = "")
    {
        if (parameters != null && parameters.Count != 0)
        {
            var query = await GetQueryStringFromDictionary(parameters);
            url = url + "?" + query;
        }

        var response = await _client.GetAsync(url);

        var cookie = GetSetCookie(response);
        if (cookie != null)
        {
            _handler.CookieContainer.Add(cookie);
        }
        return response;
    }

    protected async Task<TwoWayHttpResponse> RequestAndWaitForReply(string url, object parameters, IIncomingMessage expectedMessage)
    {
        if (parameters == null)
        {
            parameters = new object();
        }
        var response = await Request(url, parameters.ToDictionary());

        if (IsValidResponse(response))
        {
            if (expectedMessage == null)
            {
                return new TwoWayHttpResponse
                {
                    Response = response
                };
            }
            var result = new TwoWayHttpResponse
            {
                Response = response,
                ServerRequest = await WaitForMessage(expectedMessage)
            };

            return result;
        }

        return new TwoWayHttpResponse
        {
            Response = response
        };
    }

    public virtual Task<IIncomingMessage> WaitForMessage(IIncomingMessage expectedMessage)
    {
        var message = _queue.Take(expectedMessage.IsExpectedMessage, Timeout.InfiniteTimeSpan);
        return Task.FromResult(message);
    }

    public virtual void ClearQueue()
    {
        _queue.Clear();
    }

    protected virtual bool IsValidResponse(HttpResponseMessage response)
    {
        return (int)response.StatusCode >= 200 && (int)response.StatusCode < 400;
    }

    public void Dispose()
    {
        if (_listener != null)
        {
            _listener.Stop();
            (_listener as IDisposable).Dispose();
        }
        if (_cancelSource != null)
        {
            _cancelSource.Cancel();
        }
        _client.Dispose();
    }
}   

public class BaseIncomingMessage : IIncomingMessage
{
    #if DEBUG
    private readonly HttpListenerContext _context;
    private readonly HttpResponseMessage _response;
    #endif

    private readonly Task<string> _content;

    public BaseIncomingMessage(HttpListenerContext context)
    {
        _context = context;
        using (var reader = new StreamReader(_context.Request.InputStream))
        {
            _content = reader.ReadToEndAsync();
        }
    }

    public BaseIncomingMessage(HttpResponseMessage response)
    {
        _response = response;
        _content = response.Content.ReadAsStringAsync();
    }

    public BaseIncomingMessage()
    {

    }

    public bool IsExpectedMessage(IIncomingMessage message)
    {
        return IsExpectedMessageImpl(message, _content.Result);
    }


    public virtual bool IsExpectedMessageImpl(IIncomingMessage originalMessage, string content)
    {
        return true;
    }

}

public interface IIncomingMessage
{
    bool IsExpectedMessage(IIncomingMessage compareTo);
}

Some utilities:

public static class ObjectUtilities
{
    private static bool HasValue(object value)
    {
        if (value != null)
        {
            if (value is bool)
            {
                return (bool)value;
            }
            return true;
        }
        return false;
    }

    public static Dictionary<string, object> ToDictionary(this object myObj)
    {
        if (myObj == null)
        {
            return new Dictionary<string, object>();
        }
        return myObj.GetType()
            .GetProperties()
            .Where(pi => HasValue(pi.GetValue(myObj, null)))
            .Select(pi =>
            {
                var dataMemberAttribute = pi.GetCustomAttribute<DataMemberAttribute>();
                var name = dataMemberAttribute == null ? pi.Name : dataMemberAttribute.Name;
                return new { Name = name, Value = pi.GetValue(myObj, null) };
            })
            .AsParallel()
            .ToDictionary(ks => ks.Name, vs => vs.Value);
    }
}

Solution

Protected fields (such as _listener and _queue breaks incapsulation), fields must be private. When they’re not then you’re exposing your internal implementation to derived classes. It’s not just about academic OOP reasoning but also because they will become part of class interface contract (and you usually do not want to restrict your choices about internal implementation.) To design a good derivable class you already need a lot of effort even without exposing implementation.

Private TwoWayHttpClient parameterless constructor is called only by another protected TwoWayHttpClient constructor. In this moment the parameterless constructor is then useless, drop it (and merge code with the other constructor.)

You have only one protected constructor then your class is effectively not instantiable however it’s not abstract, mark it as abstract to make it clear at first sight (and with a more helpful compiler error message in case of mistakes.)

You do not validate parameters: what if, for example, server argument is null? Add proper arguments checking for non-private methods.

AFAIK await the context from HttpListener and directly using it inside an asynchronous call of Task.Run is redundant (I’d say useless.) Just use _listener.GetContext() and drop await and async.

Do not directly throw Exception, there are many exceptions you can use…pick the most proper one. It’s not only about the point where you throw but – even more – about the point where you catch them. Using Exception you force the caller to catch Exception (and that’s pretty pretty bad.) Also note that method may simplified to (note that First() will generate InvalidOperationException for you if there isn’t any match):

private static string GetLocalIPAddress()
{
    return Dns.GetHostEntry(Dns.GetHostName()).AddressList
        .First(x => x.AddressFamily == AddressFamily.InterNetwork)
        .ToString();
}

In RequestAndWaitForReply you reassing parameters argument. I usually try very hard to avoid to reassign function arguments (I miss const modifier here…) because it may be break code by quick changes (just for example I have to change code path or I make that parameter ref), sometimes it is not harmful but I try to have a general rule. In this case it is used only once and ToDictionary() already handles that case. Just drop it.

I do not know about your usage scenario of this class but when I see a protected virtual method (like GetIncommingMessageFromResponse) to create an instance of another type then I also consider if code may be simplified using generic arguments.

An object should be disposable how many times you want…in your case if you call Dispose() twice you will get an exception (because you invoke Stop() on already disposed object). Save a _disposed flag. Also you should implement IDisposable using the usual pattern (especially if your class is abstract and should be derived.)

Do you need an utility class ObjectUtilities to be public?

Query in ToDictionary() IMO may be simplified with few private helper methods but another point is more important: will AsParallel() speed-up ToDictionary<TSource, TKey>()? I doubt (and if query contains only ToDictionary<TSource, TKey>() I doubt PLINQ will execute it in parallel, synchronization cost will be too high…you may perform a quick test.)

I’m not sure about proper asynchronous tasks usage, I feel something wrong (or with space for improvements) but it’d take a longer time to review that…a good job for someone else.

The way TwoWayHttpClient is designed is flawed.

Currently all incoming requests that come from the server go to the container. And then WaitFormessage verifies that a certain message has arrived. This may seem to cause no harm at first glance, but the consequences are that I am not available to remove items from that container ever.

Instead of doing that I should prefer to put an item on the container when someone needs to wait for a message. When a message from the server arrives it goes trough all items on that container and notify the correct waiters.

This implies modifications on FilterQueue that I will be discussing on my other post, while presenting the same problem, as well modifications in RequestAndWaitForReply and WaitForMessage.

protected async Task<TwoWayHttpResponse> RequestAndWaitForReply(string url, 
    object parameters, IIncomingMessage expectedMessage)
{
    if (parameters == null)
    {
        parameters = new object();
    }

    Func<IIncomingMessage> task = null;
    if (expectedMessage != null)
    {
        task = WaitForMessage(expectedMessage);
    }
    var response = await Request(url, parameters.ToDictionary());

    if (IsValidResponse(response))
    {
        if (expectedMessage == null)
        {
            return new TwoWayHttpResponse
            {
                Response = response
            };
        }
        var result = new TwoWayHttpResponse
        {
            Response = response,
            ServerRequest = task()
        };

        return result;
    }

    return new TwoWayHttpResponse
    {
        Response = response
    };
}

public Func<IIncomingMessage> WaitForMessage(IIncomingMessage expectedMessage)
{
    var token = _queue.AddMessage(expectedMessage);
    return (Func<IIncomingMessage>)(() => 
        _queue.WaitForNotification(token, Timeout.InfiniteTimeSpan));
}

Leave a Reply

Your email address will not be published. Required fields are marked *