import traceback
import json
import sys
import logging
import datetime
import time
import os
import base64
import functools

import boto3
import psycopg2
import requests
from requests.auth import HTTPBasicAuth
from botocore.exceptions import ClientError
from datetime import timedelta
from pytz import timezone

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, regexp_replace, concat_ws, when, collect_list, lit, to_timestamp
from pyspark.sql.functions import year, month, date_format
from pyspark.sql.functions import *
from pyspark.sql.types import LongType, DecimalType, IntegerType, TimestampType, DoubleType
from pyspark.sql.window import Window

# add '/home/hadoop' (the EMR master node home directory) to the module search
# path, since packages downloaded on the cluster are installed there
sys.path.append('/home/hadoop')

# file-based logger: every run writes to its own timestamped log file
curr_time = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
log_file_name = 'job_' + str(datetime.datetime.now().strftime('%Y%m%d_%H%M%S_%f')) + '.log'
extra = {'log_file_name': log_file_name}
logger = logging.getLogger(__name__)
syslog = logging.FileHandler(log_file_name, mode='w')
formatter = logging.Formatter('%(log_file_name)s;%(asctime)s;%(levelname)s;%(message)s')
syslog.setFormatter(formatter)
logger.setLevel(logging.INFO)
logger.addHandler(syslog)
logger = logging.LoggerAdapter(logger, extra)


def read_config(config_path):
    logger.info("Inside read config")
    try:
        # check whether the supplied config path is an S3 URI or a local file system path
        if config_path[0:2] == 's3':
            # read config file from S3
            logger.info("Reading config file from S3")
            s3 = boto3.resource('s3')
            file_object = s3.Object(config_path.split('/')[2], '/'.join(config_path.split('/')[3:]))
            file_content = file_object.get()['Body'].read().decode('utf-8')
            # parse the file content as JSON
            json_content = json.loads(file_content)
            json_object = json.dumps(json_content)
        else:
            # read config file from the local file system
            logger.info("Reading config file from path : " + config_path)
            # parse the file content as JSON
            json_content = json.load(open(config_path, 'r'))
            json_object = json.dumps(json_content)
        logger.info("Input Config Details:")
        logger.info(json_object)
        return json_content
    except Exception as e:
        raise Exception("Error reading config.") from e
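
# Illustrative only: a minimal sketch of the JSON config this job expects,
# inferred from the config keys referenced in this script. The values shown
# (bucket names, table names, ARNs) are hypothetical examples.
#
# {
#     "application_name": "sample_mart_job",
#     "spark_properties": {"spark.executor.memory": "4g"},
#     "Paths": {"source_table": "s3://example-bucket/hudi/source_table/"},
#     "audit_table": "int.job_audit",
#     "redshift_secret": "example/redshift/secret-name",
#     "redshift_iam_role": "arn:aws:iam::123456789012:role/example-redshift-copy-role",
#     "incr2df": "incr2df_table",
#     "incr2df_path": "s3://example-bucket/output/incr2df/",
#     "resultdf": "result_table",
#     "resultdf_path": "s3://example-bucket/output/resultdf/"
# }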


def get_secret(secret):
    secret_name = secret
    region_name = "ap-south-1"

    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name,
    )

    try:
        get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceNotFoundException':
            print("The requested secret " + secret_name + " was not found")
        elif e.response['Error']['Code'] == 'InvalidRequestException':
            print("The request was invalid due to:", e)
        elif e.response['Error']['Code'] == 'InvalidParameterException':
            print("The request had invalid params:", e)
        elif e.response['Error']['Code'] == 'DecryptionFailure':
            print("The requested secret can't be decrypted using the provided KMS key:", e)
        elif e.response['Error']['Code'] == 'InternalServiceError':
            print("An error occurred on service side:", e)
    else:
        logger.info("Secret manager read complete")
        # Secrets Manager decrypts the secret value using the associated KMS CMK.
        # Depending on whether the secret was a string or binary, only one of
        # these fields will be populated.
        if 'SecretString' in get_secret_value_response:
            text_secret_data = get_secret_value_response['SecretString']
            return text_secret_data
        else:
            binary_secret_data = get_secret_value_response['SecretBinary']
            return binary_secret_data


def create_spark_session(config):
    logger.info("Inside create spark session")
    try:
        conf = SparkConf()

        # set the Spark configuration properties provided in the config file
        spark_conf = dict(config['spark_properties'])
        for key in spark_conf.keys():
            conf.set(key, spark_conf[key])
        logger.info("Spark configuration properties applied")

        # fall back to a default application name when none is configured
        app_name = config.get('application_name') or 'DefaultApp'
        logger.info("Creating Spark session with application name: " + app_name)

        # create the Spark session
        spark = SparkSession.builder.config(conf=conf).appName(app_name).enableHiveSupport().getOrCreate()
        spark.sparkContext.setLogLevel("ERROR")
        spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
        spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", 'LEGACY')
        spark.conf.set("spark.sql.legacy.timeParserPolicy", 'CORRECTED')
        spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", 'CORRECTED')
        spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", 'CORRECTED')
        spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInRead", 'CORRECTED')
        spark.conf.set("spark.sql.shuffle.partitions", 100)
        logger.info("Spark session object created")
        return spark
    except Exception as e:
        raise Exception("Error in Spark Session Creation.") from e


def read_file(spark, config, table):
    # Hudi read options (defined here but not applied to the load below)
    readOptions = {
        'hoodie.datasource.query.type': 'incremental',
        'hoodie.datasource.hive_sync.support_timestamp': 'true'
    }

    path = config['Paths'][table]
    df = spark.read.format("hudi").load(path)
    # _hoodie_commit_time is a numeric string (yyyyMMddHHmmss...); rebuild its
    # first 14 characters as 'yyyy-MM-dd HH:mm:ss' and cast it to a timestamp
    df = df.withColumn('_hoodie_commit_time',
                       to_timestamp(F.concat(F.substring(col('_hoodie_commit_time'), 1, 4), F.lit('-'),
                                             F.substring(col('_hoodie_commit_time'), 5, 2), F.lit('-'),
                                             F.substring(col('_hoodie_commit_time'), 7, 2), F.lit(' '),
                                             F.substring(col('_hoodie_commit_time'), 9, 2), F.lit(':'),
                                             F.substring(col('_hoodie_commit_time'), 11, 2), F.lit(':'),
                                             F.substring(col('_hoodie_commit_time'), 13, 2))))
    return df


def get_max_audit_batch(conn, job_name, config):
    cur = conn.cursor()
    cur.execute("SELECT COALESCE(MAX(COALESCE(BATCH_ID,0)),0)+1 FROM " + config['audit_table'])
    result = cur.fetchall()[0][0]
    logger.info("Next batch id for the audit table is :" + str(result))
    return result


def read_max_update_date(conn, job_name, table, config):
    try:
        cur = conn.cursor()
        cur.execute("SELECT MAX(max_update_date) from " + config['audit_table'] +
                    " WHERE mart_table_name = '" + job_name + "' AND src_table_name = '" + table + "'")
        query_results = cur.fetchall()
    except Exception as e:
        print("Database connection failed due to {}".format(e))
        raise Exception("Error reading audit table.") from e
    logger.info("Reading max of max_update_date from audit table complete")
    return query_results


def insert_max_update_date(spark, conn, job_name, table, max_update_date, source_reference_date, max_batch_id, config):
    try:
        cur = conn.cursor()
        cur.execute("INSERT INTO " + config['audit_table'] +
                    " (mart_table_name, src_table_name, max_update_date, load_timestamp, source_reference_date, batch_id)"
                    " VALUES ('" + str(job_name) + "', '" + str(table) + "', '" + str(max_update_date) +
                    "', SYSDATE, '" + str(source_reference_date) + "', cast('" + str(max_batch_id) + "' as int))")
    except Exception as e:
        print("Database connection failed due to {}".format(e))
        raise Exception("Error Updating audit table.") from e
    logger.info("Inserting max max_update_date into audit table complete")
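
# Illustrative only: a rough sketch of the audit table the helpers above assume,
# based on the columns they reference. The actual column types and table options
# in the warehouse may differ.
#
# CREATE TABLE int.job_audit (
#     mart_table_name       VARCHAR,
#     src_table_name        VARCHAR,
#     max_update_date       TIMESTAMP,
#     load_timestamp        TIMESTAMP,
#     source_reference_date VARCHAR,
#     batch_id              INT
# );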


def write_file(spark, conn, redshift_iam_role, resultdf_path, config, table_name):
    # truncate the target table and reload it from the Parquet output on S3 using COPY
    logger.info("write data to redshift started")
    try:
        cur = conn.cursor()
        cur.execute(f"""Truncate table int.{table_name};commit;""")
        sql = """COPY %s FROM '%s' credentials 'aws_iam_role=%s' FORMAT PARQUET; commit;""" % \
              (f"int.{table_name}", resultdf_path, redshift_iam_role)
        cur.execute(sql)
    except Exception as e:
        print("Database connection failed due to {}".format(e))
        raise Exception("Error Inserting target table.") from e
    print("write complete")
    logger.info("write data to redshift completed")


def main():
    logger.info("Inside main function")
    if len(sys.argv) != 2:
        logger.info(len(sys.argv))
        logger.info("Command line arguments : " + str(sys.argv))
        logger.info("Incorrect command line arguments.")
        sys.exit(1)

    config = {}
    spark = None
    redshift_conn = None
    job_status = ''

    try:
        # reading json config file
        logger.info("Calling function to read config file")
        config = read_config(sys.argv[1])

        # creating spark session
        logger.info("Calling function to create Spark session object")
        spark = create_spark_session(config)

        logger.info("Calling function to read input file")
        start_time = datetime.datetime.now(timezone("Asia/Kolkata")).strftime('%Y-%m-%d %H:%M:%S')

        # reading redshift credentials from Secrets Manager
        redshift_secret = get_secret(config['redshift_secret'])
        redshift_secret = json.loads(redshift_secret)
        redshift_user = redshift_secret['username']
        redshift_pwd = redshift_secret['password']
        redshift_host = redshift_secret['host']
        redshift_port = str(redshift_secret['port'])
        redshift_dbname = redshift_secret['dbname']

        # creating redshift database connection
        redshift_conn = psycopg2.connect(dbname=redshift_dbname, host=redshift_host, port=redshift_port,
                                         user=redshift_user, password=redshift_pwd)
        redshift_dburl = "jdbc:postgresql://" + redshift_host + ":" + redshift_port + "/" + redshift_dbname
        cur = redshift_conn.cursor()
        max_batch_id = get_max_audit_batch(redshift_conn, config['application_name'], config)

        INSERT_CODE_1

        # writing from parquet to the target tables in the database
        write_file(spark, redshift_conn, config['redshift_iam_role'], config['incr2df_path'], config, config['incr2df'])
        write_file(spark, redshift_conn, config['redshift_iam_role'], config['resultdf_path'], config, config['resultdf'])

        INSERT_CODE_2

        print('Run Successful')
        print('End of Code')

    except Exception as e:
        # the job failed; record the status and surface the error
        job_status = 'Failed'
        print(e)

    finally:
        # release Spark and database resources even if the job failed
        if spark is not None:
            spark.catalog.clearCache()
        if redshift_conn is not None:
            redshift_conn.commit()
            redshift_conn.close()
        if spark is not None:
            spark.stop()
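
# Illustrative usage (script and config file names are hypothetical): the job
# takes a single argument, the path to its JSON config, which may be a local
# path or an S3 URI, e.g.
#   spark-submit load_mart_job.py s3://example-bucket/configs/mart_job_config.json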


if __name__ == "__main__":
    # calling main function
    logger.info("Calling main function")
    main()