First steps with Spark NLP

On how to utilise one of the most popular Spark NLP libraries from JohnSnow Labs.

What is NLP?

NLP (Natural Language Processing) is all about analysing human language, in the form of unstructured text or audio recordings, to be able to understand the contextual nuances of the language enclosed in a piece of data. With NLP, we can convert information stored in books, blog posts, and audio files into valuable insights.

NLP, like all the other fields of machine learning, has been growing in popularity in recent years. Extracting information from unstructured textual or audio data is essential for applications like a chatbot, advanced text search, extracting facts about the data, categorisation, etc.

Common functions an NLP library provides are:

  • sentence detection — one of the basic blocks of any NLP library, which decides the beginning and end of the sentences,
  • tokenization — way of separating a piece of text into smaller units,
  • stemming — the process of reducing a word to its word stem that affixes to suffixes and prefixes,
  • lemmatization — similar to stemming but with word meaning taken into account, e.g. the word 'better' lemmatized to its root word 'good',
  • POS tagging, or Part Of Speech tagging — categorising words in a text in correspondence with a particular part of speech, depending on the definition of the word and its context,
  • NER, or Named Entity Recognition — locating and classifying named entities in unstructured text into pre-defined categories such as medical codes, monetary values, etc.,
  • dependency parsing — analysing dependencies between the words of a sentence to find out its grammatical structure,
  • text matching — estimating semantic similarity between two text pieces,
  • chunking — extracting phrases from unstructured text to identify constituents (noun groups, verbs), uses POS tags as input,
  • spell-checking — helps with finding possible spelling problems,
  • sentiment detection — analyses the data whether there is positive, negative, or neutral meaning.

Fortunately, there are more than a few options available if you are processing data on Spark. We will look into some of those below and will try to use one for sentiment analysis together with Spark streaming.

NLP In Spark

The Spark NLP library from John Snow Labs is apparently the most popular choice when talking about any NLP processing in Spark but you can use other, sometimes simpler, built-in algorithms to extract some information from your unstructured textual data. For simple NLP tasks as well as for our sentiment analysis problem, you can actually use a lot of existing Spark MLlib algorithms to train your model with some help of many available functions used normally for feature extraction, transformations, and selection, such as Tokenizer, StopWordsRemover, CountVectorizer to name just a few.

Popular algorithms to build the model with textual data are Naive Bayes, Logistic Regression, Decision Trees, Random Forest, and Gradient Boosting Trees.

You are not constrained by the libraries given to you by MLlib, and of course, you can use other, built especially for NLP problems, popular libraries like NLTK, TextBlob, Gensim, Stanford Core NLP, Fasttext, and possibly many more.

To train and test your model, you would need to gather a considerable amount of annotated data. To get started, though, you can have a look into one of Kaggle’s competitions called “Natural Language Processing with Disaster Tweets” where you would find the annotated data and solutions provided by other competitors to compare.

Spark NLP library

Spark NLP is one of the most popular choices when it comes to NLP on Spark, not without a reason. It is a library that solves multiple NLP problems (actually, it provides implementations for all the aforementioned NLP functions) and it is easy to feed into your machine learning pipeline. Spark NLP takes advantage of transfer learning to improve its accuracy and because of that, it works well even with small amounts of data.

On top of all that, Spark NLP, unlike most other NLP libraries, was built natively on Apache Spark and TensorFlow.

You can quickly get a feel on how to use the library by using one of many pre-trained models, which is exactly what we are going to do for our sentiment analysis.

At the time of writing, there are 4312 models available for download and use from the Spark NLP models website.

Imaginary NLP Problem

From a plethora of models available, I have picked the one I could use with a data stream at hand, which was, in my case, the Twitter stream.

I will set up a simple Spark application to connect to Twitter and process the stream of tweets. In the stream processing step, we will filter the tweets by language (as we can process only English text with our model) and predict if the tweet belongs to one of the following categories: joy, sadness, fear, and surprise.

Setup

To process the stream of tweets, we need to be able to fetch them in the first place. To do that, you need to set up an application on your Twitter developer account page. Copy over the API key and API secret as well as Access token and secret — you will need them for your Spark application to connect with Twitter and fetch the stream of tweets.

