pwc-india committed on
Commit
ba3fa23
1 Parent(s): 0940575

Update pages/5SOURCE TO TARGET MAPPING.py

Files changed (1)
  1. pages/5SOURCE TO TARGET MAPPING.py +597 -595
pages/5SOURCE TO TARGET MAPPING.py CHANGED
@@ -54,632 +54,634 @@ st.markdown("""
54
  }
55
  </style>
56
  """, unsafe_allow_html=True)
57
- ######
58
- def main():
59
- # st.title('PAGE TITLE') # Change this for each page
60
- sidebar()
61
- ########
62
- def read_excel(path, sheet):
63
- df = pd.read_excel(path, sheet_name = sheet, dtype = 'str')
64
- return df
65
-
66
- def split_join_condition(join_condition):
67
- conditions = []
68
- condition = ''
69
- bracket_count = 0
70
-
71
- for char in join_condition:
72
- if char == '(':
73
- bracket_count += 1
74
- elif char == ')':
75
- bracket_count -= 1
76
- if char == ',' and bracket_count == 0:
77
- conditions.append(condition.strip())
78
- condition = ''
79
- else:
80
- condition += char
81
- if condition:
82
  conditions.append(condition.strip())
83
 
84
- return conditions
85
 
86
- def join_incr(join_conditions):
87
- join = []
88
- join_pattern = re.compile(r'(\w+\.\w+)\s*=\s*(\w+\w.\w+)', re.IGNORECASE)
89
- for join_condition in join_conditions:
90
- parts = re.split(r'\sAND\s|\sOR\s', join_condition, flags = re.IGNORECASE)
91
- temp = [x.strip() for x in parts if join_pattern.match(x.strip())]
92
- join.append(' AND '.join(temp))
93
- return join
94
 
95
- def generate_sql(temp_table):
96
- proc_query = []
97
- base_table = None
98
-
99
- source_table_schema = 'MAIN.GOLD'
100
- temp_table_schema = 'MAIN.GOLD'
101
- base_pk = []
102
-
103
- join_fields = set()
104
-
105
- for _,row in df.iterrows():
106
- source_table = row['Source Table']
107
- primary_key = row['Primary Key']
108
- source_column = row['Source Column']
109
- alias = row['Alias']
110
- joining_keys = row['Joining Keys']
111
-
112
- if not base_table:
113
- if primary_key == 'Y':
114
- base_table = source_table
115
- base_pk.append(joining_keys)
116
-
117
- if pd.notna(joining_keys):
118
- keys = [x.strip() for x in joining_keys.split(',')]
119
- for x in keys:
120
- if x not in join_fields:
121
- join_fields.add(x)
122
-
123
- unique_cols = ['Source Table', 'Joining Keys', 'Primary Key', 'Join Type','Join Tables','Join Condition']
124
- unique_df = df.drop_duplicates(subset = unique_cols)
125
-
126
- incremantal_mapping = {}
127
- incr_joins = {}
128
-
129
- for _,row in unique_df.iterrows():
130
-
131
- source_table = row['Source Table']
132
- source_column = row['Source Column']
133
- joining_keys = row['Joining Keys']
134
- primary_key = row['Primary Key']
135
- direct_derived = row['Direct/Derived']
136
- join_type = row['Join Type']
137
- join_tables = row['Join Tables']
138
- join_condition = row['Join Condition']
139
-
140
- if source_table == base_table:
141
- if primary_key == 'Y':
142
- key = (source_table, joining_keys, join_type, join_tables, join_condition)
143
- key1 = source_table
144
- else:
145
- continue
146
- else:
147
  key = (source_table, joining_keys, join_type, join_tables, join_condition)
148
  key1 = source_table
149
- if pd.notna(direct_derived) and pd.notna(source_table) and pd.notna(source_column):
150
- if key not in incremantal_mapping:
151
- incremantal_mapping[key] = {
152
- 'source_table': source_table,
153
- 'joining_keys':joining_keys,
154
  'join_type': join_type,
155
- 'join_tables': join_tables,
156
  'join_condition': join_condition
157
  }
158
- if key1 not in incr_joins:
159
- if pd.notna(direct_derived) and direct_derived == 'DERIVED':
160
- incr_joins[key1] = {
161
- 'join_type': join_type,
162
- 'join_tables': ', '.join([x.strip() for x in join_tables.split(',') if x != base_table]),
163
- 'join_condition': join_condition
164
- }
165
- incremental_df = pd.DataFrame(incremantal_mapping.values())
166
- incr_join_grps = incremental_df.groupby(['source_table'])
167
- proc_query.append(f'TRUNCATE TABLE {temp_table_schema}.{temp_table}_INCR;')
168
-
169
- incr_table_join_info = {}
170
- for _,row in incremental_df.iterrows():
171
- source_table = row['source_table']
172
-
173
- if source_table != base_table:
174
- joining_keys = row['joining_keys']
175
- join_type = row['join_type']
176
- join_tables = [x.strip() for x in row['join_tables'].split(',')]
177
- index = join_tables.index(source_table)
178
- join_condition = [x.strip() for x in row['join_condition'].split(',')][0:index]
179
- incr_table_join_info[source_table] = ', '.join(join_condition)
180
-
181
- incr_query = []
182
- incr_cols = ''
183
- incr_tables = []
184
- incr_join = {}
185
 
186
- for _, group in incr_join_grps:
187
-
188
- for table in _.split():
189
- if base_table != table:
190
-
191
- join_tables = [t.strip() for t in group['join_tables'].iloc[0].split(',')]
192
- join_keys = [t.strip() for t in ','.join(base_pk).split(',')]
193
- join_type = [t.strip() for t in group['join_type'].iloc[0].split(',')]
194
- join_cond = split_join_condition(incr_table_join_info[table])
195
- join_condition = join_incr(join_cond)
196
- source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')]
197
-
198
- join_key_list = []
199
- for x in join_keys:
200
- join_key_list.append(f'{base_table}.{x}')
201
- join_key = ', '.join(join_key_list)
202
-
203
- for y in source_table:
204
- sql = f"""
205
- INSERT INTO {temp_table_schema}.{temp_table}_INCR
206
- (
207
- SELECT {join_key}, {table_details_mapping[y][0]}, {table_details_mapping[y][1]}, '{y}', 1, CURRENT_TIMESTAMP
208
- FROM {source_table_schema}.{base_table} {base_table}"""
209
-
210
- incr_join_text = ''
211
- for i in range(len(join_condition)):
212
- sql += f'\n\t{join_type[i]} JOIN {source_table_schema}.{join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}'
213
- incr_join_text += f'\n\t{join_type[i]} JOIN {source_table_schema}.{join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}'
214
- incr_join[y] = incr_join_text
215
-
216
- sql += f"""
217
- WHERE COALESCE({join_tables[i+1]}.operation,'NA') <> 'D'
218
- AND TO_TIMESTAMP( CAST(SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),1,4) || '-' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),5,2) ||'-' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),7,2) || ' ' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),9,2) ||':' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),11,2) ||':' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),13,2) AS VARCHAR(30)), 'YYYY-MM-DD HH:MI:SS') > (SELECT MAX(max_update_date) FROM audit.reportingdb_audit_tbl_{temp_table} WHERE mart_table_name='{temp_table}' and src_table_name='{y}')
219
- );"""
220
-
221
- incr_query.append(sql)
222
- incr_tables.append(y)
223
 
