|
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
from sklearn.feature_extraction.text import CountVectorizer
|
|
import regex as re
|
|
import streamlit as st
|
|
import pyodbc
|
|
import datetime
|
|
import google.generativeai as genai
|
|
import textwrap
|
|
import json
|
|
from streamlit_extras.stateful_button import button
|
|
from streamlit_extras.stylable_container import stylable_container
|
|
import sdv
|
|
from sdv.metadata import MultiTableMetadata
|
|
from collections import defaultdict
|
|
genai.configure(api_key='AIzaSyCeY8jSHKW6t0OSDRjc2VAfBvMunVrff2w')
|
|
|
|
model = genai.GenerativeModel(
|
|
model_name='models/gemini-1.5-flash'
|
|
)
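# The Gemini model above is used by the unsupervised flow to propose a one-to-one source-to-target column mapping from sampled source data and the uploaded mapping template.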
|
|
|
|
def read_excel(path, sheet):
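    # Read one sheet of the mapping workbook, forcing every column to string so keys and flags compare predictably.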
|
|
df = pd.read_excel(path, sheet_name = sheet, dtype = 'str')
|
|
return df
|
|
|
|
def split_join_condition(join_condition):
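    # Split a comma-separated list of join conditions into individual conditions, ignoring commas nested inside parentheses.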
|
|
conditions = []
|
|
condition = ''
|
|
bracket_count = 0
|
|
|
|
for char in join_condition:
|
|
if char == '(':
|
|
bracket_count += 1
|
|
elif char == ')':
|
|
            bracket_count -= 1
|
|
if char == ',' and bracket_count == 0:
|
|
conditions.append(condition.strip())
|
|
condition = ''
|
|
else:
|
|
condition += char
|
|
if condition:
|
|
conditions.append(condition.strip())
|
|
|
|
return conditions
|
|
|
|
def join_incr(join_conditions):
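    # From each join condition keep only the equality predicates (alias.col = alias.col), re-joined with AND.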
|
|
join = []
|
|
    join_pattern = re.compile(r'(\w+\.\w+)\s*=\s*(\w+\.\w+)', re.IGNORECASE)
|
|
for join_condition in join_conditions:
|
|
parts = re.split(r'\sAND\s|\sOR\s', join_condition, flags = re.IGNORECASE)
|
|
temp = [x.strip() for x in parts if join_pattern.match(x.strip())]
|
|
join.append(' AND '.join(temp))
|
|
return join
|
|
|
|
def generate_sql(temp_table):
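    # Build the incremental-load SQL for one mart table from the mapping sheet (global df):
    #   1. truncate and repopulate {temp_table}_INCR with keys of newly committed source rows,
    #   2. derive the INCR1_ (distinct keys) and INCR2_ (delta) tables,
    #   3. build the _BASE table and TEMP_ join tables, then delete-and-insert into the final table,
    #   4. append audit rows carrying the new max update dates.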
|
|
proc_query = []
|
|
base_table = None
|
|
|
|
source_table_schema = 'MAIN.GOLD'
|
|
temp_table_schema = 'MAIN.GOLD'
|
|
base_pk = []
|
|
|
|
join_fields = set()
|
|
|
|
for _,row in df.iterrows():
|
|
source_table = row['Source Table']
|
|
primary_key = row['Primary Key']
|
|
source_column = row['Source Column']
|
|
alias = row['Alias']
|
|
joining_keys = row['Joining Keys']
|
|
|
|
if not base_table:
|
|
if primary_key == 'Y':
|
|
base_table = source_table
|
|
base_pk.append(joining_keys)
|
|
|
|
if pd.notna(joining_keys):
|
|
keys = [x.strip() for x in joining_keys.split(',')]
|
|
for x in keys:
|
|
if x not in join_fields:
|
|
join_fields.add(x)
|
|
|
|
unique_cols = ['Source Table', 'Joining Keys', 'Primary Key', 'Join Type','Join Tables','Join Condition']
|
|
unique_df = df.drop_duplicates(subset = unique_cols)
|
|
|
|
    incremental_mapping = {}
|
|
incr_joins = {}
|
|
|
|
for _,row in unique_df.iterrows():
|
|
|
|
source_table = row['Source Table']
|
|
source_column = row['Source Column']
|
|
joining_keys = row['Joining Keys']
|
|
primary_key = row['Primary Key']
|
|
direct_derived = row['Direct/Derived']
|
|
join_type = row['Join Type']
|
|
join_tables = row['Join Tables']
|
|
join_condition = row['Join Condition']
|
|
|
|
if source_table == base_table:
|
|
if primary_key == 'Y':
|
|
key = (source_table, joining_keys, join_type, join_tables, join_condition)
|
|
key1 = source_table
|
|
else:
|
|
continue
|
|
else:
|
|
key = (source_table, joining_keys, join_type, join_tables, join_condition)
|
|
key1 = source_table
|
|
if pd.notna(direct_derived) and pd.notna(source_table) and pd.notna(source_column):
|
|
            if key not in incremental_mapping:
|
|
                incremental_mapping[key] = {
|
|
'source_table': source_table,
|
|
'joining_keys':joining_keys,
|
|
'join_type': join_type,
|
|
'join_tables': join_tables,
|
|
'join_condition': join_condition
|
|
}
|
|
if key1 not in incr_joins:
|
|
if pd.notna(direct_derived) and direct_derived == 'DERIVED':
|
|
incr_joins[key1] = {
|
|
'join_type': join_type,
|
|
'join_tables': ', '.join([x.strip() for x in join_tables.split(',') if x != base_table]),
|
|
'join_condition': join_condition
|
|
}
|
|
    incremental_df = pd.DataFrame(incremental_mapping.values())
|
|
incr_join_grps = incremental_df.groupby(['source_table'])
|
|
proc_query.append(f'TRUNCATE TABLE {temp_table_schema}.{temp_table}_INCR;')
|
|
|
|
incr_table_join_info = {}
|
|
for _,row in incremental_df.iterrows():
|
|
source_table = row['source_table']
|
|
|
|
if source_table != base_table:
|
|
joining_keys = row['joining_keys']
|
|
join_type = row['join_type']
|
|
join_tables = [x.strip() for x in row['join_tables'].split(',')]
|
|
index = join_tables.index(source_table)
|
|
join_condition = [x.strip() for x in row['join_condition'].split(',')][0:index]
|
|
incr_table_join_info[source_table] = ', '.join(join_condition)
|
|
|
|
incr_query = []
|
|
incr_cols = ''
|
|
incr_tables = []
|
|
incr_join = {}
|
|
|
|
for _, group in incr_join_grps:
|
|
|
|
for table in _.split():
|
|
if base_table != table:
|
|
|
|
join_tables = [t.strip() for t in group['join_tables'].iloc[0].split(',')]
|
|
join_keys = [t.strip() for t in ','.join(base_pk).split(',')]
|
|
join_type = [t.strip() for t in group['join_type'].iloc[0].split(',')]
|
|
join_cond = split_join_condition(incr_table_join_info[table])
|
|
join_condition = join_incr(join_cond)
|
|
source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')]
|
|
|
|
join_key_list = []
|
|
for x in join_keys:
|
|
join_key_list.append(f'{base_table}.{x}')
|
|
join_key = ', '.join(join_key_list)
|
|
|
|
for y in source_table:
|
|
sql = f"""
|
|
INSERT INTO {temp_table_schema}.{temp_table}_INCR
|
|
(
|
|
SELECT {join_key}, {table_details_mapping[y][0]}, {table_details_mapping[y][1]}, '{y}', 1, CURRENT_TIMESTAMP
|
|
FROM {source_table_schema}.{base_table} {base_table}"""
|
|
|
|
incr_join_text = ''
|
|
for i in range(len(join_condition)):
|
|
sql += f'\n\t{join_type[i]} JOIN {source_table_schema}.{join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}'
|
|
incr_join_text += f'\n\t{join_type[i]} JOIN {source_table_schema}.{join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}'
|
|
incr_join[y] = incr_join_text
|
|
|
|
sql += f"""
|
|
WHERE COALESCE({join_tables[i+1]}.operation,'NA') <> 'D'
|
|
AND TO_TIMESTAMP( CAST(SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),1,4) || '-' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),5,2) ||'-' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),7,2) || ' ' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),9,2) ||':' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),11,2) ||':' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),13,2) AS VARCHAR(30)), 'YYYY-MM-DD HH:MI:SS') > (SELECT MAX(max_update_date) FROM audit.reportingdb_audit_tbl_{temp_table} WHERE mart_table_name='{temp_table}' and src_table_name='{y}')
|
|
);"""
|
|
|
|
incr_query.append(sql)
|
|
incr_tables.append(y)
|
|
|
|
else:
|
|
source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')]
|
|
join_keys = [t.strip() for t in group['joining_keys'].iloc[0].split(',')]
|
|
|
|
join_key_list = []
|
|
for x in join_keys:
|
|
join_key_list.append(f'{base_table}.{x}')
|
|
join_key = ', '.join(join_key_list)
|
|
|
|
incr_cols = join_key
|
|
sql = f"""
|
|
INSERT INTO {temp_table_schema}.{temp_table}_INCR
|
|
(
|
|
SELECT {join_key}, {table_details_mapping[base_table][0]}, {table_details_mapping[base_table][1]}, '{base_table}', 1, CURRENT_TIMESTAMP
|
|
FROM {source_table_schema}.{base_table} {base_table}
|
|
WHERE COALESCE(operation,'NA') <> 'D'
|
|
AND TO_TIMESTAMP( CAST(SUBSTRING((_hoodie_commit_time),1,4) || '-' || SUBSTRING((_hoodie_commit_time),5,2) ||'-' || SUBSTRING((_hoodie_commit_time),7,2) || ' ' || SUBSTRING((_hoodie_commit_time),9,2) ||':' || SUBSTRING((_hoodie_commit_time),11,2) ||':' || SUBSTRING((_hoodie_commit_time),13,2) AS VARCHAR(30)), 'YYYY-MM-DD HH:MI:SS') > (SELECT MAX(max_update_date) FROM audit.reportingdb_audit_tbl_{temp_table} WHERE mart_table_name='{temp_table}' and src_table_name='{base_table}')
|
|
);"""
|
|
proc_query.append(sql)
|
|
incr_tables.append(base_table)
|
|
|
|
proc_query.append('\n'.join(incr_query))
|
|
proc_query.append(f'TRUNCATE TABLE {temp_table_schema}.INCR1_{temp_table};')
|
|
|
|
sql = f"""
|
|
INSERT INTO {temp_table_schema}.INCR1_{temp_table}
|
|
(
|
|
SELECT DISTINCT {incr_cols.replace(f'{base_table}.', '')}
|
|
FROM {temp_table_schema}.{temp_table}_INCR
|
|
);"""
|
|
|
|
proc_query.append(sql)
|
|
|
|
incr_table_dict = {}
|
|
for table in incr_tables:
|
|
if table == base_table:
|
|
incr_table_dict[table] = f'{temp_table_schema}.INCR2_{table}'
|
|
else:
|
|
p = [x for x in incr_join[table].split('\n\t') if len(x) > 1]
|
|
if len(p) == 1:
|
|
incr_table_dict[table] = f'{temp_table_schema}.INCR2_{table}'
|
|
else:
|
|
incr_table_dict[table] = f'{source_table_schema}.{table}'
|
|
|
|
s = []
|
|
for table in incr_tables:
|
|
incr2_sql_list = []
|
|
|
|
if table == base_table:
|
|
for key in incr_cols.replace(f'{base_table}.', '').split(','):
|
|
incr2_sql_list.append(f"{base_table}.{key} = A.{key}")
|
|
incr2_sql_join = ' AND '.join(incr2_sql_list)
|
|
|
|
sql = f"""
|
|
CREATE TABLE {temp_table_schema}.INCR2_{table}
|
|
AS
|
|
SELECT
|
|
{table}.*
|
|
FROM
|
|
{source_table_schema}.{table} {table}
|
|
INNER JOIN
|
|
{temp_table_schema}.INCR1_{temp_table} A ON {incr2_sql_join}; """
|
|
proc_query.append(f'DROP TABLE IF EXISTS {temp_table_schema}.INCR2_{table};')
|
|
proc_query.append(sql)
|
|
|
|
else:
|
|
|
|
p = [x for x in incr_join[table].split('\n\t') if len(x) > 1]
|
|
if len(p) == 1:
|
|
sql = f"""
|
|
CREATE TABLE {temp_table_schema}.INCR2_{table}
|
|
AS
|
|
SELECT
|
|
{table}.*
|
|
FROM
|
|
{temp_table_schema}.INCR2_{base_table} {base_table} {incr_join[table]};"""
|
|
s.append(f'DROP TABLE IF EXISTS {temp_table_schema}.INCR2_{table};')
|
|
s.append(sql)
|
|
|
|
for x in s:
|
|
proc_query.append(x)
|
|
|
|
select_clause = []
|
|
from_clause = []
|
|
where_clause = []
|
|
|
|
for _,row in df.iterrows():
|
|
field_name = row['Field_Name']
|
|
source_table = row['Source Table']
|
|
source_column = row['Source Column']
|
|
joining_keys = row['Joining Keys']
|
|
primary_key = row['Primary Key']
|
|
direct_derived = row['Direct/Derived']
|
|
join_type = row['Join Type']
|
|
join_tables = row['Join Tables']
|
|
join_condition = row['Join Condition']
|
|
column_operation = row['Column Operations']
|
|
alias = row['Alias']
|
|
granularity = row['Granularity']
|
|
filter_condition = row['Filter Condition']
|
|
clauses = row['Clauses']
|
|
ordering = row['Ordering']
|
|
|
|
if pd.notna(direct_derived):
|
|
if pd.notna(column_operation):
|
|
if len(column_operation.split()) == 1:
|
|
select_expr = f'{column_operation.upper()}({source_table}.{source_column})'
|
|
else:
|
|
select_expr = column_operation
|
|
else:
|
|
if pd.notna(source_table):
|
|
select_expr = f'{source_table}.{source_column}'
|
|
else:
|
|
select_expr = source_column
|
|
|
|
if source_column not in join_fields:
|
|
if pd.notna(alias):
|
|
select_expr += f' AS {alias}'
|
|
else:
|
|
if pd.notna(column_operation) and pd.notna(source_column):
|
|
select_expr += f' AS {source_column}'
|
|
|
|
if direct_derived.upper() == 'DIRECT':
|
|
select_clause.append(select_expr)
|
|
elif direct_derived.upper() == 'DERIVED_BASE':
|
|
select_clause.append(select_expr)
|
|
|
|
if pd.notna(filter_condition):
|
|
where_clause.append(filter_condition)
|
|
|
|
select_query = ',\n\t'.join(select_clause)
|
|
sql_query = f"CREATE TABLE {temp_table_schema}.{base_table}_BASE\nAS \n\tSELECT \n\t{select_query} \nFROM\n\t{incr_table_dict[base_table]} {base_table}"
|
|
if where_clause:
|
|
        sql_query += f"\nWHERE {' AND '.join(where_clause)}"
|
|
sql_query += ';'
|
|
proc_query.append(f"DROP TABLE IF EXISTS {temp_table_schema}.{base_table}_BASE;")
|
|
proc_query.append(sql_query)
|
|
|
|
    df['Clauses'] = df['Clauses'].fillna('')
|
|
    df['Ordering'] = df['Ordering'].fillna('')
|
|
c = 1
|
|
temp_base_table = f'{base_table}_BASE'
|
|
grp_cols = ['Join Condition', 'Clauses', 'Ordering']
|
|
join_grps = df[df['Direct/Derived'] == 'DERIVED'].groupby(['Join Condition', 'Clauses', 'Ordering'])
|
|
temp_tables_sql = []
|
|
for (join_condition,clauses,ordering), group in join_grps:
|
|
if pd.notna(group['Direct/Derived'].iloc[0]):
|
|
if group['Direct/Derived'].iloc[0].upper() == 'DERIVED':
|
|
join_tables = [t.strip() for t in group['Join Tables'].iloc[0].split(',')]
|
|
join_keys = [t.strip() for t in group['Joining Keys'].iloc[0].split(',')]
|
|
join_type = [t.strip() for t in group['Join Type'].iloc[0].split(',')]
|
|
join_condition = split_join_condition(group['Join Condition'].iloc[0])
|
|
temp_table_name = f"TEMP_{group['Source Table'].iloc[0]}"
|
|
source_column = [t.strip() for t in (','.join(group['Source Column'])).split(',')]
|
|
alias = [t.strip() for t in (','.join(group['Alias'])).split(',')]
|
|
source_table = [t.strip() for t in (','.join(group['Source Table'])).split(',')]
|
|
|
|
base_cols = []
|
|
for join_key in join_keys:
|
|
base_cols.append(f'{join_tables[0]}.{join_key}')
|
|
|
|
for s_table,col,alias in zip(source_table,source_column,alias):
|
|
if pd.notna(group['Column Operations'].iloc[0]):
|
|
if len(group['Column Operations'].iloc[0].split()) == 1:
|
|
select_expr = f"{group['Column Operations'].iloc[0].upper()}({s_table}.{col})"
|
|
else:
|
|
select_expr = group['Column Operations'].iloc[0]
|
|
else:
|
|
if pd.notna(s_table):
|
|
select_expr = f"{s_table}.{col}"
|
|
else:
|
|
select_expr = col
|
|
|
|
if alias:
|
|
select_expr += f" AS {alias}"
|
|
base_cols.append(select_expr)
|
|
|
|
if ordering:
|
|
base_cols.append(f"{ordering} AS RN")
|
|
|
|
sql = ',\n\t\t'.join(base_cols)
|
|
|
|
join_sql = f"SELECT \n\t\t{sql} \nFROM\n\t{incr_table_dict[base_table]} {join_tables[0]}"
|
|
for i in range(len(join_type)):
|
|
join_sql += f'\n\t{join_type[i]} JOIN {incr_table_dict[join_tables[i+1]]} {join_tables[i+1]} ON {join_condition[i]}'
|
|
if clauses:
|
|
join_sql += f'\n\t{clauses}'
|
|
join_sql += ';'
|
|
|
|
proc_query.append(f"DROP TABLE IF EXISTS {temp_table_schema}.{temp_table_name};")
|
|
proc_query.append(f"CREATE TABLE {temp_table_schema}.{temp_table_name}\nAS \n\t{join_sql}")
|
|
|
|
granularity = [t.strip() for t in group['Granularity'].iloc[0].split(',')]
|
|
|
|
sql = []
|
|
for key in join_keys:
|
|
sql.append(f"A.{key} = B.{key}")
|
|
|
|
temp_cols = []
|
|
temp_cols.append('A.*')
|
|
|
|
source_column = [t.strip() for t in (','.join(group['Source Column'])).split(',')]
|
|
alias = [t.strip() for t in (','.join(group['Alias'])).split(',')]
|
|
|
|
for col,alias in zip(source_column,alias):
|
|
select_expr = f"B.{col}"
|
|
if alias:
|
|
select_expr = f"B.{alias}"
|
|
else:
|
|
select_expr = f"B.{col}"
|
|
temp_cols.append(select_expr)
|
|
|
|
temp_select_query = ',\n\t\t'.join(temp_cols)
|
|
|
|
proc_query.append(f"DROP TABLE IF EXISTS {temp_table_schema}.TEMP_{temp_table}_{c};")
|
|
|
|
base_sql = f"CREATE TABLE {temp_table_schema}.TEMP_{temp_table}_{c}\nAS \n\tSELECT \n\t\t{temp_select_query} \nFROM\n\t{temp_table_schema}.{temp_base_table} AS A"
|
|
base_sql += f"\n\tLEFT OUTER JOIN {temp_table_schema}.{temp_table_name} B ON {' AND '.join(sql)}"
|
|
|
|
if '1:1' in granularity and len(ordering) > 1:
|
|
base_sql += f" AND B.RN = 1"
|
|
base_sql += ';'
|
|
|
|
temp_base_table = f'TEMP_{temp_table}_{c}'
|
|
c += 1
|
|
proc_query.append(base_sql)
|
|
|
|
fin_table_name = temp_table
|
|
fin_table_cols = []
|
|
|
|
for _,row in df.iterrows():
|
|
field_name = row['Field_Name']
|
|
source_table = row['Source Table']
|
|
source_column = row['Source Column']
|
|
alias = row['Alias']
|
|
|
|
if pd.notna(row['Direct/Derived']):
|
|
if (source_column in join_fields):
|
|
fin_table_cols.append(f'{source_column} AS "{field_name}"')
|
|
else:
|
|
fin_table_cols.append(f'"{field_name}"')
|
|
|
|
fin_table_cols = ',\n\t\t'.join(fin_table_cols)
|
|
fin_sql = f"INSERT INTO {temp_table_schema}.{fin_table_name}\n\tSELECT \n\t\t{fin_table_cols} \nFROM\n\t{temp_table_schema}.TEMP_{temp_table}_{c-1};"
|
|
|
|
|
|
condition_col = '_'.join(incr_cols.replace(f'{base_table}.', '').split(','))
|
|
    proc_query.append(f"DELETE FROM {temp_table_schema}.{fin_table_name}\nWHERE {condition_col} IN (SELECT {condition_col} FROM {temp_table_schema}.INCR1_{temp_table});")
|
|
proc_query.append(fin_sql)
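    # One audit row per source table: carry forward the max _hoodie_commit_time and source reference date and bump the batch number.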
|
|
|
|
for table in incr_tables:
|
|
sql = f"""
|
|
INSERT INTO audit.reportingdb_audit_tbl_{temp_table}
|
|
(
|
|
SELECT
|
|
'{temp_table}' as mart_table_name,
|
|
'{table}' as src_table_name,
|
|
coalesce( max(TO_TIMESTAMP( CAST(SUBSTRING((_hoodie_commit_time),1,4) || '-' || SUBSTRING((_hoodie_commit_time),5,2) ||'-' || SUBSTRING((_hoodie_commit_time),7,2) || ' ' || SUBSTRING((_hoodie_commit_time),9,2) ||':' || SUBSTRING((_hoodie_commit_time),11,2) ||':' || SUBSTRING((_hoodie_commit_time),13,2) AS VARCHAR(30)), 'YYYY-MM-DD HH:MI:SS')),(select max(max_update_date) from audit.reportingdb_audit_tbl_{temp_table} where Mart_Table_Name='{temp_table}' and Src_Table_Name= '{table}')) max_update_date,
|
|
CURRENT_TIMESTAMP as load_timestamp,
|
|
coalesce(max(prev_updt_ts),(select max(source_reference_date) from audit.reportingdb_audit_tbl_{temp_table} where Mart_Table_Name='{temp_table}' and Src_Table_Name= '{table}')) AS source_reference_date,
|
|
max(nvl(batch_number,0))+1
|
|
FROM {temp_table_schema}.{temp_table}_INCR where table_name = '{table}'
|
|
);"""
|
|
proc_query.append(sql)
|
|
|
|
return base_table, base_pk, proc_query, incr_join_grps, incr_table_join_info, incr_join, temp_table_schema
|
|
|
|
def create_df(query, table_df_mapping, table_usage_count):
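    # Translate one CREATE TABLE ... AS SELECT statement into PySpark: run the SELECT through spark.sql(), persist it, register a temp view, and unpersist source DataFrames whose usage count has dropped to zero.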
|
|
script = []
|
|
query = ' '.join(query.split()).strip()
|
|
match = re.match(r'CREATE TABLE (\w+\.\w+\.\w+) AS (SELECT .+)', query, re.IGNORECASE)
|
|
source_tables = re.findall(r'\bFROM\s+(\w+\.\w+\.\w+)|\bJOIN\s+(\w+\.\w+\.\w+)', query, re.IGNORECASE)
|
|
source_tables = [table for pair in source_tables for table in pair if table]
|
|
|
|
if not match:
|
|
raise ValueError('Invalid SQL')
|
|
table_name = match.group(1).split('.')[2]
|
|
select_statement = match.group(2)
|
|
create_script = f'{table_name} = spark.sql(""" {select_statement} """)'
|
|
persist_script = f'{table_name} = {table_name}.persist()'
|
|
view_script = f'{table_name}.createOrReplaceTempView("{table_name}")'
|
|
|
|
for table in source_tables:
|
|
create_script = create_script.replace(table, table_df_mapping[table])
|
|
|
|
script.append(f"\n\t\t######################---------Creating table {create_script.split('=')[0].strip()}-------############################")
|
|
script.append(create_script)
|
|
script.append(persist_script)
|
|
script.append(view_script)
|
|
    script.append(f'''print("{create_script.split('=')[0].strip()} count: ", {create_script.split('=')[0].strip()}.count())''')
|
|
|
|
if 'INCR2_' in table_name:
|
|
x = table_name.split('INCR2_')[1]
|
|
if x in table_details_mapping.keys():
|
|
script.append(f"\n\t\t######################---------Updating the max_update_date in audit-------############################")
|
|
script.append(f"{x}_max_update_date = INCR2_{x}.agg({{'_hoodie_commit_time' : 'max'}}).first()[0]")
|
|
script.append(f"{x}_max_source_reference_date = INCR2_{x}.agg(max(to_timestamp('{table_details_mapping[x][1].replace(x+'.','')}','yyyy-MM-dd-HH.mm.ss.SSSSSS'))).first()[0]")
|
|
script.append(f"insert_max_update_date(spark,redshift_conn, config['application_name'],'{x}',{x}_max_update_date,{x}_max_source_reference_date, max_batch_id, config)")
|
|
script.append('\n')
|
|
|
|
for table in source_tables:
|
|
table_usage_count[table.split('.')[2]] -= 1
|
|
|
|
for table in source_tables:
|
|
if table_usage_count[table.split('.')[2]] == 0 and 'INCR1_' not in table:
|
|
unpersist_script = f"{table.split('.')[2]}.unpersist()"
|
|
script.append(unpersist_script)
|
|
|
|
return '\n\t\t'.join(script)
|
|
|
|
def generate_spark(proc_query, incr_join_grps, base_table, base_pk, incr_table_join_info, incr_join, temp_table_schema):
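    # Assemble the full PySpark job: read and persist the sources, build the INCR1/INCR2 DataFrames, replay the CREATE TABLE statements through create_df, and splice the generated code into template.txt.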
|
|
table_usage_count = defaultdict(int)
|
|
table_df_mapping = {}
|
|
|
|
for query in proc_query:
|
|
        if 'CREATE TABLE' in query or 'DELETE' in query:
|
|
source_tables = re.findall(r'\bFROM\s+(\w+\.\w+\.\w+)|\bJOIN\s+(\w+\.\w+\.\w+)', query, re.IGNORECASE)
|
|
source_tables = [table for pair in source_tables for table in pair if table]
|
|
for table in source_tables:
|
|
table_usage_count[table.split('.')[2]] += 1
|
|
if 'DELETE' not in query:
|
|
table_df_mapping[table] = table.split('.')[2]
|
|
|
|
script = []
|
|
for query in proc_query:
|
|
if 'CREATE TABLE' in query:
|
|
script.append(create_df(query, table_df_mapping,table_usage_count))
|
|
|
|
spark_query = []
|
|
spark_query.append("\t\t######################---------Reading source data -------############################")
|
|
for table in table_details_mapping.keys():
|
|
spark_query.append(f'{table} = read_file(spark, config, \"{table}\").filter("{table_details_mapping[table][2]}")')
|
|
spark_query.append(f'{table} = {table}.persist()')
|
|
spark_query.append(f'{table}.createOrReplaceTempView("{table}")')
|
|
        spark_query.append(f'print("{table} count: ", {table}.count())')
|
|
spark_query.append('\n')
|
|
|
|
spark_query.append("\n\t\t######################---------Reading records-------############################")
|
|
for table in table_details_mapping.keys():
|
|
spark_query.append(f"{table}_max_update_date = read_max_update_date(redshift_conn, config['application_name'],'{table}', config)")
|
|
spark_query.append(f'{table}_max_update_date = {table}_max_update_date[0][0]')
|
|
spark_query.append('\n')
|
|
|
|
incr1_spark = []
|
|
temp_incr1 = []
|
|
for _, group in incr_join_grps:
|
|
for table in _.split():
|
|
if base_table != table:
|
|
join_tables = [t.strip() for t in group['join_tables'].iloc[0].split(',')]
|
|
join_keys = [t.strip() for t in ','.join(base_pk).split(',')]
|
|
join_type = [t.strip() for t in group['join_type'].iloc[0].split(',')]
|
|
join_cond = split_join_condition(incr_table_join_info[table])
|
|
join_condition = join_incr(join_cond)
|
|
source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')]
|
|
|
|
join_key_list = []
|
|
for x in join_keys:
|
|
join_key_list.append(f'{base_table}.{x}')
|
|
join_key = ', '.join(join_key_list)
|
|
|
|
for y in source_table:
|
|
sql = f"""SELECT {join_key} FROM {base_table} {base_table}"""
|
|
|
|
incr_join_text = ''
|
|
i=0
|
|
for i in range(len(join_condition)):
|
|
sql += f' {join_type[i]} JOIN {join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}'
|
|
incr_join_text += f' {join_type[i]} JOIN {join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}'
|
|
|
|
sql += f''' WHERE {join_tables[i+1]}._hoodie_commit_time > cast('"""+str({join_tables[i+1]}_max_update_date)+"""' as timestamp)'''
|
|
temp_incr1.append(sql)
|
|
|
|
else:
|
|
source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')]
|
|
join_keys = [t.strip() for t in group['joining_keys'].iloc[0].split(',')]
|
|
|
|
join_key_list = []
|
|
for x in join_keys:
|
|
join_key_list.append(f'{base_table}.{x}')
|
|
join_key = ', '.join(join_key_list)
|
|
|
|
sql = f'''SELECT {join_key} FROM {base_table} {base_table} WHERE {base_table}._hoodie_commit_time > cast('"""+str({base_table}_max_update_date)+"""' as timestamp)'''
|
|
incr1_spark.append(sql)
|
|
for i in temp_incr1:
|
|
incr1_spark.append(i)
|
|
incr1_spark = '\nUNION\n'.join(incr1_spark)
|
|
spark_query.append("\n\t\t######################---------Creating INCR1-------############################")
|
|
spark_query.append(f'INCR1_{temp_table} = spark.sql(""" {incr1_spark} """)')
|
|
spark_query.append(f'\n\t\tINCR1_{temp_table} = INCR1_{temp_table}.dropDuplicates()')
|
|
spark_query.append(f'INCR1_{temp_table} = INCR1_{temp_table}.persist()')
|
|
spark_query.append(f'INCR1_{temp_table}.createOrReplaceTempView("INCR1_{temp_table}")')
|
|
spark_query.append(f'print("INCR1_{temp_table} count: ", INCR1_{temp_table}.count())')
|
|
|
|
spark_query.append("\n\t\t######################---------Creating INCR2-------############################")
|
|
for table in table_details_mapping.keys():
|
|
if table in incr_join.keys():
|
|
p = [x for x in incr_join[table].split('\n\t') if len(x) > 1]
|
|
if len(p) > 1:
|
|
spark_query.append(f"\n\t\t######################---------Updating the max_update_date in audit-------############################")
|
|
spark_query.append(f"{table}_max_update_date = {table}.agg({{'_hoodie_commit_time' : 'max'}}).first()[0]")
|
|
spark_query.append(f"{table}_max_source_reference_date = {table}.agg(max(to_timestamp('{table_details_mapping[table][1].replace(table+'.','')}','yyyy-MM-dd-HH.mm.ss.SSSSSS'))).first()[0]")
|
|
spark_query.append(f"insert_max_update_date(spark,redshift_conn, config['application_name'],'{table}',{table}_max_update_date,{table}_max_source_reference_date, max_batch_id, config)")
|
|
spark_query.append('\n')
|
|
|
|
for query in script:
|
|
spark_query.append(query)
|
|
spark_query.append('\n')
|
|
|
|
spark_query1 = []
|
|
spark_query1.append('\n')
|
|
for query in proc_query:
|
|
if f'{temp_table_schema}.{temp_table}\n' in query:
|
|
final_tables = re.findall(r'\bFROM\s+(\w+\.\w+\.\w+)|\bJOIN\s+(\w+\.\w+\.\w+)', query, re.IGNORECASE)
|
|
final_tables = [table.split('.')[2].strip() for pair in final_tables for table in pair if table and table.split('.')[2].strip() != temp_table][0]
|
|
if 'INCR1_' in final_tables:
|
|
spark_query.append(f"{final_tables}.write.mode('overwrite').parquet(config['incr2df_path'])")
|
|
else:
|
|
spark_query.append(f"{final_tables}.write.mode('overwrite').parquet(config['resultdf_path'])")
|
|
spark_query1.append(f'''cur.execute(""" {query} """)''')
|
|
spark_query1.append('\n')
|
|
|
|
with open('template.txt') as file:
|
|
template = file.read()
|
|
|
|
result = template.replace('INSERT_CODE_1', '\n\t\t'.join(spark_query))
|
|
result = result.replace('INSERT_CODE_2', '\t\t'.join(spark_query1))
|
|
|
|
return result
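# ---------------- Streamlit UI: page setup, custom CSS, and the supervised / unsupervised mapping workflows ----------------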
|
|
|
|
|
|
|
|
st.set_page_config(page_title='AUTOMATED SOURCE TO TARGET MAPPING', layout= 'wide')
|
|
st.markdown("""
|
|
<style>
|
|
|
|
/* Remove blank space at top and bottom */
|
|
.block-container {
|
|
padding-top: 1.9rem;
|
|
padding-bottom: 1rem;
|
|
}
|
|
|
|
/* Remove blank space at the center canvas */
|
|
.st-emotion-cache-z5fcl4 {
|
|
position: relative;
|
|
top: -62px;
|
|
}
|
|
|
|
/* Make the toolbar transparent and the content below it clickable */
|
|
.st-emotion-cache-18ni7ap {
|
|
pointer-events: none;
|
|
background: rgb(255 255 255 / 0%)
|
|
}
|
|
.st-emotion-cache-zq5wmm {
|
|
pointer-events: auto;
|
|
background: rgb(255 255 255);
|
|
border-radius: 5px;
|
|
}
|
|
</style>
|
|
""", unsafe_allow_html=True)
|
|
st.subheader('AUTOMATED SOURCE TO TARGET MAPPING')
|
|
mode= st.selectbox('Select Mode of Mapping',('Supervised Mapping(You Have Sufficient Sample Data in Target Template)', 'Unsupervised Mapping(You Do Not Have Sufficient Sample Data in Target Template)'), index=None,placeholder='Select category of table')
|
|
if mode == 'Supervised Mapping(You Have Sufficient Sample Data in Target Template)':
|
|
conn = pyodbc.connect("Driver={ODBC Driver 17 for SQL Server};"
|
|
"Server=sql-ext-dev-uks-001.database.windows.net;"
|
|
"Database=sqldb-ext-dev-uks-001;"
|
|
"UID=dbadmin;"
|
|
"PWD=mYpa$$w0rD" )
|
|
query1="select * from INFORMATION_SCHEMA.TABLES where TABLE_SCHEMA='dbo' ORDER BY TABLE_NAME ASC"
|
|
table1=pd.read_sql_query(query1,con=conn)
|
|
st.session_state.table1_un= table1
|
|
table1['TABLE_NAME']=table1['TABLE_NAME'].astype('str')
|
|
colsel1, colsel2= st.columns(2)
|
|
with colsel1:
|
|
table_selector=st.selectbox('SOURCE TABLE NAME',['TCM', 'TCVM','TEM', 'TPM', 'TPP', 'TPT', 'TRM', 'TSCM', 'TSM'],index=None,placeholder='Select table for automated column mapping')
|
|
with colsel2:
|
|
target_selector=st.selectbox('TARGET TABLE NAME',['POLICY_MAPPINGTARGET_TBL','FINANCE_MAAPINGTARGET_TBL','CUSTOMER_MASTER_TARGET'],index=None,placeholder='Select target table')
|
|
st.session_state.target_selector_un = target_selector
|
|
|
|
if table_selector is not None and target_selector is not None:
|
|
btn=button('RUN',key='RUN_GENAI_UN')
|
|
if target_selector is not None and btn and f'{table_selector}_{target_selector}_map_un' not in st.session_state:
|
|
query2="select * from ["+ table1['TABLE_SCHEMA'][0]+"].["+table_selector+"]"
|
|
i_df = pd.read_sql_query(query2,con=conn)
|
|
|
|
i_df=i_df.drop(['ID','LOADID','FILE_NAME'],axis=1)
|
|
st.session_state['source_data_un'] = i_df
|
|
|
|
|
|
|
|
query3="select * from ["+ table1['TABLE_SCHEMA'][0]+"].["+target_selector+"]"
|
|
tgt_df=pd.read_sql_query(query3,con=conn).reset_index(drop=True)
|
|
main_list=tgt_df.columns.to_list()
|
|
sub_list=['ID','LOADID','FILE_NAME']
|
|
if any(main_list[i:i+len(sub_list)] == sub_list for i in range(len(main_list) - len(sub_list) + 1)):
|
|
tgt_df=tgt_df.drop(['ID','LOADID','FILE_NAME'],axis=1)
|
|
st.session_state.opt_un= list(tgt_df.columns)
|
|
st.session_state['target_data_un'] = tgt_df.head(20).reset_index()
|
|
|
|
|
|
|
|
|
|
|
|
with st.spinner('Running data on neural network...'):
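                # Train a character n-gram MLP on the labelled target-template data, then classify each source column's values to suggest a target column.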
|
|
df=pd.read_csv('C:\\Applications\\MARCO POLO O AIML\\DATA CATALOG\\pages\\CUSTOMER_MASTER_TRAIN_1306.csv')
|
|
cols=df.columns.tolist()
|
|
data=pd.DataFrame(columns=['DATA','LABEL'])
|
|
temp=pd.DataFrame(columns=['DATA','LABEL'])
|
|
for x in cols:
|
|
temp['DATA']=df[x]
|
|
temp['LABEL']=x
|
|
data=pd.concat([data,temp],ignore_index=True)
|
|
data['DATA']=data['DATA'].astype('string')
|
|
data['LABEL']=data['LABEL'].astype('string')
|
|
data=data.dropna()
|
|
data=data.reset_index(drop=True)
|
|
|
|
|
|
|
|
|
|
|
|
vectorizer = CountVectorizer(analyzer='char_wb', ngram_range=(1, 3), min_df=1)
|
|
X=vectorizer.fit_transform(data['DATA'])
|
|
feature=pd.DataFrame(data=X.toarray(),columns=vectorizer.get_feature_names_out())
|
|
data1=pd.concat([data,feature],axis=1)
|
|
|
|
|
|
from sklearn.feature_selection import chi2
|
|
chi_x=data1.drop(['DATA','LABEL'],axis=1)
|
|
chi_y=data1['LABEL']
|
|
chi_scores=chi2(chi_x,chi_y)
|
|
p_values=pd.Series(chi_scores[1],index=chi_x.columns)
|
|
p_values=p_values.sort_values(ascending=True).reset_index()
|
|
feature_chi=p_values['index'][:1000]
|
|
data2=data1[feature_chi.to_list()]
|
|
data2=pd.concat([data,data2],axis=1)
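                # Add simple hand-crafted features (length, digit/vowel/special-character/space counts) alongside the chi-squared-selected n-grams.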
|
|
|
|
|
|
def count_digits(str1):
|
|
return len("".join(re.findall("\d+", str1)))
|
|
|
|
def count_vowels(string):
|
|
vowels = "aeiouAEIOU"
|
|
count = 0
|
|
for char in string:
|
|
if char in vowels:
|
|
count += 1
|
|
return count
|
|
|
|
def count_special_character(string):
|
|
special_characters = "!@#$%^&*()-+?_=,<>/"
|
|
special_char = 0
|
|
for i in range(0, len(string)):
|
|
if (string[i] in special_characters):
|
|
special_char += 1
|
|
return special_char
|
|
|
|
def count_spaces(string):
|
|
spaces = 0
|
|
for char in string:
|
|
if char == " ":
|
|
spaces += 1
|
|
return spaces
|
|
|
|
data2['LENGTH']=data2['DATA'].apply(lambda x:len(x))
|
|
data2['digit_c']=data2['DATA'].apply(lambda x:count_digits(x))
|
|
data2['vowel_c']=data2['DATA'].apply(lambda x:count_vowels(x))
|
|
data2['spchar_c']=data2['DATA'].apply(lambda x:count_special_character(x))
|
|
data2['space_c']=data2['DATA'].apply(lambda x:count_spaces(x))
|
|
|
|
chi_scores1=chi2(data2[['LENGTH','digit_c','vowel_c','spchar_c','space_c']],data2['LABEL'])
|
|
p_values1=pd.Series(chi_scores1[1],index=data2[['LENGTH','digit_c','vowel_c','spchar_c','space_c']].columns).sort_values(ascending=True).reset_index()
|
|
|
|
|
|
import tensorflow as tf
|
|
from tensorflow.keras import layers
|
|
from tensorflow import keras
|
|
|
|
from sklearn.model_selection import train_test_split
|
|
from ast import literal_eval
|
|
|
|
train_df, test_df = train_test_split(data2,test_size=.1,stratify=data2['LABEL'].values)
|
|
val_df = test_df.sample(frac=0.5)
|
|
test_df.drop(val_df.index, inplace=True)
|
|
|
|
terms = tf.ragged.constant(data2['LABEL'].values)
|
|
lookup = tf.keras.layers.StringLookup(output_mode="one_hot")
|
|
lookup.adapt(terms)
|
|
vocab = lookup.get_vocabulary()
|
|
|
|
def invert_multi_hot(encoded_labels):
|
|
hot_indices = np.argwhere(encoded_labels == 1.0)[..., 0]
|
|
return np.take(vocab, hot_indices)
|
|
|
|
max_seqlen = 150
|
|
batch_size = 128
|
|
padding_token = "<pad>"
|
|
auto = tf.data.AUTOTUNE
|
|
|
|
feature_tf=data2.columns.tolist()[2:]
|
|
|
|
def make_dataset(dataframe,feature,batch_size,is_train=True):
|
|
labels = tf.ragged.constant(dataframe["LABEL"].values)
|
|
label_binarized = lookup(labels).numpy()
|
|
dataset = tf.data.Dataset.from_tensor_slices(
|
|
(dataframe[feature].values, label_binarized)
|
|
)
|
|
dataset = dataset.shuffle(batch_size * 10) if is_train else dataset
|
|
return dataset.batch(batch_size)
|
|
|
|
train_dataset = make_dataset(train_df,feature_tf,batch_size, is_train=True)
|
|
validation_dataset = make_dataset(val_df,feature_tf,batch_size, is_train=False)
|
|
test_dataset = make_dataset(test_df,feature_tf,batch_size, is_train=False)
|
|
|
|
|
|
shallow_mlp_model = keras.Sequential(
|
|
[
|
|
layers.Dense(512, activation="relu"),
|
|
layers.Dense(256, activation="relu"),
|
|
layers.Dense(lookup.vocabulary_size(), activation="softmax"),
|
|
]
|
|
)
|
|
|
|
shallow_mlp_model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["CategoricalAccuracy"])
|
|
epochs=20
|
|
history = shallow_mlp_model.fit(train_dataset, validation_data=validation_dataset, epochs=epochs)
|
|
|
|
|
|
_, category_acc = shallow_mlp_model.evaluate(test_dataset)
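                # Vectorise the selected source table with the fitted transformers and predict the most likely target column for each source column (majority vote per column).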
|
|
|
|
|
|
|
|
|
|
i_cols=i_df.columns
|
|
i_cols=i_df.columns.tolist()
|
|
i_data=pd.DataFrame(columns=['DATA','LABEL'])
|
|
i_temp=pd.DataFrame(columns=['DATA','LABEL'])
|
|
for x in i_cols:
|
|
i_temp['DATA']=i_df[x]
|
|
i_temp['LABEL']=x
|
|
i_data=pd.concat([i_data,i_temp],ignore_index=True)
|
|
i_data['DATA']=i_data['DATA'].astype('string')
|
|
i_data['LABEL']=i_data['LABEL'].astype('string')
|
|
i_data=i_data.dropna()
|
|
i_data=i_data.reset_index(drop=True)
|
|
i_X=vectorizer.transform(i_data['DATA'])
|
|
i_feature=pd.DataFrame(data=i_X.toarray(),columns=vectorizer.get_feature_names_out())
|
|
i_data1=pd.concat([i_data,i_feature],axis=1)
|
|
i_data2=i_data1[feature_chi.to_list()]
|
|
i_data2=pd.concat([i_data,i_data2],axis=1)
|
|
i_data2['LENGTH']=i_data2['DATA'].apply(lambda x:len(x))
|
|
i_data2['digit_c']=i_data2['DATA'].apply(lambda x:count_digits(x))
|
|
i_data2['vowel_c']=i_data2['DATA'].apply(lambda x:count_vowels(x))
|
|
i_data2['spchar_c']=i_data2['DATA'].apply(lambda x:count_special_character(x))
|
|
i_data2['space_c']=i_data2['DATA'].apply(lambda x:count_spaces(x))
|
|
i_run_dataset=tf.data.Dataset.from_tensor_slices((i_data2[feature_tf].values,lookup(tf.ragged.constant(i_data2["LABEL"].values)).numpy())).batch(649)
|
|
|
|
|
|
i_predicted_probabilities = shallow_mlp_model.predict(i_run_dataset)
|
|
i_predicted_labels = np.where(i_predicted_probabilities == i_predicted_probabilities.max(axis=1, keepdims=True), 1, 0)
|
|
i_predicted_label_df=pd.DataFrame(i_predicted_labels,columns=vocab)
|
|
i_predicted_label_df1=pd.concat([i_data,i_predicted_label_df],axis=1)
|
|
i_predicted_label_df1['PREDICTION']=i_predicted_label_df1[vocab].idxmax(axis=1)
|
|
i_result=i_predicted_label_df1[['DATA','LABEL','PREDICTION']]
|
|
|
|
column_mapping=pd.DataFrame(columns=['source','target'])
|
|
temp_column_mapping=pd.DataFrame(columns=['source','target'])
|
|
for i in i_df.columns.to_list():
|
|
temp_df1=i_result.loc[i_result['LABEL']==i]
|
|
temp_max=temp_df1['PREDICTION'].value_counts().idxmax()
|
|
temp_column_mapping.loc[0]=[i,temp_max]
|
|
column_mapping=pd.concat([column_mapping,temp_column_mapping],ignore_index=True)
|
|
not_null=i_df.count().reset_index()
|
|
tot_rows=i_df.shape[0]
|
|
not_null['not null percentage']=not_null[0]/tot_rows
|
|
coltobemodified=not_null[not_null['not null percentage']<.05]['index'].to_list()
|
|
column_mapping.loc[column_mapping['source'].isin(coltobemodified), 'target'] = '**TOO FEW COLUMN VALUES**'
|
|
st.success('Mapping completed successfully!')
|
|
st.session_state[f'{table_selector}_{target_selector}_map_un'] = column_mapping.copy()
|
|
|
|
|
|
|
|
|
|
if f'{table_selector}_{target_selector}_map_un' in st.session_state and btn:
|
|
taba, tabb, tabc = st.tabs(['Mappings Generated', 'Source Table Preview', 'Target Table Preview'])
|
|
with tabb:
|
|
                st.subheader('Source Data Preview')
|
|
with stylable_container(
|
|
key=f"source_container_with_border",
|
|
css_styles="""
|
|
{
|
|
border: 1px solid white;
|
|
border-radius: 0.5rem;
|
|
padding: calc(1em - 1px);
|
|
width: 103%; /* Set container width to 100% */
|
|
}
|
|
"""
|
|
):
|
|
st.dataframe(st.session_state['source_data_un'])
|
|
with tabc:
|
|
st.subheader('Target Table Preview')
|
|
with stylable_container(
|
|
key=f"target_container_with_border",
|
|
css_styles="""
|
|
{
|
|
border: 1px solid white;
|
|
border-radius: 0.5rem;
|
|
padding: calc(1em - 1px);
|
|
width: 103%; /* Set container width to 100% */
|
|
}
|
|
"""
|
|
):
|
|
st.write(st.session_state['target_data_un'])
|
|
with taba:
|
|
st.subheader("Mapping Generated:")
|
|
with stylable_container(
|
|
key=f"container_with_border",
|
|
css_styles="""
|
|
{
|
|
border: 1px solid white;
|
|
border-radius: 0.5rem;
|
|
padding: calc(1em - 1px);
|
|
width: 103%; /* Set container width to 100% */
|
|
}
|
|
"""
|
|
):
|
|
|
|
edited_map_df = st.data_editor(
|
|
st.session_state[f'{table_selector}_{target_selector}_map_un'],
|
|
column_config={
|
|
"target": st.column_config.SelectboxColumn(
|
|
"Available Column Names",
|
|
help="Please Verify/Change the Target Column Mapping",
|
|
width="medium",
|
|
options=st.session_state.opt_un,
|
|
required=True,
|
|
)
|
|
},
|
|
hide_index=False,
|
|
num_rows = 'fixed',
|
|
use_container_width = True
|
|
)
|
|
val = button("Validate", key="Val_un")
|
|
if val:
|
|
st.session_state[f'{table_selector}_{target_selector}_map_un'].update(edited_map_df)
|
|
dup= len(st.session_state[f'{table_selector}_{target_selector}_map_un'][st.session_state[f'{table_selector}_{target_selector}_map_un']['target'].duplicated()])
|
|
if dup != 0:
|
|
dup_index= list(st.session_state[f'{table_selector}_{target_selector}_map_un'][st.session_state[f'{table_selector}_{target_selector}_map_un']['target'].duplicated(keep=False)].index)
|
|
dup_mess=str(dup_index[0])
|
|
for val in dup_index[1:]:
|
|
dup_mess = dup_mess + f' and {str(val)}'
|
|
st.error(f"One to Many Column mapping Exists. Please Check Mapping Number: {dup_mess}")
|
|
else:
|
|
st.success("Mapping Validated! You can proceed for Mapping")
|
|
|
|
migrate= st.button("Mapping")
|
|
if migrate:
|
|
st.subheader('Mapping PHASE')
|
|
m_queiry1="select count(*) as TARGET_COUNT_CURRENT from ["+ st.session_state.table1_un['TABLE_SCHEMA'][0]+"].["+st.session_state.target_selector_un+"]"
|
|
|
|
old_count=pd.read_sql_query(m_queiry1,con=conn)
|
|
st.write('RECORDS IN TARGET TABLE BEFORE Mapping',old_count)
|
|
with st.spinner('Mapping in progress...'):
|
|
cursor1=conn.cursor()
|
|
q1='INSERT INTO ['+ st.session_state.table1_un['TABLE_SCHEMA'][0]+'].['+st.session_state.target_selector_un+'] ("'
|
|
q2=' select "'
|
|
for i,x in enumerate(st.session_state['source_data_un'].columns.values.tolist()):
|
|
t=st.session_state[f'{table_selector}_{target_selector}_map_un'].loc[st.session_state[f'{table_selector}_{target_selector}_map_un']['source']==x,'target'].values[0]
|
|
if i==len(st.session_state['source_data_un'].columns.values.tolist())-1:
|
|
q_temp1=t+'") '
|
|
q_temp2=x+'" '
|
|
else:
|
|
q_temp1=t+'", "'
|
|
q_temp2=x+'", "'
|
|
q1=q1+q_temp1
|
|
q2=q2+q_temp2
|
|
|
|
|
|
q=q1+q2+' from ['+ st.session_state.table1_un['TABLE_SCHEMA'][0]+'].['+table_selector+']'
|
|
|
|
cursor1.execute(q)
|
|
conn.commit()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
st.success('Mapping completed successfully!')
|
|
m_query5="select count(*) as TARGET_COUNT_AFTER_Mapping from ["+ st.session_state.table1_un['TABLE_SCHEMA'][0]+"].["+st.session_state.target_selector_un+"]"
|
|
new_count=pd.read_sql_query(m_query5,con=conn)
|
|
conn.close()
|
|
st.write('RECORDS IN TARGET TABLE AFTER Mapping',new_count)
|
|
|
|
|
|
|
|
if mode == 'Unsupervised Mapping(You Do Not Have Sufficient Sample Data in Target Template)':
|
|
conn = pyodbc.connect("Driver={ODBC Driver 17 for SQL Server};"
|
|
"Server=sql-ext-dev-uks-001.database.windows.net;"
|
|
"Database=sqldb-ext-dev-uks-001;"
|
|
"UID=dbadmin;"
|
|
"PWD=mYpa$$w0rD" )
|
|
query1="select * from INFORMATION_SCHEMA.TABLES where TABLE_SCHEMA='dbo' ORDER BY TABLE_NAME ASC"
|
|
table1=pd.read_sql_query(query1,con=conn)
|
|
st.session_state.table1= table1
|
|
table1['TABLE_NAME']=table1['TABLE_NAME'].astype('str')
|
|
|
|
|
|
table_selector=st.multiselect('SOURCE TABLE NAME(S)',['TCM', 'TCVM','TEM', 'TPM', 'TPP', 'TPT', 'TRM', 'TSCM', 'TSM'],default=None,placeholder='Select table for automated column mapping')
|
|
|
|
|
|
|
|
target_selector = st.file_uploader("UPLOAD TARGET METADATA FILE", type=['csv'])
|
|
tgt_name=None
|
|
mapping_df=None
|
|
if target_selector is not None:
|
|
mapping_df = pd.read_csv(target_selector)
|
|
tgt_name = target_selector.name
|
|
|
|
required_columns = ['Field_Name', 'Primary Key']
|
|
if all(col in mapping_df.columns for col in required_columns):
|
|
field_names = mapping_df['Field_Name'].tolist()
|
|
tgt_df = pd.DataFrame(columns=field_names)
|
|
|
|
st.session_state.target_selector = target_selector
|
|
|
|
|
|
if mapping_df is not None:
|
|
st.session_state.mapping_df = mapping_df
|
|
|
|
if table_selector is not None:
|
|
if len(table_selector)==1:
|
|
query2="select * from ["+ table1['TABLE_SCHEMA'][0]+"].["+str(table_selector[0])+"]"
|
|
i_df = pd.read_sql_query(query2,con=conn)
|
|
|
|
if set(['ID','LOADID','FILE_NAME']).issubset(i_df.columns):
|
|
i_df=i_df.drop(['ID','LOADID','FILE_NAME'],axis=1)
|
|
elif len(table_selector)>1:
|
|
|
|
dataframes = {}
|
|
col_names = []
|
|
for tab in table_selector:
|
|
query2_2= "select * from [dbo].["+tab+"]"
|
|
dataframes[f'{tab}'] = pd.read_sql_query(query2_2,con=conn)
|
|
col_names = col_names + list(dataframes[f'{tab}'].columns)
|
|
|
|
tab_names = table_selector
|
|
metadata = MultiTableMetadata()
|
|
metadata.detect_from_dataframes(
|
|
data= dataframes
|
|
)
|
|
multi_python_dict = metadata.to_dict()
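            # The SDV-detected parent/child relationships drive how the selected source tables are joined into one wide frame below.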
|
|
|
|
rlist1=multi_python_dict['relationships']
|
|
relationships=pd.DataFrame(columns=['PARENT TABLE','CHILD TABLE','PARENT PRIMARY KEY','CHILD FOREIGN KEY'])
|
|
for i in range(len(rlist1)):
|
|
rlist=rlist1[i]
|
|
nrow=pd.DataFrame({'PARENT TABLE':rlist['parent_table_name'],'CHILD TABLE':rlist['child_table_name'],'PARENT PRIMARY KEY':rlist['parent_primary_key'],'CHILD FOREIGN KEY':rlist['child_foreign_key']},index=[i])
|
|
relationships=pd.concat([relationships,nrow],ignore_index=True)
|
|
|
|
filtered_relationships = relationships[
|
|
(relationships['PARENT TABLE'].isin(table_selector)) &
|
|
(relationships['CHILD TABLE'].isin(table_selector))
|
|
]
|
|
|
|
i_df = pd.DataFrame()
|
|
|
|
for _, row in filtered_relationships.iterrows():
|
|
parent_table = row['PARENT TABLE']
|
|
child_table = row['CHILD TABLE']
|
|
parent_key = row['PARENT PRIMARY KEY']
|
|
child_key = row['CHILD FOREIGN KEY']
|
|
|
|
if parent_table in dataframes and child_table in dataframes:
|
|
parent_df = dataframes[parent_table]
|
|
child_df = dataframes[child_table]
|
|
|
|
left_joined_df = pd.merge(
|
|
parent_df, child_df, how='left',
|
|
left_on=parent_key, right_on=child_key,
|
|
suffixes=(f'_{parent_table}', f'_{child_table}')
|
|
)
|
|
|
|
for col in child_df.columns:
|
|
if col != child_key:
|
|
left_joined_df.rename(
|
|
columns={col: f'{col}_{parent_table}_{child_table}'}, inplace=True
|
|
)
|
|
|
|
right_joined_df = pd.merge(
|
|
parent_df, child_df, how='left',
|
|
left_on=child_key, right_on=parent_key,
|
|
suffixes=(f'_{child_table}', f'_{parent_table}')
|
|
)
|
|
|
|
for col in child_df.columns:
|
|
if col != child_key:
|
|
                            right_joined_df.rename(
|
|
columns={col: f'{col}_{child_table}_{parent_table}'}, inplace=True
|
|
)
|
|
|
|
i_df = pd.concat([i_df, left_joined_df, right_joined_df], ignore_index=True)
|
|
|
|
i_df = i_df.loc[:, ~i_df.columns.duplicated()]
|
|
|
|
for table_name in table_selector:
|
|
if table_name in dataframes:
|
|
for col in dataframes[table_name].columns:
|
|
                        if col in i_df.columns and not any(col.endswith(f'_{table}') for table in table_selector):
|
|
i_df.rename(columns={col: f'{col}_{table_name}'}, inplace=True)
|
|
|
|
if set(['ID','LOADID','FILE_NAME']).issubset(i_df.columns):
|
|
i_df=i_df.drop(['ID','LOADID','FILE_NAME'],axis=1)
|
|
|
|
i_df = i_df.loc[:, ~i_df.columns.duplicated()]
|
|
|
|
|
|
if table_selector is not None:
|
|
if tgt_name is not None:
|
|
btn= button('RUN', key='RUN_GENAI')
|
|
|
|
if target_selector is not None and btn and f'{table_selector}_{tgt_name}_map' not in st.session_state:
|
|
st.session_state['source_data'] = i_df.sample(20).reset_index()
|
|
if target_selector is not None:
|
|
|
|
|
|
|
|
|
|
st.session_state['opt'] = list(tgt_df.columns)
|
|
st.session_state['target_data'] = tgt_df.head(20).reset_index()
|
|
|
|
with st.spinner('Processing Data...'):
|
|
selected_df = pd.DataFrame()
|
|
|
|
|
|
for col in i_df.columns:
|
|
|
|
non_null_values = i_df[col][i_df[col] != ''].dropna().astype(str).str.strip().unique()
|
|
|
|
|
|
selected_values = list(non_null_values[:10])
|
|
selected_values = selected_values + [""] * (10 - len(selected_values))
|
|
|
|
selected_df[col] = selected_values
|
|
|
|
mapping_df = st.session_state.mapping_df
|
|
|
|
tables_list = table_selector
|
|
|
|
|
|
table_columns = {}
|
|
|
|
|
|
for table in tables_list:
|
|
query = f"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table}'"
|
|
|
|
cursor = conn.cursor()
|
|
cursor.execute(query)
|
|
|
|
|
|
columns = [row[0] for row in cursor.fetchall()]
|
|
|
|
|
|
table_columns[table] = columns
|
|
|
|
if 'table_columns' not in st.session_state:
|
|
st.session_state.table_columns = table_columns
|
|
|
|
story = f""" Details of the source table:
|
|
table columns: {str(list(i_df.columns))}
|
|
column datatypes: {str(i_df.dtypes.to_string())}
|
|
table sample data: {selected_df.head(10).to_string()}
|
|
Source Tables selected : {str(list(table_selector))}
|
|
Source Table columns are given as (col_name)_(table_name)
|
|
|
|
Joining conditions should be based on relationship table which is : {relationships.to_string()}
|
|
|
|
Details of the target table:
|
|
table columns: {str(list(tgt_df.columns))}
|
|
column datatypes: {str(tgt_df.dtypes.to_string())}
|
|
table sample data: {tgt_df.head(10).to_string()}
|
|
mapping details: {mapping_df.to_string()}
|
|
|
|
Source Column names should match from this dictionary: {str(table_columns)}
|
|
"""
|
|
response = model.generate_content(
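                # Ask Gemini for strict JSON describing a one-to-one mapping; the prompt pins the first six columns to the uploaded mapping details.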
|
|
textwrap.dedent("""
|
|
Please return JSON describing the possible **one to one mapping** between source table and target table using this following schema:
|
|
|
|
{"Mapping": list[MAPPING]}
|
|
|
|
MAPPING = {"Target_Table": str, "Field_Name": str, "Source Table": str, "Source Column": str, "Joining Keys": str, "Primary Key": str, "Direct/Derived": str, "Join Tables": str, "Join Condition": str, "Mapping Confidence": percentage, "Column Operations": str, "Alias": str, "Granularity": str, "Filter Condition": str, "Clauses": str, "Ordering": str}
|
|
|
|
The first six columns are provided in mapping details. **THE FIRST SIX COLUMNS OF JSON SHOULD HAVE EXACTLY SAME VALUES AS PROVIDED IN MAPPING DETAILS**
|
|
**THE PRIMARY KEY IS COMING FROM THE PARENT TABLE. IF SOURCE TABLE IS NOT THE PRIMARY KEY TABLE, THEN 'Direct/Derived' SHOULD BE LABELLED AS DERIVED, OTHERWISE DIRECT**
|
|
**JOIN TABLES SHOULD BE WRITTEN ONLY IN CASE OF DERIVED LABEL, JOIN TABLES SHOULD BE THE PARENT TABLE AND THE SOURCE TABLE (IF DIFFERENT)**
|
|
**JOINING CONDITION SHOULD BE BASED ON THE TABLE AND COULMN WHICH HAS PRIMARY KEY AND JOINING TYPE SHOULD BE LEFT OUTER**
|
|
**ALIAS SHOULD BE SAME AS FIELD_NAME, GRANULARITY SHOULD BE 1:1**
|
|
|
|
1. For example, Field_Name is 'Product Name', Source Table is 'TRM', Source Column is 'PRODUCT_NAME', 'Joining Keys':'PRODUCT_ID', 'Primary Key' not labelled, then {'Direct/Derived": 'DERIVED', 'Join Type':'LEFT OUTER', 'Join Tables':'TPM, TRM', 'Join Condition':'TPM.PRODUCT_ID=TRM.PRODUCT_ID', 'Alias': 'PRODUCT_NAME', 'Granularity': '1:1'}
|
|
Joining is done on TPM since the primary key POLICY_ID is taken from TPM table. So TPM.PRODUCT_ID = TRM.PRODUCT_ID is joining condition
|
|
|
|
2. For example, Field_Name is 'Office Code', Source Table is 'TPM', Source Column is 'OFFICE_CD', 'Joining Keys':'POLICY_ID', 'Primary Key' not labelled, then {'Direct/Derived": 'DIRECT', 'Join Type':None, 'Join Tables':None, 'Join Condition':None, 'Alias': 'OFFICE_CD', 'Granularity': '1:1'}
|
|
Joining is not done since TPM is the parent table. So, 'Direct/Derived": 'DIRECT'. POLICY_ID is the primary key here.
|
|
|
|
3. For example, Field_Name is 'Policy Submission Date', Source Table is 'TSM', Source Column is 'POLICY_SUBMISSION_DT', 'Joining Keys':'POLICY_ID', 'Primary Key' not labelled, then {'Direct/Derived": 'DERIVED', 'Join Type':'LEFT OUTER', 'Join Tables':'TPM, TSM', 'Join Condition':'TPM.POLICY_ID=TRM.POLICY_ID', 'Alias': 'POLICY_SUBMISSION_DT', 'Granularity': '1:1'}
|
|
Joining is done on TPM since the primary key POLICY_ID is taken from TPM table. So TPM.POLICY_ID = TRM.POLICY_ID is joining condition
|
|
|
|
If Source Column is POLICY_ID_TPM, then change it to POLICY_ID.
|
|
**Source Column should not contain the '_TPM', '_TRM', '_TSM', '_TPP' part at the end.**
|
|
|
|
All Target fields are required. The JSON keys will be same as the column names in mapping details.Validate the mapping as given in mapping details.
|
|
Ignore the columns where hardcoded values are there , such as Current Flag, Start Date, End Date, Etl Job ID,Etl Batch ID,Etl Inserted Date,Etl Updated Date. leave them blank. For other fields, there has to be mapping.
|
|
|
|
If you are confused on which source tables to map, then provide MAPPING CONFIDENCE LESS THAN 90
|
|
|
|
ALL THE JSON KEYS ARE MANDATORY: 'Target_Table', 'Field_Name', 'Source Table', 'Source Column', 'Joining Keys', 'Primary Key', 'Direct/Derived', 'Join Type', 'Join Tables', 'Join Condition', 'Mapping Confidence', 'Column Operations', 'Alias', 'Granularity', 'Filter Condition', 'Clauses', 'Ordering'
|
|
|
|
Important: Only return a single piece of valid JSON text. All fields are required. Please MAP all ***TARGET fields***. If you struggle to map then give low confidence score but YOU HAVE TO MAP ANYWAY. ****MAKE SURE IT IS A **one to one mapping** ****
|
|
|
|
Here is the table details:
|
|
|
|
""") + story
|
|
)
|
|
|
|
|
|
res= response.text.replace("\n", '').replace("`", '').replace('json','')
|
|
map = print(json.dumps(json.loads(res), indent=2))
|
|
data = json.loads(res)
|
|
map_df = pd.json_normalize(data, record_path=['Mapping'])
|
|
st.session_state[f'{table_selector}_{tgt_name}_map'] = map_df.copy()
|
|
|
|
|
|
if f'{table_selector}_{tgt_name}_map' in st.session_state and btn:
|
|
taba, tabb, tabc = st.tabs(['Mappings Generated', 'Source Table Preview', 'Target Table Preview'])
|
|
with tabc:
|
|
st.subheader('Target Table Preview')
|
|
with stylable_container(
|
|
key=f"source_container_with_border",
|
|
css_styles="""
|
|
{
|
|
border: 1px solid white;
|
|
border-radius: 0.5rem;
|
|
padding: calc(1em - 1px);
|
|
width: 103%; /* Set container width to 100% */
|
|
}
|
|
"""
|
|
):
|
|
st.dataframe(st.session_state['target_data'].head(0))
|
|
with tabb:
|
|
st.subheader('Source Table Preview')
|
|
with stylable_container(
|
|
key=f"target_container_with_border",
|
|
css_styles="""
|
|
{
|
|
border: 1px solid white;
|
|
border-radius: 0.5rem;
|
|
padding: calc(1em - 1px);
|
|
width: 103%; /* Set container width to 100% */
|
|
}
|
|
"""
|
|
):
|
|
st.write(st.session_state['source_data'])
|
|
with taba:
|
|
st.subheader("Most Probable Mapping Generated:")
|
|
with stylable_container(
|
|
key=f"container_with_border",
|
|
css_styles="""
|
|
{
|
|
border: 1px solid white;
|
|
border-radius: 0.5rem;
|
|
padding: calc(1em - 1px);
|
|
width: 103%; /* Set container width to 100% */
|
|
}
|
|
"""
|
|
):
|
|
edited_map_df = st.data_editor(
|
|
st.session_state[f'{table_selector}_{tgt_name}_map'],
|
|
column_config={
|
|
"Target Column Name": st.column_config.SelectboxColumn(
|
|
"Target Columns",
|
|
help="Please Verify/Change the Target Column Mapping",
|
|
width="medium",
|
|
options=st.session_state.opt,
|
|
required=True,
|
|
)
|
|
},
|
|
hide_index=False,
|
|
num_rows = 'dynamic',
|
|
use_container_width = True
|
|
)
|
|
|
|
success_show=1
|
|
if success_show==1:
|
|
st.success(f"{(edited_map_df['Mapping Confidence']>90).mean().round(2)*100}% of Columns Mapped with more than 90% Mapping Confidence")
|
|
|
|
mapped_uploader = st.file_uploader("UPLOAD REVISED MAPPING (OPTIONAL)", type=['csv'])
|
|
if mapped_uploader is not None:
|
|
success_show=0
|
|
edited_map_df = pd.read_csv(mapped_uploader)
|
|
st.write(edited_map_df)
|
|
st.success("Mapping Revised!")
|
|
|
|
val = button("Validate", key="Val")
|
|
if val:
|
|
st.session_state[f'{table_selector}_{tgt_name}_map'].update(edited_map_df)
|
|
dup= len(st.session_state[f'{table_selector}_{tgt_name}_map'][st.session_state[f'{table_selector}_{tgt_name}_map']['Field_Name'].duplicated()])
|
|
|
|
error_messages = []
|
|
table_columns = st.session_state.table_columns
|
|
|
|
for _, (index, row) in enumerate(edited_map_df.iterrows()):
|
|
source_table = row['Source Table']
|
|
source_column = row['Source Column']
|
|
|
|
if source_column not in table_columns.get(source_table, []) and (source_column is not None) and (source_table is not None) and (source_table in table_selector):
|
|
error_messages.append(f"Column '{source_column}' not found in table '{source_table}'.\n")
|
|
|
|
|
|
if error_messages:
|
|
validation_result = "\n".join(error_messages)
|
|
else:
|
|
validation_result = "Success"
|
|
|
|
if dup != 0:
|
|
                dup_index= list(st.session_state[f'{table_selector}_{tgt_name}_map'][st.session_state[f'{table_selector}_{tgt_name}_map']['Field_Name'].duplicated(keep=False)].index)
|
|
dup_mess=str(dup_index[0])
|
|
for val in dup_index[1:]:
|
|
dup_mess = dup_mess + f' and {str(val)}'
|
|
st.error(f"One to Many Column mapping Exists. Please Check Mapping Number: {dup_mess}")
|
|
elif validation_result != "Success":
|
|
st.error(validation_result)
|
|
else:
|
|
st.success("Mapping Validated!")
|
|
|
|
df_tbl_dtls = pd.read_csv(r'tbl_dtl.csv')
|
|
|
|
with pd.ExcelWriter('Final.xlsx') as writer:
|
|
edited_map_df.to_excel(writer, sheet_name='POLICY', index=False)
|
|
df_tbl_dtls.to_excel(writer, sheet_name='Table Details', index=False)
|
|
|
|
path = 'Final.xlsx'
|
|
|
|
temp_table = None
|
|
for x in pd.ExcelFile(path).sheet_names:
|
|
if x != 'Table Details':
|
|
temp_table = x
|
|
|
|
df = read_excel(path, temp_table)
|
|
table_details_df = read_excel(path, 'Table Details')
|
|
table_details_mapping = table_details_df.set_index('Table Name')[['ETL Timestamp','Change Timestamp','ETL Filter']].T.to_dict('list')
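                # table_details_mapping: Table Name -> [ETL Timestamp, Change Timestamp, ETL Filter], consumed by generate_sql and generate_spark.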
|
|
|
|
base_table, base_pk, proc_query, incr_join_grps, incr_table_join_info, incr_join, temp_table_schema = generate_sql(temp_table)
|
|
sql_query = ''
|
|
for x in proc_query:
|
|
sql_query += x
|
|
sql_query += '\n'
|
|
spark_sql = generate_spark(proc_query, incr_join_grps, base_table, base_pk, incr_table_join_info, incr_join, temp_table_schema)
|
|
|
|
|
|
col21, col22= st.columns([1,4])
|
|
with col21:
|
|
st.download_button('Download SQL Statement', sql_query, file_name='sql_code.txt')
|
|
with col22:
|
|
st.download_button('Download Spark Statement', spark_sql, file_name='spark_code.txt')
|
|
|
|
|