Problem
I have a data synchronisation operation that is observing a database. This is a fairly key piece of code for the system, so I want to make sure I handle the concurrency correctly.
The problem
- The operation manipulates sync state flags in the database and should have zero concurrency
- The operation must be cancellable
- The operation should repeat an additional time if signals are received while it is running
Behavior
- The service is started in response to changes to the underlying data
- The service is started in response to changes in connectivity
- The service is started periodically
Current design
bool syncRequired = false;
private async void StartSynchronisation()
{
    // the cancellation token source operates as the flag to indicate if the operation is running
    if (this.cancellationTokenSource != null)
    {
        syncRequired = true;
        return;
    }
    try
    {
        this.cancellationTokenSource = new System.Threading.CancellationTokenSource();
        // stop the timer that periodically prompts the operation
        this.synchronisationTimer.Stop();
        bool dataWasSynchronised = false;
        // Repeat synchronisation and replication until no updates are made
        do
        {
            syncRequired = false;
            // Only attempt synchronisation if the service is actually available
            if (this.connectivityMonitor.IsServiceAvailable)
            {
                dataWasSynchronised = await DoSynchronisation().ConfigureAwait(false);
            }
        }
        while (dataWasSynchronised || syncRequired);
    }
    catch (Exception ex)
    {
        // handle exception using injected policy handler
    }
    finally
    {
        this.synchronisationTimer.Start();
        this.cancellationTokenSource.Dispose();
        this.cancellationTokenSource = null;
    }
}
I bring you this code because it is missing any thread synchronisation and it probably needs it. It currently relies on the atomicity of reads and writes to a reference (cancellationTokenSource) and a bool (syncRequired), but that doesn’t really protect anything. As this is a critical piece of code, I don’t want to miss an event and then have to wait for the next timer interval to start the operation. It would continue working, but our test department won’t like it. My issue is that thread concurrency isn’t my strongest area and I’m already burned out on this feature; I’m struggling to reason about it without tying my head in knots.
What synchronisation primitives do I need to make sure all events are handled as intended? Is there a more favourable pattern using concurrent queues or Reactive Extensions that I can throw in here? I have been considering some use of Reactive Extensions’ BehaviorSubject<T>.
Solution
My understanding:
The first if statement is meant to check whether a thread is already executing the loop by checking whether cancellationTokenSource is null; if it’s not null, then signal that a sync is required by setting syncRequired to true and returning. The thread executing the loop will then run another iteration due to the while (dataWasSynchronised || syncRequired) condition.
If cancellationTokenSource is null, then the sync loop isn’t being executed, so this thread goes on to do just that.
private async void StartSynchronisation()
{
    // the cancellation token source operates as the flag to indicate if the operation is running
    if (this.cancellationTokenSource != null)
    {
        syncRequired = true;
        return;
    }
    try
    {
        this.cancellationTokenSource = new System.Threading.CancellationTokenSource();
So the first problem is that the check (the first if statement) isn’t synchronised. One thread can see that cancellationTokenSource is null and move on to execute the sync loop, which starts by setting cancellationTokenSource to a new object. But suppose there’s a context switch before it assigns the new object, and another thread executes the same code. It too sees that cancellationTokenSource is null, assigns a new object to cancellationTokenSource, and starts running the loop. Then there’s another context switch and our original thread is back again; it assigns its own new object to cancellationTokenSource and starts executing the loop as well. Voila, you’ve now violated your first requirement of having zero concurrency.
The fix is simple: the check, as well as any objects being altered to affect the outcome of the check, must be synchronised.
A basic way to achieve this:
class MyClass
{
    private readonly object syncLock = new object();
    private volatile bool syncRequired = false;
    private volatile bool syncRunning = false;
    private void MySyncFunc()
    {
        // Check and claim the "running" flag as one atomic step
        lock (syncLock)
        {
            if (syncRunning)
            {
                // Another thread owns the loop: ask it for one more pass
                syncRequired = true;
                return;
            }
            syncRunning = true;
        }
        while (true)
        {
            syncRequired = false;
            // your sync operations here
            // Decide whether to repeat or release the "running" flag, again atomically
            lock (syncLock)
            {
                if (!syncRequired)
                {
                    syncRunning = false;
                    break;
                }
            }
        }
    }
}
I’ll quickly point out that this isn’t the most efficient or the prettiest approach, but it’s a simple (and hopefully accurate) way to show and do it.
First of all, notice you can’t even check whether your sync loop is running without getting the lock, so only one thread at a time can check. If a thread determines it isn’t running, it sets the value to true (to signal that it’s now running) and exits the lock. The second thread that arrived on the scene in the earlier example, if it was context-switched to during that time, would not have been able to proceed because it couldn’t get the lock. Once the first thread exits the lock, the second thread gets the lock. Now it sees that syncRunning is true, so it sets syncRequired to true and returns. No sync problems there.
Now let’s look at the loop. Never mind that it’s an infinite loop; I just wanted to get the checks out of the loop condition. The first thing it does is set syncRequired to false. This doesn’t need to be synchronised because even if another thread had set it to true right before, you’re going to go and do the actual synchronisation right now anyway.
So after your own sync stuff completes, it’s time to check whether you can exit the loop or need to repeat it. (I’ve left out checking the result of your sync operation as it’s not relevant to this.) Again, you have to get the lock. Why? Because otherwise the thread could check syncRequired, see it’s false, and move on to set syncRunning = false. But before it’s set, there’s a context switch: another thread checks and sees that syncRunning == true, so it sets syncRequired to true and returns. Meanwhile the first thread continues, sets syncRunning to false, and returns as well. Now you have syncRequired == true but no thread running the loop. Whoops, you missed an event! Take a look at your original code and you’ll see this problem is there too, if syncRequired is set to true after the loop exits and before cancellationTokenSource is set to null in the finally block.
So what the code above does instead is get a lock on the same object again and check whether a sync is required. If it is, fine, the lock is exited and the loop starts over. But if it’s not, then it sets syncRunning to false before it exits the lock. Any other thread that wants to check whether syncRunning == true has to wait until the lock in the loop is exited, so it will see the correct value when it finally does get the lock: if the first thread is going to exit the loop, it sets syncRunning to false while it still holds the lock. When the next thread gets the lock and checks, it will see that syncRunning == false, and go on to run the loop itself. No more lost events!
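To tie this back to the original async method, here is a minimal sketch of the same lock pattern wrapped around an awaited operation. The class name SyncCoordinator and the injected delegate (standing in for DoSynchronisation()) are assumptions for illustration, not part of the original code. The key constraint is that C# does not allow await inside a lock block, so the lock only guards the flag checks and is never held while the sync runs.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical wrapper: the name and the injected delegate are assumptions.
public sealed class SyncCoordinator
{
    private readonly object syncLock = new object();
    private readonly Func<Task<bool>> doSynchronisation; // stands in for DoSynchronisation()
    private bool syncRequired; // only accessed while holding syncLock
    private bool syncRunning;  // only accessed while holding syncLock

    public SyncCoordinator(Func<Task<bool>> doSynchronisation)
    {
        this.doSynchronisation = doSynchronisation;
    }

    public async Task StartSynchronisationAsync()
    {
        lock (syncLock)
        {
            if (syncRunning)
            {
                // Another caller owns the loop: request one more pass and leave.
                syncRequired = true;
                return;
            }
            syncRunning = true;
        }

        try
        {
            while (true)
            {
                lock (syncLock) { syncRequired = false; }

                // The lock is NOT held here; await is not permitted inside lock.
                bool dataWasSynchronised = await doSynchronisation().ConfigureAwait(false);

                lock (syncLock)
                {
                    if (!dataWasSynchronised && !syncRequired)
                    {
                        // Release ownership inside the lock so no signal can slip through.
                        syncRunning = false;
                        return;
                    }
                }
            }
        }
        catch
        {
            // Release ownership on failure so later signals can start a new loop.
            lock (syncLock) { syncRunning = false; }
            throw;
        }
    }
}
```

Because both flags are only touched under the lock in this variant, they don’t need to be volatile. Note that a signal arriving while a failing pass is in flight is dropped by the catch block here; whether that is acceptable depends on your exception policy.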
A couple of other issues:
If you’re going to use a member variable in a loop, but not modify its value in the loop (or only ever set it to a single value on each iteration such that there is no apparent point in doing so, even though you know that some other thread can modify the value), then mark it volatile. This will result in the value being read from memory each time, so you pick up changes made from other threads. Otherwise the compiler or JIT could optimise the read out of the loop; after all, from its point of view the value never changes during the lifetime of the loop.
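As a small illustration of that point, here is a sketch of a loop that spins until another thread flips a flag. The Worker class and its member names are made up for the example. Keep in mind that volatile only affects visibility and ordering of individual reads and writes; it does not make a check-then-set sequence atomic, which is why the lock is still needed in the code further up.

```csharp
using System.Threading;

// Hypothetical example class; names are illustrative only.
public sealed class Worker
{
    // Written by one thread, read in a loop by another.
    private volatile bool stopRequested;
    private long iterations;

    public long Iterations => Interlocked.Read(ref iterations);

    public void Run()
    {
        // The volatile read forces the flag to be re-read on every iteration.
        while (!stopRequested)
        {
            Interlocked.Increment(ref iterations);
        }
    }

    public void Stop()
    {
        stopRequested = true;
    }
}
```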
And your code isn’t cancellable. Sure, you create a CancellationTokenSource, but you never actually use it, which is why I simply substituted it with a bool in my example. What you need to be doing is checking the token on each iteration (or possibly passing it to the function being called if that’s the part that should allow cancellation of its task, though you’d then have to alter the function to accept and know how to use the token) and seeing whether a cancellation has been requested; if so, throw an OperationCanceledException (or however you want to abort what’s currently running).
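A minimal sketch of what that could look like, assuming the sync operation can be changed to accept a token. CancellableLoop and the runPass delegate are hypothetical names standing in for your loop and a token-aware DoSynchronisation. CancellationToken.ThrowIfCancellationRequested() is the built-in way to throw OperationCanceledException when cancellation has been requested.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableLoop
{
    // runPass is a hypothetical stand-in for a DoSynchronisation that accepts a token.
    // Returns the number of completed passes.
    public static async Task<int> RunAsync(
        Func<CancellationToken, Task<bool>> runPass,
        CancellationToken token)
    {
        int passes = 0;
        bool again;
        do
        {
            // Abort between passes if cancellation was requested.
            token.ThrowIfCancellationRequested();

            // Pass the token down so a long-running pass can also stop early.
            again = await runPass(token).ConfigureAwait(false);
            passes++;
        }
        while (again);
        return passes;
    }
}
```

The owner of the CancellationTokenSource calls Cancel() on it, and the caller of RunAsync catches OperationCanceledException to treat the abort as a normal outcome rather than a failure.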