224
- else:
225
- source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')]
226
- join_keys = [t.strip() for t in group['joining_keys'].iloc[0].split(',')]
227
-
228
- join_key_list = []
229
- for x in join_keys:
230
- join_key_list.append(f'{base_table}.{x}')
231
- join_key = ', '.join(join_key_list)
232
-
233
- incr_cols = join_key
234
  sql = f"""
235
- INSERT INTO {temp_table_schema}.{temp_table}_INCR
236
- (
237
- SELECT {join_key}, {table_details_mapping[base_table][0]}, {table_details_mapping[base_table][1]}, '{base_table}', 1, CURRENT_TIMESTAMP
238
- FROM {source_table_schema}.{base_table} {base_table}
239
- WHERE COALESCE(operation,'NA') <> 'D'
240
- AND TO_TIMESTAMP( CAST(SUBSTRING((_hoodie_commit_time),1,4) || '-' || SUBSTRING((_hoodie_commit_time),5,2) ||'-' || SUBSTRING((_hoodie_commit_time),7,2) || ' ' || SUBSTRING((_hoodie_commit_time),9,2) ||':' || SUBSTRING((_hoodie_commit_time),11,2) ||':' || SUBSTRING((_hoodie_commit_time),13,2) AS VARCHAR(30)), 'YYYY-MM-DD HH:MI:SS') > (SELECT MAX(max_update_date) FROM audit.reportingdb_audit_tbl_{temp_table} WHERE mart_table_name='{temp_table}' and src_table_name='{base_table}')
241
- );"""
242
- proc_query.append(sql)
243
- incr_tables.append(base_table)
244
-
245
- proc_query.append('\n'.join(incr_query))
246
- proc_query.append(f'TRUNCATE TABLE {temp_table_schema}.INCR1_{temp_table};')
247
-
248
- sql = f"""
249
- INSERT INTO {temp_table_schema}.INCR1_{temp_table}
250
- (
251
- SELECT DISTINCT {incr_cols.replace(f'{base_table}.', '')}
252
- FROM {temp_table_schema}.{temp_table}_INCR
253
- );"""
254
-
255
- proc_query.append(sql)
256
-
257
- incr_table_dict = {}
258
- for table in incr_tables:
259
- if table == base_table:
260
- incr_table_dict[table] = f'{temp_table_schema}.INCR2_{table}'
261
- else:
262
- p = [x for x in incr_join[table].split('\n\t') if len(x) > 1]
263
- if len(p) == 1:
264
- incr_table_dict[table] = f'{temp_table_schema}.INCR2_{table}'
265
- else:
266
- incr_table_dict[table] = f'{source_table_schema}.{table}'
267
 
268
- s = []
269
- for table in incr_tables:
270
- incr2_sql_list = []
271
 
272
- if table == base_table:
273
- for key in incr_cols.replace(f'{base_table}.', '').split(','):
274
- incr2_sql_list.append(f"{base_table}.{key} = A.{key}")
275
- incr2_sql_join = ' AND '.join(incr2_sql_list)
276
-
277
  sql = f"""
278
- CREATE TABLE {temp_table_schema}.INCR2_{table}
279
- AS
280
- SELECT
281
- {table}.*
282
- FROM
283
- {source_table_schema}.{table} {table}
284
- INNER JOIN
285
- {temp_table_schema}.INCR1_{temp_table} A ON {incr2_sql_join}; """
286
- proc_query.append(f'DROP TABLE IF EXISTS {temp_table_schema}.INCR2_{table};')
287
  proc_query.append(sql)
288
-
289
  else:
290
-
291
- p = [x for x in incr_join[table].split('\n\t') if len(x) > 1]
292
- if len(p) == 1:
293
- sql = f"""
294
- CREATE TABLE {temp_table_schema}.INCR2_{table}
295
- AS
296
- SELECT
297
- {table}.*
298
- FROM
299
- {temp_table_schema}.INCR2_{base_table} {base_table} {incr_join[table]};"""
300
- s.append(f'DROP TABLE IF EXISTS {temp_table_schema}.INCR2_{table};')
301
- s.append(sql)
302
-
303
- for x in s:
304
- proc_query.append(x)
305
-
306
- select_clause = []
307
- from_clause = []
308
- where_clause = []
309
-
310
- for _,row in df.iterrows():
311
- field_name = row['Field_Name']
312
- source_table = row['Source Table']
313
- source_column = row['Source Column']
314
- joining_keys = row['Joining Keys']
315
- primary_key = row['Primary Key']
316
- direct_derived = row['Direct/Derived']
317
- join_type = row['Join Type']
318
- join_tables = row['Join Tables']
319
- join_condition = row['Join Condition']
320
- column_operation = row['Column Operations']
321
- alias = row['Alias']
322
- granularity = row['Granularity']
323
- filter_condition = row['Filter Condition']
324
- clauses = row['Clauses']
325
- ordering = row['Ordering']
326
-
327
- if pd.notna(direct_derived):
328
- if pd.notna(column_operation):
329
- if len(column_operation.split()) == 1:
330
- select_expr = f'{column_operation.upper()}({source_table}.{source_column})'
331
- else:
332
- select_expr = column_operation
333
- else:
334
- if pd.notna(source_table):
335
- select_expr = f'{source_table}.{source_column}'
336
- else:
337
- select_expr = source_column
338
-
339
- if source_column not in join_fields:
340
- if pd.notna(alias):
341
- select_expr += f' AS {alias}'
342
- else:
343
- if pd.notna(column_operation) and pd.notna(source_column):
344
- select_expr += f' AS {source_column}'
345
-
346
- if direct_derived.upper() == 'DIRECT':
347
- select_clause.append(select_expr)
348
- elif direct_derived.upper() == 'DERIVED_BASE':
349
- select_clause.append(select_expr)
350
-
351
- if pd.notna(filter_condition):
352
- where_clause.append(filter_condition)
353
-
354
- select_query = ',\n\t'.join(select_clause)
355
- sql_query = f"CREATE TABLE {temp_table_schema}.{base_table}_BASE\nAS \n\tSELECT \n\t{select_query} \nFROM\n\t{incr_table_dict[base_table]} {base_table}"
356
- if where_clause:
357
- sql_query += f"\nWHERE {' AND '.join(where_clause)}"
358
- sql_query += ';'
359
- proc_query.append(f"DROP TABLE IF EXISTS {temp_table_schema}.{base_table}_BASE;")
360
- proc_query.append(sql_query)
361
-
362
- df['Clauses'].fillna('', inplace = True)
363
- df['Ordering'].fillna('', inplace = True)
364
- c = 1
365
- temp_base_table = f'{base_table}_BASE'
366
- grp_cols = ['Join Condition', 'Clauses', 'Ordering']
367
- join_grps = df[df['Direct/Derived'] == 'DERIVED'].groupby(['Join Condition', 'Clauses', 'Ordering'])
368
- temp_tables_sql = []
369
- for (join_condition,clauses,ordering), group in join_grps:
370
- if pd.notna(group['Direct/Derived'].iloc[0]):
371
- if group['Direct/Derived'].iloc[0].upper() == 'DERIVED':
372
- join_tables = [t.strip() for t in group['Join Tables'].iloc[0].split(',')]
373
- join_keys = [t.strip() for t in group['Joining Keys'].iloc[0].split(',')]
374
- join_type = [t.strip() for t in group['Join Type'].iloc[0].split(',')]
375
- join_condition = split_join_condition(group['Join Condition'].iloc[0])
376
- temp_table_name = f"TEMP_{group['Source Table'].iloc[0]}"
377
- source_column = [t.strip() for t in (','.join(group['Source Column'])).split(',')]
378
- alias = [t.strip() for t in (','.join(group['Alias'])).split(',')]
379
- source_table = [t.strip() for t in (','.join(group['Source Table'])).split(',')]
380
-
381
- base_cols = []
382
- for join_key in join_keys:
383
- base_cols.append(f'{join_tables[0]}.{join_key}')
384
-
385
- for s_table,col,alias in zip(source_table,source_column,alias):
386
- if pd.notna(group['Column Operations'].iloc[0]):
387
- if len(group['Column Operations'].iloc[0].split()) == 1:
388
- select_expr = f"{group['Column Operations'].iloc[0].upper()}({s_table}.{col})"
389
- else:
390
- select_expr = group['Column Operations'].iloc[0]
391
- else:
392
- if pd.notna(s_table):
393
- select_expr = f"{s_table}.{col}"
394
- else:
395
- select_expr = col
396
-
397
- if alias:
398
- select_expr += f" AS {alias}"
399
- base_cols.append(select_expr)
400
-
401
- if ordering:
402
- base_cols.append(f"{ordering} AS RN")
403
-
404
- sql = ',\n\t\t'.join(base_cols)
405
-
406
- join_sql = f"SELECT \n\t\t{sql} \nFROM\n\t{incr_table_dict[base_table]} {join_tables[0]}"
407
- for i in range(len(join_type)):
408
- join_sql += f'\n\t{join_type[i]} JOIN {incr_table_dict[join_tables[i+1]]} {join_tables[i+1]} ON {join_condition[i]}'
409
- if clauses:
410
- join_sql += f'\n\t{clauses}'
411
- join_sql += ';'
412
-
413
- proc_query.append(f"DROP TABLE IF EXISTS {temp_table_schema}.{temp_table_name};")
414
- proc_query.append(f"CREATE TABLE {temp_table_schema}.{temp_table_name}\nAS \n\t{join_sql}")
415
-
416
- granularity = [t.strip() for t in group['Granularity'].iloc[0].split(',')]
417
-
418
- sql = []
419
- for key in join_keys:
420
- sql.append(f"A.{key} = B.{key}")
421
-
422
- temp_cols = []
423
- temp_cols.append('A.*')
424
-
425
- source_column = [t.strip() for t in (','.join(group['Source Column'])).split(',')]
426
- alias = [t.strip() for t in (','.join(group['Alias'])).split(',')]
427
-
428
- for col,alias in zip(source_column,alias):
429
- select_expr = f"B.{col}"
430
- if alias:
431
- select_expr = f"B.{alias}"
432
- else:
433
- select_expr = f"B.{col}"
434
- temp_cols.append(select_expr)
435
-
436
- temp_select_query = ',\n\t\t'.join(temp_cols)
437
-
438
- proc_query.append(f"DROP TABLE IF EXISTS {temp_table_schema}.TEMP_{temp_table}_{c};")
439
-
440
- base_sql = f"CREATE TABLE {temp_table_schema}.TEMP_{temp_table}_{c}\nAS \n\tSELECT \n\t\t{temp_select_query} \nFROM\n\t{temp_table_schema}.{temp_base_table} AS A"
441
- base_sql += f"\n\tLEFT OUTER JOIN {temp_table_schema}.{temp_table_name} B ON {' AND '.join(sql)}"
442
-
443
- if '1:1' in granularity and len(ordering) > 1:
444
- base_sql += f" AND B.RN = 1"
445
- base_sql += ';'
446
-
447
- temp_base_table = f'TEMP_{temp_table}_{c}'
448
- c += 1
449
- proc_query.append(base_sql)
450
-
451
- fin_table_name = temp_table
452
- fin_table_cols = []
453
-
454
- for _,row in df.iterrows():
455
- field_name = row['Field_Name']
456
- source_table = row['Source Table']
457
- source_column = row['Source Column']
458
- alias = row['Alias']
459
-
460
- if pd.notna(row['Direct/Derived']):
461
- if (source_column in join_fields):
462
- fin_table_cols.append(f'{source_column} AS "{field_name}"')
463
- else:
464
- fin_table_cols.append(f'"{field_name}"')
465
-
466
- fin_table_cols = ',\n\t\t'.join(fin_table_cols)
467
- fin_sql = f"INSERT INTO {temp_table_schema}.{fin_table_name}\n\tSELECT \n\t\t{fin_table_cols} \nFROM\n\t{temp_table_schema}.TEMP_{temp_table}_{c-1};"
468
-
469
-
470
- condition_col = '_'.join(incr_cols.replace(f'{base_table}.', '').split(','))
471
- proc_query.append(f"DELETE FROM {temp_table_schema}.{fin_table_name}\nWHERE {'_'.join(incr_cols.replace(f'{base_table}.', '').split(','))} IN (SELECT {'_'.join(incr_cols.replace(f'{base_table}.', '').split(','))} FROM {temp_table_schema}.INCR1_{temp_table});")
472
- proc_query.append(fin_sql)
473
-
474
- for table in incr_tables:
475
  sql = f"""
476
- INSERT INTO audit.reportingdb_audit_tbl_{temp_table}
477
- (
478
  SELECT
479
- '{temp_table}' as mart_table_name,
480
- '{table}' as src_table_name,
481
- coalesce( max(TO_TIMESTAMP( CAST(SUBSTRING((_hoodie_commit_time),1,4) || '-' || SUBSTRING((_hoodie_commit_time),5,2) ||'-' || SUBSTRING((_hoodie_commit_time),7,2) || ' ' || SUBSTRING((_hoodie_commit_time),9,2) ||':' || SUBSTRING((_hoodie_commit_time),11,2) ||':' || SUBSTRING((_hoodie_commit_time),13,2) AS VARCHAR(30)), 'YYYY-MM-DD HH:MI:SS')),(select max(max_update_date) from audit.reportingdb_audit_tbl_{temp_table} where Mart_Table_Name='{temp_table}' and Src_Table_Name= '{table}')) max_update_date,
482
- CURRENT_TIMESTAMP as load_timestamp,
483
- coalesce(max(prev_updt_ts),(select max(source_reference_date) from audit.reportingdb_audit_tbl_{temp_table} where Mart_Table_Name='{temp_table}' and Src_Table_Name= '{table}')) AS source_reference_date,
484
- max(nvl(batch_number,0))+1
485
- FROM {temp_table_schema}.{temp_table}_INCR where table_name = '{table}'
486
- );"""
487
  proc_query.append(sql)
