Thursday, 26 May 2016

Top Customers facing frequent call drops in Roaming Analysis by Scala and Spark


Problem:
You are given a CDR (Call Detail Record) file and need to find the top customers facing frequent call drops while roaming. This is an important report that telecom companies use to prevent customer churn: they call the affected customers back and, at the same time, contact their roaming partners to fix connectivity issues in specific areas.

Solution:
package com.ravi.cdr

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

// The CSV file has no header row, so a case class defines the schema.
// It is declared at the top level rather than inside main.
case class Call(visitor_locn: String, call_duration: Int,
                phone_no: String, error_code: String)

object CallDetailsProblem {
  def main(args: Array[String]): Unit = {

    // First, read the data from the CSV file
    val sc = new SparkContext(new SparkConf().setAppName("CallDetailsProblem").setMaster("local[2]"))
    val logFile = "/home/om_workspace/test.csv"
    val text = sc.textFile(logFile)

    // Then create an RDD of Calls, one per CSV line
    val calls = text.map(_.split(",")).map(p =>
      Call(p(0), p(1).toInt, p(2), p(3)))

    println(calls.count())
    calls.foreach(println)

    // Number of calls per visitor location, sorted by ascending count
    val result = calls.map(x => (x.visitor_locn, 1)).reduceByKey(_ + _).collect.sortBy(_._2)
    // println(result.reverse.mkString("\n"))

    // Number of calls per error code, printed with the most frequent first
    val result2 = calls.map(x => (x.error_code, 1)).reduceByKey(_ + _).collect.sortBy(_._2)
    println(result2.reverse.mkString("\n"))

    // Release the Spark resources once the report has been printed
    sc.stop()
  }
}
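
The listing above counts calls per location and per error code. The per-customer ranking that the problem statement asks for follows the same count-by-key pattern. Below is a minimal sketch using plain Scala collections (not Spark) so it runs on its own. The names CallRecord, isDropped and topCustomers, the sample rows, and the rule that a non-empty error code marks a dropped call are all assumptions made for illustration, not details taken from the original data set.

```scala
// Hypothetical record layout, mirroring the four CSV columns used above.
final case class CallRecord(visitorLocn: String, callDuration: Int,
                            phoneNo: String, errorCode: String)

object TopDroppedCallers {
  // Assumption: a non-empty error code marks a dropped call.
  def isDropped(c: CallRecord): Boolean = c.errorCode.trim.nonEmpty

  // Count dropped calls per phone number and keep the n highest counts.
  // Ties are broken by phone number so the result is deterministic.
  def topCustomers(calls: Seq[CallRecord], n: Int): Seq[(String, Int)] =
    calls.filter(isDropped)
      .groupBy(_.phoneNo)
      .map { case (phone, cs) => (phone, cs.size) }
      .toSeq
      .sortBy { case (phone, count) => (-count, phone) }
      .take(n)

  def main(args: Array[String]): Unit = {
    // Made-up sample rows in the same four-column CSV layout.
    val lines = Seq(
      "DEL,120,9000000001,E01",
      "DEL,15,9000000001,E02",
      "MUM,300,9000000002,",
      "MUM,20,9000000003,E01"
    )
    // split(",", -1) keeps a trailing empty error_code field.
    val calls = lines.map(_.split(",", -1))
      .map(p => CallRecord(p(0), p(1).toInt, p(2), p(3)))
    topCustomers(calls, 2).foreach(println)
  }
}
```

On an RDD the same idea would be a filter followed by map to (phone_no, 1) and reduceByKey(_ + _), exactly as the location and error-code counts are computed in the listing.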

Tuesday, 10 May 2016

The busy Java developer's guide to Scala: Explore Scala concurrency

In 2003, Herb Sutter exposed the industry's biggest "dirty little secret" with his "The Free Lunch Is Over" article, demonstrating clearly that the era of ever-faster processors was at an end, to be replaced by a new era of parallelism via "cores" (virtual CPUs) on a single chip. The revelation sent shockwaves through the programming community because getting thread-safe code correct has always remained, in theory if not in practice, the province of high-powered software developers way too expensive for your company to hire. A privileged few, it seemed, understood enough about Java's threading model and concurrency APIs and "synchronized" keyword to write code that both provided safety and throughput ... and most of those had learned it the hard way.
It is presumed that the rest of the industry was left to fend for itself, clearly not a desirable conclusion, at least not to the IT departments paying for that software being developed.

Back-to-concurrency basics
Like Scala's sister language in the .NET space, F#, Scala stands as one of several purported solutions to the "concurrency problem." In this column, I have touched on several of Scala's properties that make it more amenable to writing thread-safe code such as immutable objects by default and a design preference for returning copies of objects rather than modifying their contents. Scala's support for concurrency reaches far deeper than just this though; it's high time to start poking around in the Scala libraries to see what lives there.
Before we get too deep into Scala's concurrency support, it's a good idea to make sure that you have a good understanding of Java's basic concurrency model, because Scala's support for concurrency builds, at some level, on top of the features and functionality provided by the JVM and supporting libraries. Toward that end, the code in Listing 1 contains a basic concurrency problem known as the Producer/Consumer problem (as described in the "Guarded Blocks" section of the Sun Java Tutorial). Note that the Java Tutorial version doesn't use the java.util.concurrent classes in its solution, preferring instead to use the old wait()/notifyAll() methods from java.lang.Object:
Listing 1. Producer/Consumer (pre-Java5)
package com.tedneward.scalaexamples.notj5;

class Producer implements Runnable
{
  private Drop drop;
  private String importantInfo[] = {
    "Mares eat oats",
    "Does eat oats",
    "Little lambs eat ivy",
    "A kid will eat ivy too"
  };

  public Producer(Drop drop) { this.drop = drop; }

  public void run()
  {
    for (int i = 0; i < importantInfo.length; i++)
    {
      drop.put(importantInfo[i]);
    }
    drop.put("DONE");
  }
}

class Consumer implements Runnable
{
  private Drop drop;

  public Consumer(Drop drop) { this.drop = drop; }

  public void run()
  {
    for (String message = drop.take(); !message.equals("DONE");
         message = drop.take())
    {
      System.out.format("MESSAGE RECEIVED: %s%n", message);
    }
  }
}

class Drop
{
  //Message sent from producer to consumer.
  private String message;
  
  //True if consumer should wait for producer to send message,
  //false if producer should wait for consumer to retrieve message.
  private boolean empty = true;

  //Object to use to synchronize against so as to not "leak" the
  //"this" monitor
  private Object lock = new Object();

  public String take()
  {
    synchronized(lock)
    {
      //Wait until message is available.
      while (empty)
      {
        try
        {
          lock.wait();
        }
        catch (InterruptedException e) {}
      }
      //Toggle status.
      empty = true;
      //Notify producer that status has changed.
      lock.notifyAll();
      return message;
    }
  }

  public void put(String message)
  {
    synchronized(lock)
    {
      //Wait until message has been retrieved.
      while (!empty)
      {
        try
        { 
          lock.wait();
        } catch (InterruptedException e) {}
      }
      //Toggle status.
      empty = false;
      //Store message.
      this.message = message;
      //Notify consumer that status has changed.
      lock.notifyAll();
    }
  }
}

public class ProdConSample
{
  public static void main(String[] args)
  {
    Drop drop = new Drop();
    (new Thread(new Producer(drop))).start();
    (new Thread(new Consumer(drop))).start();
  }
}

The Java Tutorial "bug"

Curious readers will probably go and compare this code against that found in the Java Tutorial to see what the difference is; those who do will discover that instead of simply "synchronized"-ing the put and take methods, I've used a lock object stored inside the Drop. The reason for this is simple: The monitor of an object is never encapsulated inside of the class, so as written the Java Tutorial version allows this (obviously insane) code to break it:

public class ProdConSample
{
  public static void main(String[] args) throws InterruptedException
  {
    Drop drop = new Drop();
    (new Thread(new Producer(drop))).start();
    (new Thread(new Consumer(drop))).start();
    synchronized(drop)
    {
      // Thread.sleep takes milliseconds; long arithmetic avoids int overflow
      Thread.sleep(1000L * 60 * 60 * 24 * 365 * 10); // sleep for 10 years?!?
    }
  }
}
By using a private object as the monitor on which the lock is based, this code will have no effect. In essence, now the thread-safety implementation is encapsulated; before it relied on client goodwill to work correctly.
Note: The code I present here is slightly modified from the Sun tutorial solution; there's a small design flaw in the code they present (see The Java Tutorial "bug").
The core of the Producer/Consumer problem is a simple one to understand: one (or more) producer entities want to provide data for one (or more) consumer entities to consume and do something with (in this case, printing the data to the console). The Producer and Consumer classes are pretty straightforward Runnable-implementing classes: The Producer takes Strings from an array and puts them into a buffer for the Consumer to take as desired.
The hard part of the problem is that if the Producer runs too fast, data will potentially be lost as it is overwritten; if the Consumer runs too fast, data will potentially be double-processed as the Consumer reads the same data twice. The buffer (called the Drop in the Java Tutorial code) must ensure that neither condition occurs. It must also ensure that there is no potential for data corruption (hard to cause in the case of String references, but still a concern) as messages are put in and taken out of the buffer.
A full discussion of the subject is best left to Brian Goetz's Java Concurrency in Practice or Doug Lea's earlier Concurrent Programming in Java (see Resources), but a quick rundown of how this code works is necessary before you apply Scala to it.
When the Java compiler sees the synchronized keyword, it generates a try/finally block in place of the synchronized block with a monitorenter opcode at the top of the block and a monitorexit opcode in the finally block to ensure that the monitor (the Java basis for atomicity) is released regardless of how the code exits. Thus, the put code in Drop gets rewritten to look like Listing 2:
Listing 2. Drop.put after compiler helpfulness
  // This is pseudocode
  public void put(String message)
  {
    try
    {
      monitorenter(lock)

      //Wait until message has been retrieved.
      while (!empty)
      {
        try
        {
          lock.wait();
        } catch (InterruptedException e) {}
      }
      //Toggle status.
      empty = false;
      //Store message.
      this.message = message;
      //Notify consumer that status has changed.
      lock.notifyAll();
    }
    finally
    {
      monitorexit(lock)
    }
  }
The wait() method tells the current thread to go into an inactive state and wait for another thread to call notifyAll() on that object. The thread just notified must then attempt to acquire the monitor again, after which point it is free to continue execution. In essence, wait() and notify()/notifyAll() act as a simple signaling mechanism, allowing the Drop to coordinate between the Producer and the Consumer threads, one take to each put.
The code download that accompanies this article uses the Java5 concurrency enhancements (the Lock and Condition interfaces and the ReentrantLock lock implementation) to provide timeout-based versions of Listing 2, but the basic code pattern remains the same. That is the problem: Developers who write code like that in Listing 2 have to focus too exclusively on the details, the low-level implementation code, of the threading and locking required to make it all work correctly. What's more, developers have to reason about each and every line in the code, looking to see if it needs to be protected, because too much synchronization is just as bad as too little.
Now let's look at Scala alternatives.

Good old Scala concurrency (v1)

One way to start working with concurrency in Scala is to simply translate the Java code directly over to Scala, taking advantage of Scala's syntax in places to simplify the code, at least a little:
Listing 3. ProdConSample (Scala)
object ProdConSample
{
  class Producer(drop : Drop)
    extends Runnable
  {
    val importantInfo : Array[String] = Array(
      "Mares eat oats",
      "Does eat oats",
      "Little lambs eat ivy",
      "A kid will eat ivy too"
    );
  
    override def run() : Unit =
    {
      importantInfo.foreach((msg) => drop.put(msg))
      drop.put("DONE")
    }
  }
  
  class Consumer(drop : Drop)
    extends Runnable
  {
    override def run() : Unit =
    {
      var message = drop.take()
      while (message != "DONE")
      {
        System.out.format("MESSAGE RECEIVED: %s%n", message)
        message = drop.take()
      }
    }
  }
  
  class Drop
  {
    var message : String = ""
    var empty : Boolean = true
    var lock : AnyRef = new Object()
  
    def put(x: String) : Unit =
      lock.synchronized
      {
        // Wait until message has been retrieved
        await (empty == true)
        // Toggle status
        empty = false
        // Store message
        message = x
        // Notify consumer that status has changed
        lock.notifyAll()
      }

    def take() : String =
      lock.synchronized
      {
        // Wait until message is available.
        await (empty == false)
        // Toggle status
        empty=true
        // Notify producer that status has changed
        lock.notifyAll()
        // Return the message
        message
      }

    private def await(cond: => Boolean) =
      while (!cond) { lock.wait() }
  }

  def main(args : Array[String]) : Unit =
  {
    // Create Drop
    val drop = new Drop();
  
    // Spawn Producer
    new Thread(new Producer(drop)).start();
    
    // Spawn Consumer
    new Thread(new Consumer(drop)).start();
  }
}
The Producer and Consumer classes are almost identical to their Java cousins, again extending (implementing) the Runnable interface and overriding the run() method, and, in Producer's case, using the built-in iteration method foreach to walk the contents of the importantInfo array. (Actually, to make it more like Scala, importantInfo should probably be a List instead of an Array, but in this first pass, I want to keep things as close to the original Java code as possible.)
The Drop class also looks similar to the Java version except that in Scala, "synchronized" isn't a keyword, it's a method defined on the class AnyRef, the Scala "root of all reference types." This means that to synchronize on a particular object, you simply call the synchronized method on that object; in this case, on the object held in the lock field on Drop.
Note that we also make use of a Scala-ism in the Drop class in the definition of the await() method: The cond parameter is a block of code waiting to be evaluated rather than evaluated prior to being passed in to the method. Formally in Scala, this is known as "call-by-name"; here it serves as a useful way of capturing the conditional-waiting logic that had to be repeated twice (once in put, once in take) in the Java version.
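
To make the call-by-name behavior concrete, here is a small sketch. The names ByNameDemo and spinUntil are hypothetical, chosen only for this illustration. Because cond is declared as => Boolean, the expression passed in is re-evaluated every time the loop tests it, which is exactly what await() relies on.

```scala
object ByNameDemo {
  // By-name parameter: `cond` is not evaluated at the call site;
  // it is re-evaluated each time the while loop tests it.
  def spinUntil(cond: => Boolean): Int = {
    var spins = 0
    while (!cond) { spins += 1 }
    spins
  }

  def main(args: Array[String]): Unit = {
    var evaluations = 0
    // The block below runs once per loop test, so the counter keeps growing
    // until the block finally yields true.
    val spins = spinUntil { evaluations += 1; evaluations >= 3 }
    println("spins = " + spins + ", evaluations = " + evaluations)
    // prints "spins = 2, evaluations = 3"
  }
}
```

Had cond been declared as a plain Boolean, the argument would have been evaluated once before the call, and a false value would make the loop spin forever.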
Finally, in main(), you create the Drop instance, instantiate two threads, kick them off with start(), and then simply fall off of the end of main(), trusting that the JVM will have started those two threads before you finish with main(). (In production code, this probably shouldn't be taken for granted, but for a simple example like this, it's going to be OK 99.99 percent of the time. Caveat emptor.)
However, having said all that, the same basic problem remains: Programmers still have to worry way too much about the issues of signaling and coordinating the two threads. While some of the Scala-isms might make the syntax easier to live with, it's not really an incredibly compelling win so far.

Scala concurrency, v2

A quick look at the Scala Library Reference reveals an interesting package: scala.concurrent. This package contains a number of different concurrency constructs, including the first one we're going to make use of, the MailBox class.
As its name implies, MailBox is essentially the Drop by itself, a single-slot buffer that holds a piece of data until it has been retrieved. However, the big advantage of MailBox is that it completely encapsulates the details of the sending and receiving behind a combination of pattern-matching and case classes, making it more flexible than the simple Drop (or the Drop's big multi-slot data-holding brother, java.util.concurrent.BoundedBuffer).
Listing 4. ProdConSample, v2 (Scala)
package com.tedneward.scalaexamples.scala.V2
{
  import concurrent.{MailBox, ops}

  object ProdConSample
  {
    class Producer(drop : Drop)
      extends Runnable
    {
      val importantInfo : Array[String] = Array(
        "Mares eat oats",
        "Does eat oats",
        "Little lambs eat ivy",
        "A kid will eat ivy too"
      );
    
      override def run() : Unit =
      {
        importantInfo.foreach((msg) => drop.put(msg))
        drop.put("DONE")
      }
    }
    
    class Consumer(drop : Drop)
      extends Runnable
    {
      override def run() : Unit =
      {
        var message = drop.take()
        while (message != "DONE")
        {
          System.out.format("MESSAGE RECEIVED: %s%n", message)
          message = drop.take()
        }
      }
    }

    class Drop
    {
      private val m = new MailBox()
      
      private case class Empty()
      private case class Full(x : String)
      
      m send Empty()  // initialization
      
      def put(msg : String) : Unit =
      {
        m receive
        {
          case Empty() =>
            m send Full(msg)
        }
      }
      
      def take() : String =
      {
        m receive
        {
          case Full(msg) =>
            m send Empty(); msg
        }
      }
    }
  
    def main(args : Array[String]) : Unit =
    {
      // Create Drop
      val drop = new Drop()
      
      // Spawn Producer
      new Thread(new Producer(drop)).start();
      
      // Spawn Consumer
      new Thread(new Consumer(drop)).start();
    }
  }
}
The only difference here between v2 and v1 is in the implementation of Drop, which now makes use of the MailBox class to handle the blocking and signaling of messages coming in and being removed from the Drop. (We could have rewritten Producer and Consumer to use the MailBox directly, but for simplicity's sake, I assume that we want to keep the Drop API consistent across all the examples.) Using a MailBox is a bit different from the classic BoundedBuffer (Drop) that we've been using, so let's walk through that code in detail.
MailBox has two basic operations: send and receive. (The receiveWithin method is simply a timeout-based version of receive.) MailBox takes messages that can be of any type whatsoever. The send() method essentially drops the message into the mailbox, notifying any pending receivers immediately if it's of a type they care about, or otherwise appending it to a linked list of messages for later retrieval. The receive() method blocks until a message appropriate to the function block that's passed in to it is received.
Therefore, in this situation, we create two case classes, one containing nothing (Empty) that indicates the MailBox is empty and one containing the data (Full) with the message data in it.
  • The put method, because it is putting data into the Drop, calls receive() on the MailBox looking for an Empty instance, thus blocking until Empty has been sent. At this point, it sends a Full instance to the MailBox containing the new data.
  • The take method, because it is removing data from the Drop, calls receive() on the MailBox looking for a Full instance, extracts the message (again thanks to pattern-matching's ability to extract values from inside the case class and bind them to local variables), and sends an Empty instance to the MailBox.
No explicit locking required, and no thinking about monitors.
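
For comparison, the same "no explicit locking" result can be sketched with a different building block: a java.util.concurrent.ArrayBlockingQueue of capacity one standing in for the single slot. To be clear, this is a swapped-in technique and not how MailBox works internally; the names QueueDropDemo and exchange are hypothetical.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ListBuffer

object QueueDropDemo {
  // Single-slot buffer: put blocks while the slot is full,
  // take blocks while the slot is empty.
  class Drop {
    private val slot = new ArrayBlockingQueue[String](1)
    def put(msg: String): Unit = slot.put(msg)
    def take(): String = slot.take()
  }

  // Runs one producer and one consumer thread over a shared Drop and
  // returns the messages the consumer received, in order.
  def exchange(messages: Seq[String]): List[String] = {
    val drop = new Drop
    val received = ListBuffer.empty[String]

    val producer = new Thread(new Runnable {
      def run(): Unit = {
        messages.foreach(msg => drop.put(msg))
        drop.put("DONE")
      }
    })
    val consumer = new Thread(new Runnable {
      def run(): Unit = {
        var message = drop.take()
        while (message != "DONE") {
          received += message
          message = drop.take()
        }
      }
    })

    producer.start()
    consumer.start()
    // join() makes the consumer's writes to `received` visible here.
    producer.join()
    consumer.join()
    received.toList
  }

  def main(args: Array[String]): Unit =
    exchange(Seq("Mares eat oats", "Does eat oats")).foreach(println)
}
```

The Drop API stays the same as in the other versions, so Producer and Consumer code written against put and take would not need to change.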

Scala concurrency, v3

In fact, we can shorten the code up considerably if it turns out that Producer and Consumer don't really have to be full-fledged classes at all (which is the case here) — both are essentially thin wrappers around the Runnable.run() method, which Scala can do away with entirely by using the scala.concurrent.ops object's spawn method, like in Listing 5:
Listing 5. ProdConSample, v3 (Scala)
package com.tedneward.scalaexamples.scala.V3
{
  import concurrent.MailBox
  import concurrent.ops._

  object ProdConSample
  {
    class Drop
    {
      private val m = new MailBox()
      
      private case class Empty()
      private case class Full(x : String)
      
      m send Empty()  // initialization
      
      def put(msg : String) : Unit =
      {
        m receive
        {
          case Empty() =>
            m send Full(msg)
        }
      }
      
      def take() : String =
      {
        m receive
        {
          case Full(msg) =>
            m send Empty(); msg
        }
      }
    }
  
    def main(args : Array[String]) : Unit =
    {
      // Create Drop
      val drop = new Drop()
      
      // Spawn Producer
      spawn
      {
        val importantInfo : Array[String] = Array(
          "Mares eat oats",
          "Does eat oats",
          "Little lambs eat ivy",
          "A kid will eat ivy too"
        );
        
        importantInfo.foreach((msg) => drop.put(msg))
        drop.put("DONE")
      }
      
      // Spawn Consumer
      spawn
      {
        var message = drop.take()
        while (message != "DONE")
        {
          System.out.format("MESSAGE RECEIVED: %s%n", message)
          message = drop.take()
        }
      }
    }
  }
}
The spawn method (imported via the ops object just at the top of the package block) takes a block of code (another by-name parameter example) and wraps it inside the run() method of an anonymously-constructed thread object. In fact, it's not too difficult to understand what spawn's definition looks like inside of the ops class:
Listing 6. scala.concurrent.ops.spawn()
  def spawn(p: => Unit) = {
    val t = new Thread() { override def run() = p }
    t.start()
  }
... which once again highlights the power of by-name parameters.
One drawback to the ops.spawn method is the basic fact that it was written in 2003 before the Java 5 concurrency classes had taken effect. In particular, the java.util.concurrent.Executor and its ilk were created to make things easier for developers to spawn threads without having to actually handle the details of creating thread objects directly. Fortunately, spawn's definition is simple enough to recreate in a custom library of your own, making use of Executor (or ExecutorService or ScheduledExecutorService) to do the actual launching of the thread.
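
As a rough sketch of what such a custom version might look like, the helper below hands the by-name block to a java.util.concurrent.Executor instead of creating a Thread directly. The object name ExecutorSpawn and the explicit executor parameter are assumptions made for this example; nothing like it ships in scala.concurrent.ops.

```scala
import java.util.concurrent.{Executor, Executors, TimeUnit}

object ExecutorSpawn {
  // Hypothetical replacement for ops.spawn: wraps the by-name block in a
  // Runnable and lets the given Executor decide which thread runs it.
  def spawn(executor: Executor)(p: => Unit): Unit =
    executor.execute(new Runnable { def run(): Unit = p })

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(2)

    spawn(pool) { println("producer work would go here") }
    spawn(pool) { println("consumer work would go here") }

    // Stop accepting new tasks, then wait for the submitted ones to finish.
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```

Passing the executor in its own parameter list keeps the call site looking like the original spawn { ... } block while leaving the choice of thread pool to the caller.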
In fact, Scala's support for concurrency goes well beyond the MailBox and ops classes; Scala also supports a related concept called "Actors," which uses the same kind of message-passing approach as the MailBox, but to a much greater degree and with much more flexibility. But that's for next time.

Conclusion

Scala provides two levels of support for concurrency, much as it does for other Java-related topics:
  • The first, full access to the underlying libraries (such as java.util.concurrent) and support for the "traditional" Java concurrency semantics (such as monitors and wait()/notifyAll()).
  • The second, a layer of abstraction on top of those basic mechanics, as exemplified by the MailBox class discussed in this article and the Actors library that we'll discuss in the next article in the series.
The goal is the same in both cases: to make it easier for developers to focus on the meat of the problem rather than having to think about the low-level details of concurrent programming (obviously the second approach achieves that better than the first, at least to those who aren't too deeply invested in thinking at the low-level primitives level).
One clear deficiency to the current Scala libraries, however, is the obvious lack of Java 5 support; the scala.concurrent.ops class should have operations like spawn that make use of the new Executor interfaces. It should also support versions of synchronized that make use of the new Lock interfaces. Fortunately, these are all library improvements that can be done at any point during Scala's life cycle without breaking existing code; they can even be done by Scala developers themselves without having to wait for Scala's core development team to provide it to them (all it takes is a little time).
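
To give a sense of how little time that takes, here is a minimal sketch of a Lock-based counterpart to synchronized. The names LockOps, withLock and countInParallel are hypothetical and not part of any Scala library; the sketch assumes only the java.util.concurrent.locks.Lock interface and its ReentrantLock implementation.

```scala
import java.util.concurrent.locks.{Lock, ReentrantLock}

object LockOps {
  // Hypothetical analogue of `synchronized`: runs the by-name body while
  // holding the lock and always releases the lock afterwards.
  def withLock[T](lock: Lock)(body: => T): T = {
    lock.lock()
    try body
    finally lock.unlock()
  }

  // Increments a shared counter from several threads, guarding each
  // update with withLock, and returns the final count.
  def countInParallel(threads: Int, perThread: Int): Int = {
    val lock = new ReentrantLock()
    var total = 0
    val workers = (1 to threads).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = {
          var i = 0
          while (i < perThread) {
            withLock(lock) { total += 1 }
            i += 1
          }
        }
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    total
  }

  def main(args: Array[String]): Unit =
    println(countInParallel(4, 1000)) // prints 4000
}
```

Because the body is a by-name parameter, withLock(lock) { ... } reads much like lock.synchronized { ... }, which is the same trick the await() and spawn() definitions rely on.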