Designing a Data Source Discovery App - Part 13: Summarizing Interview Data Continued...

by DL Keeshin


March 05, 2025


images/future_kds_discovery_erd_20250305.png

This is a continuation of my previous blog post, where I began describing the design of the kDS Discovery App’s data analytics summarization process.

Parsing Topic Summaries

first_summary_parse.py is a Python script that parses useful insights out of the topic-level summaries stored in the stage.summary_by_topic table. Here's what it does:

  • Connects to the Database: It reads connection details from environment variables and runs the queries that fetch and insert data. (A sketch of the shared execute_query helper these snippets rely on appears after this list.)
  • 
    db_params = {
        "dbname": os.getenv("DBSURVEY"),
        "user": os.getenv("DBUSER"),
        "password": os.getenv("DBPASSWORD"),
        "host": os.getenv("DBHOST"),
        "port": os.getenv("DBPORT")
    }
  • Pulls Topic Summaries: Retrieves all topic-based summaries from the database, which contain structured insights from interview responses.
  • 
    def get_summaries(connection):
        """Fetch all rows from stage.summary_by_topic"""
        return execute_query(connection, """
            SELECT by_topic_id, control_id, topic_, summary_, summary_date, create_date
            FROM stage.summary_by_topic
            ORDER BY create_date;
        """)
  • Finds Follow-Up Questions: Looks through each summary’s JSON structure to find follow-up questions and stores them in stage.followup_question_by_topic.
  • 
    def insert_followup_question(connection, by_topic_id, question, summary_date, create_date):
        """Insert a follow-up question into stage.followup_question_by_topic"""
        if not question:
            return
       
        # Convert the question to JSON format
        question_json = json.dumps({"follow_up_question": question})
        
        execute_query(connection, """
            INSERT INTO stage.followup_question_by_topic 
            (by_topic_id, syntax_, start_date, create_date, source_)
            VALUES (%s, %s, %s, %s, 'data parser script');
        """, (by_topic_id, question_json, summary_date, create_date), fetch=False)
        
        print(f"Inserted follow-up question for by_topic_id: {by_topic_id}")
  • Tracks Data Flows: Logs data movement details in stage.data_flow_by_topic.
  • 
    def process_data_flows(connection, by_topic_id, analysis, create_date):
        """Process and insert data flows, handling both the new array format and legacy single fields"""
        
        # Check if we have the new data_flows array format
        if "data_flows" in analysis and isinstance(analysis["data_flows"], list):
            # Process multiple data flows
            for data_flow in analysis["data_flows"]:
                insert_data_flow(connection, by_topic_id, data_flow, create_date)
        else:
            # Handle legacy format with single source/destination fields
            source = analysis.get("noted_data_flow_source")
            destination = analysis.get("noted_data_flow_destination")
            
            if source or destination:
                legacy_data_flow = {
                    "source": source,
                    "destination": destination
                }
                insert_data_flow(connection, by_topic_id, legacy_data_flow, create_date)
    
    def insert_data_flow(connection, by_topic_id, data_flow, create_date):
        """Insert a single data flow record into stage.data_flow_by_topic"""
        source = data_flow.get("source")
        destination = data_flow.get("destination")
        
        if not (source or destination):
            return
        
        # Convert source and destination to JSON
        source_json = json.dumps({"source": source}) if source else None
        destination_json = json.dumps({"destination": destination}) if destination else None
        
        execute_query(connection, """
            INSERT INTO stage.data_flow_by_topic 
            (by_topic_id, noted_data_flow_source, noted_data_flow_destination, create_date, source_)
            VALUES (%s, %s, %s, %s, 'data parser script');
        """, (by_topic_id, source_json, destination_json, create_date), fetch=False)
        
        print(f"Inserted data flow (source: {source}, destination: {destination}) for by_topic_id: {by_topic_id}"
    }
  • Logs Solutions in Use: Extracts mentions of key solutions and tools, saving them in stage.solution_in_use_by_topic.
  • 	
    def insert_solution_in_use(connection, by_topic_id, solution, create_date):
        """Insert a single solution in use record into stage.solution_in_use_by_topic"""
        if not solution:
            return
        
        # Convert solution to JSON
        solution_json = json.dumps({"solution": solution})
        
        execute_query(connection, """
            INSERT INTO stage.solution_in_use_by_topic 
            (by_topic_id, description_, create_date, source_)
            VALUES (%s, %s, %s, 'data parser script');
        """, (by_topic_id, solution_json, create_date), fetch=False)
        
        print(f"Inserted solution in use '{solution}' for by_topic_id: {by_topic_id}")
    					
    def process_solutions(connection, by_topic_id, analysis, create_date):
        """Process and insert solutions in use, handling both the new array format and legacy single field"""
        
        # Check if we have the new solutions_in_use array format
        if "solutions_in_use" in analysis and isinstance(analysis["solutions_in_use"], list):
            # Process multiple solutions
            for solution in analysis["solutions_in_use"]:
                insert_solution_in_use(connection, by_topic_id, solution, create_date)
        else:
            # Handle legacy format with single solution field
            solution = analysis.get("solution_in_use")
            if solution:
                insert_solution_in_use(connection, by_topic_id, solution, create_date)
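
Every snippet above leans on a shared execute_query helper and a small driver loop that I haven't shown. Below is a minimal sketch of what those pieces might look like, assuming psycopg2 and the same "analysis" array structure the executive-summary prompt expects; the extraction key "follow_up_question" is an assumption for illustration, not the script's actual code.

    import json
    import psycopg2

    def execute_query(connection, query, params=None, fetch=True):
        """Run a query; return rows for SELECTs, commit and return None otherwise."""
        with connection.cursor() as cursor:
            cursor.execute(query, params)
            if fetch:
                return cursor.fetchall()
            connection.commit()
            return None

    def process_summaries(connection):
        """Walk each topic summary and dispatch its insights to the staging tables."""
        for by_topic_id, control_id, topic_, summary_, summary_date, create_date in get_summaries(connection):
            summary_data = json.loads(summary_) if isinstance(summary_, str) else summary_
            for analysis in summary_data.get("analysis", []):
                # The extraction key below is assumed for this sketch.
                insert_followup_question(connection, by_topic_id,
                                         analysis.get("follow_up_question"),
                                         summary_date, create_date)
                process_data_flows(connection, by_topic_id, analysis, create_date)
                process_solutions(connection, by_topic_id, analysis, create_date)

    if __name__ == "__main__":
        with psycopg2.connect(**db_params) as conn:
            process_summaries(conn)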

Creating Executive Summaries

Once the topic summaries are parsed and loaded into separate tables, generate_second_summary.py pulls everything together into an executive summary. Here's what it does:

  • Finds Unique Interviews: Grabs all distinct `control_id` values from stage.summary_by_topic.
  • 	
    def get_distinct_control_ids(connection):
        """Get all distinct control_ids from stage.summary_by_topic"""
        result = execute_query(connection, """
            SELECT DISTINCT control_id FROM stage.summary_by_topic ORDER BY control_id;
        """)
        return [row[0] for row in result] if result else []
  • Gathers Topic Summaries: Pulls together all topic summaries linked to a specific `control_id`.
  • 	
    def get_topic_summaries_by_control_id(connection, control_id):
        """Get all topic summaries for a specific control_id"""
        result = execute_query(connection, """
            SELECT topic_, summary_ FROM stage.summary_by_topic
            WHERE control_id = %s ORDER BY topic_;
        """, (control_id,))
        return result if result else []
  • Adds Business Context: Fetches extra details like industry, function, and role.
  • 	
    def get_business_info_by_control_id(connection, control_id):
        """Get business information (industry, function, role) for this control_id"""
        result = execute_query(connection, """
            SELECT DISTINCT a.industry_description, a.function_name, a.role_name 
            FROM interview.vw_answer_lookup a
            JOIN temp.summary_control sc ON a.interview_id = sc.interview_id
            WHERE sc.control_id = %s
            LIMIT 1;
        """, (control_id,))
        if result:
            return {
                "industry_description": result[0][0],
                "function_name": result[0][1],
                "role_name": result[0][2]
            }
        return None
  • Generates an Executive Summary Prompt: Builds a prompt to send to OpenAI’s API, which returns a structured JSON summary. (A sketch of the API call appears after this list.)
  • 	
    def create_executive_summary_prompt(control_id, business_info, summaries):
        """Create prompt for generating an executive summary"""
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        
        # Format each topic summary for the prompt
        summary_texts = []
        for topic, summary_json in summaries:
            try:
                summary_data = json.loads(summary_json) if isinstance(summary_json, str) else summary_json
                if "analysis" in summary_data and len(summary_data["analysis"]) > 0:
                    analysis = summary_data["analysis"][0]
                    summary_text = analysis.get("summary", "No summary available")
                    summary_texts.append(f"Topic: {topic}\nSummary: {summary_text}")
            except (json.JSONDecodeError, KeyError) as e:
                print(f"Error parsing summary for topic '{topic}': {e}")
                summary_texts.append(f"Topic: {topic}\nSummary: Unable to parse summary data")
        
        all_summaries_text = "\n\n".join(summary_texts)
        
        # Create the prompt
        role_info = ""
        if business_info:
            role_info = f"from a {business_info['role_name']} in the {business_info['function_name']} of a {business_info['industry_description']} organization"
        
        return f"""
        Create an executive summary of the following interview topic summaries {role_info}. 
        Synthesize the key findings, identify common themes, important data flows, and notable solutions in use across all topics.
        Include the control_id {control_id} in the output. Respond in the following JSON format:
    
        {{
          "executive_summary": {{
            "control_id": "{control_id}",
            "summary": "",
            "summary_date": "{current_time}",
            "key_themes": [
              "",
              ""
            ],
            "major_data_flows": [
              {{
                "source": "",
                "destination": ""
              }},
              {{
                "source": "",
                "destination": ""
              }}
            ],
            "key_solutions": [
              "",
              ""
            ],
            "strategic_recommendations": [
              "",
              ""
            ]
          }}
        }}
    
        Here are the topic summaries:
        {all_summaries_text}
        """
  • Saves the Summary: Stores the final executive summary in stage.summary_by_interview.
  • 	
    def insert_executive_summary(connection, control_id, summary_json):
        """Insert executive summary into stage.summary_by_interview table"""
        execute_query(connection, """
            INSERT INTO stage.summary_by_interview (control_id, summary_, summary_date, create_date, source_)
            VALUES (%s, %s, NOW(), CURRENT_DATE, 'executive summary script');
        """, (control_id, json.dumps(summary_json)), fetch=False)
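
The call to OpenAI's API itself isn't shown above. Here is a minimal sketch of how the pieces might be wired together using the openai Python package; the model name and the response_format setting are assumptions for illustration, not necessarily what the script uses.

    import json
    from openai import OpenAI

    client = OpenAI()  # reads OPENAI_API_KEY from the environment

    def generate_executive_summary(connection, control_id):
        """Build the prompt, request a JSON summary from the model, and stage it."""
        summaries = get_topic_summaries_by_control_id(connection, control_id)
        business_info = get_business_info_by_control_id(connection, control_id)
        prompt = create_executive_summary_prompt(control_id, business_info, summaries)

        # Model choice and response_format are assumptions in this sketch.
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"},
        )
        summary_json = json.loads(response.choices[0].message.content)
        insert_executive_summary(connection, control_id, summary_json)

    def run(connection):
        for control_id in get_distinct_control_ids(connection):
            generate_executive_summary(connection, control_id)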

Wrapping Up

These two scripts, along with the one described in my previous post, show how SQL, an LLM like ChatGPT, and JSON work together effectively to summarize data at both the topic and interview levels. As I mentioned in my previous post, we will move most of the SQL queries described here into PostgreSQL functions for data retrieval and stored procedures for data modification. This shift will make the code easier to maintain, improve performance, and keep application logic better separated from database operations. Additionally, any inline JSON will be moved to a `jsonb` field in PostgreSQL to enhance query performance and flexibility.
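
As a rough sketch of where that refactoring is headed, the inline INSERT for follow-up questions could become a single call to a stored procedure, with the SQL living in the database instead of the Python script. The procedure name and signature below are hypothetical.

    def insert_followup_question(connection, by_topic_id, question, summary_date, create_date):
        """Same behavior as before, but the SQL now lives in the database."""
        if not question:
            return
        question_json = json.dumps({"follow_up_question": question})
        # Hypothetical stored procedure; its name and parameters are illustrative only.
        execute_query(connection, """
            CALL stage.insert_followup_question_by_topic(%s, %s, %s, %s);
        """, (by_topic_id, question_json, summary_date, create_date), fetch=False)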

Also, as mentioned previously, my company, kDS LLC, is actively seeking organizations interested in testing a beta version of the kDS Data Source Discovery App. If that sounds like your organization, let us know in the comments below. We’d love to collaborate.

As always, thanks for stopping by!
