python

DBUtils

import os

import pyodbc

from framework.utils.report_utils import ReportUtils

data = None
cursor = None
db_rows_list = []


class DBUtils:

    cursor: pyodbc.Cursor
    data

    def __connect(servername=None, database=None, username=None, password=None):
        """Connect to SQL server

        Args:
            servername (str, optional): DB servername . Defaults to None.
            database (str, optional): database name. Defaults to None.
            username (str, optional): username. Defaults to None.
            password (str, optional): password. Defaults to None.

        Returns:
            _type_: connection
        """
        servername = os.getenv("database_server") if servername == None else servername
        database = os.getenv("database") if database == None else database
        username = os.getenv("db_uid") if username == None else username
        password = os.getenv("db_pwd") if password == None else password
        conn = pyodbc.connect("Driver={SQL Server Native Client 11.0};"
                              "Server=" + servername + ";"
                              "Database=" + database + ";"
                              "uid=" + username + ";pwd=" + password)
        return conn

    def read(query, servername=None, database=None, username=None, password=None):
        """Execute Select SQL statement

        Args:
            query (str): SQL query string
            servername (str, optional): DB servername . Defaults to None.
            database (str, optional): database name. Defaults to None.
            username (str, optional): username. Defaults to None.
            password (str, optional): password. Defaults to None.

        Raises:
            pyodbc.Error: Any sql based error

        Returns:
            class: DBUtils
        """
        try:
            connect = DBUtils.__connect(servername, database, username, password)
            DBUtils.cursor = connect.cursor()
            DBUtils.data = DBUtils.cursor.execute(query)
            ReportUtils.log("---Executed select query-----")
        except pyodbc.Error as ex:
            sqlstate = ex.args[0]
            ReportUtils.log(f"\n ****Error****: {ex}, State: {sqlstate}", ReportUtils.level_info)
            raise pyodbc.Error
        except Exception as e:
            DBUtils.data = None
            ReportUtils.log(f"SQL exception----Setting data to none \n {e}")

        return DBUtils

    def update(query, servername=None, database=None, username=None, password=None):
        """Execute Update and Insert SQL statement

        Args:
            query (str): SQL query string
            servername (str, optional): DB servername . Defaults to None.
            database (str, optional): database name. Defaults to None.
            username (str, optional): username. Defaults to None.
            password (str, optional): password. Defaults to None.

        Raises:
            pyodbc.Error: Any sql based error

        Returns:
            class: DBUtils
        """
        try:
            connect = DBUtils.__connect(servername, database, username, password)
            DBUtils.cursor = connect.cursor()
            DBUtils.data = DBUtils.cursor.execute(query)
            connect.commit()
            ReportUtils.log("---Executed insert/update query-----")
        except pyodbc.Error as ex:
            sqlstate = ex.args[0]
            ReportUtils.log(f"\n ****Error****: {ex}, State: {sqlstate}", ReportUtils.level_info)
            raise pyodbc.Error
        except Exception as e:
            DBUtils.data = None
            ReportUtils.log(f"SQL exception----Setting data to none \n {e}")

        return DBUtils

    def query_to_json(query, servername=None, database=None, username=None, password=None):
        """Returns SQL query output to json array object Note: This needs to be parsed later

        Args:
            query (str): SQL query string
            servername (str, optional): DB servername . Defaults to None.
            database (str, optional): database name. Defaults to None.
            username (str, optional): username. Defaults to None.
            password (str, optional): password. Defaults to None.

        Returns:
            _type_: _description_
        """
        connect = DBUtils.__connect(servername, database, username, password)
        cursor = connect.cursor()
        data = cursor.execute(query)
        columns = [column[0] for column in cursor.description]
        json_array = []
        rows = data.fetchall()
        for row in rows:
            formatted_row = [str(i) for i in row]
            json_array.append(dict(zip(columns, map(str, formatted_row))))
        return json_array
    
    def get_first_row():
        """Get only First row from SQL

        Returns:
            list: List of data
        """
        row = None
        try:
            row = DBUtils.data.fetchone()[0]
        except Exception as e:
            row = None
            ReportUtils.log(f"SQL exception----Setting data to none \n {e}")
        return row

    def get_all_row():
        """Get data for multi rows

        Returns:
            list: get multiple rows of data from connected sql
        """
        rows = None
        try:
            rows = DBUtils.data.fetchall()
        except Exception as e:
            rows = None
            ReportUtils.log(f"SQL exception----Setting data to none \n {e}")
        return rows

    def assert_row_count(expected_row_count):
        """Check and verify if row count is matching

        Args:
            expected_row_count (int): expected row count should match with get row
        """
        assert expected_row_count == DBUtils.get_first_row(), "Row count is not matching"

    def close_connection():
        """ Close any open DB connection """
        if DBUtils.cursor is not None:
            DBUtils.cursor.close()
            
    def call_stored_proc(procName, *args):
        conn = DBUtils.__connect()
        sql = """SET NOCOUNT ON;
            DECLARE @ret int
            EXEC @ret = %s %s
            SELECT @ret""" % (procName, ','.join(['?'] * len(args)))
        output = (conn.execute(sql, args).fetchone()[0])
        conn.commit()
        return output
Was this helpful?