Category Archives: apache-spark-sql

TypeError: withColumn() takes exactly 3 arguments (2 given)

I'm trying to create 2 columns in Databricks: one holding the result of subtracting the values of 2 existing columns, and one holding the result of adding them.

This is the code I've entered.

dfPrep = dfCleanYear.withColumn(df.withColumn("NuevaCol", df["AverageTemperature"] - df["AverageTemperatureUncertainty"])).withColumn(df.withColumn("NuevaCol", df["AverageTemperature"] + df["AverageTemperatureUncertainty"]))

And this the error.

TypeError: withColumn() takes exactly 3 arguments (2 given)

Would you know which argument is missing?


Apache Spark case with multiple when clauses on different columns

Given the below structure:

val df = Seq("Color", "Shape", "Range","Size").map(Tuple1.apply).toDF("color")

val df1 = df.withColumn("Success", when($"color"<=> "white", "Diamond").otherwise(0))

I want to add one more WHEN condition to the above: when size > 10 and the Shape column value is Rhombus, "Diamond" should be inserted into the column, otherwise 0. I tried the following, but it fails:

val df1 = df.withColumn("Success", when($"color" <=> "white", "Diamond").otherwise(0)).when($"size">10)

Please suggest a DataFrame-only solution in Scala; Spark SQL with sqlContext is not a helpful option for me.

Thanks!

spark.read.csv Error: Permission Denied

I am using Spark v2.0 and trying to read a csv file using:"filepath")

But getting the below error:

java.lang.RuntimeException: java.lang.RuntimeException: Permission denied
  at org.apache.hadoop.hive.ql.session.SessionState.start(
  at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:171)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
  at java.lang.reflect.Constructor.newInstance(
  at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:258)
  at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:359)
  at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:263)
  at org.apache.spark.sql.hive.HiveSharedState.metadataHive$lzycompute(HiveSharedState.scala:39)
  at org.apache.spark.sql.hive.HiveSharedState.metadataHive(HiveSharedState.scala:38)
  at org.apache.spark.sql.hive.HiveSharedState.externalCatalog$lzycompute(HiveSharedState.scala:46)
  at org.apache.spark.sql.hive.HiveSharedState.externalCatalog(HiveSharedState.scala:45)
  at org.apache.spark.sql.hive.HiveSessionState.catalog$lzycompute(HiveSessionState.scala:50)
  at org.apache.spark.sql.hive.HiveSessionState.catalog(HiveSessionState.scala:48)
  at org.apache.spark.sql.hive.HiveSessionState$$anon$1.<init>(HiveSessionState.scala:63)
  at org.apache.spark.sql.hive.HiveSessionState.analyzer$lzycompute(HiveSessionState.scala:63)
  at org.apache.spark.sql.hive.HiveSessionState.analyzer(HiveSessionState.scala:62)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
  at org.apache.spark.sql.SparkSession.baseRelationToDataFrame(SparkSession.scala:382)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:143)
  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:401)
  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:342)
  ... 48 elided
