Category Archives: apache-spark-mllib

Matrix Multiplication – how to get the right type

I am trying to do a matrix multiplication using RDDs. When I run the commands in the CLI, I see that the output I get is of the right type, but when I compile the function, it says I am returning a type of Unit:

    val X = new RowMatrix(sc.parallelize(Seq(Vectors.dense(1.0, 2.0), Vectors.dense(3.0, 4.0), Vectors.dense(5.0, 6.0), Vectors.dense(7.0, 8.0), Vectors.dense(9.0, 10.0))))
    val Y = new RowMatrix(sc.parallelize(Seq(Vectors.dense(11.0, 12.0), Vectors.dense(13.0, 14.0), Vectors.dense(15.0, 16.0), Vectors.dense(17.0, 18.0), Vectors.dense(19.0, 20.0))))

    val Z = computeMatrix(X, Y)

    def computeMatrix(W: RowMatrix, V: RowMatrix): BDM[Double] = {
      val rows = W.rows.zip(V.rows).map(f =>
        DenseVector(f._1.toArray) * DenseVector(f._2.toArray).t).reduce((x, y) => x + y).mapValues(_).toDenseMatrix
    }

RESULTS:

scala>   def computeWTV(W: RowMatrix, V: RowMatrix): BDM[Double] = {
     |     val rows = W.rows.zip(V.rows).map(f =>
     |       DenseVector(f._1.toArray) * DenseVector(f._2.toArray).t).reduce((x,y)=>x+y).mapValues(_).toDenseMatrix
     |   }
<console>:51: error: missing parameter type for expanded function ((x$1) => W.rows.zip(V.rows).map(((f) => DenseVector(f._1.toArray).$times(DenseVector(f._2.toArray).t))).reduce(((x, y) => x.$plus(y))).mapValues(x$1).toDenseMatrix)
             DenseVector(f._1.toArray) * DenseVector(f._2.toArray).t).reduce((x,y)=>x+y).mapValues(_).toDenseMatrix
                                                                                                   ^
<console>:51: error: not found: value DenseVector
             DenseVector(f._1.toArray) * DenseVector(f._2.toArray).t).reduce((x,y)=>x+y).mapValues(_).toDenseMatrix
             ^
<console>:52: error: type mismatch;
 found   : Unit
 required: breeze.linalg.DenseMatrix[Double]

How can I get the result to be a BDM[Double]? I have been trying to find a way to get the result, but I feel like I am missing something.
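A sketch of one possible fix, assuming Breeze is available on the classpath: the `val rows = ...` assignment makes the method body evaluate to Unit (hence `found: Unit`), the trailing `.mapValues(_)` is an incomplete lambda (hence `missing parameter type`), and `DenseVector` needs a `breeze.linalg` import (hence `not found: value DenseVector`). Dropping the assignment and returning the reduced sum of outer products directly should already yield a `DenseMatrix[Double]`:

    import breeze.linalg.{DenseMatrix => BDM, DenseVector}
    import org.apache.spark.mllib.linalg.distributed.RowMatrix

    def computeMatrix(W: RowMatrix, V: RowMatrix): BDM[Double] = {
      W.rows.zip(V.rows)
        .map { case (w, v) =>
          // outer product of the paired rows: already a DenseMatrix[Double]
          DenseVector(w.toArray) * DenseVector(v.toArray).t
        }
        .reduce(_ + _) // element-wise sum of the outer products
    }

Note that the sum of row-wise outer products is the Gram-style product Wᵀ·V.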

Problems while extending Spark Apriori algorithms to get lift

I'm trying to extend the Apriori algorithm to print lift along with confidence. The program works when the data is small, but when I run it on a big enough dataset it takes a very long time, and the shuffle write keeps growing until the executors run out of memory. The DAG shows that the flatMap() operation on line 24 of CAssociationsRules.scala is hogging the memory. How can I improve the performance of the algorithm? The code: https://gist.github.com/masterlittle/a2b993cd4cc4aa201cdc6347e1fc7651

A brief synopsis of my algorithm: I create a map of items to their occurrence counts in CFPGrowth.scala. I then convert it into a broadcast variable and use it for lookups while the rules are being generated in CAssociationRules.scala. It is this step that is creating problems. I'm still a newbie to Spark, so any help would be appreciated (the pattern I mean is sketched below the cluster details).

(2-node cluster with 4 cores and 8 partitions)
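For reference, a minimal sketch of the broadcast-lookup pattern described above, using hypothetical names and a hypothetical rule shape (the real code lives in the gist): the item counts are collected to the driver, broadcast once, and read on the executors when lift is computed, so the lookup itself requires no join or extra shuffle.

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // Hypothetical rule shape for illustration: (antecedent, consequent, confidence).
    def addLift(sc: SparkContext,
                transactions: RDD[Array[String]],
                rules: RDD[(Array[String], String, Double)]): RDD[(Array[String], String, Double, Double)] = {
      val total = transactions.count().toDouble
      // Count each distinct item per transaction, collect the (small) map to the
      // driver, and broadcast it so every executor holds one read-only copy.
      val itemCounts = transactions.flatMap(_.distinct).map((_, 1L)).reduceByKey(_ + _).collectAsMap()
      val bcCounts = sc.broadcast(itemCounts)

      rules.map { case (antecedent, consequent, confidence) =>
        // lift(X => Y) = confidence(X => Y) / support(Y)
        val supportConsequent = bcCounts.value(consequent) / total
        (antecedent, consequent, confidence, confidence / supportConsequent)
      }
    }

If the item universe is large, the collected map may itself be the memory problem, since every executor holds a full copy of it.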

How to reduce shuffling and time taken by Spark while making a map of items?

I am using Spark to read a CSV file like this:

x, y, z
x, y
x
x, y, c, f
x, z

I want to make a map of items to their counts. This is the code I wrote:

private def genItemMap[Item: ClassTag](data: RDD[Array[Item]], partitioner: HashPartitioner): mutable.Map[Item, Long] = {
    val immutableFreqItemsMap = data.flatMap(t => t)
      .map(v => (v, 1L))
      .reduceByKey(partitioner, _ + _)
      .collectAsMap()

    val freqItemsMap = mutable.Map(immutableFreqItemsMap.toSeq: _*)
    freqItemsMap
  }

When I run it, it takes a lot of time and shuffle space. Is there a way to reduce the time taken?

I have a 2-node cluster with 2 cores each and 8 partitions. The CSV file has 170,000 lines.
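Not an authoritative answer, but when the number of distinct items is small enough to fit in memory (likely here, given 170,000 short lines), one option is to avoid the shuffle entirely: build a per-partition count map on each executor and merge the small maps, instead of shuffling (item, 1L) pairs through reduceByKey. A sketch under that assumption:

    import scala.collection.mutable
    import scala.reflect.ClassTag
    import org.apache.spark.rdd.RDD

    // Builds the item -> count map with no shuffle stage: each partition emits one
    // local map, and the maps are merged pairwise by reduce on the way to the driver.
    private def genItemMapNoShuffle[Item: ClassTag](data: RDD[Array[Item]]): mutable.Map[Item, Long] = {
      data.mapPartitions { rows =>
        val local = mutable.Map.empty[Item, Long]
        rows.foreach(_.foreach(item => local(item) = local.getOrElse(item, 0L) + 1L))
        Iterator(local)
      }.reduce { (a, b) =>
        b.foreach { case (k, v) => a(k) = a.getOrElse(k, 0L) + v }
        a
      }
    }

That said, reduceByKey already combines map-side, so its shuffle should only carry one record per distinct item per partition; if the job is still slow, the time may be going elsewhere (e.g. reading the file or the collectAsMap itself).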

Apache Spark – MLlib – Matrix multiplication

I'm trying to use MLlib for a matrix multiplication problem.

I am aware that Spark MLlib uses native libraries, which need to be present on the nodes (they do not come with the Spark installation).

So I already installed the libgfortran library on all nodes (following the same steps as in Apache Spark -- MlLib -- Collaborative filtering).

But I still encounter this error when running on the cluster:

Lost task 0.3 in stage 2.0 (TID 11, ibm-power-6.dima.tu-berlin.de): java.lang.UnsatisfiedLinkError: org.jblas.NativeBlas.dgemm(CCIIID[DII[DIID[DII)V
    at org.jblas.NativeBlas.dgemm(Native Method)
    at org.jblas.SimpleBlas.gemm(SimpleBlas.java:247)
    .....

How can I solve this error?
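Not a full answer, but worth noting: the trace shows jblas failing to load its native code (not the netlib-java/gfortran path), so a quick diagnostic is to check on each node whether jblas itself loads. A hedged sketch, runnable from spark-shell:

    // Force jblas to load its native BLAS; an UnsatisfiedLinkError here means the
    // native library (or a dependency such as libgfortran) is missing on this node.
    try {
      org.jblas.DoubleMatrix.eye(2).mmul(org.jblas.DoubleMatrix.eye(2))
      println("jblas native BLAS loaded OK")
    } catch {
      case e: UnsatisfiedLinkError => println("native BLAS missing: " + e)
    }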

Spark-Submit Error: Name or service not known

I am using an Amazon machine to run the pyspark code.

Code in the pyspark shell:

    a = open("test.txt")
    s = sc.parallelize(a)
    print(s.count())

This works; I use open() because I am not able to use sc.textFile("test.txt") directly, due to some issues.

Code in the Python file:

    from pyspark import SparkContext

    sc = SparkContext()
    with open("test.txt") as f:
        s = sc.parallelize(f)
        print(s.count())

When I try spark-submit test1.py, I get the error Name or service not known:

ubuntu@10-0-0-32:~/Deepak/projects$ spark-submit test1.py
16/06/12 03:44:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/12 03:44:59 ERROR : 10-0-0-32: 10-0-0-32: Name or service not known
java.net.UnknownHostException: 10-0-0-32: 10-0-0-32: Name or service not known
    at java.net.InetAddress.getLocalHost(InetAddress.java:1496)
    at tachyon.util.network.NetworkAddressUtils.getLocalIpAddress(NetworkAddressUtils.java:355)
    at tachyon.util.network.NetworkAddressUtils.getLocalHostName(NetworkAddressUtils.java:320)
    at tachyon.conf.TachyonConf.<init>(TachyonConf.java:122)
    at tachyon.conf.TachyonConf.<init>(TachyonConf.java:111)
    at tachyon.Version.<clinit>(Version.java:27)
    at tachyon.Constants.<clinit>(Constants.java:328)
    at tachyon.hadoop.AbstractTFS.<clinit>(AbstractTFS.java:63)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at java.lang.Class.newInstance(Class.java:383)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:373)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:445)
    at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2364)
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2375)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2392)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.spark.SparkContext.addFile(SparkContext.scala:1362)
    at org.apache.spark.SparkContext.addFile(SparkContext.scala:1340)
    at org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:491)
    at org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:491)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:491)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:214)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.UnknownHostException: 10-0-0-32: Name or service not known
    at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
    at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:922)
    at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1316)
    at java.net.InetAddress.getLocalHost(InetAddress.java:1492)
    ... 40 more
16/06/12 03:44:59 ERROR SparkContext: Error initializing SparkContext.
java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem: Provider tachyon.hadoop.TFS could not be instantiated
    at java.util.ServiceLoader.fail(ServiceLoader.java:224)
    at java.util.ServiceLoader.access$100(ServiceLoader.java:181)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:377)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:445)
    at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2364)
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2375)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2392)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.spark.SparkContext.addFile(SparkContext.scala:1362)
    at org.apache.spark.SparkContext.addFile(SparkContext.scala:1340)
    at org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:491)
    at org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:491)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:491)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:214)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ExceptionInInitializerError
    at tachyon.Constants.<clinit>(Constants.java:328)
    at tachyon.hadoop.AbstractTFS.<clinit>(AbstractTFS.java:63)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at java.lang.Class.newInstance(Class.java:383)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:373)
    ... 27 more
Caused by: java.lang.RuntimeException: java.net.UnknownHostException: 10-0-0-32: 10-0-0-32: Name or service not known
    at org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
    at tachyon.util.network.NetworkAddressUtils.getLocalIpAddress(NetworkAddressUtils.java:398)
    at tachyon.util.network.NetworkAddressUtils.getLocalHostName(NetworkAddressUtils.java:320)
    at tachyon.conf.TachyonConf.<init>(TachyonConf.java:122)
    at tachyon.conf.TachyonConf.<init>(TachyonConf.java:111)
    at tachyon.Version.<clinit>(Version.java:27)
    ... 35 more
Caused by: java.net.UnknownHostException: 10-0-0-32: 10-0-0-32: Name or service not known
    at java.net.InetAddress.getLocalHost(InetAddress.java:1496)
    at tachyon.util.network.NetworkAddressUtils.getLocalIpAddress(NetworkAddressUtils.java:355)
    ... 39 more
Caused by: java.net.UnknownHostException: 10-0-0-32: Name or service not known
    at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
    at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:922)
    at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1316)
    at java.net.InetAddress.getLocalHost(InetAddress.java:1492)
    ... 40 more
16/06/12 03:44:59 WARN MetricsSystem: Stopping a MetricsSystem that is not running
Traceback (most recent call last):
File "/home/ubuntu/Deepak/projects/test1.py", line 2, in <module>
sc = SparkContext("local", "test1", pyFiles=['test1.py'])
File "/home/ubuntu/spark-1.6.0-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/context.py", line 115, in __init__
File "/home/ubuntu/spark-1.6.0-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/context.py", line 172, in _do_init
File "/home/ubuntu/spark-1.6.0-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/context.py", line 235, in _initialize_context
File "/home/ubuntu/spark-1.6.0-bin-hadoop2.4/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 1064, in __call__
File "/home/ubuntu/spark-1.6.0-bin-hadoop2.4/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem: Provider tachyon.hadoop.TFS could not be instantiated
    at java.util.ServiceLoader.fail(ServiceLoader.java:224)
    at java.util.ServiceLoader.access$100(ServiceLoader.java:181)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:377)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:445)
    at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2364)
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2375)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2392)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.spark.SparkContext.addFile(SparkContext.scala:1362)
    at org.apache.spark.SparkContext.addFile(SparkContext.scala:1340)
    at org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:491)
    at org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:491)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:491)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:214)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ExceptionInInitializerError
    at tachyon.Constants.<clinit>(Constants.java:328)
    at tachyon.hadoop.AbstractTFS.<clinit>(AbstractTFS.java:63)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at java.lang.Class.newInstance(Class.java:383)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:373)
    ... 27 more
Caused by: java.lang.RuntimeException: java.net.UnknownHostException: 10-0-0-32: 10-0-0-32: Name or service not known
    at org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
    at tachyon.util.network.NetworkAddressUtils.getLocalIpAddress(NetworkAddressUtils.java:398)
    at tachyon.util.network.NetworkAddressUtils.getLocalHostName(NetworkAddressUtils.java:320)
    at tachyon.conf.TachyonConf.<init>(TachyonConf.java:122)
    at tachyon.conf.TachyonConf.<init>(TachyonConf.java:111)
    at tachyon.Version.<clinit>(Version.java:27)
    ... 35 more
Caused by: java.net.UnknownHostException: 10-0-0-32: 10-0-0-32: Name or service not known
    at java.net.InetAddress.getLocalHost(InetAddress.java:1496)
    at tachyon.util.network.NetworkAddressUtils.getLocalIpAddress(NetworkAddressUtils.java:355)
    ... 39 more
Caused by: java.net.UnknownHostException: 10-0-0-32: Name or service not known
    at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
    at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:922)
    at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1316)
    at java.net.InetAddress.getLocalHost(InetAddress.java:1492)
    ... 40 more

Apache Spark Worker Nodes unable to find my application classes

I am using Apache Spark 1.6 in standalone cluster mode, with a master and a few workers on a single machine running Windows 7.

I created the Spark context within a Java application and wrote a few classes (e.g. MyFunction, which extends org.apache.spark.api.java.function.Function) in order to perform transformations on data. I call it like:

    javaRDD.map(new MyFunction());

Unfortunately, the Spark workers do not find the MyFunction class, and the job terminates with a ClassNotFoundException...

I did some research and found the method SparkConf.setJars(jars). So I built my application into a jar (myapp.jar) and placed it in a directory (e.g. D:/spark):

    String[] jars = { "file:D:/spark/myapp.jar" };
    sparkConf.setJars(jars);

But it leads to this exception:

2016-03-30 15:27:07 WARN  TaskSetManager:70 
- Lost task 0.0 in stage 0.0 (TID 0, alivia): 
java.lang.ClassNotFoundException: com.ai.cache.spark.NumericFieldsToVector
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:274)

Questions:

  1. How do I make my application's own class files available to the Spark workers from the Spark driver program?

  2. If I need to provide my application's jar to the Spark driver program, how can I do that? (The sparkConf.setJars() method failed for me as described above; see the sketch after this list.)
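For what it's worth, a minimal sketch of how setJars is commonly wired up, written in Scala rather than the poster's Java, and with a hypothetical app name and master URL. The two things to double-check against the snippet above are the .jar extension in the path and that the jars are set before the context is created:

    import org.apache.spark.SparkConf
    import org.apache.spark.api.java.JavaSparkContext

    // Hypothetical app name, master URL, and jar path, for illustration only.
    val conf = new SparkConf()
      .setAppName("myapp")
      .setMaster("spark://localhost:7077")
      // Ship the application jar to the workers; setJars must be called
      // before the SparkContext/JavaSparkContext is constructed.
      .setJars(Array("D:/spark/myapp.jar"))

    val jsc = new JavaSparkContext(conf)

Alternatively, submitting the application with spark-submit and passing the jar as the primary resource distributes the application classes without calling setJars at all.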