
Apache Spark flatMap time complexity

I've been trying to find a way to count the number of times sets of Strings occur in a transaction database (implementing the Apriori algorithm in a distributed fashion). The code I have currently is as follows:

// Broadcast the candidate itemsets, then count their occurrences across all transactions
val cand_br = sc.broadcast(cand)
transactions.flatMap(trans => freq(trans, cand_br.value))
            .reduceByKey(_ + _)

import scala.collection.mutable.ArrayBuffer

// Emit a (candidate, 1) pair for every candidate itemset contained in this transaction
def freq(trans: Set[String], cand: Array[Set[String]]): Array[(Set[String], Int)] = {
    val res = ArrayBuffer[(Set[String], Int)]()
    for (c <- cand) {
        if (c.subsetOf(trans)) {
            res += ((c, 1))
        }
    }
    res.toArray
}

transactions starts out as an RDD[Set[String]], and I'm trying to convert it to an RDD[(K, V)], where K ranges over the elements of cand and V is the number of transactions in which that candidate occurs.
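
To make the expected output concrete, here's a toy run of the same pipeline (a minimal sketch assuming a live SparkContext named sc and the freq helper above):

val transactions = sc.parallelize(Seq(
    Set("a", "b", "c"),
    Set("a", "c"),
    Set("b", "c")
))
val cand = Array(Set("a"), Set("a", "c"), Set("b", "c"))
val cand_br = sc.broadcast(cand)
val counts = transactions
    .flatMap(trans => freq(trans, cand_br.value))
    .reduceByKey(_ + _)
    .collect()
// counts contains (Set(a),2), (Set(a, c),2), (Set(b, c),2), in some order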

Watching performance in the Spark UI, the flatMap stage takes about 3 minutes to finish, whereas everything else takes < 1 ms.

To give an idea of the data involved: transactions.count() ~= 88000 and cand.length ~= 24000. I've tried different ways of persisting the data, but I'm fairly sure the problem I'm facing is algorithmic.
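
Back-of-the-envelope, each transaction is tested against every candidate, so the flatMap stage performs on the order of two billion subsetOf calls (figures are the approximate counts above):

val numTransactions = 88000L // transactions.count(), approximate
val numCandidates   = 24000L // cand.length, approximate
println(numTransactions * numCandidates) // 2112000000 Set.subsetOf calls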

Is there a more efficient way to solve this subproblem?

PS: I'm fairly new to Scala and the Spark framework, so there may be some strange constructions in this code.