Caused by: java.lang.RuntimeException: Permission denied
  at org.apache.hadoop.hive.ql.session.SessionState.start(
  ... 71 more
Caused by: Permission denied
  at Method)
  at org.apache.hadoop.hive.ql.session.SessionState.createTempFile(
  at org.apache.hadoop.hive.ql.session.SessionState.start(
  ... 71 more

I have also tried using"csv").csv("filepath"), but that gives the same result.

Need to push the DataFrame 1 data to an empty DataFrame 2 in Apache Spark using Java

Here I need to push particular data from dataframe1 to the empty dataframe2, but I am facing trouble; can anyone please help me out? Below is the code.

 public class PrintValue {
     public static void main(String[] args) {
         System.setProperty("hadoop.home.dir", "C:\\winutils");
         JavaSparkContext sc = new JavaSparkContext(
                 new SparkConf().setAppName("JoinFunctions").setMaster("local[*]"));
         SQLContext sqlContext = new SQLContext(sc);
         SparkSession spark = SparkSession.builder().appName("JavaTokenizerExample").getOrCreate();

         JaroWinkler jw = new JaroWinkler();

         List<Row> data = Arrays.asList(
                 RowFactory.create(1, "Hi I heard about Spark"),
                 RowFactory.create(2, "I wish Java could use case classes"),
                 // third row inferred from the expected output below
                 RowFactory.create(3, "Logistic,regression,models,are,neat"));

         StructType schema = new StructType(new StructField[] {
                 new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
                 new StructField("sentence", DataTypes.StringType, false, Metadata.empty()) });
         Dataset<Row> DataFrame1 = spark.createDataFrame(data, schema);

         List<Row> data1 = Arrays.asList();
         StructType schema2 = new StructType(new StructField[] {
                 new StructField("label2", DataTypes.IntegerType, false, Metadata.empty()),
                 new StructField("sentence2", DataTypes.StringType, false, Metadata.empty()) });
         Dataset<Row> DataFrame2 = spark.createDataFrame(sc.emptyRDD(), schema2);
     }
 }

Expected output:

DataFrame1
+-----+--------------------+
|label|            sentence|
+-----+--------------------+
|    1|Hi I heard about ...|
|    2|I wish Java could...|
|    3|Logistic,regressi...|
+-----+--------------------+

DataFrame2
+------+--------------------+
|label2|           sentence2|
+------+--------------------+
|      |                    |
|     2|I wish Java could...|
|      |                    |
+------+--------------------+

How to implement a generic Encoder in Spark 2.0?

I want to implement a generic encoder because I have a small project on Spark 1.6, and when I migrate it to Spark 2.0 it gives me the following warning and error

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.

in many places. Hence I want to implement a generic encoder and put it in a package object. I just want to know: how can I implement a generic Encoder in Spark 2.0?

Is it possible to perform JOIN operation on two DATAFILES in apache spark?

I am doing some work related to sort-based shuffle in Spark. As I understand it, one map task creates one data file (data in the form of serialized objects) and one index file (to point to the records of that data file). I want to perform a JOIN on two different data files (from two different map tasks). Is it possible to do that by changing Spark's internal code? Please help me. Thank you

Error in forEach Spark Scala : value select is not a member of org.apache.spark.sql.Row

I am trying to get the average of ratings across all JSON objects in a file. I loaded the file and converted it to a data frame, but I get an error while computing the average. Sample request:

        "country": "France",
        "customerId": "France001",
        "visited": [
                "placeName": "US",
                "rating": "2.3",
                "famousRest": "N/A",
                "placeId": "AVBS34"

                "placeName": "US",
                "rating": "3.3",
                "famousRest": "SeriousPie",
                "placeId": "VBSs34"

                "placeName": "Canada",
                "rating": "4.3",
                "famousRest": "TimHortons",
                "placeId": "AVBv4d"


So for this JSON, the US average rating will be (2.3 + 3.3)/2 = 2.8.

        "country": "Egypt",
        "customerId": "Egypt009",
        "visited": [
                "placeName": "US",
                "rating": "1.3",
                "famousRest": "McDonald",
                "placeId": "Dedcf3"

                "placeName": "US",
                "rating": "3.3",
                "famousRest": "EagleNest",
                "placeId": "CDfet3"



        "country": "Canada",
        "customerId": "Canada012",
        "visited": [
                "placeName": "UK",
                "rating": "3.3",
                "famousRest": "N/A",
                "placeId": "XSdce2"



For this one, the US average = (1.3 + 3.3)/2 = 2.3.

So overall, the average rating will be: (2.8 + 2.3)/2 = 2.55 (only two requests have 'US' in their visited list).

My schema :

|-- country: string (nullable = true)
|-- customerId: string (nullable = true)
|-- visited: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |   |-- placeId: string (nullable = true)
|    |   |-- placeName: string (nullable = true) 
|    |   |-- famousRest: string (nullable = true)
|    |   |-- rating: string (nullable = true)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.jsonFile("temp.txt") 

When doing :

val app ="strategies")
app.printSchema() { t =>"placeName", "rating").where(t("placeName") == "US") }

I am getting:

<console>:31: error: value select is not a member of org.apache.spark.sql.Row
        { t =>"placeName", "rating").where(t("placeName") == "US")
                    ^

Can someone tell me what I am doing wrong here ?