Python SQLite3中SQL事务在出入库操作中的应用

发表时间: 2023-08-17 13:57

数据库中的事务是指对数据库执行一批操作,在同一个事务当中,这些操作最终要么全部执行成功,要么全部失败,不会存在部分成功的情况。从而保证数据的一致性和安全性。

事务的四大特性:

原子性(Atomicity):事务是数据库的逻辑工作单位,事务中包含的各操作要么都做,要么都不做

一致性(Consistency):事务开始前和结束后,数据库的完整性约束没有被破坏 。

隔离型(Isolation):一个事务的执行不能被其它事务干扰。即一个事务内部的操作及使用的数据对其它并发事务是隔离的,并发执行的各个事务之间不能互相干扰。

持久性(Durability):指一个事务一旦提交,它对数据库中的数据的改变就应该是永久性的。接下来的其它操作或故障不应该对其执行结果有任何影响。

下面通过出入库实例演示事务在工作中的应用。

一、思路

1、首先创建基础数据表,设计好表头。

wheat_InBound 入库明细表

wheat_InBound_sum 库存表(入库汇总表)

wheat_OutBound 出库明细表

wheat_OutBound_sum 出库汇总表

2、事务的应用

新增入库

新增出库

二、效果如下:

1、创建基础数据表及功能菜单


这里我们看到成功创建了四个基础数据表,如下图所示:

2、事务在新增入库中的应用

这里应用了事务,在新增入库时,同时将数据插入了 wheat_InBound 入库明细表和 wheat_InBound_sum 库存表(入库汇总表)两个表中,若其中有一个失败,数据将回滚,保证了数据的一致性。

3、事务在新增出库中的应用

这里也应用了事务,在新增出库时,同时将数据插入了 wheat_OutBound 出库明细表 和 wheat_OutBound_sum 出库汇总表两个表中,并且同时更新了 wheat_InBound_sum 库存表(入库汇总表),若其中有一个失败,数据将回滚,保证了数据的一致性。

4、查询

三、代码

