"""Daily product-quality update job.

Assumes the data-loading part is a one-time process and the job is
scheduled to run on a daily basis.

Note: several identifiers ('increse', 'epiry', 'Sulfurus') are kept
misspelled on purpose so that the public interface and the output
column names stay backward compatible with existing consumers.
"""

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T


def products(spark):
    """Return a small demo DataFrame of products.

    Parameters:
        spark: an active SparkSession used to create the DataFrame.

    Columns:
        Product_Name   -- product identifier
        date_of_epiry  -- expiry date as a 'yyyy-MM-dd' string (typo kept)
        price          -- unit price
        quality        -- current quality score
    """
    data = [
        ("Abc", "2020-07-25", 37.50, 20.000),
        ("Aged_Brie", "2020-07-26", 37.51, 30.000),
        ("Sulfurus", "2020-07-24", 12.67, 10.000),
        ("Backstage_Passes", "2020-07-23", 37.50, 20.000),
    ]
    schema = T.StructType(
        [
            T.StructField("Product_Name", T.StringType()),
            T.StructField("date_of_epiry", T.StringType()),
            T.StructField("price", T.DoubleType()),
            T.StructField("quality", T.DoubleType()),
        ]
    )
    return spark.createDataFrame(data, schema)


def _with_days_left(product):
    """Add 'cast_date_of_epiry', 'current_date' and 'days_left_for_epiry'.

    Shared by increse_quality / decrease_quality (the original duplicated
    this computation in both functions).

    BUG FIX: the original parsed current_date with the malformed pattern
    "yyyyy-MM-dd" (five y's) via unix_timestamp, which fails or yields
    NULL under Spark 3's corrected datetime parser.  current_date() is
    already a DateType column, so datediff can consume it directly.
    """
    return (
        product
        .withColumn("cast_date_of_epiry", F.to_date("date_of_epiry", "yyyy-MM-dd"))
        .withColumn("current_date", F.current_date())
        .withColumn(
            "days_left_for_epiry",
            F.datediff(F.col("cast_date_of_epiry"), F.col("current_date")),
        )
    )


def increse_quality(product):
    """Increase quality by 1 for 'Aged_Brie' and 'Backstage_Passes'.

    Only rows with 0 < quality < 50 are kept; the incremented value is
    written back into 'quality', with NULL increments collapsing to 0
    (same as the original when/otherwise logic).

    Parameters:
        product: DataFrame from products() (or same schema).
    Returns:
        DataFrame with updated 'quality' and 'date_of_epiry' dropped.
    """
    subset = _with_days_left(product).filter(
        F.col("Product_Name").isin(["Aged_Brie", "Backstage_Passes"])
    )
    updated = (
        subset
        .withColumn("increased_updated_quality", F.col("quality") + 1)
        .filter("quality < 50 and quality > 0")
        .drop("date_of_epiry")
    )
    # coalesce(x, 0) == when(x.isNull(), 0).otherwise(x)
    return updated.withColumn(
        "quality",
        F.coalesce(F.col("increased_updated_quality"), F.lit(0.0)),
    )


def decrease_quality(product):
    """Decrease quality for 'Sulfurus'.

    Quality halves once the product is at or past expiry
    (days_left_for_epiry <= 0); otherwise it drops by 1.
    Only rows with 0 < quality < 50 are kept.

    Parameters:
        product: DataFrame from products() (or same schema).
    Returns:
        DataFrame with updated 'quality' and 'date_of_epiry' dropped.
    """
    subset = _with_days_left(product).filter(F.col("Product_Name").isin(["Sulfurus"]))
    updated = (
        subset
        .withColumn(
            "decreased_updated_quality",
            # (days == 0) | (days < 0) simplified to days <= 0.
            F.when(F.col("days_left_for_epiry") <= 0, F.col("quality") / 2)
            .otherwise(F.col("quality") - 1),
        )
        .filter("quality < 50 and quality > 0")
        .drop("date_of_epiry")
    )
    return updated.withColumn("quality", F.col("decreased_updated_quality"))


if __name__ == "__main__":
    # Build the session only when run as a script (the original also
    # created one at import time as a module-level side effect).
    spark = SparkSession.builder.master("local[*]").getOrCreate()

    product_df = products(spark)
    product_df.show()

    # Distinct result names: the original rebound the function names
    # (increse_quality = increse_quality(product)), making them
    # uncallable afterwards.
    increased_df = increse_quality(product_df)
    increased_df.show()

    decreased_df = decrease_quality(product_df)
    decreased_df.show()