Sunday, 15 April 2018

Spark-Transformation-1

Narrow transformations are the result of map(), filter() and similar operations, where each output record depends on data from a single partition.

An output RDD has partitions with records that originate from a single partition in the parent RDD. Only a limited subset of partitions is used to calculate the result.



Transformations with (usually) Narrow dependencies:

  • map
  • mapValues
  • flatMap
  • filter
  • mapPartitions
  • mapPartitionsWithIndex

Wide Transformations: In a wide transformation, the elements required to compute the records in a single output partition may live in many partitions of the parent RDD. Wide transformations are the result of operations such as groupByKey() and reduceByKey().

Each partition of the parent RDD may be used by multiple child partitions (a small sketch after the list below illustrates the difference).


Transformations with (usually) Wide dependencies: (might cause a shuffle)

  • cogroup
  • groupWith
  • join
  • leftOuterJoin
  • rightOuterJoin
  • groupByKey
  • reduceByKey
  • combineByKey
  • distinct
  • intersection
  • repartition
  • coalesce
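
A quick way to see the difference is to compare how records move between partitions. Below is a minimal sketch, assuming an active SparkContext sc (as in the notebook cells later in this post):

rdd=sc.parallelize(range(12),3)
pairs=rdd.map(lambda x:(x%3,x))               #narrow: each output partition comes from exactly one parent partition
print(pairs.glom().collect())                 #keys are still spread over all 3 partitions
summed=pairs.reduceByKey(lambda a,b:a+b)      #wide: records with the same key must be brought together by a shuffle
print(summed.glom().collect())                #after the shuffle each key lives in exactly one partition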


1.map()


The map() transformation iterates over every line of the RDD and returns a new RDD. We pass any function to map(), and that function is applied to every element/line of the RDD, producing exactly one output element per input element.




from pyspark import SparkContext,SparkConf
#read three text files into RDDs (sc is the notebook's SparkContext)
FileA=sc.textFile("/FileStore/tables/file1.txt")
FileB=sc.textFile("/FileStore/tables/file2.txt")
FileC=sc.textFile("/FileStore/tables/file3.txt")
#merge the three RDDs
FileAB=FileA.union(FileB)
FileABC=FileAB.union(FileC)
#map: each line becomes one list of its words
FileABCflatten=FileABC.map(lambda x:x.split(" "))
FileABCflatten.collect()


Here, each line is split on spaces into a list of its words, so the result is a list of lists (one inner list per line), as shown by the nested square brackets.



2.flatMap()


With flatMap(), each input element can produce zero or more output elements, which are flattened into the output RDD. The simplest use of flatMap() is to split each input string into words.



from pyspark import SparkContext,SparkConf
FileA=sc.textFile("/FileStore/tables/file1.txt")
FileB=sc.textFile("/FileStore/tables/file2.txt")
FileC=sc.textFile("/FileStore/tables/file3.txt")
FileAB=FileA.union(FileB)
FileABC=FileAB.union(FileC)
#flatMap: every word becomes its own element in one flat RDD
FileABCflatten=FileABC.flatMap(lambda x:x.split(" "))
FileABCflatten.collect()

Here all words are separated by spaces and put into one list.


What is the difference between map and flatMap?


3.mapPartitions()


Return a new RDD by applying a function to each partition of this RDD.



Why partition?


Parallelism is the key feature of any distributed system: operations are performed by dividing the data into multiple partitions, and the same operation runs on the partitions simultaneously, which is what makes data processing with Spark fast. Map and reduce operations can be applied effectively in parallel in Apache Spark by dividing the data into multiple partitions. The partitions of an RDD are distributed across several workers running on different nodes of a cluster, and Spark can recompute a lost partition from its lineage, so the RDD remains available even if a single worker fails.
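
A minimal sketch of controlling and inspecting partitioning (assuming the notebook's SparkContext sc, as in the examples below):

rdd=sc.parallelize(range(1,101),8)   #ask Spark to split the data into 8 partitions
print(rdd.getNumPartitions())        #-> 8; each partition can be processed by a separate task in parallel
print(rdd.map(lambda x:x*2).reduce(lambda a,b:a+b))   #the map runs on all 8 partitions at the same time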


mapPartitions() can be used as an alternative to map() & foreach(). 


mapPartitions() is called once for each partition, unlike map() & foreach(), which are called once for each element in the RDD. The main advantage is that we can do initialization on a per-partition basis instead of a per-element basis (as map() & foreach() do); see the sketch after the example below.



from pyspark import SparkContext,SparkConf

#func1 receives an iterator over one whole partition and yields a single value for it
def func1(num):
  yield sum(num)

list1=[1,2,5,3,7,9,2,2]
#spread the list over 3 partitions
Rdd1=sc.parallelize(list1,3)
#one sum is produced per partition, so collect() returns 3 numbers
List2=Rdd1.mapPartitions(func1)
List2.collect()
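
The per-partition initialization point is easier to see with a sketch like the one below; expensive_setup() is a hypothetical stand-in for something like opening a database connection once per partition:

def expensive_setup():
    #hypothetical placeholder for opening a DB connection, loading a lookup table, etc.
    return {"initialised":True}

def process_partition(records):
    resource=expensive_setup()          #runs once per partition, not once per record
    for r in records:
        yield (r,resource["initialised"])

Rdd2=sc.parallelize(range(10),2)
print(Rdd2.mapPartitions(process_partition).collect())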


4.mapPartitionsWithIndex

Return a new RDD by applying a function to each partition of this RDD,
while tracking the index of the original partition
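
A minimal sketch (assuming the same sc as in the other examples) that tags every element with the number of the partition it came from:

def tag_with_partition(index,records):
    #index is the partition number, records is an iterator over that partition
    for r in records:
        yield (index,r)

Rdd1=sc.parallelize([1,2,5,3,7,9,2,2],3)
Rdd1.mapPartitionsWithIndex(tag_with_partition).collect()
#e.g. [(0, 1), (0, 2), (1, 5), (1, 3), (1, 7), (2, 9), (2, 2), (2, 2)]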

5.Filter


Return a new RDD containing only the elements that satisfy a predicate.
from pyspark import SparkContext,SparkConf
#Filter common words in another file
FileA=sc.textFile("/FileStore/tables/file2.txt")
#separate all words by spaces, periods and commas
File1=FileA.flatMap(lambda x:x.split(" ")).flatMap(lambda x:x.split(".")).flatMap(lambda x:x.split(","))
#filter common words
File2=File1.filter(lambda x:x=='Kafka')
File2.collect()
#filter words apart from Kafka
File3=File1.filter(lambda x:x!='Kafka')
File3.collect()




6.Union()


union() is used to merge two RDDs of the same structure into one; note that, unlike SQL's UNION, it keeps duplicate elements.



from pyspark import SparkContext,SparkConf
List1=range(1,20,10)
List2=range(20,40,10)
Rdd1=sc.parallelize(List1)
Rdd2=sc.parallelize(List2)
Rdd1.union(Rdd2).collect()





7.intersection()


intersection() gives you the elements common to the two RDDs (the result is deduplicated).



from pyspark import SparkContext,SparkConf
List1=[1,2,4,6,8,90,3,2]
List2=[4,5,6,7,9,2,1,2,90]
Rdd1=sc.parallelize(List1)
Rdd2=sc.parallelize(List2)
#gives elements common to both RDDs

Rdd1.intersection(Rdd2).collect()




8.distinct()


This transformation removes duplicates: as the name suggests, it returns only the unique elements of the RDD.




from pyspark import SparkContext, SparkConf
List1=[1,2,4,6,8,90,3,2]
List2=[4,5,6,7,9,2,1,2,90]
Rdd1=sc.parallelize(List1)
Rdd2=sc.parallelize(List2)
#union gives all elements from both RDDs (with duplicates)
Rdd3=Rdd1.union(Rdd2)
#distinct removes the duplicates from the combined RDD
Rdd3.distinct().collect()





9.join()


This transformation joins two RDDs based on a common key.

from pyspark import SparkContext,SparkConf
List1=[(1,"ABC"),(2,"AB"),(3,"DEF")]
List2=[(1,"ABCD"),(4,"AB"),(3,"DEF")]
Rdd1=sc.parallelize(List1)
Rdd2=sc.parallelize(List2)
#join gives the values from both RDDs for the keys they have in common
Rdd3=Rdd1.join(Rdd2)
Rdd3.collect()



leftOuterJoin: returns all keys from the left RDD, paired with the matching value from the right RDD (or None when there is no match).



rightOuterJoin: returns all keys from the right RDD, paired with the matching value from the left RDD (or None when there is no match), as sketched below.
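
A short sketch reusing the List1/List2 pair RDDs from the join example above; keys with no match come back with None:

Rdd1=sc.parallelize([(1,"ABC"),(2,"AB"),(3,"DEF")])
Rdd2=sc.parallelize([(1,"ABCD"),(4,"AB"),(3,"DEF")])
#all keys from the left RDD; the right side is None where there is no match
Rdd1.leftOuterJoin(Rdd2).collect()    #e.g. [(1, ('ABC', 'ABCD')), (2, ('AB', None)), (3, ('DEF', 'DEF'))]
#all keys from the right RDD; the left side is None where there is no match
Rdd1.rightOuterJoin(Rdd2).collect()   #e.g. [(1, ('ABC', 'ABCD')), (4, (None, 'AB')), (3, ('DEF', 'DEF'))]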









SPARK Actions


Actions are operations applied to an RDD that instruct Spark to actually perform the computation and send the result back to the driver.

1.Collect()

The collect() action is the most common and simplest operation: it returns the entire content of the RDD to the driver program, so it should only be used when the result fits in the driver's memory.


from pyspark import SparkContext,SparkConf
FileA=sc.textFile("/FileStore/tables/file1.txt")
FileA.collect()




2.count()

The count() action returns the number of elements in the RDD.

from pyspark import SparkContext,SparkConf 

FileA=sc.textFile("/FileStore/tables/file1.txt")
#find the number of occurrences of the word 'Data' in the file
File2=FileA.flatMap(lambda x:x.split(" ")).filter(lambda x:x=='Data')
File2.count()




3.take()

The take(n) action returns the first n elements of the RDD. It tries to access as few partitions as possible, so the result is a biased selection, not a random sample.


from pyspark import SparkContext,SparkConf
FileA=sc.textFile("/FileStore/tables/file1.txt")
#split the file into words and take the first 5
File2=FileA.flatMap(lambda x:x.split(" "))
File2.take(5)




4.first()

Returns the first element of the RDD.

from pyspark import SparkContext,SparkConf
FileA=sc.textFile("/FileStore/tables/file1.txt")
#split the file into words and return the first one
File2=FileA.flatMap(lambda x:x.split(" "))
File2.first()



5.reduce()

The reduce() action aggregates all the elements of the RDD by applying a pairwise user function, which should be commutative and associative.


from pyspark import SparkContext,SparkConf
#find out sum of all numbers in RDD
List1=sc.parallelize(range(1,50),3)
Sum1=List1.reduce(lambda x,y :x+y)
print('Sum is :' ,Sum1)




6.countByKey()

Return a map of keys and the counts of their occurrences in the RDD. Note that the result comes back to the driver as a Python dictionary, not as an RDD.


from pyspark import SparkContext,SparkConf
#Count common words in the file
FileA=sc.textFile("/FileStore/tables/file1.txt")
#separate all words by spaces
File1=FileA.flatMap(lambda x:x.split(" ")).flatMap(lambda x:x.split(".")).flatMap(lambda x:x.split(","))
#create a (word, 1) pair for each word
File2=File1.map(lambda x:(x,1))
#use countByKey to count the occurrences of each word
File3=File2.countByKey()
print(File3)

7.saveAsTextFile(path)

Save the RDD to the filesystem at the given path. This works on an RDD, not on a plain Python object.
For example, the countByKey() action returns its result as a Python dictionary on the driver, so saveAsTextFile() cannot be applied to it.
In such a case we use reduceByKey() instead, which returns its result as an RDD that can be saved in text format.

from pyspark import SparkContext,SparkConf
#Count common words in file
FileA=sc.textFile("/FileStore/tables/file1.txt")
#separate all words by spaces
File1=FileA.flatMap(lambda x:x.split(" ")).flatMap(lambda x:x.split(".")).flatMap(lambda x:x.split(","))
#create a (word, 1) pair for each word
File2=File1.map(lambda x:(x,1))
#countByKey would return a dictionary here, which cannot be saved with saveAsTextFile
#print(File2.countByKey())
#reduceByKey keeps the result as an RDD of (word, count) pairs
File3=File2.reduceByKey(lambda a,b:a+b)
#File3.collect()
#save result in text file
File3.saveAsTextFile("/FileStore/tables/countByKey2.txt")
print("File has been saved")
When we read the saved file back, each line contains one (word, count) pair.



8.takeSample
Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.

from pyspark import SparkContext,SparkConf
List1=range(1,20)
Rdd1=sc.parallelize(List1)
print(Rdd1.takeSample(False,10,2))



9.takeOrdered
Return the first n elements of the RDD using either their natural order or a custom comparator.
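
A minimal sketch (same sc assumption as the other examples):

Rdd1=sc.parallelize([10,4,2,12,3])
print(Rdd1.takeOrdered(3))                   #natural ascending order -> [2, 3, 4]
print(Rdd1.takeOrdered(3,key=lambda x:-x))   #custom key for descending order -> [12, 10, 4]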

10.foreach

In Scala, rdd.foreach(println) works fine, but in PySpark the output of foreach() is printed on the executors, not in the driver/notebook, so nothing shows up.
We have to use actions like collect(), first() or take() and print the results on the driver, as follows.
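
For example, a minimal sketch of what does and does not show up in the notebook (assuming a Python 3 notebook with SparkContext sc):

def show(x):
    print(x)              #runs on an executor, so the output does not appear in the driver/notebook

Rdd1=sc.parallelize(range(1,6))
Rdd1.foreach(show)

#to see the values, bring them back to the driver with collect()/take() and print them there
for x in Rdd1.take(5):
    print(x)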



Saturday, 14 April 2018

What is difference between Map and flatMap function

Map and flatMap are similar in that they take each line from the input RDD and apply a function to it. The key difference is that map() returns exactly one output element per input element, while flatMap() can return zero or more elements, which are flattened into the output RDD.
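
A tiny side-by-side illustration with a hypothetical two-line RDD (assuming the notebook's SparkContext sc, as in the file examples below):

lines=sc.parallelize(["big data","apache spark"])
print(lines.map(lambda x:x.split(" ")).collect())       #[['big', 'data'], ['apache', 'spark']] - one list per line
print(lines.flatMap(lambda x:x.split(" ")).collect())   #['big', 'data', 'apache', 'spark'] - flattened into one list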


1.map()

The map() transformation iterates over every line of the RDD and produces a new RDD. We pass any function to map(), and it is applied to each element/line, returning exactly one output element per input.




from pyspark import SparkContext,SparkConf
FileA=sc.textFile("/FileStore/tables/file1.txt")
FileB=sc.textFile("/FileStore/tables/file2.txt")
FileC=sc.textFile("/FileStore/tables/file3.txt")
FileAB=FileA.union(FileB)
FileABC=FileAB.union(FileC)
FileABCflatten=FileABC.map(lambda x:x.split(" "))
FileABCflatten.collect()


Here you can see that each line has been split by spaces into its own list of words (a nested list per line rather than one flat list).


2.flatMap()

With flatMap(), every input element can yield several elements in the output RDD; the results are flattened into one list. The simplest use of flatMap() is to split each input string into words.


from pyspark import SparkContext,SparkConf
FileA=sc.textFile("/FileStore/tables/file1.txt")
FileB=sc.textFile("/FileStore/tables/file2.txt")
FileC=sc.textFile("/FileStore/tables/file3.txt")
FileAB=FileA.union(FileB)
FileABC=FileAB.union(FileC)
FileABCflatten=FileABC.flatMap(lambda x:x.split(" "))
FileABCflatten.collect()

Here, all the lines are flattened: there are no nested lists, and all the words split by spaces end up in one flat list.

Wednesday, 11 April 2018

Spark Exercise 1



I have practiced a few Spark exercises which I got from itversity and a number of other sites.

Problem Scenario 1


  • Data set URL (I have used a customized dataset based on this URL)
  • Choose language of your choice Python or Scala
  • Data is available in HDFS file system under /public/crime/csv
  • You can check properties of files using hadoop fs -ls -h /public/crime/csv
  • Structure of data (ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location)
  • File format - text file
  • Delimiter - “,”
  • Get monthly count of primary crime type, sorted by month in ascending order and by number of crimes per type in descending order
  • Store the result in HDFS path /user/<YOUR_USER_ID>/solutions/solution01/crimes_by_type_by_month
  • Output File Format: TEXT
  • Output Columns: Month in YYYYMM format, crime count, crime type
  • Output Delimiter: \t (tab delimited)
  • Output Compression: gzip



Core SPARK



from pyspark import SparkConf,SparkContext

#extract an integer YYYYMM from the date column
#(assumes the date looks like DD/MM/YYYY HH:MM - adjust the indexes below if the file uses MM/DD/YYYY)
def crimedate(record):
  Cdate=record.split(" ")[0]
  CMonth=Cdate.split("/")[1]
  Cyear=Cdate.split("/")[2]
  extractYearMonth=int(Cyear + CMonth)
  return extractYearMonth

#get data into RDD
crimeData = sc.textFile("/FileStore/tables/CRIME_REPORT-f91ce.txt")
crimeData.count()

#get year,month and crime type in RDD
CrimeTypeperMonth = crimeData.map(lambda crime:((crimedate(crime.split(",")[2]),crime.split(",")[5]),1))
#for crime in CrimeTypeperMonth.take(20):print(crime)

#Count by key(year,month,crime type)  
CrimeCountperMonthperType=CrimeTypeperMonth.reduceByKey(lambda total,revenue:total+revenue)  
#for i in CrimeCountperMonthperType.take(10):print(i)
  
#re-key as ((yearmonth, -count), crimetype) so sortByKey gives ascending month and descending count
CrimekeyChange=CrimeCountperMonthperType.map(lambda x:((x[0][0],-x[1]),x[0][1]))
#for i in CrimekeyChange.take(10):print(i)

sortedCrimeData=CrimekeyChange.sortByKey()
#for i in sortedCrimeData.take(10):print(i)

CrimeCountperMonthperType=sortedCrimeData.map(lambda crime: (str(crime[0][0])+"\t"+str(-crime[0][1])+"\t"+crime[1]))
for i in CrimeCountperMonthperType.take(10):print(i)

#write into files with 2 partitions and Gzip compression
CrimeDataperMonth="/FileStore/tables/CrimeDataperMonth"
CrimeCountperMonthperType.repartition(2).saveAsTextFile(path=CrimeDataperMonth,compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")



SPARK SQL

from pyspark import SparkConf,SparkContext
from pyspark.sql import Row

#extract an integer YYYYMM from the date column (same date-format assumption as in the Core Spark version above)
def crimedate(record):
  Cdate=record.split(" ")[0]
  CMonth=Cdate.split("/")[1]
  CYear=Cdate.split("/")[2]
  yearmonth=int(CYear + CMonth)
  return yearmonth


#get data into RDD
crimeData = sc.textFile("/FileStore/tables/CRIME_REPORT-f91ce.txt")
crimeData.count()

#get year,month and crime type in RDD
CrimeTypeperMonth = crimeData.map(lambda crime:(crimedate(crime.split(",")[2]),crime.split(",")[5],1))

#for crime in CrimeTypeperMonth.take(20):print(crime)

CrimeDataDF=sqlContext.createDataFrame(CrimeTypeperMonth).toDF("CrimeDate","CrimeType","Count")
CrimeDataDF.registerTempTable('crimeTB')

#result=sqlContext.sql("select * from crimeTB")
#result.show()

resultDF=sqlContext.sql("select crimeDate, count(1) crimeCntperMonthperType, CrimeType from crimeTB group by crimeDate, crimeType order by crimeDate, crimeCntperMonthperType desc")
resultDF.show()

CrimeperMonth1="/FileStore/tables/CrimeperMonth1"
resultDFtoRDD=resultDF.rdd.map(lambda row:str(row[0])+"\t"+str(row[1])+"\t"+str(row[2]))

resultDFtoRDD.repartition(2).saveAsTextFile(path=CrimeperMonth1,compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")



Sunday, 8 April 2018

Parquet file-Read and Write


How to read parquet file

from pyspark import SparkContext, SparkConf

#read parquet file
parquetFile1=sqlContext.read.parquet("/FileStore/tables/StudentId5A.parquet")

#register Temp Table from this file
parquetFile1.registerTempTable("Student1TB")

#apply a sql query on this table
result1=sqlContext.sql("select * from Student1TB")

result1.show()


How to write into parquet file

A structured DataFrame with column names cannot be saved as a plain text file, because the text format cannot preserve the column structure.

So, save it as a parquet file as follows:

Result1=sqlContext.sql("select * from StudentTB where Id>5 ")
Result1.show()

#save as parquet file from sql result table

#Result1.write.save("DBFS:///FileStore/tables/StudentID5.txt")   OR
Result1.write.parquet("DBFS:///FileStore/tables/StudentId5A.parquet")


How to create RDD,dataframe and table from parallelize list

from pyspark import SparkContext, SparkConf

#list of values

Student=[(1,"jonny","2000","Physics"),
(2,"jack","4000","Bio"),
(3,"dinarys","3000","Chemistry"),
(4,"john","5000","Bio"),
(5,"theon","3000","English"),
(6,"bran","5000","Finance"),
(7,"aarya","2000","Physics"),
(8,"sansa","2000","Physics"),
(9,"Shital","1000","Bio")]

#create an RDD (Resilient Distributed Dataset) from the local list of raw tuples
StudentRDD=sc.parallelize(Student)

#create a structured dataframe with column names from the RDD
StudentDF=sqlContext.createDataFrame(StudentRDD).toDF("Id","Name","Salary","Dept")

#Register Temp Table from dataframe
StudentDF.registerTempTable('StudentTB')

#apply sql queries on Temp Table
result=sqlContext.sql("select * from StudentTB")
result.show()


Wordcount sample exercise in Spark

from pyspark import SparkContext, SparkConf

#Reads an input set of text documents.
File1RDD=sc.textFile("/FileStore/tables/string1.txt")

#separate each word by spaces
Word=File1RDD.flatMap(lambda x: x.split(" "))

#apply function for each word with count 1
Word1=Word.map(lambda y:(y,1))

#apply reduceByKey to add up the counts for each word
Word2=Word1.reduceByKey(lambda x,y:x+y)

#swap to (count, word) so we can sort by the count
Word3=Word2.map(lambda x:(x[1],x[0]))

#sort by key (the count) in descending order
Word3.sortByKey(False).collect()

#save in text file
#Word2.saveAsTextFile("/FileStore/tables/wordcount1.txt")





Sorted wordcount




For application-writing commands, refer to:
http://www.geoinsyssoft.com/pyspark-wordcount-arogram/

SPARK-Window Value Functions

#create dataframe from RDD and create Table from dataframe

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

Employee=[(1,"jonny","2000","sales"),
(2,"jack","4000","finance"),
(3,"dinarys","3000","acc"),
(4,"john","5000","hr"),
(5,"theon","3000","sales"),
(6,"bran","5000","finance"),
(7,"john","2000","acc"),
(9,"johny","2000","acc"),
(8,"sansa","2000","hr")]


EmployeeRDD=sc.parallelize(Employee)
EmployeeDF=sqlContext.createDataFrame(EmployeeRDD).toDF('Eid','Emp_Name','Emp_Salary','Dept')
EmployeeDF.registerTempTable('EmployeeTB')

result1=sqlContext.sql("select * from EmployeeTB limit 3")
result1.show()


There are 4 value functions in total:
1) Lead
2) Lag
3) First Value
4) Last Value

1) Lead:
The LEAD() window function returns the value from the row after the current row within the partition. If no such row exists, the default value is returned (null unless a default is given, such as the 0 used below).


ResultLead=sqlContext.sql("select Eid,Emp_Name,Emp_Salary,Dept,Lead(Emp_salary,1,0) over (partition by Dept order by Emp_Salary) as LeadSal from EmployeeTB")
ResultLead.show()

2.lag()

The LAG() window function returns the value from the row before the current row within the partition. If no such row exists, the default value is returned (null unless a default is given, such as the 0 used below).

ResultLag=sqlContext.sql("select Eid,Emp_Name,Emp_Salary,Dept,Lag(Emp_salary,1,0) over (partition by Dept order by Emp_Salary) as LagSal from EmployeeTB")
ResultLag.show()

3.First_value()

The FIRST_VALUE window function returns the value of the specified expression with respect to the first row in the window frame.

ResultFirst=sqlContext.sql("select Eid,Emp_Name,Emp_Salary,Dept,first_value(Emp_salary) over (partition by Dept order by Emp_Salary asc) as FirstSal from EmployeeTB")
ResultFirst.show()


4.Last_value()

The LAST_VALUE window function returns the value of the specified expression with respect to the last row in the window frame.

ResultLast=sqlContext.sql("select Eid,Emp_Name,Emp_Salary,Dept,last_value(Emp_salary) over (partition by Dept ) as LastSal from EmployeeTB")
ResultLast.show()


