  • Advanced Example: Hadoop Map Reduce and Apache Spark

    Objective

    Solve an advanced problem with Hadoop Map Reduce, then solve the same problem with Apache Spark.

     

    Hadoop Map Reduce Program

    http://blog.hampisoftware.com/?p=20

     

    Apache Spark based solution

    http://blog.hampisoftware.com/?p=41

     

    Differences between the two solutions

    Boilerplate code

    In the map reduce code, you have to write a mapper class and a reducer class.

    In Spark, you do not have to write much boilerplate code. If you use Java 8, you can use lambda expressions to cut the boilerplate further.

    In Spark, there is no need for a mapper class or a reducer class. You apply transformations such as map, flatMap, mapToPair and reduceByKey directly to an RDD.
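
    To illustrate how chained transformations and lambdas replace separate mapper and reducer classes, here is a local, in-memory analogue of the patent citation pipeline. It is not Spark code: java.util.stream and Collectors.toMap stand in for mapToPair, reduceByKey, sortByKey and map, and the class name LambdaPipelineSketch and the sample lines (reconstructed from the first two result lines of the Patent Citation post) are illustrative. Spark's own PairFunction, Function2 and Function are single-method interfaces, so with Java 8 the anonymous classes in the Spark program can be replaced by lambdas in the same way.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Local analogue of the Spark pipeline using Java 8 lambdas (NOT Spark code).
class LambdaPipelineSketch {

    // Input lines are "CITING,CITED"; output lines are "CITED CITING1,CITING2,..."
    static List<String> reverseCitations(List<String> lines) {
        Map<String, String> cited = lines.stream()
            .map(line -> line.split(","))          // like mapToPair: split each line
            .collect(Collectors.toMap(
                pair -> pair[1],                    // key: the cited patent
                pair -> pair[0],                    // value: the citing patent
                (s1, s2) -> s1 + "," + s2,          // like reduceByKey: join values with a comma
                TreeMap::new));                     // like sortByKey: keys kept in String order
        return cited.entrySet().stream()
            .map(e -> e.getKey() + " " + e.getValue()) // like map: format each pair as a line
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("3964859,1", "4647229,1", "4539112,10000");
        // Prints "1 3964859,4647229" and then "10000 4539112"
        reverseCitations(input).forEach(System.out::println);
    }
}
```

    Each lambda corresponds to one of the anonymous classes in the Spark program, which is where most of the boilerplate saving comes from.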

     

    Performance

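    The Spark-based solution is much faster than the Map Reduce solution: around 45 seconds for 16 million records in a virtual machine sandbox. In the run logged in the Patent Citation post, that figure matches the final saveAsTextFile job (about 46 s); the whole application, including the earlier sortByKey job (about 92 s), took roughly 2 minutes 20 seconds.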

     

    Scala

    If you write Scala code, the Apache Spark code can be simplified further into a few lines. In this post, we have solved the patent citation problem using Java.

  • Advanced Spark Java Example: Patent Citation

    Background

    If you have not read the earlier post on the Hadoop Map Reduce patent citation example, now is the time: it provides the problem context.

    In this post, we will use Apache Spark to solve the reverse patent citation problem.

    Environment Setup

    As in the Map Reduce exercise, use the latest sandbox from a Hadoop vendor such as Hortonworks, Cloudera or MapR. These sandboxes should have Spark integrated.

    Add the input file to HDFS as described in the Map Reduce post, placing it in the HDFS directory /user/captain/input.

    Program in Apache Spark using Java

    package spark;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    public class PatentCitation {

        public static void main(String[] args) {
            // args[0]: input file in HDFS, args[1]: output directory (must not exist yet)
            String appName = "Patent Citation";
            SparkConf sparkConf = new SparkConf().setAppName(appName);
            JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

            // Read the input file line by line; each line has the form "CITING,CITED"
            JavaRDD<String> patentCitationFile = javaSparkContext.textFile(args[0]);

            // Reverse each pair so that the cited patent becomes the key: (CITED, CITING).
            // Note: the header line "CITING","CITED" is not filtered out, so it also
            // becomes a record and shows up in the output as "CITED" "CITING".
            JavaPairRDD<String, String> lines = patentCitationFile.mapToPair(
                new PairFunction<String, String, String>() {
                    @Override
                    public Tuple2<String, String> call(String s) throws Exception {
                        String[] pair = s.split(",");
                        return new Tuple2<String, String>(pair[1], pair[0]);
                    }
                });

            // Join all citing patents of the same cited patent with commas, then sort
            // by the cited patent (the keys are Strings, so the order is lexicographic)
            JavaPairRDD<String, String> cited = lines.reduceByKey(
                new Function2<String, String, String>() {
                    @Override
                    public String call(String s1, String s2) throws Exception {
                        return s1 + "," + s2;
                    }
                }).sortByKey();

            // Format each tuple as "CITED CITING1,CITING2,..." instead of "(key,value)"
            JavaRDD<String> formatted = cited.map(
                new Function<Tuple2<String, String>, String>() {
                    @Override
                    public String call(Tuple2<String, String> theTuple) throws Exception {
                        return theTuple._1() + " " + theTuple._2();
                    }
                });

            // Store the formatted RDD as text files (one part file per partition)
            formatted.saveAsTextFile(args[1]);

            javaSparkContext.stop();
        }
    }
    
    

    Description of the program

    In the first step, we read the input file line by line into a JavaRDD<String> with textFile, and then call mapToPair to build a JavaPairRDD. Each line is split at the comma, which gives two strings: the first is the citing patent and the second is the cited patent.

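    Since our problem is the reverse, finding all the citing patents for a particular patent, we build each tuple in reverse order: (cited patent, citing patent). The header line of cite75_99.txt goes through the same step, which is why the results start with "CITED" "CITING".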
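
    The per-line step can be made a little more defensive. The following plain-Java sketch (the class ReverseLineSketch is hypothetical and not part of the program above) splits a "CITING,CITED" line and swaps it, with one added assumption: any line that is not exactly two numeric fields, such as the quoted header, is skipped instead of becoming a record. In the Spark program, the same check could be applied with a filter on the JavaRDD before mapToPair.

```java
import java.util.Optional;

// Sketch of the reverse step with an assumed validity check (not in the original program):
// lines that are not two numeric fields, e.g. the "CITING","CITED" header, are skipped.
class ReverseLineSketch {

    // Returns {cited, citing} for a valid "CITING,CITED" line, otherwise empty
    static Optional<String[]> reverse(String line) {
        String[] pair = line.trim().split(",");
        if (pair.length != 2 || !pair[0].matches("\\d+") || !pair[1].matches("\\d+")) {
            return Optional.empty();
        }
        return Optional.of(new String[] {pair[1], pair[0]});
    }

    public static void main(String[] args) {
        String[] lines = {"\"CITING\",\"CITED\"", "4714284,1000006"};
        for (String line : lines) {
            String shown = reverse(line)
                .map(p -> p[0] + " is cited by " + p[1])
                .orElse("skipped: " + line);
            System.out.println(shown);
        }
    }
}
```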

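    In the second step, we call reduceByKey. The reduce function receives two string values for the same key and joins them with a comma, and Spark keeps applying it until each cited patent has a single comma-separated list of citing patents. Because string concatenation is not commutative, the order of the citing patents within a list is not guaranteed. We then call sortByKey to sort the result by cited patent.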
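
    A simplified single-machine model of how the reduce function is applied may help. In this sketch (the class ReduceByKeySketch and the split of values into two partitions are hypothetical; this is not Spark's real implementation), values are first combined per key inside each partition and the partial results are then merged with the same function, so the function always receives exactly two strings. The sample values come from the result line for patent 1000067.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Simplified model of reduceByKey (not Spark internals): combine per partition,
// then merge the partial results from all partitions with the same function.
class ReduceByKeySketch {

    // The same reduce function as in PatentCitation: join two values with a comma
    static final BinaryOperator<String> JOIN = (s1, s2) -> s1 + "," + s2;

    // Each partition is a list of {key, value} pairs, here {cited, citing}
    static Map<String, String> reduceByKey(List<List<String[]>> partitions) {
        List<Map<String, String>> partials = new ArrayList<>();
        for (List<String[]> partition : partitions) {
            Map<String, String> local = new HashMap<>();
            for (String[] kv : partition) {
                local.merge(kv[0], kv[1], JOIN); // combine within the partition
            }
            partials.add(local);
        }
        Map<String, String> result = new HashMap<>();
        for (Map<String, String> partial : partials) {
            // merge partial results for the same key, as happens after the shuffle
            partial.forEach((key, value) -> result.merge(key, value, JOIN));
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String[]>> partitions = Arrays.asList(
            Arrays.asList(new String[] {"1000067", "5312208"}, new String[] {"1000067", "4944640"}),
            Collections.singletonList(new String[] {"1000067", "5071294"}));
        System.out.println(reduceByKey(partitions)); // {1000067=5312208,4944640,5071294}
    }
}
```

    Changing how the values are split across partitions changes the order inside the joined string, which is why the order of citing patents in the output should not be relied on.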

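    Now our RDD is ready for storage. The challenge is that saveAsTextFile writes each element using its toString(), and a Tuple2 prints as (key,value), so every line would start and end with a parenthesis. So we run one more map transformation that formats each tuple as the cited patent, a space, and the comma-separated list of citing patents.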

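    Finally, we store the formatted RDD with saveAsTextFile. The output path is a directory that receives one part file per partition (for example, part-00000).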

     

    Program Run

    [sandbox spark]# spark/bin/spark-submit --class spark.PatentCitation --master local --executor-memory 1G --num-executors 10 patentcitation_spark.jar /user/captain/input/cite75_99.txt /user/captain/output

     

    15/09/15 19:18:45 INFO SparkContext: Running Spark version 1.3.1

    15/09/15 19:18:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

    15/09/15 19:18:45 INFO SecurityManager: Changing view acls to: root

    15/09/15 19:18:45 INFO SecurityManager: Changing modify acls to: root

    15/09/15 19:18:45 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)

    15/09/15 19:18:45 INFO Slf4jLogger: Slf4jLogger started

    15/09/15 19:18:45 INFO Remoting: Starting remoting

    15/09/15 19:18:46 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@sandbox:36543]

    15/09/15 19:18:46 INFO Utils: Successfully started service 'sparkDriver' on port 36543.

    15/09/15 19:18:46 INFO SparkEnv: Registering MapOutputTracker

    15/09/15 19:18:46 INFO SparkEnv: Registering BlockManagerMaster

    15/09/15 19:18:46 INFO DiskBlockManager: Created local directory at /tmp/spark-c1f7c70b-349d-4984-b6bb-544080fc4d07/blockmgr-bc783dc8-c929-422c-bbae-1759e45dc59e

    15/09/15 19:18:46 INFO MemoryStore: MemoryStore started with capacity 265.4 MB

    15/09/15 19:18:46 INFO HttpFileServer: HTTP File server directory is /tmp/spark-e0895997-d0e1-4dfe-9fe6-01df51e9a65c/httpd-6a121c63-3f76-47cd-b083-77d722fd95bf

    15/09/15 19:18:46 INFO HttpServer: Starting HTTP Server

    15/09/15 19:18:46 INFO Server: jetty-8.y.z-SNAPSHOT

    15/09/15 19:18:46 INFO AbstractConnector: Started SocketConnector@0.0.0.0:33490

    15/09/15 19:18:46 INFO Utils: Successfully started service 'HTTP file server' on port 33490.

    15/09/15 19:18:46 INFO SparkEnv: Registering OutputCommitCoordinator

    15/09/15 19:18:46 INFO Server: jetty-8.y.z-SNAPSHOT

    15/09/15 19:18:46 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040

    15/09/15 19:18:46 INFO Utils: Successfully started service 'SparkUI' on port 4040.

    15/09/15 19:18:46 INFO SparkUI: Started SparkUI at http://sandbox:4040

    15/09/15 19:18:46 INFO SparkContext: Added JAR file:/root/anil/spark/patentcitation_spark.jar at http://192.168.112.132:33490/jars/patentcitation_spark.jar with timestamp 1442344726418

    15/09/15 19:18:46 INFO Executor: Starting executor ID <driver> on host localhost

    15/09/15 19:18:46 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@sandbox:36543/user/HeartbeatReceiver

    15/09/15 19:18:46 INFO NettyBlockTransferService: Server created on 56529

    15/09/15 19:18:46 INFO BlockManagerMaster: Trying to register BlockManager

    15/09/15 19:18:46 INFO BlockManagerMasterActor: Registering block manager localhost:56529 with 265.4 MB RAM, BlockManagerId(<driver>, localhost, 56529)

    15/09/15 19:18:46 INFO BlockManagerMaster: Registered BlockManager

    15/09/15 19:18:46 INFO MemoryStore: ensureFreeSpace(169818) called with curMem=0, maxMem=278302556

    15/09/15 19:18:46 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 165.8 KB, free 265.2 MB)

    15/09/15 19:18:47 INFO MemoryStore: ensureFreeSpace(34017) called with curMem=169818, maxMem=278302556

    15/09/15 19:18:47 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 33.2 KB, free 265.2 MB)

    15/09/15 19:18:47 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:56529 (size: 33.2 KB, free: 265.4 MB)

    15/09/15 19:18:47 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0

    15/09/15 19:18:47 INFO SparkContext: Created broadcast 0 from textFile at PatentCitation.java:18

    15/09/15 19:18:47 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.

    15/09/15 19:18:47 INFO FileInputFormat: Total input paths to process : 1

    15/09/15 19:18:47 INFO SparkContext: Starting job: sortByKey at PatentCitation.java:28

    15/09/15 19:18:47 INFO DAGScheduler: Registering RDD 2 (mapToPair at PatentCitation.java:19)

    15/09/15 19:18:47 INFO DAGScheduler: Got job 0 (sortByKey at PatentCitation.java:28) with 2 output partitions (allowLocal=false)

    15/09/15 19:18:47 INFO DAGScheduler: Final stage: Stage 1(sortByKey at PatentCitation.java:28)

    15/09/15 19:18:47 INFO DAGScheduler: Parents of final stage: List(Stage 0)

    15/09/15 19:18:47 INFO DAGScheduler: Missing parents: List(Stage 0)

    15/09/15 19:18:47 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[2] at mapToPair at PatentCitation.java:19), which has no missing parents

    15/09/15 19:18:47 INFO MemoryStore: ensureFreeSpace(3736) called with curMem=203835, maxMem=278302556

    15/09/15 19:18:47 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.6 KB, free 265.2 MB)

    15/09/15 19:18:47 INFO MemoryStore: ensureFreeSpace(2723) called with curMem=207571, maxMem=278302556

    15/09/15 19:18:47 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.7 KB, free 265.2 MB)

    15/09/15 19:18:47 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:56529 (size: 2.7 KB, free: 265.4 MB)

    15/09/15 19:18:47 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0

    15/09/15 19:18:47 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:839

    15/09/15 19:18:47 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MapPartitionsRDD[2] at mapToPair at PatentCitation.java:19)

    15/09/15 19:18:47 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks

    15/09/15 19:18:47 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, ANY, 1389 bytes)

    15/09/15 19:18:47 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)

    15/09/15 19:18:47 INFO Executor: Fetching http://192.168.112.132:33490/jars/patentcitation_spark.jar with timestamp 1442344726418

    15/09/15 19:18:47 INFO Utils: Fetching http://192.168.112.132:33490/jars/patentcitation_spark.jar to /tmp/spark-d5e8a049-e4b6-45d9-b533-6e7c1295ed19/userFiles-2ba84816-9443-4d34-b8f4-360e6a69aeaf/fetchFileTemp4125688246233447293.tmp

    15/09/15 19:18:47 INFO Executor: Adding file:/tmp/spark-d5e8a049-e4b6-45d9-b533-6e7c1295ed19/userFiles-2ba84816-9443-4d34-b8f4-360e6a69aeaf/patentcitation_spark.jar to class loader

    15/09/15 19:18:47 INFO HadoopRDD: Input split: hdfs://sandbox:8020/user/captain/input/cite75_99.txt:0+134217728

    15/09/15 19:18:47 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id

    15/09/15 19:18:47 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id

    15/09/15 19:18:47 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap

    15/09/15 19:18:47 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition

    15/09/15 19:18:47 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id

    15/09/15 19:18:49 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (1 time so far)

    15/09/15 19:18:51 INFO ExternalSorter: Thread 65 spilling in-memory map of 86.6 MB to disk (2 times so far)

    15/09/15 19:18:53 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (3 times so far)

    15/09/15 19:18:55 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (4 times so far)

    15/09/15 19:18:57 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (5 times so far)

    15/09/15 19:19:00 INFO ExternalSorter: Thread 65 spilling in-memory map of 85.6 MB to disk (6 times so far)

    15/09/15 19:19:02 INFO ExternalSorter: Thread 65 spilling in-memory map of 85.1 MB to disk (7 times so far)

    15/09/15 19:19:04 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (8 times so far)

    15/09/15 19:19:06 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (9 times so far)

    15/09/15 19:19:22 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2004 bytes result sent to driver

    15/09/15 19:19:22 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, ANY, 1389 bytes)

    15/09/15 19:19:22 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)

    15/09/15 19:19:22 INFO HadoopRDD: Input split: hdfs://sandbox:8020/user/captain/input/cite75_99.txt:134217728+129857703

    15/09/15 19:19:22 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 34712 ms on localhost (1/2)

    15/09/15 19:19:23 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (1 time so far)

    15/09/15 19:19:25 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (2 times so far)

    15/09/15 19:19:28 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (3 times so far)

    15/09/15 19:19:31 INFO ExternalSorter: Thread 65 spilling in-memory map of 85.1 MB to disk (4 times so far)

    15/09/15 19:19:33 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (5 times so far)

    15/09/15 19:19:35 INFO ExternalSorter: Thread 65 spilling in-memory map of 85.1 MB to disk (6 times so far)

    15/09/15 19:19:37 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (7 times so far)

    15/09/15 19:19:40 INFO ExternalSorter: Thread 65 spilling in-memory map of 87.4 MB to disk (8 times so far)

    15/09/15 19:19:42 INFO ExternalSorter: Thread 65 spilling in-memory map of 84.2 MB to disk (9 times so far)

    15/09/15 19:19:53 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2004 bytes result sent to driver

    15/09/15 19:19:53 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 30861 ms on localhost (2/2)

    15/09/15 19:19:53 INFO DAGScheduler: Stage 0 (mapToPair at PatentCitation.java:19) finished in 65.576 s

    15/09/15 19:19:53 INFO DAGScheduler: looking for newly runnable stages

    15/09/15 19:19:53 INFO DAGScheduler: running: Set()

    15/09/15 19:19:53 INFO DAGScheduler: waiting: Set(Stage 1)

    15/09/15 19:19:53 INFO DAGScheduler: failed: Set()

    15/09/15 19:19:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool

    15/09/15 19:19:53 INFO DAGScheduler: Missing parents for Stage 1: List()

    15/09/15 19:19:53 INFO DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[5] at sortByKey at PatentCitation.java:28), which is now runnable

    15/09/15 19:19:53 INFO MemoryStore: ensureFreeSpace(3312) called with curMem=210294, maxMem=278302556

    15/09/15 19:19:53 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.2 KB, free 265.2 MB)

    15/09/15 19:19:53 INFO MemoryStore: ensureFreeSpace(2378) called with curMem=213606, maxMem=278302556

    15/09/15 19:19:53 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.3 KB, free 265.2 MB)

    15/09/15 19:19:53 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:56529 (size: 2.3 KB, free: 265.4 MB)

    15/09/15 19:19:53 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0

    15/09/15 19:19:53 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:839

    15/09/15 19:19:53 INFO DAGScheduler: Submitting 2 missing tasks from Stage 1 (MapPartitionsRDD[5] at sortByKey at PatentCitation.java:28)

    15/09/15 19:19:53 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks

    15/09/15 19:19:53 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, PROCESS_LOCAL, 1124 bytes)

    15/09/15 19:19:53 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)

    15/09/15 19:19:53 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks

    15/09/15 19:19:53 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 3 ms

    15/09/15 19:19:54 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (1 time so far)

    15/09/15 19:19:56 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.7 MB to disk (2 times so far)

    15/09/15 19:19:59 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (3 times so far)

    15/09/15 19:20:01 INFO BlockManager: Removing broadcast 1

    15/09/15 19:20:01 INFO BlockManager: Removing block broadcast_1_piece0

    15/09/15 19:20:01 INFO MemoryStore: Block broadcast_1_piece0 of size 2723 dropped from memory (free 278089295)

    15/09/15 19:20:01 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:56529 in memory (size: 2.7 KB, free: 265.4 MB)

    15/09/15 19:20:01 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0

    15/09/15 19:20:01 INFO BlockManager: Removing block broadcast_1

    15/09/15 19:20:01 INFO MemoryStore: Block broadcast_1 of size 3736 dropped from memory (free 278093031)

    15/09/15 19:20:01 INFO ContextCleaner: Cleaned broadcast 1

    15/09/15 19:20:01 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (4 times so far)

    15/09/15 19:20:06 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 1682 bytes result sent to driver

    15/09/15 19:20:06 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, PROCESS_LOCAL, 1124 bytes)

    15/09/15 19:20:06 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)

    15/09/15 19:20:06 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks

    15/09/15 19:20:06 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms

    15/09/15 19:20:06 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 13527 ms on localhost (1/2)

    15/09/15 19:20:07 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (1 time so far)

    15/09/15 19:20:10 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (2 times so far)

    15/09/15 19:20:12 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (3 times so far)

    15/09/15 19:20:19 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 1685 bytes result sent to driver

    15/09/15 19:20:19 INFO DAGScheduler: Stage 1 (sortByKey at PatentCitation.java:28) finished in 25.877 s

    15/09/15 19:20:19 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 12352 ms on localhost (2/2)

    15/09/15 19:20:19 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool

    15/09/15 19:20:19 INFO DAGScheduler: Job 0 finished: sortByKey at PatentCitation.java:28, took 91.526647 s

    15/09/15 19:20:19 INFO FileOutputCommitter: File Output Committer Algorithm version is 1

    15/09/15 19:20:19 INFO SparkContext: Starting job: saveAsTextFile at PatentCitation.java:43

    15/09/15 19:20:19 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 157 bytes

    15/09/15 19:20:19 INFO DAGScheduler: Registering RDD 3 (reduceByKey at PatentCitation.java:28)

    15/09/15 19:20:19 INFO DAGScheduler: Got job 1 (saveAsTextFile at PatentCitation.java:43) with 2 output partitions (allowLocal=false)

    15/09/15 19:20:19 INFO DAGScheduler: Final stage: Stage 4(saveAsTextFile at PatentCitation.java:43)

    15/09/15 19:20:19 INFO DAGScheduler: Parents of final stage: List(Stage 3)

    15/09/15 19:20:19 INFO DAGScheduler: Missing parents: List(Stage 3)

    15/09/15 19:20:19 INFO DAGScheduler: Submitting Stage 3 (ShuffledRDD[3] at reduceByKey at PatentCitation.java:28), which has no missing parents

    15/09/15 19:20:19 INFO MemoryStore: ensureFreeSpace(3128) called with curMem=209525, maxMem=278302556

    15/09/15 19:20:19 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.1 KB, free 265.2 MB)

    15/09/15 19:20:19 INFO MemoryStore: ensureFreeSpace(2252) called with curMem=212653, maxMem=278302556

    15/09/15 19:20:19 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.2 KB, free 265.2 MB)

    15/09/15 19:20:19 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:56529 (size: 2.2 KB, free: 265.4 MB)

    15/09/15 19:20:19 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0

    15/09/15 19:20:19 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:839

    15/09/15 19:20:19 INFO DAGScheduler: Submitting 2 missing tasks from Stage 3 (ShuffledRDD[3] at reduceByKey at PatentCitation.java:28)

    15/09/15 19:20:19 INFO TaskSchedulerImpl: Adding task set 3.0 with 2 tasks

    15/09/15 19:20:19 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 4, localhost, PROCESS_LOCAL, 1113 bytes)

    15/09/15 19:20:19 INFO Executor: Running task 0.0 in stage 3.0 (TID 4)

    15/09/15 19:20:19 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks

    15/09/15 19:20:19 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms

    15/09/15 19:20:21 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (1 time so far)

    15/09/15 19:20:23 INFO BlockManager: Removing broadcast 2

    15/09/15 19:20:23 INFO BlockManager: Removing block broadcast_2

    15/09/15 19:20:23 INFO MemoryStore: Block broadcast_2 of size 3312 dropped from memory (free 278090963)

    15/09/15 19:20:23 INFO BlockManager: Removing block broadcast_2_piece0

    15/09/15 19:20:23 INFO MemoryStore: Block broadcast_2_piece0 of size 2378 dropped from memory (free 278093341)

    15/09/15 19:20:23 INFO BlockManagerInfo: Removed broadcast_2_piece0 on localhost:56529 in memory (size: 2.3 KB, free: 265.4 MB)

    15/09/15 19:20:23 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0

    15/09/15 19:20:23 INFO ContextCleaner: Cleaned broadcast 2

    15/09/15 19:20:23 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.7 MB to disk (2 times so far)

    15/09/15 19:20:25 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (3 times so far)

    15/09/15 19:20:27 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (4 times so far)

    15/09/15 19:20:35 INFO Executor: Finished task 0.0 in stage 3.0 (TID 4). 1098 bytes result sent to driver

    15/09/15 19:20:35 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID 5, localhost, PROCESS_LOCAL, 1113 bytes)

    15/09/15 19:20:35 INFO Executor: Running task 1.0 in stage 3.0 (TID 5)

    15/09/15 19:20:35 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 4) in 16394 ms on localhost (1/2)

    15/09/15 19:20:35 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks

    15/09/15 19:20:35 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms

    15/09/15 19:20:37 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (1 time so far)

    15/09/15 19:20:39 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (2 times so far)

    15/09/15 19:20:41 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (3 times so far)

    15/09/15 19:20:50 INFO Executor: Finished task 1.0 in stage 3.0 (TID 5). 1098 bytes result sent to driver

    15/09/15 19:20:50 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID 5) in 14507 ms on localhost (2/2)

    15/09/15 19:20:50 INFO DAGScheduler: Stage 3 (reduceByKey at PatentCitation.java:28) finished in 30.901 s

    15/09/15 19:20:50 INFO DAGScheduler: looking for newly runnable stages

    15/09/15 19:20:50 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool

    15/09/15 19:20:50 INFO DAGScheduler: running: Set()

    15/09/15 19:20:50 INFO DAGScheduler: waiting: Set(Stage 4)

    15/09/15 19:20:50 INFO DAGScheduler: failed: Set()

    15/09/15 19:20:50 INFO DAGScheduler: Missing parents for Stage 4: List()

    15/09/15 19:20:50 INFO DAGScheduler: Submitting Stage 4 (MapPartitionsRDD[8] at saveAsTextFile at PatentCitation.java:43), which is now runnable

    15/09/15 19:20:50 INFO MemoryStore: ensureFreeSpace(152496) called with curMem=209215, maxMem=278302556

    15/09/15 19:20:50 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 148.9 KB, free 265.1 MB)

    15/09/15 19:20:50 INFO MemoryStore: ensureFreeSpace(97600) called with curMem=361711, maxMem=278302556

    15/09/15 19:20:50 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 95.3 KB, free 265.0 MB)

    15/09/15 19:20:50 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:56529 (size: 95.3 KB, free: 265.3 MB)

    15/09/15 19:20:50 INFO BlockManagerMaster: Updated info of block broadcast_4_piece0

    15/09/15 19:20:50 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:839

    15/09/15 19:20:50 INFO DAGScheduler: Submitting 2 missing tasks from Stage 4 (MapPartitionsRDD[8] at saveAsTextFile at PatentCitation.java:43)

    15/09/15 19:20:50 INFO TaskSchedulerImpl: Adding task set 4.0 with 2 tasks

    15/09/15 19:20:50 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 6, localhost, PROCESS_LOCAL, 1124 bytes)

    15/09/15 19:20:50 INFO Executor: Running task 0.0 in stage 4.0 (TID 6)

    15/09/15 19:20:50 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks

    15/09/15 19:20:50 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms

    15/09/15 19:20:51 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (1 time so far)

    15/09/15 19:20:52 INFO ExternalSorter: Thread 65 spilling in-memory map of 85.4 MB to disk (2 times so far)

    15/09/15 19:20:53 INFO ExternalSorter: Thread 65 spilling in-memory map of 84.2 MB to disk (3 times so far)

    15/09/15 19:20:54 INFO FileOutputCommitter: File Output Committer Algorithm version is 1

    15/09/15 19:20:55 INFO BlockManager: Removing broadcast 3

    15/09/15 19:20:55 INFO BlockManager: Removing block broadcast_3_piece0

    15/09/15 19:20:55 INFO MemoryStore: Block broadcast_3_piece0 of size 2252 dropped from memory (free 277845497)

    15/09/15 19:20:55 INFO BlockManagerInfo: Removed broadcast_3_piece0 on localhost:56529 in memory (size: 2.2 KB, free: 265.3 MB)

    15/09/15 19:20:55 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0

    15/09/15 19:20:55 INFO BlockManager: Removing block broadcast_3

    15/09/15 19:20:55 INFO MemoryStore: Block broadcast_3 of size 3128 dropped from memory (free 277848625)

    15/09/15 19:20:55 INFO ContextCleaner: Cleaned broadcast 3

    15/09/15 19:20:57 INFO FileOutputCommitter: Saved output of task 'attempt_201509151920_0004_m_000000_6' to hdfs://sandbox:8020/user/captain/output/_temporary/0/task_201509151920_0004_m_000000

    15/09/15 19:20:57 INFO SparkHadoopMapRedUtil: attempt_201509151920_0004_m_000000_6: Committed

    15/09/15 19:20:57 INFO Executor: Finished task 0.0 in stage 4.0 (TID 6). 1828 bytes result sent to driver

    15/09/15 19:20:57 INFO TaskSetManager: Starting task 1.0 in stage 4.0 (TID 7, localhost, PROCESS_LOCAL, 1124 bytes)

    15/09/15 19:20:57 INFO Executor: Running task 1.0 in stage 4.0 (TID 7)

    15/09/15 19:20:57 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 6) in 7135 ms on localhost (1/2)

    15/09/15 19:20:57 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks

    15/09/15 19:20:57 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms

    15/09/15 19:20:58 INFO ExternalSorter: Thread 65 spilling in-memory map of 84.8 MB to disk (1 time so far)

    15/09/15 19:20:58 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (2 times so far)

    15/09/15 19:20:59 INFO ExternalSorter: Thread 65 spilling in-memory map of 87.6 MB to disk (3 times so far)

    15/09/15 19:21:01 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (4 times so far)

    15/09/15 19:21:02 INFO FileOutputCommitter: File Output Committer Algorithm version is 1

    15/09/15 19:21:05 INFO FileOutputCommitter: Saved output of task 'attempt_201509151920_0004_m_000001_7' to hdfs://sandbox:8020/user/captain/output/_temporary/0/task_201509151920_0004_m_000001

    15/09/15 19:21:05 INFO SparkHadoopMapRedUtil: attempt_201509151920_0004_m_000001_7: Committed

    15/09/15 19:21:05 INFO Executor: Finished task 1.0 in stage 4.0 (TID 7). 1828 bytes result sent to driver

    15/09/15 19:21:05 INFO TaskSetManager: Finished task 1.0 in stage 4.0 (TID 7) in 8171 ms on localhost (2/2)

    15/09/15 19:21:05 INFO DAGScheduler: Stage 4 (saveAsTextFile at PatentCitation.java:43) finished in 15.306 s

    15/09/15 19:21:05 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool

    15/09/15 19:21:05 INFO DAGScheduler: Job 1 finished: saveAsTextFile at PatentCitation.java:43, took 46.343546 s

     

     

    Results

    [sandbox spark]# vi part-00000

    "CITED" "CITING"

    1 3964859,4647229

    10000 4539112

    100000 5031388

    1000006 4714284

    1000007 4766693

    1000011 5033339

    1000017 3908629

    1000026 4043055

    1000033 4975983,4190903

    1000043 4091523

    1000044 4055371,4082383

    1000045 4290571

    1000046 5918892,5525001

    1000049 5996916

    1000051 4541310

    1000054 4946631

    1000065 4748968

    1000067 5312208,4944640,5071294

    1000070 4928425,5009029

    1000073 5474494,4107819

    1000076 5845593,4867716

    1000083 5322091,5566726

    1000084 4683770,4182197

    1000086 4217220,4686189,4178246,4839046

    1000089 5277853,5571464,5807591,5395228,5503546,5505607,5505610,5505611,5540869,5544405

    1000094 4897975,4920718,5713167

    10001 4228735

    1000102 5120183,5791855

    1000108 4399627

    1000118 5029926,5058767,4919466

    100012 4641675,4838290

    1000120 4546821

    1000122 4048703,4297867

    1000125 4573823

    1000129 4748545

    1000133 5809659,4922621

    1000134 4977842,4175499,4373460

    100014 5179741

    1000145 3961423

    1000149 5876147,5193932

    1000150 4217940,4271874

    1000152 3962981,4032016,4269599

    1000168 4479408

    1000172 4865014,4974551,4653469

    1000173 5303740,5303742,4312704,4346730

    1000178 4830222

    1000179 5182923

    1000181 5185948

    1000185 4082391,3871143,5793021

    1000187 4470573

    1000188 4134380,4023548

    1000193 5092745

    1000195 4922643

    1000199 4166638,4846484,4664399

    1000201 4697655

    1000203 4541584
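
    The keys in the output are Strings, so sortByKey orders them lexicographically, character by character; that is why 10001 appears after 1000094 above. The plain-Java sketch below (the class KeyOrderSketch is hypothetical) compares that ordering with numeric ordering on keys taken from these results. If numeric order were wanted in the Spark job, one option (an assumption, not what the program does) would be to emit the key as a Long in the mapToPair step so that sortByKey compares numbers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Compares String (lexicographic) ordering with numeric ordering of patent keys.
class KeyOrderSketch {

    // What sortByKey does with String keys: character-by-character comparison
    static List<String> lexicographic(List<String> keys) {
        List<String> sorted = new ArrayList<>(keys);
        sorted.sort(Comparator.naturalOrder());
        return sorted;
    }

    // Numeric ordering: parse each key as a long before comparing
    static List<String> numeric(List<String> keys) {
        List<String> sorted = new ArrayList<>(keys);
        sorted.sort(Comparator.comparingLong((String key) -> Long.parseLong(key)));
        return sorted;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("1000006", "10001", "1", "100000", "10000");
        System.out.println("lexicographic: " + lexicographic(keys)); // [1, 10000, 100000, 1000006, 10001]
        System.out.println("numeric:       " + numeric(keys));       // [1, 10000, 10001, 100000, 1000006]
    }
}
```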