Wednesday, 11 April 2018

Spark Exercise 1



I have practiced a few Spark exercises that I got from itversity and a few other sites.

Problem Scenario 1


  • Data set URL (I have used a customized data set from this URL)
  • Choose language of your choice Python or Scala
  • Data is available in HDFS file system under /public/crime/csv
  • You can check properties of files using hadoop fs -ls -h /public/crime/csv
  • Structure of data (ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location); the Date and Primary Type column positions are sanity-checked in the sketch after this list
  • File format - text file
  • Delimiter - “,”
  • Get the monthly count of crimes by primary type, sorted by month in ascending order and by the number of crimes per type in descending order
  • Store the result in HDFS path /user/<YOUR_USER_ID>/solutions/solution01/crimes_by_type_by_month
  • Output File Format: TEXT
  • Output Columns: Month in YYYYMM format, crime count, crime type
  • Output Delimiter: \t (tab delimited)
  • Output Compression: gzip
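
Before jumping into the solution, it is worth confirming the delimiter and the positions of the Date and Primary Type columns, since the code below indexes them as fields 2 and 5. A minimal sanity check, assuming the customized file I loaded into /FileStore/tables/ on Databricks (on the cluster it would be the /public/crime/csv path above):

#peek at the first record to confirm the comma delimiter and column positions
crimeRaw = sc.textFile("/FileStore/tables/CRIME_REPORT-f91ce.txt")
firstRecord = crimeRaw.first()
fields = firstRecord.split(",")
print(len(fields))        #expect 22 columns as listed above
print(fields[2])          #Date field used by crimedate()
print(fields[5])          #Primary Type field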



Core SPARK



from pyspark import SparkConf,SparkContext   #sc is already provided by the Databricks/pyspark shell

#extract YYYYMM as an integer from the crime date field
#(assumes the date in this customized file is in DD/MM/YYYY HH:MM format)
def crimedate(record):
  Cdate=record.split(" ")[0]
  CDay=Cdate.split("/")[0]
  CMonth=Cdate.split("/")[1]
  CYear=Cdate.split("/")[2]
  extractYearMonth=int(CYear + CMonth)
  return extractYearMonth

#get data into RDD
crimeData = sc.textFile("/FileStore/tables/CRIME_REPORT-f91ce.txt")
crimeData.count()

#pair each record as ((YYYYMM, primary crime type), 1)
CrimeTypeperMonth = crimeData.map(lambda crime:((crimedate(crime.split(",")[2]),crime.split(",")[5]),1))
#for crime in CrimeTypeperMonth.take(20):print(crime)

#count by key (YYYYMM, crime type)
CrimeCountperMonthperType=CrimeTypeperMonth.reduceByKey(lambda total,count:total+count)
#for i in CrimeCountperMonthperType.take(10):print(i)
  
#re-key as ((YYYYMM, -count), crime type) so sortByKey gives month ascending and count descending
CrimekeyChange=CrimeCountperMonthperType.map(lambda x:((x[0][0],-x[1]),x[0][1]))
#for i in CrimekeyChange.take(10):print(i)

sortedCrimeData=CrimekeyChange.sortByKey()
#for i in sortedCrimeData.take(10):print(i)

#format the output: YYYYMM<tab>count<tab>crime type (negate the count back to positive)
CrimeCountperMonthperType=sortedCrimeData.map(lambda crime: str(crime[0][0])+"\t"+str(-crime[0][1])+"\t"+crime[1])
for i in CrimeCountperMonthperType.take(10):print(i)

#write into files with 2 partitions and gzip compression
CrimeDataperMonth="/FileStore/tables/CrimeDataperMonth"
CrimeCountperMonthperType.repartition(2).saveAsTextFile(path=CrimeDataperMonth,compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")
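
To verify the save, the gzipped part files can be read straight back, since sc.textFile decompresses gzip transparently. A quick check, assuming the output directory used above (on the exam cluster the same save would instead target /user/<YOUR_USER_ID>/solutions/solution01/crimes_by_type_by_month):

#read the compressed output back to confirm the tab-delimited format
savedCrimes = sc.textFile("/FileStore/tables/CrimeDataperMonth")
for line in savedCrimes.take(5):
  print(line)             #expect: YYYYMM<tab>count<tab>crime type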



SPARK SQL

from pyspark import SparkConf,SparkContext   #sc and sqlContext are already provided by the Databricks/pyspark shell
from pyspark.sql import Row

#extract YYYYMM as an integer from the crime date field
#(assumes the date in this customized file is in DD/MM/YYYY HH:MM format)
def crimedate(record):
  Cdate=record.split(" ")[0]
  CDay=Cdate.split("/")[0]
  CMonth=Cdate.split("/")[1]
  CYear=Cdate.split("/")[2]
  yearmonth=int(CYear + CMonth)
  return yearmonth


#get data into RDD
crimeData = sc.textFile("/FileStore/tables/CRIME_REPORT-f91ce.txt")
crimeData.count()

#build (YYYYMM, primary crime type, 1) tuples for the DataFrame
CrimeTypeperMonth = crimeData.map(lambda crime:(crimedate(crime.split(",")[2]),crime.split(",")[5],1))

#for crime in CrimeTypeperMonth.take(20):print(crime)

CrimeDataDF=sqlContext.createDataFrame(CrimeTypeperMonth).toDF("CrimeDate","CrimeType","Count")
CrimeDataDF.registerTempTable('crimeTB')

#result=sqlContext.sql("select * from crimeTB")
#result.show()

resultDF=sqlContext.sql("select crimeDate, count(1) crimeCntperMonthperType, CrimeType from crimeTB group by crimeDate, crimeType order by crimeDate, crimeCntperMonthperType desc")
resultDF.show()

#write into files with 2 partitions and gzip compression
CrimeperMonth1="/FileStore/tables/CrimeperMonth1"
resultDFtoRDD=resultDF.rdd.map(lambda row:str(row[0])+"\t"+str(row[1])+"\t"+str(row[2]))

resultDFtoRDD.repartition(2).saveAsTextFile(path=CrimeperMonth1,compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")
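
As an alternative to converting the DataFrame back to an RDD, the result can be written directly through the DataFrame csv writer, which handles the tab delimiter and gzip compression itself. A minimal sketch, assuming Spark 2.x (where the csv writer is available) and a hypothetical output folder name:

#write resultDF directly as tab-delimited text with gzip compression (Spark 2.x csv writer)
resultDF.coalesce(2) \
  .write \
  .option("sep","\t") \
  .option("compression","gzip") \
  .csv("/FileStore/tables/CrimeperMonth1_df")   #hypothetical output folder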


