In the previous post, I demonstrated the obvious, viz. that in the absence of transactions, concurrent clients can step on each other's feet. And now we find ourselves at a crossroads: either we admit that there is a new science of data management whereby all problems are solved once we denormalize everything into large tree structures, or we accept that there is still a need for transactional support and that we need some way to implement it. Lest I make a habit of stating the obvious, I won't spend time on this dilemma. Let me just say that denormalizing, if anything, widens the need for transactions, since denormalized data must be constantly kept in sync. And that holds notwithstanding the thousands of Google hits stating, with almost religious fanaticism, that everyone who seeks "Old SQL" properties in MongoDB just doesn't get it.
Let's revisit the item-reordering exercise from the last post and reformulate it in a slightly more elementary form: a particularly simple way to go about rearranging items in a list is to swap the locations of two of them. With this formulation, the problem is now that we have multiple agents, each occasionally needing exclusive access to two shared resources to do some work, preferably without interfering too much with anyone else. Did I jog your memory? I bet I did. This is the Dining Philosophers problem, courtesy of the venerable Edsger Dijkstra.
I will show you that MongoDB is, indeed, up to the task of handling this problem. I even hope that this will appease the MongoDB fans into flaming me less than usual for this kind of heretical talk. Let me set up the universe of discourse by showing you the code to create the data, and to set, get, test and reset them (yes, data is plural, by the way).
open MongoDB.Bson
open MongoDB.Driver

let mutex = new System.Object()
let global_counter = ref 0

let connectionString = "mongodb://localhost/?safe=true"
let server = MongoServer.Create(connectionString)
let mongo = server.GetDatabase("local")
let items = mongo.GetCollection<BsonDocument>("items")

let create_items () =
    let create_item (n:int) =
        let x = (new BsonDocument()).Add("name", BsonValue.Create(n))
        items.Insert(x) |> ignore
    for i in 1 .. 5 do create_item i
    let all = items.Count
    printfn "%A" all

let checkUniq listarr =
    (listarr |> Set.ofList |> Set.count) = listarr.Length

let check_items () =
    let cur = items.FindAll()
    let enumer = cur.GetEnumerator()
    let rec traverse l =
        if enumer.MoveNext () then traverse (enumer.Current :: l) else l
    let all = traverse []
    let allord = all |> List.map (fun d -> d.GetValue("ord").AsInt32)
    checkUniq allord

let get_item (n:int) =
    let query = new QueryDocument((new BsonDocument()).Add("name", BsonValue.Create(n)))
    let doc = items.FindOne(query)
    doc.GetValue("ord").AsInt32

let set_item (n:int) (o:int) =
    let query = new QueryDocument((new BsonDocument()).Add("name", BsonValue.Create(n)))
    let upd = new UpdateDocument((new BsonDocument()).Add("$set", BsonValue.Create((new BsonDocument()).Add("ord", BsonValue.Create(o)))))
    items.Update(query, upd) |> ignore

let reset_item (n:int) =
    let query = new QueryDocument((new BsonDocument()).Add("name", BsonValue.Create(n)))
    let upd = new UpdateDocument((new BsonDocument()).Add("$set", BsonValue.Create((new BsonDocument()).Add("ord", BsonValue.Create(n)).Add("locked", BsonValue.Create(0)))))
    items.Update(query, upd) |> ignore

let reset_items () =
    for i in 1 .. 5 do reset_item i
Simple enough. Five items, identified by integer names from one to five, holding integer places from one to five.
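For concreteness, here is the shape each document takes after create_items and a reset_items () call, sketched as Python dicts (minus the _id field MongoDB adds on its own); the field names are exactly those used in the F# code above:

```python
# Shape of the five documents in the "items" collection after reset_items ():
# "name" is the fixed identity, "ord" the current place, "locked" the lock flag.
items = [{"name": n, "ord": n, "locked": 0} for n in range(1, 6)]

# The invariant that check_items verifies: the "ord" values are all distinct,
# i.e. they form a permutation of the places.
ords = [d["ord"] for d in items]
assert len(set(ords)) == len(ords)
print(items[0])  # → {'name': 1, 'ord': 1, 'locked': 0}
```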
To make a long story short, I'll skip the part where a naive solution to the problem leads to potential deadlock, and adopt the standard practice of ordering the resources to eliminate it. But let me show you how I implement locking, including the busy-wait loop that fills in for the missing blocking wait.
let r (lockfun:int->bool) (n:int) =
    let rec raux i =
        if i = 200 then failwith "Possible livelock"
        let b = lockfun n
        if b then i
        else
            Async.RunSynchronously(Async.Sleep 10)
            raux (i+1)
    let i = raux 1
    lock mutex (fun () ->
        global_counter := !global_counter + i
        printfn "l %A -> %A" n i)

let simple_lock (n:int) =
    let query = new QueryDocument((new BsonDocument()).Add("name", BsonValue.Create(n)).Add("locked", BsonValue.Create(0)))
    let upd = new UpdateDocument((new BsonDocument()).Add("$set", BsonValue.Create((new BsonDocument()).Add("locked", BsonValue.Create(1)))))
    let wc = items.Update(query, upd)
    wc.DocumentsAffected = (int64)1

let simple_unlock (n:int) =
    let query = new QueryDocument((new BsonDocument()).Add("name", BsonValue.Create(n)))
    let upd = new UpdateDocument((new BsonDocument()).Add("$set", BsonValue.Create((new BsonDocument()).Add("locked", BsonValue.Create(0)))))
    let wc = items.Update(query, upd)
    wc.DocumentsAffected = (int64)1
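The whole trick in simple_lock is MongoDB's atomic conditional update: the query matches the document only if locked is 0, and the $set flips it to 1 in the same step, so at most one client can win. Here is a minimal Python sketch of that compare-and-set idea, with an in-memory dict standing in for the collection and a host-side lock standing in for the server's per-document atomicity (all names in this sketch are mine, not from the driver):

```python
import threading

# In-memory stand-in for the "items" collection: name -> document.
items = {n: {"name": n, "ord": n, "locked": 0} for n in range(1, 6)}
_store_guard = threading.Lock()  # models the server applying each update atomically

def simple_lock(n):
    """Atomically set locked=1, but only if it is currently 0."""
    with _store_guard:
        doc = items[n]
        if doc["locked"] == 0:     # the query predicate: {"name": n, "locked": 0}
            doc["locked"] = 1      # the update: {"$set": {"locked": 1}}
            return True            # DocumentsAffected = 1
        return False               # no match: someone else holds the lock

def simple_unlock(n):
    with _store_guard:
        items[n]["locked"] = 0     # unconditional: {"$set": {"locked": 0}}
    return True
```

A second simple_lock on the same item fails until the holder unlocks it, which is exactly the behavior the busy-wait loop in r polls for.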
Each swap operation is implemented by the following code.
let rec swap_simple_lock (ix:int) (iy:int) =
    if ix > iy then swap_simple_lock iy ix
    else
        r simple_lock ix
        let oix = get_item ix
        r simple_lock iy
        let oiy = get_item iy
        set_item ix oiy
        set_item iy oix
        simple_unlock iy |> ignore
        simple_unlock ix |> ignore
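The initial ix > iy check is what carries the deadlock-avoidance argument: every agent acquires its two locks in the same global order, so no cycle of waiters can form. A small Python sketch of the same discipline (my own names, using plain threads and in-process locks in place of MongoDB documents):

```python
import threading

locks = {n: threading.Lock() for n in range(1, 6)}
places = {n: n for n in range(1, 6)}

def swap_ordered(ix, iy):
    # Mirror swap_simple_lock: always take the lower-numbered lock first,
    # so every agent acquires resources in the same global order.
    if ix > iy:
        ix, iy = iy, ix
    with locks[ix]:
        with locks[iy]:
            places[ix], places[iy] = places[iy], places[ix]

# Two agents hammering the same pair from opposite directions: with
# unordered acquisition this is the classic deadlock recipe; ordered,
# both always run to completion.
def agent(a, b, rounds=1000):
    for _ in range(rounds):
        swap_ordered(a, b)

t1 = threading.Thread(target=agent, args=(1, 2))
t2 = threading.Thread(target=agent, args=(2, 1))
t1.start(); t2.start()
t1.join(); t2.join()
print(sorted(places.values()))  # → [1, 2, 3, 4, 5]
```

The permutation invariant survives, and the program terminates — the ordering turns a potential deadlock into, at worst, waiting.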
The code to run a full test is the following.
let worker (swapfun:int->int->unit) cnt =
    let random = new System.Random()
    async {
        for i = 1 to cnt do
            let ix = random.Next(4) + 1
            let iytemp = random.Next(4) + 1
            let iy = if ix <> iytemp then iytemp else (iytemp % 4) + 1
            swapfun ix iy
    }

let run_test swapfun cnt wcnt =
    let s = seq { for i in [1 .. wcnt] -> worker swapfun cnt }
    Async.RunSynchronously (Async.Parallel s)

let cnt = 10
let wcnt = 10
let expected_cnt = 2 * cnt * wcnt

run_test swap_simple_lock cnt wcnt |> ignore
let b = check_items ()
printfn "check = %A" b
printfn "Expected_counter = %A" expected_cnt
printfn "Global_counter = %A" !global_counter
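The only subtlety in worker is the line that forces the two random indices apart: both are drawn from 1..4, and on a collision iy is bumped to (iytemp % 4) + 1, which can never equal ix. A quick exhaustive Python check of that little invariant (the helper name is mine):

```python
def pick_iy(ix, iytemp):
    """Mirror of the F# line: iy = if ix <> iytemp then iytemp else (iytemp % 4) + 1"""
    return iytemp if ix != iytemp else (iytemp % 4) + 1

# Exhaustively verify over every value Random.Next(4) + 1 can produce:
# the chosen iy is always in range and never equal to ix, so swap_simple_lock
# is never asked to lock the same item twice.
for ix in range(1, 5):
    for iytemp in range(1, 5):
        iy = pick_iy(ix, iytemp)
        assert iy != ix and 1 <= iy <= 5
print("distinct-index invariant holds")
```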
Running the test almost proves that the problem is solved and, for practical purposes, one might even say that it is. However, all we really did was push the problem into a corner, albeit a corner similar to the one the "Full SQL" solution pushes it into. Starvation due to livelock is still possible, particularly since the ordering of resources makes some workers less privileged than others. To demonstrate that, I'll "rig" the test a little.
let rigged_worker ix iy (swapfun:int->int->unit) cnt =
    let random = new System.Random()
    async {
        for i = 1 to cnt do
            let flip = random.Next(5)
            // One time in five, contend for item 1 together with the usual ix.
            // Fresh names avoid shadowing ix, which would otherwise collapse
            // both indices to 1 and make the worker lock against itself.
            let rix = if flip = 0 then 1 else ix
            let riy = if flip = 0 then ix else iy
            swapfun rix riy
            lock mutex (fun () -> printfn "swapped %A, %A for %A times" rix riy cnt)
    }

let run_rigged_test swapfun cnt wcnt =
    let s = seq { for i in [1 .. wcnt] -> rigged_worker 4 5 swapfun cnt }
    Async.RunSynchronously (Async.Parallel s)

run_rigged_test swap_simple_lock cnt wcnt |> ignore
This set of probabilities makes a livelock appear in almost every run on my machine. So a livelock is still possible, but is it more likely than with similar concurrent transactions in SQL? I must say I do not have an authoritative answer to that but, after monitoring systems running many millions of similar concurrent transactions per day, I can attest to having witnessed one transaction blocking another only when the latter took more time than expected. Modern RDBMSs also include mechanisms to minimize starvation (here's an article I found to that effect).
So, is this the end of it? I hope not, and I have some ideas on how to implement a solution that goes a bit further in the way of avoiding starvation. Rest assured that, should they come to fruition, you'll be the first to know.