'''Assuming data loading part as a one time process and scheduling the job to run on daily basis''' from pyspark.sql import SparkSession import pyspark.sql.types as T from pyspark.context import SparkContext from pyspark.sql.session import SparkSession from pyspark.sql import SQLContext spark = SparkSession.builder.getOrCreate() from pyspark.sql import functions as F from pyspark.sql.functions import * from pyspark.sql.functions import current_date from pyspark.sql.functions import unix_timestamp,datediff,to_date from pyspark.sql.types import DateType def products(spark): data = [ ("Abc","2020-07-25", 37.50, 20.000), ("Aged_Brie","2020-07-26", 37.51, 30.000), ("Sulfurus","2020-07-24", 12.67, 10.000), ("Backstage_Passes","2020-07-23", 37.50, 20.000), ] schema = T.StructType( [ T.StructField("Product_Name",T.StringType()), T.StructField("date_of_epiry", T.StringType()), T.StructField("price", T.DoubleType()), T.StructField("quality", T.DoubleType()), ] ) return spark.createDataFrame(data, schema) def increse_quality(product): p_currentdate=product.withColumn("cast_date_of_epiry",date_format('date_of_epiry',"yyyy-MM-dd")).withColumn("current_date",current_date()) p_date_cal=p_currentdate.withColumn("days_left_for_epiry",datediff(to_date("cast_date_of_epiry"),to_date(unix_timestamp('current_date',"yyyyy-MM-dd").cast("timestamp")))) p_increase_quality=p_date_cal.filter(col("product_name").isin(['Aged_Brie','Backstage_Passes'])) p_increase_quality_updated=p_increase_quality.withColumn("increased_updated_quality",col("quality")+1).filter('quality<50 and quality>0').drop("date_of_epiry") p_increase_quality_cal = p_increase_quality_updated.withColumn("quality", F.when(F.col("increased_updated_quality").isNull(), 0) .otherwise(F.col("increased_updated_quality"))) return p_increase_quality_cal def decrease_quality(product): p_currentdate = product.withColumn("cast_date_of_epiry", date_format('date_of_epiry', "yyyy-MM-dd")).withColumn( "current_date", current_date()) p_date_cal = p_currentdate.withColumn("days_left_for_epiry", datediff(to_date("cast_date_of_epiry"), to_date( unix_timestamp('current_date', "yyyyy-MM-dd").cast("timestamp")))) p_decrease_quality = p_date_cal.filter(col("product_name").isin(['Sulfurus'])) p_decrease_quality_updated = p_decrease_quality.withColumn('decreased_updated_quality', F.when( (p_decrease_quality.days_left_for_epiry == 0) | (p_decrease_quality.days_left_for_epiry < 0), col("quality") / 2).otherwise(col("quality") - 1)).filter('quality<50 and quality>0').drop("date_of_epiry") p_decrease_quality_updated_cal = p_decrease_quality_updated.withColumn('quality', lit( p_decrease_quality_updated.decreased_updated_quality)) return p_decrease_quality_updated_cal if __name__ == "__main__": spark = SparkSession.builder.master("local[*]").getOrCreate() product=products(spark) product.show() increse_quality=increse_quality(product) increse_quality.show() decrease_quality=decrease_quality(product) decrease_quality.show()