I have practiced a few SPARK exercises, which I picked up from itversity and several other sites.
Problem Scenario 1
- Data set URL (I have used a customised data set based on this URL)
- Choose language of your choice Python or Scala
- Data is available in HDFS file system under /public/crime/csv
- You can check properties of files using
hadoop fs -ls -h /public/crime/csv
- Structure of data (ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location)
- File format - text file
- Delimiter - “,”
- Get the monthly count of primary crime type, sorted by month in ascending order and by number of crimes per type in descending order
- Store the result in HDFS path /user/<YOUR_USER_ID>/solutions/solution01/crimes_by_type_by_month
- Output File Format: TEXT
- Output Columns: Month in YYYYMM format, crime count, crime type
- Output Delimiter: \t (tab delimited)
- Output Compression: gzip (a quick verification sketch follows this list)
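After the job finishes, a quick way to confirm the output location, compression, and delimiter is to list the directory and stream one of the gzipped part files back (substitute your own user id for <YOUR_USER_ID>; the part file name, e.g. part-00000.gz, may differ):
hadoop fs -ls -h /user/<YOUR_USER_ID>/solutions/solution01/crimes_by_type_by_month
hadoop fs -text /user/<YOUR_USER_ID>/solutions/solution01/crimes_by_type_by_month/part-00000.gz | head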
Core SPARK
from pyspark import SparkConf,SparkContext
def crimedate(record):
    #split the timestamp into its date and time portions
    Cdate = record.split(" ")[0]
    Ctime = record.split(" ")[1]
    #build YYYYMM (as an int) from the date portion
    CDt = Cdate.split("/")[0]
    CMonth = Cdate.split("/")[1]
    Cyear = Cdate.split("/")[2]
    extractYearMonth = int(Cyear + CMonth)
    return extractYearMonth
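# Quick sanity check (a sketch): the split order above implies the customised
# file stores dates as DD/MM/YYYY hh:mm, so for example
# crimedate("15/03/2015 10:30") would return 201503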
#get data into RDD
crimeData = sc.textFile("/FileStore/tables/CRIME_REPORT-f91ce.txt")
crimeData.count()
#get year,month and crime type in RDD
CrimeTypeperMonth = crimeData.map(lambda crime:((crimedate(crime.split(",")[2]),crime.split(",")[5]),1))
#for crime in CrimeTypeperMonth.take(20):print(crime)
#Count by key(year,month,crime type)
CrimeCountperMonthperType = CrimeTypeperMonth.reduceByKey(lambda total, count: total + count)
#for i in CrimeCountperMonthperType.take(10):print(i)
#negate the count so an ascending sortByKey gives counts in descending order within each month
CrimekeyChange = CrimeCountperMonthperType.map(lambda x: ((x[0][0], -x[1]), x[0][1]))
#for i in CrimekeyChange.take(10):print(i)
sortedCrimeData=CrimekeyChange.sortByKey()
#for i in sortedCrimeData.take(10):print(i)
CrimeCountperMonthperType=sortedCrimeData.map(lambda crime: (str(crime[0][0])+"\t"+str(-crime[0][1])+"\t"+crime[1]))
for i in CrimeCountperMonthperType.take(10):print(i)
#write into files with 2 partitions and Gzip compression
CrimeDataperMonth = "/FileStore/tables/CrimeDataperMonth"
CrimeCountperMonthperType.repartition(2).saveAsTextFile(path=CrimeDataperMonth, compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")
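The compressed output can be double-checked by reading it straight back with sc.textFile, which decompresses the gzip part files transparently. A small sketch, assuming the /FileStore/tables/CrimeDataperMonth path used above:
#read the gzip-compressed output back and print a few lines
savedCrimeData = sc.textFile("/FileStore/tables/CrimeDataperMonth")
for line in savedCrimeData.take(5): print(line)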
SPARK SQL
from pyspark import SparkConf,SparkContext
from pyspark.sql import Row
def crimedate(record):
    #split the timestamp into its date and time portions
    Cdate = record.split(" ")[0]
    Ctime = record.split(" ")[1]
    #build YYYYMM (as an int) from the date portion
    CDt = Cdate.split("/")[0]
    CMonth = Cdate.split("/")[1]
    CYear = Cdate.split("/")[2]
    yearmonth = int(CYear + CMonth)
    return yearmonth
#get data into RDD
crimeData = sc.textFile("/FileStore/tables/CRIME_REPORT-f91ce.txt")
crimeData.count()
#get year,month and crime type in RDD
CrimeTypeperMonth = crimeData.map(lambda crime:(crimedate(crime.split(",")[2]),crime.split(",")[5],1))
#for crime in CrimeTypeperMonth.take(20):print(crime)
CrimeDataDF=sqlContext.createDataFrame(CrimeTypeperMonth).toDF("CrimeDate","CrimeType","Count")
CrimeDataDF.registerTempTable('crimeTB')
#result=sqlContext.sql("select * from crimeTB")
#result.show()
resultDF=sqlContext.sql("select crimeDate, count(1) crimeCntperMonthperType, CrimeType from crimeTB group by crimeDate, crimeType order by crimeDate, crimeCntperMonthperType desc")
resultDF.show()
#convert the DataFrame rows to tab-delimited strings and save with 2 partitions and Gzip compression
CrimeperMonth1 = "/FileStore/tables/CrimeperMonth1"
resultDFtoRDD = resultDF.rdd.map(lambda row: str(row[0]) + "\t" + str(row[1]) + "\t" + str(row[2]))
resultDFtoRDD.repartition(2).saveAsTextFile(path=CrimeperMonth1, compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")
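As an alternative to converting the DataFrame back to an RDD, Spark 2.x and later can write the tab-delimited, gzip-compressed output directly through the DataFrameWriter. A sketch, assuming the same resultDF and output location as above:
#write the DataFrame directly as tab-delimited, gzip-compressed text (Spark 2.x+)
resultDF.repartition(2).write.mode("overwrite").option("sep", "\t").option("compression", "gzip").csv("/FileStore/tables/CrimeperMonth1")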