def create_db():    print("正在进行数据库初始化...")    if os.path.exists(path):        cn = sqlite3.connect(path)        cur = cn.cursor()        print("成功连接 mydata.db 数据库并取得游标对象!")    else:        cn = sqlite3.connect(path)        cur = cn.cursor()        print('成功创建 mydata.db 数据库并获得游标对象!')    sql1 = '''create table if not exists wheat_InBound(                        id integer primary key AutoIncrement,                        date txt,                        product_name varchar(20),                        description varchar(25),                        price decimal(4,2),                        qty int)                    '''    sql2 = '''create table if not exists wheat_OutBound(                            id integer primary key AutoIncrement,                            date txt,                            customer varchar(20),                            address varchar(100),                            product_name varchar(20),                            description varchar(25),                            price decimal(4,2),                            qty int)                        '''    sql3 = '''create table if not exists wheat_InBound_sum(                                id integer primary key AutoIncrement,                                date txt,                                product_name varchar(20),                                description varchar(25),                                price decimal(4,2),                                qty int)                            '''    sql4 = '''create table if not exists wheat_OutBound_sum(                                id integer primary key AutoIncrement,                                date txt,                                customer varchar(20),                                address varchar(100),                                product_name varchar(20),                                description varchar(25),                                price decimal(4,2),                                qty int)                            '''    sql = 'begin;'  # 开启事务    try:        cur.execute(sql)        cur.execute(sql1)        cur.execute(sql2)        cur.execute(sql3)        cur.execute(sql4)        cn.commit()  # 提交事务        print('成功创建 wheat_InBound 表!')        print('成功创建 wheat_InBound_sum 表!')        print('成功创建 wheat_OutBound 表!')        print('成功创建 wheat_OutBound_sum 表!')    except Exception as e:        print(e)        cn.rollback()  # 事务回滚        print('Sorry,创建失败,已回滚 !')    finally:        cur.close()        cn.close()        print('数据库初始化完成 !')        print('successfully !')def InBound():    print("商品入库")    date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")    product_name = input("输入商品名称:")    description = input("输入商品描述:")    price = input("输入商品价格:")    qty = input("输入入库数量:")    sql1 = "insert into wheat_InBound(date,product_name,description,price,qty) values('%s','%s','%s','%s','%s')" \           % (date, product_name, description, price, qty)    sql3 = "delete from wheat_InBound_sum;"    sql4 = "update sqlite_sequence set seq = 0 where name = 'wheat_InBound_sum';"    sql2 = f"insert into wheat_InBound_sum (date, product_name ,description ,qty) " \           f"select date, product_name,description," \           f"sum(qty) as sum from wheat_InBound group by product_name,description ;"    cn = None    curs = None    sql_begin = 'begin;'  # 开启事务    try:        cn = sqlite3.connect(path)        curs = cn.cursor()        curs.execute(sql_begin)        curs.execute(sql1)        curs.execute(sql3)        curs.execute(sql4)        curs.execute(sql2)        cn.commit()     # 提交事务        print("Insert wheat_InBound successfully! 入库成功 !")        print("Insert wheat_InBound_sum successfully! 汇总入库成功 !")    except Exception as e:        print(e)        cn.rollback()  # 事务回滚        print('回滚成功 !')    finally:        curs.close()        cn.close()        print('Closed sqlite3 !')def InBound_display():    print("-------------------------------入库明细查询结果------------------------------")    try:        sql = "select * from wheat_InBound ;"        cn1 = sqlite3.connect(path)        df = pd.read_sql(sql, cn1)        print(df)        print('------------------------------Finished!-----------------------------------')    except Exception as e:        print(e)def OutBound():    print("--------------------------商品出库-----------------------------")    date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")    customer = input("please input customer name:")    product_name = input("输入商品名称:")    description = input("输入商品描述:")    price = input("输入商品价格:")    qty = input("输入出库数量:")    sql1 = "insert into wheat_OutBound(date, customer,product_name,description,price,qty) " \           "values('%s','%s','%s','%s','%s','%s')" \           % (date, customer, product_name, description, price, qty)    sql2 = f"update wheat_InBound_sum set qty = qty-{qty} where " \           f"product_name = '{product_name}' and description = '{description}'"    sql3 = "delete from wheat_OutBound_sum;"    sql4 = "update sqlite_sequence set seq = 0 where name = 'wheat_OutBound_sum';"    sql5 = f"insert into wheat_OutBound_sum (date, customer, product_name ,description ,qty) " \           f"select date, customer,product_name,description," \           f"sum(qty) as sum from wheat_OutBound group by product_name,description ;"    cn = None    curs = None    sql_begin = 'begin;'  # 开启事务    try:        cn = sqlite3.connect(path)        curs = cn.cursor()        curs.execute(sql_begin)        curs.execute(sql1)        curs.execute(sql2)        curs.execute(sql3)        curs.execute(sql4)        curs.execute(sql5)        cn.commit()  # 事务提交        print("wheat_OutBound successfully! 出库成功!")        print("wheat_OutBound_sum update successfully! 汇总出库更新成功!")        print("wheat_InBound_sum update successfully! 库存更新成功!")    except Exception as e:        print(e)        cn.rollback()  # 事务回滚        print('回滚成功 !')    finally:        curs.close()        cn.close()        print('Closed sqlite3 !')def OutBound_display():    print("-----------------------------------出库明细查询结果------------------------------------")    try:        sql = "select * from wheat_OutBound ;"        with sqlite3.connect(path) as cn:            df = pd.read_sql(sql, cn)            print(df)            print('-------------------------------------Finished!--------------------------------------')    except Exception as e:        print(e)    print('\n')    print("--------------------出库汇总结果---------------------")    try:        sql = "select product_name,description,qty from wheat_OutBound_sum ;"        with sqlite3.connect(path) as cn:            df = pd.read_sql(sql, cn)            df.loc['sum'] = df['qty'].sum(axis=0)            df.iloc[-1, 0:2] = '-'            print(df)            print('----------------------Finished!---------------------')    except Exception as e:        print(e)def stock():    print("---------------------库存查询结果--------------------------")    try:        sql = "select product_name,description,qty from wheat_InBound_sum ;"        cn1 = sqlite3.connect(path)        df = pd.read_sql(sql, cn1)        print(df)        print('----------------------Finished!--------------------------')    except Exception as e:        print(e)def delete():    print("Delete product")    while True:        print("""            1. 删除入库信息            2. 删除出库信息            0. 退出        """)        choice = input("请输入您的选项:")        if choice == '1':            product_id = input("请输入要删除的入库商品ID:")            try:                # sql = "delete from InBound where id = {} ".format(product_id)                sql = f"delete from wheat_InBound where id = {product_id}"                with sqlite3.connect(path) as conn:                    cur = conn.cursor()                    cur.execute(sql)                    # cn.commit()                    print('Deleted successfully !')            except Exception as e:                print(e)        elif choice == '2':            product_id = input("请输入要删除的出库商品ID:")            product_name = input("输入商品名称:")            description = input("输入商品描述:")            qty = input("请输入要删除的商品数量:")            conn = None            cur = None            sql_begin = 'begin transaction;'            try:                # sql = "delete from InBound where id = {} ".format(product_id)                sql1 = f"delete from wheat_OutBound where id = {product_id}"                sql2 = f"update wheat_OutBound_sum set qty = qty-{qty} where " \                       f"product_name = '{product_name}' and description = '{description}'"                with sqlite3.connect(path) as conn:                    cur = conn.cursor()                    cur.execute(sql_begin)                    cur.execute(sql1)                    cur.execute(sql2)                    conn.commit()                    print('Deleted successfully !')            except Exception as e:                print(e)                conn.rollback()            finally:                cur.close()                conn.close()        elif choice == 'q' or choice == '0' or choice == 'Q':            print("exit!")            break        else:            print('Error!,输入错误 !')            breakdef update():    print("update product")    product_id = input("请输入要修改的商品ID:")    product_name = input("请输入要正确的商品名称:")    qty = input("请输入要正确的商品数量:")    try:        sql = f"update wheat_InBound set product_name = '{product_name}',qty = {qty} where id = {product_id} "        with sqlite3.connect(path) as cn:            cur = cn.cursor()            cur.execute(sql)            cn.commit()            print('Updated successfully !')    except Exception as e:        print(e)if __name__ == '__main__':    import sqlite3    import os    import pandas as pd    import datetime    pd.set_option('display.unicode.ambiguous_as_wide', True)    pd.set_option('display.unicode.east_asian_width', True)    pd.set_option('expand_frame_repr', False)    pd.set_option('display.max_columns', None, 'display.max_rows', None)    pd.set_option('display.max_rows', 5000)    path = r'D:\pythonProjectSqlite\mydata.db'    create_db()    msg = """                欢迎使用商品出入库管理系统 1.0         --------------------------            1. 入库明细查询            2. 出库明细查询            3. 库存查询            4. 新增入库            5. 新增出库            6. 删除商品            7. 修改商品            q. 退出系统 !        --------------------------        """    while True:        print(msg)        handle = input('请选择操作项目:')        if handle == 'q' or handle == 'Q' or handle == '0':            print('退出系统 !')            break        elif handle == '1':            InBound_display()        elif handle == '2':            OutBound_display()        elif handle == '3':            stock()        elif handle == '4':            InBound()        elif handle == '5':            OutBound()        elif handle == '6':            delete()        elif handle == '7':            update()        else:            print('Error!,输入错误 !')