// Sentiment analysis algorithm for Spark + Cassandra.
// Sentiment analysis with an inverted index (clustered version).

/**
 * Spark Streaming driver that runs inverted-index sentiment analysis over tweets
 * stored in Cassandra.
 *
 * Commands arrive as text from a test client (see the example at the end of this
 * file). Each command names a tweet whose words are scored via the broadcast
 * inverted index ([[wordMap]] -> [[sentimentMap]]).
 */
object SaInvSparkCluster {

  /** Tweet definition for Cassandra. */
  case class TweetType(
      tweet_id: Int,
      user_screen_name: String,
      user_language: String,
      user_friends_count: Int,
      user_statuses_count: Int,
      user_name: String,
      user_followers_count: Int,
      sender_location: String,
      date_time: String,
      message: String)

  /** Positive and negative sentiment values of one sentiment-map entry. */
  case class SentimentValues(pos: Double, neg: Double)

  /** Inverted index: word -> indexes of its entries in [[sentimentMap]]. */
  var wordMap = HashMap[String, Set[Int]]()

  /** Sentiment values, keyed by the indexes stored in [[wordMap]]. */
  var sentimentMap = HashMap[Int, SentimentValues]()

  /**
   * Entry point: broadcasts the inverted index, then processes every batch of
   * commands received on the input stream and reports each command afterwards.
   */
  def main(args: Array[String]): Unit = {
    ...
    // Create broadcast variables for the inverted index.
    var wordMapB = sc.broadcast(wordMap)
    var sentimentMapB = sc.broadcast(sentimentMap)
    ...
    val saInvTask = new SaInvTask()

    // Process multiple commands at a time (received from the Test client).
    // Commands end with '}', so splitting on it separates commands that arrive together.
    val commands = text.flatMap(_.split("}"))
    commands.foreachRDD { x =>
      // Bring the commands to the driver so each one can be reported after processing.
      val arrayCommands = x.collect()

      // Cassandra connections are fetched from the SessionBuffer, one per partition.
      x.foreachPartition { partition =>
        val session = SessionBuffer.getSession()
        try {
          val words = wordMapB.value
          val sentiments = sentimentMapB.value
          partition.foreach(t => saInvTask.processCommand(t, session, words, sentiments))
        } finally {
          // Always hand the session back, even when a command fails; otherwise
          // every failing partition leaks a pooled session.
          SessionBuffer.returnSession(session)
        }
      } //foreachPartition()

      arrayCommands.foreach(c => reportEvent(c, measOutput))
    } //foreachRDD()

    ssc.start()
    // Block the driver until the streaming computation is stopped; without this,
    // main returns immediately after start().
    ssc.awaitTermination()
  } //main

  /**
   * Worker that runs sentiment analysis for individual commands.
   * Serializable because it is captured by the `foreachPartition` closure in [[main]].
   */
  class SaInvTask() extends java.io.Serializable {

    /**
     * Processes one command: extracts the tweet id, loads the tweet's words from
     * Cassandra and, if any were found, scores them with [[perform_sa]].
     *
     * @param command      raw command text (presumably `sentiment-analysis{tweet-id:N`
     *                     after the split on '}' in [[main]] — TODO confirm)
     * @param session      Cassandra session used to read the tweet
     * @param wordMap      inverted index: word -> indexes into `sentimentMap`
     * @param sentimentMap sentiment values keyed by index
     */
    def processCommand(command: String, session: Session, wordMap: Map[String, Set[Int]],
        sentimentMap: HashMap[Int, SentimentValues]): Unit = {
      // parse tweet-id from command
      ...
      // execute work
      val tweetWords = get_tweet_content(tweetID, session)
      if (tweetWords.nonEmpty)
        perform_sa(tweetWords, tweetID, wordMap, sentimentMap)
    } //processCommand

    /**
     * Reads a tweet's content from Cassandra and transforms it into a Set of words.
     *
     * @param tweet_id id of the tweet to read
     * @param session  Cassandra session used for the query
     * @return the tweet's words; an empty Set when the query returns no rows
     */
    def get_tweet_content(tweet_id: Int, session: Session): Set[String] = {
      // NOTE(review): str, defaultRetStr and tweetCounter are presumably used by the
      // elided transformation below — verify before removing.
      var str = ""
      val defaultRetStr = ""
      var returnSet = Set[String]()
      var tweetCounter = tweet_id

      // Get the content of the tweet from Cassandra. tweet_id is an Int, so
      // interpolating it into the CQL text cannot inject arbitrary statements.
      val rs = session.execute(s"select * from tweets where tweet_id=$tweet_id;")
      val resultList = rs.all()
      val it = resultList.iterator()
      if (it.hasNext) {
        // transform string to a Set
        ...
      } //if
      returnSet
    }

    /**
     * Performs sentiment analysis by summing the inverted-index score of every word.
     *
     * @param words        words of the tweet
     * @param tweet_id     id of the tweet being scored (not used in the computation)
     * @param wordMap      inverted index: word -> indexes into `sentimentMap`
     * @param sentimentMap sentiment values keyed by index
     * @return the sum of [[find_word]] over `words` (0.0 for an empty set)
     */
    def perform_sa(words: Set[String], tweet_id: Int, wordMap: Map[String, Set[Int]],
        sentimentMap: HashMap[Int, SentimentValues]): Double =
      words.foldLeft(0.0)((sum, word) => sum + find_word(word, wordMap, sentimentMap))

    /**
     * Finds the sentiment score of a word from the inverted index.
     *
     * The score is the mean of `pos - neg` over all indexes listed for the word.
     * An index missing from `sentimentMap` contributes 0 but still counts toward
     * the mean.
     *
     * @param word         word to look up
     * @param wordMap      inverted index: word -> indexes into `sentimentMap`
     * @param sentimentMap sentiment values keyed by index
     * @return the mean score, or 0.0 when the word is not in the index
     */
    def find_word(word: String, wordMap: Map[String, Set[Int]],
        sentimentMap: HashMap[Int, SentimentValues]): Double = {
      val indexes = wordMap.getOrElse(word, Set.empty[Int])
      if (indexes.isEmpty) 0.0
      else {
        val total = indexes.foldLeft(0.0) { (acc, idx) =>
          val sv = sentimentMap.getOrElse(idx, SentimentValues(0, 0))
          acc + (sv.pos - sv.neg)
        }
        total / indexes.size
      }
    } //find_word()
  } //SaInvTask
} //SaInvSparkCluster

// Example: the Test client sends the following string over a TCP socket to Spark. The command
// carries the identifier of the tweet on which to run the sentiment analysis operation:
//   sentiment-analysis{tweet-id:5}