488
 
489
- return base_table, base_pk, proc_query, incr_join_grps, incr_table_join_info, incr_join, temp_table_schema
490
-
491
- def create_df(query, table_df_mapping, table_usage_count):
492
- script = []
493
- query = ' '.join(query.split()).strip()
494
- match = re.match(r'CREATE TABLE (\w+\.\w+\.\w+) AS (SELECT .+)', query, re.IGNORECASE)
495
- source_tables = re.findall(r'\bFROM\s+(\w+\.\w+\.\w+)|\bJOIN\s+(\w+\.\w+\.\w+)', query, re.IGNORECASE)
496
- source_tables = [table for pair in source_tables for table in pair if table]
497
-
498
- if not match:
499
- raise ValueError('Invalid SQL')
500
- table_name = match.group(1).split('.')[2]
501
- select_statement = match.group(2)
502
- create_script = f'{table_name} = spark.sql(""" {select_statement} """)'
503
- persist_script = f'{table_name} = {table_name}.persist()'
504
- view_script = f'{table_name}.createOrReplaceTempView("{table_name}")'
505
-
506
- for table in source_tables:
507
- create_script = create_script.replace(table, table_df_mapping[table])
508
-
509
- script.append(f"\n\t\t######################---------Creating table {create_script.split('=')[0].strip()}-------############################")
510
- script.append(create_script)
511
- script.append(persist_script)
512
- script.append(view_script)
513
- script.append(f'''print("{create_script.split('=')[0].strip()} count: ", {create_script.split('=')[0].strip()}.count())''')
514
-
515
- if 'INCR2_' in table_name:
516
- x = table_name.split('INCR2_')[1]
517
- if x in table_details_mapping.keys():
518
- script.append(f"\n\t\t######################---------Updating the max_update_date in audit-------############################")
519
- script.append(f"{x}_max_update_date = INCR2_{x}.agg({{'_hoodie_commit_time' : 'max'}}).first()[0]")
520
- script.append(f"{x}_max_source_reference_date = INCR2_{x}.agg(max(to_timestamp('{table_details_mapping[x][1].replace(x+'.','')}','yyyy-MM-dd-HH.mm.ss.SSSSSS'))).first()[0]")
521
- script.append(f"insert_max_update_date(spark,redshift_conn, config['application_name'],'{x}',{x}_max_update_date,{x}_max_source_reference_date, max_batch_id, config)")
522
- script.append('\n')
523
-
524
- for table in source_tables:
525
- table_usage_count[table.split('.')[2]] -= 1
526
 
527
- for table in source_tables:
528
- if table_usage_count[table.split('.')[2]] == 0 and 'INCR1_' not in table:
529
- unpersist_script = f"{table.split('.')[2]}.unpersist()"
530
- script.append(unpersist_script)
531
-
532
- return '\n\t\t'.join(script)
533
 
