As simple as possible, but not simpler

The simplicity alluded to in the title is not some aesthetic measure of code brevity inspired by Martin Odersky’s keynote at ScalaDays. It refers to simplifying arithmetic conditions, with the specific goal of proving whether the intersection of two sets, defined intensionally, is empty or not. In general this problem is very hard, but the case I’m interested in is where the atomic conditions are linear inequalities involving single variables, which you have to imagine as referring to properties of a type, instances of which you define sets of. I can hear your enthusiasm winding down, because I’ve now simplified the problem a millionfold. One would think that this would be a toy problem with many existing solutions, and yet I couldn’t find any that didn’t involve finding concrete solutions, when all I needed was a simplified expression of the condition for the set intersection (for example, I looked at JSR 331). So I decided to tackle it myself; why not have a little fun, huh?

Digging through the pits of my school-days memory, my first hunch was that some kind of normalization would be in order, and I settled on disjunctive normal form. Please don’t write me that all SAT and SMT solvers use conjunctive normal form and that I probably dug through the armpits of my school-days memory. Hear me out, first.

Within each conjunction in DNF, all the atomic conditions on the same variable can be merged together, which results in either false or another atomic condition. Consequently, each conjunction in DNF can be brought to what can be described as a simplified form, where each variable is mentioned only once. And disjunctions will be fairly uncommon (but not absent) in the use case I was envisioning, which is some query builder interface.

I’m going to walk you through my code and thought process. You’ll have to imagine the former being enclosed in a Scala object and the latter enclosed inside that glass display case for brains that you see in movies. You’ll also have to imagine what role the various types and case classes/enums play; their names should be evocative enough.
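
If imagining the types is too much to ask, here is a minimal sketch of what they might look like. None of it comes from the actual code: the field names, the Limit/Num wrapper for values and the exact behaviour of left() and right() are assumptions, chosen only to be consistent with how the snippets below use them.

```scala
object CriteriaModel {
  // Logical connectives for composite criteria (assumed shape).
  sealed trait LogicalOperator
  case object AND extends LogicalOperator
  case object OR extends LogicalOperator

  // Comparison operators. The BTW_* ones are intervals:
  // BTW_E = [a, b], BTW_LE = [a, b), BTW_GE = (a, b], BTW_NE = (a, b).
  sealed trait CriteriaOperator {
    // The comparison an interval operator applies at its lower limit.
    def left(): CriteriaOperator = this match {
      case BTW_E | BTW_LE => GE
      case BTW_GE | BTW_NE => GT
      case other => throw new Error(s"$other is not an interval operator")
    }
    // The comparison an interval operator applies at its upper limit.
    def right(): CriteriaOperator = this match {
      case BTW_E | BTW_GE => LE
      case BTW_LE | BTW_NE => LT
      case other => throw new Error(s"$other is not an interval operator")
    }
  }
  case object EQ extends CriteriaOperator
  case object NE extends CriteriaOperator
  case object LT extends CriteriaOperator
  case object LE extends CriteriaOperator
  case object GT extends CriteriaOperator
  case object GE extends CriteriaOperator
  case object BTW_E extends CriteriaOperator
  case object BTW_LE extends CriteriaOperator
  case object BTW_GE extends CriteriaOperator
  case object BTW_NE extends CriteriaOperator

  // Values, extended with a global minimum and maximum (hypothetical wrapper).
  sealed trait Limit
  case object MIN extends Limit
  case object MAX extends Limit
  final case class Num(n: Int) extends Limit

  // The property of the type that a condition talks about.
  final case class ParameterEvaluable(name: String)

  sealed trait ICriterion
  final case class Composite(op: LogicalOperator, criteria: List[ICriterion]) extends ICriterion
  sealed trait SimpleCriterion extends ICriterion
  final case class ParamOpVal(param: ParameterEvaluable, op: CriteriaOperator, values: List[Limit]) extends SimpleCriterion
  final class FalseCriterion extends SimpleCriterion

  // Example: age < 30 AND age >= 18
  val age = ParameterEvaluable("age")
  val ageUnder30 = ParamOpVal(age, LT, List(Num(30)))
  val adultUnder30 = Composite(AND, List(ageUnder30, ParamOpVal(age, GE, List(Num(18)))))
}
```

With something like this in scope, the functions that follow read as plain pattern matches over Composite and ParamOpVal.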

def toFlatDNF(c: ICriterion) : ICriterion = simplify(toDNF(c))

In a top-down fashion, I’m first going to transform the expression to DNF, then simplify it.

def flatten(c: ICriterion) : List[ICriterion] = {
    c match {
        case Composite(_, li2) => li2
        case _ => List(c)
    }
}

def cartesianProduct[T](xss: List[List[T]]): List[List[T]] = xss match {
   case Nil => List(Nil)
   case h :: t => for(xh <- h; xt <- cartesianProduct(t)) yield xh :: xt
}

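def distributeOrOver(ors: List[ICriterion], ands: List[ICriterion]): List[ICriterion] = {
    // the disjuncts of each OR: atoms or ANDs, since the ORs are already in DNF
    val disjunctsInOrs = ors.map(flatten)
    // every way of picking one disjunct out of each OR
    val product = cartesianProduct(disjunctsInOrs)
    // flatten the picked disjuncts too, so that no AND ends up nested inside another AND
    val allEnds = product.map(_.flatMap(flatten) ++ ands)
    allEnds.map({
        case List(c) => c
        case li => Composite(AND,li)
    })
}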

def toDNF(c: ICriterion) : ICriterion = {
    c match {
        case Composite(op,li) => {
            val li2 = li.map(toDNF).toList
            // each might be an OR of up to depth 2 or an AND of depth up to 1
            val (ands, ors) = li2.partition {
                case Composite(AND, _) => true
                case Composite(OR, _) => false
                case _ => true
            }
            op match {
                case OR => {
                    val flators = ors.flatMap(flatten)
                    Composite(OR, flators ++ ands)
                }
                case AND => {
                    val flatands = ands.flatMap(flatten)
                    if (ors.length == 0)
                         Composite(AND, flatands)
                    else {
                        val over = distributeOrOver(ors, flatands)
                        Composite(OR, over)
                    }
                }
            }

        }
        case _ => c
    }
}

What this (supposedly) does is distribute AND over OR (every conjunction that contains disjunctions gets multiplied out) in order to arrive at DNF.
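
To see the multiplying-out in isolation, here is a small sketch that reuses cartesianProduct on plain strings instead of criteria. The atom names and the list-of-lists encoding are made up for the illustration; only cartesianProduct is taken from the code above.

```scala
object DistributeSketch {
  def cartesianProduct[T](xss: List[List[T]]): List[List[T]] = xss match {
    case Nil => List(Nil)
    case h :: t => for (xh <- h; xt <- cartesianProduct(t)) yield xh :: xt
  }

  // (a OR b) AND (c OR d) AND e, with each disjunction written as a list of atom names
  val ors: List[List[String]] = List(List("a", "b"), List("c", "d"))
  val ands: List[String] = List("e")

  // One conjunction per way of picking a single disjunct from every OR,
  // with the plain conjuncts appended to each pick.
  val dnf: List[List[String]] = cartesianProduct(ors).map(_ ++ ands)
  // dnf == List(List(a, c, e), List(a, d, e), List(b, c, e), List(b, d, e))
}
```

Each inner list of dnf is one conjunction of the resulting disjunction, which is the shape distributeOrOver hands back wrapped in Composite(AND, ...).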

Now, the simplification that follows is based on extending the domain of all values with a global MIN and a global MAX value, and on rephrasing all equalities and inequalities as interval conditions, either limit of which can be open or closed (the methods left() and right() return the comparison that applies at the corresponding limit). A special comparison function, cmp, arranges for MIN/MAX-aware comparisons.

The expression is normalized in that way, then merged, then denormalized again to simple equalities and inequalities.
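
Since cmp and the interval representation are not shown, here is a rough sketch of how they could work. The Limit type, the Interval case class with its boolean open flags and the helper names are all assumptions made for the illustration; the real code keeps the open/closed information in the BTW_* operators instead.

```scala
object IntervalSketch {
  // Values extended with a global minimum and maximum (hypothetical encoding).
  sealed trait Limit
  case object MIN extends Limit
  case object MAX extends Limit
  final case class Num(n: Int) extends Limit

  // MIN sorts below everything, MAX above everything.
  def cmp(a: Limit, b: Limit): Int = (a, b) match {
    case (MIN, MIN) | (MAX, MAX) => 0
    case (MIN, _) | (_, MAX) => -1
    case (_, MIN) | (MAX, _) => 1
    case (Num(x), Num(y)) => Integer.compare(x, y)
  }

  // An interval whose limits can independently be open or closed.
  final case class Interval(lo: Limit, loOpen: Boolean, hi: Limit, hiOpen: Boolean)

  // x == v  becomes [v, v]
  def equalTo(v: Int): Interval = Interval(lo = Num(v), loOpen = false, hi = Num(v), hiOpen = false)
  // x < v   becomes (MIN, v)
  def lessThan(v: Int): Interval = Interval(lo = MIN, loOpen = true, hi = Num(v), hiOpen = true)
  // x <= v  becomes (MIN, v]
  def atMost(v: Int): Interval = Interval(lo = MIN, loOpen = true, hi = Num(v), hiOpen = false)
  // x > v   becomes (v, MAX)
  def greaterThan(v: Int): Interval = Interval(lo = Num(v), loOpen = true, hi = MAX, hiOpen = true)
  // x >= v  becomes [v, MAX)
  def atLeast(v: Int): Interval = Interval(lo = Num(v), loOpen = false, hi = MAX, hiOpen = true)
}
```

The point of the detour is that once everything is an interval, merging two conditions on the same variable is a single operation instead of a case analysis over every pair of comparison operators.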

def groupByParameterEvaluable(liFinal: List[ICriterion]): Map[ParameterEvaluable,List[ICriterion]] = {
    liFinal.groupBy({case ParamOpVal(p,_,_) => p})
}

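def normalize(c: ICriterion) : List[ICriterion] = c match {
    case ParamOpVal(p, op, values) =>
        op match {
            case EQ => List(ParamOpVal(p, BTW_E, List(values(0), values(0))))  // [v, v]
            case LT => List(ParamOpVal(p, BTW_NE, List(MIN, values(0))))       // (MIN, v)
            case GT => List(ParamOpVal(p, BTW_NE, List(values(0), MAX)))       // (v, MAX)
            case LE => List(ParamOpVal(p, BTW_GE, List(MIN, values(0))))       // (MIN, v]
            case GE => List(ParamOpVal(p, BTW_LE, List(values(0), MAX)))       // [v, MAX)
            // Caution: x != v is a disjunction (x < v OR x > v), but simplifyCriteria
            // merges everything returned here as a conjunction, so these two intervals
            // combine to false. NE probably needs to be expanded into an OR before
            // toDNF for this case to behave.
            case NE => List(
                ParamOpVal(p, BTW_NE, List(MIN, values(0))),
                ParamOpVal(p, BTW_NE, List(values(0), MAX))
            )
            case _ => List(c)
        }
}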

def denormalize(c: ICriterion) : List[ICriterion] = c match {
    case ParamOpVal(p, BTW_E, List(x, y)) if x == y => List(ParamOpVal(p, EQ, List(x)))
    case ParamOpVal(p, BTW_NE, List(MIN, x)) => List(ParamOpVal(p, LT, List(x)))
    case ParamOpVal(p, BTW_GE, List(MIN, x)) => List(ParamOpVal(p, LE, List(x)))
    case ParamOpVal(p, BTW_NE, List(x, MAX)) => List(ParamOpVal(p, GT, List(x)))
    case ParamOpVal(p, BTW_LE, List(x, MAX)) => List(ParamOpVal(p, GE, List(x)))
    case _ => List(c)
}

def merge(li: List[ICriterion]) : ICriterion = li.reduceLeft(combine)

def simplifyCriteria(criteria: List[ICriterion]): List[ICriterion] = {
    val normalized = criteria.flatMap(normalize)
    val merged = merge(normalized)
    denormalize(merged)
}

def simplify(cDnf: ICriterion): ICriterion = {
    cDnf match {
        case Composite(OR, liConj) => Composite(OR, liConj.map(simplify))
        case Composite(AND, liFinal) => {
            val grouped = groupByParameterEvaluable(liFinal)
            val groupedMerged = grouped.iterator.map({
                case (_, v) => simplifyCriteria(v)
            })
            val merged = groupedMerged.flatten.toList
            Composite(AND, merged)
        }
        case _ => cDnf
    }
}

The most complicated operation in all that is how to combine two intervals (one step of the merging process). Notice that, for two intervals to stand a chance of combining into something non-empty, their closed-closed analogs should intersect, and the easiest way to express that is as the negation of the condition that they do not intersect: either one lies entirely before the other, or vice versa.
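
Stripped of MIN/MAX and of the open/closed bookkeeping, that test looks like the following sketch. The Closed case class and the function names are made up for the illustration.

```scala
object OverlapSketch {
  // A closed-closed interval [lo, hi] over plain integers.
  final case class Closed(lo: Int, hi: Int)

  // Two intervals miss each other when one ends before the other begins.
  def disjoint(a: Closed, b: Closed): Boolean = a.hi < b.lo || b.hi < a.lo

  // They stand a chance of combining exactly when they are not disjoint.
  def mayCombine(a: Closed, b: Closed): Boolean = !disjoint(a, b)

  val touching = mayCombine(Closed(1, 5), Closed(5, 9)) // share the single point 5
  val apart = mayCombine(Closed(1, 4), Closed(5, 9))    // 4 < 5, nothing in common
}
```

Passing this test is necessary but not sufficient: in the touching case the real intervals still combine to false if either of them is open at the shared limit, which is why combine has to look at left() and right() as well.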

def fuseBetweenOperator(op1: CriteriaOperator, op2: CriteriaOperator) : CriteriaOperator =
    (op1, op2) match {
        case (GE, LE) => BTW_E
        case (GT, LE) => BTW_GE
        case (GE, LT) => BTW_LE
        case (GT, LT) => BTW_NE
        case _ => throw new Error(s"fuseBetweenOperator cannot be used on $op1 and $op2")
    }

//This combines two comparisons with conjunction
def combine(ic1: ICriterion, ic2: ICriterion) : ICriterion = {
    val FALSE = new FalseCriterion
    (ic1, ic2) match {
        // the closed-closed analogs intersect: neither interval lies entirely before the other
        case (ParamOpVal(p,op1,List(v1,v2)),ParamOpVal(_,op2,List(w1,w2)))
                if cmp(w1,v2)<=0 && cmp(v1,w2)<=0 =>
            // lower bound: the greater limit wins; on a tie, an open limit (GT) is the stricter one
            val lowerCmp = cmp(v1,w1)
            val (lowerBound, leftop) =
                if(lowerCmp>0) (v1,op1.left())
                else if(lowerCmp<0) (w1,op2.left())
                else (v1, if(op1.left()==GT || op2.left()==GT) GT else GE)
            // upper bound: the lesser limit wins; on a tie, an open limit (LT) is the stricter one
            val upperCmp = cmp(v2,w2)
            val (upperBound, rightop) =
                if(upperCmp<0) (v2,op1.right())
                else if(upperCmp>0) (w2,op2.right())
                else (v2, if(op1.right()==LT || op2.right()==LT) LT else LE)
            val operator = fuseBetweenOperator(leftop, rightop)
            // a single point survives only if it is closed on both sides
            if(cmp(lowerBound,upperBound)==0 && operator!=BTW_E)
                FALSE
            else
                ParamOpVal(p, operator, List(lowerBound, upperBound))
        // disjoint intervals, or a FalseCriterion produced by an earlier step
        case _ => FALSE
    }
}

Having all that implemented, testing for intersection is as simple as forming the conjunction of the conditions that define the sets involved, performing toFlatDNF, and seeing if the result is the false expression. If it isn’t, you have it as an expression to inspect, and maybe think about how to change the set definitions so that they do not intersect.
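
To make the end-to-end idea concrete without dragging in all the types, here is a self-contained sketch of the emptiness test for the simplest situation: each set is defined by a single conjunction, already reduced to one interval per variable. Everything in it is an assumption made for the illustration (Double infinities stand in for MIN and MAX, boolean flags for the open/closed limits, and each set is taken to be non-empty on its own); it is not the code above, just the same reasoning in miniature.

```scala
object EmptinessSketch {
  // Interval over Double; the infinities play the role of the global MIN and MAX.
  final case class Interval(lo: Double, loOpen: Boolean, hi: Double, hiOpen: Boolean) {
    // Empty when the limits cross, or when a single point is excluded by an open limit.
    def isEmpty: Boolean = lo > hi || (lo == hi && (loOpen || hiOpen))

    // Conjunction of two intervals: greatest lower limit, least upper limit,
    // and on a tie the open limit wins because it is the stricter one.
    def intersect(o: Interval): Interval = {
      val (l, lOpen) =
        if (lo > o.lo) (lo, loOpen)
        else if (lo < o.lo) (o.lo, o.loOpen)
        else (lo, loOpen || o.loOpen)
      val (h, hOpen) =
        if (hi < o.hi) (hi, hiOpen)
        else if (hi > o.hi) (o.hi, o.hiOpen)
        else (hi, hiOpen || o.hiOpen)
      Interval(l, lOpen, h, hOpen)
    }
  }

  // A set defined by one conjunction: variable name -> allowed interval.
  type Conjunction = Map[String, Interval]

  // The intersection is empty as soon as one shared variable has no value left.
  def intersectionIsEmpty(a: Conjunction, b: Conjunction): Boolean =
    (a.keySet intersect b.keySet).exists(k => a(k).intersect(b(k)).isEmpty)

  val under30: Conjunction =
    Map("age" -> Interval(lo = Double.NegativeInfinity, loOpen = true, hi = 30, hiOpen = true))
  val atLeast30: Conjunction =
    Map("age" -> Interval(lo = 30, loOpen = false, hi = Double.PositiveInfinity, hiOpen = true))
  val atLeast18: Conjunction =
    Map("age" -> Interval(lo = 18, loOpen = false, hi = Double.PositiveInfinity, hiOpen = true))
}
```

Here intersectionIsEmpty(under30, atLeast30) holds because the two intervals only touch at 30 and one of them is open there, while under30 and atLeast18 overlap on [18, 30). With disjunctions in play, the same check would have to fail for every conjunction of the DNF before the whole intersection could be declared empty.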