How to Create an AI Application That Can Chat with Massive SQL Databases

Published on:

Introduction

You possibly can simply create a easy utility that may chat with SQL Database. However right here’s the issue with that. You possibly can’t make it work seamlessly in the case of dealing with and dealing with giant databases. If the database is big, it’s impractical to incorporate the whole checklist of columns and tables within the immediate context. This text explains how one can bypass this impediment by creating an AI utility that may chat with large SQL databases.

Creating an AI Utility to Chat with Large SQL Databases

The next code initiates a easy Streamlit utility that allows customers to hook up with an SQL database and chat with it.

import streamlit as st
import requests
import os
import pandas as pd
from uuid import uuid4
import psycopg2
from langchain.prompts import ChatPromptTemplate
from langchain.prompts.chat import SystemMessage, HumanMessagePromptTemplate
from langchain.llms import OpenAI, AzureOpenAI
from langchain.chat_models import ChatOpenAI, AzureChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from dotenv import load_dotenv

# Create mandatory folders
folders_to_create = ['csvs']
for folder_name in folders_to_create:
    if not os.path.exists(folder_name):
        os.makedirs(folder_name)
        print(f"Folder '{folder_name}' created.")
    else:
        print(f"Folder '{folder_name}' already exists.")

# Load the OpenAI API key
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
llm = OpenAI(openai_api_key=openai_api_key)
chat_llm = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.4)
embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)

def get_basic_table_details(cursor):
    question = """
    SELECT table_name, column_name, data_type
    FROM information_schema.columns
    WHERE table_name IN (
        SELECT tablename FROM pg_tables WHERE schemaname="public"
    );"""
    cursor.execute(question)
    return cursor.fetchall()

def save_db_details(db_uri):
    unique_id = str(uuid4()).change("-", "_")
    connection = psycopg2.join(db_uri)
    cursor = connection.cursor()
    tables_and_columns = get_basic_table_details(cursor)
    df = pd.DataFrame(tables_and_columns, columns=['table_name', 'column_name', 'data_type'])
    filename_t = f'csvs/tables_{unique_id}.csv'
    df.to_csv(filename_t, index=False)
    cursor.shut()
    connection.shut()
    return unique_id

def generate_template_for_sql(question, table_info, db_uri):
    template = ChatPromptTemplate.from_messages([
        SystemMessage(content=f"You are an assistant that can write SQL Queries. Given the text below, write a SQL query that answers the user's question. DB connection string is {db_uri} Here is a detailed description of the table(s): {table_info} Prepend and append the SQL query with three backticks '```'"),
        HumanMessagePromptTemplate.from_template("{text}")
    ])
    reply = chat_llm(template.format_messages(textual content=question))
    return reply.content material

def get_the_output_from_llm(question, unique_id, db_uri):
    filename_t = f'csvs/tables_{unique_id}.csv'
    df = pd.read_csv(filename_t)
    table_info = ''
    for desk in df['table_name'].distinctive():
        table_info += f'Details about desk {desk}:n'
        table_info += df[df['table_name'] == desk].to_string(index=False) + 'nn'
    return generate_template_for_sql(question, table_info, db_uri)

def execute_the_solution(answer, db_uri):
    connection = psycopg2.join(db_uri)
    cursor = connection.cursor()
    _, final_query, _ = answer.break up("```")
    cursor.execute(final_query.strip())
    outcome = cursor.fetchall()
    return str(outcome)

def connect_with_db(uri):
    st.session_state.db_uri = uri
    st.session_state.unique_id = save_db_details(uri)
    return {"message": "Database connection established!"}

def send_message(message):
    answer = get_the_output_from_llm(message, st.session_state.unique_id, st.session_state.db_uri)
    outcome = execute_the_solution(answer, st.session_state.db_uri)
    return {"message": answer + "nn" + "Outcome:n" + outcome}

# Streamlit interface setup
st.subheader("Directions")
st.markdown("1. Enter your RDS Database URI beneath.n2. ")