534
- def generate_spark(proc_query, incr_join_grps, base_table, base_pk, incr_table_join_info, incr_join, temp_table_schema):
535
- table_usage_count = defaultdict(int)
536
- table_df_mapping = {}
537
-
538
- for query in proc_query:
539
- if 'CREATE TABLE' in query or 'DELETE' in query:
540
- source_tables = re.findall(r'\bFROM\s+(\w+\.\w+\.\w+)|\bJOIN\s+(\w+\.\w+\.\w+)', query, re.IGNORECASE)
541
- source_tables = [table for pair in source_tables for table in pair if table]
542
- for table in source_tables:
543
- table_usage_count[table.split('.')[2]] += 1
544
- if 'DELETE' not in query:
545
- table_df_mapping[table] = table.split('.')[2]
546
-
547
- script = []
548
- for query in proc_query:
549
- if 'CREATE TABLE' in query:
550
- script.append(create_df(query, table_df_mapping,table_usage_count))
551
-
552
- spark_query = []
553
- spark_query.append("\t\t######################---------Reading source data -------############################")
554
- for table in table_details_mapping.keys():
555
- spark_query.append(f'{table} = read_file(spark, config, \"{table}\").filter("{table_details_mapping[table][2]}")')
556
- spark_query.append(f'{table} = {table}.persist()')
557
- spark_query.append(f'{table}.createOrReplaceTempView("{table}")')
558
- spark_query.append(f'print("{table} count: ", {table}.count())')
559
- spark_query.append('\n')
560
-
561
- spark_query.append("\n\t\t######################---------Reading records-------############################")
562
- for table in table_details_mapping.keys():
563
- spark_query.append(f"{table}_max_update_date = read_max_update_date(redshift_conn, config['application_name'],'{table}', config)")
564
- spark_query.append(f'{table}_max_update_date = {table}_max_update_date[0][0]')
565
- spark_query.append('\n')
566
-
567
- incr1_spark = []
568
- temp_incr1 = []
569
- for _, group in incr_join_grps:
570
- for table in _.split():
571
- if base_table != table:
572
- join_tables = [t.strip() for t in group['join_tables'].iloc[0].split(',')]
573
- join_keys = [t.strip() for t in ','.join(base_pk).split(',')]
574
- join_type = [t.strip() for t in group['join_type'].iloc[0].split(',')]
575
- join_cond = split_join_condition(incr_table_join_info[table])
576
- join_condition = join_incr(join_cond)
577
- source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')]
578
 
579
- join_key_list = []
580
- for x in join_keys:
581
- join_key_list.append(f'{base_table}.{x}')
582
- join_key = ', '.join(join_key_list)
583
 
584
- for y in source_table:
585
- sql = f"""SELECT {join_key} FROM {base_table} {base_table}"""
586
-
587
- incr_join_text = ''
588
- i=0
589
- for i in range(len(join_condition)):
590
- sql += f' {join_type[i]} JOIN {join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}'
591
- incr_join_text += f' {join_type[i]} JOIN {join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}'
592
-
593
- sql += f''' WHERE {join_tables[i+1]}._hoodie_commit_time > cast('"""+str({join_tables[i+1]}_max_update_date)+"""' as timestamp)'''
594
- temp_incr1.append(sql)
595
-
596
- else:
597
- source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')]
598
- join_keys = [t.strip() for t in group['joining_keys'].iloc[0].split(',')]
599
 
600
- join_key_list = []
601
- for x in join_keys:
602
- join_key_list.append(f'{base_table}.{x}')
603
- join_key = ', '.join(join_key_list)
604
-
605
- sql = f'''SELECT {join_key} FROM {base_table} {base_table} WHERE {base_table}._hoodie_commit_time > cast('"""+str({base_table}_max_update_date)+"""' as timestamp)'''
606
- incr1_spark.append(sql)
607
- for i in temp_incr1:
608
- incr1_spark.append(i)
609
- incr1_spark = '\nUNION\n'.join(incr1_spark)
610
- spark_query.append("\n\t\t######################---------Creating INCR1-------############################")
611
- spark_query.append(f'INCR1_{temp_table} = spark.sql(""" {incr1_spark} """)')
612
- spark_query.append(f'\n\t\tINCR1_{temp_table} = INCR1_{temp_table}.dropDuplicates()')
613
- spark_query.append(f'INCR1_{temp_table} = INCR1_{temp_table}.persist()')
614
- spark_query.append(f'INCR1_{temp_table}.createOrReplaceTempView("INCR1_{temp_table}")')
615
- spark_query.append(f'print("INCR1_{temp_table} count: ", INCR1_{temp_table}.count())')
616
-
617
- spark_query.append("\n\t\t######################---------Creating INCR2-------############################")
618
- for table in table_details_mapping.keys():
619
- if table in incr_join.keys():
620
- p = [x for x in incr_join[table].split('\n\t') if len(x) > 1]
621
- if len(p) > 1:
622
- spark_query.append(f"\n\t\t######################---------Updating the max_update_date in audit-------############################")
623
- spark_query.append(f"{table}_max_update_date = {table}.agg({{'_hoodie_commit_time' : 'max'}}).first()[0]")
624
- spark_query.append(f"{table}_max_source_reference_date = {table}.agg(max(to_timestamp('{table_details_mapping[table][1].replace(table+'.','')}','yyyy-MM-dd-HH.mm.ss.SSSSSS'))).first()[0]")
625
- spark_query.append(f"insert_max_update_date(spark,redshift_conn, config['application_name'],'{table}',{table}_max_update_date,{table}_max_source_reference_date, max_batch_id, config)")
626
- spark_query.append('\n')
627
-
628
- for query in script:
629
- spark_query.append(query)
630
- spark_query.append('\n')
631
-
632
- spark_query1 = []
633
- spark_query1.append('\n')
634
- for query in proc_query:
635
- if f'{temp_table_schema}.{temp_table}\n' in query:
636
- final_tables = re.findall(r'\bFROM\s+(\w+\.\w+\.\w+)|\bJOIN\s+(\w+\.\w+\.\w+)', query, re.IGNORECASE)
637
- final_tables = [table.split('.')[2].strip() for pair in final_tables for table in pair if table and table.split('.')[2].strip() != temp_table][0]
638
- if 'INCR1_' in final_tables:
639
- spark_query.append(f"{final_tables}.write.mode('overwrite').parquet(config['incr2df_path'])")
640
- else:
641
- spark_query.append(f"{final_tables}.write.mode('overwrite').parquet(config['resultdf_path'])")
642
- spark_query1.append(f'''cur.execute(""" {query} """)''')
643
- spark_query1.append('\n')
644
-
645
- with open('template.txt') as file:
646
- template = file.read()
647
-
648
- result = template.replace('INSERT_CODE_1', '\n\t\t'.join(spark_query))
649
- result = result.replace('INSERT_CODE_2', '\t\t'.join(spark_query1))
650
 
651
- return result
 
652
 
653
 
654
 
655
- # st.set_page_config(page_title='AUTOMATED SOURCE TO TARGET MAPPING', layout= 'wide')
656
- # st.markdown("""
657
658
 
659
- # /* Remove blank space at top and bottom */
660
- # .block-container {
661
- # padding-top: 1.9rem;
662
- # padding-bottom: 1rem;
663
- # }
664
-
665
- # /* Remove blank space at the center canvas */
666
- # .st-emotion-cache-z5fcl4 {
667
- # position: relative;
668
- # top: -62px;
669
- # }
670
-
671
- # /* Make the toolbar transparent and the content below it clickable */
672
- # .st-emotion-cache-18ni7ap {
673
- # pointer-events: none;
674
- # background: rgb(255 255 255 / 0%)
675
- # }
676
- # .st-emotion-cache-zq5wmm {
677
- # pointer-events: auto;
678
- # background: rgb(255 255 255);
679
- # border-radius: 5px;
680
- # }
681
- # </style>
682
- # """, unsafe_allow_html=True)
683
  st.subheader('AUTOMATED SOURCE TO TARGET MAPPING')
684
  mode= st.selectbox('Select Mode of Mapping',('Supervised Mapping(You Have Sufficient Sample Data in Target Template)', 'Unsupervised Mapping(You Do Not Have Sufficient Sample Data in Target Template)'), index=None,placeholder='Select category of table')
685
  if mode == 'Supervised Mapping(You Have Sufficient Sample Data in Target Template)':
 
54
  }
55
  </style>
