In this article we will see how to perform SQL and Hive operations to analyze data in PySpark.
As we know, PySpark is the Python API for Spark. Spark SQL is a Spark module for structured data processing, and it lets us execute queries written in either SQL or HiveQL.
SQLContext allows us to connect to different data sources to read and write data. Spark SQL can also read and write data stored in Hive.
Hive is used for handling big data. Using HiveQL we can combine the records of multiple tables into a single table, and in this blog we will also see how that is done.
SQLContext is the entry point for all relational functionality in Spark.
Code Snippet :
# Create a SparkSession and build an SQLContext from its underlying SparkContext
from pyspark.sql import SparkSession, SQLContext
spark = SparkSession.builder.appName("SQL_Hive").getOrCreate()
sqlContext = SQLContext(spark.sparkContext)
Now let's see how to load and read data using SQLContext. In the first line below we read the CSV file with the SQLContext, and in the second line we register the DataFrame as a temporary table. After that we can query the data with SQL.
Code Snippet :
# Create the DataFrame
df_employee = sqlContext.read.csv("WA_Fn-UseC_-HR-Employee-Attrition.csv", inferSchema=True, header=True)
df_employee.registerTempTable("df_employee")
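As a quick sanity check (a minimal sketch using the DataFrame created above), we can confirm that the schema was inferred correctly and see how many rows were loaded.
Code snippet :
# Inspect the inferred schema and the number of rows
df_employee.printSchema()
print(df_employee.count())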
Using the following command we can read data from the table.
Code snippet :
#Read all the data from dataset
sqlContext.sql("SELECT * FROM df_employee").show()
As we can see, using SQLContext we can read the data and perform various SQL operations to extract information from the table, such as joins, aggregate functions, and subqueries.
Output :
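For example, here is a minimal sketch of an aggregate query against the same temporary table (column names taken from the dataset above) that computes the average monthly income per department.
Code snippet :
# Average monthly income per department (aggregate function example)
sqlContext.sql("SELECT Department, AVG(MonthlyIncome) AS Avg_Income FROM df_employee GROUP BY Department").show()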
Let's find, for each department, the employee whose monthly income is the highest, by running a SQL query on the table.
Code Snippet :
sqlContext.sql("SELECT EmployeeNumber, Department, MonthlyIncome FROM df_employee WHERE MonthlyIncome IN (SELECT MAX(MonthlyIncome) FROM df_employee GROUP BY Department)").show()
As shown below, the output lists the employees whose monthly income is the highest in their department, along with their employee number, the department they belong to, and their monthly income.
Output :
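Note that the IN subquery above can also match an employee from a different department whose income happens to equal another department's maximum. A join against a grouped subquery is a more precise way to express the same question; the sketch below assumes the same column names.
Code snippet :
# Per-department maximum income using a join with a grouped subquery
sqlContext.sql("SELECT e.EmployeeNumber, e.Department, e.MonthlyIncome FROM df_employee e \
               JOIN (SELECT Department, MAX(MonthlyIncome) AS Max_Income FROM df_employee GROUP BY Department) m \
               ON e.Department = m.Department AND e.MonthlyIncome = m.Max_Income").show()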
We can also create a Hive table in PySpark, named employee1, with the fields Employee_Number, Department, Job_Role, Age, Gender, Monthly_Income, Over_time, and Marital_Status. To create the table we first import SparkContext and HiveContext.
Code Snippet :
from pyspark import SparkContext
from pyspark.sql import HiveContext
sc = SparkContext(appName="Hive_operations")
hive_context = HiveContext(sc)
Using the following command we create the table:
Code snippet :
hive_context.sql("CREATE TABLE employee1 (Employee_Number INT, Department STRING, Job_Role STRING, Age INT,\
Gender STRING, Monthly_Income INT, Over_time STRING, Marital_Status STRING) ROW FORMAT DELIMITED \
FIELDS TERMINATED BY ',' STORED AS TEXTFILE TBLPROPERTIES('skip.header.line.count'='0')")
hive_context.sql("select * from employee1").show()
We can see that an empty table is created.
Output :
Now we load the data into the table from a .csv file, which is present on our local system, using a Hive query.
Code Snippet :
# load the data in hive table
hive_context.sql("LOAD DATA LOCAL INPATH 'employee1.csv' INTO TABLE employee1")
Output :
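To verify that the load succeeded (a minimal check on the table loaded above), we can count the rows now present in employee1.
Code snippet :
# Verify the load by counting rows in the Hive table
hive_context.sql("SELECT COUNT(*) AS row_count FROM employee1").show()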
Now we will see how to merge two tables into a single table. Suppose we have two tables with the same structure and we want all of their records in one table; this is possible with a Hive query. Here we have two Hive tables, employee1 and employee2, and we merge them using the following command. The same approach can be extended to merge more than two tables.
Code snippet :
# create a new table and concatenate data from employee1 and employee2 table
hive_context.sql("CREATE TABLE employee AS SELECT * FROM employee1 UNION ALL SELECT * FROM employee2")
Here we can see that all records from the employee1 and employee2 tables are merged into a single table.
Output :
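As a quick check that the merge worked (assuming the tables created above), the row count of the employee table should equal the sum of the row counts of employee1 and employee2.
Code snippet :
# Row count of the merged table
hive_context.sql("SELECT COUNT(*) AS merged_rows FROM employee").show()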
Conclusion :
In this article we have seen how to perform some basic operations using SQLContext and HiveContext in PySpark.
Thank you