The basic technique for simplifying the immediate entails sending solely the related tables and column names that pertain to the consumer’s question. To realize this, we are able to generate embeddings for the desk and column names, dynamically retrieve essentially the most pertinent ones based mostly on the consumer’s enter, and embrace these within the immediate. On this article, we’ll make the most of ChromaDB as our vector database. Nonetheless, alternate options like Pinecone, Milvus, or any appropriate vector database can be utilized.

- Advertisement -

The way to Simplify the Immediate

Let’s start by putting in ChromaDB.

pip set up chromadb

First, we’ll arrange an extra folder named ‘vectors’ alongside the ‘csvs’ folder to retailer embeddings of desk and column names. This folder can even comprise different pertinent database particulars, such because the overseas keys that hyperlink completely different tables, and potential values for the WHERE clause.

def generate_embeddings(filename, storage_folder):
    csv_loader = CSVLoader(file_path=filename, encoding="utf8")
    dataset = csv_loader.load()
    vector_database = Chroma.from_documents(dataset, embedding=embeddings, persist_directory=storage_folder)
    vector_database.persist()

We may even first test whether or not the consumer’s question wants any details about tables or if the consumer is as an alternative asking about simply the overall schema of the database.

- Advertisement -
def check_user_intent_for_database_info_or_sql(question):
    # Outline a template for the dialog
    prompt_template = ChatPromptTemplate.from_messages([
        SystemMessage(
            content=(
                "Based on the provided text, the user is asking a question about databases. "
                "Determine if the user seeks information about the database schema or if they want to write a SQL query. "
                "Respond with 'yes' if the user is seeking information about the database schema and 'no' if they intend to write a SQL query."
            )
        ),
        HumanMessagePromptTemplate.from_template("{text}"),
    ])
    
    # Generate a response utilizing a language mannequin
    response = chat_llm(prompt_template.format_messages(textual content=question))
    print(response.content material)
    return response.content material

The consumer responds with both ‘sure’ or ‘no’. If the reply is ‘sure’, a immediate is generated.

def generate_sql_query_prompt(question, db_uri):
    # Configure the chat template with system and human messages
    prompt_template = ChatPromptTemplate.from_messages([
        SystemMessage(
            content=(
                "As an assistant tasked with writing SQL queries, create a SQL query based on the text below. "
                "Enclose the SQL query within three backticks '```' for clarity. "
                "Aim to use 'SELECT' queries as much as possible. "
                f"The connection string for the database is {db_uri}."
            )
        ),
        HumanMessagePromptTemplate.from_template("{text}"),
    ])
    
    # Generate and print the reply utilizing a language mannequin
    response = chat_llm.prompt_template.format_messages(textual content=question))
    print(response.content material)
    return response.content material

If the reply is ‘no’, it signifies that the consumer’s question particularly requires the names of tables and columns inside these tables. We are going to then determine essentially the most related tables & columns, and assemble a string from these to incorporate in our immediate.

See also  Navigating the Job Market: Tips for Prompt Engineering Job Seekers

Subsequent, we’ll confirm that our vectors have been efficiently created and that each one different elements are functioning appropriately. Beneath is the whole code up so far.

import streamlit as st
import requests
import os
import pandas as pd
from uuid import uuid4
import psycopg2

from langchain.prompts import ChatPromptTemplate
from langchain.prompts.chat import SystemMessage, HumanMessagePromptTemplate

from langchain.llms import OpenAI, AzureOpenAI
from langchain.chat_models import ChatOpenAI, AzureChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from dotenv import load_dotenv
from langchain.vectorstores import Chroma
from langchain.document_loaders.csv_loader import CSVLoader

# Create mandatory folders for knowledge storage
folders_to_create = ['csvs', 'vectors']
for folder_name in folders_to_create:
    if not os.path.exists(folder_name):
        os.makedirs(folder_name)
        print(f"Folder '{folder_name}' created.")
    else:
        print(f"Folder '{folder_name}' already exists.")

# Load API key from atmosphere variable
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")

# Initialize language fashions and embeddings
llm = OpenAI(openai_api_key=openai_api_key)
chat_llm = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.4)
embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)