56
  """, unsafe_allow_html=True)
57
+
58
+ def read_excel(path, sheet):
59
+ df = pd.read_excel(path, sheet_name = sheet, dtype = 'str')
60
+ return df
61
+
62
+ def split_join_condition(join_condition):
63
+ conditions = []
64
+ condition = ''
65
+ bracket_count = 0
66
+
67
+ for char in join_condition:
68
+ if char == '(':
69
+ bracket_count += 1
70
+ elif char == ')':
71
+ bracket_count -= 1
72
+ if char == ',' and bracket_count == 0:
73
  conditions.append(condition.strip())
74
+ condition = ''
75
+ else:
76
+ condition += char
77
+ if condition:
78
+ conditions.append(condition.strip())
79
+
80
+ return conditions
81
+
82
+ def join_incr(join_conditions):
83
+ join = []
84
+ join_pattern = re.compile(r'(\w+\.\w+)\s*=\s*(\w+\w.\w+)', re.IGNORECASE)
85
+ for join_condition in join_conditions:
86
+ parts = re.split(r'\sAND\s|\sOR\s', join_condition, flags = re.IGNORECASE)
87
+ temp = [x.strip() for x in parts if join_pattern.match(x.strip())]
88
+ join.append(' AND '.join(temp))
89
+ return join
90
+
91
+ def generate_sql(temp_table):
92
+ proc_query = []
93
+ base_table = None
94
 
95
+ source_table_schema = 'MAIN.GOLD'
96
+ temp_table_schema = 'MAIN.GOLD'
97
+ base_pk = []
98
 
99
+ join_fields = set()
100
 
101
+ for _,row in df.iterrows():
102
+ source_table = row['Source Table']
103
+ primary_key = row['Primary Key']
104
+ source_column = row['Source Column']
105
+ alias = row['Alias']
106
+ joining_keys = row['Joining Keys']
107
+
108
+ if not base_table:
109
+ if primary_key == 'Y':
110
+ base_table = source_table
111
+ base_pk.append(joining_keys)
112
+
113
+ if pd.notna(joining_keys):
114
+ keys = [x.strip() for x in joining_keys.split(',')]
115
+ for x in keys:
116
+ if x not in join_fields:
117
+ join_fields.add(x)
118
+
119
+ unique_cols = ['Source Table', 'Joining Keys', 'Primary Key', 'Join Type','Join Tables','Join Condition']
120
+ unique_df = df.drop_duplicates(subset = unique_cols)
121
+
122
+ incremantal_mapping = {}
123
+ incr_joins = {}
124
+
125
+ for _,row in unique_df.iterrows():
126
+
127
+ source_table = row['Source Table']
128
+ source_column = row['Source Column']
129
+ joining_keys = row['Joining Keys']
130
+ primary_key = row['Primary Key']
131
+ direct_derived = row['Direct/Derived']
132
+ join_type = row['Join Type']
133
+ join_tables = row['Join Tables']
134
+ join_condition = row['Join Condition']
135
+
136
+ if source_table == base_table:
137
+ if primary_key == 'Y':
138
  key = (source_table, joining_keys, join_type, join_tables, join_condition)
139
  key1 = source_table
140
+ else:
141
+ continue
142
+ else:
143
+ key = (source_table, joining_keys, join_type, join_tables, join_condition)
144
+ key1 = source_table
145
+ if pd.notna(direct_derived) and pd.notna(source_table) and pd.notna(source_column):
146
+ if key not in incremantal_mapping:
147
+ incremantal_mapping[key] = {
148
+ 'source_table': source_table,
149
+ 'joining_keys':joining_keys,
150
+ 'join_type': join_type,
151
+ 'join_tables': join_tables,
152
+ 'join_condition': join_condition
153
+ }
154
+ if key1 not in incr_joins:
155
+ if pd.notna(direct_derived) and direct_derived == 'DERIVED':
156
+ incr_joins[key1] = {
157
  'join_type': join_type,
158
+ 'join_tables': ', '.join([x.strip() for x in join_tables.split(',') if x != base_table]),
159
  'join_condition': join_condition
160
  }
161
+ incremental_df = pd.DataFrame(incremantal_mapping.values())
162
+ incr_join_grps = incremental_df.groupby(['source_table'])
163
+ proc_query.append(f'TRUNCATE TABLE {temp_table_schema}.{temp_table}_INCR;')
164
+
165
+ incr_table_join_info = {}
166
+ for _,row in incremental_df.iterrows():
167
+ source_table = row['source_table']
168
+
169
+ if source_table != base_table:
170
+ joining_keys = row['joining_keys']
171
+ join_type = row['join_type']
172
+ join_tables = [x.strip() for x in row['join_tables'].split(',')]
173
+ index = join_tables.index(source_table)
174
+ join_condition = [x.strip() for x in row['join_condition'].split(',')][0:index]
175
+ incr_table_join_info[source_table] = ', '.join(join_condition)
176
+
177
+ incr_query = []
178
+ incr_cols = ''
179
+ incr_tables = []
180
+ incr_join = {}
181
+
182
+ for _, group in incr_join_grps:
183
 
184
+ for table in _.split():
185
+ if base_table != table:
186
 
187
+ join_tables = [t.strip() for t in group['join_tables'].iloc[0].split(',')]
188
+ join_keys = [t.strip() for t in ','.join(base_pk).split(',')]
189
+ join_type = [t.strip() for t in group['join_type'].iloc[0].split(',')]
190
+ join_cond = split_join_condition(incr_table_join_info[table])
191
+ join_condition = join_incr(join_cond)
192
+ source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')]
193
+
194
+ join_key_list = []
195
+ for x in join_keys:
196
+ join_key_list.append(f'{base_table}.{x}')
197
+ join_key = ', '.join(join_key_list)
198
+
199
+ for y in source_table:
200
  sql = f"""
201
+ INSERT INTO {temp_table_schema}.{temp_table}_INCR
202
+ (
203
+ SELECT {join_key}, {table_details_mapping[y][0]}, {table_details_mapping[y][1]}, '{y}', 1, CURRENT_TIMESTAMP
204
+ FROM {source_table_schema}.{base_table} {base_table}"""
205
+
206
+ incr_join_text = ''
207
+ for i in range(len(join_condition)):
208
+ sql += f'\n\t{join_type[i]} JOIN {source_table_schema}.{join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}'
209
+ incr_join_text += f'\n\t{join_type[i]} JOIN {source_table_schema}.{join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}'
210
+ incr_join[y] = incr_join_text
211
+
212
+ sql += f"""
213
+ WHERE COALESCE({join_tables[i+1]}.operation,'NA') <> 'D'
214
+ AND TO_TIMESTAMP( CAST(SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),1,4) || '-' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),5,2) ||'-' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),7,2) || ' ' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),9,2) ||':' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),11,2) ||':' || SUBSTRING(({join_tables[i+1]}._hoodie_commit_time),13,2) AS VARCHAR(30)), 'YYYY-MM-DD HH:MI:SS') > (SELECT MAX(max_update_date) FROM audit.reportingdb_audit_tbl_{temp_table} WHERE mart_table_name='{temp_table}' and src_table_name='{y}')
215
+ );"""
216
 
217
+ incr_query.append(sql)
218
+ incr_tables.append(y)
219
+
220
+ else:
221
+ source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')]
222
+ join_keys = [t.strip() for t in group['joining_keys'].iloc[0].split(',')]
223
+
224
+ join_key_list = []
225
+ for x in join_keys:
226
+ join_key_list.append(f'{base_table}.{x}')
227
+ join_key = ', '.join(join_key_list)
228
 
229
+ incr_cols = join_key
230
  sql = f"""