There are 2 Twitter libraries we are going to use, one is a very popular Twitter4j library, and the other is dstream-twitter for converting the data into Spark DStream.

There are multiple ways you can pass credentials to establish a connection, for simplicity, though, we store all the credentials into a text file and read that file later to set the system property when starting up our Spark application.

twitter.txt file contents:

consumerKey pp2-----------asdfORE
consumerSecret c1zUj----------------abadaKgeYn
accessToken 14655125-rvTp--------------------pp
accessTokenSecret A---------------------vxO

Setting up system properties:

def setupTwitter(): Unit = {
    import scala.io.Source

    val lines = Source.fromFile("data/twitter.txt")
    for (line <- lines.getLines) {
      val fields = line.split(" ")
      if (fields.length == 2) {
        System.setProperty("twitter4j.oauth." + fields(0), fields(1))
      }
    }
    lines.close()
  }

The rest of the setup is about changing the default logging level, setting up the Spark session and Spark context:

    Logger.getLogger("org").setLevel(Level.ERROR)
    setupTwitter()

    // Create a SparkContext using every core of the local machine
    //    val sc = new SparkContext("local[*]", "IsolationForest")
    val spark = SparkSession
      .builder
      .appName("Spark NLP Example")
      .master("local[*]")
      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
    ssc.checkpoint("data/checkpoint/")

Now, we can create our pipeline which we will use later to get estimations while processing the stream of tweets:

  val documentAssembler = new DocumentAssembler()
      .setInputCol("text")
      .setOutputCol("document")
    val use = UniversalSentenceEncoder.pretrained("tfhub_use")
      .setInputCols(Array("document"))
      .setOutputCol("sentence_embeddings")
    val document_classifier = ClassifierDLModel.pretrained("classifierdl_use_emotion", "en")
      .setInputCols(Array("sentence_embeddings"))
      .setOutputCol("class")
    val pipeline = new Pipeline().setStages(Array(documentAssembler, use, document_classifier))

As you can see, we are using classifierdl_use_emotion for our pre-trained classifier name. Please refer to classifierdl_use_emption_en for more details on the classifier itself.

Having our model ready to make estimations, we can move on to creating and preprocessing our stream of tweets:

    // Create a DStream from Twitter using our streaming context
    val tweets = TwitterUtils.createStream(ssc, None)

    // Now filter and extract the text of each status update into DStreams using map()
    val statuses = tweets.filter(_.getLang == "en").filter(s => s.getText.length > 10).map { status =>
      status.getText
    }

We filter the tweets by language and remove all the tweets with the status text shorter than 10 characters, just to get some meaningful results.

The heart of our application, as well as its last stage, is the actual processing of the stream elements:

    statuses.foreachRDD { rdd =>
      import spark.implicits._
      import org.apache.spark.sql.functions._
      val data = rdd.toDF("text")
      val result = pipeline.fit(data).transform(data)
      val prediction = result.select("text", "class")
      val exploded = prediction.withColumn("tmp", explode(col("class"))).select("text", "tmp.*")
      exploded.select("text","result").show(false)
    }

    ssc.start()
    ssc.awaitTermination()
    // Stop the session
    spark.stop()

We use the built-in function foreachRDD and pass a single tweet into the earlier created pipeline. Once the prediction is obtained, we explode the object into a different dataset so we extract only valuable information like the final result together with the text being classified.

Example results look like the following:

|this song is fire and a jam and good. its a bop. and it kicks and slaps                                                         
|joy     |

|RT @martinplaut: And guns to Khartoum to undermine the Sudanese government - apparently                                          
|fear    |

|RT @Tr3ybull: Hubby you weren’t meant to find out this way… Trey told me he never uses condoms I tried to stop it… I swear 🥺🤭  
|surprise|

|RT @miey_salena: stay single until you’re appreciated.                                                                          
|sadness |

Utilising some of the pre-built models from the Spark NLP library is a breeze and a pleasure. In a couple of minutes, you can build quite an accurate solution that can handle copious amounts of data processed in a streaming fashion on your Spark cluster.

More on Machine Learning:

Blog Comments powered by Disqus.
Find more articles like this in Blog section

E-Book: Data Times: Big Data and ML news

Find out more