Friday, February 25, 2022

3.2. Spark (as a processing engine) - 2> Spark-submit (With RDD) And as a ScalaProject.JAR

 3.2. Spark (as a processing engine) - 2> Spark-submit (With RDD) with ScalaProject.JAR

spark-submit needs a JAR to execute, so we create a project, build a JAR out of it, and supply that JAR to the spark-submit command.




package com.sparkcore

import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf

object wordCountDemo {

  /** Entry point for spark-submit: word-counts a text file.
    *
    * Reads the file at `args(0)`, splits each line on single spaces,
    * sums a count per word, and saves the (word, count) pairs as text.
    *
    * args(0) = input path (required)
    * args(1) = output path (optional; defaults to "wordCountSparkSubmit",
    *           preserving the original hard-coded behavior)
    */
  def main(args: Array[String]): Unit = {
    require(args.nonEmpty, "usage: wordCountDemo <inputPath> [outputPath]")

    // Creating the Spark context; master/deploy config comes from spark-submit.
    val spark = SparkSession.builder()
      .config(new SparkConf().setAppName("Demo WordCount"))
      .getOrCreate()
    val sc = spark.sparkContext

    try {
      // Define the processing logic - transformations only; nothing runs
      // until the saveAsTextFile action below.
      val file = sc.textFile(args(0))
      val tokenized = file.flatMap(_.split(" "))
      val tokenizedMap = tokenized.map(word => (word, 1))
      val reduced = tokenizedMap.reduceByKey(_ + _)

      // Save the processed data to HDFS (or the local FS, per the path scheme).
      val outputPath = if (args.length > 1) args(1) else "wordCountSparkSubmit"
      reduced.saveAsTextFile(outputPath)
    } finally {
      // Release cluster resources even if the job throws.
      spark.stop()
    }
  }
}








/** Find the movies with the most ratings. */
object PopularMovies {

  /** Entry point: counts ratings per movie and prints (count, movieID)
    * pairs sorted ascending by count (most-rated movie printed last).
    *
    * Expects a tab-separated ratings file whose second column is movieID.
    *
    * args(0) = input path (optional; defaults to "moviesSimple.data",
    *           preserving the original hard-coded behavior)
    */
  def main(args: Array[String]): Unit = {

    // Set the log level to only print errors
    //Logger.getLogger("org").setLevel(Level.ERROR)
    //@transient lazy val logger: Logger = Logger.getLogger(getClass.getName)
    println("building spark context")
    val spark = SparkSession
      .builder
      //.master("local[*]")
      .appName("Popular Movies")
      //.config("spark.sql.warehouse.dir", ".")
      .getOrCreate()
    val sc = spark.sparkContext

    try {
      // Read in each rating line; input path is overridable from the CLI.
      val inputPath = args.headOption.getOrElse("moviesSimple.data")
      val lines = sc.textFile(inputPath)

      // Map to (movieID, 1) tuples — movieID is the second TAB-separated field.
      val movies = lines.map(line => (line.split("\t")(1).toInt, 1))

      // Count up all the 1's for each movie.
      val movieCounts = movies.reduceByKey(_ + _)

      // Flip (movieID, count) to (count, movieID) so we can sort by count.
      val flipped = movieCounts.map { case (movieId, count) => (count, movieId) }

      // Sort ascending by count (sortByKey default).
      val sortedMovies = flipped.sortByKey()

      // Collect to the driver and print results.
      sortedMovies.collect().foreach(println)
    } finally {
      // Always stop the session so the application releases its resources.
      spark.stop()
    }
  }
}












No comments:

Post a Comment

6.1. Kafka : Run

  6.1. Kafka : Run