231
+ INSERT INTO {temp_table_schema}.{temp_table}_INCR
232
+ (
233
+ SELECT {join_key}, {table_details_mapping[base_table][0]}, {table_details_mapping[base_table][1]}, '{base_table}', 1, CURRENT_TIMESTAMP
234
+ FROM {source_table_schema}.{base_table} {base_table}
235
+ WHERE COALESCE(operation,'NA') <> 'D'
236
+ AND TO_TIMESTAMP( CAST(SUBSTRING((_hoodie_commit_time),1,4) || '-' || SUBSTRING((_hoodie_commit_time),5,2) ||'-' || SUBSTRING((_hoodie_commit_time),7,2) || ' ' || SUBSTRING((_hoodie_commit_time),9,2) ||':' || SUBSTRING((_hoodie_commit_time),11,2) ||':' || SUBSTRING((_hoodie_commit_time),13,2) AS VARCHAR(30)), 'YYYY-MM-DD HH:MI:SS') > (SELECT MAX(max_update_date) FROM audit.reportingdb_audit_tbl_{temp_table} WHERE mart_table_name='{temp_table}' and src_table_name='{base_table}')
237
+ );"""
238
  proc_query.append(sql)
239
+ incr_tables.append(base_table)
240
+
241
+ proc_query.append('\n'.join(incr_query))
242
+ proc_query.append(f'TRUNCATE TABLE {temp_table_schema}.INCR1_{temp_table};')
243
+
244
+ sql = f"""
245
+ INSERT INTO {temp_table_schema}.INCR1_{temp_table}
246
+ (
247
+ SELECT DISTINCT {incr_cols.replace(f'{base_table}.', '')}
248
+ FROM {temp_table_schema}.{temp_table}_INCR
249
+ );"""
250
+
251
+ proc_query.append(sql)
252
+
253
+ incr_table_dict = {}
254
+ for table in incr_tables:
255
+ if table == base_table:
256
+ incr_table_dict[table] = f'{temp_table_schema}.INCR2_{table}'
257
+ else:
258
+ p = [x for x in incr_join[table].split('\n\t') if len(x) > 1]
259
+ if len(p) == 1:
260
+ incr_table_dict[table] = f'{temp_table_schema}.INCR2_{table}'
261
  else:
262
+ incr_table_dict[table] = f'{source_table_schema}.{table}'
263
+
264
+ s = []
265
+ for table in incr_tables:
266
+ incr2_sql_list = []
267
+
268
+ if table == base_table:
269
+ for key in incr_cols.replace(f'{base_table}.', '').split(','):
270
+ incr2_sql_list.append(f"{base_table}.{key} = A.{key}")
271
+ incr2_sql_join = ' AND '.join(incr2_sql_list)
272
+
273
  sql = f"""
274
+ CREATE TABLE {temp_table_schema}.INCR2_{table}
275
+ AS
276
  SELECT
