Category Archives: bigdata

Understanding the combineByKey process in Spark RDD

Here is my dataset:

val combineByKeyRdd = sc.parallelize(List(
                     ("key1", 1),
                     ("key2", 2),
                     ("key3", 39),
                     ("key1", 1),
                     ("key2", 2),
                     ("key4", 22)))  

and below is my combineByKey code:

combineByKeyRdd.combineByKey(
    (x: Int) => (x, 1),
    (acc: (Int, Int), x) => (acc._1 + x, acc._2 + 1),
    (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

Below is my understanding:

The first parameter, i.e. (x:Int) => (x, 1): this function is invoked only once for every key. It creates the initial value of the accumulator.

The second parameter, i.e. (acc:(Int, Int), x) => (acc._1 + x, acc._2 + 1), is for merging values. What I understood is that for every further value of an already-seen key, it adds that value into the key's existing accumulator, and this repeats until all values of that key are covered. So basically, below is how I think it should go:

The first combiner for key1 is initialized to 1, then the following happens:
mergeValue(accum[key1], 1) ==> accum[key1] = (1 + 1 (values), 1 + 1 (occurrences/partitions))

Third and the final part is of combining the combiners which adds values for that particular key:
For key1:

mergeCombiner((1, 1), (1, 1)) ==> (1 + 1, 1 + 1)  

final result would be in Array form:

Array[(String, (Int, Int))] = Array((key3,(39,1)), (key4,(22,1)), (key1,(2,2)), (key2,(4,2)))
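To see when each of the three functions fires, a traced version of the same call can be used (a quick sketch using the combineByKeyRdd defined above; the println calls are only for tracing, and their interleaving depends on how the data is partitioned):

val traced = combineByKeyRdd.combineByKey(
    // createCombiner: called the first time a key is seen within a partition
    (x: Int) => { println(s"createCombiner($x)"); (x, 1) },
    // mergeValue: called for every further value of that key in the same partition
    (acc: (Int, Int), x: Int) => { println(s"mergeValue($acc, $x)"); (acc._1 + x, acc._2 + 1) },
    // mergeCombiners: called when the per-partition accumulators of a key are combined
    (a: (Int, Int), b: (Int, Int)) => { println(s"mergeCombiners($a, $b)"); (a._1 + b._1, a._2 + b._2) }
).collect()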

So, am I conceptually correct? I hope I have been clear enough in expressing what I understood. If not, please explain to me in layman's terms how it actually works. Thanks in advance.

Apache Ignite Affinity Collocation with Complex Join Conditions

I am using Ignite for a near-real-time application and I want to increase the performance of the queries I use. The query uses SqlFieldsQuery and involves a complex join, for example:

SELECT
  col1, col2, col3, col4, .....
FROM
  table1 A INNER JOIN table2 B
ON
  A.col5 = 'subscribe' AND A.col6 = 'sell' AND
   (
     (A.col1 = B.col1 AND B.col3 = '1')
     OR (A.col2 = B.col1 AND B.col3 = '2')
     OR (A.col1 = B.col1 AND B.col3 = '3')) .... etc.

1. The join here does not consistently use the same two columns, so I cannot simply derive affinity keys from them (see the sketch below for the kind of collocation setup I mean). What should I do?
2. Other than this, what are the general recommendations for increasing the performance of Ignite queries (specifically SQL queries)?
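For background, the only collocation setup I know of is the stock AffinityKey wrapper, roughly as in the sketch below (cache name, row class and key values are illustrative, not my real ones); the affinity part there is a single join column, and my problem is that the join above does not stick to one column:

import org.apache.ignite.Ignition
import org.apache.ignite.cache.affinity.AffinityKey

// Illustrative row type for table2; only the columns used in the join above.
case class Table2Row(col1: String, col3: String)

object AffinitySketch {
  def main(args: Array[String]): Unit = {
    val ignite = Ignition.start()  // default config, just for the sketch
    val cache = ignite.getOrCreateCache[AffinityKey[String], Table2Row]("table2")

    // Key part = a unique row id, affinity part = the join column col1,
    // so rows sharing col1 are collocated on the same node.
    val row = Table2Row("abc", "1")
    cache.put(new AffinityKey("row-1", row.col1), row)
  }
}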

Stability of the Apache Ambari REST API

I am starting to work with Apache Ambari and I have to build a system that gives a short overview of all Ambari instances in one place. My first approach is to connect to the Ambari REST API and extract the relevant metadata. My colleagues told me that the data model can change when Ambari is upgraded to a newer version. Now my question: can I use the REST API for my use case even if Ambari is upgraded periodically?

Before I start programming, I want to weigh up all possible approaches.

I look forward to your feedback.

Best regards Dominik

Overview of multiple Ambari instances

I am starting to work with Apache Ambari and I have to build a system that gives a short overview of all Ambari instances in one place. Ambari has a dashboard for viewing the relevant metadata of a cluster, but in my environment I have multiple instances. My first approach is to connect to the Ambari REST API and extract the relevant metadata. This approach is possible, but wasteful if an existing system or framework already covers this use case.

Before I start programming, I want to weigh up all possible approaches.

I am interested in the following metadata:

  • Cluster-Name
  • Connected Hosts
  • Installed Services
  • Permission and Roles
  • Users
  • Jobs/Workflows
  • Alerts
  • Maybe Configurations
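As a starting point, the kind of REST call I have in mind is just the clusters listing, sketched below (host and credentials are placeholders; /api/v1/clusters is the standard Ambari entry point):

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.util.Base64

object AmbariClusterList {
  def main(args: Array[String]): Unit = {
    val ambariUrl = "http://ambari-host:8080/api/v1/clusters"   // placeholder host
    val auth = Base64.getEncoder.encodeToString("admin:admin".getBytes("UTF-8"))

    val request = HttpRequest.newBuilder(URI.create(ambariUrl))
      .header("Authorization", s"Basic $auth")
      .GET()
      .build()

    val response = HttpClient.newHttpClient()
      .send(request, HttpResponse.BodyHandlers.ofString())

    // The JSON response lists each cluster under "items" with its Clusters/cluster_name;
    // the cluster name can then be used to query sub-resources such as hosts, services and alerts.
    println(response.body())
  }
}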

I look forward to your feedback.

Best regards Dominik

How to know which stage of a job is currently running in Apache Spark?

Consider that I have a job in Spark as follows:

CSV File ==> Filter By A Column ==> Taking Sample ==> Save As JSON

Now my requirement is to know programmatically which step of the job (fetching the file, filtering, or sampling) is currently executing, preferably using the Java API. Is there any way to do this?

I can track jobs, stages and tasks using the SparkListener class, for example by tracking a stage ID. But how do I know which stage ID corresponds to which step in the job chain?
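For context, the kind of listener I have in mind looks roughly like the sketch below (Scala for brevity; a Java listener would be equivalent). The stage name carries the call site of the operation that produced the stage, which is the closest hint to "which step" that I have found so far:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerStageSubmitted}

// Logs every stage as it starts and finishes; stageInfo.name contains the call site
// (e.g. "filter at MyJob.scala:42") of the code that produced the stage.
class StageTracker extends SparkListener {
  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit =
    println(s"Stage ${event.stageInfo.stageId} started: ${event.stageInfo.name}")

  override def onStageCompleted(event: SparkListenerStageCompleted): Unit =
    println(s"Stage ${event.stageInfo.stageId} finished: ${event.stageInfo.name}")
}

// Registration, assuming an existing SparkContext named sc:
// sc.addSparkListener(new StageTracker())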

Any ideas, please?

How to change the Block Placement policy in Hadoop

I am currently running a Hadoop cluster (taken from https://github.com/madiator/hadoop-20) where I am trying to modify the Block placement policy of Hadoop from the default placement policy (placing it randomly based on certain parameters using BlockPlacementPolicyDefault.java) to a deterministic placement that I am developing. I have been scanning the web to be able to do so and have come across multiple articles claiming that "the block placement policy can be modified by extending the BlockPlacementPolicy interface and pointing the class to the dfs.block.replicator.classname property in the Hadoop configuration files."

I have tried this method to no avail, as I cannot find any specific documentation or a step-by-step guide for carrying out this task properly. I have an overall high-level view of Hadoop, but this part requires modifying the internals of the application.
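To be concrete, my understanding of the configuration side amounts only to the property quoted above, sketched here (the policy class name is a placeholder for the class I am writing; on a real cluster this property would go into hdfs-site.xml on the NameNode rather than being set programmatically):

import org.apache.hadoop.conf.Configuration

// Point HDFS at a custom placement policy class (placeholder name).
val conf = new Configuration()
conf.set("dfs.block.replicator.classname",
  "org.example.DeterministicBlockPlacementPolicy")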

It would be highly appreciated if someone could provide step-by-step instructions or point me to a resource that does, as this is a mammoth task without appropriate documentation.

I have also noticed that the core package contains a class called PlacementMonitor.java, which checks the location of the blocks every 10 seconds and calls the move method in BlockMover.java to redistribute blocks across nodes so that blocks belonging to the same rack do not end up on the same node. Will this process compete with the modified block placement policy I introduce (if I can figure this out...)?

Any and all help will be highly appreciated.

Problems while extending Spark Apriori algorithms to get lift

I'm trying to extend the Apriori algorithm to print lift along with confidence. The program works when the data is small, but when I run it on a large enough dataset it takes a long time and the shuffle write memory keeps growing until the executors run out of memory. The DAG shows that the flatMap() operation on line 24 of CAssociationsRules.scala is hogging the memory. How can I improve the performance of the algorithm? The code: https://gist.github.com/masterlittle/a2b993cd4cc4aa201cdc6347e1fc7651

A brief synopsis of my algorithm: I create a map of items vs. their occurrences in CFPGrowth.scala. I then convert it into a broadcast variable and use it for lookups when the rules are generated in CAssociationRules.scala. It is this step that is creating problems. I'm still a newbie to Spark, so any help would be appreciated.
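Roughly, the broadcast-lookup step works as in the sketch below (simplified, with illustrative names rather than the ones in the gist, and assuming single-item consequents):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// rules: (antecedent items, consequent item, confidence); itemCounts: item -> occurrence count.
def withLift(sc: SparkContext,
             rules: RDD[(Array[String], String, Double)],
             itemCounts: Map[String, Long],
             numTransactions: Long): RDD[(Array[String], String, Double, Double)] = {
  val countsBc = sc.broadcast(itemCounts)  // shipped once per executor instead of per task
  rules.map { case (antecedent, consequent, confidence) =>
    val consequentSupport = countsBc.value.getOrElse(consequent, 0L).toDouble / numTransactions
    val lift = if (consequentSupport > 0) confidence / consequentSupport else 0.0
    (antecedent, consequent, confidence, lift)
  }
}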

(2 node cluster with 4 cores and 8 partitions)

How Cassandra stores data for materialized views

I am wondering: what is the disk-space cost of materialized views?

Say I have a base table with 10 fields whose primary key is (f1, f2, f3), and I create one materialized view from it that includes all 10 fields, with primary key (f4, f1, f2, f3).
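To make the schema concrete, this is roughly what I mean (the CQL is kept as Scala string constants for reference; keyspace, table and view names, and the int types, are placeholders):

val createBaseTable =
  """CREATE TABLE ks.base (
    |  f1 int, f2 int, f3 int, f4 int, f5 int,
    |  f6 int, f7 int, f8 int, f9 int, f10 int,
    |  PRIMARY KEY (f1, f2, f3)
    |)""".stripMargin

val createView =
  """CREATE MATERIALIZED VIEW ks.base_by_f4 AS
    |  SELECT * FROM ks.base
    |  WHERE f4 IS NOT NULL AND f1 IS NOT NULL AND f2 IS NOT NULL AND f3 IS NOT NULL
    |  PRIMARY KEY (f4, f1, f2, f3)""".stripMargin

// e.g. run with session.execute(createBaseTable); session.execute(createView) via the Java driver.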

How much disk space does the materialized view take?

Almost the same disk space as the base table?

Or does the materialized view only use disk space for its primary key columns f4, f1, f2, f3?

I tend to think it's the first case, since a materialized view is implemented as a normal Cassandra table.

How to handle errors and avoid committing when using the Kafka Streams DSL

With Kafka Streams, if we use the lower-level Processor API, we can control whether or not to commit. So if a problem happens in our code, we can choose not to commit the message; in that case, Kafka will redeliver the message multiple times until the problem is fixed.
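What I mean by controlling the commit with the lower-level Processor API is roughly the sketch below (written against the old 0.10.x Processor interface; the business logic is a placeholder):

import org.apache.kafka.streams.processor.{Processor, ProcessorContext}

// Sketch: a commit is only requested after the record has been handled successfully;
// if handle() throws, no commit is requested for this record.
class GuardedProcessor extends Processor[String, String] {
  private var context: ProcessorContext = _

  override def init(context: ProcessorContext): Unit = { this.context = context }

  override def process(key: String, value: String): Unit = {
    handle(key, value)   // placeholder for real business logic; may throw
    context.commit()     // request a commit only once processing succeeded
  }

  override def punctuate(timestamp: Long): Unit = ()
  override def close(): Unit = ()

  private def handle(key: String, value: String): Unit =
    println(s"processing $key -> $value")
}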

But how can we control whether the message is committed when using the higher-level Streams DSL API?

Resources:

http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html

Cannot start a job from Java code in Spark; Initial job has not accepted any resources

Hello, my Spark configuration in Java is:

import org.apache.spark.sql.SparkSession;

SparkSession ss = SparkSession.builder()
    .config("spark.driver.host", "192.168.0.103")
    .config("spark.driver.port", "4040")
    .config("spark.dynamicAllocation.enabled", "false")
    .config("spark.cores.max", "1")
    .config("spark.executor.memory", "471859200")
    .config("spark.executor.cores", "1")
    //.master("local[*]")
    .master("spark://kousik-pc:7077")
    .appName("abc")
    .getOrCreate();

Now, when I submit any job from inside the code (not by submitting a jar), I get the warning:

TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

The Spark UI (screenshot not reproduced here) shows the registered worker and the waiting applications.

The worker shown in the screenshot was started with the command:

~/spark/sbin/start-slave.sh

All four jobs that are in the WAITING state were submitted from Java code. I have tried all the solutions I could find. Any ideas, please?