[Python] Using Python for SQL Schema and Table Validation and Logging Progress to Microsoft Teams

Level of Difficulty: Intermediate to Senior.

So you’ve created a database, and you’ve created schemas and tables. You’ve managed to write a Python automation script that communicates with the database, but you’ve reached a crossroads: you need to validate your SQL tables. That validation might include ensuring that the tables exist and were created correctly before the script references them. If validation fails, a message should be sent to a Microsoft Teams channel.

If that’s the case, this post might have just the code to help!

What are the steps?

The steps that the script will be following are:

  1. Create a webhook to your Teams channel
  2. If a schema was provided
    1. Get all the tables in the schema
    2. Validate that all the tables have a primary key
  3. If a schema was not provided but tables were
    1. Validate that each table exists
    2. Validate that each table has a primary key

Create a Webhook to a Teams Channel

To create a webhook to your Teams channel, click on the three dots next to the channel name and select “Connectors”:

Search for “webhook” and select “Configure” on the Incoming Webhook connector:

Provide your webhook with a name and select “Create”:

Be sure to copy your webhook and then hit “Done”:
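
Once you have the URL, it can be worth checking it before wiring up the rest of the script. The sketch below is one way to do that with only the standard library, assuming the incoming webhook accepts a simple JSON body with a "text" field. The names build_webhook_request and post_to_teams are hypothetical helpers for illustration and are not part of the script in this post.

```python
import json
import urllib.request


def build_webhook_request(webhook_url, text):
    """Build (but do not send) a POST request carrying a plain text message."""
    payload = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def post_to_teams(webhook_url, text, timeout=10):
    """Send the message and return the HTTP status code."""
    request = build_webhook_request(webhook_url, text)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


# Example (uncomment and paste your own URL in place of the placeholder):
# post_to_teams('<add webhook>', 'Webhook test from Python')
```

Splitting the request building from the sending keeps the payload easy to inspect without touching the network.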

Create The Python Script That Does The Validation

Import Libraries

import urllib.parse

import pandas as pd
import pymsteams
import pyodbc  # not called directly, but the mssql+pyodbc dialect needs it installed
from sqlalchemy import create_engine

Create a Connect to SQL Method

This method builds a SQLAlchemy engine so that the same connection logic can be reused each time the script needs to query SQL Server.

def connect_to_sql(server, user, password, database, auth):
    
    # Note: auth is accepted but not used here; whether a user was supplied
    # decides if credentials are added to the connection string.
    driver = '{SQL Server}'
    
    # Base ODBC connection string shared by both cases
    odbc_str = 'DRIVER={};SERVER={};DATABASE={};'.format(driver, server, database)
    
    if user is not None:
        
        # A user was supplied: append the credentials
        odbc_str += 'UID={};PWD={}'.format(user, password)
    
    # URL-encode the ODBC string so it can be embedded in the SQLAlchemy URL
    params = urllib.parse.quote_plus(odbc_str)
    conn_str = 'mssql+pyodbc:///?odbc_connect={}'.format(params)
    engine = create_engine(conn_str)
    
    return engine
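
To see what actually gets handed to create_engine, the following sketch builds only the URL string, so it runs without a database or driver. The helper name build_engine_url and the server and database values are made up for illustration.

```python
import urllib.parse


def build_engine_url(server, database, user=None, password=None,
                     driver='{SQL Server}'):
    """Return the SQLAlchemy URL string that wraps an ODBC connection string."""
    odbc_str = 'DRIVER={};SERVER={};DATABASE={};'.format(driver, server, database)
    if user is not None:
        odbc_str += 'UID={};PWD={}'.format(user, password)
    # quote_plus escapes characters such as '{', ';' and '=' and turns spaces into '+'
    params = urllib.parse.quote_plus(odbc_str)
    return 'mssql+pyodbc:///?odbc_connect={}'.format(params)


url = build_engine_url('localhost', 'SalesDb')  # made-up server and database
print(url)

# Decoding the parameter again recovers the readable ODBC connection string
print(urllib.parse.unquote_plus(url.split('odbc_connect=', 1)[1]))
```

Printing the decoded string is a quick way to spot a typo in the driver or server name before a connection is attempted.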

Create Send Teams Message Method

The method below sends a message to a Teams channel. For it to work, paste the webhook URL you copied above into the webhook variable.

def send_teams_message(text):
    
    webhook = '<add webhook>'
    
    # You must create the connectorcard object with the Microsoft Webhook URL
    myTeamsMessage = pymsteams.connectorcard(webhook)

    # Add text to the message.
    myTeamsMessage.text(text)

    # send the message.
    myTeamsMessage.send()
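
If you would rather not keep the webhook URL in the script itself, one option is to read it from an environment variable. This is only a sketch: the helper get_webhook_url and the variable name TEAMS_WEBHOOK_URL are assumptions chosen for illustration, not something the original script defines.

```python
import os


def get_webhook_url(env=None, variable='TEAMS_WEBHOOK_URL'):
    """Look up the webhook URL in the environment and fail loudly if it is missing."""
    # env defaults to the real process environment; a dict can be passed for testing
    env = os.environ if env is None else env
    webhook = env.get(variable, '').strip()
    if not webhook:
        raise RuntimeError(
            'Set the {} environment variable to your webhook URL'.format(variable)
        )
    return webhook
```

Inside send_teams_message, the hard-coded line could then become webhook = get_webhook_url().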

Create Validate Primary Keys Method

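This method checks for tables that have no primary key and returns the status as well as the text to be sent to MS Teams. Note that the query covers every table in the database, not only the schemas or tables that were passed in.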

def ValidatePrimaryKeys(engine):
    
    query = "select schema_name(tab.schema_id) as [schema_name], tab.[name] as table_name from sys.tables tab left outer join sys.indexes pk on tab.object_id = pk.object_id and pk.is_primary_key = 1 where pk.object_id is null order by schema_name(tab.schema_id), tab.[name]"

    df = pd.read_sql(query, engine)
    
    keysValid = True
    
    text = ""
    
    if len(df) > 0:

        text = 'Primary Key Validation Failed.\n\n'

        for index, element in df.iterrows():

            text += '{} does not have a primary key\n\n'.format(element['table_name'])
            
            keysValid = False

        # send_teams_message(text)
        
    return keysValid, text
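
If you want to try the "find tables without a primary key" idea without a SQL Server instance, the sketch below does something similar against an in-memory SQLite database. It is a stand-in, not the query used above: SQLite has no sys.tables or sys.indexes, so it reads sqlite_master and PRAGMA table_info instead. The function name and the two demo tables are made up.

```python
import sqlite3


def tables_without_primary_key(conn):
    """Return the names of user tables that have no primary key column (SQLite only)."""
    names = [row[0] for row in conn.execute(
        "SELECT name FROM sqlite_master "
        "WHERE type = 'table' AND name NOT LIKE 'sqlite_%' ORDER BY name"
    )]
    missing = []
    for name in names:
        # PRAGMA table_info returns one row per column; index 5 is the pk flag
        columns = conn.execute('PRAGMA table_info("{}")'.format(name)).fetchall()
        if not any(column[5] for column in columns):
            missing.append(name)
    return missing


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT)')
conn.execute('CREATE TABLE audit_log (message TEXT)')  # deliberately has no key

for table in tables_without_primary_key(conn):
    print('{} does not have a primary key'.format(table))
```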

Create Validate Tables Method

This method validates that the tables exist within the current database and returns the status as well as the text to be sent to MS Teams.

def ValidateTables(engine, tables):

    query = "select tab.[name] as table_name from sys.tables tab"

    df = pd.read_sql(query, engine)

    # Accept either ';' or ',' as the separator and ignore spaces and empty entries.
    # A missing (None) tables argument is treated as an empty list.
    tables = (tables or '').replace(' ','').replace(';', ',')

    tables_split = [table for table in tables.split(',') if table != '']

    text = ""

    tablesValid = True

    # Build the list of existing table names once instead of on every iteration
    existing_tables = df['table_name'].tolist()

    for table in tables_split:

        if table and table not in existing_tables:

            text += 'Table not found in database: {}\n\n'.format(table)
            tablesValid = False

    if tablesValid:

        text = 'Table Validation Passed\n\n'

    else:

        text = 'Table Validation Failed\n\n' + text

    #send_teams_message(text)
    
    return tablesValid, text
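
The parsing and the membership check can be tried out without a database. The sketch below is a simplified version of that logic under a couple of assumptions: the helper names split_names and find_missing are hypothetical, the table names are made up, and surrounding whitespace is stripped rather than every space being removed.

```python
import re


def split_names(names):
    """Split a ';' or ',' separated string into a list of non-empty names."""
    if not names:
        return []
    return [part.strip() for part in re.split(r'[;,]', names) if part.strip()]


def find_missing(requested, existing):
    """Return the requested names that are absent from the existing ones, in order."""
    existing = set(existing)
    return [name for name in requested if name not in existing]


requested = split_names('Customers; Orders, Invoices')  # made-up table names
missing = find_missing(requested, ['Customers', 'Orders'])

for table in missing:
    print('Table not found in database: {}'.format(table))
```

Note that this comparison, like the one in ValidateTables, is case-sensitive.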

Create Validate Schema Method

This method validates that each schema exists. Once a schema is found, all of its tables are retrieved and passed to ValidateTables. Primary keys are validated afterwards, in ValidateSQL.

def ValidateFromSchema(schemas, engine):
    
    text = ""
    tableText = ""
    
    # Accept either ';' or ',' as the separator and ignore spaces and empty entries
    schemas = schemas.replace(' ','').replace(';', ',')

    schemas_split = [schema for schema in schemas.split(',') if schema != '']
        
    isValid = True

    for schema in schemas_split:
        
        if (isValid):
        
            query = "SELECT schema_name FROM information_schema.schemata WHERE schema_name = '{}'".format(schema)

            df = pd.read_sql(query, engine)
            
            if (len(df) > 0):

                query = "select t.name as table_name from sys.tables t where schema_name(t.schema_id) = '{}'".format(schema)
                
                df = pd.read_sql(query, engine)
                
                tables = ",".join(list(df["table_name"]))
                
                validateTables = ValidateTables(engine, tables)
                
                isValid = validateTables[0]
                tableText += "{}\n\n".format(validateTables[1])
                
            else:
                
                isValid = False
                
                text += "Schema Validation Failed\n\n"
                text += "Schema not found in database: {}\n\n".format(schema)
                    
    if (isValid):
        
        text = "Schema Validation Passed\n\n"
        
        text = "{}\n\n{}".format(text, tableText)
        
    return isValid, text
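
Because the schema names are formatted straight into the SQL text, it may be worth checking them before they reach the query. The sketch below is one conservative way to do that. The allowed pattern is an assumption that is deliberately stricter than SQL Server's real identifier rules, and the helper names are hypothetical.

```python
import re

# Letters, digits and underscores only, not starting with a digit.
# This is a deliberately strict assumption, not SQL Server's full identifier rules.
_SAFE_NAME = re.compile(r'[A-Za-z_][A-Za-z0-9_]*')


def is_safe_name(name):
    """True when the whole name matches the conservative pattern above."""
    return _SAFE_NAME.fullmatch(name) is not None


def reject_unsafe_names(names):
    """Return the names that should not be formatted into a query."""
    return [name for name in names if not is_safe_name(name)]


print(reject_unsafe_names(['dbo', 'sales_2024', "x' OR '1'='1"]))
```

A non-empty result could be reported as a schema validation failure instead of being sent to the database.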

Create Validate SQL Method (Equivalent to “main”)

This method acts as the main method: it calls the methods above in the correct order and sends the combined result to Teams.

def ValidateSQL(project, server, database, schemas, tables, auth, username, password):
    
    engine = connect_to_sql(server, username, password, database, auth)
    
    summaryText = None
    
    if schemas is not None:
        
        validateSchemas = ValidateFromSchema(schemas, engine)
        
        isValid = validateSchemas[0]
        text = validateSchemas[1]
    
    else:
        
        validateTables = ValidateTables(engine, tables)
    
        isValid = validateTables[0]
        text = validateTables[1]

    if isValid:
        
        summaryText = 'Primary Key Validation Passed\n\n'
        
        # ValidatePrimaryKeys returns (status, text)
        isKeysValid, pkText = ValidatePrimaryKeys(engine)
        
        if isKeysValid:
            
            text += summaryText
            
        else:
            
            text += pkText
            
            # A primary key failure also fails the overall validation
            isValid = False
        
    else:
        
        summaryText = text
        
    text = "<strong><u>{}</u></strong>:\n\n{}".format(project, text)
                
    send_teams_message(text)
    
    return isValid

Calling the Validate SQL Method

Below is how you’d initialise the variables and use them to call the ValidateSQL method.

server = '<server>'
user = '<user>'
password = '<password>'
database = '<database>'
auth = 'SQL' # or Windows
schemas = '<schemas comma separated>'
tables = '<tables comma separated>'
project = '<project>'

ValidateSQL(project, server, database, schemas, tables, auth, user, password)
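
If the script is going to run from a scheduler or pipeline, the same values could come from the command line instead of being edited in the file. The sketch below uses argparse for that. All flag names are hypothetical choices, and the argument values are made up.

```python
import argparse


def build_parser():
    """Command line options mirroring the ValidateSQL arguments."""
    parser = argparse.ArgumentParser(description='Validate SQL schemas and tables')
    parser.add_argument('--project', required=True)
    parser.add_argument('--server', required=True)
    parser.add_argument('--database', required=True)
    parser.add_argument('--schemas', default=None, help='comma separated schema names')
    parser.add_argument('--tables', default=None, help='comma separated table names')
    parser.add_argument('--auth', default='SQL', choices=['SQL', 'Windows'])
    parser.add_argument('--user', default=None)
    parser.add_argument('--password', default=None)
    return parser


def exit_code(is_valid):
    """Map the validation result to a process exit code (0 means success)."""
    return 0 if is_valid else 1


# Parsing an explicit list keeps the example runnable anywhere
args = build_parser().parse_args([
    '--project', 'Demo', '--server', 'localhost',
    '--database', 'SalesDb', '--tables', 'Customers,Orders',
])
print(args.schemas, args.tables)
```

Leaving --schemas unset yields None, which is the value that sends ValidateSQL down the tables branch. The result of ValidateSQL could then be passed to exit_code and on to sys.exit so the scheduler sees a failure.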

And that’s a wrap, Pandalorians! The GitHub repo containing the script is available here. Did this help? Did you get stuck anywhere? Do you have any comments or feedback? Please pop it down below or reach out at jacqui.jm77@gmail.com

Published by Jacqui Muller

I am an application architect and part time lecturer by current professions who enjoys dabbling in software development, RPA, IOT, advanced analytics, data engineering and business intelligence. I am aspiring to complete a PhD degree in Computer Science within the next three years. My competencies include a high level of computer literacy as well as programming in various languages. I am passionate about my field of study and occupation as I believe it has the ability and potential to impact lives - both drastically and positively. I come packaged with an ambition to succeed and make the world a better place.
