
Monthly Archives: September 2015

  • Strata Hadoop NYC – Day 1 Highlights

    Strata Hadoop NYC – Day 1 Highlights

    Big Data ecosystem undergoing a slow and steady pricing collapse

    Big Data is not a product; it’s an ecosystem, and “the ecosystem is really just one big mess,” announced Dave Vellante, cohost of theCUBE, from the SiliconANGLE Media team. With Big Data infrastructure software undergoing a “slow and steady collapse,” Vellante said that there is an “interesting rift going on, lots of jockeying for position.”

    “The technology under the hood is changing,” added John Furrier, who joined Vellante and Wikibon’s George Gilbert, cohosts of theCUBE, at BigDataNYC 2015 during Strata + Hadoop World. There are big questions about what is going on in Big Data, and conference presenters Cloudera, Inc. and O’Reilly Media, Inc. are at the center.
    Read more at http://siliconangle.com/blog/2015/09/29/big-data-ecosystem-undergoing-a-slow-and-steady-collapse-bigdatanyc/

    Machine Learning with Scikit-Learn – PyData

    IBM: Difference between Hadoop and Spark

    https://www.ibm.com/developerworks/community/blogs/f0f3cd83-63c2-4744-9021-9ff31e7004a9/entry/What_s_the_Difference_Between_Apache_Hadoop_and_Apache_Spark?lang=en

    Microsoft makes Hadoop on Linux GA – Azure

    https://azure.microsoft.com/en-us/blog/azure-hdinsight-makes-linux-generally-available-at-strata/?Ocid=C+E%20Social%20FY16_Social_TW_Azure_20150929_243236269

    What good is an in-Hadoop JSON database?

    http://www.cmswire.com/big-data/what-good-is-an-in-hadoop-json-database-stratahadoop/?utm_source=cmswire.com&utm_medium=rss&utm_campaign=cm&utm_content=main-rss

    Hortonworks Whitepaper: start saving with Hadoop

    http://hortonworks.com/info/hadoop-enterprise-data-warehouse/?utm_source=twitter&utm_medium=social-ad&utm_campaign=mda

    Hortonworks HDP with ECS – Configuration and Best Practices Guide

    https://community.emc.com/docs/DOC-48986

    HP: Data Protection

    https://pbs.twimg.com/media/CQGexEVWwAA4tpd.png

    Press Releases

    Aerospike Advances Real-Time Decisioning for Spark and Hadoop Users

    http://www.aerospike.com/press-releases/aerospike-advances-real-time-decisioning-for-spark-and-hadoop-users/

    Booth Pictures

    Confluent Inc

    Speaker Slides

    http://strataconf.com/big-data-conference-ny-2015/public/schedule/proceedings

  • Apache Big Data Budapest – Day 2

    Apache Big Data Budapest – Day 2


    Important highlights of day 2 of the Apache Big Data Europe 2015 in Budapest.

    Hadoop’s next mission is to be more business friendly

    ​Hortonworks co-founder and architect Arun Murthy talked about the next decade for Hadoop and how it needs to become more business process oriented.
    http://www.zdnet.com/article/hortonworks-murthy-hadoops-next-mission-is-to-be-more-business-friendly/

    Big Intelligence: BI Meets Big Data, with Apache Drill

    Welcome to a whole new world of data exploration—a world where SQL specialists are now first class citizens and no longer have to wait for weeks/months before they can access new datasets; a world where IT does not have to be a bottleneck in preparing and maintaining schemas for the BI user; a world where data scientists are free to follow the information trail wherever it may lead them.
    http://www.smartdatacollective.com/kingmesal/346069/big-intelligence-bi-meets-big-data-apache-drill

    Being Ready for Apache Kafka

    Architecture of Flink’s Streaming Runtime

    Apache Phoenix

    Apache Kylin

    Fraud Detection in Real-time

    Who is using Apache Kafka?


    Information on Apache Hawq

    Apache Hawq Resources

    Features of Apache Falcon by Hortonworks

    Apache Falcon features

    Apache Geode

    Apache Geode is a distributed in-memory database with strong consistency…
    What is Apache Geode?

    Day 1 Highlights

    http://blog.hampisoftware.com/?p=62

  • Apache Big Data Budapest : Day 1

    Apache Big Data Europe 2015 – Budapest : Day 1


    Some of the interesting things from day 1 of Apache Big Data Europe in Budapest.

    ODPi, a collaborative project of the Linux Foundation

    ODPi speeds delivery of enterprise #BigData apps, hosted by Linux Foundation.
    http://www.linuxfoundation.org/news-media/announcements/2015/09/odpi-doubles-membership-announces-technical-milestones-and-open

    Apache Zeppelin

    A web-based notebook that enables interactive data analytics.
    You can make beautiful data-driven, interactive and collaborative documents with SQL, Scala and more.
    https://zeppelin.incubator.apache.org/


    Apache Stream Comparison by Matt Caldwell

    [Image: Apache stream comparison chart]

    Geospatial Querying in Apache Marmotta

    What is Apache Marmotta? An Open Platform for Linked Data: an open implementation of a Linked Data Platform that can be easily used, extended and deployed by organizations that want to publish Linked Data or build custom applications on Linked Data. Key features:

    • Read-Write Linked Data server
    • RDF triple store with transactions, versioning and rule-based reasoning
    • LDP, SPARQL and LDPath query support
    • Transparent Linked Data Caching
    • Integrated basic security mechanisms

    Visit http://marmotta.apache.org/ for further details and documentation.

    Large-Scale Stream Processing in the Hadoop Ecosystem

    Apache HBase

  • Advanced Example: Spark Action with Oozie

    Advanced Example: Spark Action with Oozie

    In this post, we will look at running a Spark job using Apache Oozie.

    For background, you should read up on my earlier post on Patent Citation.

    SparkAction on Oozie

    As of Oozie 4.2, there is an action for running Spark jobs.

    The workflow.xml is going to look as follows.

    <workflow-app xmlns='uri:oozie:workflow:0.2' name='oozie-java-spark-wf'>
        <start to='java-spark' />

        <action name='java-spark'>
            <spark xmlns="uri:oozie:spark-action:0.1">
                <job-tracker>${jobTracker}</job-tracker>
                <name-node>${nameNode}</name-node>
                <prepare>
                    <delete path="${jobOutput}"/>
                </prepare>
                <configuration>
                    <property>
                        <name>mapred.compress.map.output</name>
                        <value>true</value>
                    </property>
                </configuration>
                <master>local</master>
                <name>Spark Patent Citation</name>
                <class>spark.PatentCitation</class>
                <jar>${nameNode}/user/root/oozie/spark-patent/lib/patentcitation_spark.jar</jar>
                <spark-opts>--executor-memory 1G --num-executors 10</spark-opts>
                <arg>${nameNode}/user/captain/input/cite75_99.txt</arg>
                <arg>${nameNode}/user/captain/output</arg>
            </spark>
            <ok to="end"/>
            <error to="fail"/>
        </action>

        <kill name="fail">
            <message>Spark Java PatentCitation failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
        </kill>
        <end name="end"/>
    </workflow-app>
    
    

    Oozie Workflow Properties

    The job.properties is as follows:

    
    nameNode=hdfs://sandbox:8020
    jobTracker=sandbox:8050
    master=local[*]
    appRoot=spark-patent
    jobOutput=/user/captain/output
    oozie.wf.application.path=/user/root/oozie/spark-patent
    oozie.use.system.libpath=true
    

    Ensure that you save the workflow.xml in HDFS under the location

    /user/root/oozie/spark-patent

    The patentcitation_spark.jar goes into the lib directory under that HDFS path.

    How do you start the Oozie job?
    oozie job -oozie http://localhost:11000/oozie -config job.properties -run

    Note: job.properties should be in the local directory and not on HDFS.
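
    If you would rather drive this from Java than from the CLI, a minimal sketch using Oozie's Java client API could look like the following. The class name is hypothetical, and the URL and property values simply mirror the CLI command and job.properties shown above.

    import java.util.Properties;

    import org.apache.oozie.client.OozieClient;
    import org.apache.oozie.client.WorkflowJob;

    // Hypothetical sketch: submit the spark-patent workflow through the Oozie Java client API.
    public class SubmitSparkPatentWorkflow {
        public static void main(String[] args) throws Exception {
            OozieClient oozieClient = new OozieClient("http://localhost:11000/oozie");

            // These properties mirror job.properties above.
            Properties conf = oozieClient.createConfiguration();
            conf.setProperty("nameNode", "hdfs://sandbox:8020");
            conf.setProperty("jobTracker", "sandbox:8050");
            conf.setProperty("jobOutput", "/user/captain/output");
            conf.setProperty(OozieClient.APP_PATH, "hdfs://sandbox:8020/user/root/oozie/spark-patent");
            conf.setProperty("oozie.use.system.libpath", "true");

            // Run the workflow and poll until it is no longer running.
            String jobId = oozieClient.run(conf);
            while (oozieClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) {
                Thread.sleep(10 * 1000);
            }
            System.out.println("Workflow " + jobId + " finished with status "
                    + oozieClient.getJobInfo(jobId).getStatus());
        }
    }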

    The cite75_99.txt input file goes into the HDFS directory /user/captain/input.

    Once the Oozie job has finished, you will find the output in
    /user/captain/output/part-00000

    If you have any questions, do not hesitate to ask.

  • Advanced Example: Hadoop Map Reduce and Apache Spark

    Advanced Example: Hadoop Map Reduce and Apache Spark

    Objective

    Solve an advanced problem using Hadoop Map Reduce and then solve the same problem using Apache Spark.

     

    Hadoop Map Reduce Program

    http://blog.hampisoftware.com/?p=20

     

    Apache Spark based solution

    http://blog.hampisoftware.com/?p=41

     

    Differences between the two solutions

    Boilerplate code

    In the map reduce code, you have to write a mapper class and a reducer class.

    In Spark, you do not have to write a lot of boilerplate code. If you use Java 8, you can use lambda expressions to cut the boilerplate further, as the sketch below shows.

    In Spark, there is no need for a mapper class or a reducer class. You can perform transformations such as map, flatMap, mapToPair and reduceByKey directly on an RDD.
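
    For a feel of the difference, here is a minimal, hypothetical sketch of the same reverse-citation pipeline written with Java 8 lambdas; the class name is made up, and the logic mirrors the Spark post linked above.

    package spark;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    // Hypothetical lambda-based variant of the PatentCitation job from the Spark post.
    public class PatentCitationLambda {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("Patent Citation (lambdas)");
            JavaSparkContext sc = new JavaSparkContext(conf);

            // Read "citing,cited" lines and emit (cited, citing) pairs.
            JavaPairRDD<String, String> reversed = sc.textFile(args[0])
                    .mapToPair(line -> {
                        String[] pair = line.split(",");
                        return new Tuple2<>(pair[1], pair[0]);
                    });

            // Collapse all citing patents for each cited patent into one comma-separated string,
            // sort by the cited patent and write out "cited citing1,citing2,..." lines.
            reversed.reduceByKey((a, b) -> a + "," + b)
                    .sortByKey()
                    .map(t -> t._1() + " " + t._2())
                    .saveAsTextFile(args[1]);

            sc.stop();
        }
    }

    The whole pipeline reads as a single chain of transformations, which is the point of the comparison with the mapper and reducer classes above.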

     

    Performance

    The Spark based solution is very fast compared to the map reduce solution: around 45 seconds for the 16.5 million input records in a virtual machine sandbox.

     

    Scala

    If you write Scala code, the Apache Spark solution can be simplified further into a few lines of code. In this post, we have solved the patent citation problem using Java.

  • Advanced Spark Java Example: Patent Citation

    Background

    If you have not reviewed the earlier post on the Hadoop Map Reduce example with Patent Citation, now is the time. That post provides the problem context.

    In this post, we will use Apache Spark to solve the reverse patent citation problem.

    Environment Setup

    Just like the map reduce exercise, use the latest sandbox from one of the Hadoop vendors such as Hortonworks, Cloudera or MapR. They ship with Spark integrated.

    Add the input file into HDFS as described in the map reduce post. The file should be placed in an HDFS directory called /user/captain/input.

    Program in Apache Spark using Java

    package spark;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    public class PatentCitation {

        public static void main(String[] args) {
            String appName = "Patent Citation";
            SparkConf sparkConf = new SparkConf().setAppName(appName);
            JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

            JavaRDD<String> patentCitationFile = javaSparkContext.textFile(args[0]);

            //Create a mapper to do reverse key value pair
            JavaPairRDD<String, String> lines = patentCitationFile.mapToPair(new PairFunction<String, String, String>() {
                @Override
                public Tuple2<String, String> call(String s) throws Exception {
                    String[] pair = s.split(",");
                    return new Tuple2<String, String>(pair[1], pair[0]);
                }
            });

            //Let us reduce this
            JavaPairRDD<String, String> cited = lines.reduceByKey(new Function2<String, String, String>() {
                @Override
                public String call(String s1, String s2) throws Exception {
                    return s1 + "," + s2;
                }
            }).sortByKey();

            //Format for storing
            JavaRDD<String> formatted = cited.map(new Function<Tuple2<String, String>, String>() {
                @Override
                public String call(Tuple2<String, String> theTuple) throws Exception {
                    return theTuple._1() + " " + theTuple._2();
                }
            });

            //Store the formatted RDD in the text file
            formatted.saveAsTextFile(args[1]);
        }
    }
    
    

    Description of the program

    In the first step, we read the input file into an RDD, one line per record, and use mapToPair to split each line at the comma. This gives us two string values: the first represents the Citing Patent and the second represents the Cited Patent.

    Since our problem is the reverse one (finding all the citing patents for a particular patent), we create a tuple with the key and value swapped.

    In the second step, we reduce the RDD by key. When multiple values arrive for the same key, we simply join them with a comma.

    Now our RDD is ready for storage. The challenge is that storing the tuple directly would yield parentheses at the beginning and end of each line, so we run one more map transformation that formats each tuple as plain "key value" text.

    Finally we store the RDD in a file.

     

    Program Run

    [sandbox spark]# spark/bin/spark-submit --class spark.PatentCitation --master local --executor-memory 1G --num-executors 10 patentcitation_spark.jar /user/captain/input/cite75_99.txt /user/captain/output

     

    15/09/15 19:18:45 INFO SparkContext: Running Spark version 1.3.1

    15/09/15 19:18:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable

    15/09/15 19:18:45 INFO SecurityManager: Changing view acls to: root

    15/09/15 19:18:45 INFO SecurityManager: Changing modify acls to: root

    15/09/15 19:18:45 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)

    15/09/15 19:18:45 INFO Slf4jLogger: Slf4jLogger started

    15/09/15 19:18:45 INFO Remoting: Starting remoting

    15/09/15 19:18:46 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@sandbox:36543]

    15/09/15 19:18:46 INFO Utils: Successfully started service ‘sparkDriver’ on port 36543.

    15/09/15 19:18:46 INFO SparkEnv: Registering MapOutputTracker

    15/09/15 19:18:46 INFO SparkEnv: Registering BlockManagerMaster

    15/09/15 19:18:46 INFO DiskBlockManager: Created local directory at /tmp/spark-c1f7c70b-349d-4984-b6bb-544080fc4d07/blockmgr-bc783dc8-c929-422c-bbae-1759e45dc59e

    15/09/15 19:18:46 INFO MemoryStore: MemoryStore started with capacity 265.4 MB

    15/09/15 19:18:46 INFO HttpFileServer: HTTP File server directory is /tmp/spark-e0895997-d0e1-4dfe-9fe6-01df51e9a65c/httpd-6a121c63-3f76-47cd-b083-77d722fd95bf

    15/09/15 19:18:46 INFO HttpServer: Starting HTTP Server

    15/09/15 19:18:46 INFO Server: jetty-8.y.z-SNAPSHOT

    15/09/15 19:18:46 INFO AbstractConnector: Started SocketConnector@0.0.0.0:33490

    15/09/15 19:18:46 INFO Utils: Successfully started service ‘HTTP file server’ on port 33490.

    15/09/15 19:18:46 INFO SparkEnv: Registering OutputCommitCoordinator

    15/09/15 19:18:46 INFO Server: jetty-8.y.z-SNAPSHOT

    15/09/15 19:18:46 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040

    15/09/15 19:18:46 INFO Utils: Successfully started service ‘SparkUI’ on port 4040.

    15/09/15 19:18:46 INFO SparkUI: Started SparkUI at http://sandbox:4040

    15/09/15 19:18:46 INFO SparkContext: Added JAR file:/root/anil/spark/patentcitation_spark.jar at http://192.168.112.132:33490/jars/patentcitation_spark.jar with timestamp 1442344726418

    15/09/15 19:18:46 INFO Executor: Starting executor ID <driver> on host localhost

    15/09/15 19:18:46 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@sandbox:36543/user/HeartbeatReceiver

    15/09/15 19:18:46 INFO NettyBlockTransferService: Server created on 56529

    15/09/15 19:18:46 INFO BlockManagerMaster: Trying to register BlockManager

    15/09/15 19:18:46 INFO BlockManagerMasterActor: Registering block manager localhost:56529 with 265.4 MB RAM, BlockManagerId(<driver>, localhost, 56529)

    15/09/15 19:18:46 INFO BlockManagerMaster: Registered BlockManager

    15/09/15 19:18:46 INFO MemoryStore: ensureFreeSpace(169818) called with curMem=0, maxMem=278302556

    15/09/15 19:18:46 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 165.8 KB, free 265.2 MB)

    15/09/15 19:18:47 INFO MemoryStore: ensureFreeSpace(34017) called with curMem=169818, maxMem=278302556

    15/09/15 19:18:47 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 33.2 KB, free 265.2 MB)

    15/09/15 19:18:47 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:56529 (size: 33.2 KB, free: 265.4 MB)

    15/09/15 19:18:47 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0

    15/09/15 19:18:47 INFO SparkContext: Created broadcast 0 from textFile at PatentCitation.java:18

    15/09/15 19:18:47 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.

    15/09/15 19:18:47 INFO FileInputFormat: Total input paths to process : 1

    15/09/15 19:18:47 INFO SparkContext: Starting job: sortByKey at PatentCitation.java:28

    15/09/15 19:18:47 INFO DAGScheduler: Registering RDD 2 (mapToPair at PatentCitation.java:19)

    15/09/15 19:18:47 INFO DAGScheduler: Got job 0 (sortByKey at PatentCitation.java:28) with 2 output partitions (allowLocal=false)

    15/09/15 19:18:47 INFO DAGScheduler: Final stage: Stage 1(sortByKey at PatentCitation.java:28)

    15/09/15 19:18:47 INFO DAGScheduler: Parents of final stage: List(Stage 0)

    15/09/15 19:18:47 INFO DAGScheduler: Missing parents: List(Stage 0)

    15/09/15 19:18:47 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[2] at mapToPair at PatentCitation.java:19), which has no missing parents

    15/09/15 19:18:47 INFO MemoryStore: ensureFreeSpace(3736) called with curMem=203835, maxMem=278302556

    15/09/15 19:18:47 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.6 KB, free 265.2 MB)

    15/09/15 19:18:47 INFO MemoryStore: ensureFreeSpace(2723) called with curMem=207571, maxMem=278302556

    15/09/15 19:18:47 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.7 KB, free 265.2 MB)

    15/09/15 19:18:47 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:56529 (size: 2.7 KB, free: 265.4 MB)

    15/09/15 19:18:47 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0

    15/09/15 19:18:47 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:839

    15/09/15 19:18:47 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MapPartitionsRDD[2] at mapToPair at PatentCitation.java:19)

    15/09/15 19:18:47 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks

    15/09/15 19:18:47 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, ANY, 1389 bytes)

    15/09/15 19:18:47 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)

    15/09/15 19:18:47 INFO Executor: Fetching http://192.168.112.132:33490/jars/patentcitation_spark.jar with timestamp 1442344726418

    15/09/15 19:18:47 INFO Utils: Fetching http://192.168.112.132:33490/jars/patentcitation_spark.jar to /tmp/spark-d5e8a049-e4b6-45d9-b533-6e7c1295ed19/userFiles-2ba84816-9443-4d34-b8f4-360e6a69aeaf/fetchFileTemp4125688246233447293.tmp

    15/09/15 19:18:47 INFO Executor: Adding file:/tmp/spark-d5e8a049-e4b6-45d9-b533-6e7c1295ed19/userFiles-2ba84816-9443-4d34-b8f4-360e6a69aeaf/patentcitation_spark.jar to class loader

    15/09/15 19:18:47 INFO HadoopRDD: Input split: hdfs://sandbox:8020/user/captain/input/cite75_99.txt:0+134217728

    15/09/15 19:18:47 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id

    15/09/15 19:18:47 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id

    15/09/15 19:18:47 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap

    15/09/15 19:18:47 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition

    15/09/15 19:18:47 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id

    15/09/15 19:18:49 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (1 time so far)

    15/09/15 19:18:51 INFO ExternalSorter: Thread 65 spilling in-memory map of 86.6 MB to disk (2 times so far)

    15/09/15 19:18:53 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (3 times so far)

    15/09/15 19:18:55 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (4 times so far)

    15/09/15 19:18:57 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (5 times so far)

    15/09/15 19:19:00 INFO ExternalSorter: Thread 65 spilling in-memory map of 85.6 MB to disk (6 times so far)

    15/09/15 19:19:02 INFO ExternalSorter: Thread 65 spilling in-memory map of 85.1 MB to disk (7 times so far)

    15/09/15 19:19:04 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (8 times so far)

    15/09/15 19:19:06 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (9 times so far)

    15/09/15 19:19:22 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2004 bytes result sent to driver

    15/09/15 19:19:22 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, ANY, 1389 bytes)

    15/09/15 19:19:22 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)

    15/09/15 19:19:22 INFO HadoopRDD: Input split: hdfs://sandbox:8020/user/captain/input/cite75_99.txt:134217728+129857703

    15/09/15 19:19:22 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 34712 ms on localhost (1/2)

    15/09/15 19:19:23 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (1 time so far)

    15/09/15 19:19:25 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (2 times so far)

    15/09/15 19:19:28 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (3 times so far)

    15/09/15 19:19:31 INFO ExternalSorter: Thread 65 spilling in-memory map of 85.1 MB to disk (4 times so far)

    15/09/15 19:19:33 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (5 times so far)

    15/09/15 19:19:35 INFO ExternalSorter: Thread 65 spilling in-memory map of 85.1 MB to disk (6 times so far)

    15/09/15 19:19:37 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (7 times so far)

    15/09/15 19:19:40 INFO ExternalSorter: Thread 65 spilling in-memory map of 87.4 MB to disk (8 times so far)

    15/09/15 19:19:42 INFO ExternalSorter: Thread 65 spilling in-memory map of 84.2 MB to disk (9 times so far)

    15/09/15 19:19:53 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2004 bytes result sent to driver

    15/09/15 19:19:53 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 30861 ms on localhost (2/2)

    15/09/15 19:19:53 INFO DAGScheduler: Stage 0 (mapToPair at PatentCitation.java:19) finished in 65.576 s

    15/09/15 19:19:53 INFO DAGScheduler: looking for newly runnable stages

    15/09/15 19:19:53 INFO DAGScheduler: running: Set()

    15/09/15 19:19:53 INFO DAGScheduler: waiting: Set(Stage 1)

    15/09/15 19:19:53 INFO DAGScheduler: failed: Set()

    15/09/15 19:19:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool

    15/09/15 19:19:53 INFO DAGScheduler: Missing parents for Stage 1: List()

    15/09/15 19:19:53 INFO DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[5] at sortByKey at PatentCitation.java:28), which is now runnable

    15/09/15 19:19:53 INFO MemoryStore: ensureFreeSpace(3312) called with curMem=210294, maxMem=278302556

    15/09/15 19:19:53 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.2 KB, free 265.2 MB)

    15/09/15 19:19:53 INFO MemoryStore: ensureFreeSpace(2378) called with curMem=213606, maxMem=278302556

    15/09/15 19:19:53 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.3 KB, free 265.2 MB)

    15/09/15 19:19:53 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:56529 (size: 2.3 KB, free: 265.4 MB)

    15/09/15 19:19:53 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0

    15/09/15 19:19:53 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:839

    15/09/15 19:19:53 INFO DAGScheduler: Submitting 2 missing tasks from Stage 1 (MapPartitionsRDD[5] at sortByKey at PatentCitation.java:28)

    15/09/15 19:19:53 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks

    15/09/15 19:19:53 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, PROCESS_LOCAL, 1124 bytes)

    15/09/15 19:19:53 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)

    15/09/15 19:19:53 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks

    15/09/15 19:19:53 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 3 ms

    15/09/15 19:19:54 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (1 time so far)

    15/09/15 19:19:56 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.7 MB to disk (2 times so far)

    15/09/15 19:19:59 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (3 times so far)

    15/09/15 19:20:01 INFO BlockManager: Removing broadcast 1

    15/09/15 19:20:01 INFO BlockManager: Removing block broadcast_1_piece0

    15/09/15 19:20:01 INFO MemoryStore: Block broadcast_1_piece0 of size 2723 dropped from memory (free 278089295)

    15/09/15 19:20:01 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:56529 in memory (size: 2.7 KB, free: 265.4 MB)

    15/09/15 19:20:01 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0

    15/09/15 19:20:01 INFO BlockManager: Removing block broadcast_1

    15/09/15 19:20:01 INFO MemoryStore: Block broadcast_1 of size 3736 dropped from memory (free 278093031)

    15/09/15 19:20:01 INFO ContextCleaner: Cleaned broadcast 1

    15/09/15 19:20:01 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (4 times so far)

    15/09/15 19:20:06 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 1682 bytes result sent to driver

    15/09/15 19:20:06 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, PROCESS_LOCAL, 1124 bytes)

    15/09/15 19:20:06 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)

    15/09/15 19:20:06 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks

    15/09/15 19:20:06 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms

    15/09/15 19:20:06 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 13527 ms on localhost (1/2)

    15/09/15 19:20:07 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (1 time so far)

    15/09/15 19:20:10 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (2 times so far)

    15/09/15 19:20:12 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (3 times so far)

    15/09/15 19:20:19 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 1685 bytes result sent to driver

    15/09/15 19:20:19 INFO DAGScheduler: Stage 1 (sortByKey at PatentCitation.java:28) finished in 25.877 s

    15/09/15 19:20:19 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 12352 ms on localhost (2/2)

    15/09/15 19:20:19 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool

    15/09/15 19:20:19 INFO DAGScheduler: Job 0 finished: sortByKey at PatentCitation.java:28, took 91.526647 s

    15/09/15 19:20:19 INFO FileOutputCommitter: File Output Committer Algorithm version is 1

    15/09/15 19:20:19 INFO SparkContext: Starting job: saveAsTextFile at PatentCitation.java:43

    15/09/15 19:20:19 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 157 bytes

    15/09/15 19:20:19 INFO DAGScheduler: Registering RDD 3 (reduceByKey at PatentCitation.java:28)

    15/09/15 19:20:19 INFO DAGScheduler: Got job 1 (saveAsTextFile at PatentCitation.java:43) with 2 output partitions (allowLocal=false)

    15/09/15 19:20:19 INFO DAGScheduler: Final stage: Stage 4(saveAsTextFile at PatentCitation.java:43)

    15/09/15 19:20:19 INFO DAGScheduler: Parents of final stage: List(Stage 3)

    15/09/15 19:20:19 INFO DAGScheduler: Missing parents: List(Stage 3)

    15/09/15 19:20:19 INFO DAGScheduler: Submitting Stage 3 (ShuffledRDD[3] at reduceByKey at PatentCitation.java:28), which has no missing parents

    15/09/15 19:20:19 INFO MemoryStore: ensureFreeSpace(3128) called with curMem=209525, maxMem=278302556

    15/09/15 19:20:19 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.1 KB, free 265.2 MB)

    15/09/15 19:20:19 INFO MemoryStore: ensureFreeSpace(2252) called with curMem=212653, maxMem=278302556

    15/09/15 19:20:19 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.2 KB, free 265.2 MB)

    15/09/15 19:20:19 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:56529 (size: 2.2 KB, free: 265.4 MB)

    15/09/15 19:20:19 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0

    15/09/15 19:20:19 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:839

    15/09/15 19:20:19 INFO DAGScheduler: Submitting 2 missing tasks from Stage 3 (ShuffledRDD[3] at reduceByKey at PatentCitation.java:28)

    15/09/15 19:20:19 INFO TaskSchedulerImpl: Adding task set 3.0 with 2 tasks

    15/09/15 19:20:19 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 4, localhost, PROCESS_LOCAL, 1113 bytes)

    15/09/15 19:20:19 INFO Executor: Running task 0.0 in stage 3.0 (TID 4)

    15/09/15 19:20:19 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks

    15/09/15 19:20:19 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms

    15/09/15 19:20:21 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (1 time so far)

    15/09/15 19:20:23 INFO BlockManager: Removing broadcast 2

    15/09/15 19:20:23 INFO BlockManager: Removing block broadcast_2

    15/09/15 19:20:23 INFO MemoryStore: Block broadcast_2 of size 3312 dropped from memory (free 278090963)

    15/09/15 19:20:23 INFO BlockManager: Removing block broadcast_2_piece0

    15/09/15 19:20:23 INFO MemoryStore: Block broadcast_2_piece0 of size 2378 dropped from memory (free 278093341)

    15/09/15 19:20:23 INFO BlockManagerInfo: Removed broadcast_2_piece0 on localhost:56529 in memory (size: 2.3 KB, free: 265.4 MB)

    15/09/15 19:20:23 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0

    15/09/15 19:20:23 INFO ContextCleaner: Cleaned broadcast 2

    15/09/15 19:20:23 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.7 MB to disk (2 times so far)

    15/09/15 19:20:25 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (3 times so far)

    15/09/15 19:20:27 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (4 times so far)

    15/09/15 19:20:35 INFO Executor: Finished task 0.0 in stage 3.0 (TID 4). 1098 bytes result sent to driver

    15/09/15 19:20:35 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID 5, localhost, PROCESS_LOCAL, 1113 bytes)

    15/09/15 19:20:35 INFO Executor: Running task 1.0 in stage 3.0 (TID 5)

    15/09/15 19:20:35 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 4) in 16394 ms on localhost (1/2)

    15/09/15 19:20:35 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks

    15/09/15 19:20:35 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms

    15/09/15 19:20:37 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (1 time so far)

    15/09/15 19:20:39 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (2 times so far)

    15/09/15 19:20:41 INFO ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 83.6 MB to disk (3 times so far)

    15/09/15 19:20:50 INFO Executor: Finished task 1.0 in stage 3.0 (TID 5). 1098 bytes result sent to driver

    15/09/15 19:20:50 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID 5) in 14507 ms on localhost (2/2)

    15/09/15 19:20:50 INFO DAGScheduler: Stage 3 (reduceByKey at PatentCitation.java:28) finished in 30.901 s

    15/09/15 19:20:50 INFO DAGScheduler: looking for newly runnable stages

    15/09/15 19:20:50 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool

    15/09/15 19:20:50 INFO DAGScheduler: running: Set()

    15/09/15 19:20:50 INFO DAGScheduler: waiting: Set(Stage 4)

    15/09/15 19:20:50 INFO DAGScheduler: failed: Set()

    15/09/15 19:20:50 INFO DAGScheduler: Missing parents for Stage 4: List()

    15/09/15 19:20:50 INFO DAGScheduler: Submitting Stage 4 (MapPartitionsRDD[8] at saveAsTextFile at PatentCitation.java:43), which is now runnable

    15/09/15 19:20:50 INFO MemoryStore: ensureFreeSpace(152496) called with curMem=209215, maxMem=278302556

    15/09/15 19:20:50 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 148.9 KB, free 265.1 MB)

    15/09/15 19:20:50 INFO MemoryStore: ensureFreeSpace(97600) called with curMem=361711, maxMem=278302556

    15/09/15 19:20:50 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 95.3 KB, free 265.0 MB)

    15/09/15 19:20:50 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:56529 (size: 95.3 KB, free: 265.3 MB)

    15/09/15 19:20:50 INFO BlockManagerMaster: Updated info of block broadcast_4_piece0

    15/09/15 19:20:50 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:839

    15/09/15 19:20:50 INFO DAGScheduler: Submitting 2 missing tasks from Stage 4 (MapPartitionsRDD[8] at saveAsTextFile at PatentCitation.java:43)

    15/09/15 19:20:50 INFO TaskSchedulerImpl: Adding task set 4.0 with 2 tasks

    15/09/15 19:20:50 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 6, localhost, PROCESS_LOCAL, 1124 bytes)

    15/09/15 19:20:50 INFO Executor: Running task 0.0 in stage 4.0 (TID 6)

    15/09/15 19:20:50 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks

    15/09/15 19:20:50 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms

    15/09/15 19:20:51 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (1 time so far)

    15/09/15 19:20:52 INFO ExternalSorter: Thread 65 spilling in-memory map of 85.4 MB to disk (2 times so far)

    15/09/15 19:20:53 INFO ExternalSorter: Thread 65 spilling in-memory map of 84.2 MB to disk (3 times so far)

    15/09/15 19:20:54 INFO FileOutputCommitter: File Output Committer Algorithm version is 1

    15/09/15 19:20:55 INFO BlockManager: Removing broadcast 3

    15/09/15 19:20:55 INFO BlockManager: Removing block broadcast_3_piece0

    15/09/15 19:20:55 INFO MemoryStore: Block broadcast_3_piece0 of size 2252 dropped from memory (free 277845497)

    15/09/15 19:20:55 INFO BlockManagerInfo: Removed broadcast_3_piece0 on localhost:56529 in memory (size: 2.2 KB, free: 265.3 MB)

    15/09/15 19:20:55 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0

    15/09/15 19:20:55 INFO BlockManager: Removing block broadcast_3

    15/09/15 19:20:55 INFO MemoryStore: Block broadcast_3 of size 3128 dropped from memory (free 277848625)

    15/09/15 19:20:55 INFO ContextCleaner: Cleaned broadcast 3

    15/09/15 19:20:57 INFO FileOutputCommitter: Saved output of task ‘attempt_201509151920_0004_m_000000_6’ to hdfs://sandbox:8020/user/captain/output/_temporary/0/task_201509151920_0004_m_000000

    15/09/15 19:20:57 INFO SparkHadoopMapRedUtil: attempt_201509151920_0004_m_000000_6: Committed

    15/09/15 19:20:57 INFO Executor: Finished task 0.0 in stage 4.0 (TID 6). 1828 bytes result sent to driver

    15/09/15 19:20:57 INFO TaskSetManager: Starting task 1.0 in stage 4.0 (TID 7, localhost, PROCESS_LOCAL, 1124 bytes)

    15/09/15 19:20:57 INFO Executor: Running task 1.0 in stage 4.0 (TID 7)

    15/09/15 19:20:57 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 6) in 7135 ms on localhost (1/2)

    15/09/15 19:20:57 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks

    15/09/15 19:20:57 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms

    15/09/15 19:20:58 INFO ExternalSorter: Thread 65 spilling in-memory map of 84.8 MB to disk (1 time so far)

    15/09/15 19:20:58 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (2 times so far)

    15/09/15 19:20:59 INFO ExternalSorter: Thread 65 spilling in-memory map of 87.6 MB to disk (3 times so far)

    15/09/15 19:21:01 INFO ExternalSorter: Thread 65 spilling in-memory map of 83.6 MB to disk (4 times so far)

    15/09/15 19:21:02 INFO FileOutputCommitter: File Output Committer Algorithm version is 1

    15/09/15 19:21:05 INFO FileOutputCommitter: Saved output of task ‘attempt_201509151920_0004_m_000001_7’ to hdfs://sandbox:8020/user/captain/output/_temporary/0/task_201509151920_0004_m_000001

    15/09/15 19:21:05 INFO SparkHadoopMapRedUtil: attempt_201509151920_0004_m_000001_7: Committed

    15/09/15 19:21:05 INFO Executor: Finished task 1.0 in stage 4.0 (TID 7). 1828 bytes result sent to driver

    15/09/15 19:21:05 INFO TaskSetManager: Finished task 1.0 in stage 4.0 (TID 7) in 8171 ms on localhost (2/2)

    15/09/15 19:21:05 INFO DAGScheduler: Stage 4 (saveAsTextFile at PatentCitation.java:43) finished in 15.306 s

    15/09/15 19:21:05 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool

    15/09/15 19:21:05 INFO DAGScheduler: Job 1 finished: saveAsTextFile at PatentCitation.java:43, took 46.343546 s

     

     

    Results

    [sandbox spark]# vi part-00000

    “CITED” “CITING”

    1 3964859,4647229

    10000 4539112

    100000 5031388

    1000006 4714284

    1000007 4766693

    1000011 5033339

    1000017 3908629

    1000026 4043055

    1000033 4975983,4190903

    1000043 4091523

    1000044 4055371,4082383

    1000045 4290571

    1000046 5918892,5525001

    1000049 5996916

    1000051 4541310

    1000054 4946631

    1000065 4748968

    1000067 5312208,4944640,5071294

    1000070 4928425,5009029

    1000073 5474494,4107819

    1000076 5845593,4867716

    1000083 5322091,5566726

    1000084 4683770,4182197

    1000086 4217220,4686189,4178246,4839046

    1000089 5277853,5571464,5807591,5395228,5503546,5505607,5505610,5505611,5540869,5544405

    1000094 4897975,4920718,5713167

    10001 4228735

    1000102 5120183,5791855

    1000108 4399627

    1000118 5029926,5058767,4919466

    100012 4641675,4838290

    1000120 4546821

    1000122 4048703,4297867

    1000125 4573823

    1000129 4748545

    1000133 5809659,4922621

    1000134 4977842,4175499,4373460

    100014 5179741

    1000145 3961423

    1000149 5876147,5193932

    1000150 4217940,4271874

    1000152 3962981,4032016,4269599

    1000168 4479408

    1000172 4865014,4974551,4653469

    1000173 5303740,5303742,4312704,4346730

    1000178 4830222

    1000179 5182923

    1000181 5185948

    1000185 4082391,3871143,5793021

    1000187 4470573

    1000188 4134380,4023548

    1000193 5092745

    1000195 4922643

    1000199 4166638,4846484,4664399

    1000201 4697655

    1000203 4541584

  • Advanced Map Reduce Program: Patent Citation

    Advanced Map Reduce Program: Patent Citation

    Problem

    Derive Reverse Patent Citation.

     

    Dataset

    Visit http://nber.org/patents/

     

    Download the zip file: http://nber.org/patents/acite75_99.zip

    Unzipping it should yield cite75_99.txt

    Each line of this file lists a citing patent and a patent that it cites.

    This post explores the problem of reversing this: for each patent, we want to determine all the patents that cite it. For example, if the input contains the lines 4975983,1000033 and 4190903,1000033, then the output should contain the single line 1000033 4975983,4190903.

     

    Number of Records

    $wc cite75_99.txt

    16522439 16522439 264075431 cite75_99.txt

     

    16.5 million records

     

    Map Reduce Program

     

    package mapreduce;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class PatentCitation {

        public static class PatentCitationMapper extends Mapper<Text, Text, Text, Text> {
            public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
                String[] citation = key.toString().split(",");
                Text cited = new Text(citation[1]);
                Text citing = new Text(citation[0]);

                context.write(cited, citing);
            }
        }

        public static class PatentCitationReducer extends Reducer<Text, Text, Text, Text> {
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                String csv = "";
                for (Text value : values) {
                    if (csv.length() > 0) {
                        csv += ",";
                    }
                    csv += value.toString();
                }
                context.write(key, new Text(csv));
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "Hadoop Patent Citation Example");
            job.setJarByClass(PatentCitation.class);
            job.setMapperClass(PatentCitationMapper.class);
            job.setReducerClass(PatentCitationReducer.class);
            job.setInputFormatClass(KeyValueTextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job, new Path(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[2]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    
    

    Package this class as patentcitation.jar

     

    Setup the environment

    Either download the Hadoop distribution from the Apache web site or use a sandbox such as the Hortonworks Sandbox.

     

    Create HDFS Directories

    [root@sandbox]# hadoop fs -mkdir /user/captain

    [root@sandbox]# hadoop fs -mkdir /user/captain/input

     

    We have created an input directory in HDFS. Let us copy the patent citation file into HDFS.

    [root@sandbox]# hdfs dfs -copyFromLocal input/cite75_99.txt /user/captain/input

     

    Run the Map Reduce Program

    yarn jar patentcitation.jar PatentCitation /user/captain/input /user/captain/output

     

    15/09/09 19:45:22 INFO impl.TimelineClientImpl: Timeline service address: http://sandbox:8188/ws/v1/timeline/

    15/09/09 19:45:22 INFO client.RMProxy: Connecting to ResourceManager at sandbox/192.168.112.132:8050

    15/09/09 19:45:22 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.

    15/09/09 19:45:23 INFO input.FileInputFormat: Total input paths to process : 1

    15/09/09 19:45:23 INFO mapreduce.JobSubmitter: number of splits:2

    15/09/09 19:45:23 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1441826432501_0004

    15/09/09 19:45:23 INFO impl.YarnClientImpl: Submitted application application_1441826432501_0004

    15/09/09 19:45:23 INFO mapreduce.Job: The url to track the job: http://sandbox:8088/proxy/application_1441826432501_0004/

    15/09/09 19:45:23 INFO mapreduce.Job: Running job: job_1441826432501_0004

    15/09/09 19:45:29 INFO mapreduce.Job: Job job_1441826432501_0004 running in uber mode : false

    15/09/09 19:45:29 INFO mapreduce.Job:  map 0% reduce 0%

    15/09/09 19:45:40 INFO mapreduce.Job:  map 17% reduce 0%

    15/09/09 19:45:43 INFO mapreduce.Job:  map 24% reduce 0%

    15/09/09 19:45:47 INFO mapreduce.Job:  map 29% reduce 0%

    15/09/09 19:45:50 INFO mapreduce.Job:  map 33% reduce 0%

    15/09/09 19:45:53 INFO mapreduce.Job:  map 41% reduce 0%

    15/09/09 19:45:56 INFO mapreduce.Job:  map 50% reduce 0%

    15/09/09 19:45:59 INFO mapreduce.Job:  map 58% reduce 0%

    15/09/09 19:46:02 INFO mapreduce.Job:  map 65% reduce 0%

    15/09/09 19:46:05 INFO mapreduce.Job:  map 69% reduce 0%

    15/09/09 19:46:08 INFO mapreduce.Job:  map 77% reduce 0%

    15/09/09 19:46:11 INFO mapreduce.Job:  map 89% reduce 0%

    15/09/09 19:46:13 INFO mapreduce.Job:  map 92% reduce 0%

    15/09/09 19:46:14 INFO mapreduce.Job:  map 98% reduce 0%

    15/09/09 19:46:15 INFO mapreduce.Job:  map 100% reduce 0%

    15/09/09 19:46:24 INFO mapreduce.Job:  map 100% reduce 68%

    15/09/09 19:46:27 INFO mapreduce.Job:  map 100% reduce 69%

    15/09/09 19:46:30 INFO mapreduce.Job:  map 100% reduce 71%

    15/09/09 19:46:33 INFO mapreduce.Job:  map 100% reduce 73%

    15/09/09 19:46:36 INFO mapreduce.Job:  map 100% reduce 75%

    15/09/09 19:46:39 INFO mapreduce.Job:  map 100% reduce 77%

    15/09/09 19:46:42 INFO mapreduce.Job:  map 100% reduce 79%

    15/09/09 19:46:45 INFO mapreduce.Job:  map 100% reduce 81%

    15/09/09 19:46:48 INFO mapreduce.Job:  map 100% reduce 83%

    15/09/09 19:46:51 INFO mapreduce.Job:  map 100% reduce 85%

    15/09/09 19:46:54 INFO mapreduce.Job:  map 100% reduce 88%

    15/09/09 19:46:57 INFO mapreduce.Job:  map 100% reduce 91%

    15/09/09 19:47:00 INFO mapreduce.Job:  map 100% reduce 93%

    15/09/09 19:47:03 INFO mapreduce.Job:  map 100% reduce 96%

    15/09/09 19:47:06 INFO mapreduce.Job:  map 100% reduce 98%

    15/09/09 19:47:09 INFO mapreduce.Job:  map 100% reduce 100%

    15/09/09 19:47:11 INFO mapreduce.Job: Job job_1441826432501_0004 completed successfully

    15/09/09 19:47:11 INFO mapreduce.Job: Counters: 49

    File System Counters

    FILE: Number of bytes read=594240702

    FILE: Number of bytes written=891741626

    FILE: Number of read operations=0

    FILE: Number of large read operations=0

    FILE: Number of write operations=0

    HDFS: Number of bytes read=264206769

    HDFS: Number of bytes written=158078539

    HDFS: Number of read operations=9

    HDFS: Number of large read operations=0

    HDFS: Number of write operations=2

    Job Counters

    Launched map tasks=2

    Launched reduce tasks=1

    Data-local map tasks=2

    Total time spent by all maps in occupied slots (ms)=85024

    Total time spent by all reduces in occupied slots (ms)=55354

    Total time spent by all map tasks (ms)=85024

    Total time spent by all reduce tasks (ms)=55354

    Total vcore-seconds taken by all map tasks=85024

    Total vcore-seconds taken by all reduce tasks=55354

    Total megabyte-seconds taken by all map tasks=21256000

    Total megabyte-seconds taken by all reduce tasks=13838500

    Map-Reduce Framework

    Map input records=16522439

    Map output records=16522439

    Map output bytes=264075431

    Map output materialized bytes=297120321

    Input split bytes=266

    Combine input records=0

    Combine output records=0

    Reduce input groups=3258984

    Reduce shuffle bytes=297120321

    Reduce input records=16522439

    Reduce output records=3258984

    Spilled Records=49567317

    Shuffled Maps =2

    Failed Shuffles=0

    Merged Map outputs=2

    GC time elapsed (ms)=3356

    CPU time spent (ms)=136790

    Physical memory (bytes) snapshot=552968192

    Virtual memory (bytes) snapshot=2486214656

    Total committed heap usage (bytes)=412090368

    Shuffle Errors

    BAD_ID=0

    CONNECTION=0

    IO_ERROR=0

    WRONG_LENGTH=0

    WRONG_MAP=0

    WRONG_REDUCE=0

    File Input Format Counters

    Bytes Read=264206503

    File Output Format Counters

    Bytes Written=158078539

     

     

    Copy the result file to local directory

    # hdfs dfs -ls /user/captain/output

    Found 2 items

    -rw-r--r--   1 root hdfs          0 2015-09-09 19:47 /user/captain/output/_SUCCESS

    -rw-r--r--   1 root hdfs  158078539 2015-09-09 19:47 /user/captain/output/part-r-00000

    # hdfs dfs -copyToLocal /user/captain/output/part-r-00000 output/

    15/09/09 19:50:45 WARN hdfs.DFSClient: DFSInputStream has been closed already

     

    View the results

    # vi part-r-00000

     

    “CITED” “CITING”

    1       3964859,4647229

    10000   4539112

    100000  5031388

    1000006 4714284

    1000007 4766693

    1000011 5033339

    1000017 3908629

    1000026 4043055

    1000033 4975983,4190903

    1000043 4091523

    1000044 4082383,4055371

    1000045 4290571

    1000046 5525001,5918892

    1000049 5996916

    1000051 4541310

    1000054 4946631

    1000065 4748968

    1000067 4944640,5071294,5312208

    1000070 5009029,4928425

    1000073 4107819,5474494

    1000076 5845593,4867716

    1000083 5322091,5566726

    1000084 4182197,4683770

    1000086 4686189,4839046,4217220,4178246

    1000089 5505607,5540869,5505610,5544405,5571464,5505611,5277853,5807591,5395228,5503546

    1000094 5713167,4897975,4920718

  • Introduction to Apache Zookeeper

    Apache Zookeeper

     


     

    Apache Zookeeper is a:

    • centralized
    • high performance
    • coordination system

    for distributed applications.

    Apache Zookeeper enables distributed applications to coordinate reliably with one another.

     

    Applications using Apache Zookeeper

    • Apache Hadoop
    • Apache HBase
    • Apache Kafka
    • Apache Accumulo
    • Apache Mesos
    • Apache Solr
    • Neo4j

     

    Zookeeper Primitives and Recipes

    Zookeeper provides primitives for distributed coordination. Rather than exposing the primitives directly to client applications, it exposes a file-system-like API.

    Recipes are the implementations of primitives in Zookeeper. Recipes provide the operations on Zookeeper data nodes (called ZNodes).

    The ZNodes are organized in a hierarchical tree model similar to a file system.

    ZNodes

    [Diagram: sample Zookeeper znode tree with /employees, /dept and /offices]

    In this diagram,

    The /employees znode is the parent znode for all znodes representing employees. An example is Matt, which is the znode employee-1.

    The /dept znode is the parent znode for all znodes representing departments. An example is HR, which is the znode dept-1.

    The /offices znode is the parent znode for all znodes representing offices. An example is Boston, which is the znode office-1.

    ZNodes can contain data or no data. If there is data in a znode, it is stored as a byte array.

    The leaf nodes in the tree represent the data. Every time data is added, a znode is added. A znode is removed when data is deleted.

    There are 4 modes for Zookeeper ZNodes:

    1. Persistent
    2. Ephemeral
    3. Persistent_Sequential
    4. Ephemeral_Sequential

    Persistent Nodes are znodes that can be deleted only by request. They survive service restarts and are backed up on disk.

    Ephemeral Nodes are znodes that exist as long as the session that created the znode is active. When the session ends the znode is deleted. Because of this behavior, ephemeral znodes are not allowed to have children.

    Sequence: When creating a znode you can also request that ZooKeeper append a monotonically increasing counter to the end of the path. This counter is unique to the parent znode. The counter has a format of %010d, that is, 10 digits with 0 (zero) padding.
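
    To make the modes and the sequence counter concrete, here is a small illustrative sketch using the standard Zookeeper Java client; the paths and data are made up, and note that for a sequential create the returned path carries the appended counter.

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    // Illustrative sketch: persistent, ephemeral and sequential znode creation.
    public class ZnodeModesExample {
        public static void main(String[] args) throws Exception {
            ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> { });
            byte[] data = "some data".getBytes();

            // Persistent znode: stays until it is explicitly deleted.
            zk.create("/employees", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            // Ephemeral znode: removed automatically when this session ends.
            zk.create("/employees/online", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

            // Sequential znode: the server appends the %010d counter to the name we supply,
            // e.g. /employees/employee-0000000000, and create() returns that actual path.
            String actualPath = zk.create("/employees/employee-", data,
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
            System.out.println("Created " + actualPath);

            zk.close();
        }
    }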

    The Curator framework also defines the following recipe: a persistent ephemeral node is an ephemeral node that attempts to stay present in ZooKeeper, even through connection and session interruptions.

    Zookeeper API

    There are 6 primary operations exposed by the API (a minimal Java sketch follows this list):

    • create /path data    –  Creates a znode named with /path and containing data
    • delete /path     –  Deletes the znode /path
    • exists /path     – Checks whether /path exists
    • setData /path data    –  Sets the data of znode /path to data
    • getData /path    –  Returns the data in /path
    • getChildren /path    – Returns the list of children under /path
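
    Here is the minimal Java sketch promised above, exercising these operations with the standard Zookeeper client against a local server on 127.0.0.1:2181; the znode name mirrors the command-line session later in this post, and it is an illustrative sketch rather than production code.

    import java.util.List;

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;

    // Minimal sketch that walks through the six primary Zookeeper operations.
    public class ZookeeperApiTour {
        public static void main(String[] args) throws Exception {
            ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> { });

            // create /blog_testing test_data
            zk.create("/blog_testing", "test_data".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            // exists /blog_testing (returns a Stat, or null if the znode does not exist)
            Stat stat = zk.exists("/blog_testing", false);

            // getData /blog_testing
            byte[] data = zk.getData("/blog_testing", false, stat);
            System.out.println(new String(data));

            // setData /blog_testing updated_text (version -1 means any version)
            zk.setData("/blog_testing", "updated_text".getBytes(), -1);

            // getChildren /
            List<String> children = zk.getChildren("/", false);
            System.out.println(children);

            // delete /blog_testing
            zk.delete("/blog_testing", -1);

            zk.close();
        }
    }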

    Installing Zookeeper

    Download the stable version of Zookeeper from https://zookeeper.apache.org/releases.html

    $> tar xvzf zookeeper-3.4.6.tar.gz
    $> cd zookeeper-3.4.6/conf

     

    Create zoo.cfg file with the following info:

    tickTime=2000
    dataDir=/home/xyz/zookeeper/data
    clientPort=2181

     

    Remember to change the dataDir value to a directory that is writable by the zookeeper process.

     

    $> cd zookeeper-3.4.6/bin
    $>./zkServer.sh start
     JMX enabled by default
     Using config: /home/zyx/zookeeper-3.4.6/bin/../conf/zoo.cfg
     Starting zookeeper ... STARTED

     

    Now that the zookeeper server has started, it is time to interact with it.

     

    In another terminal/command window, go to the bin directory of your zookeeper installation.

    bin$ ./zkCli.sh -server 127.0.0.1:2181
    Connecting to 127.0.0.1:2181
    2015-09-09 21:22:29,700 [myid:] - INFO [main:Environment@100] - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
    2015-09-09 21:22:29,704 [myid:] - INFO [main:Environment@100] - Client environment:host.name=xxx..xxxx.xxx
    2015-09-09 21:22:29,704 [myid:] - INFO [main:Environment@100] - Client environment:java.version=1.7.0_51
    2015-09-09 21:22:29,707 [myid:] - INFO [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
    2015-09-09 21:22:29,707 [myid:] - INFO [main:Environment@100] - Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.7.0_51.jdk/Contents/Home/jre
    2015-09-09 21:22:29,707 [myid:] - INFO [main:Environment@100] - Client environment:java.class.path=/Users/......
    2015-09-09 21:22:29,727 [myid:] - INFO [main:Environment@100] - Client environment:java.library.path=/Users/xyz/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
    2015-09-09 21:22:29,727 [myid:] - INFO [main:Environment@100] - Client environment:java.io.tmpdir=/var/folders/dt/p17rgljd56v_jd0hy9s73l3w0000gn/T/
    2015-09-09 21:22:29,728 [myid:] - INFO [main:Environment@100] - Client environment:java.compiler=<NA>
    2015-09-09 21:22:29,728 [myid:] - INFO [main:Environment@100] - Client environment:os.name=Mac OS X
    2015-09-09 21:22:29,728 [myid:] - INFO [main:Environment@100] - Client environment:os.arch=x86_64
    2015-09-09 21:22:29,728 [myid:] - INFO [main:Environment@100] - Client environment:os.version=10.10.5 
    2015-09-09 21:22:29,729 [myid:] - INFO [main:Environment@100] - Client environment:user.home=/Users/xyz
    2015-09-09 21:22:29,729 [myid:] - INFO [main:Environment@100] - Client environment:user.dir=/Users/xyz/zookeeper/zookeeper-3.4.6/bin
    2015-09-09 21:22:29,731 [myid:] - INFO [main:ZooKeeper@438] - Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@1d88a478
    Welcome to ZooKeeper!
    2015-09-09 21:22:29,766 [myid:] - INFO [main-SendThread(127.0.0.1:2181):ClientCnxn$SendThread@975] - Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
    JLine support is enabled
    2015-09-09 21:22:29,775 [myid:] - INFO [main-SendThread(127.0.0.1:2181):ClientCnxn$SendThread@852] - Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session
    2015-09-09 21:22:29,806 [myid:] - INFO [main-SendThread(127.0.0.1:2181):ClientCnxn$SendThread@1235] - Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x14fb50eae000000, negotiated timeout = 30000
    WATCHER::
    WatchedEvent state:SyncConnected type:None path:null

    Type help to get all the available commands. After that, we are going to use the “create”, “ls”, “get”, “set” and “delete” commands.

    [zk: 127.0.0.1:2181(CONNECTED) 0] help
    ZooKeeper -server host:port cmd args
    connect host:port
    get path [watch]
    ls path [watch]
    set path data [version]
    rmr path
    delquota [-n|-b] path
    quit
    printwatches on|off
    create [-s] [-e] path data acl
    stat path [watch]
    close
    ls2 path [watch]
    history
    listquota path
    setAcl path acl
    getAcl path
    sync path
    redo cmdno
    addauth scheme auth
    delete path [version]
    setquota -n|-b val path
    [zk: 127.0.0.1:2181(CONNECTED) 1]
    
    
    [zk: 127.0.0.1:2181(CONNECTED) 1] ls /
    [zookeeper]
    [zk: 127.0.0.1:2181(CONNECTED) 2] create /blog_testing test_data
    Created /blog_testing
    [zk: 127.0.0.1:2181(CONNECTED) 3] ls /
    [blog_testing, zookeeper]
    [zk: 127.0.0.1:2181(CONNECTED) 4] get /blog_testing
    test_data
    cZxid = 0x2
    ctime = Wed Sep 09 21:48:02 CDT 2015
    mZxid = 0x2
    mtime = Wed Sep 09 21:48:02 CDT 2015
    pZxid = 0x2
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 9
    numChildren = 0
    [zk: 127.0.0.1:2181(CONNECTED) 5] set /blog_testing updated_text
    cZxid = 0x2
    ctime = Wed Sep 09 21:48:02 CDT 2015
    mZxid = 0x3
    mtime = Wed Sep 09 21:48:42 CDT 2015
    pZxid = 0x2
    cversion = 0
    dataVersion = 1
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 12
    numChildren = 0
    [zk: 127.0.0.1:2181(CONNECTED) 6] get /blog_testing
    updated_text
    cZxid = 0x2
    ctime = Wed Sep 09 21:48:02 CDT 2015
    mZxid = 0x3
    mtime = Wed Sep 09 21:48:42 CDT 2015
    pZxid = 0x2
    cversion = 0
    dataVersion = 1
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 12
    numChildren = 0
    [zk: 127.0.0.1:2181(CONNECTED) 7] delete /blog_testing
    [zk: 127.0.0.1:2181(CONNECTED) 8] ls /
    [zookeeper]
    [zk: 127.0.0.1:2181(CONNECTED) 9]
    

     

    To shut down the zookeeper server, run the following in the bin directory:

    $>./zkServer.sh stop
    JMX enabled by default
    Using config: /Users/xyz/zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfg
    Stopping zookeeper ... STOPPED
    
    

    Zookeeper Programming

    If you want to write programs interacting with Zookeeper, you should definitely use the Apache Curator framework.
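
    As a taste of what that looks like, here is a minimal, hypothetical sketch using Curator's fluent API; the connection string and znode path are placeholders, and Curator takes care of retries and connection management for you.

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    // Minimal sketch of the Curator fluent API against a local Zookeeper server.
    public class CuratorExample {
        public static void main(String[] args) throws Exception {
            CuratorFramework client = CuratorFrameworkFactory.newClient(
                    "127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
            client.start();

            // Create a znode (and any missing parents), then read and update its data.
            client.create().creatingParentsIfNeeded().forPath("/blog_testing", "test_data".getBytes());
            byte[] data = client.getData().forPath("/blog_testing");
            System.out.println(new String(data));
            client.setData().forPath("/blog_testing", "updated_text".getBytes());

            // Clean up.
            client.delete().forPath("/blog_testing");
            client.close();
        }
    }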


     

    Unit Testing with Zookeeper

    The Apache Curator project provides an embedded zookeeper instance that can be used for unit testing.

    import org.apache.curator.test.TestingServer;

    // Create an in-process Zookeeper server for the test
    TestingServer testingServer = new TestingServer();
    testingServer.start();

    // Connection string (host:port) to hand to the Zookeeper/Curator client under test
    String zookeeperConnectionStr = testingServer.getConnectString();

     

    Stay Tuned!

  • Zookeeper driven big data infrastructure

    Background

    Any big data infrastructure operating at scale requires the following technologies:

    • Hadoop
    • Enterprise Search
    • Enterprise Messaging

    Managing these three verticals is a mammoth task.


    Coordination

    As your big data infrastructure scales with business needs, you should choose management technologies that apply across multiple areas. This way you minimize the number of complementary technologies operating in your big data infrastructure.

    One such technology used in management and coordination is Apache Zookeeper.

    When you use the following technologies in your big data infrastructure, you can use Apache Zookeeper for coordination:

    • Hadoop
    • Apache Solr for Enterprise Search
    • Apache Kafka for Enterprise Messaging

    [Diagram: Zookeeper coordinating Hadoop, Solr and Kafka]

    As depicted in the diagram, Zookeeper can be the central pivot for managing your big data infrastructure.

     

    Please do not hesitate to review my Introduction to Zookeeper post.

    Stay Tuned!

     

    References

    Introduction to Zookeeper