# Operate to retrieve fundamental desk particulars from the database
def get_basic_table_details(cursor):
    cursor.execute("""
        SELECT c.table_name, c.column_name, c.data_type
        FROM information_schema.columns c
        WHERE c.table_name IN (
            SELECT tablename
            FROM pg_tables
            WHERE schemaname="public"
        );""")
    return cursor.fetchall()

# Operate to create vector databases from CSV recordsdata
def create_vectors(filename, persist_directory):
    loader = CSVLoader(file_path=filename, encoding="utf8")
    knowledge = loader.load()
    vectordb = Chroma.from_documents(knowledge, embedding=embeddings, persist_directory=persist_directory)
    vectordb.persist()

# Operate to save lots of database particulars and generate vectors
def save_db_details(db_uri):
    unique_id = str(uuid4()).change("-", "_")
    connection = psycopg2.join(db_uri)
    cursor = connection.cursor()
    tables_and_columns = get_basic_table_details(cursor)
    df = pd.DataFrame(tables_and_columns, columns=['table_name', 'column_name', 'data_type'])
    filename_t="csvs/tables_" + unique_id + '.csv'
    df.to_csv(filename_t, index=False)
    create_vectors(filename_t, "./vectors/tables_"+ unique_id)
    cursor.shut()
    connection.shut()
    return unique_id

# Operate to generate SQL question templates
def generate_template_for_sql(question, table_info, db_uri):
    template = ChatPromptTemplate.from_messages([
        SystemMessage(
            content=(
                f"You are an assistant that can write SQL Queries. Given the text below, write a SQL query that answers the user's question. DB connection string is {db_uri}. Here is a detailed description of the table(s): {table_info} Prepend and append the SQL query with three backticks '```'"
            )
        ),
        HumanMessagePromptTemplate.from_template("{text}"),
    ])
    return chat_llm(template.format_messages(textual content=question)).content material

# Operate to find out if consumer's question is about normal schema info or SQL
def check_if_users_query_want_general_schema_information_or_sql(question):
    template = ChatPromptTemplate.from_messages([
        SystemMessage(
            content=(
                f"In the given text, the user is asking a question about the database. Determine whether the user wants information about the database schema or wants to write a SQL query. Answer 'yes' for schema information and 'no' for SQL query."
            )
        ),
        HumanMessagePromptTemplate.from_template("{text}"),
    ])
    reply = chat_llm(template.format_messages(textual content=question))
    print(reply.content material)
    return reply.content material

# Operate to immediate when consumer needs normal database info
def prompt_when_user_want_general_db_information(question, db_uri):
    template = ChatPromptTemplate.from_messages([
        SystemMessage(
            content=(
                "You are an assistant who writes SQL queries. Given the text below, write a SQL query that answers the user's question. Prepend and append the SQL query with three backticks '```' Write select query whenever possible Connection string to this database is {db_uri}"
            )
        ),
        HumanMessagePromptTemplate.from_template("{text}"),
    ])
    reply = chat_llm(template.format_messages(textual content=question))
    print(reply.content material)
    return reply.content material

# Operate to course of consumer queries and generate outputs based mostly on whether or not it is about normal schema or particular SQL question
def get_the_output_from_llm(question, unique_id, db_uri):
    filename_t="csvs/tables_" + unique_id + '.csv'
    df = pd.read_csv(filename_t)
    table_info = ''
    for desk in df['table_name'].distinctive():
        table_info += 'Details about desk ' + desk + ':n'
        table_info += df[df['table_name'] == desk].to_string(index=False) + 'nnn'
    answer_to_question_general_schema = check_if_users_query_want_general_schema_information_or_sql(question)
    if answer_to_question_general_schema == "sure":
        return prompt_when_user_want_general_db_information(question, db_uri)
    return generate_template_for_sql(question, table_info, db_uri)

# Operate to execute SQL options
def execute_the_solution(answer, db_uri):
    connection = psycopg2.join(db_uri)
    cursor = connection.cursor()
    _, final_query, _ = answer.break up("```")
    final_query = final_query.strip('sql')
    cursor.execute(final_query)
    outcome = cursor.fetchall()
    return str(outcome)

