by DL Keeshin
March 05, 2025
This is a continuation of my previous blog post where I began describing the design of the kDS Discovery App’s data analytics summarizing process.
first_summary_parse.py is a Python script that parses useful insights out of the topic-level summaries stored in the stage.summary_by_topic table. Here's what it does...
import json
import os

# Database connection parameters are read from environment variables
db_params = {
    "dbname": os.getenv("DBSURVEY"),
    "user": os.getenv("DBUSER"),
    "password": os.getenv("DBPASSWORD"),
    "host": os.getenv("DBHOST"),
    "port": os.getenv("DBPORT")
}

def get_summaries(connection):
    """Fetch all rows from stage.summary_by_topic"""
    return execute_query(connection, """
        SELECT by_topic_id, control_id, topic_, summary_, summary_date, create_date
        FROM stage.summary_by_topic
        ORDER BY create_date;
    """)

def insert_followup_question(connection, by_topic_id, question, summary_date, create_date):
    """Insert a follow-up question into stage.followup_question_by_topic"""
    if not question:
        return
    # Convert the question to JSON format
    question_json = json.dumps({"follow_up_question": question})
    execute_query(connection, """
        INSERT INTO stage.followup_question_by_topic
            (by_topic_id, syntax_, start_date, create_date, source_)
        VALUES (%s, %s, %s, %s, 'data parser script');
    """, (by_topic_id, question_json, summary_date, create_date), fetch=False)
    print(f"Inserted follow-up question for by_topic_id: {by_topic_id}")

def process_data_flows(connection, by_topic_id, analysis, create_date):
    """Process and insert data flows, handling both the new array format and legacy single fields"""
    # Check if we have the new data_flows array format
    if "data_flows" in analysis and isinstance(analysis["data_flows"], list):
        # Process multiple data flows
        for data_flow in analysis["data_flows"]:
            insert_data_flow(connection, by_topic_id, data_flow, create_date)
    else:
        # Handle legacy format with single source/destination fields
        source = analysis.get("noted_data_flow_source")
        destination = analysis.get("noted_data_flow_destination")
        if source or destination:
            legacy_data_flow = {
                "source": source,
                "destination": destination
            }
            insert_data_flow(connection, by_topic_id, legacy_data_flow, create_date)

def insert_data_flow(connection, by_topic_id, data_flow, create_date):
    """Insert a single data flow record into stage.data_flow_by_topic"""
    source = data_flow.get("source")
    destination = data_flow.get("destination")
    if not (source or destination):
        return
    # Convert source and destination to JSON
    source_json = json.dumps({"source": source}) if source else None
    destination_json = json.dumps({"destination": destination}) if destination else None
    execute_query(connection, """
        INSERT INTO stage.data_flow_by_topic
            (by_topic_id, noted_data_flow_source, noted_data_flow_destination, create_date, source_)
        VALUES (%s, %s, %s, %s, 'data parser script');
    """, (by_topic_id, source_json, destination_json, create_date), fetch=False)
    print(f"Inserted data flow (source: {source}, destination: {destination}) for by_topic_id: {by_topic_id}")

def insert_solution_in_use(connection, by_topic_id, solution, create_date):
    """Insert a single solution in use record into stage.solution_in_use_by_topic"""
    if not solution:
        return
    # Convert solution to JSON
    solution_json = json.dumps({"solution": solution})
    execute_query(connection, """
        INSERT INTO stage.solution_in_use_by_topic
            (by_topic_id, description_, create_date, source_)
        VALUES (%s, %s, %s, 'data parser script');
    """, (by_topic_id, solution_json, create_date), fetch=False)
    print(f"Inserted solution in use '{solution}' for by_topic_id: {by_topic_id}")

def process_solutions(connection, by_topic_id, analysis, create_date):
    """Process and insert solutions in use, handling both the new array format and legacy single field"""
    # Check if we have the new solutions_in_use array format
    if "solutions_in_use" in analysis and isinstance(analysis["solutions_in_use"], list):
        # Process multiple solutions
        for solution in analysis["solutions_in_use"]:
            insert_solution_in_use(connection, by_topic_id, solution, create_date)
    else:
        # Handle legacy format with single solution field
        solution = analysis.get("solution_in_use")
        if solution:
            insert_solution_in_use(connection, by_topic_id, solution, create_date)
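The functions above all lean on an execute_query helper that isn't shown in these excerpts. For context, here is a minimal sketch of what such a helper might look like, assuming a psycopg2 connection and the fetch keyword used above; it's an illustration, not the script's actual implementation.

# Minimal sketch of the assumed execute_query helper (illustrative only)
def execute_query(connection, query, params=None, fetch=True):
    """Run a query; return all rows when fetch=True, otherwise commit the write."""
    with connection.cursor() as cursor:
        cursor.execute(query, params)
        if fetch:
            return cursor.fetchall()
        connection.commit()
        return None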
Once the topic summaries are parsed and loaded into separate tables, generate_second_summary.py gathers everything together into an executive summary. Here's what it does...
import json
from datetime import datetime

def get_distinct_control_ids(connection):
    """Get all distinct control_ids from stage.summary_by_topic"""
    result = execute_query(connection, """
        SELECT DISTINCT control_id FROM stage.summary_by_topic ORDER BY control_id;
    """)
    return [row[0] for row in result] if result else []

def get_topic_summaries_by_control_id(connection, control_id):
    """Get all topic summaries for a specific control_id"""
    result = execute_query(connection, """
        SELECT topic_, summary_ FROM stage.summary_by_topic
        WHERE control_id = %s ORDER BY topic_;
    """, (control_id,))
    return result if result else []

def get_business_info_by_control_id(connection, control_id):
    """Get business information (industry, function, role) for this control_id"""
    result = execute_query(connection, """
        SELECT DISTINCT a.industry_description, a.function_name, a.role_name
        FROM interview.vw_answer_lookup a
        JOIN temp.summary_control sc ON a.interview_id = sc.interview_id
        WHERE sc.control_id = %s
        LIMIT 1;
    """, (control_id,))
    if result:
        return {
            "industry_description": result[0][0],
            "function_name": result[0][1],
            "role_name": result[0][2]
        }
    return None

def create_executive_summary_prompt(control_id, business_info, summaries):
    """Create prompt for generating an executive summary"""
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    # Format each topic summary for the prompt
    summary_texts = []
    for topic, summary_json in summaries:
        try:
            summary_data = json.loads(summary_json) if isinstance(summary_json, str) else summary_json
            if "analysis" in summary_data and len(summary_data["analysis"]) > 0:
                analysis = summary_data["analysis"][0]
                summary_text = analysis.get("summary", "No summary available")
                summary_texts.append(f"Topic: {topic}\nSummary: {summary_text}")
        except (json.JSONDecodeError, KeyError) as e:
            print(f"Error parsing summary for topic '{topic}': {e}")
            summary_texts.append(f"Topic: {topic}\nSummary: Unable to parse summary data")
    all_summaries_text = "\n\n".join(summary_texts)
    # Create the prompt
    role_info = ""
    if business_info:
        role_info = f"from a {business_info['role_name']} in the {business_info['function_name']} of a {business_info['industry_description']} organization"
    return f"""
Create an executive summary of the following interview topic summaries {role_info}.
Synthesize the key findings, identify common themes, important data flows, and notable solutions in use across all topics.
Include the control_id {control_id} in the output. Respond in the following JSON format:
{{
  "executive_summary": {{
    "control_id": "{control_id}",
    "summary": "",
    "summary_date": "{current_time}",
    "key_themes": [
      "",
      ""
    ],
    "major_data_flows": [
      {{
        "source": "",
        "destination": ""
      }},
      {{
        "source": "",
        "destination": ""
      }}
    ],
    "key_solutions": [
      "",
      ""
    ],
    "strategic_recommendations": [
      "",
      ""
    ]
  }}
}}

Here are the topic summaries:
{all_summaries_text}
"""

def insert_executive_summary(connection, control_id, summary_json):
    """Insert executive summary into stage.summary_by_interview table"""
    execute_query(connection, """
        INSERT INTO stage.summary_by_interview (control_id, summary_, summary_date, create_date, source_)
        VALUES (%s, %s, NOW(), CURRENT_DATE, 'executive summary script');
    """, (control_id, json.dumps(summary_json)), fetch=False)
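The step in between, sending the prompt to the LLM and capturing its JSON reply, isn't shown above. Here is a minimal sketch of what that glue might look like, assuming the openai Python client; the client setup, model name, and generate_executive_summary function are illustrative placeholders, not the script's actual code.

from openai import OpenAI

client = OpenAI()  # illustrative; assumes OPENAI_API_KEY is set in the environment

def generate_executive_summary(connection, control_id, business_info, summaries):
    """Build the prompt, ask the model for a JSON reply, and store the result."""
    prompt = create_executive_summary_prompt(control_id, business_info, summaries)
    response = client.chat.completions.create(
        model="gpt-4o",  # placeholder model name
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"}  # ask for a strictly JSON reply
    )
    summary_json = json.loads(response.choices[0].message.content)
    insert_executive_summary(connection, control_id, summary_json)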
These two scripts, along with the one described in my previous post, show how SQL, an LLM like ChatGPT, and JSON work together effectively to summarize data at both the topic and interview levels. As I mentioned in my previous post, we will move most of the SQL queries described here into PL/pgSQL functions for data retrieval and stored procedures for data modification. This shift will make the code easier to maintain, improve performance, and ensure a cleaner separation between application logic and database operations. Additionally, any inline JSON will be moved to a `jsonb` field in PostgreSQL to enhance query performance and flexibility.
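For example, the follow-up question insert from the first script could shrink to a single CALL once that logic lives in a stored procedure with a `jsonb` parameter. A rough sketch of what the Python call site might then look like (stage.insert_followup_question is a hypothetical procedure name, not an existing object in the schema):

# Hypothetical call site once the INSERT logic moves into a stored procedure
def insert_followup_question(connection, by_topic_id, question, summary_date, create_date):
    """Delegate the insert to a (hypothetical) stored procedure that accepts jsonb."""
    if not question:
        return
    question_json = json.dumps({"follow_up_question": question})
    execute_query(connection, """
        CALL stage.insert_followup_question(%s, %s::jsonb, %s, %s);
    """, (by_topic_id, question_json, summary_date, create_date), fetch=False)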
Also, as mentioned previously, my company, kDS LLC, is actively seeking organizations interested in testing a beta version of the kDS Data Source Discovery App. If your organization is interested, let us know in the comments below. We'd love to collaborate.
As always, thanks for stopping by!