Infusing MongoDB with some ACID

In the previous post, I demonstrated the obvious, viz. that in the absence of transactions, concurrent clients can step on each other’s toes. And now we find ourselves at a crossroads: either we admit that there’s a new science of data management whereby all problems are solved once we have denormalized everything into large tree structures, or we accept that there’s still a need for transactional support and that we need some way to implement it. Lest I make it a habit to state the obvious, I won’t spend time on this dilemma. Let me just say that denormalizing, if anything, widens the need for transactions, since denormalized data must be constantly synchronized, notwithstanding the thousands of Google hits stating with almost religious fanaticism that everyone who seeks “Old SQL” properties in MongoDB just doesn’t get it.

Let’s revisit the item reordering exercise from the last post and reformulate it in a slightly more elementary form: a particularly simple way to rearrange items in a list, namely swapping the locations of two items. With this formulation, the problem is that we have multiple agents, each occasionally needing exclusive access to two shared resources in order to do some work, preferably without interfering too much with anyone else. Did I jog your memory? I bet I did. This is the Dining Philosophers problem of the venerable Edsger Dijkstra.

I will show you that MongoDB is, indeed, up to the task of handling this problem. I even hope that this will appease the MongoDB fans into flaming me less than usual for this kind of heretical talk. Let me set up the universe of discourse by showing you the code to create the data and to set, get, test and reset them (yes, data is plural, by the way).

open MongoDB.Bson
open MongoDB.Driver

let mutex = new System.Object()
let global_counter = ref 0
let connectionString = "mongodb://localhost/?safe=true"
let server = MongoServer.Create(connectionString)
let mongo = server.GetDatabase("local")
let items = mongo.GetCollection<BsonDocument>("items")

let create_items () =
    let create_item (n:int) =
        let x = (new BsonDocument()).Add("name",BsonValue.Create(n))
        items.Insert(x) |> ignore
    for i in 1 .. 5 do create_item i
    let all = items.Count()
    printfn "%A" all

let checkUniq listarr =
    (listarr |> Set.ofList |> Set.count) = listarr.Length

let check_items () =
    // Collect every item's "ord" value and verify that no place is held twice
    let allord =
        items.FindAll()
        |> Seq.map (fun d -> d.GetValue("ord").AsInt32)
        |> List.ofSeq
    checkUniq allord

let get_item (n:int) =
    let query = new QueryDocument((new BsonDocument()).Add("name",BsonValue.Create(n)))
    let doc = items.FindOne(query)
    doc.GetValue("ord").AsInt32

let set_item (n:int) (o:int) =
    let query = new QueryDocument((new BsonDocument()).Add("name",BsonValue.Create(n)))
    let upd = new UpdateDocument((new BsonDocument()).Add("$set",BsonValue.Create((new BsonDocument()).Add("ord", BsonValue.Create(o)))))
    items.Update(query, upd) |> ignore

let reset_item (n:int) =
    let query = new QueryDocument((new BsonDocument()).Add("name",BsonValue.Create(n)))
    let upd = new UpdateDocument((new BsonDocument()).Add("$set",BsonValue.Create((new BsonDocument()).Add("ord", BsonValue.Create(n)).Add("locked", BsonValue.Create(0)))))
    items.Update(query, upd) |> ignore

let reset_items () = for i in 1 .. 5 do reset_item i

Simple enough. Five items, identified by integer names from one to five, holding integer places from one to five.
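To make concrete what check_items is looking for, here is a small sketch that needs no database. The interleaving is a hypothetical one I picked for illustration: two unprotected swaps, of items 1 and 2 and of items 2 and 3, both read their places before either writes. The checkUniq function is repeated only so that the snippet runs on its own.

```fsharp
// Same check as checkUniq above, repeated so the snippet is self-contained.
let checkUniq (xs:int list) =
    (xs |> Set.ofList |> Set.count) = xs.Length

// Places of items 1..5 before any swap.
let before = [1; 2; 3; 4; 5]

// Hypothetical interleaving of two unprotected swaps, (1,2) and (2,3):
// A reads places 1 and 2, B reads places 2 and 3, then
// A writes item1 <- 2, item2 <- 1 and B writes item2 <- 3, item3 <- 2.
let afterRace = [2; 3; 2; 4; 5]

printfn "%b %b" (checkUniq before) (checkUniq afterRace)   // prints "true false"
```

Place 2 now appears twice and place 1 is gone, which is exactly the kind of damage the uniqueness test detects.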

To make a long story short, I’ll skip the part where a naive solution to the problem leads to potential deadlock and adopt the practice of ordering resources to eliminate that. But let me show you how I implement locking, including the busy-wait loop to fill in for the missing blocking wait.

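let r (lockfun:int->bool) (n:int) =
    // Busy-wait: retry lockfun every 10 ms, giving up once the attempt counter reaches 200
    let rec raux i =
        if i = 200 then failwith "Possible livelock"
        if lockfun n then i
        else
            Async.RunSynchronously(Async.Sleep 10)
            raux (i+1)
    let i = raux 1
    // i is the number of attempts it took; add it to the global tally under the mutex
    lock mutex (fun () -> global_counter := !global_counter + i ; printfn "l %A -> %A" n i)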

let simple_lock (n:int) =
    // The query matches only while locked = 0, so the update acts as a test-and-set
    let query = new QueryDocument((new BsonDocument()).Add("name",BsonValue.Create(n)).Add("locked",BsonValue.Create(0)))
    let upd = new UpdateDocument((new BsonDocument()).Add("$set",BsonValue.Create((new BsonDocument()).Add("locked", BsonValue.Create(1)))))
    let wc = items.Update(query, upd)
    wc.DocumentsAffected = (int64)1

let simple_unlock (n:int) =
    let query = new QueryDocument((new BsonDocument()).Add("name",BsonValue.Create(n)))
    let upd = new UpdateDocument((new BsonDocument()).Add("$set",BsonValue.Create((new BsonDocument()).Add("locked", BsonValue.Create(0)))))
    let wc = items.Update(query, upd)
    wc.DocumentsAffected = (int64)1
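The lock works because MongoDB applies an update to a single document atomically, and the query only matches while locked is 0. For readers who want to see that test-and-set behaviour without a server, here is a rough in-memory analogue. It is only a sketch: lockedFlags, tryLock and unlock are names invented for this illustration, and Interlocked.CompareExchange stands in for the conditional update.

```fsharp
open System.Threading

// Hypothetical stand-in for the "locked" field of items 1..5 (0 = free, 1 = held).
let lockedFlags : int[] = Array.zeroCreate 6   // index 0 is unused

// Analogue of simple_lock: flip 0 -> 1 atomically and report whether this caller won.
let tryLock (n:int) =
    Interlocked.CompareExchange(&lockedFlags.[n], 1, 0) = 0

// Analogue of simple_unlock: unconditionally mark the item as free.
let unlock (n:int) =
    Interlocked.Exchange(&lockedFlags.[n], 0) |> ignore

// Eight concurrent attempts on item 3; count how many of them succeed.
let winners =
    [ for _ in 1 .. 8 -> async { return tryLock 3 } ]
    |> Async.Parallel
    |> Async.RunSynchronously
    |> Array.filter id
    |> Array.length

printfn "winners = %d" winners   // prints "winners = 1"
```

Only one caller ever sees the 0 and turns it into a 1; everyone else gets false and has to retry, which is what the busy-wait loop in r is for.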

Each swap operation is implemented by the following code.

let rec swap_simple_lock (ix:int) (iy:int) =
    if ix > iy then
        swap_simple_lock iy ix
    else
        r simple_lock ix
        let oix = get_item ix
        r simple_lock iy
        let oiy = get_item iy
        set_item ix oiy
        set_item iy oix
        simple_unlock iy |> ignore
        simple_unlock ix |> ignore
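The resource-ordering trick can also be tried out without a database. The sketch below is an in-memory model, not the MongoDB code: places plays the role of "ord", flags plays the role of "locked", and acquire, release and swapOrdered are hypothetical names. Unlike r, acquire has no retry cap, and the guard against swapping an item with itself is my own addition, since such a call would try to take the same lock twice.

```fsharp
open System.Threading

// Hypothetical in-memory model of the five items (index 0 is unused).
let places = [| 0; 1; 2; 3; 4; 5 |]
let flags : int[] = Array.zeroCreate 6

// Spin until the 0 -> 1 transition succeeds (no retry cap in this sketch).
let rec acquire (n:int) =
    if Interlocked.CompareExchange(&flags.[n], 1, 0) <> 0 then
        Thread.Sleep(1)
        acquire n

let release (n:int) =
    Interlocked.Exchange(&flags.[n], 0) |> ignore

let swapOrdered (a:int) (b:int) =
    if a <> b then                           // a self-swap would lock the same item twice
        let lo, hi = min a b, max a b        // always lock the smaller name first
        acquire lo
        acquire hi
        let t = places.[lo]
        places.[lo] <- places.[hi]
        places.[hi] <- t
        release hi
        release lo

// Workers ask for the same pair in opposite orders; without the ordering
// step this is the classic recipe for deadlock.
let worker (w:int) =
    async {
        for _ in 1 .. 100 do
            if w % 2 = 0 then swapOrdered 1 2 else swapOrdered 2 1
    }

[ for w in 1 .. 4 -> worker w ]
|> Async.Parallel
|> Async.RunSynchronously
|> ignore

let stillPermutation = (Array.sort places.[1..]) = [| 1; 2; 3; 4; 5 |]
printfn "still a permutation: %b" stillPermutation   // prints "still a permutation: true"
```

Because every worker takes the lower-numbered lock first, no two workers can each hold one lock while waiting for the other's.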

The code to run a full test is the following.

let worker (swapfun:int->int->unit) cnt =
    let random = new System.Random()
    async {
        for i = 1 to cnt do
            // Pick two distinct item names in 1..4
            let ix = random.Next(4) + 1
            let iytemp = random.Next(4) + 1
            let iy = if ix <> iytemp then iytemp else (iytemp % 4) + 1
            swapfun ix iy
    }

let run_test swapfun cnt wcnt =
    let s = seq { for i in [1 .. wcnt] -> worker swapfun cnt }
    Async.RunSynchronously (Async.Parallel s)

let cnt = 10
let wcnt = 10
let expected_cnt = 2 * cnt * wcnt
run_test swap_simple_lock cnt wcnt |> ignore
let b = check_items ()
printfn "check = %A" b
printfn "Expected_counter = %A" expected_cnt
printfn "Global_counter = %A" !global_counter
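A word on how to read the two counters. Every swap takes two locks, and every call to r adds the number of attempts it needed (at least one) to global_counter. So 2 * cnt * wcnt is the total for a run with no contention at all, and anything above it counts failed lock attempts. The helper below is hypothetical and not part of the test harness, and the figure 237 is purely illustrative, not a measurement.

```fsharp
// Hypothetical helper: number of failed lock attempts in a completed run.
let wastedAttempts (cnt:int) (wcnt:int) (observed:int) =
    let minimum = 2 * cnt * wcnt      // two lock acquisitions per swap
    observed - minimum

// With cnt = 10 and wcnt = 10 the minimum is 200, so an observed
// total of 237 (illustrative only) would mean 37 retries.
printfn "%d" (wastedAttempts 10 10 237)   // prints "37"
```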

Running the test almost proves that the problem is solved and, for practical purposes, one might even say that it is. However, all we did was push the problem into a corner, albeit one similar to the corner it gets pushed into by the “Full SQL” solution. Starvation due to livelock is still possible, particularly since the ordering of resources makes some workers less privileged than others. To demonstrate that, I’ll “rig” the test a little.

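let rigged_worker ix iy (swapfun:int->int->unit) cnt =
    let random = new System.Random()
    async {
        for i = 1 to cnt do
            // One time in five, redirect the swap towards item 1
            let flip = random.Next(5)
            let ix = if flip = 0 then 1 else ix
            // Caution: ix was rebound on the line above, so when flip = 0
            // iy also becomes 1 and swapfun is asked to swap item 1 with itself
            let iy = if flip = 0 then ix else iy
            swapfun ix iy
        lock mutex (fun () -> printfn "swapped %A, %A for %A times" ix iy cnt)
    }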

let run_rigged_test swapfun cnt wcnt =
    let s = seq { for i in [1 .. wcnt] -> rigged_worker 4 5 swapfun cnt }
    Async.RunSynchronously (Async.Parallel s)

run_rigged_test swap_simple_lock cnt wcnt |> ignore

This set of probabilities makes a livelock appear in almost every run on my machine. So a livelock is still possible, but is it more likely than when running similar concurrent transactions in SQL? I must say I do not have an authoritative answer to that but, after monitoring systems running many millions of similar concurrent transactions per day, I can attest to having witnessed one transaction being blocked by another only when the latter took more time than expected. Modern RDBMSs include mechanisms to minimize starvation (here’s an article I found to that effect).

So, is this the end of it? I hope not, and I have some ideas on how to implement a solution that goes a bit further in the way of avoiding starvation. Rest assured that, should they come to fruition, you’ll be the first to know.