# Streamlit app setup and interplay dealing with
if __name__ == "__main__":
    st.subheader("Directions")
    st.markdown("""
        1. Enter the URI of your RDS Database within the textual content field beneath.
        2. Click on the **Begin Chat** button to begin the chat.
        3. Enter your message within the textual content field beneath and press **Enter** to ship the message to the API.
    """)
    chat_history = []
    uri = st.text_input("Enter the RDS Database URI")
    if st.button("Begin Chat"):
        if not uri:
            st.warning("Please enter a sound database URI.")
        else:
            st.information("Connecting to the API and beginning the chat...")
            chat_response = connect_with_db(uri)
            if "error" in chat_response:
                st.error("Error: Failed to begin the chat. Please test the URI and take a look at once more.")
            else:
                st.success("Chat began efficiently!")
    st.subheader("Chat with the API")
    if "messages" not in st.session_state:
        st.session_state.messages = []
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])
    if immediate := st.chat_input("What's up?"):
        st.chat_message("consumer").markdown(immediate)
        st.session_state.messages.append({"function": "consumer", "content material": immediate})
        response = send_message(immediate)["message"]
        with st.chat_message("assistant"):
            st.markdown(response)
        st.session_state.messages.append({"function": "assistant", "content material": response})
    st.write("It is a easy Streamlit app for beginning a chat with an RDS Database.")

Within the subsequent step, we are going to carry out vector retrieval to determine essentially the most related tables. As soon as these tables are chosen, we are going to collect all of the column particulars to offer further context for our immediate. We are going to then compile this info right into a string to incorporate within the immediate.

# Initialize the vector database for storing desk embeddings
vectordb = Chroma(embedding_function=embeddings, persist_directory=f"./vectors/tables_{unique_id}")
retriever = vectordb.as_retriever()
docs = retriever.get_relevant_documents(question)
print(docs)

# Accumulating the related tables and their columns
relevant_tables = []
relevant_table_details = []

for doc in docs:
    table_info = doc.page_content.break up("n")
    table_name = table_info[0].break up(":")[1].strip()
    column_name = table_info[1].break up(":")[1].strip()
    data_type = table_info[2].break up(":")[1].strip()
    relevant_tables.append(table_name)
    relevant_table_details.append((table_name, column_name, data_type))

# Load knowledge about all tables from a CSV file
filename_t = f'csvs/tables_{unique_id}.csv'
df = pd.read_csv(filename_t)

# Assemble a descriptive string for every related desk, together with all columns and their knowledge varieties
table_info = ''
for desk in relevant_tables:
    table_info += f'Details about desk {desk}:n'
    table_info += df[df['table_name'] == desk].to_string(index=False) + 'nnn'

def create_sql_query_template(question, relevant_tables, table_info):
    tables_list = ",".be part of(relevant_tables)
    chat_template = ChatPromptTemplate.from_messages([
        SystemMessage(
            content=(
                f"As an assistant capable of composing SQL queries, please write a query that resolves the user's inquiry based on the text provided. "
                f"Consider SQL tables named '{tables_list}'. "
                f"Below is a detailed description of these table(s): "
                f"{table_info}"
                "Enclose the SQL query within three backticks '```' for proper formatting."
            )
        ),
        HumanMessagePromptTemplate.from_template("{text}"),
    ])
    
    response = chat_llm(chat_template.format_messages(textual content=question))
    print(response.content material)
    return response.content material

One closing factor we are able to do is to provide details about overseas keys to the immediate.

import streamlit as st
import os
import pandas as pd
from uuid import uuid4
import psycopg2

from langchain.prompts import ChatPromptTemplate
from langchain.prompts.chat import SystemMessage, HumanMessagePromptTemplate
from langchain.llms import OpenAI
from langchain.chat_models import ChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from dotenv import load_dotenv
from langchain.vectorstores import Chroma
from langchain.document_loaders.csv_loader import CSVLoader

# Guarantee mandatory directories exist
folders_to_create = ['csvs', 'vectors']
for folder in folders_to_create:
    os.makedirs(folder, exist_ok=True)
    print(f"Listing '{folder}' checked or created.")

