6.1. Kafka : Run
Saturday, February 26, 2022
Friday, February 25, 2022
3.3. Spark (as a processing engine) - 2> Notebook (With Dataframe) in DataBricks
With Databricks
- Create the cluster. Select the runtime as 6.6 and give a cluster name. Finally, click on "Create Cluster".
Confirm it on the "Clusters" page.
- Upload the file. Click on "Data", then "Add Data", and drop the file
"people_with_header.csv" into "Drop files to upload". It will be uploaded to
the path "/FileStore/tables/people_with_header.csv" (please make a note
of the path).
- Import sparkSQL Demo.html. Click on "Workspace" and import it from there.
4. Spark (as a processing engine) - Spark SQL (Dataframe)
Reading From CSV Files
Automatic Schema Inference
1. Start the spark shell
spark-shell --master local[1]
2. Observe people_with_header.csv from Spark\Spark SQL\Labs\CSV. Upload this file to HDFS. Please check the VM guide for more details.
(base) [hands-on@localhost demos]$ hdfs dfs -put people_with_header.csv
(base) [hands-on@localhost demos]$ hdfs dfs -ls
-rw-r--r-- 1 hands-on hadoop people_with_header.csv
3. Execute the below commands one by one in the spark shell started in step #1.
val empcrm = spark.read.
format("csv"). // default is parquet; the file would be loaded assuming the parquet format and might fail if it is not parquet
//format("com.databricks.spark.csv").
option("header", "true"). // otherwise the header row is treated as a record, and columns get generated names such as _c0, _c1, ...
option("inferSchema", "true"). // if true, Spark determines the column data types; otherwise "age" would be treated as a string
load("people_with_header.csv")
4. Describe the schema
scala> empcrm.show
+----------+---------+------+---+
|first_name|last_name|gender|age|
+----------+---------+------+---+
| Erin| Shannon| F| 42|
| Norman| Lockwood| M| 81|
| Miguel| Ruiz| M| 64|
| Rosalita| Ramirez| F| 14|
| Ally| Garcia| F| 39|
| Claire| McBride| F| 23|
| Abigail| Cottrell| F| 75|
| Jos| Rivera| M| 59|
| Ravi| Dasgupta| M| 25|
| Ravi| Shannon| F| 42|
| Shannon| McBride| F| 23|
| Abigail| McBride| F| 23|
+----------+---------+------+---+
scala> empcrm.printSchema
root
|-- first_name: string (nullable = true)
|-- last_name: string (nullable = true)
|-- gender: string (nullable = true)
|-- age: integer (nullable = true)
5. Retrieve the columns
scala> empcrm.select($"first_name", $"age").show(5)
+----------+---+
|first_name|age|
+----------+---+
| Erin| 42|
| Norman| 81|
| Miguel| 64|
| Rosalita| 14|
| Ally| 39|
+----------+---+
only showing top 5 rows
// aliases
scala> empcrm.select($"first_name",
| $"age",
| ($"age" > 60),
| ($"age" > 60).alias("older"),
| ($"age" + 10).alias("inc10")).show(5)
+----------+---+----------+-----+-----+
|first_name|age|(age > 60)|older|inc10|
+----------+---+----------+-----+-----+
| Erin| 42| false|false| 52|
| Norman| 81| true| true| 91|
| Miguel| 64| true| true| 74|
| Rosalita| 14| false|false| 24|
| Ally| 39| false|false| 49|
+----------+---+----------+-----+-----+
only showing top 5 rows
6. Register a temp view so that we can access the data using SQL queries.
// register temp table
scala> empcrm.createOrReplaceTempView("EMPCRM")
scala> spark.sql("SELECT first_name, age, age> 60 as older FROM EMPCRM ").show(false)
+----------+---+-----+
|first_name|age|older|
+----------+---+-----+
|Erin |42 |false|
|Norman |81 |true |
|Miguel |64 |true |
|Rosalita |14 |false|
|Ally |39 |false|
|Claire |23 |false|
|Abigail |75 |true |
|Jos |59 |false|
|Ravi |25 |false|
|Ravi |42 |false|
|Shannon |23 |false|
|Abigail |23 |false|
+----------+---+-----+
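The derived "older" column above is just a boolean expression evaluated per row. As a mental model only (not Spark code), the same projection can be traced on plain Python rows; the sample rows below are taken from the table output above:

```python
# Plain-Python analog of: SELECT first_name, age, age > 60 AS older FROM EMPCRM
# (illustrative sketch only; Spark evaluates the expression per row in the same spirit)
rows = [("Erin", 42), ("Norman", 81), ("Miguel", 64), ("Rosalita", 14)]

projected = [
    {"first_name": name, "age": age, "older": age > 60}  # computed column
    for (name, age) in rows
]

for r in projected:
    print(r["first_name"], r["age"], r["older"])
```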
7. Filter the rows
//filter
//These are different syntactic variants that do the same thing.
scala> empcrm.filter($"age" > 60).select("first_name","age").show(false)
+----------+---+
|first_name|age|
+----------+---+
|Norman |81 |
|Miguel |64 |
|Abigail |75 |
+----------+---+
//or
scala> empcrm.filter($"age" > 60).select($"first_name",$"age").show(false)
+----------+---+
|first_name|age|
+----------+---+
|Norman |81 |
|Miguel |64 |
|Abigail |75 |
+----------+---+
8. Alternatively, filter using a SQL query
scala> spark.sql("SELECT first_name, age FROM EMPCRM WHERE age > 60").show(false)
+----------+---+
|first_name|age|
+----------+---+
|Norman |81 |
|Miguel |64 |
|Abigail |75 |
+----------+---+
9. Order by age (ascending)
scala> :paste
// Entering paste mode (ctrl-D to finish)
empcrm.filter(empcrm("age") > 49)
.select(empcrm("first_name"), empcrm("age"))
//.orderBy(empcrm("age"), empcrm("first_name"))//ascending order
.orderBy(empcrm("age"))
.show(false)
// Exiting paste mode, now interpreting.
+----------+---+
|first_name|age|
+----------+---+
|Jos |59 |
|Miguel |64 |
|Abigail |75 |
|Norman |81 |
+----------+---+
scala> :paste
// Entering paste mode (ctrl-D to finish)
empcrm.filter(empcrm("age") > 49)
.select(empcrm("first_name"), empcrm("age"))
//.orderBy(empcrm("age"), empcrm("first_name"))//descending order
.orderBy(empcrm("age").desc)
.show(false)
// Exiting paste mode, now interpreting.
+----------+---+
|first_name|age|
+----------+---+
|Norman |81 |
|Abigail |75 |
|Miguel |64 |
|Jos |59 |
+----------+---+
10. Group by age
scala> empcrm.groupBy($"age").count().show // creates a structure with (age,count)
+---+-----+
|age|count|
+---+-----+
| 81| 1|
| 64| 1|
| 59| 1|
| 39| 1|
| 23| 3|
| 25| 1|
| 75| 1|
| 14| 1|
| 42| 2|
+---+-----+
11. Group by using a SQL query
scala> spark.sql("SELECT age, count(age) FROM EMPCRM GROUP BY age").show
+---+----------+
|age|count(age)|
+---+----------+
| 81| 1|
| 64| 1|
| 59| 1|
| 39| 1|
| 23| 3|
| 25| 1|
| 75| 1|
| 14| 1|
| 42| 2|
+---+----------+
12. Write the results to HDFS in Parquet format
//finally store the results in Parquet (a popular Hadoop storage format)
scala> empcrm.write.parquet("empcrm")
//execute the following command in a different shell
(base) [hands-on@localhost demos]$ hdfs dfs -ls empcrm
-rw-r--r-- 1 hands-on hadoop 07:05 empcrm/_SUCCESS
-rw-r--r-- 1 hands-on hadoop empcrm/part-00000-227ffb4a-242b-4e42-9c29-cdf43830484d-c000.snappy.parquet
13. Read from Parquet files
scala> val parquetempcrm = spark.read.parquet("empcrm")
parquetempcrm: org.apache.spark.sql.DataFrame = [first_name: string, last_name: string ... 2 more fields]
scala> parquetempcrm.show
+----------+---------+------+---+
|first_name|last_name|gender|age|
+----------+---------+------+---+
| Erin| Shannon| F| 42|
| Norman| Lockwood| M| 81|
| Miguel| Ruiz| M| 64|
| Rosalita| Ramirez| F| 14|
| Ally| Garcia| F| 39|
| Claire| McBride| F| 23|
| Abigail| Cottrell| F| 75|
| Jos| Rivera| M| 59|
| Ravi| Dasgupta| M| 25|
| Ravi| Shannon| F| 42|
| Shannon| McBride| F| 23|
| Abigail| McBride| F| 23|
+----------+---------+------+---+
Schema Inference Programmatically
- Start spark shell as above.
spark-shell --master local[1]
- Define the schema. Enter the commands in paste mode (:paste).
scala> :paste
// Entering paste mode (ctrl-D to finish)
// schema specified programmatically
val schema = "first_name STRING, last_name STRING, gender STRING, age INT" // please be careful to give the schema in the same order as the columns appear in the file
val empcrm = spark.read
.format("csv") // default is parquet; the file would be loaded assuming the parquet format and might fail if it is not parquet
.option("header", "true") // otherwise the header row is treated as a record
//.option("inferSchema", "true") // not needed here: we supply the schema explicitly instead of letting Spark infer it
.schema(schema)
.load("people_with_header.csv")
// Exiting paste mode, now interpreting.
schema: String = first_name STRING, last_name STRING, gender STRING, age INT
empcrm: org.apache.spark.sql.DataFrame = [first_name: string, last_name: string ... 2 more fields]
- Repeat the commands from step #4 onwards.
scala> empcrm.show
+----------+---------+------+---+
|first_name|last_name|gender|age|
+----------+---------+------+---+
| Erin| Shannon| F| 42|
| Norman| Lockwood| M| 81|
| Miguel| Ruiz| M| 64|
| Rosalita| Ramirez| F| 14|
| Ally| Garcia| F| 39|
| Claire| McBride| F| 23|
| Abigail| Cottrell| F| 75|
| Jos| Rivera| M| 59|
| Ravi| Dasgupta| M| 25|
| Ravi| Shannon| F| 42|
| Shannon| McBride| F| 23|
| Abigail| McBride| F| 23|
+----------+---------+------+---+
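When the schema is not supplied explicitly, option("inferSchema", "true") makes Spark scan the column values and pick a type per column. The toy sketch below illustrates the idea in plain Python (this is NOT Spark's actual algorithm; the type names and promotion order are simplified assumptions):

```python
# Toy version of CSV schema inference: try int, then float, else fall back to string.
# Sketch of the idea behind option("inferSchema", "true"), not Spark's implementation.
def infer_type(values):
    for cast, type_name in ((int, "integer"), (float, "double")):
        try:
            for v in values:
                cast(v)  # every value in the column must parse
            return type_name
        except ValueError:
            continue  # this type does not fit; try the next one
    return "string"

header = ["first_name", "last_name", "gender", "age"]
rows = [["Erin", "Shannon", "F", "42"], ["Norman", "Lockwood", "M", "81"]]

columns = list(zip(*rows))  # transpose rows into columns
schema = {name: infer_type(col) for name, col in zip(header, columns)}
print(schema)  # "age" comes out as integer, the rest as string
```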
Reading From Text Files
Automatic Schema Inference
- start spark shell
spark-shell --master local[*]
- Observe customers.txt from Spark SQL\Labs\TextFile. This file is already available in HDFS.
(base) [hands-on@localhost demos]$ hdfs dfs -put customers.txt
(base) [hands-on@localhost demos]$ hdfs dfs -ls
Found 4 items
drwxr-xr-x - hands-on hadoop .sparkStaging
-rw-r--r-- 1 hands-on hadoop customers.txt
-rw-r--r-- 1 hands-on hadoop hamlet.txt
-rw-r--r-- 1 hands-on hadoop u.data
- Create the SQLContext first from the existing Spark Context
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
- Run the following commands
// Import statement to implicitly convert an RDD to a DataFrame (the text file did not have a structure)
import sqlContext.implicits._
// Create a custom class to represent the Customer
case class Customer(customer_id: Int, name: String, city: String, state: String, zip_code: String)
// Create a DataFrame of Customer objects from the data set text file.
val dfCustomers = sc.textFile("customers.txt").map(_.split(",")).map(p => Customer(p(0).trim.toInt, p(1), p(2), p(3), p(4))).toDF()
// Register DataFrame as a table.
dfCustomers.registerTempTable("customers")
// Display the content of DataFrame
dfCustomers.show()
// Print the DF schema
dfCustomers.printSchema()
// Select customer name column
dfCustomers.select("name").show()
// Select customer name and city columns
dfCustomers.select("name", "city").show()
// Select a customer by id
dfCustomers.filter(dfCustomers("customer_id").equalTo(500)).show()
// Count the customers by zip code
dfCustomers.groupBy("zip_code").count().show()

scala> import sqlContext.implicits._
import sqlContext.implicits._
scala> case class Customer(customer_id: Int, name: String, city: String, state: String, zip_code: String)
defined class Customer
scala> val dfCustomers = sc.textFile("customers.txt").map(_.split(",")).map(p => Customer(p(0).trim.toInt, p(1), p(2), p(3), p(4))).toDF()
dfCustomers: org.apache.spark.sql.DataFrame = [customer_id: int, name: string ... 3 more fields]
scala> dfCustomers.show
+-----------+---------------+------------+-----+--------+
|customer_id| name| city|state|zip_code|
+-----------+---------------+------------+-----+--------+
| 100| John Smith| Austin| TX| 78727|
| 200| Joe Johnson| Dallas| TX| 75201|
| 300| Bob Jones| Houston| TX| 77028|
| 400| Andy Davis| San Antonio| TX| 78227|
| 500| James Williams| Austin| TX| 78727|
+-----------+---------------+------------+-----+--------+
scala> dfCustomers.registerTempTable("customers")
warning: there was one deprecation warning; re-run with -deprecation for details
scala> dfCustomers.show()
+-----------+---------------+------------+-----+--------+
|customer_id| name| city|state|zip_code|
+-----------+---------------+------------+-----+--------+
| 100| John Smith| Austin| TX| 78727|
| 200| Joe Johnson| Dallas| TX| 75201|
| 300| Bob Jones| Houston| TX| 77028|
| 400| Andy Davis| San Antonio| TX| 78227|
| 500| James Williams| Austin| TX| 78727|
+-----------+---------------+------------+-----+--------+
scala> dfCustomers.printSchema()
root
|-- customer_id: integer (nullable = false)
|-- name: string (nullable = true)
|-- city: string (nullable = true)
|-- state: string (nullable = true)
|-- zip_code: string (nullable = true)
scala> dfCustomers.select("name").show()
+---------------+
| name|
+---------------+
| John Smith|
| Joe Johnson|
| Bob Jones|
| Andy Davis|
| James Williams|
+---------------+
scala> dfCustomers.select("name", "city").show()
+---------------+------------+
| name| city|
+---------------+------------+
| John Smith| Austin|
| Joe Johnson| Dallas|
| Bob Jones| Houston|
| Andy Davis| San Antonio|
| James Williams| Austin|
+---------------+------------+
scala> dfCustomers.filter(dfCustomers("customer_id").equalTo(500)).show()
+-----------+---------------+-------+-----+--------+
|customer_id| name| city|state|zip_code|
+-----------+---------------+-------+-----+--------+
| 500| James Williams| Austin| TX| 78727|
+-----------+---------------+-------+-----+--------+
scala> dfCustomers.groupBy("zip_code").count().show()
+--------+-----+
|zip_code|count|
+--------+-----+
| 75201| 1|
| 78227| 1|
| 78727| 2|
| 77028| 1|
+--------+-----+
Reading from a file with the schema (manual - Programmatically Specifying the Schema)
In the above example, the schema is inferred using reflection. We can
also specify the schema of the dataset programmatically. This is useful
when custom classes cannot be defined ahead of time because the
structure of the data is encoded in a string.
The following code example shows how to specify the schema using the
data type classes StructType, StructField, and StringType.
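The mechanics are simple: split the schema string on spaces and wrap each field name in a (name, type, nullable) descriptor, which is what the StructField mapping in the Scala code does. A hedged plain-Python analog (the dict shape below is an illustrative stand-in for StructField, not a Spark API):

```python
# Analog of:
#   StructType(schemaString.split(" ").map(fieldName =>
#       StructField(fieldName, StringType, true)))
# Every field gets the string type because the schema string carries only names.
schema_string = "customer_id name city state zip_code"

struct_type = [
    {"name": field_name, "type": "string", "nullable": True}
    for field_name in schema_string.split(" ")
]

print([f["name"] for f in struct_type])
```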
- Repeat steps #1, #2, and #3 if not already done.
- Execute the commands
//
// Programmatically Specifying the Schema
//
// Create SQLContext from the existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create an RDD
val rddCustomers = sc.textFile("customers.txt")
// The schema is encoded in a string
val schemaString = "customer_id name city state zip_code"
// Import Spark SQL data types and Row.
import org.apache.spark.sql._
import org.apache.spark.sql.types._
// Generate the schema based on the string of schema
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Convert records of the RDD (rddCustomers) to Rows.
val rowRDD = rddCustomers.map(_.split(",")).map(p => Row(p(0).trim, p(1), p(2), p(3), p(4)))
// Apply the schema to the RDD.
val dfCustomers = sqlContext.createDataFrame(rowRDD, schema)
// Register the DataFrame as a table.
dfCustomers.registerTempTable("customers")
// SQL statements can be run by using the sql method provided by sqlContext against the "customers" table.
val custNames = sqlContext.sql("SELECT name FROM customers")
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
custNames.map(t => "Name: " + t(0)).collect().foreach(println)
val customersByCity = sqlContext.sql("SELECT name, zip_code FROM customers ORDER BY zip_code")
customersByCity.map(t => t(0) + "," + t(1)).collect().foreach(println)

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@41d1bea3
scala> val rddCustomers = sc.textFile("customers.txt")
rddCustomers: org.apache.spark.rdd.RDD[String] = customers.txt MapPartitionsRDD[39] at textFile at <console>:30
scala> rddCustomers.collect
res19: Array[String] = Array(100, John Smith, Austin, TX, 78727, 200, Joe Johnson, Dallas, TX, 75201, 300, Bob Jones, Houston, TX, 77028, 400, Andy Davis, San Antonio, TX, 78227, 500, James Williams, Austin, TX, 78727)
scala> val schemaString = "customer_id name city state zip_code"
schemaString: String = customer_id name city state zip_code
scala> import org.apache.spark.sql._
import org.apache.spark.sql._
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(customer_id,StringType,true), StructField(name,StringType,true), StructField(city,StringType,true), StructField(state,StringType,true), StructField(zip_code,StringType,true))
scala> val rowRDD = rddCustomers.map(_.split(",")).map(p => Row(p(0).trim, p(1), p(2), p(3), p(4)))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[43] at map at <console>:37
scala> val dfCustomers = sqlContext.createDataFrame(rowRDD, schema)
dfCustomers: org.apache.spark.sql.DataFrame = [customer_id: string, name: string ... 3 more fields]
scala> dfCustomers.registerTempTable("customers")
warning: there was one deprecation warning; re-run with -deprecation for details
scala> val custNames = sqlContext.sql("SELECT name FROM customers")
custNames: org.apache.spark.sql.DataFrame = [name: string]
scala> custNames.map(t => "Name: " + t(0)).collect().foreach(println)
Name: John Smith
Name: Joe Johnson
Name: Bob Jones
Name: Andy Davis
Name: James Williams
scala> val customersByCity = sqlContext.sql("SELECT name, zip_code FROM customers ORDER BY zip_code")
customersByCity: org.apache.spark.sql.DataFrame = [name: string, zip_code: string]
scala> customersByCity.map(t => t(0) + "," + t(1)).collect().foreach(println)
Joe Johnson, 75201
Bob Jones, 77028
Andy Davis, 78227
John Smith, 78727
James Williams, 78727
3.2. Spark (as a processing engine) - 2> Spark-submit (With RDD) with ScalaProject.JAR
spark-submit needs a JAR to execute, so we create a project, build a JAR out of it, and supply that JAR to the spark-submit command.
package com.sparkcore
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
object wordCountDemo {
def main(args: Array[String]) {
// creating spark context
val spark = SparkSession.builder().config(new SparkConf().setAppName("Demo WordCount"))
.getOrCreate()
val sc = spark.sparkContext
// define the processing logic - transformations, joins etc..
val file = sc.textFile(args(0))
val tokenized = file.flatMap( x => x.split(" ")) // file.flatMap(_.split(" "))
val tokenizedMap = tokenized.map(x => (x,1)) // tokenized.map((_, 1))
val reduced = tokenizedMap.reduceByKey( (x,y) => (x+y)) // tokenizedMap.reduceByKey(_+_)
// save the processed data to hdfs , console
reduced.saveAsTextFile("wordCountSparkSubmit")
}
}
/** Find the movies with the most ratings. */
object PopularMovies {
/** Our main function where the action happens */
def main(args: Array[String]) {
// Set the log level to only print errors
//Logger.getLogger("org").setLevel(Level.ERROR)
//@transient lazy val logger: Logger = Logger.getLogger(getClass.getName)
System.out.println("building spark context")
val spark = SparkSession
.builder
//.master("local[*]")
.appName("Popular Movies")
//.config("spark.sql.warehouse.dir", ".")
.getOrCreate()
// Create a SparkContext using every core of the local machine
//val sc = new SparkContext("local[*]", "PopularMovies")
val sc = spark.sparkContext
// Read in each rating line
val lines = sc.textFile("moviesSimple.data")
// Map to (movieID, 1) tuples
val movies = lines.map(x => (x.split("\t")(1).toInt, 1))
// Count up all the 1's for each movie
val movieCounts = movies.reduceByKey( (x, y) => x + y )
// Flip (movieID, count) to (count, movieID)
val flipped = movieCounts.map( x => (x._2, x._1) )
// Sort
val sortedMovies = flipped.sortByKey()
// Collect and print results
val results = sortedMovies.collect()
results.foreach(println)
}
}
Tuesday, February 22, 2022
3.1. Spark (as a processing engine) - 1> Spark-shell (With RDD)
val inputFileRDD = sc.textFile("wordcountSimple.txt")
inputFileRDD.collect
val tokenizedRDD = inputFileRDD.flatMap(x => x.split(" ")) //x is each element or row
//OR inputFileRDD.flatMap(_.split(" "))
tokenizedRDD.collect
val tokenizedMap = tokenizedRDD.map(x => (x,1) ) //make a tuple per word: (abc,1), (def,1), etc.
//OR tokenizedRDD.map((_, 1))
tokenizedMap.collect
val reducedRDD = tokenizedMap.reduceByKey( (x,y) => (x+y) ) //sum all the values belonging to the same key
//OR tokenizedMap.reduceByKey(_ + _)
//press Tab after tokenizedMap.reduc to see the exact method names
reducedRDD.collect
reducedRDD.saveAsTextFile("sparkWordCount") //sparkWordCount is a folder; under it the results are stored as part-<< >> files
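The same flatMap → map → reduceByKey flow can be traced with ordinary Python collections; this is only a mental model of the RDD pipeline above, not Spark itself (the sample lines are made up for illustration):

```python
# Plain-Python trace of the word-count pipeline (no Spark involved):
# flatMap(split) -> map(word -> (word, 1)) -> reduceByKey(+)
lines = ["to be or", "not to be"]

tokens = [w for line in lines for w in line.split(" ")]  # flatMap: one list of words
pairs = [(w, 1) for w in tokens]                         # map: (word, 1) tuples
counts = {}
for word, one in pairs:                                  # reduceByKey: sum per word
    counts[word] = counts.get(word, 0) + one

print(counts)  # {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```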
----------------
From the hands-on shell:
hdfs dfs -put u.data
--------------------
spark-shell --master local[*] (run this first to get to the scala> prompt)
val lines = sc.textFile("u.data")
val movies = lines.map(x => (x.split("\t")(1).toInt, 1)) //take the 2nd column and return (movieID, 1) for every element
movies.take(5)
val movieCounts = movies.reduceByKey( (x, y) => x + y )
movieCounts.collect
Monday, February 21, 2022
1. Hadoop (HDFS + MapReduce) : HDFS (as a storage)
For the following components, please download based on your platform/OS.
Oracle Virtual Box https://www.virtualbox.org/wiki/Downloads
SPARK OVM file(6GB) - https://drive.google.com/file/d/1p4C4j1qxq1tUMIdq-5xlvYSFoUX-qwar/view?usp=sharing
JDK 1.8 https://www.oracle.com/in/java/technologies/javase/javase-jdk8-downloads.html
SCP client such as WinSCP https://winscp.net/eng/download.php (Windows)
SSH client such as MobaXterm https://mobaxterm.mobatek.net/download.html (Windows)
IntelliJ IDEA Community https://www.jetbrains.com/idea/download/
Eclipse https://www.eclipse.org/downloads/