277
+ {table}.*
278
+ FROM
279
+ {source_table_schema}.{table} {table}
280
+ INNER JOIN
281
+ {temp_table_schema}.INCR1_{temp_table} A ON {incr2_sql_join}; """
282
+ proc_query.append(f'DROP TABLE IF EXISTS {temp_table_schema}.INCR2_{table};')
283
  proc_query.append(sql)
284
 
285
+ else:
286
 
287
+ p = [x for x in incr_join[table].split('\n\t') if len(x) > 1]
288
+ if len(p) == 1:
289
+ sql = f"""
290
+ CREATE TABLE {temp_table_schema}.INCR2_{table}
291
+ AS
292
+ SELECT
293
+ {table}.*
294
+ FROM
295
+ {temp_table_schema}.INCR2_{base_table} {base_table} {incr_join[table]};"""
296
+ s.append(f'DROP TABLE IF EXISTS {temp_table_schema}.INCR2_{table};')
297
+ s.append(sql)
298
 
299
+ for x in s:
300
+ proc_query.append(x)
301
+
302
+ select_clause = []
303
+ from_clause = []
304
+ where_clause = []
305
+
306
+ for _,row in df.iterrows():
307
+ field_name = row['Field_Name']
308
+ source_table = row['Source Table']
309
+ source_column = row['Source Column']
310
+ joining_keys = row['Joining Keys']
311
+ primary_key = row['Primary Key']
312
+ direct_derived = row['Direct/Derived']
313
+ join_type = row['Join Type']
314
+ join_tables = row['Join Tables']
315
+ join_condition = row['Join Condition']
316
+ column_operation = row['Column Operations']
317
+ alias = row['Alias']
318
+ granularity = row['Granularity']
319
+ filter_condition = row['Filter Condition']
320
+ clauses = row['Clauses']
321
+ ordering = row['Ordering']
322
+
323
+ if pd.notna(direct_derived):
324
+ if pd.notna(column_operation):
325
+ if len(column_operation.split()) == 1:
326
+ select_expr = f'{column_operation.upper()}({source_table}.{source_column})'
327
+ else:
328
+ select_expr = column_operation
329
+ else:
330
+ if pd.notna(source_table):
331
+ select_expr = f'{source_table}.{source_column}'
332
+ else:
333
+ select_expr = source_column
334
 
335
+ if source_column not in join_fields:
336
+ if pd.notna(alias):
337
+ select_expr += f' AS {alias}'
338
+ else:
339
+ if pd.notna(column_operation) and pd.notna(source_column):
340
+ select_expr += f' AS {source_column}'
341
+
342
+ if direct_derived.upper() == 'DIRECT':
343
+ select_clause.append(select_expr)
344
+ elif direct_derived.upper() == 'DERIVED_BASE':
345
+ select_clause.append(select_expr)
346
 
347
+ if pd.notna(filter_condition):
348
+ where_clause.append(filter_condition)
349
+
350
+ select_query = ',\n\t'.join(select_clause)
351
+ sql_query = f"CREATE TABLE {temp_table_schema}.{base_table}_BASE\nAS \n\tSELECT \n\t{select_query} \nFROM\n\t{incr_table_dict[base_table]} {base_table}"
352
+ if where_clause:
353
+ sql_query += f"\nWHERE {' AND '.join(where_clause)}"
354
+ sql_query += ';'
355
+ proc_query.append(f"DROP TABLE IF EXISTS {temp_table_schema}.{base_table}_BASE;")
356
+ proc_query.append(sql_query)
357
+
358
+ df['Clauses'].fillna('', inplace = True)
359
+ df['Ordering'].fillna('', inplace = True)
360
+ c = 1
361
+ temp_base_table = f'{base_table}_BASE'
362
+ grp_cols = ['Join Condition', 'Clauses', 'Ordering']
363
+ join_grps = df[df['Direct/Derived'] == 'DERIVED'].groupby(['Join Condition', 'Clauses', 'Ordering'])
364
+ temp_tables_sql = []
365
+ for (join_condition,clauses,ordering), group in join_grps:
366
+ if pd.notna(group['Direct/Derived'].iloc[0]):
367
+ if group['Direct/Derived'].iloc[0].upper() == 'DERIVED':
368
+ join_tables = [t.strip() for t in group['Join Tables'].iloc[0].split(',')]
369
+ join_keys = [t.strip() for t in group['Joining Keys'].iloc[0].split(',')]
370
+ join_type = [t.strip() for t in group['Join Type'].iloc[0].split(',')]
371
+ join_condition = split_join_condition(group['Join Condition'].iloc[0])
372
+ temp_table_name = f"TEMP_{group['Source Table'].iloc[0]}"
373
+ source_column = [t.strip() for t in (','.join(group['Source Column'])).split(',')]
374
+ alias = [t.strip() for t in (','.join(group['Alias'])).split(',')]
375
+ source_table = [t.strip() for t in (','.join(group['Source Table'])).split(',')]
376
+
377
+ base_cols = []
378
+ for join_key in join_keys:
379
+ base_cols.append(f'{join_tables[0]}.{join_key}')
380
+
381
+ for s_table,col,alias in zip(source_table,source_column,alias):
382
+ if pd.notna(group['Column Operations'].iloc[0]):
383
+ if len(group['Column Operations'].iloc[0].split()) == 1:
384
+ select_expr = f"{group['Column Operations'].iloc[0].upper()}({s_table}.{col})"
385
+ else:
386
+ select_expr = group['Column Operations'].iloc[0]
387
+ else:
388
+ if pd.notna(s_table):
389
+ select_expr = f"{s_table}.{col}"
390
+ else:
391
+ select_expr = col
392
 
393
+ if alias:
394
+ select_expr += f" AS {alias}"
395
+ base_cols.append(select_expr)
396
+
397
+ if ordering:
398
+ base_cols.append(f"{ordering} AS RN")
399
+
400
+ sql = ',\n\t\t'.join(base_cols)
401
+
402
+ join_sql = f"SELECT \n\t\t{sql} \nFROM\n\t{incr_table_dict[base_table]} {join_tables[0]}"
403
+ for i in range(len(join_type)):
404
+ join_sql += f'\n\t{join_type[i]} JOIN {incr_table_dict[join_tables[i+1]]} {join_tables[i+1]} ON {join_condition[i]}'
405
+ if clauses:
406
+ join_sql += f'\n\t{clauses}'
407
+ join_sql += ';'
408
+
409
+ proc_query.append(f"DROP TABLE IF EXISTS {temp_table_schema}.{temp_table_name};")
410
+ proc_query.append(f"CREATE TABLE {temp_table_schema}.{temp_table_name}\nAS \n\t{join_sql}")
411
+
412
+ granularity = [t.strip() for t in group['Granularity'].iloc[0].split(',')]
413
+
414
+ sql = []
415
+ for key in join_keys:
416
+ sql.append(f"A.{key} = B.{key}")
417
+
418
+ temp_cols = []
419
+ temp_cols.append('A.*')
420
+
421
+ source_column = [t.strip() for t in (','.join(group['Source Column'])).split(',')]
422
+ alias = [t.strip() for t in (','.join(group['Alias'])).split(',')]
423
+
424
+ for col,alias in zip(source_column,alias):
425
+ select_expr = f"B.{col}"
426
+ if alias:
427
+ select_expr = f"B.{alias}"
428
+ else:
429
+ select_expr = f"B.{col}"
430
+ temp_cols.append(select_expr)
431
+
432
+ temp_select_query = ',\n\t\t'.join(temp_cols)
433
+
434
+ proc_query.append(f"DROP TABLE IF EXISTS {temp_table_schema}.TEMP_{temp_table}_{c};")
435
+
436
+ base_sql = f"CREATE TABLE {temp_table_schema}.TEMP_{temp_table}_{c}\nAS \n\tSELECT \n\t\t{temp_select_query} \nFROM\n\t{temp_table_schema}.{temp_base_table} AS A"
437
+ base_sql += f"\n\tLEFT OUTER JOIN {temp_table_schema}.{temp_table_name} B ON {' AND '.join(sql)}"
438
+
439
+ if '1:1' in granularity and len(ordering) > 1:
440
+ base_sql += f" AND B.RN = 1"
441
+ base_sql += ';'
442
+
443
+ temp_base_table = f'TEMP_{temp_table}_{c}'
444
+ c += 1
445
+ proc_query.append(base_sql)
446
 
447
+ fin_table_name = temp_table
448
+ fin_table_cols = []
449
 
450
+ for _,row in df.iterrows():
451
+ field_name = row['Field_Name']
452
+ source_table = row['Source Table']
453
+ source_column = row['Source Column']
454
+ alias = row['Alias']
455
 
456
+ if pd.notna(row['Direct/Derived']):
457
+ if (source_column in join_fields):
458
+ fin_table_cols.append(f'{source_column} AS "{field_name}"')
459
+ else:
460
+ fin_table_cols.append(f'"{field_name}"')
461
+
462
+ fin_table_cols = ',\n\t\t'.join(fin_table_cols)
463
+ fin_sql = f"INSERT INTO {temp_table_schema}.{fin_table_name}\n\tSELECT \n\t\t{fin_table_cols} \nFROM\n\t{temp_table_schema}.TEMP_{temp_table}_{c-1};"
464
+
465
+
466
+ condition_col = '_'.join(incr_cols.replace(f'{base_table}.', '').split(','))
467
+ proc_query.append(f"DELETE FROM {temp_table_schema}.{fin_table_name}\nWHERE {'_'.join(incr_cols.replace(f'{base_table}.', '').split(','))} IN (SELECT {'_'.join(incr_cols.replace(f'{base_table}.', '').split(','))} FROM {temp_table_schema}.INCR1_{temp_table});")
468
+ proc_query.append(fin_sql)
469
 
470
+ for table in incr_tables:
471
+ sql = f"""
472
+ INSERT INTO audit.reportingdb_audit_tbl_{temp_table}
473
+ (
474
+ SELECT
475
+ '{temp_table}' as mart_table_name,
476
+ '{table}' as src_table_name,
477
+ coalesce( max(TO_TIMESTAMP( CAST(SUBSTRING((_hoodie_commit_time),1,4) || '-' || SUBSTRING((_hoodie_commit_time),5,2) ||'-' || SUBSTRING((_hoodie_commit_time),7,2) || ' ' || SUBSTRING((_hoodie_commit_time),9,2) ||':' || SUBSTRING((_hoodie_commit_time),11,2) ||':' || SUBSTRING((_hoodie_commit_time),13,2) AS VARCHAR(30)), 'YYYY-MM-DD HH:MI:SS')),(select max(max_update_date) from audit.reportingdb_audit_tbl_{temp_table} where Mart_Table_Name='{temp_table}' and Src_Table_Name= '{table}')) max_update_date,
478
+ CURRENT_TIMESTAMP as load_timestamp,
479
+ coalesce(max(prev_updt_ts),(select max(source_reference_date) from audit.reportingdb_audit_tbl_{temp_table} where Mart_Table_Name='{temp_table}' and Src_Table_Name= '{table}')) AS source_reference_date,
480
+ max(nvl(batch_number,0))+1
481
+ FROM {temp_table_schema}.{temp_table}_INCR where table_name = '{table}'
482
+ );"""
483
+ proc_query.append(sql)
484
 
485
+ return base_table, base_pk, proc_query, incr_join_grps, incr_table_join_info, incr_join, temp_table_schema
486
+
487
+ def create_df(query, table_df_mapping, table_usage_count):
488
+ script = []
489
+ query = ' '.join(query.split()).strip()
490
+ match = re.match(r'CREATE TABLE (\w+\.\w+\.\w+) AS (SELECT .+)', query, re.IGNORECASE)
491
+ source_tables = re.findall(r'\bFROM\s+(\w+\.\w+\.\w+)|\bJOIN\s+(\w+\.\w+\.\w+)', query, re.IGNORECASE)
492
+ source_tables = [table for pair in source_tables for table in pair if table]
493
+
494
+ if not match:
495
+ raise ValueError('Invalid SQL')
496
+ table_name = match.group(1).split('.')[2]
497
+ select_statement = match.group(2)
498
+ create_script = f'{table_name} = spark.sql(""" {select_statement} """)'
499
+ persist_script = f'{table_name} = {table_name}.persist()'
500
+ view_script = f'{table_name}.createOrReplaceTempView("{table_name}")'
501
+
502
+ for table in source_tables:
503
+ create_script = create_script.replace(table, table_df_mapping[table])
504
+
505
+ script.append(f"\n\t\t######################---------Creating table {create_script.split('=')[0].strip()}-------############################")
506
+ script.append(create_script)
507
+ script.append(persist_script)
508
+ script.append(view_script)
509
+ script.append(f'''print("{create_script.split('=')[0].strip()} count: ", {create_script.split('=')[0].strip()}.count())''')
510
+
511
+ if 'INCR2_' in table_name:
512
+ x = table_name.split('INCR2_')[1]
513
+ if x in table_details_mapping.keys():
514
+ script.append(f"\n\t\t######################---------Updating the max_update_date in audit-------############################")
515
+ script.append(f"{x}_max_update_date = INCR2_{x}.agg({{'_hoodie_commit_time' : 'max'}}).first()[0]")
516
+ script.append(f"{x}_max_source_reference_date = INCR2_{x}.agg(max(to_timestamp('{table_details_mapping[x][1].replace(x+'.','')}','yyyy-MM-dd-HH.mm.ss.SSSSSS'))).first()[0]")
517
+ script.append(f"insert_max_update_date(spark,redshift_conn, config['application_name'],'{x}',{x}_max_update_date,{x}_max_source_reference_date, max_batch_id, config)")
518
+ script.append('\n')
519
+
520
+ for table in source_tables:
521
+ table_usage_count[table.split('.')[2]] -= 1
522
+
523
+ for table in source_tables:
524
+ if table_usage_count[table.split('.')[2]] == 0 and 'INCR1_' not in table:
525
+ unpersist_script = f"{table.split('.')[2]}.unpersist()"
526
+ script.append(unpersist_script)
527
+
528
+ return '\n\t\t'.join(script)
529
+
530
+ def generate_spark(proc_query, incr_join_grps, base_table, base_pk, incr_table_join_info, incr_join, temp_table_schema):
531
+ table_usage_count = defaultdict(int)
532
+ table_df_mapping = {}
533
+
534
+ for query in proc_query:
535
+ if 'CREATE TABLE' in query or 'DELETE' in query:
536
+ source_tables = re.findall(r'\bFROM\s+(\w+\.\w+\.\w+)|\bJOIN\s+(\w+\.\w+\.\w+)', query, re.IGNORECASE)
537
+ source_tables = [table for pair in source_tables for table in pair if table]
538
+ for table in source_tables:
539
+ table_usage_count[table.split('.')[2]] += 1
540
+ if 'DELETE' not in query:
541
+ table_df_mapping[table] = table.split('.')[2]
542
+
543
+ script = []
544
+ for query in proc_query:
545
+ if 'CREATE TABLE' in query:
546
+ script.append(create_df(query, table_df_mapping,table_usage_count))
547
+
548
+ spark_query = []
549
+ spark_query.append("\t\t######################---------Reading source data -------############################")
550
+ for table in table_details_mapping.keys():
551
+ spark_query.append(f'{table} = read_file(spark, config, \"{table}\").filter("{table_details_mapping[table][2]}")')
552
+ spark_query.append(f'{table} = {table}.persist()')
553
+ spark_query.append(f'{table}.createOrReplaceTempView("{table}")')
554
+ spark_query.append(f'print("{table} count: ", {table}.count())')
555
+ spark_query.append('\n')
556
+
557
+ spark_query.append("\n\t\t######################---------Reading records-------############################")
558
+ for table in table_details_mapping.keys():
559
+ spark_query.append(f"{table}_max_update_date = read_max_update_date(redshift_conn, config['application_name'],'{table}', config)")
560
+ spark_query.append(f'{table}_max_update_date = {table}_max_update_date[0][0]')
561
+ spark_query.append('\n')
562
+
563
+ incr1_spark = []
564
+ temp_incr1 = []
565
+ for _, group in incr_join_grps:
566
+ for table in _.split():
567
+ if base_table != table:
568
+ join_tables = [t.strip() for t in group['join_tables'].iloc[0].split(',')]
569
+ join_keys = [t.strip() for t in ','.join(base_pk).split(',')]
570
+ join_type = [t.strip() for t in group['join_type'].iloc[0].split(',')]
571
+ join_cond = split_join_condition(incr_table_join_info[table])
572
+ join_condition = join_incr(join_cond)
573
+ source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')]
574
+
575
+ join_key_list = []
576
+ for x in join_keys:
577
+ join_key_list.append(f'{base_table}.{x}')
578
+ join_key = ', '.join(join_key_list)
579
+
580
+ for y in source_table:
581
+ sql = f"""SELECT {join_key} FROM {base_table} {base_table}"""
582
+
583
+ incr_join_text = ''
584
+ i=0
585
+ for i in range(len(join_condition)):
586
+ sql += f' {join_type[i]} JOIN {join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}'
587
+ incr_join_text += f' {join_type[i]} JOIN {join_tables[i+1]} {join_tables[i+1]} ON {join_condition[i]}'
588
+
589
+ sql += f''' WHERE {join_tables[i+1]}._hoodie_commit_time > cast('"""+str({join_tables[i+1]}_max_update_date)+"""' as timestamp)'''
590
+ temp_incr1.append(sql)
591
+
592
+ else:
593
+ source_table = [t.strip() for t in group['source_table'].iloc[0].split(',')]
594
+ join_keys = [t.strip() for t in group['joining_keys'].iloc[0].split(',')]
595
+
596
+ join_key_list = []
597
+ for x in join_keys:
598
+ join_key_list.append(f'{base_table}.{x}')
599
+ join_key = ', '.join(join_key_list)
600
+
601
+ sql = f'''SELECT {join_key} FROM {base_table} {base_table} WHERE {base_table}._hoodie_commit_time > cast('"""+str({base_table}_max_update_date)+"""' as timestamp)'''
602
+ incr1_spark.append(sql)
603
+ for i in temp_incr1:
604
+ incr1_spark.append(i)
605
+ incr1_spark = '\nUNION\n'.join(incr1_spark)
606
+ spark_query.append("\n\t\t######################---------Creating INCR1-------############################")
607
+ spark_query.append(f'INCR1_{temp_table} = spark.sql(""" {incr1_spark} """)')
608
+ spark_query.append(f'\n\t\tINCR1_{temp_table} = INCR1_{temp_table}.dropDuplicates()')
609
+ spark_query.append(f'INCR1_{temp_table} = INCR1_{temp_table}.persist()')
610
+ spark_query.append(f'INCR1_{temp_table}.createOrReplaceTempView("INCR1_{temp_table}")')
611
+ spark_query.append(f'print("INCR1_{temp_table} count: ", INCR1_{temp_table}.count())')
612
+
613
+ spark_query.append("\n\t\t######################---------Creating INCR2-------############################")
614
+ for table in table_details_mapping.keys():
615
+ if table in incr_join.keys():
616
+ p = [x for x in incr_join[table].split('\n\t') if len(x) > 1]
617
+ if len(p) > 1:
618
+ spark_query.append(f"\n\t\t######################---------Updating the max_update_date in audit-------############################")
619
+ spark_query.append(f"{table}_max_update_date = {table}.agg({{'_hoodie_commit_time' : 'max'}}).first()[0]")
620
+ spark_query.append(f"{table}_max_source_reference_date = {table}.agg(max(to_timestamp('{table_details_mapping[table][1].replace(table+'.','')}','yyyy-MM-dd-HH.mm.ss.SSSSSS'))).first()[0]")
621
+ spark_query.append(f"insert_max_update_date(spark,redshift_conn, config['application_name'],'{table}',{table}_max_update_date,{table}_max_source_reference_date, max_batch_id, config)")
622
+ spark_query.append('\n')
623
+
624
+ for query in script:
625
+ spark_query.append(query)
626
+ spark_query.append('\n')
627
+
628
+ spark_query1 = []
629
+ spark_query1.append('\n')
630
+ for query in proc_query:
631
+ if f'{temp_table_schema}.{temp_table}\n' in query:
632
+ final_tables = re.findall(r'\bFROM\s+(\w+\.\w+\.\w+)|\bJOIN\s+(\w+\.\w+\.\w+)', query, re.IGNORECASE)
633
+ final_tables = [table.split('.')[2].strip() for pair in final_tables for table in pair if table and table.split('.')[2].strip() != temp_table][0]
634
+ if 'INCR1_' in final_tables:
635
+ spark_query.append(f"{final_tables}.write.mode('overwrite').parquet(config['incr2df_path'])")
636
+ else:
637
+ spark_query.append(f"{final_tables}.write.mode('overwrite').parquet(config['resultdf_path'])")
638
+ spark_query1.append(f'''cur.execute(""" {query} """)''')
639
+ spark_query1.append('\n')
640
+
641
+ with open('template.txt') as file:
642
+ template = file.read()
643
+
644
+ result = template.replace('INSERT_CODE_1', '\n\t\t'.join(spark_query))
645
+ result = result.replace('INSERT_CODE_2', '\t\t'.join(spark_query1))
646
+
647
+ return result
648
+
649
+
650
+
651
+ # st.set_page_config(page_title='AUTOMATED SOURCE TO TARGET MAPPING', layout= 'wide')
652
+ # st.markdown("""
653
+ # <style>
654
+
655
+ # /* Remove blank space at top and bottom */
656
+ # .block-container {
657
+ # padding-top: 1.9rem;
658
+ # padding-bottom: 1rem;
659
+ # }
660
+
661
+ # /* Remove blank space at the center canvas */
662
+ # .st-emotion-cache-z5fcl4 {
663
+ # position: relative;
664
+ # top: -62px;
665
+ # }
666
+
667
+ # /* Make the toolbar transparent and the content below it clickable */
668
+ # .st-emotion-cache-18ni7ap {
669
+ # pointer-events: none;
670
+ # background: rgb(255 255 255 / 0%)
671
+ # }
672
+ # .st-emotion-cache-zq5wmm {
673
+ # pointer-events: auto;
674
+ # background: rgb(255 255 255);
675
+ # border-radius: 5px;
676
+ # }
677
+ # </style>
678
+ # """, unsafe_allow_html=True)
679
+
680
+ ######
681
+ def main():
682
+ # st.title('PAGE TITLE') # Change this for each page
683
+ sidebar()
684
+ ########
685
  st.subheader('AUTOMATED SOURCE TO TARGET MAPPING')
686
  mode= st.selectbox('Select Mode of Mapping',('Supervised Mapping(You Have Sufficient Sample Data in Target Template)', 'Unsupervised Mapping(You Do Not Have Sufficient Sample Data in Target Template)'), index=None,placeholder='Select category of table')
687
  if mode == 'Supervised Mapping(You Have Sufficient Sample Data in Target Template)':