# Load atmosphere and API keys
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")

# Initialize language fashions and embeddings
language_model = OpenAI(openai_api_key=openai_api_key)
chat_language_model = ChatOpenAI(openai_api_key=openai_api_key, temperature=0.4)
embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)

def fetch_table_details(cursor):
    sql = """
        SELECT table_name, column_name, data_type
        FROM information_schema.columns
        WHERE table_schema="public";
    """
    cursor.execute(sql)
    return cursor.fetchall()

def fetch_foreign_key_details(cursor):
    sql = """
        SELECT conrelid::regclass AS table_name, conname AS foreign_key,
               pg_get_constraintdef(oid) AS constraint_definition
        FROM pg_constraint
        WHERE contype="f" AND connamespace="public"::regnamespace;
    """
    cursor.execute(sql)
    return cursor.fetchall()

def create_vector_database(knowledge, listing):
    loader = CSVLoader(knowledge=knowledge, encoding="utf8")
    document_data = loader.load()
    vector_db = Chroma(embeddings, persist_directory=listing)
    vector_db.from_documents(document_data)
    vector_db.persist()

def save_database_details(uri):
    unique_id = str(uuid4()).change("-", "_")
    conn = psycopg2.join(uri)
    cur = conn.cursor()
    particulars = fetch_table_details(cur)
    df = pd.DataFrame(particulars, columns=['table_name', 'column_name', 'data_type'])
    csv_path = f'csvs/tables_{unique_id}.csv'
    df.to_csv(csv_path, index=False)
    create_vector_database(df, f"./vectors/tables_{unique_id}")
    
    foreign_keys = fetch_foreign_key_details(cur)
    fk_df = pd.DataFrame(foreign_keys, columns=['table_name', 'foreign_key', 'constraint_definition'])
    fk_csv_path = f'csvs/foreign_keys_{unique_id}.csv'
    fk_df.to_csv(fk_csv_path, index=False)
    
    cur.shut()
    conn.shut()
    return unique_id

def generate_sql_query_template(question, db_uri):
    template = ChatPromptTemplate.from_messages([
        SystemMessage(
            content=(
                f"You are an assistant capable of composing SQL queries. Use the details provided to write a relevant SQL query for the question below. DB connection string is {db_uri}."
                "Enclose the SQL query with three backticks '```'."
            )
        ),
        HumanMessagePromptTemplate.from_template("{text}"),
    ])
    response = chat_language_model(template.format_messages(textual content=question))
    return response.content material

# Streamlit utility setup
st.title("Database Interplay Software")
uri = st.text_input("Enter the RDS Database URI")
if st.button("Connect with Database"):
    if uri:
        strive:
            unique_id = save_database_details(uri)
            st.success(f"Linked to database and knowledge saved with ID: {unique_id}")
        besides Exception as e:
            st.error(f"Failed to attach: {str(e)}")
    else:
        st.warning("Please enter a sound database URI.")

In related methods, we are able to preserve enhancing this utility by including fallbacks. In every fallback, we are able to preserve including further info.

See also  Nominations open for 6th Annual VentureBeat Women in AI Awards

Conclusion

This text presents a novel strategy to creating an AI utility able to seamlessly interacting with large SQL databases via chat. We now have addressed the problem of dealing with giant databases the place it’s impractical to incorporate the whole checklist of columns and tables within the immediate. Our proposed answer dynamically retrieves related desk and column names based mostly on consumer queries. We make sure the immediate contains solely pertinent info, enhancing consumer expertise and effectivity. That is completed by leveraging vector databases like ChromaDB for embedding technology and retrieval.

- Advertisement -

We now have demonstrated how one can streamline the interplay course of via step-by-step implementation and code examples. In the meantime, we additionally labored on constantly bettering the applying’s performance. With additional enhancements comparable to incorporating overseas keys and extra fallbacks, this utility holds promise for various database interplay eventualities.

- Advertisment -

Related

- Advertisment -

Leave a Reply

Please enter your comment!
Please enter your name here