Background

If you have not read the earlier post, Hadoop Map Reduce example with Patent Citation, now is the time: it provides the problem context.

In this post, we will use Apache Spark to solve the reverse patent citation problem.

Environment Setup

As in the MapReduce exercise, use the latest sandbox from one of the Hadoop vendors, such as Hortonworks, Cloudera, or MapR. These sandboxes should come with Spark integrated.

Add the input file to HDFS as described in the MapReduce post. Place the file in an HDFS directory called /user/captain/input.

Program in Apache Spark using Java

package spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class PatentCitation {

  public static void main(String[] args) {
    // args[0] = input file, args[1] = output directory (must not exist yet)
    String appName = "Patent Citation";
    SparkConf sparkConf = new SparkConf().setAppName(appName);
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

    // Each input line has the form "citing,cited"
    JavaRDD<String> patentCitationFile = javaSparkContext.textFile(args[0]);

    // Map step: emit the reversed pair (cited, citing)
    JavaPairRDD<String, String> lines = patentCitationFile.mapToPair(
        new PairFunction<String, String, String>() {
          @Override
          public Tuple2<String, String> call(String s) throws Exception {
            String[] pair = s.split(",");
            return new Tuple2<String, String>(pair[1], pair[0]);
          }
        });

    // Reduce step: join all citing patents of the same cited patent with commas,
    // then sort by key (String keys, so the order is lexicographic)
    JavaPairRDD<String, String> cited = lines.reduceByKey(
        new Function2<String, String, String>() {
          @Override
          public String call(String s1, String s2) throws Exception {
            return s1 + "," + s2;
          }
        }).sortByKey();

    // Format each tuple as "key value" instead of Tuple2's "(key,value)"
    JavaRDD<String> formatted = cited.map(
        new Function<Tuple2<String, String>, String>() {
          @Override
          public String call(Tuple2<String, String> theTuple) throws Exception {
            return theTuple._1() + " " + theTuple._2();
          }
        });

    // Store the formatted RDD as text (one part file per partition)
    formatted.saveAsTextFile(args[1]);

    javaSparkContext.stop();
  }
}

Description of the program

In the first step, textFile reads the input file line by line into a JavaRDD<String>. We split each line at the comma, which gives us two strings: the first is the citing patent and the second is the cited patent.

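Since our problem is the reverse one (finding all the citing patents for a particular patent), mapToPair emits a tuple with the two values swapped: the cited patent becomes the key and the citing patent the value. The result is a JavaPairRDD<String, String>.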
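A plain-Java sketch of this parse-and-swap step can help when checking the logic outside a cluster. It is not the Spark API: the class name ReverseStep is hypothetical, java.util.AbstractMap.SimpleEntry stands in for scala.Tuple2, and, unlike the program above, it assumes you want to skip any line that is not exactly two numeric ids (such as the header row). The Spark program instead passes the header through and would throw an ArrayIndexOutOfBoundsException on a line without a comma.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java sketch of the mapToPair step; no Spark classes are used.
// SimpleEntry stands in for scala.Tuple2 (assumption for illustration).
final class ReverseStep {

    // "citing,cited" -> (cited, citing). Returns empty for anything that is not
    // exactly two numeric ids, e.g. the "CITING","CITED" header row.
    static Optional<Map.Entry<String, String>> reverse(String line) {
        String[] fields = line.split(",");
        if (fields.length != 2) {
            return Optional.empty();
        }
        String citing = fields[0].trim();
        String cited = fields[1].trim();
        if (!isNumeric(citing) || !isNumeric(cited)) {
            return Optional.empty();
        }
        Map.Entry<String, String> swapped = new SimpleEntry<>(cited, citing);
        return Optional.of(swapped);
    }

    // True if the string is non-empty and contains only digits.
    private static boolean isNumeric(String s) {
        if (s.isEmpty()) {
            return false;
        }
        for (int i = 0; i < s.length(); i++) {
            if (!Character.isDigit(s.charAt(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Sample records built from ids that appear in the results below.
        List<String> lines = Arrays.asList("\"CITING\",\"CITED\"", "4647229,1", "4975983,1000033");
        for (String line : lines) {
            reverse(line).ifPresent(p -> System.out.println(p.getKey() + " " + p.getValue()));
        }
    }
}
```

In the Spark job, the same check could be applied with a filter on the JavaRDD before mapToPair.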

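In the second step, we reduce the RDD by key. reduceByKey passes our function two values that share the same key, and we simply join them with a comma. Spark keeps applying the function until each key has a single value: the comma-separated list of every patent that cites it. The order in which values are combined is not guaranteed, so the citing patents within a line are not sorted. We then call sortByKey. Because the keys are strings, they are sorted lexicographically rather than numerically, which is why 100012 appears between 1000118 and 1000120 in the results.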
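The following plain-Java sketch imitates reduceByKey with Map.merge and the subsequent sortByKey with a TreeMap, using keys taken from the results. It is not Spark code, and the class name ReduceAndSortStep is hypothetical. It shows why string keys come out in lexicographic order and, as an assumed variant, how a numeric comparator would order them instead.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of reduceByKey followed by sortByKey; not Spark code.
final class ReduceAndSortStep {

    static Map.Entry<String, String> pair(String key, String value) {
        return new SimpleEntry<>(key, value);
    }

    // Same combining function as the Function2 in the Spark program.
    static String concat(String s1, String s2) {
        return s1 + "," + s2;
    }

    // Combines all values that share a key, two at a time, like reduceByKey.
    static Map<String, String> reduceByKey(List<Map.Entry<String, String>> pairs) {
        Map<String, String> reduced = new HashMap<>();
        for (Map.Entry<String, String> p : pairs) {
            reduced.merge(p.getKey(), p.getValue(), ReduceAndSortStep::concat);
        }
        return reduced;
    }

    // sortByKey on String keys: String.compareTo, i.e. lexicographic order.
    static TreeMap<String, String> sortLexicographically(Map<String, String> reduced) {
        return new TreeMap<>(reduced);
    }

    // Assumed variant: numeric order. Throws NumberFormatException on
    // non-numeric keys, so the header row must be filtered out first.
    static TreeMap<String, String> sortNumerically(Map<String, String> reduced) {
        Comparator<String> numeric = Comparator.comparingLong(Long::parseLong);
        TreeMap<String, String> sorted = new TreeMap<>(numeric);
        sorted.putAll(reduced);
        return sorted;
    }

    public static void main(String[] args) {
        Map<String, String> reduced = reduceByKey(Arrays.asList(
                pair("1000033", "4975983"),
                pair("100012", "4641675"),
                pair("1000033", "4190903"),
                pair("10000", "4539112")));
        System.out.println(sortLexicographically(reduced).keySet()); // prints [10000, 1000033, 100012]
        System.out.println(sortNumerically(reduced).keySet());       // prints [10000, 100012, 1000033]
    }
}
```

In Spark itself, numeric ordering would mean keying the pair RDD by Long rather than String before calling sortByKey.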

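Now our RDD is almost ready for storage. The catch is that saveAsTextFile writes each element using its toString() method, and a Tuple2 prints as (key,value): every line would start and end with parentheses, and the key would be separated from the comma-separated list of citing patents by just another comma. So we run one more transformation, map, which turns each tuple into a "key value" string.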
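To make the difference concrete, this small plain-Java sketch prints one record both ways. The class name FormatStep is hypothetical; tupleStyle only imitates the (key,value) text that Tuple2.toString() produces, while format matches the Function used in the program.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Plain-Java illustration of the formatting map step; not Spark code.
final class FormatStep {

    // Imitates scala.Tuple2.toString(), which saveAsTextFile would otherwise write.
    static String tupleStyle(Map.Entry<String, String> t) {
        return "(" + t.getKey() + "," + t.getValue() + ")";
    }

    // Same output as the Function in the Spark program: key, space, value.
    static String format(Map.Entry<String, String> t) {
        return t.getKey() + " " + t.getValue();
    }

    public static void main(String[] args) {
        Map.Entry<String, String> t = new SimpleEntry<>("1000033", "4975983,4190903");
        System.out.println(tupleStyle(t)); // prints (1000033,4975983,4190903)
        System.out.println(format(t));     // prints 1000033 4975983,4190903
    }
}
```

In the tuple form, nothing distinguishes the comma after the key from the commas inside the list, which is the main reason the space-separated form is easier to read back.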

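Finally, saveAsTextFile stores the RDD in the output directory passed as the second argument. Spark writes one part file per partition (this job had 2 output partitions, so part-00000 and part-00001), and it fails if the output directory already exists.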

 

Program Run

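We submit the job with spark-submit, passing the input file and the output directory as the two program arguments:

[sandbox spark]# spark/bin/spark-submit --class spark.PatentCitation --master local --executor-memory 1G --num-executors 10 patentcitation_spark.jar /user/captain/input/cite75_99.txt /user/captain/output

With --master local, Spark runs the driver and a single worker thread in one JVM, which is why the log below shows the tasks running one after another on localhost. In this mode --num-executors (a YARN option) has no effect, and neither does --executor-memory; the heap is set with --driver-memory, which is consistent with the 265.4 MB MemoryStore capacity reported in the log.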

 

15/09/15 19:18:45 INFO SparkContext: Running Spark version 1.3.1

15/09/15 19:18:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

15/09/15 19:18:45 INFO SecurityManager: Changing view acls to: root

15/09/15 19:18:45 INFO SecurityManager: Changing modify acls to: root

15/09/15 19:18:45 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)

15/09/15 19:18:45 INFO Slf4jLogger: Slf4jLogger started

15/09/15 19:18:45 INFO Remoting: Starting remoting

15/09/15 19:18:46 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@sandbox:36543]

15/09/15 19:18:46 INFO Utils: Successfully started service 'sparkDriver' on port 36543.

15/09/15 19:18:46 INFO SparkEnv: Registering MapOutputTracker

15/09/15 19:18:46 INFO SparkEnv: Registering BlockManagerMaster

15/09/15 19:18:46 INFO DiskBlockManager: Created local directory at /tmp/spark-c1f7c70b-349d-4984-b6bb-544080fc4d07/blockmgr-bc783dc8-c929-422c-bbae-1759e45dc59e

15/09/15 19:18:46 INFO MemoryStore: MemoryStore started with capacity 265.4 MB

15/09/15 19:18:46 INFO HttpFileServer: HTTP File server directory is /tmp/spark-e0895997-d0e1-4dfe-9fe6-01df51e9a65c/httpd-6a121c63-3f76-47cd-b083-77d722fd95bf

15/09/15 19:18:46 INFO HttpServer: Starting HTTP Server

15/09/15 19:18:46 INFO Server: jetty-8.y.z-SNAPSHOT

15/09/15 19:18:46 INFO AbstractConnector: Started SocketConnector@0.0.0.0:33490

15/09/15 19:18:46 INFO Utils: Successfully started service 'HTTP file server' on port 33490.

15/09/15 19:18:46 INFO SparkEnv: Registering OutputCommitCoordinator

15/09/15 19:18:46 INFO Server: jetty-8.y.z-SNAPSHOT

15/09/15 19:18:46 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040

15/09/15 19:18:46 INFO Utils: Successfully started service 'SparkUI' on port 4040.

15/09/15 19:18:46 INFO SparkUI: Started SparkUI at http://sandbox:4040

15/09/15 19:18:46 INFO SparkContext: Added JAR file:/root/anil/spark/patentcitation_spark.jar at http://192.168.112.132:33490/jars/patentcitation_spark.jar with timestamp 1442344726418

15/09/15 19:18:46 INFO Executor: Starting executor ID <driver> on host localhost

15/09/15 19:18:46 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@sandbox:36543/user/HeartbeatReceiver

15/09/15 19:18:46 INFO NettyBlockTransferService: Server created on 56529

15/09/15 19:18:46 INFO BlockManagerMaster: Trying to register BlockManager

15/09/15 19:18:46 INFO BlockManagerMasterActor: Registering block manager localhost:56529 with 265.4 MB RAM, BlockManagerId(<driver>, localhost, 56529)

15/09/15 19:18:46 INFO BlockManagerMaster: Registered BlockManager

15/09/15 19:18:46 INFO MemoryStore: ensureFreeSpace(169818) called with curMem=0, maxMem=278302556

15/09/15 19:18:46 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 165.8 KB, free 265.2 MB)

15/09/15 19:18:47 INFO MemoryStore: ensureFreeSpace(34017) called with curMem=169818, maxMem=278302556

15/09/15 19:18:47 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 33.2 KB, free 265.2 MB)

15/09/15 19:18:47 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:56529 (size: 33.2 KB, free: 265.4 MB)

15/09/15 19:18:47 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0

15/09/15 19:18:47 INFO SparkContext: Created broadcast 0 from textFile at PatentCitation.java:18

15/09/15 19:18:47 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.

15/09/15 19:18:47 INFO FileInputFormat: Total input paths to process : 1

15/09/15 19:18:47 INFO SparkContext: Starting job: sortByKey at PatentCitation.java:28

15/09/15 19:18:47 INFO DAGScheduler: Registering RDD 2 (mapToPair at PatentCitation.java:19)

15/09/15 19:18:47 INFO DAGScheduler: Got job 0 (sortByKey at PatentCitation.java:28) with 2 output partitions (allowLocal=false)

15/09/15 19:18:47 INFO DAGScheduler: Final stage: Stage 1(sortByKey at PatentCitation.java:28)

15/09/15 19:18:47 INFO DAGScheduler: Parents of final stage: List(Stage 0)

15/09/15 19:18:47 INFO DAGScheduler: Missing parents: List(Stage 0)

15/09/15 19:18:47 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[2] at mapToPair at PatentCitation.java:19), which has no missing parents

15/09/15 19:18:47 INFO MemoryStore: ensureFreeSpace(3736) called with curMem=203835, maxMem=278302556

15/09/15 19:18:47 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.6 KB, free 265.2 MB)

15/09/15 19:18:47 INFO MemoryStore: ensureFreeSpace(2723) called with curMem=207571, maxMem=278302556

15/09/15 19:18:47 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.7 KB, free 265.2 MB)

15/09/15 19:18:47 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:56529 (size: 2.7 KB, free: 265.4 MB)

15/09/15 19:18:47 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0

15/09/15 19:18:47 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:839

15/09/15 19:18:47 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MapPartitionsRDD[2] at mapToPair at PatentCitation.java:19)

15/09/15 19:18:47 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks

15/09/15 19:18:47 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, ANY, 1389 bytes)

15/09/15 19:18:47 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)

15/09/15 19:18:47 INFO Executor: Fetching http://192.168.112.132:33490/jars/patentcitation_spark.jar with timestamp 1442344726418

15/09/15 19:18:47 INFO Utils: Fetching http://192.168.112.132:33490/jars/patentcitation_spark.jar to /tmp/spark-d5e8a049-e4b6-45d9-b533-6e7c1295ed19/userFiles-2ba84816-9443-4d34-b8f4-360e6a69aeaf/fetchFileTemp4125688246233447293.tmp

15/09/15 19:18:47 INFO Executor: Adding file:/tmp/spark-d5e8a049-e4b6-45d9-b533-6e7c1295ed19/userFiles-2ba84816-9443-4d34-b8f4-360e6a69aeaf/patentcitation_spark.jar to class loader

15/09/15 19:18:47 INFO HadoopRDD: Input split: hdfs://sandbox:8020/user/captain/input/cite75_99.txt:0+134217728

15/09/15 19:18:47 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id

15/09/15 19:18:47 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id

15/09/15 19:18:47 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap

15/09/15 19:18:47 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition

15/09/15 19:18:47 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id

15/09/15 19:18:49 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (1 time so far)

15/09/15 19:18:51 INFO ExternalSorter: Thread 65 spilling in-memory map of 86.6 MB to disk (2 times so far)

15/09/15 19:18:53 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (3 times so far)

15/09/15 19:18:55 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (4 times so far)

15/09/15 19:18:57 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (5 times so far)

15/09/15 19:19:00 INFO ExternalSorter: Thread 65 spilling in-memory map of 85.6 MB to disk (6 times so far)

15/09/15 19:19:02 INFO ExternalSorter: Thread 65 spilling in-memory map of 85.1 MB to disk (7 times so far)

15/09/15 19:19:04 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (8 times so far)

15/09/15 19:19:06 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (9 times so far)

15/09/15 19:19:22 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2004 bytes result sent to driver

15/09/15 19:19:22 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, ANY, 1389 bytes)

15/09/15 19:19:22 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)

15/09/15 19:19:22 INFO HadoopRDD: Input split: hdfs://sandbox:8020/user/captain/input/cite75_99.txt:134217728+129857703

15/09/15 19:19:22 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 34712 ms on localhost (1/2)

15/09/15 19:19:23 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (1 time so far)

15/09/15 19:19:25 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (2 times so far)

15/09/15 19:19:28 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (3 times so far)

15/09/15 19:19:31 INFO ExternalSorter: Thread 65 spilling in-memory map of 85.1 MB to disk (4 times so far)

15/09/15 19:19:33 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (5 times so far)

15/09/15 19:19:35 INFO ExternalSorter: Thread 65 spilling in-memory map of 85.1 MB to disk (6 times so far)

15/09/15 19:19:37 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (7 times so far)

15/09/15 19:19:40 INFO ExternalSorter: Thread 65 spilling in-memory map of 87.4 MB to disk (8 times so far)

15/09/15 19:19:42 INFO ExternalSorter: Thread 65 spilling in-memory map of 84.2 MB to disk (9 times so far)

15/09/15 19:19:53 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2004 bytes result sent to driver

15/09/15 19:19:53 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 30861 ms on localhost (2/2)

15/09/15 19:19:53 INFO DAGScheduler: Stage 0 (mapToPair at PatentCitation.java:19) finished in 65.576 s

15/09/15 19:19:53 INFO DAGScheduler: looking for newly runnable stages

15/09/15 19:19:53 INFO DAGScheduler: running: Set()

15/09/15 19:19:53 INFO DAGScheduler: waiting: Set(Stage 1)

15/09/15 19:19:53 INFO DAGScheduler: failed: Set()

15/09/15 19:19:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool

15/09/15 19:19:53 INFO DAGScheduler: Missing parents for Stage 1: List()

15/09/15 19:19:53 INFO DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[5] at sortByKey at PatentCitation.java:28), which is now runnable

15/09/15 19:19:53 INFO MemoryStore: ensureFreeSpace(3312) called with curMem=210294, maxMem=278302556

15/09/15 19:19:53 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.2 KB, free 265.2 MB)

15/09/15 19:19:53 INFO MemoryStore: ensureFreeSpace(2378) called with curMem=213606, maxMem=278302556

15/09/15 19:19:53 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.3 KB, free 265.2 MB)

15/09/15 19:19:53 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:56529 (size: 2.3 KB, free: 265.4 MB)

15/09/15 19:19:53 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0

15/09/15 19:19:53 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:839

15/09/15 19:19:53 INFO DAGScheduler: Submitting 2 missing tasks from Stage 1 (MapPartitionsRDD[5] at sortByKey at PatentCitation.java:28)

15/09/15 19:19:53 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks

15/09/15 19:19:53 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, PROCESS_LOCAL, 1124 bytes)

15/09/15 19:19:53 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)

15/09/15 19:19:53 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks

15/09/15 19:19:53 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 3 ms

15/09/15 19:19:54 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (1 time so far)

15/09/15 19:19:56 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.7 MB to disk (2 times so far)

15/09/15 19:19:59 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (3 times so far)

15/09/15 19:20:01 INFO BlockManager: Removing broadcast 1

15/09/15 19:20:01 INFO BlockManager: Removing block broadcast_1_piece0

15/09/15 19:20:01 INFO MemoryStore: Block broadcast_1_piece0 of size 2723 dropped from memory (free 278089295)

15/09/15 19:20:01 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:56529 in memory (size: 2.7 KB, free: 265.4 MB)

15/09/15 19:20:01 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0

15/09/15 19:20:01 INFO BlockManager: Removing block broadcast_1

15/09/15 19:20:01 INFO MemoryStore: Block broadcast_1 of size 3736 dropped from memory (free 278093031)

15/09/15 19:20:01 INFO ContextCleaner: Cleaned broadcast 1

15/09/15 19:20:01 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (4 times so far)

15/09/15 19:20:06 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 1682 bytes result sent to driver

15/09/15 19:20:06 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, PROCESS_LOCAL, 1124 bytes)

15/09/15 19:20:06 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)

15/09/15 19:20:06 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks

15/09/15 19:20:06 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms

15/09/15 19:20:06 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 13527 ms on localhost (1/2)

15/09/15 19:20:07 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (1 time so far)

15/09/15 19:20:10 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (2 times so far)

15/09/15 19:20:12 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (3 times so far)

15/09/15 19:20:19 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 1685 bytes result sent to driver

15/09/15 19:20:19 INFO DAGScheduler: Stage 1 (sortByKey at PatentCitation.java:28) finished in 25.877 s

15/09/15 19:20:19 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 12352 ms on localhost (2/2)

15/09/15 19:20:19 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool

15/09/15 19:20:19 INFO DAGScheduler: Job 0 finished: sortByKey at PatentCitation.java:28, took 91.526647 s

15/09/15 19:20:19 INFO FileOutputCommitter: File Output Committer Algorithm version is 1

15/09/15 19:20:19 INFO SparkContext: Starting job: saveAsTextFile at PatentCitation.java:43

15/09/15 19:20:19 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 157 bytes

15/09/15 19:20:19 INFO DAGScheduler: Registering RDD 3 (reduceByKey at PatentCitation.java:28)

15/09/15 19:20:19 INFO DAGScheduler: Got job 1 (saveAsTextFile at PatentCitation.java:43) with 2 output partitions (allowLocal=false)

15/09/15 19:20:19 INFO DAGScheduler: Final stage: Stage 4(saveAsTextFile at PatentCitation.java:43)

15/09/15 19:20:19 INFO DAGScheduler: Parents of final stage: List(Stage 3)

15/09/15 19:20:19 INFO DAGScheduler: Missing parents: List(Stage 3)

15/09/15 19:20:19 INFO DAGScheduler: Submitting Stage 3 (ShuffledRDD[3] at reduceByKey at PatentCitation.java:28), which has no missing parents

15/09/15 19:20:19 INFO MemoryStore: ensureFreeSpace(3128) called with curMem=209525, maxMem=278302556

15/09/15 19:20:19 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.1 KB, free 265.2 MB)

15/09/15 19:20:19 INFO MemoryStore: ensureFreeSpace(2252) called with curMem=212653, maxMem=278302556

15/09/15 19:20:19 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.2 KB, free 265.2 MB)

15/09/15 19:20:19 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:56529 (size: 2.2 KB, free: 265.4 MB)

15/09/15 19:20:19 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0

15/09/15 19:20:19 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:839

15/09/15 19:20:19 INFO DAGScheduler: Submitting 2 missing tasks from Stage 3 (ShuffledRDD[3] at reduceByKey at PatentCitation.java:28)

15/09/15 19:20:19 INFO TaskSchedulerImpl: Adding task set 3.0 with 2 tasks

15/09/15 19:20:19 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 4, localhost, PROCESS_LOCAL, 1113 bytes)

15/09/15 19:20:19 INFO Executor: Running task 0.0 in stage 3.0 (TID 4)

15/09/15 19:20:19 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks

15/09/15 19:20:19 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms

15/09/15 19:20:21 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (1 time so far)

15/09/15 19:20:23 INFO BlockManager: Removing broadcast 2

15/09/15 19:20:23 INFO BlockManager: Removing block broadcast_2

15/09/15 19:20:23 INFO MemoryStore: Block broadcast_2 of size 3312 dropped from memory (free 278090963)

15/09/15 19:20:23 INFO BlockManager: Removing block broadcast_2_piece0

15/09/15 19:20:23 INFO MemoryStore: Block broadcast_2_piece0 of size 2378 dropped from memory (free 278093341)

15/09/15 19:20:23 INFO BlockManagerInfo: Removed broadcast_2_piece0 on localhost:56529 in memory (size: 2.3 KB, free: 265.4 MB)

15/09/15 19:20:23 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0

15/09/15 19:20:23 INFO ContextCleaner: Cleaned broadcast 2

15/09/15 19:20:23 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.7 MB to disk (2 times so far)

15/09/15 19:20:25 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (3 times so far)

15/09/15 19:20:27 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (4 times so far)

15/09/15 19:20:35 INFO Executor: Finished task 0.0 in stage 3.0 (TID 4). 1098 bytes result sent to driver

15/09/15 19:20:35 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID 5, localhost, PROCESS_LOCAL, 1113 bytes)

15/09/15 19:20:35 INFO Executor: Running task 1.0 in stage 3.0 (TID 5)

15/09/15 19:20:35 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 4) in 16394 ms on localhost (1/2)

15/09/15 19:20:35 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks

15/09/15 19:20:35 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms

15/09/15 19:20:37 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (1 time so far)

15/09/15 19:20:39 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (2 times so far)

15/09/15 19:20:41 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (3 times so far)

15/09/15 19:20:50 INFO Executor: Finished task 1.0 in stage 3.0 (TID 5). 1098 bytes result sent to driver

15/09/15 19:20:50 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID 5) in 14507 ms on localhost (2/2)

15/09/15 19:20:50 INFO DAGScheduler: Stage 3 (reduceByKey at PatentCitation.java:28) finished in 30.901 s

15/09/15 19:20:50 INFO DAGScheduler: looking for newly runnable stages

15/09/15 19:20:50 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool

15/09/15 19:20:50 INFO DAGScheduler: running: Set()

15/09/15 19:20:50 INFO DAGScheduler: waiting: Set(Stage 4)

15/09/15 19:20:50 INFO DAGScheduler: failed: Set()

15/09/15 19:20:50 INFO DAGScheduler: Missing parents for Stage 4: List()

15/09/15 19:20:50 INFO DAGScheduler: Submitting Stage 4 (MapPartitionsRDD[8] at saveAsTextFile at PatentCitation.java:43), which is now runnable

15/09/15 19:20:50 INFO MemoryStore: ensureFreeSpace(152496) called with curMem=209215, maxMem=278302556

15/09/15 19:20:50 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 148.9 KB, free 265.1 MB)

15/09/15 19:20:50 INFO MemoryStore: ensureFreeSpace(97600) called with curMem=361711, maxMem=278302556

15/09/15 19:20:50 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 95.3 KB, free 265.0 MB)

15/09/15 19:20:50 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:56529 (size: 95.3 KB, free: 265.3 MB)

15/09/15 19:20:50 INFO BlockManagerMaster: Updated info of block broadcast_4_piece0

15/09/15 19:20:50 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:839

15/09/15 19:20:50 INFO DAGScheduler: Submitting 2 missing tasks from Stage 4 (MapPartitionsRDD[8] at saveAsTextFile at PatentCitation.java:43)

15/09/15 19:20:50 INFO TaskSchedulerImpl: Adding task set 4.0 with 2 tasks

15/09/15 19:20:50 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 6, localhost, PROCESS_LOCAL, 1124 bytes)

15/09/15 19:20:50 INFO Executor: Running task 0.0 in stage 4.0 (TID 6)

15/09/15 19:20:50 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks

15/09/15 19:20:50 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms

15/09/15 19:20:51 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (1 time so far)

15/09/15 19:20:52 INFO ExternalSorter: Thread 65 spilling in-memory map of 85.4 MB to disk (2 times so far)

15/09/15 19:20:53 INFO ExternalSorter: Thread 65 spilling in-memory map of 84.2 MB to disk (3 times so far)

15/09/15 19:20:54 INFO FileOutputCommitter: File Output Committer Algorithm version is 1

15/09/15 19:20:55 INFO BlockManager: Removing broadcast 3

15/09/15 19:20:55 INFO BlockManager: Removing block broadcast_3_piece0

15/09/15 19:20:55 INFO MemoryStore: Block broadcast_3_piece0 of size 2252 dropped from memory (free 277845497)

15/09/15 19:20:55 INFO BlockManagerInfo: Removed broadcast_3_piece0 on localhost:56529 in memory (size: 2.2 KB, free: 265.3 MB)

15/09/15 19:20:55 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0

15/09/15 19:20:55 INFO BlockManager: Removing block broadcast_3

15/09/15 19:20:55 INFO MemoryStore: Block broadcast_3 of size 3128 dropped from memory (free 277848625)

15/09/15 19:20:55 INFO ContextCleaner: Cleaned broadcast 3

15/09/15 19:20:57 INFO FileOutputCommitter: Saved output of task 'attempt_201509151920_0004_m_000000_6' to hdfs://sandbox:8020/user/captain/output/_temporary/0/task_201509151920_0004_m_000000

15/09/15 19:20:57 INFO SparkHadoopMapRedUtil: attempt_201509151920_0004_m_000000_6: Committed

15/09/15 19:20:57 INFO Executor: Finished task 0.0 in stage 4.0 (TID 6). 1828 bytes result sent to driver

15/09/15 19:20:57 INFO TaskSetManager: Starting task 1.0 in stage 4.0 (TID 7, localhost, PROCESS_LOCAL, 1124 bytes)

15/09/15 19:20:57 INFO Executor: Running task 1.0 in stage 4.0 (TID 7)

15/09/15 19:20:57 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 6) in 7135 ms on localhost (1/2)

15/09/15 19:20:57 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks

15/09/15 19:20:57 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms

15/09/15 19:20:58 INFO ExternalSorter: Thread 65 spilling in-memory map of 84.8 MB to disk (1 time so far)

15/09/15 19:20:58 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (2 times so far)

15/09/15 19:20:59 INFO ExternalSorter: Thread 65 spilling in-memory map of 87.6 MB to disk (3 times so far)

15/09/15 19:21:01 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (4 times so far)

15/09/15 19:21:02 INFO FileOutputCommitter: File Output Committer Algorithm version is 1

15/09/15 19:21:05 INFO FileOutputCommitter: Saved output of task 'attempt_201509151920_0004_m_000001_7' to hdfs://sandbox:8020/user/captain/output/_temporary/0/task_201509151920_0004_m_000001

15/09/15 19:21:05 INFO SparkHadoopMapRedUtil: attempt_201509151920_0004_m_000001_7: Committed

15/09/15 19:21:05 INFO Executor: Finished task 1.0 in stage 4.0 (TID 7). 1828 bytes result sent to driver

15/09/15 19:21:05 INFO TaskSetManager: Finished task 1.0 in stage 4.0 (TID 7) in 8171 ms on localhost (2/2)

15/09/15 19:21:05 INFO DAGScheduler: Stage 4 (saveAsTextFile at PatentCitation.java:43) finished in 15.306 s

15/09/15 19:21:05 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool

15/09/15 19:21:05 INFO DAGScheduler: Job 1 finished: saveAsTextFile at PatentCitation.java:43, took 46.343546 s

 

 

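Results

Each output line has the form "cited citing1,citing2,...". The first line comes from the "CITING","CITED" header row of the input file, which went through the same swap as every other record.

[sandbox spark]# vi part-00000

"CITED" "CITING"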

1 3964859,4647229

10000 4539112

100000 5031388

1000006 4714284

1000007 4766693

1000011 5033339

1000017 3908629

1000026 4043055

1000033 4975983,4190903

1000043 4091523

1000044 4055371,4082383

1000045 4290571

1000046 5918892,5525001

1000049 5996916

1000051 4541310

1000054 4946631

1000065 4748968

1000067 5312208,4944640,5071294

1000070 4928425,5009029

1000073 5474494,4107819

1000076 5845593,4867716

1000083 5322091,5566726

1000084 4683770,4182197

1000086 4217220,4686189,4178246,4839046

1000089 5277853,5571464,5807591,5395228,5503546,5505607,5505610,5505611,5540869,5544405

1000094 4897975,4920718,5713167

10001 4228735

1000102 5120183,5791855

1000108 4399627

1000118 5029926,5058767,4919466

100012 4641675,4838290

1000120 4546821

1000122 4048703,4297867

1000125 4573823

1000129 4748545

1000133 5809659,4922621

1000134 4977842,4175499,4373460

100014 5179741

1000145 3961423

1000149 5876147,5193932

1000150 4217940,4271874

1000152 3962981,4032016,4269599

1000168 4479408

1000172 4865014,4974551,4653469

1000173 5303740,5303742,4312704,4346730

1000178 4830222

1000179 5182923

1000181 5185948

1000185 4082391,3871143,5793021

1000187 4470573

1000188 4134380,4023548

1000193 5092745

1000195 4922643

1000199 4166638,4846484,4664399

1000201 4697655

1000203 4541584
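Downstream code that reads these part files has to undo the formatting. Here is a minimal plain-Java sketch, assuming each line holds the cited patent, one space, and a comma-separated list of citing patents. The class OutputLine is hypothetical, and the caller would need to skip the "CITED" "CITING" header line.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: splits one output line, "<cited> <citing1>,<citing2>,...",
// back into the cited patent and the list of patents that cite it.
final class OutputLine {
    final String cited;
    final List<String> citing;

    private OutputLine(String cited, List<String> citing) {
        this.cited = cited;
        this.citing = citing;
    }

    static OutputLine parse(String line) {
        // The formatting step wrote a single space between key and value list.
        int space = line.indexOf(' ');
        if (space < 0) {
            throw new IllegalArgumentException("no space separator: " + line);
        }
        String cited = line.substring(0, space);
        List<String> citing = Arrays.asList(line.substring(space + 1).split(","));
        return new OutputLine(cited, citing);
    }

    public static void main(String[] args) {
        OutputLine l = parse("1000086 4217220,4686189,4178246,4839046");
        System.out.println(l.cited + " is cited by " + l.citing.size() + " patents"); // prints 1000086 is cited by 4 patents
    }
}
```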