
Friday 14 October 2016

TensorFlow Machine Learning System






Google recently open-sourced TensorFlow, providing access to a powerful machine learning system. TensorFlow is a machine learning library with tools for data scientists to design intelligent systems: an interface for expressing machine learning algorithms and an implementation for executing them. It runs on CPUs or GPUs, and on desktop, server, laptop, or mobile platforms with a single API. See paper here.

Originally developed by the Google Brain Team in Machine Intelligence Research, TensorFlow has a flexible, portable and general architecture for a wide variety of applications. The system has been used for deploying machine learning systems for information retrieval, simulations, speech recognition, computer vision, robotics, natural language processing, geographic information extraction, and computational drug discovery.

The system uses data flow graphs where data with multiple dimensions (values) are passed along from mathematical computation to mathematical computation. Complex bits of data are tensors and math-y bits are nodes, and tensors flow through the graph of nodes. The way data transforms from node to node tells the system relationships in the data.
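To make the idea concrete, here is a toy sketch in plain Java. It is not TensorFlow's API; the ToyGraph class, its Node interface and the helper names are invented for illustration only. Tensors are reduced to double arrays, and each node computes its output from the tensors that flow in from upstream nodes.

```java
import java.util.Arrays;
import java.util.function.DoubleBinaryOperator;

// Toy data flow graph: NOT TensorFlow's API, only the idea behind it.
final class ToyGraph {
    // A node yields a tensor (here simply a double[]) when evaluated.
    interface Node { double[] eval(); }

    // Leaf node holding a constant tensor.
    static Node constant(double... values) {
        return () -> values.clone();
    }

    // Element-wise operation node fed by two upstream nodes.
    // Assumption: both inputs have the same length.
    static Node elementwise(Node a, Node b, DoubleBinaryOperator op) {
        return () -> {
            double[] x = a.eval();
            double[] y = b.eval();
            double[] out = new double[x.length];
            for (int i = 0; i < x.length; i++) {
                out[i] = op.applyAsDouble(x[i], y[i]);
            }
            return out;
        };
    }

    // Builds the graph (a + b) * a and lets the tensors flow through it.
    static double[] demo() {
        Node a = constant(1, 2, 3);
        Node b = constant(10, 20, 30);
        Node sum = elementwise(a, b, Double::sum);
        Node product = elementwise(sum, a, (p, q) -> p * q);
        return product.eval();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```

Nothing is computed while the graph is being built; work happens only when eval() is called on the final node, which is the property that lets a real system decide where and how to run each node.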

What is really cool is the system's flexibility and simplicity: you can quickly experiment with different ideas on a laptop, easily move into production, use GPUs with no code changes, and deploy on a mobile device.

The ability to simply and easily deploy products on mobile devices is a valuable feature. It can be used on a wide variety of heterogeneous systems, including large-scale distributed systems.

TensorFlow is released under the Apache 2.0 open source license, which permits commercial use.

Sunday 21 August 2016

What’s the future for IoT?

We asked 19 executives involved in different IoT companies this question. Their answers predict a future of completely connected devices and real-time access to virtually any data imaginable for completely informed decision making.
Here’s what they said:
  1. We’ll overestimate what it can do in 10 years and underestimate what it can do in 20. More seamless interaction. We’re removing humanity; in some places this is good, in others? (e.g., electronic seal with Alzheimer’s patient versus a real person).
  2. The ability to aggregate and analyze disparate data. Whatever you can monitor/measure and imagine, you’ll be able to do.
  3. We are currently at the Internet of Thing. Every manufacturer is creating a single product. We have to go across manufacturers and start talking to one another. This is two to five years away.
  4. The market is endless. It’s exciting. Build great software with a sophisticated backend with multiple security levels like Cisco is offering. Bring order and sophistication to data. Security is currently 10th on the list. It needs to be in the top five. Security is an art that involves cryptography. Most companies don’t have the talent they need to develop secure products.
  5. Very wild west. Major players will emerge. Platform wars will enable a vast network of sensors to work together. A dominant platform will emerge. There are a lot of custom platforms right now. We’ll see more standardization of platforms and more plug and play. Right now everyone is doing different things for different reasons - raw socket versus HTTP requests. Things will standardize over time. Ten years ago I worked on a vehicle management system that, today, has evolved to more standardized common protocol for a more reliable and scalable infrastructure. Hardware manufacturers that make server boards are now doing distributed processing because data is not going to a centralized location. There’s so much data, you have to decide what’s relevant to keep.
  6. It’s becoming ubiquitous. In 10 years, anything that needs to be connected will have the ability to be present on the internet.
  7. We will have 25 to 50 billion connected devices which will connect via a mesh network. This will bring global, ubiquitous access to data and will make the life of telcos more difficult. Google is looking into ubiquitous Wi-Fi - the devices can connect to the network or range of devices through a decentralized mesh network. Cloud networks are expanding. The global cloud will have all devices residing there. Render connected devices provide computational power to someone on the other side of the world.
  8. More and more devices are connected to each other via APIs. Connect all the things that you want. Drive to your home, garage door opens without pressing a button, coffee starts, heat adjusts, etc.
  9. We’re just seeing the tip of the iceberg. Adoption of technology is a fraction of what it will be.
  10. Piece of human computer interaction. Bigger than putting things on the internet. Make it understandable, understanding, enhance humanity. Make IoT move in a more humanistic way.
  11. Unified ecosystem of all devices where companies talk to each other. There has to be a standard. Maybe it’s established by Apple or Google because of the size of their user bases.
  12. We will continue to see the evolution of personal devices - glasses, watches, personal devices, home devices. Hardware will connect everything. This will be led at the enterprise level by the IBMs and Ciscos. I’m not as close to the enterprise level. Will introduce to Belkin and Fitbit.
  13. The ability to use data from devices to make intelligent, informed decisions.
  14. Using data to iterate your products quickly while they’re in the customers’ hands.
  15. Creating a smarter world. Companies not working in silos. Open to other partners, providers and vendors to promote the sharing of information, automation and best practices.
  16. Remaking everything - healthcare, cars, cities, homes, industry.
  17. It’s exciting. We haven’t even started to fully understand the possibilities of getting remote control via the phone. Learning patterns will automatically adjust over time and people will see new possibilities.
  18. Visibility and reliability. The percent of industrial equipment that is currently connected is between one and 25% depending on the company. There’s tremendous opportunity to connect equipment. Likewise, the probability of detecting an important event without false positives is about 30%, except at GE where we’re a world class 99%. The ability to tell what’s wrong and to diagnose a solution is nowhere near 99%. We’re probably at 30% right now. We must improve the model of the machine and the people that interacted with it and what they did. When we’re able to capture more information on the machines and the people, we’ll move from anomaly detection to correction. Providing better operational data to the cloud will help us improve operational optimization.
  19. It will evolve like other IT. Wildly hyped and then clocked and scaled over the next five to seven years. The next frontier will be more customer centric. We’re currently working with a commercial carpet manufacturer putting RFID chips in squares of carpet to see traffic patterns in stores.

Thursday 26 May 2016

Top Customers facing frequent call drops in Roaming Analysis by Scala and Spark


Problem:
You are given a CDR (Call Detail Record) file and need to find the top customers facing frequent call drops while roaming. This is a very important report that telecom companies use to prevent customer churn, by calling affected customers back and at the same time contacting their roaming partners to fix connectivity issues in specific areas.

Solution:
package com.ravi.cdr

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

// As we're dealing with a CSV file with no headers, it's a good idea to
// define the schema using a case class. It lives at the top level (not
// inside main) so that Spark can serialize it without capturing main's scope.
case class Call(visitor_locn: String, call_duration: Int, phone_no: String, error_code: String)

object CallDetailsProblem {
  def main(args: Array[String]): Unit = {

    // First we'll read the data from the CSV file
    val sc = new SparkContext(new SparkConf().setAppName("CallDetailsProblem").setMaster("local[2]"))
    val logFile = "/home/om_workspace/test.csv"
    val text = sc.textFile(logFile)

    // Then create an RDD of Calls
    val calls = text.map(_.split(",")).map(p => Call(p(0), p(1).toInt, p(2), p(3)))

    println(calls.count())
    calls.foreach(x => println(x))

    // Number of calls per visitor location, sorted ascending by count
    val result = calls.map(x => (x.visitor_locn, 1)).reduceByKey(_ + _).collect.sortBy(_._2)
    // println(result.reverse.mkString("\n"))

    // Number of calls per error code, most frequent first
    val result2 = calls.map(x => (x.error_code, 1)).reduceByKey(_ + _).collect.sortBy(_._2)
    println(result2.reverse.mkString("\n"))

    sc.stop()
  }
}
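The listing above counts calls per location and per error code. To rank customers themselves, the same grouping can be keyed on the phone number after filtering for dropped calls. The following is a minimal sketch in plain Java (no Spark), assuming the four-column layout of the Call case class. The DROP_CODE value, the TopDroppedCallers class and the sample rows are made up for illustration; substitute whatever error code your CDR feed actually uses for a dropped call.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class TopDroppedCallers {
    // Assumption: this error_code value marks a dropped call.
    static final String DROP_CODE = "CALL_DROP";

    // Returns up to n phone numbers with the most dropped calls, most affected first.
    // Columns per line: visitor_locn, call_duration, phone_no, error_code.
    static List<String> topCustomers(List<String> csvLines, int n) {
        Map<String, Long> dropsPerPhone = csvLines.stream()
            .map(line -> line.split(","))
            .filter(p -> p.length == 4 && p[3].trim().equals(DROP_CODE))
            .collect(Collectors.groupingBy(p -> p[2].trim(), Collectors.counting()));

        return dropsPerPhone.entrySet().stream()
            // Highest drop count first; ties broken by phone number for stable output.
            .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder())
                .thenComparing(Map.Entry.<String, Long>comparingByKey()))
            .limit(n)
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up sample rows, not real CDR data.
        List<String> lines = Arrays.asList(
            "Delhi,120,555-0101,CALL_DROP",
            "Mumbai,30,555-0102,OK",
            "Delhi,45,555-0101,CALL_DROP",
            "Pune,60,555-0103,CALL_DROP");
        System.out.println(topCustomers(lines, 2));
    }
}
```

In the Spark job the equivalent would be a filter on error_code followed by a map to (phone_no, 1) and the same reduceByKey used above.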

Tuesday 10 May 2016

The busy Java developer's guide to Scala: Explore Scala concurrency

In 2005, Herb Sutter exposed the industry's biggest "dirty little secret" with his "The Free Lunch Is Over" article, demonstrating clearly that the era of ever-faster processors was at an end, to be replaced by a new era of parallelism via "cores" (virtual CPUs) on a single chip. The revelation sent shockwaves through the programming community because getting thread-safe code correct has always remained, in theory if not in practice, the province of high-powered software developers way too expensive for your company to hire. A privileged few, it seemed, understood enough about Java's threading model and concurrency APIs and "synchronized" keyword to write code that both provided safety and throughput ... and most of those had learned it the hard way.
It is presumed that the rest of the industry was left to fend for itself, clearly not a desirable conclusion, at least not to the IT departments paying for that software being developed.

Back-to-concurrency basics
Like Scala's sister language in the .NET space, F#, Scala stands as one of several purported solutions to the "concurrency problem." In this column, I have touched on several of Scala's properties that make it more amenable to writing thread-safe code such as immutable objects by default and a design preference for returning copies of objects rather than modifying their contents. Scala's support for concurrency reaches far deeper than just this though; it's high time to start poking around in the Scala libraries to see what lives there.
Before we can get too deep into Scala's concurrency support, it's a good idea to make sure that you have a good understanding of Java's basic concurrency model because Scala's support for concurrency builds, at some level, on top of the features and functionality provided by the JVM and supporting libraries. Toward that end, the code in Listing 1 contains a basic concurrency problem known as the Producer/Consumer problem (as described in the "Guarded Blocks" section of the Sun Java Tutorial). Note that the Java Tutorial version doesn't use the java.util.concurrent classes in its solution, preferring instead to use the old wait()/notifyAll() methods from java.lang.Object:
Listing 1. Producer/Consumer (pre-Java5)
package com.tedneward.scalaexamples.notj5;

class Producer implements Runnable
{
  private Drop drop;
  private String importantInfo[] = {
    "Mares eat oats",
    "Does eat oats",
    "Little lambs eat ivy",
    "A kid will eat ivy too"
  };

  public Producer(Drop drop) { this.drop = drop; }

  public void run()
  {
    for (int i = 0; i < importantInfo.length; i++)
    {
      drop.put(importantInfo[i]);
    }
    drop.put("DONE");
  }
}

class Consumer implements Runnable
{
  private Drop drop;

  public Consumer(Drop drop) { this.drop = drop; }

  public void run()
  {
    for (String message = drop.take(); !message.equals("DONE");
         message = drop.take())
    {
      System.out.format("MESSAGE RECEIVED: %s%n", message);
    }
  }
}

class Drop
{
  //Message sent from producer to consumer.
  private String message;
  
  //True if consumer should wait for producer to send message,
  //false if producer should wait for consumer to retrieve message.
  private boolean empty = true;

  //Object to use to synchronize against so as to not "leak" the
  //"this" monitor
  private Object lock = new Object();

  public String take()
  {
    synchronized(lock)
    {
      //Wait until message is available.
      while (empty)
      {
        try
        {
          lock.wait();
        }
        catch (InterruptedException e) {}
      }
      //Toggle status.
      empty = true;
      //Notify producer that status has changed.
      lock.notifyAll();
      return message;
    }
  }

  public void put(String message)
  {
    synchronized(lock)
    {
      //Wait until message has been retrieved.
      while (!empty)
      {
        try
        { 
          lock.wait();
        } catch (InterruptedException e) {}
      }
      //Toggle status.
      empty = false;
      //Store message.
      this.message = message;
      //Notify consumer that status has changed.
      lock.notifyAll();
    }
  }
}

public class ProdConSample
{
  public static void main(String[] args)
  {
    Drop drop = new Drop();
    (new Thread(new Producer(drop))).start();
    (new Thread(new Consumer(drop))).start();
  }
}

The Java Tutorial "bug"

Curious readers will probably go and compare this code against that found in the Java Tutorial to see what the difference is; those who do will discover that instead of simply "synchronized"-ing the put and take methods, I've instead used a lock object stored inside the Drop. The reason for this is simple: The monitor of an object is never encapsulated inside of the class, so as written the Java Tutorial version allows this (obviously insane) code to break it:

public class ProdConSample
{
  public static void main(String[] args) throws InterruptedException
  {
    Drop drop = new Drop();
    (new Thread(new Producer(drop))).start();
    (new Thread(new Consumer(drop))).start();
    synchronized(drop)
    {
      // Thread.sleep takes milliseconds, hence the 1000L factor
      Thread.sleep(1000L * 60 * 60 * 24 * 365 * 10); // sleep for 10 years?!?
    }
  }
}
By using a private object as the monitor on which the lock is based, this code will have no effect. In essence, now the thread-safety implementation is encapsulated; before it relied on client goodwill to work correctly.
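The difference can be observed directly. In the sketch below (class and method names are hypothetical, chosen for this illustration), an outside caller holds the monitor of each object while a worker thread tries to call set(). With synchronized methods the worker stays blocked for as long as the outsider holds the monitor; with a private lock object it completes unhindered.

```java
final class MonitorLeakDemo {
    // Tutorial-style: synchronized methods lock on "this", which any caller can also lock.
    static final class PublicMonitor {
        private int value;
        synchronized void set(int v) { value = v; }
    }

    // Article-style: the monitor is a private object that outside code cannot reach.
    static final class PrivateMonitor {
        private final Object lock = new Object();
        private int value;
        void set(int v) { synchronized (lock) { value = v; } }
    }

    // Holds target's monitor, starts a thread running the action, waits up to
    // half a second, and reports whether the thread finished in that time.
    static boolean completesWhileHeld(Object target, Runnable action) {
        Thread worker = new Thread(action);
        worker.setDaemon(true);
        synchronized (target) {
            worker.start();
            try {
                worker.join(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return !worker.isAlive();
        }
    }

    public static void main(String[] args) {
        PublicMonitor pub = new PublicMonitor();
        PrivateMonitor priv = new PrivateMonitor();
        System.out.println("synchronized method completes: "
            + completesWhileHeld(pub, () -> pub.set(1)));
        System.out.println("private lock completes: "
            + completesWhileHeld(priv, () -> priv.set(1)));
    }
}
```

The half-second wait is an arbitrary choice for the demonstration; the blocked worker proceeds as soon as the outer synchronized block is left.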
Note: The code I present here is slightly modified from the Sun tutorial solution; there's a small design flaw in the code they present (see The Java Tutorial "bug").
The core of the Producer/Consumer problem is a simple one to understand: one (or more) producer entities want to provide data for one (or more) consumer entities to consume and do something with (in this case it consists of printing the data to the console). The Producer and Consumer classes are pretty straightforward Runnable-implementing classes: The Producer takes Strings from an array and puts them into a buffer for the Consumer to take as desired.
The hard part of the problem is that if the Producer runs too fast, data will be potentially lost as it is overwritten; if the Consumer runs too fast, data will be potentially double-processed as the Consumer reads the same data twice. The buffer (called the Drop in the Java Tutorial code) must ensure that neither condition occurs. It must also ensure that there is no potential for data corruption (hard in the case of String references, but still a concern) as messages are put in and taken out of the buffer.
A full discussion of the subject is best left to Brian Goetz's Java Concurrency in Practice or Doug Lea's earlier Concurrent Programming in Java (see Resources), but a quick rundown of how this code works is necessary before you apply Scala to it.
When the Java compiler sees the synchronized keyword, it generates a try/finally block in place of the synchronized block with a monitorenter opcode at the top of the block and a monitorexit opcode in the finally block to ensure that the monitor (the Java basis for atomicity) is released regardless of how the code exits. Thus, the put code in Drop gets rewritten to look like Listing 2:
Listing 2. Drop.put after compiler helpfulness
  // This is pseudocode
  public void put(String message)
  {
    try
    {
      monitorenter(lock)
 
      //Wait until message has been retrieved.
      while (!empty)
      {
        try
        { 
          lock.wait();
        } catch (InterruptedException e) {}
      }
      //Toggle status.
      empty = false;
      //Store message.
      this.message = message;
      //Notify consumer that status has changed.
      lock.notifyAll();
    }
    finally
    {
      monitorexit(lock)
    }
  }
The wait() method tells the current thread to go into an inactive state and wait for another thread to call notifyAll() on that object. The thread just notified must then attempt to acquire the monitor again after which point it is free to continue execution. In essence, wait() and notify()/notifyAll() act as a simple signaling mechanism, allowing the Drop to coordinate between the Producer and the Consumer threads, one take to each put.
The code download that accompanies this article uses the Java5 concurrency enhancements (the Lock and Condition interfaces and the ReentrantLock lock implementation) to provide timeout-based versions of Listing 2, but the basic code pattern remains the same. That is the problem: Developers who write code like in Listing 2 have to focus too exclusively on the details, the low-level implementation code, of the threading and locking required to make it all work correctly. What's more, developers have to reason about each and every line in the code, looking to see if it needs to be protected because too much synchronization is just as bad as too little.
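For readers without the download at hand, the following is a rough sketch of what a Lock/Condition-based Drop might look like. It is not the code from the download: the LockDrop class and its runDemo helper are written for this illustration, and the timeouts are left out to keep it short (Condition also offers await(long, TimeUnit) for that purpose).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

final class LockDrop {
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // signalled after a put
    private final Condition notFull = lock.newCondition();  // signalled after a take
    private String message;
    private boolean empty = true;

    void put(String msg) throws InterruptedException {
        lock.lock();
        try {
            while (!empty) notFull.await();   // wait until the message has been retrieved
            message = msg;
            empty = false;
            notEmpty.signal();                // wake a waiting consumer
        } finally {
            lock.unlock();
        }
    }

    String take() throws InterruptedException {
        lock.lock();
        try {
            while (empty) notEmpty.await();   // wait until a message is available
            empty = true;
            notFull.signal();                 // wake a waiting producer
            return message;
        } finally {
            lock.unlock();
        }
    }

    // Runs one producer and one consumer and returns what the consumer received.
    static List<String> runDemo(String... messages) {
        LockDrop drop = new LockDrop();
        List<String> received = new ArrayList<>();
        Thread producer = new Thread(() -> {
            try {
                for (String m : messages) drop.put(m);
                drop.put("DONE");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (String m = drop.take(); !m.equals("DONE"); m = drop.take()) {
                    received.add(m);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo("Mares eat oats", "Does eat oats"));
    }
}
```

The explicit lock()/try/finally/unlock() shape mirrors what the compiler generates for synchronized in Listing 2, only now the developer has to write it by hand.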
Now let's look at Scala alternatives.

Good old Scala concurrency (v1)

One way to start working with concurrency in Scala is to simply translate the Java code directly over to Scala, taking advantage of Scala's syntax in places to simplify the code, at least a little:
Listing 3. ProdConSample (Scala)
object ProdConSample
{
  class Producer(drop : Drop)
    extends Runnable
  {
    val importantInfo : Array[String] = Array(
      "Mares eat oats",
      "Does eat oats",
      "Little lambs eat ivy",
      "A kid will eat ivy too"
    );
  
    override def run() : Unit =
    {
      importantInfo.foreach((msg) => drop.put(msg))
      drop.put("DONE")
    }
  }
  
  class Consumer(drop : Drop)
    extends Runnable
  {
    override def run() : Unit =
    {
      var message = drop.take()
      while (message != "DONE")
      {
        System.out.format("MESSAGE RECEIVED: %s%n", message)
        message = drop.take()
      }
    }
  }
  
  class Drop
  {
    var message : String = ""
    var empty : Boolean = true
    var lock : AnyRef = new Object()
  
    def put(x: String) : Unit =
      lock.synchronized
      {
        // Wait until message has been retrieved
        await (empty == true)
        // Toggle status
        empty = false
        // Store message
        message = x
        // Notify consumer that status has changed
        lock.notifyAll()
      }

    def take() : String =
      lock.synchronized
      {
        // Wait until message is available.
        await (empty == false)
        // Toggle status
        empty=true
        // Notify producer that status has changed
        lock.notifyAll()
        // Return the message
        message
      }

    private def await(cond: => Boolean) =
      while (!cond) { lock.wait() }
  }

  def main(args : Array[String]) : Unit =
  {
    // Create Drop
    val drop = new Drop();
  
    // Spawn Producer
    new Thread(new Producer(drop)).start();
    
    // Spawn Consumer
    new Thread(new Consumer(drop)).start();
  }
}
The Producer and Consumer classes are almost identical to their Java cousins, again extending (implementing) the Runnable interface and overriding the run() method, and, in Producer's case, using the built-in iteration method foreach to walk the contents of the importantInfo array. (Actually, to make it more like Scala, importantInfo should probably be a List instead of an Array, but in this first pass, I want to keep things as close to the original Java code as possible.)
The Drop class also looks similar to the Java version except that in Scala, "synchronized" isn't a keyword, it's a method defined on the class AnyRef, the Scala "root of all reference types." This means that to synchronize on a particular object, you simply call the synchronized method on that object; in this case, on the object held in the lock field on Drop.
Note that we also make use of a Scala-ism in the Drop class in the definition of the await() method: The cond parameter is a block of code waiting to be evaluated rather than evaluated prior to being passed in to the method. Formally in Scala, this is known as "call-by-name"; here it serves as a useful way of capturing the conditional-waiting logic that had to be repeated twice (once in put, once in take) in the Java version.
Finally, in main(), you create the Drop instance, instantiate two threads, kick them off with start(), and then simply fall off of the end of main(), trusting that the JVM will have started those two threads before you finish with main(). (In production code, this probably shouldn't be taken for granted, but for a simple example like this, it's going to be OK 99.99 percent of the time. Caveat emptor.)
However, having said all that, the same basic problem remains: Programmers still have to worry way too much about the issues of signaling and coordinating the two threads. While some of the Scala-isms might make the syntax easier to live with, it's not really an incredibly compelling win so far.

Scala concurrency, v2

A quick look at the Scala Library Reference reveals an interesting package: scala.concurrent. This package contains a number of different concurrency constructs, including the first one we're going to make use of, the MailBox class.
As its name implies, MailBox is essentially the Drop by itself, a single-slot buffer that holds a piece of data until it has been retrieved. However, the big advantage of MailBox is that it completely encapsulates the details of the sending and receiving behind a combination of pattern-matching and case classes, making it more flexible than the simple Drop (or the Drop's big multi-slot data-holding brother, java.util.concurrent.BoundedBuffer).
Listing 4. ProdConSample, v2 (Scala)
package com.tedneward.scalaexamples.scala.V2
{
  import concurrent.{MailBox, ops}

  object ProdConSample
  {
    class Producer(drop : Drop)
      extends Runnable
    {
      val importantInfo : Array[String] = Array(
        "Mares eat oats",
        "Does eat oats",
        "Little lambs eat ivy",
        "A kid will eat ivy too"
      );
    
      override def run() : Unit =
      {
        importantInfo.foreach((msg) => drop.put(msg))
        drop.put("DONE")
      }
    }
    
    class Consumer(drop : Drop)
      extends Runnable
    {
      override def run() : Unit =
      {
        var message = drop.take()
        while (message != "DONE")
        {
          System.out.format("MESSAGE RECEIVED: %s%n", message)
          message = drop.take()
        }
      }
    }

    class Drop
    {
      private val m = new MailBox()
      
      private case class Empty()
      private case class Full(x : String)
      
      m send Empty()  // initialization
      
      def put(msg : String) : Unit =
      {
        m receive
        {
          case Empty() =>
            m send Full(msg)
        }
      }
      
      def take() : String =
      {
        m receive
        {
          case Full(msg) =>
            m send Empty(); msg
        }
      }
    }
  
    def main(args : Array[String]) : Unit =
    {
      // Create Drop
      val drop = new Drop()
      
      // Spawn Producer
      new Thread(new Producer(drop)).start();
      
      // Spawn Consumer
      new Thread(new Consumer(drop)).start();
    }
  }
}
The only difference here between v2 and v1 is in the implementation of Drop, which now makes use of the MailBox class to handle the blocking and signaling of messages coming in and being removed from the Drop. (We could have rewritten Producer and Consumer to use the MailBox directly, but for simplicity's sake, I assume that we want to keep the Drop API consistent across all the examples.) Using a MailBox is a bit different from the classic BoundedBuffer (Drop) that we've been using, so let's walk through that code in detail.
MailBox has two basic operations: send and receive. The receiveWithin method is simply a timeout-based version of receive. MailBox takes messages that can be of any type whatsoever. The send() method essentially drops the message into the mailbox, notifying any pending receivers immediately if it's of a type they care about, and appending it to a linked list of messages for later retrieval. The receive() method blocks until a message appropriate to the function block that's passed in to it is received.
Therefore, in this situation, we create two case classes, one containing nothing (Empty) that indicates the MailBox is empty and one containing the data (Full) with the message data in it.
  • The put method, because it is putting data into the Drop, calls receive() on the MailBox looking for an Empty instance, thus blocking until Empty has been sent. At this point, it sends a Full instance to the MailBox containing the new data.
  • The take method, because it is removing data from the Drop, calls receive() on the MailBox looking for a Full instance, extracts the message (again thanks to pattern-matching's ability to extract values from inside the case class and bind them to local variables), and sends an Empty instance to the MailBox.
No explicit locking required, and no thinking about monitors.
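Java developers can get a comparable "no explicit locking" Drop from the standard library. The sketch below is an analogue rather than a translation of MailBox: it assumes a java.util.concurrent.ArrayBlockingQueue with capacity 1 as the single slot, and the QueueDrop name and runDemo helper are made up for this example. There is no pattern matching here; the queue only knows "slot full" and "slot empty".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class QueueDrop {
    // Capacity 1: put blocks while the slot is full, take blocks while it is empty.
    private final BlockingQueue<String> slot = new ArrayBlockingQueue<>(1);

    void put(String msg) throws InterruptedException { slot.put(msg); }

    String take() throws InterruptedException { return slot.take(); }

    // Runs one producer and one consumer and returns what the consumer received.
    static List<String> runDemo(String... messages) {
        QueueDrop drop = new QueueDrop();
        List<String> received = new ArrayList<>();
        Thread producer = new Thread(() -> {
            try {
                for (String m : messages) drop.put(m);
                drop.put("DONE");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (String m = drop.take(); !m.equals("DONE"); m = drop.take()) {
                    received.add(m);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo("Little lambs eat ivy", "A kid will eat ivy too"));
    }
}
```

What the MailBox version adds on top of this is selectivity: a receiver can wait for a particular shape of message (Empty or Full) instead of simply taking whatever is next.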

Scala concurrency, v3

In fact, we can shorten the code up considerably if it turns out that Producer and Consumer don't really have to be full-fledged classes at all (which is the case here) — both are essentially thin wrappers around the Runnable.run() method, which Scala can do away with entirely by using the scala.concurrent.ops object's spawn method, like in Listing 5:
Listing 5. ProdConSample, v3 (Scala)
package com.tedneward.scalaexamples.scala.V3
{
  import concurrent.MailBox
  import concurrent.ops._

  object ProdConSample
  {
    class Drop
    {
      private val m = new MailBox()
      
      private case class Empty()
      private case class Full(x : String)
      
      m send Empty()  // initialization
      
      def put(msg : String) : Unit =
      {
        m receive
        {
          case Empty() =>
            m send Full(msg)
        }
      }
      
      def take() : String =
      {
        m receive
        {
          case Full(msg) =>
            m send Empty(); msg
        }
      }
    }
  
    def main(args : Array[String]) : Unit =
    {
      // Create Drop
      val drop = new Drop()
      
      // Spawn Producer
      spawn
      {
        val importantInfo : Array[String] = Array(
          "Mares eat oats",
          "Does eat oats",
          "Little lambs eat ivy",
          "A kid will eat ivy too"
        );
        
        importantInfo.foreach((msg) => drop.put(msg))
        drop.put("DONE")
      }
      
      // Spawn Consumer
      spawn
      {
        var message = drop.take()
        while (message != "DONE")
        {
          System.out.format("MESSAGE RECEIVED: %s%n", message)
          message = drop.take()
        }
      }
    }
  }
}
The spawn method (imported via the ops object just at the top of the package block) takes a block of code (another by-name parameter example) and wraps it inside the run() method of an anonymously-constructed thread object. In fact, it's not too difficult to understand what spawn's definition looks like inside of the ops class:
Listing 6. scala.concurrent.ops.spawn()
  def spawn(p: => Unit) = {
    val t = new Thread() { override def run() = p }
    t.start()
  }
... which once again highlights the power of by-name parameters.
One drawback to the ops.spawn method is the basic fact that it was written in 2003 before the Java 5 concurrency classes had taken effect. In particular, the java.util.concurrent.Executor and its ilk were created to make things easier for developers to spawn threads without having to actually handle the details of creating thread objects directly. Fortunately, spawn's definition is simple enough to recreate in a custom library of your own, making use of Executor (or ExecutorService or ScheduledExecutorService) to do the actual launching of the thread.
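To show the shape such a replacement could take, here is a small Java sketch of a spawn-like helper built on ExecutorService. The Spawner class and its spawn and demo methods are hypothetical names for this illustration, not part of any Scala or Java library. A cached thread pool is assumed so that blocking tasks (such as a producer and a consumer waiting on each other) always get a thread of their own.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class Spawner {
    // Daemon threads so an idle pool does not keep the JVM alive.
    private static final ExecutorService POOL = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // Hypothetical helper in the spirit of ops.spawn: run the body on a pool thread.
    // Unlike the original, it returns a Future so callers can wait for completion.
    static Future<?> spawn(Runnable body) {
        return POOL.submit(body);
    }

    // Spawns two tasks, waits for both, and returns how many ran.
    static int demo() {
        AtomicInteger counter = new AtomicInteger();
        Future<?> first = spawn(counter::incrementAndGet);
        Future<?> second = spawn(counter::incrementAndGet);
        try {
            first.get(5, TimeUnit.SECONDS);
            second.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("spawned task did not complete", e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks completed: " + demo());
    }
}
```

In Scala the Runnable parameter would become a by-name block, exactly as in Listing 6, with the Executor hidden inside the helper.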
In fact, Scala's support for concurrency goes well beyond the MailBox and ops classes; Scala also supports a similar concept called "Actors," which uses a similar kind of message-passing approach that the MailBox uses, but to a much greater degree and with much more flexibility. But that's for next time.

Conclusion

Scala provides two levels of support for concurrency, much as it does for other Java-related topics:
  • The first, full access to the underlying libraries (such as java.util.concurrent) and support for the "traditional" Java concurrency semantics (such as monitors and wait()/notifyAll()).
  • The second, a layer of abstraction on top of those basic mechanics, as exemplified by the MailBox class discussed in this article and the Actors library that we'll discuss in the next article in the series.
The goal is the same in both cases: to make it easier for developers to focus on the meat of the problem rather than having to think about the low-level details of concurrent programming (obviously the second approach achieves that better than the first, at least to those who aren't too deeply invested in thinking at the low-level primitives level).
One clear deficiency to the current Scala libraries, however, is the obvious lack of Java 5 support; the scala.concurrent.ops class should have operations like spawn that make use of the new Executor interfaces. It should also support versions of synchronized that make use of the new Lock interfaces. Fortunately, these are all library improvements that can be done at any point during Scala's life cycle without breaking existing code; they can even be done by Scala developers themselves without having to wait for Scala's core development team to provide it to them (all it takes is a little time).

Saturday 16 April 2016

Development and deployment of Spark applications with Scala, Eclipse, and sbt – Installation & configuration

The purpose of this tutorial is to set up the necessary environment for development and deployment of Spark applications with Scala. Specifically, we are going to use the Eclipse IDE for development of applications and deploy them with spark-submit. The glue that ties everything together is the sbt interactive build tool. The sbt tool provides plugins used to:
  1. Create an Eclipse Scala project with Spark dependencies
  2. Create a jar assembly with all necessary dependencies so that it can be deployed and launched using spark-submit
The steps presented assume just a basic Linux installation with Java SE Development Kit 7. We are going to download, install, and configure the following software components:
  1. The latest sbt build tool
  2. Scala IDE for Eclipse
  3. Spark 1.4.1

Installation instructions

Installing sbt
sbt download and installation is straightforward, as shown in the commands below:
~$ wget https://dl.bintray.com/sbt/native-packages/sbt/0.13.8/sbt-0.13.8.tgz
~$ gunzip sbt-0.13.8.tgz
~$ tar -xvf sbt-0.13.8.tar
~$ export PATH=$PATH:~/sbt/bin
The last command adds the sbt executable into the PATH shell variable. Now we can call sbt from any directory to create and package our projects. The first time it runs it will need to fetch some data over the internet, so be patient!
We are not quite done with the sbt yet. We need to install two very important plugins.
sbteclipse plugin
sbteclipse is the sbt plugin for creating Eclipse project definitions.
Add sbteclipse to your plugin definition file (or create one if it doesn’t exist). You can use either:
  • the global file (for version 0.13 and up) at ~/.sbt/0.13/plugins/plugins.sbt
  • the project-specific file at PROJECT_DIR/project/plugins.sbt
For the latest version add the following line in plugins.sbt:
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0") 
as shown below (use ^D to end the cat command):
~$ mkdir -p ~/.sbt/0.13/plugins # mkdir -p creates all necessary directories in the path in the given order
~$ cat >> ~/.sbt/0.13/plugins/plugins.sbt
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")
<ctrl>+D
~$
After installation, the next time we launch sbt we will be able to use the additional command eclipse.
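The plugin line can also be added non-interactively, which is convenient in provisioning scripts. The following is a minimal sketch rather than part of the original instructions: the helper name add_sbt_plugin and the PLUGINS_FILE variable are made up for illustration, and the default location is the global plugin file named above.

```shell
# Hypothetical helper: append a line to the sbt plugin file, but only once.
# PLUGINS_FILE (an illustrative variable) defaults to the global location
# used in this tutorial.
PLUGINS_FILE="${PLUGINS_FILE:-$HOME/.sbt/0.13/plugins/plugins.sbt}"

add_sbt_plugin() {
  line="$1"
  mkdir -p "$(dirname "$PLUGINS_FILE")"
  touch "$PLUGINS_FILE"
  # -F: fixed string, -x: match the whole line, -q: no output
  if ! grep -Fxq -- "$line" "$PLUGINS_FILE"; then
    printf '%s\n' "$line" >> "$PLUGINS_FILE"
  fi
}

# Example call (uncomment to use):
# add_sbt_plugin 'addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")'
```

Because the helper checks for an identical line first, running it repeatedly does not duplicate the entry, which the cat >> approach would do.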
sbt-assembly plugin
sbt-assembly is an sbt plugin that creates a fat JAR of your project with all of its dependencies included. According to the Spark documentation, if your code depends on other projects, you need to package them alongside your application in order to distribute the code to a Spark cluster. This is why we need the sbt-assembly plugin. When creating assembly jars, list Spark and Hadoop as provided dependencies; these need not be bundled, since they are provided by the cluster manager at runtime. Once you have an assembled jar, you can pass it to the bin/spark-submit script, as shown later.
~$ mkdir -p ~/.sbt/0.13/plugins
~$ cat >> ~/.sbt/0.13/plugins/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
<ctrl>+D
~$
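To make the advice about provided dependencies concrete, the sketch below writes a build definition in which the Spark dependency is marked as provided, so that sbt-assembly leaves it out of the fat JAR. It is an illustration under assumptions: the function name write_assembly_build and the target directory are hypothetical, and the project name, Scala version, and Spark version are simply the ones used for the sample project in this tutorial.

```shell
# Hypothetical helper: write a build definition into directory $1
# with spark-core marked "provided" (left out of the fat JAR).
write_assembly_build() {
  dir="$1"
  mkdir -p "$dir"
  cat > "$dir/sample.sbt" <<'EOF'
name := "Sample Project"

version := "1.0"

scalaVersion := "2.11.7"

// "provided": needed to compile, but supplied by the cluster at runtime
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1" % "provided"
EOF
}

# Example (uncomment to use; this overwrites any sample.sbt in that directory):
# write_assembly_build "$HOME/AssemblyDemo"
```

With the plugin installed, running sbt assembly in such a project directory builds the assembled jar.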
Installing Scala IDE for Eclipse
Downloading and installing the Scala IDE for Eclipse is also straightforward:
~$ wget http://downloads.typesafe.com/scalaide-pack/4.1.1-vfinal-luna-211-20150728/scala-SDK-4.1.1-vfinal-2.11-linux.gtk.x86_64.tar.gz
~$ gunzip scala-SDK-4.1.1-vfinal-2.11-linux.gtk.x86_64.tar.gz
~$ tar -xvf scala-SDK-4.1.1-vfinal-2.11-linux.gtk.x86_64.tar
~$ ~/eclipse/eclipse # this runs Eclipse IDE
As you can see from the figure below, a new menu item named Scala is added in the classic Eclipse menu bar:
[Figure: Eclipse menu bar with the new Scala menu item]
Installing Spark 1.4.1 (this may take a while)
Instructions for downloading and building Spark are provided in the Spark documentation. There are several options available; we use the self-contained Maven installation that is packaged with the Spark source (located under the build/ directory) to ease building and deployment from source. Notice that we build Spark for Scala 2.11, the version included in the Scala IDE for Eclipse that we downloaded in the previous step:
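~$ wget http://www.apache.org/dyn/closer.cgi/spark/spark-1.4.1/spark-1.4.1.tgz # if this saves a mirror-selection page instead of the archive, use one of the mirror links it lists
~$ gunzip spark-1.4.1.tgz
~$ tar -xvf spark-1.4.1.tar
~$ cd spark-1.4.1/
~/spark-1.4.1$ dev/change-version-to-2.11.sh # the Spark 1.4 build instructions run this before building with -Dscala-2.11
~/spark-1.4.1$ build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Dscala-2.11 -DskipTests clean package
...
...
~/spark-1.4.1$ export PATH=$PATH:~/spark-1.4.1/bin # make all Spark binaries accessible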
As with sbt above, we include the last command so as to make Spark binaries accessible from everywhere.
Having installed all the necessary components, we now proceed to demonstrate the creation of a simple application.
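Before moving on, it can be worth confirming that the tools installed above are actually reachable from the shell. The helper below is a small sketch for that purpose; the name check_tools is made up for illustration, and it only tests whether each command is on the PATH, not whether the installed versions are the ones used in this tutorial.

```shell
# Hypothetical helper: report which of the given commands are on the PATH.
# Returns non-zero if at least one of them is missing.
check_tools() {
  missing=0
  for tool in "$@"; do
    if command -v "$tool" >/dev/null 2>&1; then
      echo "found:   $tool"
    else
      echo "missing: $tool"
      missing=1
    fi
  done
  return "$missing"
}

# Example (uncomment to use):
# check_tools java sbt spark-submit
```

If sbt or spark-submit is reported as missing in a new terminal, the export PATH commands shown earlier most likely need to be repeated, since they only affect the shell session in which they were run.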

Creating a sample application (sbt package)

The task now is to create a self-contained Scala/Spark application using sbt and the Eclipse IDE.
Creating sample sbt project
For this demonstration, we will create a very simple Spark application in Scala named SampleApp (creating a realistic application will be covered in a follow-up post). First we prepare the directory structure:
~$ mkdir SampleApp
~$ cd SampleApp
~/SampleApp$ mkdir -p src/main/scala # mandatory structure
In the directory ~/SampleApp/src/main/scala we create the following Scala file SampleApp.scala (using just a text editor for now):
/* SampleApp.scala:
   This application simply counts the number of lines that contain "val" from itself
 */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
 
object SampleApp {
  def main(args: Array[String]) {
    val txtFile = "/home/osboxes/SampleApp/src/main/scala/SampleApp.scala"
    val conf = new SparkConf().setAppName("Sample Application")
    val sc = new SparkContext(conf)
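    // Read this source file as an RDD of lines (at least 2 partitions) and cache it
    val txtFileLines = sc.textFile(txtFile, 2).cache()
    // Count the lines containing the target substring
    val numAs = txtFileLines.filter(line => line.contains("val")).count()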
    println("Lines with val: %s".format(numAs))
  }
}
In the directory ~/SampleApp we create a configuration file sample.sbt containing the following:
name := "Sample Project"
 
version := "1.0"
 
scalaVersion := "2.11.7"
 
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"
The resulting directory structure should be as shown below:
osboxes@osboxes:~/SampleApp$ find .
.
./sample.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SampleApp.scala
Use sbt to package and launch the sample application
We are now ready to package the application into a jar and deploy it using spark-submit. Notice that sbt creates a hidden directory in the home folder, ~/.ivy2/, that contains all cached jars used for packaging the application.
~/SampleApp$ sbt package
...
[info] Loading global plugins from /home/osboxes/.sbt/0.13/plugins
[info] Set current project to Sample Project (in build file:/home/osboxes/SampleApp/)
...
...
[info] Compiling 1 Scala source to /home/osboxes/SampleApp/target/scala-2.11/classes...
[info] Packaging /home/osboxes/SampleApp/target/scala-2.11/sample-project_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 15 s, completed 30-Jul-2015 18:55:17
~/SampleApp$
Notice that the result of the packaging is the file sample-project_2.11-1.0.jar. This is deployed as follows:
~/SampleApp$ spark-submit --class "SampleApp" --master local[2] target/scala-2.11/sample-project_2.11-1.0.jar
...
...
Lines with val: 6
~/SampleApp$
The count of six corresponds to five val assignments plus one occurrence in the println command argument. Note that the header comment in the listing above also contains the string “val”; if that comment line is kept verbatim in the file, the reported count will be seven.
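The expected count can be cross-checked outside Spark. The sketch below uses grep, which, like the filter in SampleApp, counts matching lines rather than individual occurrences and compares case-sensitively. The function name count_matching_lines is made up for illustration. Keep in mind that it counts every line containing the string, comment lines included.

```shell
# Hypothetical helper: count the lines of file $1 that contain the fixed string $2.
# grep -c counts matching lines (not occurrences); -F treats $2 as a plain string.
count_matching_lines() {
  grep -cF -- "$2" "$1"
}

# Example (uncomment to use):
# count_matching_lines ~/SampleApp/src/main/scala/SampleApp.scala val
```

If the number printed here differs from the one reported by the Spark job, the two are most likely reading different copies of the file.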
Use sbt to create an Eclipse project
In order to create an Eclipse project for this sample application, we issue the following sbt command:
~/SampleApp$ sbt eclipse # this choice was installed with the sbteclipse plugin
[info] Loading global plugins from /home/osboxes/.sbt/0.13/plugins
[info] Set current project to Sample Project (in build file:/home/osboxes/SampleApp/)
[info] About to create Eclipse project files for your project(s).
[info] Successfully created Eclipse project files for project(s):
[info] Sample Project
~/SampleApp$
Now the Eclipse project is created inside the ~/SampleApp directory. We use Eclipse to import an existing project:
[Figure: Eclipse import dialog for an existing project]
Select Browse to search for the ~/SampleApp directory.
[Figure: selecting the ~/SampleApp directory in the import dialog]
Do not check the option Copy projects into workspace.
[Figure: finishing the import]
The result is the complete project tree in the Package Explorer of Eclipse. All Spark and Hadoop related dependencies have been automatically imported from sbt. Now you can edit SampleApp.scala directly from Eclipse using code completion, syntax highlighting, and more.
[Figure: the imported project in the Package Explorer]
Run the sample application from Eclipse
Source code editing using Eclipse can be real fun! Code completion, refactoring, smart indenter, code formatting, syntax highlighting – you name it, Eclipse provides it! But what about running the application? We can do that too, with a little configuration and a minor addition in the Scala source code.
From the Eclipse menu bar select Run -> Run Configurations. On the left panel right click on Scala Application and select New. This opens the Create, manage, and run configurations window:
[Figure: the run configurations window]
Enter the name of the class we want to run, in this case SampleApp. Then press Apply and the run configuration is ready to go. The last step is to modify the source code to reflect the Spark runtime configuration. In this example it suffices to set the master URL to "local[2]", which runs the application locally with two worker threads. Keep in mind that a master set in code takes precedence over the --master flag of spark-submit.
val conf = new SparkConf().setAppName("Sample Application").setMaster("local[2]")
Now we are ready to launch the application from Eclipse by selecting Run->Sample Application:
[Figure: running the sample application from Eclipse]
From this point onwards, we can use the Eclipse IDE to further develop our application and run some test instances during the process. When we are confident with our code, we can switch to sbt packaging/deployment and run our application on systems containing a Spark 1.4.1 installation. The development cycle can be as follows:
  1. Use Eclipse to modify the project and test it
  2. Use sbt package to create the final jar
  3. Deploy using spark-submit
  4. Go to step 1, if necessary, and refine further
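Steps 2 and 3 of this cycle are easy to combine into one command. The following is a rough sketch, not a prescribed workflow: the function name package_and_submit and the SBT_CMD and SPARK_SUBMIT_CMD variables are invented for illustration (they are not sbt or Spark settings), and the class name, master URL, and jar path in the example are the ones from the sample project above.

```shell
# Hypothetical helper for steps 2 and 3 of the development cycle.
# SBT_CMD and SPARK_SUBMIT_CMD are illustrative overrides for the two tools.
SBT_CMD="${SBT_CMD:-sbt}"
SPARK_SUBMIT_CMD="${SPARK_SUBMIT_CMD:-spark-submit}"

# Usage: package_and_submit <main-class> <master-url> <jar-path>
package_and_submit() {
  main_class="$1"
  master="$2"
  jar="$3"
  # Stop if packaging fails, so that a stale jar is never submitted
  "$SBT_CMD" package || return 1
  "$SPARK_SUBMIT_CMD" --class "$main_class" --master "$master" "$jar"
}

# Example (run from ~/SampleApp; uncomment to use):
# package_and_submit SampleApp 'local[2]' target/scala-2.11/sample-project_2.11-1.0.jar
```

Quoting the master URL keeps the shell from treating the square brackets in local[2] as a filename pattern.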