|
|
|
|
|
import pandas as pd |
|
import numpy as np |
|
from sklearn.feature_extraction.text import CountVectorizer |
|
import regex as re |
|
import streamlit as st |
|
import pyodbc |
|
import datetime |
|
import google.generativeai as genai |
|
import textwrap |
|
import json |
|
from streamlit_extras.stateful_button import button |
|
from streamlit_extras.stylable_container import stylable_container |
|
import sdv |
|
from sdv.metadata import MultiTableMetadata |
|
from collections import defaultdict |
|
import pymssql |
|
|
|
|
|
from streamlit_app import sidebar |
|
|
|
genai.configure(api_key='AIzaSyCeY8jSHKW6t0OSDRjc2VAfBvMunVrff2w') |
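# Gemini model used to propose the source-to-target column mappings.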
|
|
|
model = genai.GenerativeModel( |
|
model_name='models/gemini-1.5-flash' |
|
) |
|
|
|
|
|
def main(): |
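    """Entry point for the Streamlit page: renders the shared sidebar."""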
|
|
|
sidebar() |
|
|
|
def read_excel(path, sheet): |
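    """Read one worksheet of an Excel workbook with every column loaded as string."""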
|
df = pd.read_excel(path, sheet_name = sheet, dtype = 'str') |
|
return df |
|
|
|
def split_join_condition(join_condition): |
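    """Split a comma-separated list of join conditions, ignoring commas nested inside parentheses."""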
|
conditions = [] |
|
condition = '' |
|
bracket_count = 0 |
|
|
|
for char in join_condition: |
|
if char == '(': |
|
bracket_count += 1 |
|
elif char == ')': |
|
            bracket_count -= 1
|
if char == ',' and bracket_count == 0: |
|
conditions.append(condition.strip()) |
|
condition = '' |
|
else: |
|
condition += char |
|
if condition: |
|
conditions.append(condition.strip()) |
|
|
|
return conditions |
|
|
|
def join_incr(join_conditions): |
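    """From each join condition, keep only the alias.column = alias.column equality predicates and re-join them with AND."""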
|
join = [] |
|
    join_pattern = re.compile(r'(\w+\.\w+)\s*=\s*(\w+\.\w+)', re.IGNORECASE)
|
for join_condition in join_conditions: |
|
parts = re.split(r'\sAND\s|\sOR\s', join_condition, flags = re.IGNORECASE) |
|
temp = [x.strip() for x in parts if join_pattern.match(x.strip())] |
|
join.append(' AND '.join(temp)) |
|
return join |
|
|
|
def generate_sql(temp_table): |
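    """Build the incremental-load SQL for the mart table temp_table.

    Walks the mapping sheet held in the global df (plus table_details_mapping from the
    'Table Details' sheet) and emits TRUNCATE/INSERT/CREATE TABLE statements for the
    _INCR, INCR1_, INCR2_, _BASE and TEMP_ staging tables, the final delete/insert into
    the mart table, and the audit-table inserts. Returns the base table, its primary
    keys, the ordered statement list and the join metadata reused by generate_spark().
    """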
|
proc_query = [] |
|
base_table = None |
|
|
|
source_table_schema = 'MAIN.GOLD' |
|
temp_table_schema = 'MAIN.GOLD' |
|
base_pk = [] |
|
|
|
join_fields = set() |
|
|
|
for _,row in df.iterrows(): |
|
source_table = row['Source Table'] |
|
primary_key = row['Primary Key'] |
|
source_column = row['Source Column'] |
|
alias = row['Alias'] |
|
joining_keys = row['Joining Keys'] |
|
|
|
if not base_table: |
|
if primary_key == 'Y': |
|
base_table = source_table |
|
base_pk.append(joining_keys) |
|
|
|
if pd.notna(joining_keys): |
|
keys = [x.strip() for x in joining_keys.split(',')] |
|
for x in keys: |
|
if x not in join_fields: |
|
join_fields.add(x) |
|
|
|
unique_cols = ['Source Table', 'Joining Keys', 'Primary Key', 'Join Type','Join Tables','Join Condition'] |
|
unique_df = df.drop_duplicates(subset = unique_cols) |
|
|
|
incremantal_mapping = {} |
|
incr_joins = {} |
|
|
|
for _,row in unique_df.iterrows(): |
|
|
|
source_table = row['Source Table'] |
|
source_column = row['Source Column'] |
|
joining_keys = row['Joining Keys'] |
|
primary_key = row['Primary Key'] |
|
direct_derived = row['Direct/Derived'] |
|
join_type = row['Join Type'] |
|
join_tables = row['Join Tables'] |
|
join_condition = row['Join Condition'] |
|
|
|
if source_table == base_table: |
|
if primary_key == 'Y': |
|
key = (source_table, joining_keys, join_type, join_tables, join_condition) |
|
key1 = source_table |
|
else: |
|
continue |
|
else: |
|
key = (source_table, joining_keys, join_type, join_tables, join_condition) |
|
key1 = source_table |
|
if pd.notna(direct_derived) and pd.notna(source_table) and pd.notna(source_column): |
|
if key not in incremantal_mapping: |
|
incremantal_mapping[key] = { |
|
'source_table': source_table, |
|
'joining_keys':joining_keys, |
|
'join_type': join_type, |
|
'join_tables': join_tables, |
|
'join_condition': join_condition |
|
} |
|
if key1 not in incr_joins: |
|
if pd.notna(direct_derived) and direct_derived == 'DERIVED': |
|
incr_joins[key1] = { |
|
'join_type': join_type, |
|
'join_tables': ', '.join([x.strip() for x in join_tables.split(',') if x != base_table]), |
|
'join_condition': join_condition |
|
} |
|
incremental_df = pd.DataFrame(incremantal_mapping.values()) |
|
    incr_join_grps = incremental_df.groupby('source_table')
|
proc_query.append(f'TRUNCATE TABLE {temp_table_schema}.{temp_table}_INCR;') |
|
|
|
incr_table_join_info = {} |
|
for _,row in incremental_df.iterrows(): |
|
source_table = row['source_table'] |
|
|
|
if source_table != base_table: |
|
joining_keys = row['joining_keys'] |
|
join_type = row['join_type'] |
|
join_tables = [x.strip() for x in row['join_tables'].split(',')] |
|
index = join_tables.index(source_table) |
|
join_condition = [x.strip() for x in row['join_condition'].split(',')][0:index] |
|
incr_table_join_info[source_table] = ', '.join(join_condition) |
|
|
|
incr_query = [] |
|
incr_cols = '' |
|
incr_tables = [] |
|
incr_join = {} |
|
|
|
for _, group in incr_join_grps: |
|
|
|
for table in _.split(): |
|
if base_table != table: |
|
|
|
join_tables = [t.strip() for t in group['join_tables'].iloc[0].split(',')] |
|
join_keys = [t.strip() for t in ','.join(base_pk).split(',')] |
|
join_type = [t.strip() for t in group['join_type'].iloc[0].split(',')] |
|
join_cond = split_join_condition(incr_table_join_info[table]) |
|
join_condition = join_incr(join_cond) |
|
source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')] |
|
|
|
join_key_list = [] |
|
for x in join_keys: |
|
join_key_list.append(f'{base_table}.{x}') |
|
join_key = ', '.join(join_key_list) |
|
|
|
for y in source_table: |
|
sql = f""" |
|
INSERT INTO {temp_table_schema}.{temp_table}_INCR |
|
( |
|
SELECT {join_key}, {table_details_mapping[y][0]}, {table_details_mapping[y][1]}, '{y}', 1, CURRENT_TIMESTAMP |
|
FROM {source_table_schema}.{base_table} {base_table}""" |
|
|
|
incr_join_text = '' |
|
for i in range(len(join_condition)): |
|
sql += f'\n\t{join_type[i]} JOIN {source_table_schema}.{join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}' |
|
incr_join_text += f'\n\t{join_type[i]} JOIN {source_table_schema}.{join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}' |
|
incr_join[y] = incr_join_text |
|
|
|
sql += f""" |
|
WHERE COALESCE({join_tables[i+1]}.operation,'NA') <> 'D' |
|
AND TO_TIMESTAMP( CAST(SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),1,4) || '-' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),5,2) ||'-' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),7,2) || ' ' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),9,2) ||':' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),11,2) ||':' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),13,2) AS VARCHAR(30)), 'YYYY-MM-DD HH:MI:SS') > (SELECT MAX(max_update_date) FROM audit.reportingdb_audit_tbl_{temp_table} WHERE mart_table_name='{temp_table}' and src_table_name='{y}') |
|
);""" |
|
|
|
incr_query.append(sql) |
|
incr_tables.append(y) |
|
|
|
else: |
|
source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')] |
|
join_keys = [t.strip() for t in group['joining_keys'].iloc[0].split(',')] |
|
|
|
join_key_list = [] |
|
for x in join_keys: |
|
join_key_list.append(f'{base_table}.{x}') |
|
join_key = ', '.join(join_key_list) |
|
|
|
incr_cols = join_key |
|
sql = f""" |
|
INSERT INTO {temp_table_schema}.{temp_table}_INCR |
|
( |
|
SELECT {join_key}, {table_details_mapping[base_table][0]}, {table_details_mapping[base_table][1]}, '{base_table}', 1, CURRENT_TIMESTAMP |
|
FROM {source_table_schema}.{base_table} {base_table} |
|
WHERE COALESCE(operation,'NA') <> 'D' |
|
AND TO_TIMESTAMP( CAST(SUBSTRING((_hoodie_commit_time),1,4) || '-' || SUBSTRING((_hoodie_commit_time),5,2) ||'-' || SUBSTRING((_hoodie_commit_time),7,2) || ' ' || SUBSTRING((_hoodie_commit_time),9,2) ||':' || SUBSTRING((_hoodie_commit_time),11,2) ||':' || SUBSTRING((_hoodie_commit_time),13,2) AS VARCHAR(30)), 'YYYY-MM-DD HH:MI:SS') > (SELECT MAX(max_update_date) FROM audit.reportingdb_audit_tbl_{temp_table} WHERE mart_table_name='{temp_table}' and src_table_name='{base_table}') |
|
);""" |
|
proc_query.append(sql) |
|
incr_tables.append(base_table) |
|
|
|
proc_query.append('\n'.join(incr_query)) |
|
proc_query.append(f'TRUNCATE TABLE {temp_table_schema}.INCR1_{temp_table};') |
|
|
|
sql = f""" |
|
INSERT INTO {temp_table_schema}.INCR1_{temp_table} |
|
( |
|
SELECT DISTINCT {incr_cols.replace(f'{base_table}.', '')} |
|
FROM {temp_table_schema}.{temp_table}_INCR |
|
);""" |
|
|
|
proc_query.append(sql) |
|
|
|
incr_table_dict = {} |
|
for table in incr_tables: |
|
if table == base_table: |
|
incr_table_dict[table] = f'{temp_table_schema}.INCR2_{table}' |
|
else: |
|
p = [x for x in incr_join[table].split('\n\t') if len(x) > 1] |
|
if len(p) == 1: |
|
incr_table_dict[table] = f'{temp_table_schema}.INCR2_{table}' |
|
else: |
|
incr_table_dict[table] = f'{source_table_schema}.{table}' |
|
|
|
s = [] |
|
for table in incr_tables: |
|
incr2_sql_list = [] |
|
|
|
if table == base_table: |
|
for key in incr_cols.replace(f'{base_table}.', '').split(','): |
|
incr2_sql_list.append(f"{base_table}.{key} = A.{key}") |
|
incr2_sql_join = ' AND '.join(incr2_sql_list) |
|
|
|
sql = f""" |
|
CREATE TABLE {temp_table_schema}.INCR2_{table} |
|
AS |
|
SELECT |
|
{table}.* |
|
FROM |
|
{source_table_schema}.{table} {table} |
|
INNER JOIN |
|
{temp_table_schema}.INCR1_{temp_table} A ON {incr2_sql_join}; """ |
|
proc_query.append(f'DROP TABLE IF EXISTS {temp_table_schema}.INCR2_{table};') |
|
proc_query.append(sql) |
|
|
|
else: |
|
|
|
p = [x for x in incr_join[table].split('\n\t') if len(x) > 1] |
|
if len(p) == 1: |
|
sql = f""" |
|
CREATE TABLE {temp_table_schema}.INCR2_{table} |
|
AS |
|
SELECT |
|
{table}.* |
|
FROM |
|
{temp_table_schema}.INCR2_{base_table} {base_table} {incr_join[table]};""" |
|
s.append(f'DROP TABLE IF EXISTS {temp_table_schema}.INCR2_{table};') |
|
s.append(sql) |
|
|
|
for x in s: |
|
proc_query.append(x) |
|
|
|
select_clause = [] |
|
from_clause = [] |
|
where_clause = [] |
|
|
|
for _,row in df.iterrows(): |
|
field_name = row['Field_Name'] |
|
source_table = row['Source Table'] |
|
source_column = row['Source Column'] |
|
joining_keys = row['Joining Keys'] |
|
primary_key = row['Primary Key'] |
|
direct_derived = row['Direct/Derived'] |
|
join_type = row['Join Type'] |
|
join_tables = row['Join Tables'] |
|
join_condition = row['Join Condition'] |
|
column_operation = row['Column Operations'] |
|
alias = row['Alias'] |
|
granularity = row['Granularity'] |
|
filter_condition = row['Filter Condition'] |
|
clauses = row['Clauses'] |
|
ordering = row['Ordering'] |
|
|
|
if pd.notna(direct_derived): |
|
if pd.notna(column_operation): |
|
if len(column_operation.split()) == 1: |
|
select_expr = f'{column_operation.upper()}({source_table}.{source_column})' |
|
else: |
|
select_expr = column_operation |
|
else: |
|
if pd.notna(source_table): |
|
select_expr = f'{source_table}.{source_column}' |
|
else: |
|
select_expr = source_column |
|
|
|
if source_column not in join_fields: |
|
if pd.notna(alias): |
|
select_expr += f' AS {alias}' |
|
else: |
|
if pd.notna(column_operation) and pd.notna(source_column): |
|
select_expr += f' AS {source_column}' |
|
|
|
if direct_derived.upper() == 'DIRECT': |
|
select_clause.append(select_expr) |
|
elif direct_derived.upper() == 'DERIVED_BASE': |
|
select_clause.append(select_expr) |
|
|
|
if pd.notna(filter_condition): |
|
where_clause.append(filter_condition) |
|
|
|
select_query = ',\n\t'.join(select_clause) |
|
sql_query = f"CREATE TABLE {temp_table_schema}.{base_table}_BASE\nAS \n\tSELECT \n\t{select_query} \nFROM\n\t{incr_table_dict[base_table]} {base_table}" |
|
if where_clause: |
|
sql_query += f"\nWHERE {' AND'.join(where_clause)}" |
|
sql_query += ';' |
|
proc_query.append(f"DROP TABLE IF EXISTS {temp_table_schema}.{base_table}_BASE;") |
|
proc_query.append(sql_query) |
|
|
|
    df['Clauses'] = df['Clauses'].fillna('')

    df['Ordering'] = df['Ordering'].fillna('')
|
c = 1 |
|
temp_base_table = f'{base_table}_BASE' |
|
grp_cols = ['Join Condition', 'Clauses', 'Ordering'] |
|
join_grps = df[df['Direct/Derived'] == 'DERIVED'].groupby(['Join Condition', 'Clauses', 'Ordering']) |
|
temp_tables_sql = [] |
|
for (join_condition,clauses,ordering), group in join_grps: |
|
if pd.notna(group['Direct/Derived'].iloc[0]): |
|
if group['Direct/Derived'].iloc[0].upper() == 'DERIVED': |
|
join_tables = [t.strip() for t in group['Join Tables'].iloc[0].split(',')] |
|
join_keys = [t.strip() for t in group['Joining Keys'].iloc[0].split(',')] |
|
join_type = [t.strip() for t in group['Join Type'].iloc[0].split(',')] |
|
join_condition = split_join_condition(group['Join Condition'].iloc[0]) |
|
temp_table_name = f"TEMP_{group['Source Table'].iloc[0]}" |
|
source_column = [t.strip() for t in (','.join(group['Source Column'])).split(',')] |
|
alias = [t.strip() for t in (','.join(group['Alias'])).split(',')] |
|
source_table = [t.strip() for t in (','.join(group['Source Table'])).split(',')] |
|
|
|
base_cols = [] |
|
for join_key in join_keys: |
|
base_cols.append(f'{join_tables[0]}.{join_key}') |
|
|
|
for s_table,col,alias in zip(source_table,source_column,alias): |
|
if pd.notna(group['Column Operations'].iloc[0]): |
|
if len(group['Column Operations'].iloc[0].split()) == 1: |
|
select_expr = f"{group['Column Operations'].iloc[0].upper()}({s_table}.{col})" |
|
else: |
|
select_expr = group['Column Operations'].iloc[0] |
|
else: |
|
if pd.notna(s_table): |
|
select_expr = f"{s_table}.{col}" |
|
else: |
|
select_expr = col |
|
|
|
if alias: |
|
select_expr += f" AS {alias}" |
|
base_cols.append(select_expr) |
|
|
|
if ordering: |
|
base_cols.append(f"{ordering} AS RN") |
|
|
|
sql = ',\n\t\t'.join(base_cols) |
|
|
|
join_sql = f"SELECT \n\t\t{sql} \nFROM\n\t{incr_table_dict[base_table]} {join_tables[0]}" |
|
for i in range(len(join_type)): |
|
join_sql += f'\n\t{join_type[i]} JOIN {incr_table_dict[join_tables[i+1]]} {join_tables[i+1]} ON {join_condition[i]}' |
|
if clauses: |
|
join_sql += f'\n\t{clauses}' |
|
join_sql += ';' |
|
|
|
proc_query.append(f"DROP TABLE IF EXISTS {temp_table_schema}.{temp_table_name};") |
|
proc_query.append(f"CREATE TABLE {temp_table_schema}.{temp_table_name}\nAS \n\t{join_sql}") |
|
|
|
granularity = [t.strip() for t in group['Granularity'].iloc[0].split(',')] |
|
|
|
sql = [] |
|
for key in join_keys: |
|
sql.append(f"A.{key} = B.{key}") |
|
|
|
temp_cols = [] |
|
temp_cols.append('A.*') |
|
|
|
source_column = [t.strip() for t in (','.join(group['Source Column'])).split(',')] |
|
alias = [t.strip() for t in (','.join(group['Alias'])).split(',')] |
|
|
|
for col,alias in zip(source_column,alias): |
|
select_expr = f"B.{col}" |
|
if alias: |
|
select_expr = f"B.{alias}" |
|
else: |
|
select_expr = f"B.{col}" |
|
temp_cols.append(select_expr) |
|
|
|
temp_select_query = ',\n\t\t'.join(temp_cols) |
|
|
|
proc_query.append(f"DROP TABLE IF EXISTS {temp_table_schema}.TEMP_{temp_table}_{c};") |
|
|
|
base_sql = f"CREATE TABLE {temp_table_schema}.TEMP_{temp_table}_{c}\nAS \n\tSELECT \n\t\t{temp_select_query} \nFROM\n\t{temp_table_schema}.{temp_base_table} AS A" |
|
base_sql += f"\n\tLEFT OUTER JOIN {temp_table_schema}.{temp_table_name} B ON {' AND '.join(sql)}" |
|
|
|
if '1:1' in granularity and len(ordering) > 1: |
|
base_sql += f" AND B.RN = 1" |
|
base_sql += ';' |
|
|
|
temp_base_table = f'TEMP_{temp_table}_{c}' |
|
c += 1 |
|
proc_query.append(base_sql) |
|
|
|
fin_table_name = temp_table |
|
fin_table_cols = [] |
|
|
|
for _,row in df.iterrows(): |
|
field_name = row['Field_Name'] |
|
source_table = row['Source Table'] |
|
source_column = row['Source Column'] |
|
alias = row['Alias'] |
|
|
|
if pd.notna(row['Direct/Derived']): |
|
if (source_column in join_fields): |
|
fin_table_cols.append(f'{source_column} AS "{field_name}"') |
|
else: |
|
fin_table_cols.append(f'"{field_name}"') |
|
|
|
fin_table_cols = ',\n\t\t'.join(fin_table_cols) |
|
fin_sql = f"INSERT INTO {temp_table_schema}.{fin_table_name}\n\tSELECT \n\t\t{fin_table_cols} \nFROM\n\t{temp_table_schema}.TEMP_{temp_table}_{c-1};" |
|
|
|
|
|
    condition_col = '_'.join(incr_cols.replace(f'{base_table}.', '').split(','))

    proc_query.append(f"DELETE FROM {temp_table_schema}.{fin_table_name}\nWHERE {condition_col} IN (SELECT {condition_col} FROM {temp_table_schema}.INCR1_{temp_table});")
|
proc_query.append(fin_sql) |
|
|
|
for table in incr_tables: |
|
sql = f""" |
|
INSERT INTO audit.reportingdb_audit_tbl_{temp_table} |
|
( |
|
SELECT |
|
'{temp_table}' as mart_table_name, |
|
'{table}' as src_table_name, |
|
coalesce( max(TO_TIMESTAMP( CAST(SUBSTRING((_hoodie_commit_time),1,4) || '-' || SUBSTRING((_hoodie_commit_time),5,2) ||'-' || SUBSTRING((_hoodie_commit_time),7,2) || ' ' || SUBSTRING((_hoodie_commit_time),9,2) ||':' || SUBSTRING((_hoodie_commit_time),11,2) ||':' || SUBSTRING((_hoodie_commit_time),13,2) AS VARCHAR(30)), 'YYYY-MM-DD HH:MI:SS')),(select max(max_update_date) from audit.reportingdb_audit_tbl_{temp_table} where Mart_Table_Name='{temp_table}' and Src_Table_Name= '{table}')) max_update_date, |
|
CURRENT_TIMESTAMP as load_timestamp, |
|
coalesce(max(prev_updt_ts),(select max(source_reference_date) from audit.reportingdb_audit_tbl_{temp_table} where Mart_Table_Name='{temp_table}' and Src_Table_Name= '{table}')) AS source_reference_date, |
|
max(nvl(batch_number,0))+1 |
|
FROM {temp_table_schema}.{temp_table}_INCR where table_name = '{table}' |
|
);""" |
|
proc_query.append(sql) |
|
|
|
return base_table, base_pk, proc_query, incr_join_grps, incr_table_join_info, incr_join, temp_table_schema |
|
|
|
def create_df(query, table_df_mapping, table_usage_count): |
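    """Translate one generated CREATE TABLE ... AS SELECT statement into PySpark code:
    a spark.sql() call plus persist()/createOrReplaceTempView() bookkeeping, an audit
    update for INCR2_ tables, and unpersist() calls once a source DataFrame is no longer
    referenced by any remaining statement."""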
|
script = [] |
|
query = ' '.join(query.split()).strip() |
|
match = re.match(r'CREATE TABLE (\w+\.\w+\.\w+) AS (SELECT .+)', query, re.IGNORECASE) |
|
source_tables = re.findall(r'\bFROM\s+(\w+\.\w+\.\w+)|\bJOIN\s+(\w+\.\w+\.\w+)', query, re.IGNORECASE) |
|
source_tables = [table for pair in source_tables for table in pair if table] |
|
|
|
if not match: |
|
raise ValueError('Invalid SQL') |
|
table_name = match.group(1).split('.')[2] |
|
select_statement = match.group(2) |
|
create_script = f'{table_name} = spark.sql(""" {select_statement} """)' |
|
persist_script = f'{table_name} = {table_name}.persist()' |
|
view_script = f'{table_name}.createOrReplaceTempView("{table_name}")' |
|
|
|
for table in source_tables: |
|
create_script = create_script.replace(table, table_df_mapping[table]) |
|
|
|
script.append(f"\n\t\t######################---------Creating table {create_script.split('=')[0].strip()}-------############################") |
|
script.append(create_script) |
|
script.append(persist_script) |
|
script.append(view_script) |
|
    script.append(f'''print("{create_script.split('=')[0].strip()} count: ", {create_script.split('=')[0].strip()}.count())''')
|
|
|
if 'INCR2_' in table_name: |
|
x = table_name.split('INCR2_')[1] |
|
if x in table_details_mapping.keys(): |
|
script.append(f"\n\t\t######################---------Updating the max_update_date in audit-------############################") |
|
script.append(f"{x}_max_update_date = INCR2_{x}.agg({{'_hoodie_commit_time' : 'max'}}).first()[0]") |
|
script.append(f"{x}_max_source_reference_date = INCR2_{x}.agg(max(to_timestamp('{table_details_mapping[x][1].replace(x+'.','')}','yyyy-MM-dd-HH.mm.ss.SSSSSS'))).first()[0]") |
|
script.append(f"insert_max_update_date(spark,redshift_conn, config['application_name'],'{x}',{x}_max_update_date,{x}_max_source_reference_date, max_batch_id, config)") |
|
script.append('\n') |
|
|
|
for table in source_tables: |
|
table_usage_count[table.split('.')[2]] -= 1 |
|
|
|
for table in source_tables: |
|
if table_usage_count[table.split('.')[2]] == 0 and 'INCR1_' not in table: |
|
unpersist_script = f"{table.split('.')[2]}.unpersist()" |
|
script.append(unpersist_script) |
|
|
|
return '\n\t\t'.join(script) |
|
|
|
def generate_spark(proc_query, incr_join_grps, base_table, base_pk, incr_table_join_info, incr_join, temp_table_schema): |
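    """Assemble the PySpark job from the generated SQL: read the source tables, build the
    INCR1/INCR2 incremental DataFrames, replay the CREATE TABLE statements through
    create_df(), and splice the resulting code into template.txt via the INSERT_CODE_1
    and INSERT_CODE_2 placeholders."""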
|
table_usage_count = defaultdict(int) |
|
table_df_mapping = {} |
|
|
|
for query in proc_query: |
|
        if 'CREATE TABLE' in query or 'DELETE' in query:
|
source_tables = re.findall(r'\bFROM\s+(\w+\.\w+\.\w+)|\bJOIN\s+(\w+\.\w+\.\w+)', query, re.IGNORECASE) |
|
source_tables = [table for pair in source_tables for table in pair if table] |
|
for table in source_tables: |
|
table_usage_count[table.split('.')[2]] += 1 |
|
if 'DELETE' not in query: |
|
table_df_mapping[table] = table.split('.')[2] |
|
|
|
script = [] |
|
for query in proc_query: |
|
if 'CREATE TABLE' in query: |
|
script.append(create_df(query, table_df_mapping,table_usage_count)) |
|
|
|
spark_query = [] |
|
spark_query.append("\t\t######################---------Reading source data -------############################") |
|
for table in table_details_mapping.keys(): |
|
spark_query.append(f'{table} = read_file(spark, config, \"{table}\").filter("{table_details_mapping[table][2]}")') |
|
spark_query.append(f'{table} = {table}.persist()') |
|
spark_query.append(f'{table}.createOrReplaceTempView("{table}")') |
|
        spark_query.append(f'print("{table} count: ", {table}.count())')
|
spark_query.append('\n') |
|
|
|
spark_query.append("\n\t\t######################---------Reading records-------############################") |
|
for table in table_details_mapping.keys(): |
|
spark_query.append(f"{table}_max_update_date = read_max_update_date(redshift_conn, config['application_name'],'{table}', config)") |
|
spark_query.append(f'{table}_max_update_date = {table}_max_update_date[0][0]') |
|
spark_query.append('\n') |
|
|
|
incr1_spark = [] |
|
temp_incr1 = [] |
|
for _, group in incr_join_grps: |
|
for table in _.split(): |
|
if base_table != table: |
|
join_tables = [t.strip() for t in group['join_tables'].iloc[0].split(',')] |
|
join_keys = [t.strip() for t in ','.join(base_pk).split(',')] |
|
join_type = [t.strip() for t in group['join_type'].iloc[0].split(',')] |
|
join_cond = split_join_condition(incr_table_join_info[table]) |
|
join_condition = join_incr(join_cond) |
|
source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')] |
|
|
|
join_key_list = [] |
|
for x in join_keys: |
|
join_key_list.append(f'{base_table}.{x}') |
|
join_key = ', '.join(join_key_list) |
|
|
|
for y in source_table: |
|
sql = f"""SELECT {join_key} FROM {base_table} {base_table}""" |
|
|
|
incr_join_text = '' |
|
i=0 |
|
for i in range(len(join_condition)): |
|
sql += f' {join_type[i]} JOIN {join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}' |
|
incr_join_text += f' {join_type[i]} JOIN {join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}' |
|
|
|
sql += f''' WHERE {join_tables[i+1]}._hoodie_commit_time > cast('"""+str({join_tables[i+1]}_max_update_date)+"""' as timestamp)''' |
|
temp_incr1.append(sql) |
|
|
|
else: |
|
source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')] |
|
join_keys = [t.strip() for t in group['joining_keys'].iloc[0].split(',')] |
|
|
|
join_key_list = [] |
|
for x in join_keys: |
|
join_key_list.append(f'{base_table}.{x}') |
|
join_key = ', '.join(join_key_list) |
|
|
|
sql = f'''SELECT {join_key} FROM {base_table} {base_table} WHERE {base_table}._hoodie_commit_time > cast('"""+str({base_table}_max_update_date)+"""' as timestamp)''' |
|
incr1_spark.append(sql) |
|
for i in temp_incr1: |
|
incr1_spark.append(i) |
|
incr1_spark = '\nUNION\n'.join(incr1_spark) |
|
spark_query.append("\n\t\t######################---------Creating INCR1-------############################") |
|
spark_query.append(f'INCR1_{temp_table} = spark.sql(""" {incr1_spark} """)') |
|
spark_query.append(f'\n\t\tINCR1_{temp_table} = INCR1_{temp_table}.dropDuplicates()') |
|
spark_query.append(f'INCR1_{temp_table} = INCR1_{temp_table}.persist()') |
|
spark_query.append(f'INCR1_{temp_table}.createOrReplaceTempView("INCR1_{temp_table}")') |
|
spark_query.append(f'print("INCR1_{temp_table} count: ", INCR1_{temp_table}.count())') |
|
|
|
spark_query.append("\n\t\t######################---------Creating INCR2-------############################") |
|
for table in table_details_mapping.keys(): |
|
if table in incr_join.keys(): |
|
p = [x for x in incr_join[table].split('\n\t') if len(x) > 1] |
|
if len(p) > 1: |
|
spark_query.append(f"\n\t\t######################---------Updating the max_update_date in audit-------############################") |
|
spark_query.append(f"{table}_max_update_date = {table}.agg({{'_hoodie_commit_time' : 'max'}}).first()[0]") |
|
spark_query.append(f"{table}_max_source_reference_date = {table}.agg(max(to_timestamp('{table_details_mapping[table][1].replace(table+'.','')}','yyyy-MM-dd-HH.mm.ss.SSSSSS'))).first()[0]") |
|
spark_query.append(f"insert_max_update_date(spark,redshift_conn, config['application_name'],'{table}',{table}_max_update_date,{table}_max_source_reference_date, max_batch_id, config)") |
|
spark_query.append('\n') |
|
|
|
for query in script: |
|
spark_query.append(query) |
|
spark_query.append('\n') |
|
|
|
spark_query1 = [] |
|
spark_query1.append('\n') |
|
for query in proc_query: |
|
if f'{temp_table_schema}.{temp_table}\n' in query: |
|
final_tables = re.findall(r'\bFROM\s+(\w+\.\w+\.\w+)|\bJOIN\s+(\w+\.\w+\.\w+)', query, re.IGNORECASE) |
|
final_tables = [table.split('.')[2].strip() for pair in final_tables for table in pair if table and table.split('.')[2].strip() != temp_table][0] |
|
if 'INCR1_' in final_tables: |
|
spark_query.append(f"{final_tables}.write.mode('overwrite').parquet(config['incr2df_path'])") |
|
else: |
|
spark_query.append(f"{final_tables}.write.mode('overwrite').parquet(config['resultdf_path'])") |
|
spark_query1.append(f'''cur.execute(""" {query} """)''') |
|
spark_query1.append('\n') |
|
|
|
with open('template.txt') as file: |
|
template = file.read() |
|
|
|
result = template.replace('INSERT_CODE_1', '\n\t\t'.join(spark_query)) |
|
result = result.replace('INSERT_CODE_2', '\t\t'.join(spark_query1)) |
|
|
|
return result |
|
|
|
|
|
|
|
st.set_page_config(page_title='AUTOMATED SOURCE TO TARGET MAPPING', layout= 'wide') |
|
st.markdown(""" |
|
<style> |
|
|
|
/* Remove blank space at top and bottom */ |
|
.block-container { |
|
padding-top: 1.9rem; |
|
padding-bottom: 1rem; |
|
} |
|
|
|
/* Remove blank space at the center canvas */ |
|
.st-emotion-cache-z5fcl4 { |
|
position: relative; |
|
top: -62px; |
|
} |
|
|
|
/* Make the toolbar transparent and the content below it clickable */ |
|
.st-emotion-cache-18ni7ap { |
|
pointer-events: none; |
|
background: rgb(255 255 255 / 0%) |
|
} |
|
.st-emotion-cache-zq5wmm { |
|
pointer-events: auto; |
|
background: rgb(255 255 255); |
|
border-radius: 5px; |
|
} |
|
</style> |
|
""", unsafe_allow_html=True) |
|
st.subheader('AUTOMATED SOURCE TO TARGET MAPPING') |
|
mode= st.selectbox('Select Mode of Mapping',('Supervised Mapping(You Have Sufficient Sample Data in Target Template)', 'Unsupervised Mapping(You Do Not Have Sufficient Sample Data in Target Template)'), index=None,placeholder='Select category of table') |
|
if mode == 'Supervised Mapping(You Have Sufficient Sample Data in Target Template)': |
|
conn = pymssql.connect( "Server=sql-ext-dev-uks-001.database.windows.net;" |
|
"Database=sqldb-ext-dev-uks-001;" |
|
"UID=dbadmin;" |
|
"PWD=mYpa$$w0rD" ) |
|
query1="select * from INFORMATION_SCHEMA.TABLES where TABLE_SCHEMA='dbo' ORDER BY TABLE_NAME ASC" |
|
table1=pd.read_sql_query(query1,con=conn) |
|
st.session_state.table1_un= table1 |
|
table1['TABLE_NAME']=table1['TABLE_NAME'].astype('str') |
|
colsel1, colsel2= st.columns(2) |
|
with colsel1: |
|
table_selector=st.selectbox('SOURCE TABLE NAME',['TCM', 'TCVM','TEM', 'TPM', 'TPP', 'TPT', 'TRM', 'TSCM', 'TSM'],index=None,placeholder='Select table for automated column mapping') |
|
with colsel2: |
|
target_selector=st.selectbox('TARGET TABLE NAME',['POLICY_MAPPINGTARGET_TBL','FINANCE_MAAPINGTARGET_TBL','CUSTOMER_MASTER_TARGET'],index=None,placeholder='Select target table') |
|
st.session_state.target_selector_un = target_selector |
|
|
|
if table_selector is not None and target_selector is not None: |
|
btn=button('RUN',key='RUN_GENAI_UN') |
|
if target_selector is not None and btn and f'{table_selector}_{target_selector}_map_un' not in st.session_state: |
|
query2="select * from ["+ table1['TABLE_SCHEMA'][0]+"].["+table_selector+"]" |
|
i_df = pd.read_sql_query(query2,con=conn) |
|
|
|
i_df=i_df.drop(['ID','LOADID','FILE_NAME'],axis=1) |
|
st.session_state['source_data_un'] = i_df |
|
|
|
|
|
|
|
query3="select * from ["+ table1['TABLE_SCHEMA'][0]+"].["+target_selector+"]" |
|
tgt_df=pd.read_sql_query(query3,con=conn).reset_index(drop=True) |
|
main_list=tgt_df.columns.to_list() |
|
sub_list=['ID','LOADID','FILE_NAME'] |
|
if any(main_list[i:i+len(sub_list)] == sub_list for i in range(len(main_list) - len(sub_list) + 1)): |
|
tgt_df=tgt_df.drop(['ID','LOADID','FILE_NAME'],axis=1) |
|
st.session_state.opt_un= list(tgt_df.columns) |
|
st.session_state['target_data_un'] = tgt_df.head(20).reset_index() |
|
|
|
|
|
|
|
|
|
|
|
with st.spinner('Running data on neural network...'): |
|
df=pd.read_csv('C:\\Applications\\MARCO POLO O AIML\\DATA CATALOG\\pages\\CUSTOMER_MASTER_TRAIN_1306.csv') |
|
cols=df.columns.tolist() |
|
data=pd.DataFrame(columns=['DATA','LABEL']) |
|
temp=pd.DataFrame(columns=['DATA','LABEL']) |
|
for x in cols: |
|
temp['DATA']=df[x] |
|
temp['LABEL']=x |
|
data=pd.concat([data,temp],ignore_index=True) |
|
data['DATA']=data['DATA'].astype('string') |
|
data['LABEL']=data['LABEL'].astype('string') |
|
data=data.dropna() |
|
data=data.reset_index(drop=True) |
|
|
|
|
|
|
|
|
|
|
|
vectorizer = CountVectorizer(analyzer='char_wb', ngram_range=(1, 3), min_df=1) |
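                # Character n-grams (1-3 chars) of each cell value become count features for the column classifier.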
|
X=vectorizer.fit_transform(data['DATA']) |
|
feature=pd.DataFrame(data=X.toarray(),columns=vectorizer.get_feature_names_out()) |
|
data1=pd.concat([data,feature],axis=1) |
|
|
|
|
|
from sklearn.feature_selection import chi2 |
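                # Chi-square test ranks the n-gram features by association with the column label; the top 1000 are kept.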
|
chi_x=data1.drop(['DATA','LABEL'],axis=1) |
|
chi_y=data1['LABEL'] |
|
chi_scores=chi2(chi_x,chi_y) |
|
p_values=pd.Series(chi_scores[1],index=chi_x.columns) |
|
p_values=p_values.sort_values(ascending=True).reset_index() |
|
feature_chi=p_values['index'][:1000] |
|
data2=data1[feature_chi.to_list()] |
|
data2=pd.concat([data,data2],axis=1) |
|
|
|
|
|
def count_digits(str1): |
|
return len("".join(re.findall("\d+", str1))) |
|
|
|
def count_vowels(string): |
|
vowels = "aeiouAEIOU" |
|
count = 0 |
|
for char in string: |
|
if char in vowels: |
|
count += 1 |
|
return count |
|
|
|
def count_special_character(string): |
|
special_characters = "!@#$%^&*()-+?_=,<>/" |
|
special_char = 0 |
|
for i in range(0, len(string)): |
|
if (string[i] in special_characters): |
|
special_char += 1 |
|
return special_char |
|
|
|
def count_spaces(string): |
|
spaces = 0 |
|
for char in string: |
|
if char == " ": |
|
spaces += 1 |
|
return spaces |
|
|
|
data2['LENGTH']=data2['DATA'].apply(lambda x:len(x)) |
|
data2['digit_c']=data2['DATA'].apply(lambda x:count_digits(x)) |
|
data2['vowel_c']=data2['DATA'].apply(lambda x:count_vowels(x)) |
|
data2['spchar_c']=data2['DATA'].apply(lambda x:count_special_character(x)) |
|
data2['space_c']=data2['DATA'].apply(lambda x:count_spaces(x)) |
|
|
|
chi_scores1=chi2(data2[['LENGTH','digit_c','vowel_c','spchar_c','space_c']],data2['LABEL']) |
|
p_values1=pd.Series(chi_scores1[1],index=data2[['LENGTH','digit_c','vowel_c','spchar_c','space_c']].columns).sort_values(ascending=True).reset_index() |
|
|
|
|
|
import tensorflow as tf |
|
from tensorflow.keras import layers |
|
from tensorflow import keras |
|
|
|
from sklearn.model_selection import train_test_split |
|
from ast import literal_eval |
|
|
|
train_df, test_df = train_test_split(data2,test_size=.1,stratify=data2['LABEL'].values) |
|
val_df = test_df.sample(frac=0.5) |
|
test_df.drop(val_df.index, inplace=True) |
|
|
|
terms = tf.ragged.constant(data2['LABEL'].values) |
|
lookup = tf.keras.layers.StringLookup(output_mode="one_hot") |
|
lookup.adapt(terms) |
|
vocab = lookup.get_vocabulary() |
|
|
|
def invert_multi_hot(encoded_labels): |
|
hot_indices = np.argwhere(encoded_labels == 1.0)[..., 0] |
|
return np.take(vocab, hot_indices) |
|
|
|
max_seqlen = 150 |
|
batch_size = 128 |
|
padding_token = "<pad>" |
|
auto = tf.data.AUTOTUNE |
|
|
|
feature_tf=data2.columns.tolist()[2:] |
|
|
|
def make_dataset(dataframe,feature,batch_size,is_train=True): |
|
labels = tf.ragged.constant(dataframe["LABEL"].values) |
|
label_binarized = lookup(labels).numpy() |
|
dataset = tf.data.Dataset.from_tensor_slices( |
|
(dataframe[feature].values, label_binarized) |
|
) |
|
dataset = dataset.shuffle(batch_size * 10) if is_train else dataset |
|
return dataset.batch(batch_size) |
|
|
|
train_dataset = make_dataset(train_df,feature_tf,batch_size, is_train=True) |
|
validation_dataset = make_dataset(val_df,feature_tf,batch_size, is_train=False) |
|
test_dataset = make_dataset(test_df,feature_tf,batch_size, is_train=False) |
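                # A shallow MLP maps each value's feature vector to a one-hot column label (softmax over the label vocabulary).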
|
|
|
|
|
shallow_mlp_model = keras.Sequential( |
|
[ |
|
layers.Dense(512, activation="relu"), |
|
layers.Dense(256, activation="relu"), |
|
layers.Dense(lookup.vocabulary_size(), activation="softmax"), |
|
] |
|
) |
|
|
|
shallow_mlp_model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["CategoricalAccuracy"]) |
|
epochs=20 |
|
history = shallow_mlp_model.fit(train_dataset, validation_data=validation_dataset, epochs=epochs) |
|
|
|
|
|
_, category_acc = shallow_mlp_model.evaluate(test_dataset) |
|
|
|
|
|
|
|
|
|
                i_cols=i_df.columns.tolist()
|
i_data=pd.DataFrame(columns=['DATA','LABEL']) |
|
i_temp=pd.DataFrame(columns=['DATA','LABEL']) |
|
for x in i_cols: |
|
i_temp['DATA']=i_df[x] |
|
i_temp['LABEL']=x |
|
i_data=pd.concat([i_data,i_temp],ignore_index=True) |
|
i_data['DATA']=i_data['DATA'].astype('string') |
|
i_data['LABEL']=i_data['LABEL'].astype('string') |
|
i_data=i_data.dropna() |
|
i_data=i_data.reset_index(drop=True) |
|
i_X=vectorizer.transform(i_data['DATA']) |
|
i_feature=pd.DataFrame(data=i_X.toarray(),columns=vectorizer.get_feature_names_out()) |
|
i_data1=pd.concat([i_data,i_feature],axis=1) |
|
i_data2=i_data1[feature_chi.to_list()] |
|
i_data2=pd.concat([i_data,i_data2],axis=1) |
|
i_data2['LENGTH']=i_data2['DATA'].apply(lambda x:len(x)) |
|
i_data2['digit_c']=i_data2['DATA'].apply(lambda x:count_digits(x)) |
|
i_data2['vowel_c']=i_data2['DATA'].apply(lambda x:count_vowels(x)) |
|
i_data2['spchar_c']=i_data2['DATA'].apply(lambda x:count_special_character(x)) |
|
i_data2['space_c']=i_data2['DATA'].apply(lambda x:count_spaces(x)) |
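                # Score the source table's values with the trained model; each cell gets a predicted target column.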
|
i_run_dataset=tf.data.Dataset.from_tensor_slices((i_data2[feature_tf].values,lookup(tf.ragged.constant(i_data2["LABEL"].values)).numpy())).batch(649) |
|
|
|
|
|
i_predicted_probabilities = shallow_mlp_model.predict(i_run_dataset) |
|
i_predicted_labels = np.where(i_predicted_probabilities == i_predicted_probabilities.max(axis=1, keepdims=True), 1, 0) |
|
i_predicted_label_df=pd.DataFrame(i_predicted_labels,columns=vocab) |
|
i_predicted_label_df1=pd.concat([i_data,i_predicted_label_df],axis=1) |
|
i_predicted_label_df1['PREDICTION']=i_predicted_label_df1[vocab].idxmax(axis=1) |
|
i_result=i_predicted_label_df1[['DATA','LABEL','PREDICTION']] |
|
|
|
column_mapping=pd.DataFrame(columns=['source','target']) |
|
temp_column_mapping=pd.DataFrame(columns=['source','target']) |
|
for i in i_df.columns.to_list(): |
|
temp_df1=i_result.loc[i_result['LABEL']==i] |
|
temp_max=temp_df1['PREDICTION'].value_counts().idxmax() |
|
temp_column_mapping.loc[0]=[i,temp_max] |
|
column_mapping=pd.concat([column_mapping,temp_column_mapping],ignore_index=True) |
|
not_null=i_df.count().reset_index() |
|
tot_rows=i_df.shape[0] |
|
not_null['not null percentage']=not_null[0]/tot_rows |
|
coltobemodified=not_null[not_null['not null percentage']<.05]['index'].to_list() |
|
column_mapping.loc[column_mapping['source'].isin(coltobemodified), 'target'] = '**TOO FEW COLUMN VALUES**' |
|
st.success('Mapping completed successfully!') |
|
st.session_state[f'{table_selector}_{target_selector}_map_un'] = column_mapping.copy() |
|
|
|
|
|
|
|
|
|
if f'{table_selector}_{target_selector}_map_un' in st.session_state and btn: |
|
taba, tabb, tabc = st.tabs(['Mappings Generated', 'Source Table Preview', 'Target Table Preview']) |
|
with tabb: |
|
                st.subheader('Source Data Preview')
|
with stylable_container( |
|
key=f"source_container_with_border", |
|
css_styles=""" |
|
{ |
|
border: 1px solid white; |
|
border-radius: 0.5rem; |
|
padding: calc(1em - 1px); |
|
width: 103%; /* Set container width to 100% */ |
|
} |
|
""" |
|
): |
|
st.dataframe(st.session_state['source_data_un']) |
|
with tabc: |
|
st.subheader('Target Table Preview') |
|
with stylable_container( |
|
key=f"target_container_with_border", |
|
css_styles=""" |
|
{ |
|
border: 1px solid white; |
|
border-radius: 0.5rem; |
|
padding: calc(1em - 1px); |
|
width: 103%; /* Set container width to 100% */ |
|
} |
|
""" |
|
): |
|
st.write(st.session_state['target_data_un']) |
|
with taba: |
|
st.subheader("Mapping Generated:") |
|
with stylable_container( |
|
key=f"container_with_border", |
|
css_styles=""" |
|
{ |
|
border: 1px solid white; |
|
border-radius: 0.5rem; |
|
padding: calc(1em - 1px); |
|
width: 103%; /* Set container width to 100% */ |
|
} |
|
""" |
|
): |
|
|
|
edited_map_df = st.data_editor( |
|
st.session_state[f'{table_selector}_{target_selector}_map_un'], |
|
column_config={ |
|
"target": st.column_config.SelectboxColumn( |
|
"Available Column Names", |
|
help="Please Verify/Change the Target Column Mapping", |
|
width="medium", |
|
options=st.session_state.opt_un, |
|
required=True, |
|
) |
|
}, |
|
hide_index=False, |
|
num_rows = 'fixed', |
|
use_container_width = True |
|
) |
|
val = button("Validate", key="Val_un") |
|
if val: |
|
st.session_state[f'{table_selector}_{target_selector}_map_un'].update(edited_map_df) |
|
dup= len(st.session_state[f'{table_selector}_{target_selector}_map_un'][st.session_state[f'{table_selector}_{target_selector}_map_un']['target'].duplicated()]) |
|
if dup != 0: |
|
dup_index= list(st.session_state[f'{table_selector}_{target_selector}_map_un'][st.session_state[f'{table_selector}_{target_selector}_map_un']['target'].duplicated(keep=False)].index) |
|
dup_mess=str(dup_index[0]) |
|
for val in dup_index[1:]: |
|
dup_mess = dup_mess + f' and {str(val)}' |
|
st.error(f"One to Many Column mapping Exists. Please Check Mapping Number: {dup_mess}") |
|
else: |
|
st.success("Mapping Validated! You can proceed for Mapping") |
|
|
|
migrate= st.button("Mapping") |
|
if migrate: |
|
st.subheader('Mapping PHASE') |
|
m_queiry1="select count(*) as TARGET_COUNT_CURRENT from ["+ st.session_state.table1_un['TABLE_SCHEMA'][0]+"].["+st.session_state.target_selector_un+"]" |
|
|
|
old_count=pd.read_sql_query(m_queiry1,con=conn) |
|
st.write('RECORDS IN TARGET TABLE BEFORE Mapping',old_count) |
|
with st.spinner('Mapping in progress...'): |
|
cursor1=conn.cursor() |
|
q1='INSERT INTO ['+ st.session_state.table1_un['TABLE_SCHEMA'][0]+'].['+st.session_state.target_selector_un+'] ("' |
|
q2=' select "' |
|
for i,x in enumerate(st.session_state['source_data_un'].columns.values.tolist()): |
|
t=st.session_state[f'{table_selector}_{target_selector}_map_un'].loc[st.session_state[f'{table_selector}_{target_selector}_map_un']['source']==x,'target'].values[0] |
|
if i==len(st.session_state['source_data_un'].columns.values.tolist())-1: |
|
q_temp1=t+'") ' |
|
q_temp2=x+'" ' |
|
else: |
|
q_temp1=t+'", "' |
|
q_temp2=x+'", "' |
|
q1=q1+q_temp1 |
|
q2=q2+q_temp2 |
|
|
|
|
|
q=q1+q2+' from ['+ st.session_state.table1_un['TABLE_SCHEMA'][0]+'].['+table_selector+']' |
|
|
|
cursor1.execute(q) |
|
conn.commit() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
st.success('Mapping completed successfully!') |
|
m_query5="select count(*) as TARGET_COUNT_AFTER_Mapping from ["+ st.session_state.table1_un['TABLE_SCHEMA'][0]+"].["+st.session_state.target_selector_un+"]" |
|
new_count=pd.read_sql_query(m_query5,con=conn) |
|
conn.close() |
|
st.write('RECORDS IN TARGET TABLE AFTER Mapping',new_count) |
|
|
|
|
|
|
|
if mode == 'Unsupervised Mapping(You Do Not Have Sufficient Sample Data in Target Template)': |
|
conn = pymssql.connect("Server=sql-ext-dev-uks-001.database.windows.net;" |
|
"Database=sqldb-ext-dev-uks-001;" |
|
"UID=dbadmin;" |
|
"PWD=mYpa$$w0rD" ) |
|
query1="select * from INFORMATION_SCHEMA.TABLES where TABLE_SCHEMA='dbo' ORDER BY TABLE_NAME ASC" |
|
table1=pd.read_sql_query(query1,con=conn) |
|
st.session_state.table1= table1 |
|
table1['TABLE_NAME']=table1['TABLE_NAME'].astype('str') |
|
|
|
|
|
table_selector=st.multiselect('SOURCE TABLE NAME(S)',['TCM', 'TCVM','TEM', 'TPM', 'TPP', 'TPT', 'TRM', 'TSCM', 'TSM'],default=None,placeholder='Select table for automated column mapping') |
|
|
|
|
|
|
|
target_selector = st.file_uploader("UPLOAD TARGET METADATA FILE", type=['csv']) |
|
tgt_name=None |
|
mapping_df=None |
|
if target_selector is not None: |
|
mapping_df = pd.read_csv(target_selector) |
|
tgt_name = target_selector.name |
|
|
|
required_columns = ['Field_Name', 'Primary Key'] |
|
if all(col in mapping_df.columns for col in required_columns): |
|
field_names = mapping_df['Field_Name'].tolist() |
|
tgt_df = pd.DataFrame(columns=field_names) |
|
|
|
st.session_state.target_selector = target_selector |
|
|
|
|
|
if mapping_df is not None: |
|
st.session_state.mapping_df = mapping_df |
|
|
|
if table_selector is not None: |
|
if len(table_selector)==1: |
|
query2="select * from ["+ table1['TABLE_SCHEMA'][0]+"].["+str(table_selector[0])+"]" |
|
i_df = pd.read_sql_query(query2,con=conn) |
|
|
|
if set(['ID','LOADID','FILE_NAME']).issubset(i_df.columns): |
|
i_df=i_df.drop(['ID','LOADID','FILE_NAME'],axis=1) |
|
elif len(table_selector)>1: |
|
|
|
dataframes = {} |
|
col_names = [] |
|
for tab in table_selector: |
|
query2_2= "select * from [dbo].["+tab+"]" |
|
dataframes[f'{tab}'] = pd.read_sql_query(query2_2,con=conn) |
|
col_names = col_names + list(dataframes[f'{tab}'].columns) |
|
|
|
tab_names = table_selector |
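            # SDV's MultiTableMetadata detects primary/foreign-key relationships across the selected source tables.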
|
metadata = MultiTableMetadata() |
|
metadata.detect_from_dataframes( |
|
data= dataframes |
|
) |
|
multi_python_dict = metadata.to_dict() |
|
|
|
rlist1=multi_python_dict['relationships'] |
|
relationships=pd.DataFrame(columns=['PARENT TABLE','CHILD TABLE','PARENT PRIMARY KEY','CHILD FOREIGN KEY']) |
|
for i in range(len(rlist1)): |
|
rlist=rlist1[i] |
|
nrow=pd.DataFrame({'PARENT TABLE':rlist['parent_table_name'],'CHILD TABLE':rlist['child_table_name'],'PARENT PRIMARY KEY':rlist['parent_primary_key'],'CHILD FOREIGN KEY':rlist['child_foreign_key']},index=[i]) |
|
relationships=pd.concat([relationships,nrow],ignore_index=True) |
|
|
|
filtered_relationships = relationships[ |
|
(relationships['PARENT TABLE'].isin(table_selector)) & |
|
(relationships['CHILD TABLE'].isin(table_selector)) |
|
] |
|
|
|
i_df = pd.DataFrame() |
|
|
|
for _, row in filtered_relationships.iterrows(): |
|
parent_table = row['PARENT TABLE'] |
|
child_table = row['CHILD TABLE'] |
|
parent_key = row['PARENT PRIMARY KEY'] |
|
child_key = row['CHILD FOREIGN KEY'] |
|
|
|
if parent_table in dataframes and child_table in dataframes: |
|
parent_df = dataframes[parent_table] |
|
child_df = dataframes[child_table] |
|
|
|
left_joined_df = pd.merge( |
|
parent_df, child_df, how='left', |
|
left_on=parent_key, right_on=child_key, |
|
suffixes=(f'_{parent_table}', f'_{child_table}') |
|
) |
|
|
|
for col in child_df.columns: |
|
if col != child_key: |
|
left_joined_df.rename( |
|
columns={col: f'{col}_{parent_table}_{child_table}'}, inplace=True |
|
) |
|
|
|
right_joined_df = pd.merge( |
|
parent_df, child_df, how='left', |
|
left_on=child_key, right_on=parent_key, |
|
suffixes=(f'_{child_table}', f'_{parent_table}') |
|
) |
|
|
|
for col in child_df.columns: |
|
if col != child_key: |
|
left_joined_df.rename( |
|
columns={col: f'{col}_{child_table}_{parent_table}'}, inplace=True |
|
) |
|
|
|
i_df = pd.concat([i_df, left_joined_df, right_joined_df], ignore_index=True) |
|
|
|
i_df = i_df.loc[:, ~i_df.columns.duplicated()] |
|
|
|
for table_name in table_selector: |
|
if table_name in dataframes: |
|
for col in dataframes[table_name].columns: |
|
                        if col in i_df.columns and not any(col.endswith(f'_{table}') for table in table_selector):
|
i_df.rename(columns={col: f'{col}_{table_name}'}, inplace=True) |
|
|
|
if set(['ID','LOADID','FILE_NAME']).issubset(i_df.columns): |
|
i_df=i_df.drop(['ID','LOADID','FILE_NAME'],axis=1) |
|
|
|
i_df = i_df.loc[:, ~i_df.columns.duplicated()] |
|
|
|
|
|
if table_selector is not None: |
|
if tgt_name is not None: |
|
btn= button('RUN', key='RUN_GENAI') |
|
|
|
if target_selector is not None and btn and f'{table_selector}_{tgt_name}_map' not in st.session_state: |
|
st.session_state['source_data'] = i_df.sample(20).reset_index() |
|
if target_selector is not None: |
|
|
|
|
|
|
|
|
|
st.session_state['opt'] = list(tgt_df.columns) |
|
st.session_state['target_data'] = tgt_df.head(20).reset_index() |
|
|
|
with st.spinner('Processing Data...'): |
|
selected_df = pd.DataFrame() |
|
|
|
|
|
for col in i_df.columns: |
|
|
|
non_null_values = i_df[col][i_df[col] != ''].dropna().astype(str).str.strip().unique() |
|
|
|
|
|
selected_values = list(non_null_values[:10]) |
|
selected_values = selected_values + [""] * (10 - len(selected_values)) |
|
|
|
selected_df[col] = selected_values |
|
|
|
mapping_df = st.session_state.mapping_df |
|
|
|
tables_list = table_selector |
|
|
|
|
|
table_columns = {} |
|
|
|
|
|
for table in tables_list: |
|
query = f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table}'" |
|
|
|
cursor = conn.cursor() |
|
cursor.execute(query) |
|
|
|
|
|
columns = [row[0] for row in cursor.fetchall()] |
|
|
|
|
|
table_columns[table] = columns |
|
|
|
if 'table_columns' not in st.session_state: |
|
st.session_state.table_columns = table_columns |
|
|
|
story = f""" Details of the source table: |
|
table columns: {str(list(i_df.columns))} |
|
column datatypes: {str(i_df.dtypes.to_string())} |
|
table sample data: {selected_df.head(10).to_string()} |
|
Source Tables selected : {str(list(table_selector))} |
|
Source Table columns are given as (col_name)_(table_name) |
|
|
|
Joining conditions should be based on relationship table which is : {relationships.to_string()} |
|
|
|
Details of the target table: |
|
table columns: {str(list(tgt_df.columns))} |
|
column datatypes: {str(tgt_df.dtypes.to_string())} |
|
table sample data: {tgt_df.head(10).to_string()} |
|
mapping details: {mapping_df.to_string()} |
|
|
|
Source Column names should match from this dictionary: {str(table_columns)} |
|
""" |
|
response = model.generate_content( |
|
textwrap.dedent(""" |
|
Please return JSON describing the possible **one to one mapping** between source table and target table using this following schema: |
|
|
|
{"Mapping": list[MAPPING]} |
|
|
|
MAPPING = {"Target_Table": str, "Field_Name": str, "Source Table": str, "Source Column": str, "Joining Keys": str, "Primary Key": str, "Direct/Derived": str, "Join Tables": str, "Join Condition": str, "Mapping Confidence": percentage, "Column Operations": str, "Alias": str, "Granularity": str, "Filter Condition": str, "Clauses": str, "Ordering": str} |
|
|
|
The first six columns are provided in mapping details. **THE FIRST SIX COLUMNS OF JSON SHOULD HAVE EXACTLY SAME VALUES AS PROVIDED IN MAPPING DETAILS** |
|
**THE PRIMARY KEY IS COMING FROM THE PARENT TABLE. IF SOURCE TABLE IS NOT THE PRIMARY KEY TABLE, THEN 'Direct/Derived' SHOULD BE LABELLED AS DERIVED, OTHERWISE DIRECT** |
|
**JOIN TABLES SHOULD BE WRITTEN ONLY IN CASE OF DERIVED LABEL, JOIN TABLES SHOULD BE THE PARENT TABLE AND THE SOURCE TABLE (IF DIFFERENT)** |
|
            **JOINING CONDITION SHOULD BE BASED ON THE TABLE AND COLUMN WHICH HAS PRIMARY KEY AND JOINING TYPE SHOULD BE LEFT OUTER**
|
**ALIAS SHOULD BE SAME AS FIELD_NAME, GRANULARITY SHOULD BE 1:1** |
|
|
|
1. For example, Field_Name is 'Product Name', Source Table is 'TRM', Source Column is 'PRODUCT_NAME', 'Joining Keys':'PRODUCT_ID', 'Primary Key' not labelled, then {'Direct/Derived": 'DERIVED', 'Join Type':'LEFT OUTER', 'Join Tables':'TPM, TRM', 'Join Condition':'TPM.PRODUCT_ID=TRM.PRODUCT_ID', 'Alias': 'PRODUCT_NAME', 'Granularity': '1:1'} |
|
Joining is done on TPM since the primary key POLICY_ID is taken from TPM table. So TPM.PRODUCT_ID = TRM.PRODUCT_ID is joining condition |
|
|
|
2. For example, Field_Name is 'Office Code', Source Table is 'TPM', Source Column is 'OFFICE_CD', 'Joining Keys':'POLICY_ID', 'Primary Key' not labelled, then {'Direct/Derived": 'DIRECT', 'Join Type':None, 'Join Tables':None, 'Join Condition':None, 'Alias': 'OFFICE_CD', 'Granularity': '1:1'} |
|
Joining is not done since TPM is the parent table. So, 'Direct/Derived": 'DIRECT'. POLICY_ID is the primary key here. |
|
|
|
            3. For example, Field_Name is 'Policy Submission Date', Source Table is 'TSM', Source Column is 'POLICY_SUBMISSION_DT', 'Joining Keys':'POLICY_ID', 'Primary Key' not labelled, then {'Direct/Derived': 'DERIVED', 'Join Type':'LEFT OUTER', 'Join Tables':'TPM, TSM', 'Join Condition':'TPM.POLICY_ID=TSM.POLICY_ID', 'Alias': 'POLICY_SUBMISSION_DT', 'Granularity': '1:1'}

            Joining is done on TPM since the primary key POLICY_ID is taken from TPM table. So TPM.POLICY_ID = TSM.POLICY_ID is joining condition
|
|
|
If Source Column is POLICY_ID_TPM, then change it to POLICY_ID. |
|
**Source Column should not contain the '_TPM', '_TRM', '_TSM', '_TPP' part at the end.** |
|
|
|
All Target fields are required. The JSON keys will be same as the column names in mapping details.Validate the mapping as given in mapping details. |
|
Ignore the columns where hardcoded values are there , such as Current Flag, Start Date, End Date, Etl Job ID,Etl Batch ID,Etl Inserted Date,Etl Updated Date. leave them blank. For other fields, there has to be mapping. |
|
|
|
If you are confused on which source tables to map, then provide MAPPING CONFIDENCE LESS THAN 90 |
|
|
|
ALL THE JSON KEYS ARE MANDATORY: 'Target_Table', 'Field_Name', 'Source Table', 'Source Column', 'Joining Keys', 'Primary Key', 'Direct/Derived', 'Join Type', 'Join Tables', 'Join Condition', 'Mapping Confidence', 'Column Operations', 'Alias', 'Granularity', 'Filter Condition', 'Clauses', 'Ordering' |
|
|
|
Important: Only return a single piece of valid JSON text. All fields are required. Please MAP all ***TARGET fields***. If you struggle to map then give low confidence score but YOU HAVE TO MAP ANYWAY. ****MAKE SURE IT IS A **one to one mapping** **** |
|
|
|
Here is the table details: |
|
|
|
""") + story |
|
) |
|
|
|
|
|
res= response.text.replace("\n", '').replace("`", '').replace('json','') |
|
            print(json.dumps(json.loads(res), indent=2))
|
data = json.loads(res) |
|
map_df = pd.json_normalize(data, record_path=['Mapping']) |
|
st.session_state[f'{table_selector}_{tgt_name}_map'] = map_df.copy() |
|
|
|
|
|
if f'{table_selector}_{tgt_name}_map' in st.session_state and btn: |
|
taba, tabb, tabc = st.tabs(['Mappings Generated', 'Source Table Preview', 'Target Table Preview']) |
|
with tabc: |
|
st.subheader('Target Table Preview') |
|
with stylable_container( |
|
key=f"source_container_with_border", |
|
css_styles=""" |
|
{ |
|
border: 1px solid white; |
|
border-radius: 0.5rem; |
|
padding: calc(1em - 1px); |
|
width: 103%; /* Set container width to 100% */ |
|
} |
|
""" |
|
): |
|
st.dataframe(st.session_state['target_data'].head(0)) |
|
with tabb: |
|
st.subheader('Source Table Preview') |
|
with stylable_container( |
|
key=f"target_container_with_border", |
|
css_styles=""" |
|
{ |
|
border: 1px solid white; |
|
border-radius: 0.5rem; |
|
padding: calc(1em - 1px); |
|
width: 103%; /* Set container width to 100% */ |
|
} |
|
""" |
|
): |
|
st.write(st.session_state['source_data']) |
|
with taba: |
|
st.subheader("Most Probable Mapping Generated:") |
|
with stylable_container( |
|
key=f"container_with_border", |
|
css_styles=""" |
|
{ |
|
border: 1px solid white; |
|
border-radius: 0.5rem; |
|
padding: calc(1em - 1px); |
|
width: 103%; /* Set container width to 100% */ |
|
} |
|
""" |
|
): |
|
edited_map_df = st.data_editor( |
|
st.session_state[f'{table_selector}_{tgt_name}_map'], |
|
column_config={ |
|
"Target Column Name": st.column_config.SelectboxColumn( |
|
"Target Columns", |
|
help="Please Verify/Change the Target Column Mapping", |
|
width="medium", |
|
options=st.session_state.opt, |
|
required=True, |
|
) |
|
}, |
|
hide_index=False, |
|
num_rows = 'dynamic', |
|
use_container_width = True |
|
) |
|
|
|
success_show=1 |
|
if success_show==1: |
|
st.success(f"{(edited_map_df['Mapping Confidence']>90).mean().round(2)*100}% of Columns Mapped with more than 90% Mapping Confidence") |
|
|
|
mapped_uploader = st.file_uploader("UPLOAD REVISED MAPPING (OPTIONAL)", type=['csv']) |
|
if mapped_uploader is not None: |
|
success_show=0 |
|
edited_map_df = pd.read_csv(mapped_uploader) |
|
st.write(edited_map_df) |
|
st.success("Mapping Revised!") |
|
|
|
val = button("Validate", key="Val") |
|
if val: |
|
st.session_state[f'{table_selector}_{tgt_name}_map'].update(edited_map_df) |
|
dup= len(st.session_state[f'{table_selector}_{tgt_name}_map'][st.session_state[f'{table_selector}_{tgt_name}_map']['Field_Name'].duplicated()]) |
|
|
|
error_messages = [] |
|
table_columns = st.session_state.table_columns |
|
|
|
for _, (index, row) in enumerate(edited_map_df.iterrows()): |
|
source_table = row['Source Table'] |
|
source_column = row['Source Column'] |
|
|
|
if source_column not in table_columns.get(source_table, []) and (source_column is not None) and (source_table is not None) and (source_table in table_selector): |
|
error_messages.append(f"Column '{source_column}' not found in table '{source_table}'.\n") |
|
|
|
|
|
if error_messages: |
|
validation_result = "\n".join(error_messages) |
|
else: |
|
validation_result = "Success" |
|
|
|
if dup != 0: |
|
                dup_index= list(st.session_state[f'{table_selector}_{tgt_name}_map'][st.session_state[f'{table_selector}_{tgt_name}_map']['Field_Name'].duplicated(keep=False)].index)
|
dup_mess=str(dup_index[0]) |
|
for val in dup_index[1:]: |
|
dup_mess = dup_mess + f' and {str(val)}' |
|
st.error(f"One to Many Column mapping Exists. Please Check Mapping Number: {dup_mess}") |
|
elif validation_result != "Success": |
|
st.error(validation_result) |
|
else: |
|
st.success("Mapping Validated!") |
|
|
|
df_tbl_dtls = pd.read_csv(r'tbl_dtl.csv') |
|
|
|
with pd.ExcelWriter('Final.xlsx') as writer: |
|
edited_map_df.to_excel(writer, sheet_name='POLICY', index=False) |
|
df_tbl_dtls.to_excel(writer, sheet_name='Table Details', index=False) |
|
|
|
path = 'Final.xlsx' |
|
|
|
temp_table = None |
|
for x in pd.ExcelFile(path).sheet_names: |
|
if x != 'Table Details': |
|
temp_table = x |
|
|
|
df = read_excel(path, temp_table) |
|
table_details_df = read_excel(path, 'Table Details') |
|
table_details_mapping = table_details_df.set_index('Table Name')[['ETL Timestamp','Change Timestamp','ETL Filter']].T.to_dict('list') |
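                # table_details_mapping: {table name: [ETL timestamp column, change timestamp column, ETL filter]} used by the SQL/Spark generators.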
|
|
|
base_table, base_pk, proc_query, incr_join_grps, incr_table_join_info, incr_join, temp_table_schema = generate_sql(temp_table) |
|
sql_query = '' |
|
for x in proc_query: |
|
sql_query += x |
|
sql_query += '\n' |
|
spark_sql = generate_spark(proc_query, incr_join_grps, base_table, base_pk, incr_table_join_info, incr_join, temp_table_schema) |
|
|
|
|
|
col21, col22= st.columns([1,4]) |
|
with col21: |
|
st.download_button('Download SQL Statement', sql_query, file_name='sql_code.txt') |
|
with col22: |
|
st.download_button('Download Spark Statement', spark_sql, file_name='spark_code.txt') |
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
main() |