Friday, February 25, 2022

5. Spark internal execution







3.3. Spark (as a processing engine) - 2> Notebook (With Dataframe) in Databricks


 

With Databricks

  1. Create the cluster.

    Select the runtime as 6.6 and give the cluster a name. Finally, click on Create Cluster.

    Confirm the cluster is up on the "Clusters" page.

  2. Upload the file. Click on "Data", then "Add Data", and drop "people_with_header.csv" into "Drop files to upload". It will be uploaded to the path "/FileStore/tables/people_with_header.csv" (please make a note of the path).

  3. Import sparkSQL Demo.html: click on "Workspace" and import the file from there.

 

 


4. Spark (as a processing engine) - Spark SQL (Dataframe)



Reading From CSV Files

Automatic Schema Inference

  1. Start spark shell
spark-shell --master local[1]

2. Observe people_with_header.csv under Spark\Spark SQL\Labs\CSV and upload this file to hdfs. Please check the VM guide for more details.

(base) [hands-on@localhost demos]$ hdfs dfs -put people_with_header.csv
(base) [hands-on@localhost demos]$ hdfs dfs -ls
-rw-r--r--   1 hands-on hadoop       people_with_header.csv

3. Execute the below commands one by one in the spark shell started in step #1

val empcrm = spark.read.
                     format("csv"). // default is parquet; the file would be loaded assuming the parquet format and might fail if it is not parquet
                     //format("com.databricks.spark.csv").
                     option("header", "true"). // otherwise the header row is treated as a record, and the columns get default names such as _c0, _c1, ...
                     option("inferSchema", "true"). // if true, Spark determines the column data types; otherwise "age" would be read as a string
                     load("people_with_header.csv")

4. Show the data and print the schema

scala> empcrm.show
+----------+---------+------+---+
|first_name|last_name|gender|age|
+----------+---------+------+---+
|      Erin|  Shannon|     F| 42|
|    Norman| Lockwood|     M| 81|
|    Miguel|     Ruiz|     M| 64|
|  Rosalita|  Ramirez|     F| 14|
|      Ally|   Garcia|     F| 39|
|    Claire|  McBride|     F| 23|
|   Abigail| Cottrell|     F| 75|
|       Jos|   Rivera|     M| 59|
|      Ravi| Dasgupta|     M| 25|
|      Ravi|  Shannon|     F| 42|
|   Shannon|  McBride|     F| 23|
|   Abigail|  McBride|     F| 23|
+----------+---------+------+---+


scala> empcrm.printSchema
root
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)

5. Retrieve the columns

scala> empcrm.select($"first_name", $"age").show(5)
+----------+---+
|first_name|age|
+----------+---+
|      Erin| 42|
|    Norman| 81|
|    Miguel| 64|
|  Rosalita| 14|
|      Ally| 39|
+----------+---+
only showing top 5 rows

// aliases

scala> empcrm.select($"first_name",
     | $"age",
     | ($"age" > 60),
     |   ($"age" > 60).alias("older"),
     | ($"age" + 10).alias("inc10")).show(5)
+----------+---+----------+-----+-----+
|first_name|age|(age > 60)|older|inc10|
+----------+---+----------+-----+-----+
|      Erin| 42|     false|false|   52|
|    Norman| 81|      true| true|   91|
|    Miguel| 64|      true| true|   74|
|  Rosalita| 14|     false|false|   24|
|      Ally| 39|     false|false|   49|
+----------+---+----------+-----+-----+
only showing top 5 rows
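The aliased columns above are ordinary per-row expressions. As a rough plain-Scala sketch (the Person class and sample rows are made up for illustration; this is collections code, not the Spark API), the same derivations look like:

```scala
// Hypothetical rows mirroring two records from the table above
case class Person(firstName: String, age: Int)
val people = List(Person("Erin", 42), Person("Norman", 81))

// select($"first_name", $"age", ($"age" > 60).alias("older"), ($"age" + 10).alias("inc10"))
// evaluates these expressions once per row:
val derived = people.map(p => (p.firstName, p.age, p.age > 60, p.age + 10))

derived.foreach(println)
```

Spark does the same thing, only lazily and over distributed partitions.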

6. Register a temp view so that we can access it using SQL queries.

// register temp table
scala> empcrm.createOrReplaceTempView("EMPCRM")

scala> spark.sql("SELECT first_name, age, age > 60 as older FROM EMPCRM").show(false)
+----------+---+-----+
|first_name|age|older|
+----------+---+-----+
|Erin      |42 |false|
|Norman    |81 |true |
|Miguel    |64 |true |
|Rosalita  |14 |false|
|Ally      |39 |false|
|Claire    |23 |false|
|Abigail   |75 |true |
|Jos       |59 |false|
|Ravi      |25 |false|
|Ravi      |42 |false|
|Shannon   |23 |false|
|Abigail   |23 |false|
+----------+---+-----+


7. Filter the rows

//filter
// These are different syntactic variants that do the same thing.
scala> empcrm.filter($"age" > 60).select("first_name","age").show(false)
+----------+---+
|first_name|age|
+----------+---+
|Norman    |81 |
|Miguel    |64 |
|Abigail   |75 |
+----------+---+

//or

scala> empcrm.filter($"age" > 60).select($"first_name",$"age").show(false)
+----------+---+
|first_name|age|
+----------+---+
|Norman    |81 |
|Miguel    |64 |
|Abigail   |75 |
+----------+---+
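The DataFrame filter is just a predicate applied to every row, followed by a projection. A plain-Scala analogy (sample tuples are made up to match the table above; not the Spark API):

```scala
// (first_name, age) tuples mirroring a few rows of empcrm
val rows = List(("Norman", 81), ("Erin", 42), ("Miguel", 64), ("Abigail", 75))

// like empcrm.filter($"age" > 60).select("first_name", "age")
val olderThan60 = rows.filter { case (_, age) => age > 60 }

olderThan60.foreach(println)
```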

8. Alternatively, filter using a SQL query

scala> spark.sql("SELECT first_name, age FROM EMPCRM WHERE age > 60").show(false)
+----------+---+
|first_name|age|
+----------+---+
|Norman    |81 |
|Miguel    |64 |
|Abigail   |75 |
+----------+---+

9. Order by age (ascending)

scala> :paste
// Entering paste mode (ctrl-D to finish)

empcrm.filter(empcrm("age") > 49)
  .select(empcrm("first_name"), empcrm("age"))
  //.orderBy(empcrm("age"), empcrm("first_name"))//ascending order
  .orderBy(empcrm("age"))
  .show(false)

// Exiting paste mode, now interpreting.

+----------+---+
|first_name|age|
+----------+---+
|Jos       |59 |
|Miguel    |64 |
|Abigail   |75 |
|Norman    |81 |
+----------+---+

scala> :paste
// Entering paste mode (ctrl-D to finish)

empcrm.filter(empcrm("age") > 49)
  .select(empcrm("first_name"), empcrm("age"))
  //.orderBy(empcrm("age"), empcrm("first_name"))//descending order
  .orderBy(empcrm("age").desc)
  .show(false)

// Exiting paste mode, now interpreting.

+----------+---+
|first_name|age|
+----------+---+
|Norman    |81 |
|Abigail   |75 |
|Miguel    |64 |
|Jos       |59 |
+----------+---+
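The ascending and descending orderBy calls correspond to an ordinary sort by the age key. As a plain-Scala sketch over made-up tuples matching the rows above:

```scala
// (first_name, age) tuples mirroring the filtered rows above
val rows = List(("Jos", 59), ("Miguel", 64), ("Abigail", 75), ("Norman", 81))

val asc  = rows.sortBy(_._2)    // like .orderBy(empcrm("age"))
val desc = rows.sortBy(-_._2)   // like .orderBy(empcrm("age").desc)
```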

10. Group by age

scala> empcrm.groupBy($"age").count().show // creates a structure with (age,count)
+---+-----+
|age|count|
+---+-----+
| 81|    1|
| 64|    1|
| 59|    1|
| 39|    1|
| 23|    3|
| 25|    1|
| 75|    1|
| 14|    1|
| 42|    2|
+---+-----+
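Conceptually, groupBy followed by count buckets the rows by key and sizes each bucket. On plain Scala collections (the twelve age values are copied from the table above), the same logic is:

```scala
// The twelve age values from the empcrm table above
val ages = List(42, 81, 64, 14, 39, 23, 75, 59, 25, 42, 23, 23)

// like empcrm.groupBy($"age").count()
val counts = ages.groupBy(identity).map { case (age, xs) => (age, xs.size) }
```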

11. Group by using a SQL query

scala> spark.sql("SELECT age, count(age) FROM EMPCRM  GROUP BY age").show
+---+----------+
|age|count(age)|
+---+----------+
| 81|         1|
| 64|         1|
| 59|         1|
| 39|         1|
| 23|         3|
| 25|         1|
| 75|         1|
| 14|         1|
| 42|         2|
+---+----------+

12. Write the results to HDFS in Parquet format

// finally, store the results in Parquet (a popular Hadoop storage format)
scala> empcrm.write.parquet("empcrm")

//execute the following command in  a different shell
(base) [hands-on@localhost demos]$ hdfs dfs -ls empcrm

-rw-r--r--   1 hands-on hadoop         07:05 empcrm/_SUCCESS
-rw-r--r--   1 hands-on hadoop        empcrm/part-00000-227ffb4a-242b-4e42-9c29-cdf43830484d-c000.snappy.parquet

13. Read from parquet files

scala> val parquetempcrm = spark.read.parquet("empcrm")
parquetempcrm: org.apache.spark.sql.DataFrame = [first_name: string, last_name: string ... 2 more fields]

scala> parquetempcrm.show
+----------+---------+------+---+
|first_name|last_name|gender|age|
+----------+---------+------+---+
|      Erin|  Shannon|     F| 42|
|    Norman| Lockwood|     M| 81|
|    Miguel|     Ruiz|     M| 64|
|  Rosalita|  Ramirez|     F| 14|
|      Ally|   Garcia|     F| 39|
|    Claire|  McBride|     F| 23|
|   Abigail| Cottrell|     F| 75|
|       Jos|   Rivera|     M| 59|
|      Ravi| Dasgupta|     M| 25|
|      Ravi|  Shannon|     F| 42|
|   Shannon|  McBride|     F| 23|
|   Abigail|  McBride|     F| 23|
+----------+---------+------+---+

Specifying the Schema Programmatically

  1. Start the spark shell as above.
    spark-shell --master local[1]
  2. Define the schema. Enter the commands in (:paste) mode.
    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
    // schema specified programmatically
    val schema = "first_name STRING, last_name STRING, gender STRING , age INT" // be careful to give the schema in the same order as the columns appear in the file
    val empcrm = spark.read
      .format("csv")                  // default is parquet; loading would assume the parquet format and might fail if the file is not parquet
      .option("header", "true")       // otherwise the header row is treated as a record, and the columns get default names such as _c0, _c1, ...
      //.option("inferSchema", "true") // not needed here; we supply the schema explicitly
      .schema(schema)
      .load("people_with_header.csv")
    
    // Exiting paste mode, now interpreting.
    
    schema: String = first_name STRING, last_name STRING, gender STRING , age INT
    empcrm: org.apache.spark.sql.DataFrame = [first_name: string, last_name: string ... 2 more fields]
  3. Repeat the commands above from step #4.
    scala> empcrm.show
    +----------+---------+------+---+
    |first_name|last_name|gender|age|
    +----------+---------+------+---+
    |      Erin|  Shannon|     F| 42|
    |    Norman| Lockwood|     M| 81|
    |    Miguel|     Ruiz|     M| 64|
    |  Rosalita|  Ramirez|     F| 14|
    |      Ally|   Garcia|     F| 39|
    |    Claire|  McBride|     F| 23|
    |   Abigail| Cottrell|     F| 75|
    |       Jos|   Rivera|     M| 59|
    |      Ravi| Dasgupta|     M| 25|
    |      Ravi|  Shannon|     F| 42|
    |   Shannon|  McBride|     F| 23|
    |   Abigail|  McBride|     F| 23|
    +----------+---------+------+---+

Reading From Text Files

Automatic Schema Inference

  1. Start the spark shell
    spark-shell --master local[*]
  2. Observe customers.txt from Spark SQL\Labs\TextFile and upload this file to hdfs.
    (base) [hands-on@localhost demos]$ hdfs dfs -put customers.txt
    
    (base) [hands-on@localhost demos]$ hdfs dfs -ls
    
    Found 4 items
    drwxr-xr-x   - hands-on hadoop         .sparkStaging
    -rw-r--r--   1 hands-on hadoop      customers.txt
    -rw-r--r--   1 hands-on hadoop      hamlet.txt
    -rw-r--r--   1 hands-on hadoop     u.data
  3. Create the SQLContext first from the existing SparkContext
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  4. Run the following commands
    // Import statement to implicitly convert an RDD to a DataFrame (as the text file did not have a structure)
    import sqlContext.implicits._
    // Create a custom class to represent the Customer
    case class Customer(customer_id: Int, name: String,city: String, state: String, zip_code: String)
    // Create a DataFrame of Customer objects from the data set text file.
    val dfCustomers = sc.textFile("customers.txt").map(_.split(",")).map(p => Customer(p(0).trim.toInt,p(1), p(2), p(3), p(4))).toDF()
    dfCustomers.show
    // Register DataFrame as a table.
    dfCustomers.registerTempTable("customers")
    // Display the content of DataFrame
    dfCustomers.show()
    // Print the DF schema
    dfCustomers.printSchema()
    // Select customer name column
    dfCustomers.select("name").show()
    // Select customer name and city columns
    dfCustomers.select("name", "city").show()
    // Select a customer by id
    dfCustomers.filter(dfCustomers("customer_id").equalTo(500)).show()
    // Count the customers by zip code
    dfCustomers.groupBy("zip_code").count().show()
    
    
    scala> import sqlContext.implicits._
    import sqlContext.implicits._
    
    scala> case class Customer(customer_id: Int, name: String,city: String, state: String, zip_code: String)
    defined class Customer
    
    scala> val dfCustomers = sc.textFile("customers.txt").map(_.split(",")).map(p => Customer(p(0).trim.toInt,p(1), p(2), p(3), p(4))).toDF()
    dfCustomers: org.apache.spark.sql.DataFrame = [customer_id: int, name: string ... 3 more fields]
    
    scala> dfCustomers.show
    +-----------+---------------+------------+-----+--------+
    |customer_id|           name|        city|state|zip_code|
    +-----------+---------------+------------+-----+--------+
    |        100|     John Smith|      Austin|   TX|   78727|
    |        200|    Joe Johnson|      Dallas|   TX|   75201|
    |        300|      Bob Jones|     Houston|   TX|   77028|
    |        400|     Andy Davis| San Antonio|   TX|   78227|
    |        500| James Williams|      Austin|   TX|   78727|
    +-----------+---------------+------------+-----+--------+
    
    scala> dfCustomers.registerTempTable("customers")
    warning: there was one deprecation warning; re-run with -deprecation for details
    
    scala> dfCustomers.show()
    +-----------+---------------+------------+-----+--------+
    |customer_id|           name|        city|state|zip_code|
    +-----------+---------------+------------+-----+--------+
    |        100|     John Smith|      Austin|   TX|   78727|
    |        200|    Joe Johnson|      Dallas|   TX|   75201|
    |        300|      Bob Jones|     Houston|   TX|   77028|
    |        400|     Andy Davis| San Antonio|   TX|   78227|
    |        500| James Williams|      Austin|   TX|   78727|
    +-----------+---------------+------------+-----+--------+
    
    
    scala> dfCustomers.printSchema()
    root
     |-- customer_id: integer (nullable = false)
     |-- name: string (nullable = true)
     |-- city: string (nullable = true)
     |-- state: string (nullable = true)
     |-- zip_code: string (nullable = true)
    
    
    scala> dfCustomers.select("name").show()
    +---------------+
    |           name|
    +---------------+
    |     John Smith|
    |    Joe Johnson|
    |      Bob Jones|
    |     Andy Davis|
    | James Williams|
    +---------------+
    
    
    scala> dfCustomers.select("name", "city").show()
    +---------------+------------+
    |           name|        city|
    +---------------+------------+
    |     John Smith|      Austin|
    |    Joe Johnson|      Dallas|
    |      Bob Jones|     Houston|
    |     Andy Davis| San Antonio|
    | James Williams|      Austin|
    +---------------+------------+
    
    
    scala> dfCustomers.filter(dfCustomers("customer_id").equalTo(500)).show()
    +-----------+---------------+-------+-----+--------+
    |customer_id|           name|   city|state|zip_code|
    +-----------+---------------+-------+-----+--------+
    |        500| James Williams| Austin|   TX|   78727|
    +-----------+---------------+-------+-----+--------+
    
    
    scala> dfCustomers.groupBy("zip_code").count().show()
    +--------+-----+
    |zip_code|count|
    +--------+-----+
    |   75201|    1|
    |   78227|    1|
    |   78727|    2|
    |   77028|    1|
    +--------+-----+
    
    
    scala>
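The map over the split lines is just per-line parsing into the case class. That parsing step can be sketched standalone (plain Scala, no Spark; the sample line copies one record from customers.txt):

```scala
case class Customer(customer_id: Int, name: String, city: String, state: String, zip_code: String)

// Mirrors .map(_.split(",")).map(p => Customer(p(0).trim.toInt, p(1), p(2), p(3), p(4)))
def parse(line: String): Customer = {
  val p = line.split(",")
  Customer(p(0).trim.toInt, p(1), p(2), p(3), p(4))
}

// Note: only customer_id is trimmed, so the remaining fields keep their
// leading space - which is why the names print as " John Smith" above.
val c = parse("100, John Smith, Austin, TX, 78727")
```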

Reading from a file with the schema (manual - Programmatically Specifying the Schema)

In the above example, the schema is inferred using reflection. We can also programmatically specify the schema of the dataset. This is useful when custom classes cannot be defined ahead of time because the structure of the data is encoded in a string.

The following code example shows how to specify the schema using the data type classes StructType, StructField, and StringType.

  1. Repeat steps #1, #2, and #3 above if you have not already.
  2. Execute the commands
    //
    // Programmatically Specifying the Schema
    //
    // Create SQLContext from the existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // Create an RDD
    val rddCustomers = sc.textFile("customers.txt")
    // The schema is encoded in a string
    val schemaString = "customer_id name city state zip_code"
    // Import Spark SQL data types and Row.
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    // Generate the schema based on the string of schema
    val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    // Convert records of the RDD (rddCustomers) to Rows.
    val rowRDD = rddCustomers.map(_.split(",")).map(p =>Row(p(0).trim,p(1),p(2),p(3),p(4)))
    // Apply the schema to the RDD.
    val dfCustomers = sqlContext.createDataFrame(rowRDD,schema)
    // Register the DataFrames as a table.
    dfCustomers.registerTempTable("customers")
    // SQL statements can be run by using the sql methods against the "customers" table provided by sqlContext.
    val custNames = sqlContext.sql("SELECT name FROM customers")
    // The results of SQL queries are DataFrames and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by ordinal.
    custNames.map(t => "Name: " + t(0)).collect().foreach(println)
    // SQL statements can be run by using the sql methods provided by sqlContext.
    val customersByCity = sqlContext.sql("SELECT name,zip_code FROM customers ORDER BY zip_code")
    // The results of SQL queries are DataFrames and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by ordinal.
    customersByCity.map(t => t(0) + "," + t(1)).collect().foreach(println)
    
    
    scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    warning: there was one deprecation warning; re-run with -deprecation for details
    sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@41d1bea3
    
    scala> val rddCustomers = sc.textFile("customers.txt")
    rddCustomers: org.apache.spark.rdd.RDD[String] = customers.txt MapPartitionsRDD[39] at textFile at <console>:30
    
    
    
    scala> rddCustomers.collect
    res19: Array[String] = Array(100, John Smith, Austin, TX, 78727, 200, Joe Johnson, Dallas, TX, 75201, 300, Bob Jones, Houston, TX, 77028, 400, Andy Davis, San Antonio, TX, 78227, 500, James Williams, Austin, TX, 78727)
    
    scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    warning: there was one deprecation warning; re-run with -deprecation for details
    sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@57484e17
    
    scala> val rddCustomers = sc.textFile("customers.txt")
    rddCustomers: org.apache.spark.rdd.RDD[String] = customers.txt MapPartitionsRDD[41] at textFile at <console>:30
    
    scala> val schemaString = "customer_id name city state zip_code"
    schemaString: String = customer_id name city state zip_code
    
    scala> import org.apache.spark.sql._
    import org.apache.spark.sql._
    
    scala> import org.apache.spark.sql.types._
    import org.apache.spark.sql.types._
    
    scala> val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    schema: org.apache.spark.sql.types.StructType = StructType(StructField(customer_id,StringType,true), StructField(name,StringType,true), StructField(city,StringType,true), StructField(state,StringType,true), StructField(zip_code,StringType,true))
    
    scala> val rowRDD = rddCustomers.map(_.split(",")).map(p =>Row(p(0).trim,p(1),p(2),p(3),p(4)))
    rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[43] at map at <console>:37
    
    scala> val dfCustomers = sqlContext.createDataFrame(rowRDD,schema)
    dfCustomers: org.apache.spark.sql.DataFrame = [customer_id: string, name: string ... 3 more fields]
    
    scala> dfCustomers.registerTempTable("customers")
    warning: there was one deprecation warning; re-run with -deprecation for details
    
    scala> val custNames = sqlContext.sql("SELECT name FROM customers")
    custNames: org.apache.spark.sql.DataFrame = [name: string]
    
    scala> custNames.map(t => "Name: " + t(0)).collect().foreach(println)
    Name:  John Smith
    Name:  Joe Johnson
    Name:  Bob Jones
    Name:  Andy Davis
    Name:  James Williams
    
    scala> val customersByCity = sqlContext.sql("SELECT name,zip_code FROM customers ORDER BY zip_code")
    customersByCity: org.apache.spark.sql.DataFrame = [name: string, zip_code: string]
    
    scala> customersByCity.map(t => t(0) + "," + t(1)).collect().foreach(println)
     Joe Johnson, 75201
     Bob Jones, 77028
     Andy Davis, 78227
     John Smith, 78727
     James Williams, 78727

3.2. Spark (as a processing engine) - 2> Spark-submit (With RDD) as a Scala project JAR

spark-submit needs a JAR to execute, so we create a project, build a JAR out of it, and supply the JAR to the spark-submit command.
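As a rough sketch of the build-and-submit flow (the sbt layout and JAR path are illustrative assumptions; the class name matches the object below):

```shell
# build the JAR from the project (assumes an sbt project; JAR name is hypothetical)
sbt package

# submit it; the first program argument becomes the input file for sc.textFile(args(0))
spark-submit \
  --class com.sparkcore.wordCountDemo \
  --master local[*] \
  target/scala-2.11/<project-name>_2.11-1.0.jar \
  wordcountSimple.txt
```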




package com.sparkcore

import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf

object wordCountDemo {

  def main(args: Array[String]) {

    // creating the spark context
    val spark = SparkSession.builder().config(new SparkConf().setAppName("Demo WordCount"))
      .getOrCreate()
    val sc = spark.sparkContext

    // define the processing logic - transformations, joins etc.
    val file = sc.textFile(args(0))
    val tokenized = file.flatMap(x => x.split(" "))          // file.flatMap(_.split(" "))
    val tokenizedMap = tokenized.map(x => (x, 1))            // tokenized.map((_, 1))
    val reduced = tokenizedMap.reduceByKey((x, y) => x + y)  // tokenizedMap.reduceByKey(_ + _)

    // save the processed data to hdfs
    reduced.saveAsTextFile("wordCountSparkSubmit")
  }
}
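The three transformations above can be sketched on plain Scala collections (the sample input is made up; groupBy plus a sum stands in for reduceByKey, which Spark performs in a distributed way):

```scala
val lines = List("to be or", "not to be")

val tokenized    = lines.flatMap(_.split(" "))   // like file.flatMap(_.split(" "))
val tokenizedMap = tokenized.map(w => (w, 1))    // like tokenized.map((_, 1))
val reduced = tokenizedMap                       // like reduceByKey(_ + _)
  .groupBy(_._1)
  .map { case (w, pairs) => (w, pairs.map(_._2).sum) }
```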








/** Find the movies with the most ratings. */

import org.apache.spark.sql.SparkSession

object PopularMovies {

  /** Our main function where the action happens */
  def main(args: Array[String]) {

    // Set the log level to only print errors
    //Logger.getLogger("org").setLevel(Level.ERROR)
    //@transient lazy val logger: Logger = Logger.getLogger(getClass.getName)
    System.out.println("building spark context")
    val spark = SparkSession
      .builder
      //.master("local[*]")
      .appName("Popular Movies")
      //.config("spark.sql.warehouse.dir", ".")
      .getOrCreate()
    // Create a SparkContext using every core of the local machine
    //val sc = new SparkContext("local[*]", "PopularMovies")
    val sc = spark.sparkContext

    // Read in each rating line
    val lines = sc.textFile("moviesSimple.data")

    // Map to (movieID, 1) tuples
    val movies = lines.map(x => (x.split("\t")(1).toInt, 1))

    // Count up all the 1's for each movie
    val movieCounts = movies.reduceByKey((x, y) => x + y)

    // Flip (movieID, count) to (count, movieID)
    val flipped = movieCounts.map(x => (x._2, x._1))

    // Sort by count
    val sortedMovies = flipped.sortByKey()

    // Collect and print results
    val results = sortedMovies.collect()
    results.foreach(println)
  }
}
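The count-flip-sort pipeline can likewise be sketched on plain collections (the sample lines are made up in the u.data tab-separated format: userID, movieID, rating, timestamp):

```scala
val lines = List("1\t50\t5\t100", "2\t50\t4\t101", "3\t172\t5\t102")

val movies      = lines.map(x => (x.split("\t")(1).toInt, 1))
val movieCounts = movies.groupBy(_._1).map { case (id, xs) => (id, xs.size) }  // like reduceByKey
val sorted      = movieCounts.toList.map(x => (x._2, x._1)).sorted             // like flip + sortByKey
```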












Tuesday, February 22, 2022

3.1. Spark (as a processing engine) - 1> Spark-shell (With RDD)


val inputFileRDD = sc.textFile("wordcountSimple.txt")
inputFileRDD.collect

val tokenizedRDD = inputFileRDD.flatMap(x => x.split(" "))   //x is each element or row
//OR               inputFileRDD.flatMap(_.split(" "))


tokenizedRDD.collect

val tokenizedMap = tokenizedRDD.map(x => (x,1) )   //get tuple to get (abc,1) (def,1) etc
//OR               tokenizedRDD.map((_, 1))


tokenizedMap.collect

val reducedRDD = tokenizedMap.reduceByKey( (x,y) => (x+y) )   // sums the values belonging to the same key
//OR             tokenizedMap.reduceByKey(_ + _)
// press Tab after "tokenizedMap.reduc" to see the exact method names



reducedRDD.collect

reducedRDD.saveAsTextFile("sparkWordCount")  // sparkWordCount is a folder; the results are stored under it as part-* files



From hands-on
----------------

hdfs dfs -put u.data

From hands-on
--------------------

spark-shell --master local[*]   (this opens the scala> prompt)
val lines = sc.textFile("u.data")
val movies = lines.map(x => (x.split("\t")(1).toInt, 1))    // take the 2nd column and emit (movieID, 1) for each element
movies.take(5)
val movieCounts = movies.reduceByKey( (x, y) => x + y )
movieCounts.collect

 

6.1. Kafka : Run
