diff --git "a/pages/5SOURCE TO TARGET MAPPING.py" "b/pages/5SOURCE TO TARGET MAPPING.py" --- "a/pages/5SOURCE TO TARGET MAPPING.py" +++ "b/pages/5SOURCE TO TARGET MAPPING.py" @@ -16,1336 +16,1349 @@ import sdv from sdv.metadata import MultiTableMetadata from collections import defaultdict import pymssql + +############ +from streamlit_app import sidebar + genai.configure(api_key='AIzaSyCeY8jSHKW6t0OSDRjc2VAfBvMunVrff2w') # Create a GenerativeModel instance model = genai.GenerativeModel( model_name='models/gemini-1.5-flash' ) -def read_excel(path, sheet): - df = pd.read_excel(path, sheet_name = sheet, dtype = 'str') - return df - -def split_join_condition(join_condition): - conditions = [] - condition = '' - bracket_count = 0 - - for char in join_condition: - if char == '(': - bracket_count += 1 - elif char == ')': - bracket_count -+ 1 - if char == ',' and bracket_count == 0: +###### +def main(): + # st.title('PAGE TITLE') # Change this for each page + sidebar() +######## + def read_excel(path, sheet): + df = pd.read_excel(path, sheet_name = sheet, dtype = 'str') + return df + + def split_join_condition(join_condition): + conditions = [] + condition = '' + bracket_count = 0 + + for char in join_condition: + if char == '(': + bracket_count += 1 + elif char == ')': + bracket_count -+ 1 + if char == ',' and bracket_count == 0: + conditions.append(condition.strip()) + condition = '' + else: + condition += char + if condition: conditions.append(condition.strip()) - condition = '' - else: - condition += char - if condition: - conditions.append(condition.strip()) - - return conditions - -def join_incr(join_conditions): - join = [] - join_pattern = re.compile(r'(\w+\.\w+)\s*=\s*(\w+\w.\w+)', re.IGNORECASE) - for join_condition in join_conditions: - parts = re.split(r'\sAND\s|\sOR\s', join_condition, flags = re.IGNORECASE) - temp = [x.strip() for x in parts if join_pattern.match(x.strip())] - join.append(' AND '.join(temp)) - return join - -def generate_sql(temp_table): - proc_query = [] - base_table = None - - source_table_schema = 'MAIN.GOLD' - temp_table_schema = 'MAIN.GOLD' - base_pk = [] - - join_fields = set() - - for _,row in df.iterrows(): - source_table = row['Source Table'] - primary_key = row['Primary Key'] - source_column = row['Source Column'] - alias = row['Alias'] - joining_keys = row['Joining Keys'] - - if not base_table: - if primary_key == 'Y': - base_table = source_table - base_pk.append(joining_keys) - if pd.notna(joining_keys): - keys = [x.strip() for x in joining_keys.split(',')] - for x in keys: - if x not in join_fields: - join_fields.add(x) + return conditions - unique_cols = ['Source Table', 'Joining Keys', 'Primary Key', 'Join Type','Join Tables','Join Condition'] - unique_df = df.drop_duplicates(subset = unique_cols) + def join_incr(join_conditions): + join = [] + join_pattern = re.compile(r'(\w+\.\w+)\s*=\s*(\w+\w.\w+)', re.IGNORECASE) + for join_condition in join_conditions: + parts = re.split(r'\sAND\s|\sOR\s', join_condition, flags = re.IGNORECASE) + temp = [x.strip() for x in parts if join_pattern.match(x.strip())] + join.append(' AND '.join(temp)) + return join - incremantal_mapping = {} - incr_joins = {} - - for _,row in unique_df.iterrows(): - - source_table = row['Source Table'] - source_column = row['Source Column'] - joining_keys = row['Joining Keys'] - primary_key = row['Primary Key'] - direct_derived = row['Direct/Derived'] - join_type = row['Join Type'] - join_tables = row['Join Tables'] - join_condition = row['Join Condition'] - - if source_table == 
base_table: - if primary_key == 'Y': + def generate_sql(temp_table): + proc_query = [] + base_table = None + + source_table_schema = 'MAIN.GOLD' + temp_table_schema = 'MAIN.GOLD' + base_pk = [] + + join_fields = set() + + for _,row in df.iterrows(): + source_table = row['Source Table'] + primary_key = row['Primary Key'] + source_column = row['Source Column'] + alias = row['Alias'] + joining_keys = row['Joining Keys'] + + if not base_table: + if primary_key == 'Y': + base_table = source_table + base_pk.append(joining_keys) + + if pd.notna(joining_keys): + keys = [x.strip() for x in joining_keys.split(',')] + for x in keys: + if x not in join_fields: + join_fields.add(x) + + unique_cols = ['Source Table', 'Joining Keys', 'Primary Key', 'Join Type','Join Tables','Join Condition'] + unique_df = df.drop_duplicates(subset = unique_cols) + + incremantal_mapping = {} + incr_joins = {} + + for _,row in unique_df.iterrows(): + + source_table = row['Source Table'] + source_column = row['Source Column'] + joining_keys = row['Joining Keys'] + primary_key = row['Primary Key'] + direct_derived = row['Direct/Derived'] + join_type = row['Join Type'] + join_tables = row['Join Tables'] + join_condition = row['Join Condition'] + + if source_table == base_table: + if primary_key == 'Y': + key = (source_table, joining_keys, join_type, join_tables, join_condition) + key1 = source_table + else: + continue + else: key = (source_table, joining_keys, join_type, join_tables, join_condition) key1 = source_table - else: - continue - else: - key = (source_table, joining_keys, join_type, join_tables, join_condition) - key1 = source_table - if pd.notna(direct_derived) and pd.notna(source_table) and pd.notna(source_column): - if key not in incremantal_mapping: - incremantal_mapping[key] = { - 'source_table': source_table, - 'joining_keys':joining_keys, - 'join_type': join_type, - 'join_tables': join_tables, - 'join_condition': join_condition - } - if key1 not in incr_joins: - if pd.notna(direct_derived) and direct_derived == 'DERIVED': - incr_joins[key1] = { + if pd.notna(direct_derived) and pd.notna(source_table) and pd.notna(source_column): + if key not in incremantal_mapping: + incremantal_mapping[key] = { + 'source_table': source_table, + 'joining_keys':joining_keys, 'join_type': join_type, - 'join_tables': ', '.join([x.strip() for x in join_tables.split(',') if x != base_table]), + 'join_tables': join_tables, 'join_condition': join_condition } - incremental_df = pd.DataFrame(incremantal_mapping.values()) - incr_join_grps = incremental_df.groupby(['source_table']) - proc_query.append(f'TRUNCATE TABLE {temp_table_schema}.{temp_table}_INCR;') - - incr_table_join_info = {} - for _,row in incremental_df.iterrows(): - source_table = row['source_table'] - - if source_table != base_table: - joining_keys = row['joining_keys'] - join_type = row['join_type'] - join_tables = [x.strip() for x in row['join_tables'].split(',')] - index = join_tables.index(source_table) - join_condition = [x.strip() for x in row['join_condition'].split(',')][0:index] - incr_table_join_info[source_table] = ', '.join(join_condition) - - incr_query = [] - incr_cols = '' - incr_tables = [] - incr_join = {} - - for _, group in incr_join_grps: + if key1 not in incr_joins: + if pd.notna(direct_derived) and direct_derived == 'DERIVED': + incr_joins[key1] = { + 'join_type': join_type, + 'join_tables': ', '.join([x.strip() for x in join_tables.split(',') if x != base_table]), + 'join_condition': join_condition + } + incremental_df = 
pd.DataFrame(incremantal_mapping.values()) + incr_join_grps = incremental_df.groupby(['source_table']) + proc_query.append(f'TRUNCATE TABLE {temp_table_schema}.{temp_table}_INCR;') - for table in _.split(): - if base_table != table: - - join_tables = [t.strip() for t in group['join_tables'].iloc[0].split(',')] - join_keys = [t.strip() for t in ','.join(base_pk).split(',')] - join_type = [t.strip() for t in group['join_type'].iloc[0].split(',')] - join_cond = split_join_condition(incr_table_join_info[table]) - join_condition = join_incr(join_cond) - source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')] - - join_key_list = [] - for x in join_keys: - join_key_list.append(f'{base_table}.{x}') - join_key = ', '.join(join_key_list) - - for y in source_table: - sql = f""" - INSERT INTO {temp_table_schema}.{temp_table}_INCR - ( - SELECT {join_key}, {table_details_mapping[y][0]}, {table_details_mapping[y][1]}, '{y}', 1, CURRENT_TIMESTAMP - FROM {source_table_schema}.{base_table} {base_table}""" - - incr_join_text = '' - for i in range(len(join_condition)): - sql += f'\n\t{join_type[i]} JOIN {source_table_schema}.{join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}' - incr_join_text += f'\n\t{join_type[i]} JOIN {source_table_schema}.{join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}' - incr_join[y] = incr_join_text - - sql += f""" - WHERE COALESCE({join_tables[i+1]}.operation,'NA') <> 'D' - AND TO_TIMESTAMP( CAST(SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),1,4) || '-' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),5,2) ||'-' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),7,2) || ' ' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),9,2) ||':' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),11,2) ||':' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),13,2) AS VARCHAR(30)), 'YYYY-MM-DD HH:MI:SS') > (SELECT MAX(max_update_date) FROM audit.reportingdb_audit_tbl_{temp_table} WHERE mart_table_name='{temp_table}' and src_table_name='{y}') - );""" + incr_table_join_info = {} + for _,row in incremental_df.iterrows(): + source_table = row['source_table'] + + if source_table != base_table: + joining_keys = row['joining_keys'] + join_type = row['join_type'] + join_tables = [x.strip() for x in row['join_tables'].split(',')] + index = join_tables.index(source_table) + join_condition = [x.strip() for x in row['join_condition'].split(',')][0:index] + incr_table_join_info[source_table] = ', '.join(join_condition) + + incr_query = [] + incr_cols = '' + incr_tables = [] + incr_join = {} - incr_query.append(sql) - incr_tables.append(y) + for _, group in incr_join_grps: - else: - source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')] - join_keys = [t.strip() for t in group['joining_keys'].iloc[0].split(',')] + for table in _.split(): + if base_table != table: + + join_tables = [t.strip() for t in group['join_tables'].iloc[0].split(',')] + join_keys = [t.strip() for t in ','.join(base_pk).split(',')] + join_type = [t.strip() for t in group['join_type'].iloc[0].split(',')] + join_cond = split_join_condition(incr_table_join_info[table]) + join_condition = join_incr(join_cond) + source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')] + + join_key_list = [] + for x in join_keys: + join_key_list.append(f'{base_table}.{x}') + join_key = ', '.join(join_key_list) + + for y in source_table: + sql = f""" + INSERT INTO {temp_table_schema}.{temp_table}_INCR + ( + SELECT {join_key}, {table_details_mapping[y][0]}, 
{table_details_mapping[y][1]}, '{y}', 1, CURRENT_TIMESTAMP + FROM {source_table_schema}.{base_table} {base_table}""" + + incr_join_text = '' + for i in range(len(join_condition)): + sql += f'\n\t{join_type[i]} JOIN {source_table_schema}.{join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}' + incr_join_text += f'\n\t{join_type[i]} JOIN {source_table_schema}.{join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}' + incr_join[y] = incr_join_text + + sql += f""" + WHERE COALESCE({join_tables[i+1]}.operation,'NA') <> 'D' + AND TO_TIMESTAMP( CAST(SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),1,4) || '-' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),5,2) ||'-' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),7,2) || ' ' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),9,2) ||':' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),11,2) ||':' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),13,2) AS VARCHAR(30)), 'YYYY-MM-DD HH:MI:SS') > (SELECT MAX(max_update_date) FROM audit.reportingdb_audit_tbl_{temp_table} WHERE mart_table_name='{temp_table}' and src_table_name='{y}') + );""" + + incr_query.append(sql) + incr_tables.append(y) - join_key_list = [] - for x in join_keys: - join_key_list.append(f'{base_table}.{x}') - join_key = ', '.join(join_key_list) + else: + source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')] + join_keys = [t.strip() for t in group['joining_keys'].iloc[0].split(',')] + + join_key_list = [] + for x in join_keys: + join_key_list.append(f'{base_table}.{x}') + join_key = ', '.join(join_key_list) + + incr_cols = join_key + sql = f""" + INSERT INTO {temp_table_schema}.{temp_table}_INCR + ( + SELECT {join_key}, {table_details_mapping[base_table][0]}, {table_details_mapping[base_table][1]}, '{base_table}', 1, CURRENT_TIMESTAMP + FROM {source_table_schema}.{base_table} {base_table} + WHERE COALESCE(operation,'NA') <> 'D' + AND TO_TIMESTAMP( CAST(SUBSTRING((_hoodie_commit_time),1,4) || '-' || SUBSTRING((_hoodie_commit_time),5,2) ||'-' || SUBSTRING((_hoodie_commit_time),7,2) || ' ' || SUBSTRING((_hoodie_commit_time),9,2) ||':' || SUBSTRING((_hoodie_commit_time),11,2) ||':' || SUBSTRING((_hoodie_commit_time),13,2) AS VARCHAR(30)), 'YYYY-MM-DD HH:MI:SS') > (SELECT MAX(max_update_date) FROM audit.reportingdb_audit_tbl_{temp_table} WHERE mart_table_name='{temp_table}' and src_table_name='{base_table}') + );""" + proc_query.append(sql) + incr_tables.append(base_table) + + proc_query.append('\n'.join(incr_query)) + proc_query.append(f'TRUNCATE TABLE {temp_table_schema}.INCR1_{temp_table};') + + sql = f""" + INSERT INTO {temp_table_schema}.INCR1_{temp_table} + ( + SELECT DISTINCT {incr_cols.replace(f'{base_table}.', '')} + FROM {temp_table_schema}.{temp_table}_INCR + );""" + + proc_query.append(sql) - incr_cols = join_key + incr_table_dict = {} + for table in incr_tables: + if table == base_table: + incr_table_dict[table] = f'{temp_table_schema}.INCR2_{table}' + else: + p = [x for x in incr_join[table].split('\n\t') if len(x) > 1] + if len(p) == 1: + incr_table_dict[table] = f'{temp_table_schema}.INCR2_{table}' + else: + incr_table_dict[table] = f'{source_table_schema}.{table}' + + s = [] + for table in incr_tables: + incr2_sql_list = [] + + if table == base_table: + for key in incr_cols.replace(f'{base_table}.', '').split(','): + incr2_sql_list.append(f"{base_table}.{key} = A.{key}") + incr2_sql_join = ' AND '.join(incr2_sql_list) + sql = f""" - INSERT INTO {temp_table_schema}.{temp_table}_INCR - ( - SELECT {join_key}, 
{table_details_mapping[base_table][0]}, {table_details_mapping[base_table][1]}, '{base_table}', 1, CURRENT_TIMESTAMP - FROM {source_table_schema}.{base_table} {base_table} - WHERE COALESCE(operation,'NA') <> 'D' - AND TO_TIMESTAMP( CAST(SUBSTRING((_hoodie_commit_time),1,4) || '-' || SUBSTRING((_hoodie_commit_time),5,2) ||'-' || SUBSTRING((_hoodie_commit_time),7,2) || ' ' || SUBSTRING((_hoodie_commit_time),9,2) ||':' || SUBSTRING((_hoodie_commit_time),11,2) ||':' || SUBSTRING((_hoodie_commit_time),13,2) AS VARCHAR(30)), 'YYYY-MM-DD HH:MI:SS') > (SELECT MAX(max_update_date) FROM audit.reportingdb_audit_tbl_{temp_table} WHERE mart_table_name='{temp_table}' and src_table_name='{base_table}') - );""" + CREATE TABLE {temp_table_schema}.INCR2_{table} + AS + SELECT + {table}.* + FROM + {source_table_schema}.{table} {table} + INNER JOIN + {temp_table_schema}.INCR1_{temp_table} A ON {incr2_sql_join}; """ + proc_query.append(f'DROP TABLE IF EXISTS {temp_table_schema}.INCR2_{table};') proc_query.append(sql) - incr_tables.append(base_table) - - proc_query.append('\n'.join(incr_query)) - proc_query.append(f'TRUNCATE TABLE {temp_table_schema}.INCR1_{temp_table};') - - sql = f""" - INSERT INTO {temp_table_schema}.INCR1_{temp_table} - ( - SELECT DISTINCT {incr_cols.replace(f'{base_table}.', '')} - FROM {temp_table_schema}.{temp_table}_INCR - );""" - - proc_query.append(sql) - - incr_table_dict = {} - for table in incr_tables: - if table == base_table: - incr_table_dict[table] = f'{temp_table_schema}.INCR2_{table}' - else: - p = [x for x in incr_join[table].split('\n\t') if len(x) > 1] - if len(p) == 1: - incr_table_dict[table] = f'{temp_table_schema}.INCR2_{table}' + else: - incr_table_dict[table] = f'{source_table_schema}.{table}' - - s = [] - for table in incr_tables: - incr2_sql_list = [] - - if table == base_table: - for key in incr_cols.replace(f'{base_table}.', '').split(','): - incr2_sql_list.append(f"{base_table}.{key} = A.{key}") - incr2_sql_join = ' AND '.join(incr2_sql_list) - + + p = [x for x in incr_join[table].split('\n\t') if len(x) > 1] + if len(p) == 1: + sql = f""" + CREATE TABLE {temp_table_schema}.INCR2_{table} + AS + SELECT + {table}.* + FROM + {temp_table_schema}.INCR2_{base_table} {base_table} {incr_join[table]};""" + s.append(f'DROP TABLE IF EXISTS {temp_table_schema}.INCR2_{table};') + s.append(sql) + + for x in s: + proc_query.append(x) + + select_clause = [] + from_clause = [] + where_clause = [] + + for _,row in df.iterrows(): + field_name = row['Field_Name'] + source_table = row['Source Table'] + source_column = row['Source Column'] + joining_keys = row['Joining Keys'] + primary_key = row['Primary Key'] + direct_derived = row['Direct/Derived'] + join_type = row['Join Type'] + join_tables = row['Join Tables'] + join_condition = row['Join Condition'] + column_operation = row['Column Operations'] + alias = row['Alias'] + granularity = row['Granularity'] + filter_condition = row['Filter Condition'] + clauses = row['Clauses'] + ordering = row['Ordering'] + + if pd.notna(direct_derived): + if pd.notna(column_operation): + if len(column_operation.split()) == 1: + select_expr = f'{column_operation.upper()}({source_table}.{source_column})' + else: + select_expr = column_operation + else: + if pd.notna(source_table): + select_expr = f'{source_table}.{source_column}' + else: + select_expr = source_column + + if source_column not in join_fields: + if pd.notna(alias): + select_expr += f' AS {alias}' + else: + if pd.notna(column_operation) and pd.notna(source_column): + select_expr += f' AS 
{source_column}' + + if direct_derived.upper() == 'DIRECT': + select_clause.append(select_expr) + elif direct_derived.upper() == 'DERIVED_BASE': + select_clause.append(select_expr) + + if pd.notna(filter_condition): + where_clause.append(filter_condition) + + select_query = ',\n\t'.join(select_clause) + sql_query = f"CREATE TABLE {temp_table_schema}.{base_table}_BASE\nAS \n\tSELECT \n\t{select_query} \nFROM\n\t{incr_table_dict[base_table]} {base_table}" + if where_clause: + sql_query += f"\nWHERE {' AND'.join(where_clause)}" + sql_query += ';' + proc_query.append(f"DROP TABLE IF EXISTS {temp_table_schema}.{base_table}_BASE;") + proc_query.append(sql_query) + + df['Clauses'].fillna('', inplace = True) + df['Ordering'].fillna('', inplace = True) + c = 1 + temp_base_table = f'{base_table}_BASE' + grp_cols = ['Join Condition', 'Clauses', 'Ordering'] + join_grps = df[df['Direct/Derived'] == 'DERIVED'].groupby(['Join Condition', 'Clauses', 'Ordering']) + temp_tables_sql = [] + for (join_condition,clauses,ordering), group in join_grps: + if pd.notna(group['Direct/Derived'].iloc[0]): + if group['Direct/Derived'].iloc[0].upper() == 'DERIVED': + join_tables = [t.strip() for t in group['Join Tables'].iloc[0].split(',')] + join_keys = [t.strip() for t in group['Joining Keys'].iloc[0].split(',')] + join_type = [t.strip() for t in group['Join Type'].iloc[0].split(',')] + join_condition = split_join_condition(group['Join Condition'].iloc[0]) + temp_table_name = f"TEMP_{group['Source Table'].iloc[0]}" + source_column = [t.strip() for t in (','.join(group['Source Column'])).split(',')] + alias = [t.strip() for t in (','.join(group['Alias'])).split(',')] + source_table = [t.strip() for t in (','.join(group['Source Table'])).split(',')] + + base_cols = [] + for join_key in join_keys: + base_cols.append(f'{join_tables[0]}.{join_key}') + + for s_table,col,alias in zip(source_table,source_column,alias): + if pd.notna(group['Column Operations'].iloc[0]): + if len(group['Column Operations'].iloc[0].split()) == 1: + select_expr = f"{group['Column Operations'].iloc[0].upper()}({s_table}.{col})" + else: + select_expr = group['Column Operations'].iloc[0] + else: + if pd.notna(s_table): + select_expr = f"{s_table}.{col}" + else: + select_expr = col + + if alias: + select_expr += f" AS {alias}" + base_cols.append(select_expr) + + if ordering: + base_cols.append(f"{ordering} AS RN") + + sql = ',\n\t\t'.join(base_cols) + + join_sql = f"SELECT \n\t\t{sql} \nFROM\n\t{incr_table_dict[base_table]} {join_tables[0]}" + for i in range(len(join_type)): + join_sql += f'\n\t{join_type[i]} JOIN {incr_table_dict[join_tables[i+1]]} {join_tables[i+1]} ON {join_condition[i]}' + if clauses: + join_sql += f'\n\t{clauses}' + join_sql += ';' + + proc_query.append(f"DROP TABLE IF EXISTS {temp_table_schema}.{temp_table_name};") + proc_query.append(f"CREATE TABLE {temp_table_schema}.{temp_table_name}\nAS \n\t{join_sql}") + + granularity = [t.strip() for t in group['Granularity'].iloc[0].split(',')] + + sql = [] + for key in join_keys: + sql.append(f"A.{key} = B.{key}") + + temp_cols = [] + temp_cols.append('A.*') + + source_column = [t.strip() for t in (','.join(group['Source Column'])).split(',')] + alias = [t.strip() for t in (','.join(group['Alias'])).split(',')] + + for col,alias in zip(source_column,alias): + select_expr = f"B.{col}" + if alias: + select_expr = f"B.{alias}" + else: + select_expr = f"B.{col}" + temp_cols.append(select_expr) + + temp_select_query = ',\n\t\t'.join(temp_cols) + + proc_query.append(f"DROP TABLE IF EXISTS 
{temp_table_schema}.TEMP_{temp_table}_{c};") + + base_sql = f"CREATE TABLE {temp_table_schema}.TEMP_{temp_table}_{c}\nAS \n\tSELECT \n\t\t{temp_select_query} \nFROM\n\t{temp_table_schema}.{temp_base_table} AS A" + base_sql += f"\n\tLEFT OUTER JOIN {temp_table_schema}.{temp_table_name} B ON {' AND '.join(sql)}" + + if '1:1' in granularity and len(ordering) > 1: + base_sql += f" AND B.RN = 1" + base_sql += ';' + + temp_base_table = f'TEMP_{temp_table}_{c}' + c += 1 + proc_query.append(base_sql) + + fin_table_name = temp_table + fin_table_cols = [] + + for _,row in df.iterrows(): + field_name = row['Field_Name'] + source_table = row['Source Table'] + source_column = row['Source Column'] + alias = row['Alias'] + + if pd.notna(row['Direct/Derived']): + if (source_column in join_fields): + fin_table_cols.append(f'{source_column} AS "{field_name}"') + else: + fin_table_cols.append(f'"{field_name}"') + + fin_table_cols = ',\n\t\t'.join(fin_table_cols) + fin_sql = f"INSERT INTO {temp_table_schema}.{fin_table_name}\n\tSELECT \n\t\t{fin_table_cols} \nFROM\n\t{temp_table_schema}.TEMP_{temp_table}_{c-1};" + + + condition_col = '_'.join(incr_cols.replace(f'{base_table}.', '').split(',')) + proc_query.append(f"DELETE FROM {temp_table_schema}.{fin_table_name}\nWHERE {'_'.join(incr_cols.replace(f'{base_table}.', '').split(','))} IN (SELECT {'_'.join(incr_cols.replace(f'{base_table}.', '').split(','))} FROM {temp_table_schema}.INCR1_{temp_table});") + proc_query.append(fin_sql) + + for table in incr_tables: sql = f""" - CREATE TABLE {temp_table_schema}.INCR2_{table} - AS + INSERT INTO audit.reportingdb_audit_tbl_{temp_table} + ( SELECT - {table}.* - FROM - {source_table_schema}.{table} {table} - INNER JOIN - {temp_table_schema}.INCR1_{temp_table} A ON {incr2_sql_join}; """ - proc_query.append(f'DROP TABLE IF EXISTS {temp_table_schema}.INCR2_{table};') + '{temp_table}' as mart_table_name, + '{table}' as src_table_name, + coalesce( max(TO_TIMESTAMP( CAST(SUBSTRING((_hoodie_commit_time),1,4) || '-' || SUBSTRING((_hoodie_commit_time),5,2) ||'-' || SUBSTRING((_hoodie_commit_time),7,2) || ' ' || SUBSTRING((_hoodie_commit_time),9,2) ||':' || SUBSTRING((_hoodie_commit_time),11,2) ||':' || SUBSTRING((_hoodie_commit_time),13,2) AS VARCHAR(30)), 'YYYY-MM-DD HH:MI:SS')),(select max(max_update_date) from audit.reportingdb_audit_tbl_{temp_table} where Mart_Table_Name='{temp_table}' and Src_Table_Name= '{table}')) max_update_date, + CURRENT_TIMESTAMP as load_timestamp, + coalesce(max(prev_updt_ts),(select max(source_reference_date) from audit.reportingdb_audit_tbl_{temp_table} where Mart_Table_Name='{temp_table}' and Src_Table_Name= '{table}')) AS source_reference_date, + max(nvl(batch_number,0))+1 + FROM {temp_table_schema}.{temp_table}_INCR where table_name = '{table}' + );""" proc_query.append(sql) - else: - - p = [x for x in incr_join[table].split('\n\t') if len(x) > 1] - if len(p) == 1: - sql = f""" - CREATE TABLE {temp_table_schema}.INCR2_{table} - AS - SELECT - {table}.* - FROM - {temp_table_schema}.INCR2_{base_table} {base_table} {incr_join[table]};""" - s.append(f'DROP TABLE IF EXISTS {temp_table_schema}.INCR2_{table};') - s.append(sql) - - for x in s: - proc_query.append(x) - - select_clause = [] - from_clause = [] - where_clause = [] + return base_table, base_pk, proc_query, incr_join_grps, incr_table_join_info, incr_join, temp_table_schema + + def create_df(query, table_df_mapping, table_usage_count): + script = [] + query = ' '.join(query.split()).strip() + match = re.match(r'CREATE TABLE (\w+\.\w+\.\w+) AS 
(SELECT .+)', query, re.IGNORECASE) + source_tables = re.findall(r'\bFROM\s+(\w+\.\w+\.\w+)|\bJOIN\s+(\w+\.\w+\.\w+)', query, re.IGNORECASE) + source_tables = [table for pair in source_tables for table in pair if table] + + if not match: + raise ValueError('Invalid SQL') + table_name = match.group(1).split('.')[2] + select_statement = match.group(2) + create_script = f'{table_name} = spark.sql(""" {select_statement} """)' + persist_script = f'{table_name} = {table_name}.persist()' + view_script = f'{table_name}.createOrReplaceTempView("{table_name}")' + + for table in source_tables: + create_script = create_script.replace(table, table_df_mapping[table]) + + script.append(f"\n\t\t######################---------Creating table {create_script.split('=')[0].strip()}-------############################") + script.append(create_script) + script.append(persist_script) + script.append(view_script) + script.append(f'''print("{create_script.split('=')[0].strip()} count: ", {create_script.split('=')[0].strip()}.count()''') + + if 'INCR2_' in table_name: + x = table_name.split('INCR2_')[1] + if x in table_details_mapping.keys(): + script.append(f"\n\t\t######################---------Updating the max_update_date in audit-------############################") + script.append(f"{x}_max_update_date = INCR2_{x}.agg({{'_hoodie_commit_time' : 'max'}}).first()[0]") + script.append(f"{x}_max_source_reference_date = INCR2_{x}.agg(max(to_timestamp('{table_details_mapping[x][1].replace(x+'.','')}','yyyy-MM-dd-HH.mm.ss.SSSSSS'))).first()[0]") + script.append(f"insert_max_update_date(spark,redshift_conn, config['application_name'],'{x}',{x}_max_update_date,{x}_max_source_reference_date, max_batch_id, config)") + script.append('\n') + + for table in source_tables: + table_usage_count[table.split('.')[2]] -= 1 - for _,row in df.iterrows(): - field_name = row['Field_Name'] - source_table = row['Source Table'] - source_column = row['Source Column'] - joining_keys = row['Joining Keys'] - primary_key = row['Primary Key'] - direct_derived = row['Direct/Derived'] - join_type = row['Join Type'] - join_tables = row['Join Tables'] - join_condition = row['Join Condition'] - column_operation = row['Column Operations'] - alias = row['Alias'] - granularity = row['Granularity'] - filter_condition = row['Filter Condition'] - clauses = row['Clauses'] - ordering = row['Ordering'] + for table in source_tables: + if table_usage_count[table.split('.')[2]] == 0 and 'INCR1_' not in table: + unpersist_script = f"{table.split('.')[2]}.unpersist()" + script.append(unpersist_script) + + return '\n\t\t'.join(script) - if pd.notna(direct_derived): - if pd.notna(column_operation): - if len(column_operation.split()) == 1: - select_expr = f'{column_operation.upper()}({source_table}.{source_column})' - else: - select_expr = column_operation - else: - if pd.notna(source_table): - select_expr = f'{source_table}.{source_column}' - else: - select_expr = source_column + def generate_spark(proc_query, incr_join_grps, base_table, base_pk, incr_table_join_info, incr_join, temp_table_schema): + table_usage_count = defaultdict(int) + table_df_mapping = {} + + for query in proc_query: + if 'CREATE TABLE' or 'DELETE' in query: + source_tables = re.findall(r'\bFROM\s+(\w+\.\w+\.\w+)|\bJOIN\s+(\w+\.\w+\.\w+)', query, re.IGNORECASE) + source_tables = [table for pair in source_tables for table in pair if table] + for table in source_tables: + table_usage_count[table.split('.')[2]] += 1 + if 'DELETE' not in query: + table_df_mapping[table] = table.split('.')[2] + + 
script = [] + for query in proc_query: + if 'CREATE TABLE' in query: + script.append(create_df(query, table_df_mapping,table_usage_count)) + + spark_query = [] + spark_query.append("\t\t######################---------Reading source data -------############################") + for table in table_details_mapping.keys(): + spark_query.append(f'{table} = read_file(spark, config, \"{table}\").filter("{table_details_mapping[table][2]}")') + spark_query.append(f'{table} = {table}.persist()') + spark_query.append(f'{table}.createOrReplaceTempView("{table}")') + spark_query.append(f'print("{table} count: ", {table}.count()') + spark_query.append('\n') + + spark_query.append("\n\t\t######################---------Reading records-------############################") + for table in table_details_mapping.keys(): + spark_query.append(f"{table}_max_update_date = read_max_update_date(redshift_conn, config['application_name'],'{table}', config)") + spark_query.append(f'{table}_max_update_date = {table}_max_update_date[0][0]') + spark_query.append('\n') + + incr1_spark = [] + temp_incr1 = [] + for _, group in incr_join_grps: + for table in _.split(): + if base_table != table: + join_tables = [t.strip() for t in group['join_tables'].iloc[0].split(',')] + join_keys = [t.strip() for t in ','.join(base_pk).split(',')] + join_type = [t.strip() for t in group['join_type'].iloc[0].split(',')] + join_cond = split_join_condition(incr_table_join_info[table]) + join_condition = join_incr(join_cond) + source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')] - if source_column not in join_fields: - if pd.notna(alias): - select_expr += f' AS {alias}' - else: - if pd.notna(column_operation) and pd.notna(source_column): - select_expr += f' AS {source_column}' - - if direct_derived.upper() == 'DIRECT': - select_clause.append(select_expr) - elif direct_derived.upper() == 'DERIVED_BASE': - select_clause.append(select_expr) + join_key_list = [] + for x in join_keys: + join_key_list.append(f'{base_table}.{x}') + join_key = ', '.join(join_key_list) - if pd.notna(filter_condition): - where_clause.append(filter_condition) - - select_query = ',\n\t'.join(select_clause) - sql_query = f"CREATE TABLE {temp_table_schema}.{base_table}_BASE\nAS \n\tSELECT \n\t{select_query} \nFROM\n\t{incr_table_dict[base_table]} {base_table}" - if where_clause: - sql_query += f"\nWHERE {' AND'.join(where_clause)}" - sql_query += ';' - proc_query.append(f"DROP TABLE IF EXISTS {temp_table_schema}.{base_table}_BASE;") - proc_query.append(sql_query) - - df['Clauses'].fillna('', inplace = True) - df['Ordering'].fillna('', inplace = True) - c = 1 - temp_base_table = f'{base_table}_BASE' - grp_cols = ['Join Condition', 'Clauses', 'Ordering'] - join_grps = df[df['Direct/Derived'] == 'DERIVED'].groupby(['Join Condition', 'Clauses', 'Ordering']) - temp_tables_sql = [] - for (join_condition,clauses,ordering), group in join_grps: - if pd.notna(group['Direct/Derived'].iloc[0]): - if group['Direct/Derived'].iloc[0].upper() == 'DERIVED': - join_tables = [t.strip() for t in group['Join Tables'].iloc[0].split(',')] - join_keys = [t.strip() for t in group['Joining Keys'].iloc[0].split(',')] - join_type = [t.strip() for t in group['Join Type'].iloc[0].split(',')] - join_condition = split_join_condition(group['Join Condition'].iloc[0]) - temp_table_name = f"TEMP_{group['Source Table'].iloc[0]}" - source_column = [t.strip() for t in (','.join(group['Source Column'])).split(',')] - alias = [t.strip() for t in (','.join(group['Alias'])).split(',')] - 
source_table = [t.strip() for t in (','.join(group['Source Table'])).split(',')] - - base_cols = [] - for join_key in join_keys: - base_cols.append(f'{join_tables[0]}.{join_key}') - - for s_table,col,alias in zip(source_table,source_column,alias): - if pd.notna(group['Column Operations'].iloc[0]): - if len(group['Column Operations'].iloc[0].split()) == 1: - select_expr = f"{group['Column Operations'].iloc[0].upper()}({s_table}.{col})" - else: - select_expr = group['Column Operations'].iloc[0] - else: - if pd.notna(s_table): - select_expr = f"{s_table}.{col}" - else: - select_expr = col - - if alias: - select_expr += f" AS {alias}" - base_cols.append(select_expr) - - if ordering: - base_cols.append(f"{ordering} AS RN") - - sql = ',\n\t\t'.join(base_cols) - - join_sql = f"SELECT \n\t\t{sql} \nFROM\n\t{incr_table_dict[base_table]} {join_tables[0]}" - for i in range(len(join_type)): - join_sql += f'\n\t{join_type[i]} JOIN {incr_table_dict[join_tables[i+1]]} {join_tables[i+1]} ON {join_condition[i]}' - if clauses: - join_sql += f'\n\t{clauses}' - join_sql += ';' - - proc_query.append(f"DROP TABLE IF EXISTS {temp_table_schema}.{temp_table_name};") - proc_query.append(f"CREATE TABLE {temp_table_schema}.{temp_table_name}\nAS \n\t{join_sql}") - - granularity = [t.strip() for t in group['Granularity'].iloc[0].split(',')] - - sql = [] - for key in join_keys: - sql.append(f"A.{key} = B.{key}") - - temp_cols = [] - temp_cols.append('A.*') - - source_column = [t.strip() for t in (','.join(group['Source Column'])).split(',')] - alias = [t.strip() for t in (','.join(group['Alias'])).split(',')] - - for col,alias in zip(source_column,alias): - select_expr = f"B.{col}" - if alias: - select_expr = f"B.{alias}" - else: - select_expr = f"B.{col}" - temp_cols.append(select_expr) - - temp_select_query = ',\n\t\t'.join(temp_cols) - - proc_query.append(f"DROP TABLE IF EXISTS {temp_table_schema}.TEMP_{temp_table}_{c};") - - base_sql = f"CREATE TABLE {temp_table_schema}.TEMP_{temp_table}_{c}\nAS \n\tSELECT \n\t\t{temp_select_query} \nFROM\n\t{temp_table_schema}.{temp_base_table} AS A" - base_sql += f"\n\tLEFT OUTER JOIN {temp_table_schema}.{temp_table_name} B ON {' AND '.join(sql)}" - - if '1:1' in granularity and len(ordering) > 1: - base_sql += f" AND B.RN = 1" - base_sql += ';' - - temp_base_table = f'TEMP_{temp_table}_{c}' - c += 1 - proc_query.append(base_sql) - - fin_table_name = temp_table - fin_table_cols = [] - - for _,row in df.iterrows(): - field_name = row['Field_Name'] - source_table = row['Source Table'] - source_column = row['Source Column'] - alias = row['Alias'] - - if pd.notna(row['Direct/Derived']): - if (source_column in join_fields): - fin_table_cols.append(f'{source_column} AS "{field_name}"') - else: - fin_table_cols.append(f'"{field_name}"') + for y in source_table: + sql = f"""SELECT {join_key} FROM {base_table} {base_table}""" + + incr_join_text = '' + i=0 + for i in range(len(join_condition)): + sql += f' {join_type[i]} JOIN {join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}' + incr_join_text += f' {join_type[i]} JOIN {join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}' + + sql += f''' WHERE {join_tables[i+1]}._hoodie_commit_time > cast('"""+str({join_tables[i+1]}_max_update_date)+"""' as timestamp)''' + temp_incr1.append(sql) - fin_table_cols = ',\n\t\t'.join(fin_table_cols) - fin_sql = f"INSERT INTO {temp_table_schema}.{fin_table_name}\n\tSELECT \n\t\t{fin_table_cols} \nFROM\n\t{temp_table_schema}.TEMP_{temp_table}_{c-1};" - - - condition_col = 
'_'.join(incr_cols.replace(f'{base_table}.', '').split(',')) - proc_query.append(f"DELETE FROM {temp_table_schema}.{fin_table_name}\nWHERE {'_'.join(incr_cols.replace(f'{base_table}.', '').split(','))} IN (SELECT {'_'.join(incr_cols.replace(f'{base_table}.', '').split(','))} FROM {temp_table_schema}.INCR1_{temp_table});") - proc_query.append(fin_sql) - - for table in incr_tables: - sql = f""" - INSERT INTO audit.reportingdb_audit_tbl_{temp_table} - ( - SELECT - '{temp_table}' as mart_table_name, - '{table}' as src_table_name, - coalesce( max(TO_TIMESTAMP( CAST(SUBSTRING((_hoodie_commit_time),1,4) || '-' || SUBSTRING((_hoodie_commit_time),5,2) ||'-' || SUBSTRING((_hoodie_commit_time),7,2) || ' ' || SUBSTRING((_hoodie_commit_time),9,2) ||':' || SUBSTRING((_hoodie_commit_time),11,2) ||':' || SUBSTRING((_hoodie_commit_time),13,2) AS VARCHAR(30)), 'YYYY-MM-DD HH:MI:SS')),(select max(max_update_date) from audit.reportingdb_audit_tbl_{temp_table} where Mart_Table_Name='{temp_table}' and Src_Table_Name= '{table}')) max_update_date, - CURRENT_TIMESTAMP as load_timestamp, - coalesce(max(prev_updt_ts),(select max(source_reference_date) from audit.reportingdb_audit_tbl_{temp_table} where Mart_Table_Name='{temp_table}' and Src_Table_Name= '{table}')) AS source_reference_date, - max(nvl(batch_number,0))+1 - FROM {temp_table_schema}.{temp_table}_INCR where table_name = '{table}' - );""" - proc_query.append(sql) + else: + source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')] + join_keys = [t.strip() for t in group['joining_keys'].iloc[0].split(',')] + + join_key_list = [] + for x in join_keys: + join_key_list.append(f'{base_table}.{x}') + join_key = ', '.join(join_key_list) - return base_table, base_pk, proc_query, incr_join_grps, incr_table_join_info, incr_join, temp_table_schema - -def create_df(query, table_df_mapping, table_usage_count): - script = [] - query = ' '.join(query.split()).strip() - match = re.match(r'CREATE TABLE (\w+\.\w+\.\w+) AS (SELECT .+)', query, re.IGNORECASE) - source_tables = re.findall(r'\bFROM\s+(\w+\.\w+\.\w+)|\bJOIN\s+(\w+\.\w+\.\w+)', query, re.IGNORECASE) - source_tables = [table for pair in source_tables for table in pair if table] - - if not match: - raise ValueError('Invalid SQL') - table_name = match.group(1).split('.')[2] - select_statement = match.group(2) - create_script = f'{table_name} = spark.sql(""" {select_statement} """)' - persist_script = f'{table_name} = {table_name}.persist()' - view_script = f'{table_name}.createOrReplaceTempView("{table_name}")' - - for table in source_tables: - create_script = create_script.replace(table, table_df_mapping[table]) - - script.append(f"\n\t\t######################---------Creating table {create_script.split('=')[0].strip()}-------############################") - script.append(create_script) - script.append(persist_script) - script.append(view_script) - script.append(f'''print("{create_script.split('=')[0].strip()} count: ", {create_script.split('=')[0].strip()}.count()''') - - if 'INCR2_' in table_name: - x = table_name.split('INCR2_')[1] - if x in table_details_mapping.keys(): - script.append(f"\n\t\t######################---------Updating the max_update_date in audit-------############################") - script.append(f"{x}_max_update_date = INCR2_{x}.agg({{'_hoodie_commit_time' : 'max'}}).first()[0]") - script.append(f"{x}_max_source_reference_date = INCR2_{x}.agg(max(to_timestamp('{table_details_mapping[x][1].replace(x+'.','')}','yyyy-MM-dd-HH.mm.ss.SSSSSS'))).first()[0]") - 
script.append(f"insert_max_update_date(spark,redshift_conn, config['application_name'],'{x}',{x}_max_update_date,{x}_max_source_reference_date, max_batch_id, config)") - script.append('\n') - - for table in source_tables: - table_usage_count[table.split('.')[2]] -= 1 - - for table in source_tables: - if table_usage_count[table.split('.')[2]] == 0 and 'INCR1_' not in table: - unpersist_script = f"{table.split('.')[2]}.unpersist()" - script.append(unpersist_script) - - return '\n\t\t'.join(script) - -def generate_spark(proc_query, incr_join_grps, base_table, base_pk, incr_table_join_info, incr_join, temp_table_schema): - table_usage_count = defaultdict(int) - table_df_mapping = {} - - for query in proc_query: - if 'CREATE TABLE' or 'DELETE' in query: - source_tables = re.findall(r'\bFROM\s+(\w+\.\w+\.\w+)|\bJOIN\s+(\w+\.\w+\.\w+)', query, re.IGNORECASE) - source_tables = [table for pair in source_tables for table in pair if table] - for table in source_tables: - table_usage_count[table.split('.')[2]] += 1 - if 'DELETE' not in query: - table_df_mapping[table] = table.split('.')[2] + sql = f'''SELECT {join_key} FROM {base_table} {base_table} WHERE {base_table}._hoodie_commit_time > cast('"""+str({base_table}_max_update_date)+"""' as timestamp)''' + incr1_spark.append(sql) + for i in temp_incr1: + incr1_spark.append(i) + incr1_spark = '\nUNION\n'.join(incr1_spark) + spark_query.append("\n\t\t######################---------Creating INCR1-------############################") + spark_query.append(f'INCR1_{temp_table} = spark.sql(""" {incr1_spark} """)') + spark_query.append(f'\n\t\tINCR1_{temp_table} = INCR1_{temp_table}.dropDuplicates()') + spark_query.append(f'INCR1_{temp_table} = INCR1_{temp_table}.persist()') + spark_query.append(f'INCR1_{temp_table}.createOrReplaceTempView("INCR1_{temp_table}")') + spark_query.append(f'print("INCR1_{temp_table} count: ", INCR1_{temp_table}.count())') + + spark_query.append("\n\t\t######################---------Creating INCR2-------############################") + for table in table_details_mapping.keys(): + if table in incr_join.keys(): + p = [x for x in incr_join[table].split('\n\t') if len(x) > 1] + if len(p) > 1: + spark_query.append(f"\n\t\t######################---------Updating the max_update_date in audit-------############################") + spark_query.append(f"{table}_max_update_date = {table}.agg({{'_hoodie_commit_time' : 'max'}}).first()[0]") + spark_query.append(f"{table}_max_source_reference_date = {table}.agg(max(to_timestamp('{table_details_mapping[table][1].replace(table+'.','')}','yyyy-MM-dd-HH.mm.ss.SSSSSS'))).first()[0]") + spark_query.append(f"insert_max_update_date(spark,redshift_conn, config['application_name'],'{table}',{table}_max_update_date,{table}_max_source_reference_date, max_batch_id, config)") + spark_query.append('\n') + + for query in script: + spark_query.append(query) + spark_query.append('\n') + + spark_query1 = [] + spark_query1.append('\n') + for query in proc_query: + if f'{temp_table_schema}.{temp_table}\n' in query: + final_tables = re.findall(r'\bFROM\s+(\w+\.\w+\.\w+)|\bJOIN\s+(\w+\.\w+\.\w+)', query, re.IGNORECASE) + final_tables = [table.split('.')[2].strip() for pair in final_tables for table in pair if table and table.split('.')[2].strip() != temp_table][0] + if 'INCR1_' in final_tables: + spark_query.append(f"{final_tables}.write.mode('overwrite').parquet(config['incr2df_path'])") + else: + spark_query.append(f"{final_tables}.write.mode('overwrite').parquet(config['resultdf_path'])") + 
spark_query1.append(f'''cur.execute(""" {query} """)''') + spark_query1.append('\n') + + with open('template.txt') as file: + template = file.read() + + result = template.replace('INSERT_CODE_1', '\n\t\t'.join(spark_query)) + result = result.replace('INSERT_CODE_2', '\t\t'.join(spark_query1)) - script = [] - for query in proc_query: - if 'CREATE TABLE' in query: - script.append(create_df(query, table_df_mapping,table_usage_count)) + return result - spark_query = [] - spark_query.append("\t\t######################---------Reading source data -------############################") - for table in table_details_mapping.keys(): - spark_query.append(f'{table} = read_file(spark, config, \"{table}\").filter("{table_details_mapping[table][2]}")') - spark_query.append(f'{table} = {table}.persist()') - spark_query.append(f'{table}.createOrReplaceTempView("{table}")') - spark_query.append(f'print("{table} count: ", {table}.count()') - spark_query.append('\n') - spark_query.append("\n\t\t######################---------Reading records-------############################") - for table in table_details_mapping.keys(): - spark_query.append(f"{table}_max_update_date = read_max_update_date(redshift_conn, config['application_name'],'{table}', config)") - spark_query.append(f'{table}_max_update_date = {table}_max_update_date[0][0]') - spark_query.append('\n') - incr1_spark = [] - temp_incr1 = [] - for _, group in incr_join_grps: - for table in _.split(): - if base_table != table: - join_tables = [t.strip() for t in group['join_tables'].iloc[0].split(',')] - join_keys = [t.strip() for t in ','.join(base_pk).split(',')] - join_type = [t.strip() for t in group['join_type'].iloc[0].split(',')] - join_cond = split_join_condition(incr_table_join_info[table]) - join_condition = join_incr(join_cond) - source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')] - - join_key_list = [] - for x in join_keys: - join_key_list.append(f'{base_table}.{x}') - join_key = ', '.join(join_key_list) + st.set_page_config(page_title='AUTOMATED SOURCE TO TARGET MAPPING', layout= 'wide') + st.markdown(""" + + """, unsafe_allow_html=True) + st.subheader('AUTOMATED SOURCE TO TARGET MAPPING') + mode= st.selectbox('Select Mode of Mapping',('Supervised Mapping(You Have Sufficient Sample Data in Target Template)', 'Unsupervised Mapping(You Do Not Have Sufficient Sample Data in Target Template)'), index=None,placeholder='Select category of table') + if mode == 'Supervised Mapping(You Have Sufficient Sample Data in Target Template)': + conn = pymssql.connect( "Server=sql-ext-dev-uks-001.database.windows.net;" + "Database=sqldb-ext-dev-uks-001;" + "UID=dbadmin;" + "PWD=mYpa$$w0rD" ) + query1="select * from INFORMATION_SCHEMA.TABLES where TABLE_SCHEMA='dbo' ORDER BY TABLE_NAME ASC" + table1=pd.read_sql_query(query1,con=conn) + st.session_state.table1_un= table1 + table1['TABLE_NAME']=table1['TABLE_NAME'].astype('str') + colsel1, colsel2= st.columns(2) + with colsel1: + table_selector=st.selectbox('SOURCE TABLE NAME',['TCM', 'TCVM','TEM', 'TPM', 'TPP', 'TPT', 'TRM', 'TSCM', 'TSM'],index=None,placeholder='Select table for automated column mapping') + with colsel2: + target_selector=st.selectbox('TARGET TABLE NAME',['POLICY_MAPPINGTARGET_TBL','FINANCE_MAAPINGTARGET_TBL','CUSTOMER_MASTER_TARGET'],index=None,placeholder='Select target table') + st.session_state.target_selector_un = target_selector + #migrate_opt=st.toggle('DO YOU ALSO WANT TO MIGRATE DATA TO TARGET TABLE') + if table_selector is not None and target_selector is not None: 
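# ---------------------------------------------------------------------------
# Illustrative sketch (an assumption, not part of the generated change): the
# supervised branch above passes a single "Server=...;Database=...;UID=...;PWD=..."
# string to pymssql.connect(), but pymssql expects separate keyword arguments.
# A minimal helper for the same INFORMATION_SCHEMA lookup, using the values
# hard-coded on this page (credentials should really come from config/secrets),
# could look like this:
import pandas as pd
import pymssql

def list_dbo_tables():
    # Connect with explicit keyword arguments instead of an ODBC-style string.
    conn = pymssql.connect(
        server='sql-ext-dev-uks-001.database.windows.net',
        database='sqldb-ext-dev-uks-001',
        user='dbadmin',
        password='mYpa$$w0rD',
    )
    try:
        # Same catalog query the page issues to populate the table pickers.
        query = ("select * from INFORMATION_SCHEMA.TABLES "
                 "where TABLE_SCHEMA='dbo' ORDER BY TABLE_NAME ASC")
        return pd.read_sql_query(query, con=conn)
    finally:
        conn.close()
# ---------------------------------------------------------------------------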
+ btn=button('RUN',key='RUN_GENAI_UN') + if target_selector is not None and btn and f'{table_selector}_{target_selector}_map_un' not in st.session_state: + query2="select * from ["+ table1['TABLE_SCHEMA'][0]+"].["+table_selector+"]" + i_df = pd.read_sql_query(query2,con=conn) + # conn.close() + i_df=i_df.drop(['ID','LOADID','FILE_NAME'],axis=1) + st.session_state['source_data_un'] = i_df + #st.markdown('---') + # st.subheader('Souce Data Preview') + # st.dataframe(i_df) + query3="select * from ["+ table1['TABLE_SCHEMA'][0]+"].["+target_selector+"]" + tgt_df=pd.read_sql_query(query3,con=conn).reset_index(drop=True) + main_list=tgt_df.columns.to_list() + sub_list=['ID','LOADID','FILE_NAME'] + if any(main_list[i:i+len(sub_list)] == sub_list for i in range(len(main_list) - len(sub_list) + 1)): + tgt_df=tgt_df.drop(['ID','LOADID','FILE_NAME'],axis=1) + st.session_state.opt_un= list(tgt_df.columns) + st.session_state['target_data_un'] = tgt_df.head(20).reset_index() + # if tgt: + # # st.subheader('Target Table Preview') + # # st.write(tgt_df.sample(20).reset_index(drop=True)) + # # st.markdown('---') - for y in source_table: - sql = f"""SELECT {join_key} FROM {base_table} {base_table}""" - - incr_join_text = '' - i=0 - for i in range(len(join_condition)): - sql += f' {join_type[i]} JOIN {join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}' - incr_join_text += f' {join_type[i]} JOIN {join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}' - - sql += f''' WHERE {join_tables[i+1]}._hoodie_commit_time > cast('"""+str({join_tables[i+1]}_max_update_date)+"""' as timestamp)''' - temp_incr1.append(sql) + with st.spinner('Running data on neural network...'): + df=pd.read_csv('C:\\Applications\\MARCO POLO O AIML\\DATA CATALOG\\pages\\CUSTOMER_MASTER_TRAIN_1306.csv') #POLICY + cols=df.columns.tolist() + data=pd.DataFrame(columns=['DATA','LABEL']) + temp=pd.DataFrame(columns=['DATA','LABEL']) + for x in cols: + temp['DATA']=df[x] + temp['LABEL']=x + data=pd.concat([data,temp],ignore_index=True) + data['DATA']=data['DATA'].astype('string') + data['LABEL']=data['LABEL'].astype('string') + data=data.dropna() + data=data.reset_index(drop=True) + + + + + #FEATURE_EXTRACTION BAG OF CHARACTERS + vectorizer = CountVectorizer(analyzer='char_wb', ngram_range=(1, 3), min_df=1) + X=vectorizer.fit_transform(data['DATA']) + feature=pd.DataFrame(data=X.toarray(),columns=vectorizer.get_feature_names_out()) + data1=pd.concat([data,feature],axis=1) + + #FEATURE_SELECTION + from sklearn.feature_selection import chi2 + chi_x=data1.drop(['DATA','LABEL'],axis=1) + chi_y=data1['LABEL'] + chi_scores=chi2(chi_x,chi_y) + p_values=pd.Series(chi_scores[1],index=chi_x.columns) + p_values=p_values.sort_values(ascending=True).reset_index() + feature_chi=p_values['index'][:1000] + data2=data1[feature_chi.to_list()] + data2=pd.concat([data,data2],axis=1) + + #FEATURE EXTRACTION GENERAL + def count_digits(str1): + return len("".join(re.findall("\d+", str1))) + + def count_vowels(string): + vowels = "aeiouAEIOU" + count = 0 + for char in string: + if char in vowels: + count += 1 + return count + + def count_special_character(string): + special_characters = "!@#$%^&*()-+?_=,<>/" + special_char = 0 + for i in range(0, len(string)): + if (string[i] in special_characters): + special_char += 1 + return special_char + + def count_spaces(string): + spaces = 0 + for char in string: + if char == " ": + spaces += 1 + return spaces + + data2['LENGTH']=data2['DATA'].apply(lambda x:len(x)) + data2['digit_c']=data2['DATA'].apply(lambda 
x:count_digits(x)) + data2['vowel_c']=data2['DATA'].apply(lambda x:count_vowels(x)) + data2['spchar_c']=data2['DATA'].apply(lambda x:count_special_character(x)) + data2['space_c']=data2['DATA'].apply(lambda x:count_spaces(x)) + + chi_scores1=chi2(data2[['LENGTH','digit_c','vowel_c','spchar_c','space_c']],data2['LABEL']) + p_values1=pd.Series(chi_scores1[1],index=data2[['LENGTH','digit_c','vowel_c','spchar_c','space_c']].columns).sort_values(ascending=True).reset_index() + + #MODEL + import tensorflow as tf + from tensorflow.keras import layers + from tensorflow import keras + + from sklearn.model_selection import train_test_split + from ast import literal_eval + + train_df, test_df = train_test_split(data2,test_size=.1,stratify=data2['LABEL'].values) + val_df = test_df.sample(frac=0.5) + test_df.drop(val_df.index, inplace=True) + + terms = tf.ragged.constant(data2['LABEL'].values) + lookup = tf.keras.layers.StringLookup(output_mode="one_hot") + lookup.adapt(terms) + vocab = lookup.get_vocabulary() + + def invert_multi_hot(encoded_labels): + hot_indices = np.argwhere(encoded_labels == 1.0)[..., 0] + return np.take(vocab, hot_indices) + + max_seqlen = 150 + batch_size = 128 + padding_token = "" + auto = tf.data.AUTOTUNE + + feature_tf=data2.columns.tolist()[2:] + + def make_dataset(dataframe,feature,batch_size,is_train=True): + labels = tf.ragged.constant(dataframe["LABEL"].values) + label_binarized = lookup(labels).numpy() + dataset = tf.data.Dataset.from_tensor_slices( + (dataframe[feature].values, label_binarized) + ) + dataset = dataset.shuffle(batch_size * 10) if is_train else dataset + return dataset.batch(batch_size) + + train_dataset = make_dataset(train_df,feature_tf,batch_size, is_train=True) + validation_dataset = make_dataset(val_df,feature_tf,batch_size, is_train=False) + test_dataset = make_dataset(test_df,feature_tf,batch_size, is_train=False) + + + shallow_mlp_model = keras.Sequential( + [ + layers.Dense(512, activation="relu"), + layers.Dense(256, activation="relu"), + layers.Dense(lookup.vocabulary_size(), activation="softmax"), + ] + ) + + shallow_mlp_model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["CategoricalAccuracy"]) + epochs=20 + history = shallow_mlp_model.fit(train_dataset, validation_data=validation_dataset, epochs=epochs) + + #MODEL TEST + _, category_acc = shallow_mlp_model.evaluate(test_dataset) + + + #INPUT PREPROCESSING + + i_cols=i_df.columns + i_cols=i_df.columns.tolist() + i_data=pd.DataFrame(columns=['DATA','LABEL']) + i_temp=pd.DataFrame(columns=['DATA','LABEL']) + for x in i_cols: + i_temp['DATA']=i_df[x] + i_temp['LABEL']=x + i_data=pd.concat([i_data,i_temp],ignore_index=True) + i_data['DATA']=i_data['DATA'].astype('string') + i_data['LABEL']=i_data['LABEL'].astype('string') + i_data=i_data.dropna() + i_data=i_data.reset_index(drop=True) + i_X=vectorizer.transform(i_data['DATA']) + i_feature=pd.DataFrame(data=i_X.toarray(),columns=vectorizer.get_feature_names_out()) + i_data1=pd.concat([i_data,i_feature],axis=1) + i_data2=i_data1[feature_chi.to_list()] + i_data2=pd.concat([i_data,i_data2],axis=1) + i_data2['LENGTH']=i_data2['DATA'].apply(lambda x:len(x)) + i_data2['digit_c']=i_data2['DATA'].apply(lambda x:count_digits(x)) + i_data2['vowel_c']=i_data2['DATA'].apply(lambda x:count_vowels(x)) + i_data2['spchar_c']=i_data2['DATA'].apply(lambda x:count_special_character(x)) + i_data2['space_c']=i_data2['DATA'].apply(lambda x:count_spaces(x)) + 
i_run_dataset=tf.data.Dataset.from_tensor_slices((i_data2[feature_tf].values,lookup(tf.ragged.constant(i_data2["LABEL"].values)).numpy())).batch(649) + + + i_predicted_probabilities = shallow_mlp_model.predict(i_run_dataset) + i_predicted_labels = np.where(i_predicted_probabilities == i_predicted_probabilities.max(axis=1, keepdims=True), 1, 0) + i_predicted_label_df=pd.DataFrame(i_predicted_labels,columns=vocab) + i_predicted_label_df1=pd.concat([i_data,i_predicted_label_df],axis=1) + i_predicted_label_df1['PREDICTION']=i_predicted_label_df1[vocab].idxmax(axis=1) + i_result=i_predicted_label_df1[['DATA','LABEL','PREDICTION']] + + column_mapping=pd.DataFrame(columns=['source','target']) + temp_column_mapping=pd.DataFrame(columns=['source','target']) + for i in i_df.columns.to_list(): + temp_df1=i_result.loc[i_result['LABEL']==i] + temp_max=temp_df1['PREDICTION'].value_counts().idxmax() + temp_column_mapping.loc[0]=[i,temp_max] + column_mapping=pd.concat([column_mapping,temp_column_mapping],ignore_index=True) + not_null=i_df.count().reset_index() + tot_rows=i_df.shape[0] + not_null['not null percentage']=not_null[0]/tot_rows + coltobemodified=not_null[not_null['not null percentage']<.05]['index'].to_list() + column_mapping.loc[column_mapping['source'].isin(coltobemodified), 'target'] = '**TOO FEW COLUMN VALUES**' + st.success('Mapping completed successfully!') + st.session_state[f'{table_selector}_{target_selector}_map_un'] = column_mapping.copy() + # st.subheader('MAPPED COLUMN') + # st.dataframe(column_mapping) - else: - source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')] - join_keys = [t.strip() for t in group['joining_keys'].iloc[0].split(',')] - join_key_list = [] - for x in join_keys: - join_key_list.append(f'{base_table}.{x}') - join_key = ', '.join(join_key_list) + if f'{table_selector}_{target_selector}_map_un' in st.session_state and btn: + taba, tabb, tabc = st.tabs(['Mappings Generated', 'Source Table Preview', 'Target Table Preview']) + with tabb: + st.subheader('Souce Data Preview') + with stylable_container( + key=f"source_container_with_border", + css_styles=""" + { + border: 1px solid white; + border-radius: 0.5rem; + padding: calc(1em - 1px); + width: 103%; /* Set container width to 100% */ + } + """ + ): + st.dataframe(st.session_state['source_data_un']) + with tabc: + st.subheader('Target Table Preview') + with stylable_container( + key=f"target_container_with_border", + css_styles=""" + { + border: 1px solid white; + border-radius: 0.5rem; + padding: calc(1em - 1px); + width: 103%; /* Set container width to 100% */ + } + """ + ): + st.write(st.session_state['target_data_un']) + with taba: + st.subheader("Mapping Generated:") + with stylable_container( + key=f"container_with_border", + css_styles=""" + { + border: 1px solid white; + border-radius: 0.5rem; + padding: calc(1em - 1px); + width: 103%; /* Set container width to 100% */ + } + """ + ): + + edited_map_df = st.data_editor( + st.session_state[f'{table_selector}_{target_selector}_map_un'], + column_config={ + "target": st.column_config.SelectboxColumn( + "Available Column Names", + help="Please Verify/Change the Target Column Mapping", + width="medium", + options=st.session_state.opt_un, + required=True, + ) + }, + hide_index=False, + num_rows = 'fixed', + use_container_width = True + ) + val = button("Validate", key="Val_un") + if val: + st.session_state[f'{table_selector}_{target_selector}_map_un'].update(edited_map_df) + dup= 
len(st.session_state[f'{table_selector}_{target_selector}_map_un'][st.session_state[f'{table_selector}_{target_selector}_map_un']['target'].duplicated()]) + if dup != 0: + dup_index= list(st.session_state[f'{table_selector}_{target_selector}_map_un'][st.session_state[f'{table_selector}_{target_selector}_map_un']['target'].duplicated(keep=False)].index) + dup_mess=str(dup_index[0]) + for val in dup_index[1:]: + dup_mess = dup_mess + f' and {str(val)}' + st.error(f"One to Many Column mapping Exists. Please Check Mapping Number: {dup_mess}") + else: + st.success("Mapping Validated! You can proceed for Mapping") + + migrate= st.button("Mapping") + if migrate: + st.subheader('Mapping PHASE') + m_queiry1="select count(*) as TARGET_COUNT_CURRENT from ["+ st.session_state.table1_un['TABLE_SCHEMA'][0]+"].["+st.session_state.target_selector_un+"]" + #st.write(m_queiry1) + old_count=pd.read_sql_query(m_queiry1,con=conn) + st.write('RECORDS IN TARGET TABLE BEFORE Mapping',old_count) + with st.spinner('Mapping in progress...'): + cursor1=conn.cursor() + q1='INSERT INTO ['+ st.session_state.table1_un['TABLE_SCHEMA'][0]+'].['+st.session_state.target_selector_un+'] ("' + q2=' select "' + for i,x in enumerate(st.session_state['source_data_un'].columns.values.tolist()): + t=st.session_state[f'{table_selector}_{target_selector}_map_un'].loc[st.session_state[f'{table_selector}_{target_selector}_map_un']['source']==x,'target'].values[0] + if i==len(st.session_state['source_data_un'].columns.values.tolist())-1: + q_temp1=t+'") ' + q_temp2=x+'" ' + else: + q_temp1=t+'", "' + q_temp2=x+'", "' + q1=q1+q_temp1 + q2=q2+q_temp2 + #q_temp='INSERT INTO ['+ table1['TABLE_SCHEMA'][0]+'].['+target_selector+'] ("'+t+'") select "'+x+'" from ['+ table1['TABLE_SCHEMA'][0]+'].['+table_selector+']' + # st.write(q) + q=q1+q2+' from ['+ st.session_state.table1_un['TABLE_SCHEMA'][0]+'].['+table_selector+']' + #st.write(q) + cursor1.execute(q) + conn.commit() + # m_query2="UPDATE ["+ table1['TABLE_SCHEMA'][0]+"].["+target_selector+"] SET ID=9999 WHERE ID IS NULL" + # # cur_time=datetime.datetime.now().time().strftime("%Y%m%d%H%M%S") + # m_query3="UPDATE ["+ table1['TABLE_SCHEMA'][0]+"].["+target_selector+"] SET LOADID='LOADEDBYAI' WHERE LOADID IS NULL" + # m_query4="UPDATE ["+ table1['TABLE_SCHEMA'][0]+"].["+target_selector+"] SET FILE_NAME='AUTOMATED_INSERT' WHERE FILE_NAME IS NULL" + # cursor1.execute(m_query2) + # cursor1.execute(m_query3) + # cursor1.execute(m_query4) + # conn.commit() + st.success('Mapping completed successfully!') + m_query5="select count(*) as TARGET_COUNT_AFTER_Mapping from ["+ st.session_state.table1_un['TABLE_SCHEMA'][0]+"].["+st.session_state.target_selector_un+"]" + new_count=pd.read_sql_query(m_query5,con=conn) + conn.close() + st.write('RECORDS IN TARGET TABLE AFTER Mapping',new_count) + + + + if mode == 'Unsupervised Mapping(You Do Not Have Sufficient Sample Data in Target Template)': + conn = pymssql.connect("Server=sql-ext-dev-uks-001.database.windows.net;" + "Database=sqldb-ext-dev-uks-001;" + "UID=dbadmin;" + "PWD=mYpa$$w0rD" ) + query1="select * from INFORMATION_SCHEMA.TABLES where TABLE_SCHEMA='dbo' ORDER BY TABLE_NAME ASC" + table1=pd.read_sql_query(query1,con=conn) + st.session_state.table1= table1 + table1['TABLE_NAME']=table1['TABLE_NAME'].astype('str') + #col2sel1, col2sel2 = st.columns(2) + + table_selector=st.multiselect('SOURCE TABLE NAME(S)',['TCM', 'TCVM','TEM', 'TPM', 'TPP', 'TPT', 'TRM', 'TSCM', 'TSM'],default=None,placeholder='Select table for automated column mapping') + + 
#target_selector=st.selectbox('TARGET TABLE NAME',['POLICY_MAPPINGTARGET_TBL','FINANCE_MAPPINGTARGET_TBL','CUSTOMER_MASTER_TARGET'],index=None,placeholder='Select target table') + + target_selector = st.file_uploader("UPLOAD TARGET METADATA FILE", type=['csv']) + tgt_name=None + mapping_df=None + if target_selector is not None: + mapping_df = pd.read_csv(target_selector) + tgt_name = target_selector.name + + required_columns = ['Field_Name', 'Primary Key'] + if all(col in mapping_df.columns for col in required_columns): + field_names = mapping_df['Field_Name'].tolist() + tgt_df = pd.DataFrame(columns=field_names) + + st.session_state.target_selector = target_selector + # mapping_selector = target_selector + # st.session_state.mapping_selector = mapping_selector + if mapping_df is not None: + st.session_state.mapping_df = mapping_df + + if table_selector is not None: + if len(table_selector)==1: + query2="select * from ["+ table1['TABLE_SCHEMA'][0]+"].["+str(table_selector[0])+"]" + i_df = pd.read_sql_query(query2,con=conn) + # conn.close() + if set(['ID','LOADID','FILE_NAME']).issubset(i_df.columns): + i_df=i_df.drop(['ID','LOADID','FILE_NAME'],axis=1) + elif len(table_selector)>1: + + dataframes = {} + col_names = [] + for tab in table_selector: + query2_2= "select * from [dbo].["+tab+"]" + dataframes[f'{tab}'] = pd.read_sql_query(query2_2,con=conn) + col_names = col_names + list(dataframes[f'{tab}'].columns) + + tab_names = table_selector + metadata = MultiTableMetadata() + metadata.detect_from_dataframes( + data= dataframes + ) + multi_python_dict = metadata.to_dict() + + rlist1=multi_python_dict['relationships'] + relationships=pd.DataFrame(columns=['PARENT TABLE','CHILD TABLE','PARENT PRIMARY KEY','CHILD FOREIGN KEY']) + for i in range(len(rlist1)): + rlist=rlist1[i] + nrow=pd.DataFrame({'PARENT TABLE':rlist['parent_table_name'],'CHILD TABLE':rlist['child_table_name'],'PARENT PRIMARY KEY':rlist['parent_primary_key'],'CHILD FOREIGN KEY':rlist['child_foreign_key']},index=[i]) + relationships=pd.concat([relationships,nrow],ignore_index=True) + + filtered_relationships = relationships[ + (relationships['PARENT TABLE'].isin(table_selector)) & + (relationships['CHILD TABLE'].isin(table_selector)) + ] + + i_df = pd.DataFrame() + + for _, row in filtered_relationships.iterrows(): + parent_table = row['PARENT TABLE'] + child_table = row['CHILD TABLE'] + parent_key = row['PARENT PRIMARY KEY'] + child_key = row['CHILD FOREIGN KEY'] + + if parent_table in dataframes and child_table in dataframes: + parent_df = dataframes[parent_table] + child_df = dataframes[child_table] + + left_joined_df = pd.merge( + parent_df, child_df, how='left', + left_on=parent_key, right_on=child_key, + suffixes=(f'_{parent_table}', f'_{child_table}') + ) - sql = f'''SELECT {join_key} FROM {base_table} {base_table} WHERE {base_table}._hoodie_commit_time > cast('"""+str({base_table}_max_update_date)+"""' as timestamp)''' - incr1_spark.append(sql) - for i in temp_incr1: - incr1_spark.append(i) - incr1_spark = '\nUNION\n'.join(incr1_spark) - spark_query.append("\n\t\t######################---------Creating INCR1-------############################") - spark_query.append(f'INCR1_{temp_table} = spark.sql(""" {incr1_spark} """)') - spark_query.append(f'\n\t\tINCR1_{temp_table} = INCR1_{temp_table}.dropDuplicates()') - spark_query.append(f'INCR1_{temp_table} = INCR1_{temp_table}.persist()') - spark_query.append(f'INCR1_{temp_table}.createOrReplaceTempView("INCR1_{temp_table}")') - spark_query.append(f'print("INCR1_{temp_table} 
count: ", INCR1_{temp_table}.count())') + for col in child_df.columns: + if col != child_key: + left_joined_df.rename( + columns={col: f'{col}_{parent_table}_{child_table}'}, inplace=True + ) - spark_query.append("\n\t\t######################---------Creating INCR2-------############################") - for table in table_details_mapping.keys(): - if table in incr_join.keys(): - p = [x for x in incr_join[table].split('\n\t') if len(x) > 1] - if len(p) > 1: - spark_query.append(f"\n\t\t######################---------Updating the max_update_date in audit-------############################") - spark_query.append(f"{table}_max_update_date = {table}.agg({{'_hoodie_commit_time' : 'max'}}).first()[0]") - spark_query.append(f"{table}_max_source_reference_date = {table}.agg(max(to_timestamp('{table_details_mapping[table][1].replace(table+'.','')}','yyyy-MM-dd-HH.mm.ss.SSSSSS'))).first()[0]") - spark_query.append(f"insert_max_update_date(spark,redshift_conn, config['application_name'],'{table}',{table}_max_update_date,{table}_max_source_reference_date, max_batch_id, config)") - spark_query.append('\n') + right_joined_df = pd.merge( + parent_df, child_df, how='left', + left_on=child_key, right_on=parent_key, + suffixes=(f'_{child_table}', f'_{parent_table}') + ) - for query in script: - spark_query.append(query) - spark_query.append('\n') + for col in child_df.columns: + if col != child_key: + left_joined_df.rename( + columns={col: f'{col}_{child_table}_{parent_table}'}, inplace=True + ) - spark_query1 = [] - spark_query1.append('\n') - for query in proc_query: - if f'{temp_table_schema}.{temp_table}\n' in query: - final_tables = re.findall(r'\bFROM\s+(\w+\.\w+\.\w+)|\bJOIN\s+(\w+\.\w+\.\w+)', query, re.IGNORECASE) - final_tables = [table.split('.')[2].strip() for pair in final_tables for table in pair if table and table.split('.')[2].strip() != temp_table][0] - if 'INCR1_' in final_tables: - spark_query.append(f"{final_tables}.write.mode('overwrite').parquet(config['incr2df_path'])") - else: - spark_query.append(f"{final_tables}.write.mode('overwrite').parquet(config['resultdf_path'])") - spark_query1.append(f'''cur.execute(""" {query} """)''') - spark_query1.append('\n') + i_df = pd.concat([i_df, left_joined_df, right_joined_df], ignore_index=True) - with open('template.txt') as file: - template = file.read() + i_df = i_df.loc[:, ~i_df.columns.duplicated()] - result = template.replace('INSERT_CODE_1', '\n\t\t'.join(spark_query)) - result = result.replace('INSERT_CODE_2', '\t\t'.join(spark_query1)) - - return result - - - -st.set_page_config(page_title='AUTOMATED SOURCE TO TARGET MAPPING', layout= 'wide') -st.markdown(""" - - """, unsafe_allow_html=True) -st.subheader('AUTOMATED SOURCE TO TARGET MAPPING') -mode= st.selectbox('Select Mode of Mapping',('Supervised Mapping(You Have Sufficient Sample Data in Target Template)', 'Unsupervised Mapping(You Do Not Have Sufficient Sample Data in Target Template)'), index=None,placeholder='Select category of table') -if mode == 'Supervised Mapping(You Have Sufficient Sample Data in Target Template)': - conn = pymssql.connect( "Server=sql-ext-dev-uks-001.database.windows.net;" - "Database=sqldb-ext-dev-uks-001;" - "UID=dbadmin;" - "PWD=mYpa$$w0rD" ) - query1="select * from INFORMATION_SCHEMA.TABLES where TABLE_SCHEMA='dbo' ORDER BY TABLE_NAME ASC" - table1=pd.read_sql_query(query1,con=conn) - st.session_state.table1_un= table1 - table1['TABLE_NAME']=table1['TABLE_NAME'].astype('str') - colsel1, colsel2= st.columns(2) - with colsel1: - 
table_selector=st.selectbox('SOURCE TABLE NAME',['TCM', 'TCVM','TEM', 'TPM', 'TPP', 'TPT', 'TRM', 'TSCM', 'TSM'],index=None,placeholder='Select table for automated column mapping') - with colsel2: - target_selector=st.selectbox('TARGET TABLE NAME',['POLICY_MAPPINGTARGET_TBL','FINANCE_MAAPINGTARGET_TBL','CUSTOMER_MASTER_TARGET'],index=None,placeholder='Select target table') - st.session_state.target_selector_un = target_selector - #migrate_opt=st.toggle('DO YOU ALSO WANT TO MIGRATE DATA TO TARGET TABLE') - if table_selector is not None and target_selector is not None: - btn=button('RUN',key='RUN_GENAI_UN') - if target_selector is not None and btn and f'{table_selector}_{target_selector}_map_un' not in st.session_state: - query2="select * from ["+ table1['TABLE_SCHEMA'][0]+"].["+table_selector+"]" - i_df = pd.read_sql_query(query2,con=conn) - # conn.close() - i_df=i_df.drop(['ID','LOADID','FILE_NAME'],axis=1) - st.session_state['source_data_un'] = i_df - #st.markdown('---') - # st.subheader('Souce Data Preview') - # st.dataframe(i_df) - query3="select * from ["+ table1['TABLE_SCHEMA'][0]+"].["+target_selector+"]" - tgt_df=pd.read_sql_query(query3,con=conn).reset_index(drop=True) - main_list=tgt_df.columns.to_list() - sub_list=['ID','LOADID','FILE_NAME'] - if any(main_list[i:i+len(sub_list)] == sub_list for i in range(len(main_list) - len(sub_list) + 1)): - tgt_df=tgt_df.drop(['ID','LOADID','FILE_NAME'],axis=1) - st.session_state.opt_un= list(tgt_df.columns) - st.session_state['target_data_un'] = tgt_df.head(20).reset_index() - # if tgt: - # # st.subheader('Target Table Preview') - # # st.write(tgt_df.sample(20).reset_index(drop=True)) - # # st.markdown('---') - - with st.spinner('Running data on neural network...'): - df=pd.read_csv('C:\\Applications\\MARCO POLO O AIML\\DATA CATALOG\\pages\\CUSTOMER_MASTER_TRAIN_1306.csv') #POLICY - cols=df.columns.tolist() - data=pd.DataFrame(columns=['DATA','LABEL']) - temp=pd.DataFrame(columns=['DATA','LABEL']) - for x in cols: - temp['DATA']=df[x] - temp['LABEL']=x - data=pd.concat([data,temp],ignore_index=True) - data['DATA']=data['DATA'].astype('string') - data['LABEL']=data['LABEL'].astype('string') - data=data.dropna() - data=data.reset_index(drop=True) - - - - - #FEATURE_EXTRACTION BAG OF CHARACTERS - vectorizer = CountVectorizer(analyzer='char_wb', ngram_range=(1, 3), min_df=1) - X=vectorizer.fit_transform(data['DATA']) - feature=pd.DataFrame(data=X.toarray(),columns=vectorizer.get_feature_names_out()) - data1=pd.concat([data,feature],axis=1) - - #FEATURE_SELECTION - from sklearn.feature_selection import chi2 - chi_x=data1.drop(['DATA','LABEL'],axis=1) - chi_y=data1['LABEL'] - chi_scores=chi2(chi_x,chi_y) - p_values=pd.Series(chi_scores[1],index=chi_x.columns) - p_values=p_values.sort_values(ascending=True).reset_index() - feature_chi=p_values['index'][:1000] - data2=data1[feature_chi.to_list()] - data2=pd.concat([data,data2],axis=1) - - #FEATURE EXTRACTION GENERAL - def count_digits(str1): - return len("".join(re.findall("\d+", str1))) - - def count_vowels(string): - vowels = "aeiouAEIOU" - count = 0 - for char in string: - if char in vowels: - count += 1 - return count - - def count_special_character(string): - special_characters = "!@#$%^&*()-+?_=,<>/" - special_char = 0 - for i in range(0, len(string)): - if (string[i] in special_characters): - special_char += 1 - return special_char - - def count_spaces(string): - spaces = 0 - for char in string: - if char == " ": - spaces += 1 - return spaces - - data2['LENGTH']=data2['DATA'].apply(lambda 
x:len(x)) - data2['digit_c']=data2['DATA'].apply(lambda x:count_digits(x)) - data2['vowel_c']=data2['DATA'].apply(lambda x:count_vowels(x)) - data2['spchar_c']=data2['DATA'].apply(lambda x:count_special_character(x)) - data2['space_c']=data2['DATA'].apply(lambda x:count_spaces(x)) - - chi_scores1=chi2(data2[['LENGTH','digit_c','vowel_c','spchar_c','space_c']],data2['LABEL']) - p_values1=pd.Series(chi_scores1[1],index=data2[['LENGTH','digit_c','vowel_c','spchar_c','space_c']].columns).sort_values(ascending=True).reset_index() - - #MODEL - import tensorflow as tf - from tensorflow.keras import layers - from tensorflow import keras - - from sklearn.model_selection import train_test_split - from ast import literal_eval - - train_df, test_df = train_test_split(data2,test_size=.1,stratify=data2['LABEL'].values) - val_df = test_df.sample(frac=0.5) - test_df.drop(val_df.index, inplace=True) - - terms = tf.ragged.constant(data2['LABEL'].values) - lookup = tf.keras.layers.StringLookup(output_mode="one_hot") - lookup.adapt(terms) - vocab = lookup.get_vocabulary() - - def invert_multi_hot(encoded_labels): - hot_indices = np.argwhere(encoded_labels == 1.0)[..., 0] - return np.take(vocab, hot_indices) - - max_seqlen = 150 - batch_size = 128 - padding_token = "" - auto = tf.data.AUTOTUNE - - feature_tf=data2.columns.tolist()[2:] - - def make_dataset(dataframe,feature,batch_size,is_train=True): - labels = tf.ragged.constant(dataframe["LABEL"].values) - label_binarized = lookup(labels).numpy() - dataset = tf.data.Dataset.from_tensor_slices( - (dataframe[feature].values, label_binarized) - ) - dataset = dataset.shuffle(batch_size * 10) if is_train else dataset - return dataset.batch(batch_size) - - train_dataset = make_dataset(train_df,feature_tf,batch_size, is_train=True) - validation_dataset = make_dataset(val_df,feature_tf,batch_size, is_train=False) - test_dataset = make_dataset(test_df,feature_tf,batch_size, is_train=False) - - - shallow_mlp_model = keras.Sequential( - [ - layers.Dense(512, activation="relu"), - layers.Dense(256, activation="relu"), - layers.Dense(lookup.vocabulary_size(), activation="softmax"), - ] - ) - - shallow_mlp_model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["CategoricalAccuracy"]) - epochs=20 - history = shallow_mlp_model.fit(train_dataset, validation_data=validation_dataset, epochs=epochs) - - #MODEL TEST - _, category_acc = shallow_mlp_model.evaluate(test_dataset) - - - #INPUT PREPROCESSING - - i_cols=i_df.columns - i_cols=i_df.columns.tolist() - i_data=pd.DataFrame(columns=['DATA','LABEL']) - i_temp=pd.DataFrame(columns=['DATA','LABEL']) - for x in i_cols: - i_temp['DATA']=i_df[x] - i_temp['LABEL']=x - i_data=pd.concat([i_data,i_temp],ignore_index=True) - i_data['DATA']=i_data['DATA'].astype('string') - i_data['LABEL']=i_data['LABEL'].astype('string') - i_data=i_data.dropna() - i_data=i_data.reset_index(drop=True) - i_X=vectorizer.transform(i_data['DATA']) - i_feature=pd.DataFrame(data=i_X.toarray(),columns=vectorizer.get_feature_names_out()) - i_data1=pd.concat([i_data,i_feature],axis=1) - i_data2=i_data1[feature_chi.to_list()] - i_data2=pd.concat([i_data,i_data2],axis=1) - i_data2['LENGTH']=i_data2['DATA'].apply(lambda x:len(x)) - i_data2['digit_c']=i_data2['DATA'].apply(lambda x:count_digits(x)) - i_data2['vowel_c']=i_data2['DATA'].apply(lambda x:count_vowels(x)) - i_data2['spchar_c']=i_data2['DATA'].apply(lambda x:count_special_character(x)) - i_data2['space_c']=i_data2['DATA'].apply(lambda x:count_spaces(x)) - 
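The training block above turns labelled sample values into character n-gram counts (CountVectorizer with analyzer='char_wb'), keeps the most label-dependent features with a chi-squared test, adds a few handcrafted counts, and fits a shallow Keras MLP with a softmax head. The sketch below compresses the same idea into a scikit-learn-only pipeline, with a logistic-regression head standing in for the MLP; the toy frame and the expected predictions are illustrative assumptions, not data from the page.

    import pandas as pd
    from sklearn.feature_extraction.text import CountVectorizer
    from sklearn.feature_selection import SelectKBest, chi2
    from sklearn.linear_model import LogisticRegression

    # One row per cell value, labelled with the column it came from (toy data).
    train = pd.DataFrame({
        'DATA':  ['john smith', 'mary jones', '2021-05-01', '2020-11-30', 'P-1001', 'P-2002'],
        'LABEL': ['CUSTOMER_NAME', 'CUSTOMER_NAME', 'POLICY_DT', 'POLICY_DT', 'POLICY_ID', 'POLICY_ID'],
    })

    # Bag-of-characters features, as in the page's CountVectorizer call.
    vec = CountVectorizer(analyzer='char_wb', ngram_range=(1, 3))
    X = vec.fit_transform(train['DATA'])

    # Chi-squared feature cut (the page keeps the top 1000 scored features).
    selector = SelectKBest(chi2, k=min(1000, X.shape[1])).fit(X, train['LABEL'])

    # A linear classifier standing in for the shallow MLP used above.
    clf = LogisticRegression(max_iter=1000).fit(selector.transform(X), train['LABEL'])

    new_values = ['P-3003', '1999-01-15']
    print(clf.predict(selector.transform(vec.transform(new_values))))
    # Likely output: ['POLICY_ID' 'POLICY_DT']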
i_run_dataset=tf.data.Dataset.from_tensor_slices((i_data2[feature_tf].values,lookup(tf.ragged.constant(i_data2["LABEL"].values)).numpy())).batch(649) - - - i_predicted_probabilities = shallow_mlp_model.predict(i_run_dataset) - i_predicted_labels = np.where(i_predicted_probabilities == i_predicted_probabilities.max(axis=1, keepdims=True), 1, 0) - i_predicted_label_df=pd.DataFrame(i_predicted_labels,columns=vocab) - i_predicted_label_df1=pd.concat([i_data,i_predicted_label_df],axis=1) - i_predicted_label_df1['PREDICTION']=i_predicted_label_df1[vocab].idxmax(axis=1) - i_result=i_predicted_label_df1[['DATA','LABEL','PREDICTION']] - - column_mapping=pd.DataFrame(columns=['source','target']) - temp_column_mapping=pd.DataFrame(columns=['source','target']) - for i in i_df.columns.to_list(): - temp_df1=i_result.loc[i_result['LABEL']==i] - temp_max=temp_df1['PREDICTION'].value_counts().idxmax() - temp_column_mapping.loc[0]=[i,temp_max] - column_mapping=pd.concat([column_mapping,temp_column_mapping],ignore_index=True) - not_null=i_df.count().reset_index() - tot_rows=i_df.shape[0] - not_null['not null percentage']=not_null[0]/tot_rows - coltobemodified=not_null[not_null['not null percentage']<.05]['index'].to_list() - column_mapping.loc[column_mapping['source'].isin(coltobemodified), 'target'] = '**TOO FEW COLUMN VALUES**' - st.success('Mapping completed successfully!') - st.session_state[f'{table_selector}_{target_selector}_map_un'] = column_mapping.copy() - # st.subheader('MAPPED COLUMN') - # st.dataframe(column_mapping) - - - if f'{table_selector}_{target_selector}_map_un' in st.session_state and btn: - taba, tabb, tabc = st.tabs(['Mappings Generated', 'Source Table Preview', 'Target Table Preview']) - with tabb: - st.subheader('Souce Data Preview') - with stylable_container( - key=f"source_container_with_border", - css_styles=""" - { - border: 1px solid white; - border-radius: 0.5rem; - padding: calc(1em - 1px); - width: 103%; /* Set container width to 100% */ - } - """ - ): - st.dataframe(st.session_state['source_data_un']) - with tabc: - st.subheader('Target Table Preview') - with stylable_container( - key=f"target_container_with_border", - css_styles=""" - { - border: 1px solid white; - border-radius: 0.5rem; - padding: calc(1em - 1px); - width: 103%; /* Set container width to 100% */ - } - """ - ): - st.write(st.session_state['target_data_un']) - with taba: - st.subheader("Mapping Generated:") - with stylable_container( - key=f"container_with_border", - css_styles=""" - { - border: 1px solid white; - border-radius: 0.5rem; - padding: calc(1em - 1px); - width: 103%; /* Set container width to 100% */ - } - """ - ): - - edited_map_df = st.data_editor( - st.session_state[f'{table_selector}_{target_selector}_map_un'], - column_config={ - "target": st.column_config.SelectboxColumn( - "Available Column Names", - help="Please Verify/Change the Target Column Mapping", - width="medium", - options=st.session_state.opt_un, - required=True, - ) - }, - hide_index=False, - num_rows = 'fixed', - use_container_width = True - ) - val = button("Validate", key="Val_un") - if val: - st.session_state[f'{table_selector}_{target_selector}_map_un'].update(edited_map_df) - dup= len(st.session_state[f'{table_selector}_{target_selector}_map_un'][st.session_state[f'{table_selector}_{target_selector}_map_un']['target'].duplicated()]) - if dup != 0: - dup_index= 
list(st.session_state[f'{table_selector}_{target_selector}_map_un'][st.session_state[f'{table_selector}_{target_selector}_map_un']['target'].duplicated(keep=False)].index) - dup_mess=str(dup_index[0]) - for val in dup_index[1:]: - dup_mess = dup_mess + f' and {str(val)}' - st.error(f"One to Many Column mapping Exists. Please Check Mapping Number: {dup_mess}") - else: - st.success("Mapping Validated! You can proceed for Mapping") - - migrate= st.button("Mapping") - if migrate: - st.subheader('Mapping PHASE') - m_queiry1="select count(*) as TARGET_COUNT_CURRENT from ["+ st.session_state.table1_un['TABLE_SCHEMA'][0]+"].["+st.session_state.target_selector_un+"]" - #st.write(m_queiry1) - old_count=pd.read_sql_query(m_queiry1,con=conn) - st.write('RECORDS IN TARGET TABLE BEFORE Mapping',old_count) - with st.spinner('Mapping in progress...'): - cursor1=conn.cursor() - q1='INSERT INTO ['+ st.session_state.table1_un['TABLE_SCHEMA'][0]+'].['+st.session_state.target_selector_un+'] ("' - q2=' select "' - for i,x in enumerate(st.session_state['source_data_un'].columns.values.tolist()): - t=st.session_state[f'{table_selector}_{target_selector}_map_un'].loc[st.session_state[f'{table_selector}_{target_selector}_map_un']['source']==x,'target'].values[0] - if i==len(st.session_state['source_data_un'].columns.values.tolist())-1: - q_temp1=t+'") ' - q_temp2=x+'" ' - else: - q_temp1=t+'", "' - q_temp2=x+'", "' - q1=q1+q_temp1 - q2=q2+q_temp2 - #q_temp='INSERT INTO ['+ table1['TABLE_SCHEMA'][0]+'].['+target_selector+'] ("'+t+'") select "'+x+'" from ['+ table1['TABLE_SCHEMA'][0]+'].['+table_selector+']' - # st.write(q) - q=q1+q2+' from ['+ st.session_state.table1_un['TABLE_SCHEMA'][0]+'].['+table_selector+']' - #st.write(q) - cursor1.execute(q) - conn.commit() - # m_query2="UPDATE ["+ table1['TABLE_SCHEMA'][0]+"].["+target_selector+"] SET ID=9999 WHERE ID IS NULL" - # # cur_time=datetime.datetime.now().time().strftime("%Y%m%d%H%M%S") - # m_query3="UPDATE ["+ table1['TABLE_SCHEMA'][0]+"].["+target_selector+"] SET LOADID='LOADEDBYAI' WHERE LOADID IS NULL" - # m_query4="UPDATE ["+ table1['TABLE_SCHEMA'][0]+"].["+target_selector+"] SET FILE_NAME='AUTOMATED_INSERT' WHERE FILE_NAME IS NULL" - # cursor1.execute(m_query2) - # cursor1.execute(m_query3) - # cursor1.execute(m_query4) - # conn.commit() - st.success('Mapping completed successfully!') - m_query5="select count(*) as TARGET_COUNT_AFTER_Mapping from ["+ st.session_state.table1_un['TABLE_SCHEMA'][0]+"].["+st.session_state.target_selector_un+"]" - new_count=pd.read_sql_query(m_query5,con=conn) - conn.close() - st.write('RECORDS IN TARGET TABLE AFTER Mapping',new_count) - - - -if mode == 'Unsupervised Mapping(You Do Not Have Sufficient Sample Data in Target Template)': - conn = pymssql.connect("Server=sql-ext-dev-uks-001.database.windows.net;" - "Database=sqldb-ext-dev-uks-001;" - "UID=dbadmin;" - "PWD=mYpa$$w0rD" ) - query1="select * from INFORMATION_SCHEMA.TABLES where TABLE_SCHEMA='dbo' ORDER BY TABLE_NAME ASC" - table1=pd.read_sql_query(query1,con=conn) - st.session_state.table1= table1 - table1['TABLE_NAME']=table1['TABLE_NAME'].astype('str') - #col2sel1, col2sel2 = st.columns(2) + if set(['ID','LOADID','FILE_NAME']).issubset(i_df.columns): + i_df=i_df.drop(['ID','LOADID','FILE_NAME'],axis=1) - table_selector=st.multiselect('SOURCE TABLE NAME(S)',['TCM', 'TCVM','TEM', 'TPM', 'TPP', 'TPT', 'TRM', 'TSCM', 'TSM'],default=None,placeholder='Select table for automated column mapping') + i_df = i_df.loc[:, ~i_df.columns.duplicated()] - 
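Per-cell predictions from the network are turned into a column-level mapping by majority vote, and source columns that are almost entirely empty are flagged instead of mapped. A minimal sketch of that aggregation step, assuming an i_result / i_df pair shaped like the frames built above (the sample values are made up):

    import pandas as pd

    # One row per cell: the source column it came from and the predicted target.
    i_result = pd.DataFrame({
        'LABEL':      ['CUST_NM', 'CUST_NM', 'CUST_NM', 'OFFICE_CD', 'OFFICE_CD'],
        'PREDICTION': ['CUSTOMER_NAME', 'CUSTOMER_NAME', 'OFFICE_CODE', 'OFFICE_CODE', 'OFFICE_CODE'],
    })
    i_df = pd.DataFrame({'CUST_NM': ['a', 'b', 'c', 'd'],
                         'OFFICE_CD': [None, None, None, 'X1']})

    # Majority vote per source column.
    column_mapping = (
        i_result.groupby('LABEL')['PREDICTION']
        .agg(lambda s: s.value_counts().idxmax())
        .rename_axis('source')
        .reset_index(name='target')
    )

    # Flag columns whose non-null ratio falls below 5%, as the page does.
    non_null_ratio = i_df.notna().mean()
    too_sparse = non_null_ratio[non_null_ratio < 0.05].index
    column_mapping.loc[column_mapping['source'].isin(too_sparse), 'target'] = '**TOO FEW COLUMN VALUES**'
    print(column_mapping)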
#target_selector=st.selectbox('TARGET TABLE NAME',['POLICY_MAPPINGTARGET_TBL','FINANCE_MAPPINGTARGET_TBL','CUSTOMER_MASTER_TARGET'],index=None,placeholder='Select target table') - target_selector = st.file_uploader("UPLOAD TARGET METADATA FILE", type=['csv']) - tgt_name=None - mapping_df=None - if target_selector is not None: - mapping_df = pd.read_csv(target_selector) - tgt_name = target_selector.name - - required_columns = ['Field_Name', 'Primary Key'] - if all(col in mapping_df.columns for col in required_columns): - field_names = mapping_df['Field_Name'].tolist() - tgt_df = pd.DataFrame(columns=field_names) - - st.session_state.target_selector = target_selector - # mapping_selector = target_selector - # st.session_state.mapping_selector = mapping_selector - if mapping_df is not None: - st.session_state.mapping_df = mapping_df - - if table_selector is not None: - if len(table_selector)==1: - query2="select * from ["+ table1['TABLE_SCHEMA'][0]+"].["+str(table_selector[0])+"]" - i_df = pd.read_sql_query(query2,con=conn) - # conn.close() - if set(['ID','LOADID','FILE_NAME']).issubset(i_df.columns): - i_df=i_df.drop(['ID','LOADID','FILE_NAME'],axis=1) - elif len(table_selector)>1: - - dataframes = {} - col_names = [] - for tab in table_selector: - query2_2= "select * from [dbo].["+tab+"]" - dataframes[f'{tab}'] = pd.read_sql_query(query2_2,con=conn) - col_names = col_names + list(dataframes[f'{tab}'].columns) - - tab_names = table_selector - metadata = MultiTableMetadata() - metadata.detect_from_dataframes( - data= dataframes - ) - multi_python_dict = metadata.to_dict() - - rlist1=multi_python_dict['relationships'] - relationships=pd.DataFrame(columns=['PARENT TABLE','CHILD TABLE','PARENT PRIMARY KEY','CHILD FOREIGN KEY']) - for i in range(len(rlist1)): - rlist=rlist1[i] - nrow=pd.DataFrame({'PARENT TABLE':rlist['parent_table_name'],'CHILD TABLE':rlist['child_table_name'],'PARENT PRIMARY KEY':rlist['parent_primary_key'],'CHILD FOREIGN KEY':rlist['child_foreign_key']},index=[i]) - relationships=pd.concat([relationships,nrow],ignore_index=True) - - filtered_relationships = relationships[ - (relationships['PARENT TABLE'].isin(table_selector)) & - (relationships['CHILD TABLE'].isin(table_selector)) - ] - - i_df = pd.DataFrame() - - for _, row in filtered_relationships.iterrows(): - parent_table = row['PARENT TABLE'] - child_table = row['CHILD TABLE'] - parent_key = row['PARENT PRIMARY KEY'] - child_key = row['CHILD FOREIGN KEY'] - - if parent_table in dataframes and child_table in dataframes: - parent_df = dataframes[parent_table] - child_df = dataframes[child_table] - - left_joined_df = pd.merge( - parent_df, child_df, how='left', - left_on=parent_key, right_on=child_key, - suffixes=(f'_{parent_table}', f'_{child_table}') - ) - - for col in child_df.columns: - if col != child_key: - left_joined_df.rename( - columns={col: f'{col}_{parent_table}_{child_table}'}, inplace=True - ) - - right_joined_df = pd.merge( - parent_df, child_df, how='left', - left_on=child_key, right_on=parent_key, - suffixes=(f'_{child_table}', f'_{parent_table}') - ) - - for col in child_df.columns: - if col != child_key: - left_joined_df.rename( - columns={col: f'{col}_{child_table}_{parent_table}'}, inplace=True - ) - - i_df = pd.concat([i_df, left_joined_df, right_joined_df], ignore_index=True) - - i_df = i_df.loc[:, ~i_df.columns.duplicated()] - - for table_name in table_selector: - if table_name in dataframes: - for col in dataframes[table_name].columns: - if col in i_df.columns and not 
any([col.endswith(f'_{table_name}') for table in table_selector]): - i_df.rename(columns={col: f'{col}_{table_name}'}, inplace=True) - - if set(['ID','LOADID','FILE_NAME']).issubset(i_df.columns): - i_df=i_df.drop(['ID','LOADID','FILE_NAME'],axis=1) - - i_df = i_df.loc[:, ~i_df.columns.duplicated()] - - - if table_selector is not None: - if tgt_name is not None: - btn= button('RUN', key='RUN_GENAI') - - if target_selector is not None and btn and f'{table_selector}_{tgt_name}_map' not in st.session_state: - st.session_state['source_data'] = i_df.sample(20).reset_index() - if target_selector is not None: - #query3="select * from ["+ table1['TABLE_SCHEMA'][0]+"].["+target_selector+"]" - #tgt_df=pd.read_sql_query(query3,con=conn) - # if set(['ID','LOADID','FILE_NAME']).issubset(tgt_df.columns): - # tgt_df=tgt_df.drop(['ID','LOADID','FILE_NAME'],axis=1) - st.session_state['opt'] = list(tgt_df.columns) - st.session_state['target_data'] = tgt_df.head(20).reset_index() + if table_selector is not None: + if tgt_name is not None: + btn= button('RUN', key='RUN_GENAI') - with st.spinner('Processing Data...'): - selected_df = pd.DataFrame() - #st.write(i_df) - # Iterate through each column - for col in i_df.columns: - # Filter non-null and non-blank values - non_null_values = i_df[col][i_df[col] != ''].dropna().astype(str).str.strip().unique() - - # Select up to 10 values (or fewer if less than 10 non-null values) - selected_values = list(non_null_values[:10]) - selected_values = selected_values + [""] * (10 - len(selected_values)) - # Add selected values to the new dataframe - selected_df[col] = selected_values - - mapping_df = st.session_state.mapping_df - # List of tables provided - tables_list = table_selector - - # Dictionary to store the table columns - table_columns = {} - - # Loop through each table in the list - for table in tables_list: - query = f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table}'" - - cursor = conn.cursor() - cursor.execute(query) + if target_selector is not None and btn and f'{table_selector}_{tgt_name}_map' not in st.session_state: + st.session_state['source_data'] = i_df.sample(20).reset_index() + if target_selector is not None: + #query3="select * from ["+ table1['TABLE_SCHEMA'][0]+"].["+target_selector+"]" + #tgt_df=pd.read_sql_query(query3,con=conn) + # if set(['ID','LOADID','FILE_NAME']).issubset(tgt_df.columns): + # tgt_df=tgt_df.drop(['ID','LOADID','FILE_NAME'],axis=1) + st.session_state['opt'] = list(tgt_df.columns) + st.session_state['target_data'] = tgt_df.head(20).reset_index() + + with st.spinner('Processing Data...'): + selected_df = pd.DataFrame() + #st.write(i_df) + # Iterate through each column + for col in i_df.columns: + # Filter non-null and non-blank values + non_null_values = i_df[col][i_df[col] != ''].dropna().astype(str).str.strip().unique() + + # Select up to 10 values (or fewer if less than 10 non-null values) + selected_values = list(non_null_values[:10]) + selected_values = selected_values + [""] * (10 - len(selected_values)) + # Add selected values to the new dataframe + selected_df[col] = selected_values + + mapping_df = st.session_state.mapping_df + # List of tables provided + tables_list = table_selector + + # Dictionary to store the table columns + table_columns = {} + + # Loop through each table in the list + for table in tables_list: + query = f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table}'" + + cursor = conn.cursor() + cursor.execute(query) + + # Fetch the column names for 
the current table + columns = [row[0] for row in cursor.fetchall()] + + # Store the column names in the dictionary + table_columns[table] = columns - # Fetch the column names for the current table - columns = [row[0] for row in cursor.fetchall()] + if 'table_columns' not in st.session_state: + st.session_state.table_columns = table_columns + + story = f""" Details of the source table: + table columns: {str(list(i_df.columns))} + column datatypes: {str(i_df.dtypes.to_string())} + table sample data: {selected_df.head(10).to_string()} + Source Tables selected : {str(list(table_selector))} + Source Table columns are given as (col_name)_(table_name) + + Joining conditions should be based on relationship table which is : {relationships.to_string()} + + Details of the target table: + table columns: {str(list(tgt_df.columns))} + column datatypes: {str(tgt_df.dtypes.to_string())} + table sample data: {tgt_df.head(10).to_string()} + mapping details: {mapping_df.to_string()} + + Source Column names should match from this dictionary: {str(table_columns)} + """ + response = model.generate_content( + textwrap.dedent(""" + Please return JSON describing the possible **one to one mapping** between source table and target table using this following schema: + + {"Mapping": list[MAPPING]} + + MAPPING = {"Target_Table": str, "Field_Name": str, "Source Table": str, "Source Column": str, "Joining Keys": str, "Primary Key": str, "Direct/Derived": str, "Join Tables": str, "Join Condition": str, "Mapping Confidence": percentage, "Column Operations": str, "Alias": str, "Granularity": str, "Filter Condition": str, "Clauses": str, "Ordering": str} + + The first six columns are provided in mapping details. **THE FIRST SIX COLUMNS OF JSON SHOULD HAVE EXACTLY SAME VALUES AS PROVIDED IN MAPPING DETAILS** + **THE PRIMARY KEY IS COMING FROM THE PARENT TABLE. IF SOURCE TABLE IS NOT THE PRIMARY KEY TABLE, THEN 'Direct/Derived' SHOULD BE LABELLED AS DERIVED, OTHERWISE DIRECT** + **JOIN TABLES SHOULD BE WRITTEN ONLY IN CASE OF DERIVED LABEL, JOIN TABLES SHOULD BE THE PARENT TABLE AND THE SOURCE TABLE (IF DIFFERENT)** + **JOINING CONDITION SHOULD BE BASED ON THE TABLE AND COULMN WHICH HAS PRIMARY KEY AND JOINING TYPE SHOULD BE LEFT OUTER** + **ALIAS SHOULD BE SAME AS FIELD_NAME, GRANULARITY SHOULD BE 1:1** + + 1. For example, Field_Name is 'Product Name', Source Table is 'TRM', Source Column is 'PRODUCT_NAME', 'Joining Keys':'PRODUCT_ID', 'Primary Key' not labelled, then {'Direct/Derived": 'DERIVED', 'Join Type':'LEFT OUTER', 'Join Tables':'TPM, TRM', 'Join Condition':'TPM.PRODUCT_ID=TRM.PRODUCT_ID', 'Alias': 'PRODUCT_NAME', 'Granularity': '1:1'} + Joining is done on TPM since the primary key POLICY_ID is taken from TPM table. So TPM.PRODUCT_ID = TRM.PRODUCT_ID is joining condition + + 2. For example, Field_Name is 'Office Code', Source Table is 'TPM', Source Column is 'OFFICE_CD', 'Joining Keys':'POLICY_ID', 'Primary Key' not labelled, then {'Direct/Derived": 'DIRECT', 'Join Type':None, 'Join Tables':None, 'Join Condition':None, 'Alias': 'OFFICE_CD', 'Granularity': '1:1'} + Joining is not done since TPM is the parent table. So, 'Direct/Derived": 'DIRECT'. POLICY_ID is the primary key here. + + 3. 
For example, Field_Name is 'Policy Submission Date', Source Table is 'TSM', Source Column is 'POLICY_SUBMISSION_DT', 'Joining Keys':'POLICY_ID', 'Primary Key' not labelled, then {'Direct/Derived": 'DERIVED', 'Join Type':'LEFT OUTER', 'Join Tables':'TPM, TSM', 'Join Condition':'TPM.POLICY_ID=TRM.POLICY_ID', 'Alias': 'POLICY_SUBMISSION_DT', 'Granularity': '1:1'} + Joining is done on TPM since the primary key POLICY_ID is taken from TPM table. So TPM.POLICY_ID = TRM.POLICY_ID is joining condition + + If Source Column is POLICY_ID_TPM, then change it to POLICY_ID. + **Source Column should not contain the '_TPM', '_TRM', '_TSM', '_TPP' part at the end.** + + All Target fields are required. The JSON keys will be same as the column names in mapping details.Validate the mapping as given in mapping details. + Ignore the columns where hardcoded values are there , such as Current Flag, Start Date, End Date, Etl Job ID,Etl Batch ID,Etl Inserted Date,Etl Updated Date. leave them blank. For other fields, there has to be mapping. + + If you are confused on which source tables to map, then provide MAPPING CONFIDENCE LESS THAN 90 + + ALL THE JSON KEYS ARE MANDATORY: 'Target_Table', 'Field_Name', 'Source Table', 'Source Column', 'Joining Keys', 'Primary Key', 'Direct/Derived', 'Join Type', 'Join Tables', 'Join Condition', 'Mapping Confidence', 'Column Operations', 'Alias', 'Granularity', 'Filter Condition', 'Clauses', 'Ordering' + + Important: Only return a single piece of valid JSON text. All fields are required. Please MAP all ***TARGET fields***. If you struggle to map then give low confidence score but YOU HAVE TO MAP ANYWAY. ****MAKE SURE IT IS A **one to one mapping** **** + + Here is the table details: + + """) + story + ) + - # Store the column names in the dictionary - table_columns[table] = columns + res= response.text.replace("\n", '').replace("`", '').replace('json','') + map = print(json.dumps(json.loads(res), indent=2)) + data = json.loads(res) + map_df = pd.json_normalize(data, record_path=['Mapping']) + st.session_state[f'{table_selector}_{tgt_name}_map'] = map_df.copy() - if 'table_columns' not in st.session_state: - st.session_state.table_columns = table_columns - - story = f""" Details of the source table: - table columns: {str(list(i_df.columns))} - column datatypes: {str(i_df.dtypes.to_string())} - table sample data: {selected_df.head(10).to_string()} - Source Tables selected : {str(list(table_selector))} - Source Table columns are given as (col_name)_(table_name) - - Joining conditions should be based on relationship table which is : {relationships.to_string()} - - Details of the target table: - table columns: {str(list(tgt_df.columns))} - column datatypes: {str(tgt_df.dtypes.to_string())} - table sample data: {tgt_df.head(10).to_string()} - mapping details: {mapping_df.to_string()} - - Source Column names should match from this dictionary: {str(table_columns)} - """ - response = model.generate_content( - textwrap.dedent(""" - Please return JSON describing the possible **one to one mapping** between source table and target table using this following schema: - - {"Mapping": list[MAPPING]} - - MAPPING = {"Target_Table": str, "Field_Name": str, "Source Table": str, "Source Column": str, "Joining Keys": str, "Primary Key": str, "Direct/Derived": str, "Join Tables": str, "Join Condition": str, "Mapping Confidence": percentage, "Column Operations": str, "Alias": str, "Granularity": str, "Filter Condition": str, "Clauses": str, "Ordering": str} - - The first six columns are provided in 
mapping details. **THE FIRST SIX COLUMNS OF JSON SHOULD HAVE EXACTLY SAME VALUES AS PROVIDED IN MAPPING DETAILS** - **THE PRIMARY KEY IS COMING FROM THE PARENT TABLE. IF SOURCE TABLE IS NOT THE PRIMARY KEY TABLE, THEN 'Direct/Derived' SHOULD BE LABELLED AS DERIVED, OTHERWISE DIRECT** - **JOIN TABLES SHOULD BE WRITTEN ONLY IN CASE OF DERIVED LABEL, JOIN TABLES SHOULD BE THE PARENT TABLE AND THE SOURCE TABLE (IF DIFFERENT)** - **JOINING CONDITION SHOULD BE BASED ON THE TABLE AND COULMN WHICH HAS PRIMARY KEY AND JOINING TYPE SHOULD BE LEFT OUTER** - **ALIAS SHOULD BE SAME AS FIELD_NAME, GRANULARITY SHOULD BE 1:1** - - 1. For example, Field_Name is 'Product Name', Source Table is 'TRM', Source Column is 'PRODUCT_NAME', 'Joining Keys':'PRODUCT_ID', 'Primary Key' not labelled, then {'Direct/Derived": 'DERIVED', 'Join Type':'LEFT OUTER', 'Join Tables':'TPM, TRM', 'Join Condition':'TPM.PRODUCT_ID=TRM.PRODUCT_ID', 'Alias': 'PRODUCT_NAME', 'Granularity': '1:1'} - Joining is done on TPM since the primary key POLICY_ID is taken from TPM table. So TPM.PRODUCT_ID = TRM.PRODUCT_ID is joining condition - - 2. For example, Field_Name is 'Office Code', Source Table is 'TPM', Source Column is 'OFFICE_CD', 'Joining Keys':'POLICY_ID', 'Primary Key' not labelled, then {'Direct/Derived": 'DIRECT', 'Join Type':None, 'Join Tables':None, 'Join Condition':None, 'Alias': 'OFFICE_CD', 'Granularity': '1:1'} - Joining is not done since TPM is the parent table. So, 'Direct/Derived": 'DIRECT'. POLICY_ID is the primary key here. - - 3. For example, Field_Name is 'Policy Submission Date', Source Table is 'TSM', Source Column is 'POLICY_SUBMISSION_DT', 'Joining Keys':'POLICY_ID', 'Primary Key' not labelled, then {'Direct/Derived": 'DERIVED', 'Join Type':'LEFT OUTER', 'Join Tables':'TPM, TSM', 'Join Condition':'TPM.POLICY_ID=TRM.POLICY_ID', 'Alias': 'POLICY_SUBMISSION_DT', 'Granularity': '1:1'} - Joining is done on TPM since the primary key POLICY_ID is taken from TPM table. So TPM.POLICY_ID = TRM.POLICY_ID is joining condition - - If Source Column is POLICY_ID_TPM, then change it to POLICY_ID. - **Source Column should not contain the '_TPM', '_TRM', '_TSM', '_TPP' part at the end.** - - All Target fields are required. The JSON keys will be same as the column names in mapping details.Validate the mapping as given in mapping details. - Ignore the columns where hardcoded values are there , such as Current Flag, Start Date, End Date, Etl Job ID,Etl Batch ID,Etl Inserted Date,Etl Updated Date. leave them blank. For other fields, there has to be mapping. - - If you are confused on which source tables to map, then provide MAPPING CONFIDENCE LESS THAN 90 - - ALL THE JSON KEYS ARE MANDATORY: 'Target_Table', 'Field_Name', 'Source Table', 'Source Column', 'Joining Keys', 'Primary Key', 'Direct/Derived', 'Join Type', 'Join Tables', 'Join Condition', 'Mapping Confidence', 'Column Operations', 'Alias', 'Granularity', 'Filter Condition', 'Clauses', 'Ordering' - - Important: Only return a single piece of valid JSON text. All fields are required. Please MAP all ***TARGET fields***. If you struggle to map then give low confidence score but YOU HAVE TO MAP ANYWAY. 
****MAKE SURE IT IS A **one to one mapping** **** - - Here is the table details: - - """) + story - ) - - res= response.text.replace("\n", '').replace("`", '').replace('json','') - map = print(json.dumps(json.loads(res), indent=2)) - data = json.loads(res) - map_df = pd.json_normalize(data, record_path=['Mapping']) - st.session_state[f'{table_selector}_{tgt_name}_map'] = map_df.copy() - - - if f'{table_selector}_{tgt_name}_map' in st.session_state and btn: - taba, tabb, tabc = st.tabs(['Mappings Generated', 'Source Table Preview', 'Target Table Preview']) - with tabc: - st.subheader('Target Table Preview') - with stylable_container( - key=f"source_container_with_border", - css_styles=""" - { - border: 1px solid white; - border-radius: 0.5rem; - padding: calc(1em - 1px); - width: 103%; /* Set container width to 100% */ - } - """ - ): - st.dataframe(st.session_state['target_data'].head(0)) - with tabb: - st.subheader('Source Table Preview') - with stylable_container( - key=f"target_container_with_border", - css_styles=""" - { - border: 1px solid white; - border-radius: 0.5rem; - padding: calc(1em - 1px); - width: 103%; /* Set container width to 100% */ - } - """ - ): - st.write(st.session_state['source_data']) - with taba: - st.subheader("Most Probable Mapping Generated:") - with stylable_container( - key=f"container_with_border", - css_styles=""" - { - border: 1px solid white; - border-radius: 0.5rem; - padding: calc(1em - 1px); - width: 103%; /* Set container width to 100% */ - } - """ - ): - edited_map_df = st.data_editor( - st.session_state[f'{table_selector}_{tgt_name}_map'], - column_config={ - "Target Column Name": st.column_config.SelectboxColumn( - "Target Columns", - help="Please Verify/Change the Target Column Mapping", - width="medium", - options=st.session_state.opt, - required=True, - ) - }, - hide_index=False, - num_rows = 'dynamic', - use_container_width = True - ) - - success_show=1 - if success_show==1: - st.success(f"{(edited_map_df['Mapping Confidence']>90).mean().round(2)*100}% of Columns Mapped with more than 90% Mapping Confidence") - - mapped_uploader = st.file_uploader("UPLOAD REVISED MAPPING (OPTIONAL)", type=['csv']) - if mapped_uploader is not None: - success_show=0 - edited_map_df = pd.read_csv(mapped_uploader) - st.write(edited_map_df) - st.success("Mapping Revised!") - - val = button("Validate", key="Val") - if val: - st.session_state[f'{table_selector}_{tgt_name}_map'].update(edited_map_df) - dup= len(st.session_state[f'{table_selector}_{tgt_name}_map'][st.session_state[f'{table_selector}_{tgt_name}_map']['Field_Name'].duplicated()]) + if f'{table_selector}_{tgt_name}_map' in st.session_state and btn: + taba, tabb, tabc = st.tabs(['Mappings Generated', 'Source Table Preview', 'Target Table Preview']) + with tabc: + st.subheader('Target Table Preview') + with stylable_container( + key=f"source_container_with_border", + css_styles=""" + { + border: 1px solid white; + border-radius: 0.5rem; + padding: calc(1em - 1px); + width: 103%; /* Set container width to 100% */ + } + """ + ): + st.dataframe(st.session_state['target_data'].head(0)) + with tabb: + st.subheader('Source Table Preview') + with stylable_container( + key=f"target_container_with_border", + css_styles=""" + { + border: 1px solid white; + border-radius: 0.5rem; + padding: calc(1em - 1px); + width: 103%; /* Set container width to 100% */ + } + """ + ): + st.write(st.session_state['source_data']) + with taba: + st.subheader("Most Probable Mapping Generated:") + with stylable_container( + 
key=f"container_with_border", + css_styles=""" + { + border: 1px solid white; + border-radius: 0.5rem; + padding: calc(1em - 1px); + width: 103%; /* Set container width to 100% */ + } + """ + ): + edited_map_df = st.data_editor( + st.session_state[f'{table_selector}_{tgt_name}_map'], + column_config={ + "Target Column Name": st.column_config.SelectboxColumn( + "Target Columns", + help="Please Verify/Change the Target Column Mapping", + width="medium", + options=st.session_state.opt, + required=True, + ) + }, + hide_index=False, + num_rows = 'dynamic', + use_container_width = True + ) + + success_show=1 + if success_show==1: + st.success(f"{(edited_map_df['Mapping Confidence']>90).mean().round(2)*100}% of Columns Mapped with more than 90% Mapping Confidence") + + mapped_uploader = st.file_uploader("UPLOAD REVISED MAPPING (OPTIONAL)", type=['csv']) + if mapped_uploader is not None: + success_show=0 + edited_map_df = pd.read_csv(mapped_uploader) + st.write(edited_map_df) + st.success("Mapping Revised!") - error_messages = [] - table_columns = st.session_state.table_columns - - for _, (index, row) in enumerate(edited_map_df.iterrows()): - source_table = row['Source Table'] - source_column = row['Source Column'] + val = button("Validate", key="Val") + if val: + st.session_state[f'{table_selector}_{tgt_name}_map'].update(edited_map_df) + dup= len(st.session_state[f'{table_selector}_{tgt_name}_map'][st.session_state[f'{table_selector}_{tgt_name}_map']['Field_Name'].duplicated()]) - if source_column not in table_columns.get(source_table, []) and (source_column is not None) and (source_table is not None) and (source_table in table_selector): - error_messages.append(f"Column '{source_column}' not found in table '{source_table}'.\n") + error_messages = [] + table_columns = st.session_state.table_columns + + for _, (index, row) in enumerate(edited_map_df.iterrows()): + source_table = row['Source Table'] + source_column = row['Source Column'] + + if source_column not in table_columns.get(source_table, []) and (source_column is not None) and (source_table is not None) and (source_table in table_selector): + error_messages.append(f"Column '{source_column}' not found in table '{source_table}'.\n") + + # Output success or error messages + if error_messages: + validation_result = "\n".join(error_messages) + else: + validation_result = "Success" + + if dup != 0: + dup_index= list(st.session_state[f'{table_selector}_{tgt_name}_map'][st.session_state[f'{table_selector}_{tgt_name}_map']['Target Column Name'].duplicated(keep=False)].index) + dup_mess=str(dup_index[0]) + for val in dup_index[1:]: + dup_mess = dup_mess + f' and {str(val)}' + st.error(f"One to Many Column mapping Exists. 
Please Check Mapping Number: {dup_mess}") + elif validation_result != "Success": + st.error(validation_result) + else: + st.success("Mapping Validated!") + + df_tbl_dtls = pd.read_csv(r'tbl_dtl.csv') + + with pd.ExcelWriter('Final.xlsx') as writer: + edited_map_df.to_excel(writer, sheet_name='POLICY', index=False) + df_tbl_dtls.to_excel(writer, sheet_name='Table Details', index=False) + + path = 'Final.xlsx' + + temp_table = None + for x in pd.ExcelFile(path).sheet_names: + if x != 'Table Details': + temp_table = x + + df = read_excel(path, temp_table) + table_details_df = read_excel(path, 'Table Details') + table_details_mapping = table_details_df.set_index('Table Name')[['ETL Timestamp','Change Timestamp','ETL Filter']].T.to_dict('list') + + base_table, base_pk, proc_query, incr_join_grps, incr_table_join_info, incr_join, temp_table_schema = generate_sql(temp_table) + sql_query = '' + for x in proc_query: + sql_query += x + sql_query += '\n' + spark_sql = generate_spark(proc_query, incr_join_grps, base_table, base_pk, incr_table_join_info, incr_join, temp_table_schema) + + #out=edited_map_df.to_csv().encode('utf-8') + col21, col22= st.columns([1,4]) + with col21: + st.download_button('Download SQL Statement', sql_query, file_name='sql_code.txt') + with col22: + st.download_button('Download Spark Statement', spark_sql, file_name='spark_code.txt') + #st.download_button(label='DOWNLOAD MAPPING',data=out, file_name='S2T_Mapping.csv',mime='csv') - # Output success or error messages - if error_messages: - validation_result = "\n".join(error_messages) - else: - validation_result = "Success" - - if dup != 0: - dup_index= list(st.session_state[f'{table_selector}_{tgt_name}_map'][st.session_state[f'{table_selector}_{tgt_name}_map']['Target Column Name'].duplicated(keep=False)].index) - dup_mess=str(dup_index[0]) - for val in dup_index[1:]: - dup_mess = dup_mess + f' and {str(val)}' - st.error(f"One to Many Column mapping Exists. Please Check Mapping Number: {dup_mess}") - elif validation_result != "Success": - st.error(validation_result) - else: - st.success("Mapping Validated!") - - df_tbl_dtls = pd.read_csv(r'tbl_dtl.csv') - - with pd.ExcelWriter('Final.xlsx') as writer: - edited_map_df.to_excel(writer, sheet_name='POLICY', index=False) - df_tbl_dtls.to_excel(writer, sheet_name='Table Details', index=False) - - path = 'Final.xlsx' - - temp_table = None - for x in pd.ExcelFile(path).sheet_names: - if x != 'Table Details': - temp_table = x - - df = read_excel(path, temp_table) - table_details_df = read_excel(path, 'Table Details') - table_details_mapping = table_details_df.set_index('Table Name')[['ETL Timestamp','Change Timestamp','ETL Filter']].T.to_dict('list') - - base_table, base_pk, proc_query, incr_join_grps, incr_table_join_info, incr_join, temp_table_schema = generate_sql(temp_table) - sql_query = '' - for x in proc_query: - sql_query += x - sql_query += '\n' - spark_sql = generate_spark(proc_query, incr_join_grps, base_table, base_pk, incr_table_join_info, incr_join, temp_table_schema) - - #out=edited_map_df.to_csv().encode('utf-8') - col21, col22= st.columns([1,4]) - with col21: - st.download_button('Download SQL Statement', sql_query, file_name='sql_code.txt') - with col22: - st.download_button('Download Spark Statement', spark_sql, file_name='spark_code.txt') - #st.download_button(label='DOWNLOAD MAPPING',data=out, file_name='S2T_Mapping.csv',mime='csv') +###### +if __name__ == '__main__': + main() \ No newline at end of file
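Earlier in the diff, the unsupervised path has SDV's MultiTableMetadata detect parent/child relationships among the selected source tables and then stitches them together with left joins. The sketch below shows only that stitching step, starting from an already-built relationships frame so it runs without SDV or a database connection; it skips the column-renaming and reverse-join passes the page performs. Table names and keys are illustrative.

    import pandas as pd

    # Illustrative source tables and a relationships frame shaped like the one
    # the page derives from the SDV metadata dictionary.
    dataframes = {
        'TPM': pd.DataFrame({'POLICY_ID': [1, 2], 'OFFICE_CD': ['A1', 'B2']}),
        'TRM': pd.DataFrame({'POLICY_REF': [1, 2], 'PRODUCT_NAME': ['Car', 'Home']}),
    }
    relationships = pd.DataFrame([{
        'PARENT TABLE': 'TPM', 'CHILD TABLE': 'TRM',
        'PARENT PRIMARY KEY': 'POLICY_ID', 'CHILD FOREIGN KEY': 'POLICY_REF',
    }])

    joined = pd.DataFrame()
    for _, rel in relationships.iterrows():
        parent = dataframes[rel['PARENT TABLE']]
        child = dataframes[rel['CHILD TABLE']]
        merged = pd.merge(
            parent, child, how='left',
            left_on=rel['PARENT PRIMARY KEY'], right_on=rel['CHILD FOREIGN KEY'],
            suffixes=(f"_{rel['PARENT TABLE']}", f"_{rel['CHILD TABLE']}"),
        )
        joined = pd.concat([joined, merged], ignore_index=True)

    # Drop duplicated column names, mirroring the page's final cleanup.
    joined = joined.loc[:, ~joined.columns.duplicated()]
    print(joined)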
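The Gemini call in the unsupervised path is prompted to return a single JSON document of the form {"Mapping": [...]}, which the page cleans with chained str.replace calls before json.loads and pd.json_normalize. Below is a hedged sketch of that post-processing that instead slices from the first '{' to the last '}'; the raw string stands in for response.text and is invented for illustration.

    import json
    import pandas as pd

    # Stand-in for response.text; real responses are often wrapped in a
    # ```json ... ``` fence, which is why the page strips backticks and 'json'.
    raw = """```json
    {"Mapping": [
      {"Target_Table": "POLICY", "Field_Name": "Office Code", "Source Table": "TPM",
       "Source Column": "OFFICE_CD", "Direct/Derived": "DIRECT", "Mapping Confidence": 95}
    ]}
    ```"""

    # Keep only the text between the outermost braces, then flatten the
    # Mapping list into one row per target field.
    payload = raw[raw.find('{'): raw.rfind('}') + 1]
    data = json.loads(payload)
    map_df = pd.json_normalize(data, record_path=['Mapping'])
    print(map_df[['Field_Name', 'Source Table', 'Source Column', 'Mapping Confidence']])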
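Validation of the edited mapping comes down to two checks in the code above: no target field may be mapped more than once, and every mapped source column must exist in its source table (looked up from INFORMATION_SCHEMA into the table_columns dictionary). A compact sketch of both checks on an illustrative mapping frame:

    import pandas as pd

    # Illustrative metadata and an edited mapping with one duplicate target
    # and one source column that does not exist in its table.
    table_columns = {'TPM': ['POLICY_ID', 'OFFICE_CD'],
                     'TRM': ['POLICY_ID', 'PRODUCT_NAME']}
    edited_map_df = pd.DataFrame({
        'Field_Name':    ['Office Code', 'Product Name', 'Product Name'],
        'Source Table':  ['TPM', 'TRM', 'TRM'],
        'Source Column': ['OFFICE_CD', 'PRODUCT_NAME', 'PRODUCT_CD'],
    })

    errors = []

    # One-to-many check: the same target field mapped more than once.
    dupes = edited_map_df[edited_map_df['Field_Name'].duplicated(keep=False)]
    if not dupes.empty:
        errors.append(f"One to Many Column mapping exists at rows: {list(dupes.index)}")

    # Existence check: each mapped source column must be present in its table.
    for idx, row in edited_map_df.iterrows():
        if row['Source Column'] not in table_columns.get(row['Source Table'], []):
            errors.append(f"Column '{row['Source Column']}' not found in table "
                          f"'{row['Source Table']}' (row {idx}).")

    print('\n'.join(errors) if errors else 'Mapping Validated!')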