积分活动实战:PostgreSQL应用案例解析

发表时间: 2024-03-06 21:59

本文以通过设计和完成一个积分小活动应用的案例,展示应用是如何一步步通过利用PostgreSQL数据库相关知识实现的。

1、积分活动说明

在一个系统中,每个用户都有自己的积分。用户可以使用自己的积分来参与积分活动。积分活动每一期接受3次积分参与,从3次参与中随机选择1次作为获胜记录,对应的用户将获得该期参与的全部积分。

2、制作ERD图

ERD是“Entity-Relationship Diagram”(实体-关系图)的缩写,是一种用于数据库设计和概念模型设计的图形化表示方法。ERD图展示了数据模型中的实体(表)、实体属性(列)以及实体之间的关系。本应用尽可能的简化以突出关键要素,共三张表:用户、期号与参与记录表。

(1)用户表 t_user

列名

数据类型

类型

可空值

描述

user_id

varchar(20)

主键

用户ID

points

bigint


用户积分

(2)期号表t_issue

列名

数据类型

类型

可空值

描述

issue_id

varchar(20)

主键

期号

over_status

boolean


完成状态

over_status_at

timestamp


状态时间

user_id

varchar(20)

外键

获奖用户

award_points

bigint


获奖积分

(3)用户参与记录表t_user_issue_act

列名

数据类型

类型

可空值

描述

act_id

bigint

主键

参与活动ID

issue_id

varchar(20)

外键

期号

user_id

varchar(20)

外键

用户ID

act_points

bigint


参与积分

act_at

timestamp


参与时间

is_win

boolean


是否中奖

over_status

boolean


完成状态

(4)ERD图

在ERD图中,主要的组件包括:

实体(Entities):通常表示为矩形,代表数据模型中的表或类。

属性(Attributes):表示实体的特性,通常显示在实体矩形内部或旁边。

关系(Relationships):表示实体之间的联系,通常用菱形表示,并且通过线连接到相关的实体。

键(Keys):用于唯一标识实体的属性,主键通常用下划线表示。

联系类型(Cardinality):表示实体之间关系的类型和数量,如一对一(1:1)、一对多(1:M)或多对多(M:N)。

3、生成模型、数据准备

(1)模型生成

在数据为中执行以下语句创建相关模型。

BEGIN;CREATE TABLE IF NOT EXISTS public.t_user(    user_id character varying(20) NOT NULL,    points bigint NOT NULL DEFAULT 0,    PRIMARY KEY (user_id));COMMENT ON TABLE public.t_user    IS '用户表(含积分)';CREATE TABLE IF NOT EXISTS public.t_issue(    issue_id character varying(20) NOT NULL,    over_status boolean NOT NULL DEFAULT false,    over_status_at timestamp without time zone NOT NULL,    user_id character varying(20),    award_points bigint DEFAULT 0,    PRIMARY KEY (issue_id));COMMENT ON TABLE public.t_issue    IS '期号';COMMENT ON COLUMN public.t_issue.issue_id    IS '期号';COMMENT ON COLUMN public.t_issue.over_status    IS '完成状态';COMMENT ON COLUMN public.t_issue.over_status_at    IS '状态时间';COMMENT ON COLUMN public.t_issue.user_id    IS '获奖用户';COMMENT ON COLUMN public.t_issue.award_points    IS '获奖积分';CREATE TABLE IF NOT EXISTS public.t_user_issue_act(    act_id bigint NOT NULL,    issue_id character varying(20) NOT NULL,    user_id character varying(20) NOT NULL,    act_points bigint,    act_at timestamp without time zone,    is_win boolean,    over_status boolean NOT NULL DEFAULT false,    PRIMARY KEY (act_id));COMMENT ON TABLE public.t_user_issue_act    IS '用户参与活动';ALTER TABLE IF EXISTS public.t_issue    ADD FOREIGN KEY (user_id)    REFERENCES public.t_user (user_id) MATCH SIMPLE    ON UPDATE NO ACTION    ON DELETE NO ACTION    NOT VALID;ALTER TABLE IF EXISTS public.t_user_issue_act    ADD FOREIGN KEY (issue_id)    REFERENCES public.t_issue (issue_id) MATCH SIMPLE    ON UPDATE NO ACTION    ON DELETE NO ACTION    NOT VALID;ALTER TABLE IF EXISTS public.t_user_issue_act    ADD FOREIGN KEY (user_id)    REFERENCES public.t_user (user_id) MATCH SIMPLE    ON UPDATE NO ACTION    ON DELETE NO ACTION    NOT VALID;END;

(2)数据准备

我们在用户表中插入3个用户,初始积分均为100,用于模拟演示。

insert into t_user values       ('Alice', 100),        ('Bob', 100),              ('Charlie',100)

为期号表与用户参与记录表建立相应的序列。

CREATE SEQUENCE t_issue_seq START WITH 1 INCREMENT BY 1;CREATE SEQUENCE t_user_issue_act_seq START WITH 1 INCREMENT BY 1;

4、实现函数

(1)完整函数

参与活动的过程参数包括用户及参与的积分数,定义函数 act_in_issue(v_user_id, v_points)。为了测试更方便,实现中允许积分为负。完整函数实现如下:

CREATE OR REPLACE FUNCTION public.act_in_issue(    v_user_id character varying,    v_points bigint)    RETURNS character varying    LANGUAGE 'plpgsql'    COST 100    VOLATILE PARALLEL UNSAFEAS $BODY$DECLARE    lv_current_issue_id character varying;    lv_current_over_status boolean;    lv_act_count integer;    lv_winner_user_id character varying;    lv_winner_act_id bigint;    lv_user_points bigint;    lv_win_points bigint;    lv_message character varying;    lv_rec record;    lv_max_act int;BEGIN    lv_max_act := 3; -- 三次一轮    -- 检查当前期号是否已结束    SELECT issue_id, over_status INTO lv_current_issue_id, lv_current_over_status    FROM t_issue    ORDER BY over_status_at DESC    LIMIT 1    FOR UPDATE;    -- 如果当前期号已结束,则创建一个新期号    IF lv_current_over_status IS null OR lv_current_over_status THEN        -- 使用序列生成新的期号,并使用LPAD确保长度为10        INSERT INTO t_issue (issue_id, over_status_at) VALUES (LPAD(nextval('t_issue_seq')::text, 10, '0'), now())        RETURNING issue_id INTO lv_current_issue_id;    END IF;         -- 积分上下限以及用户积分是否足够    IF v_points<1 OR v_points>10000 THEN         RAISE NOTICE '一次积分范围为1-10000。';        -- 或者可以使用 RETURN; 终止存储过程        RETURN json_build_object('err_code', 101, 'err_msg', '一次积分范围为1-10000。');    END IF;        SELECT points INTO lv_user_points FROM t_user     WHERE user_id = v_user_id;    IF lv_user_points IS NULL THEN         RETURN json_build_object('err_code', 101, 'err_msg', '无此用户。');    END IF;    IF lv_user_points < v_points THEN        -- 用户积分不足        -- 不作处理:允许负数积分        RAISE NOTICE '积分不足';    END IF;        -- 记录用户的参与行为,并扣除相应的积分    INSERT INTO t_user_issue_act (act_id, issue_id, user_id, act_points, act_at, is_win, over_status)    VALUES (nextval('t_user_issue_act_seq'), lv_current_issue_id, v_user_id, v_points, now(), false, false);    UPDATE t_user    SET points = points - v_points    WHERE user_id = v_user_id;    -- 检查当前期号是否已有足够参与记录    SELECT COUNT(1) INTO lv_act_count    FROM t_user_issue_act    WHERE issue_id = lv_current_issue_id;    -- 如果当前期号已有足够参与,则随机选择一个中奖者    IF lv_act_count >= lv_max_act THEN        SELECT user_id, act_id INTO lv_winner_user_id, lv_winner_act_id        FROM t_user_issue_act        WHERE issue_id = lv_current_issue_id        ORDER BY random()        LIMIT 1;                -- 本期积分总数        SELECT SUM(act_points) INTO lv_win_points FROM t_user_issue_act WHERE issue_id = lv_current_issue_id;                -- 更新中奖用户的积分        UPDATE t_user        SET points = points + lv_win_points        WHERE user_id = lv_winner_user_id;        -- 更新中奖记录        UPDATE t_user_issue_act        SET is_win = (act_id=lv_winner_act_id), over_status = true        WHERE issue_id = lv_current_issue_id;        -- 结束当前期号        UPDATE t_issue        SET over_status = true, user_id = lv_winner_user_id, award_points = lv_win_points        WHERE issue_id = lv_current_issue_id;    END IF;        -- 活动进展情况消息    lv_message := '';    SELECT user_id, over_status, award_points INTO lv_winner_user_id, lv_current_over_status, lv_win_points    FROM t_issue    WHERE issue_id = lv_current_issue_id;        -- 结束状态信息    IF lv_current_over_status THEN        lv_message := lv_message || '第 ' || lv_current_issue_id || ' 期活动已结束' || chr(10);        lv_message := lv_message || lv_winner_user_id || ' 获得积分:' || lv_win_points || chr(10);     ELSE       lv_message := lv_message || '第 ' || lv_current_issue_id || ' 期活动进行中' || chr(10);            END IF;         -- 参与详情信息    lv_message := lv_message || '———————————' || chr(10);    lv_message := lv_message || '参与情况:' || chr(10);    FOR lv_rec IN ( SELECT ROW_NUMBER() OVER(ORDER BY act_id) order_no, user_id, act_points                      FROM t_user_issue_act                      WHERE issue_id = lv_current_issue_id)     LOOP        lv_message := lv_message || lv_rec.order_no || '、' || lv_rec.user_id || ':' || lv_rec.act_points || chr(10);    END LOOP;        -- 未结束时剩余信息    IF NOT lv_current_over_status THEN         SELECT COUNT(1) INTO lv_act_count FROM t_user_issue_act WHERE issue_id = lv_current_issue_id;       lv_message := lv_message || '———————————' || chr(10);        lv_message := lv_message || chr(10) || '离结束还剩 ' || (lv_max_act-lv_act_count) || ' 次。' || chr(10);     END IF;        RETURN json_build_object('err_code', 200, 'err_msg', lv_message);END $BODY$;

(2)相关要点

函数的定义

使用 CREATE OR REPLACE FUNCTION 来创建和替换一个函数。

CREATE OR REPLACE FUNCTION act_in_issue(v_user_id character varying, v_points bigint)RETURNS character varyingLANGUAGE plpgsqlAS $$ ...... $$;

序列的使用

使用nextval('t_issue_seq')获取序列t_issue_seq的下一个值,再通过LPAD函数在前面补足零,以确保期号长度为10。

LPAD(nextval('t_issue_seq')::text, 10, '0')

输出消息

使用 RAISE 进行消息输出。

RAISE NOTICE '一次积分范围为1-10000。';

使用格式化输出可以带变量。

RAISE NOTICE '消息:%', 变量

并发控制

通过 FOR UPDATE 加锁来控制并发操作。

在一个事务中使用SELECT ... FOR UPDATE语句时,它会为返回的每一行数据设置一个排他锁(X锁)。这意味着在事务结束之前,其他事务不能对这些行执行写操作,如UPDATE、DELETE或再次使用SELECT FOR UPDATE。

结果集遍历

通过 FOR var_rec IN (SELECT ...) LOOP ... END LOOP; 来遍历结果集。

FOR lv_rec IN ( SELECT ROW_NUMBER() OVER(ORDER BY act_id) order_no, user_id, act_points      FROM t_user_issue_act       WHERE issue_id = lv_current_issue_id) LOOP      lv_message := lv_message || lv_rec.order_no || '、' || lv_rec.user_id || ':' || lv_rec.act_points || chr(10);END LOOP;


5、测试验证

我们再通过一段代码,随机的选择一个用户,随机确定积分数量来参与活动,测试函数运行的效果,代码如下。

DO $$declare lv_user_id varchar;declare lv_points bigint;declare lv_mess varchar;begin  -- 从三个用户中随机选择一个  lv_user_id := (ARRAY['Alice','Charlie','Bob'])[FLOOR(RANDOM() * 3) + 1];  -- 随机 1-100 的积分  lv_points := FLOOR(RANDOM() * 100) + 1;  -- 参与一次活动取得结果  select act_in_issue(lv_user_id, lv_points) into lv_mess;   -- 显示本次活动结果  raise notice '消息:%', lv_mess::json->>'err_msg';end$$

运行三次出结果:

 0000000024 期活动已结束 Alice 获得积分:184———————————参与情况:1、Bob:412、Alice:663、Alice:77

运行效果如下图:

通过参与记录表,可以看到每期的获胜情况。

通过用户表可看到用户积分的变化。


6、交互应用

至此,已经准备好了数据库,我们需要一个用户界面供用户参与活动。这里,我们期待的效果是一个用户可以选择积分并点击参与活动,然后可以看到活动结果和自己积分的变化。这需要我们进行交互应用的开发,开发语言与框架的选择很多,开发语言如C#、Java、Python、Nodejs等。只要连接到数据库,再提供一个界面供操作即可。这里我们选择 Python WEB页面的方式来实现。

(1)Python访问数据库

Python 中可以通过 psycopg2 来连接 PostgreSQL 服务器访问和操作数据库。以下代码为连接至本地的测试数据库,执行 act_in_issue 函数得到返回的活动详情结果并显示。

import psycopg2import jsonconn = psycopg2.connect(host="localhost", port=5433, database="testdb", user="postgres", password="postgres#")cur = conn.cursor()cur.execute("select act_in_issue('Bob', 5) result")rows = cur.fetchall()cur.close()result = json.loads(rows[0][0])print(result["err_msg"])

运行结果如下图:

(2)WEB服务

这里我们选择 Flask,及其相关的一个扩展 Flask-SocketIO。Flask 是一个轻量级的Web框架,提供了许多内置的功能,如路由、模板引擎和数据库集成,使得开发Web服务变得非常简单。详细介绍请参考《Web服务Python实践》

在 WEB 服务中,我们为界面提供接口来调用数据库服务,主要有两个接口。一是参与活动接口,接收用户与积分参数,调用参与活动的函数得到结果,并发送一条活动消息。另一个是查询用户积分的接口。我们创建 act_issue_api.py 文件来书写WEB服务代码,详细代码实现如下。

from flask import Flask, render_template, request, jsonifyfrom flask_socketio import SocketIO from flask_cors import CORS  import psycopg2import json # postgresql 连接conn = psycopg2.connect(host="localhost", port=5433, database="testdb", user="postgres", password="postgres#")conn.autocommit = True  app = Flask(__name__)  # 允许跨域CORS(app, resources={r"/*": {"origins": "*"}}) # WebScoket 定义socketio = SocketIO(app, cors_allowed_origins="*", ping_timeout=6000, ping_interval=5000)# 参与活动@app.route('/act', methods=['GET'])def act():    userid = request.args['userid']    points = request.args['points']         # 调用参与活动函数    cur = conn.cursor()    param = (userid, points)    cur.execute("select act_in_issue(%s, %s) result", param)    rows = cur.fetchall()    cur.close()    result = json.loads(rows[0][0])    if result["err_code"] == 200:       # 通过 WebSocket 发送消息       socketio.emit("message", result["err_msg"])       return jsonify(result)@app.route('/get_points', methods=['GET'])def get_points():    userid = request.args['userid']         # 查询用户积分    cur = conn.cursor()    param = (userid,)    cur.execute("select points from t_user where user_id=%s", param)    rows = cur.fetchall()    cur.close()    points = 0    if len(rows) > 0:       points = rows[0][0]        return jsonify({ "err_code":200, "err_msg":"success", "points":points })    return jsonify({ "err_code":-1, "err_msg":"未找到用户" })# 首页页面,模板文件位于 ./templates 下,作为用户参与活动的界面@app.route('/')def index():    return render_template('index.html', async_mode=socketio.async_mode)# 启动 Web 服务if __name__ == '__main__':      socketio.run(app, host='0.0.0.0', port=5000)

参与活动接口及参数示例:

http://localhost:5000/act?userid=Bob&points=1

查询用户积分的接口及参数示例:

http://localhost:5000/get_points?userid=Bob

(3)WEB页面

在WEB服务代码中,已经指定了首页路由指向模板文件 index.html,这个文件默认位于代码文件同级目录 templates 文件夹下。页面我们实现以下效果:

  • 动态显示活动进度消息
  • 可点击指定积分参与活动(演示页面用户名直接输入)
  • 可显示用户当前积分

具体实现如下:

<!DOCTYPE html>  <html>  <head>      <meta name="viewport" content="width=device-width, initial-scale=1">    <title>积分活动</title>     <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.min.js"></script>      <script src="https://cdn.staticfile.org/jquery/1.7.1/jquery.min.js" crossorigin="anonymous"></script>      <style>        h3{ text-align:center; }        #board { padding:20px; margin:10px; border:solid 1px gray; border-radius:5px; height:400px; overflow-y:scroll; }        #userid { width:60px; font-size:18px; }        #act { text-align:center; padding:15px; }        #info { font-size:18px; }        button { font-size:18px; color:blue; }        p{padding:10px; font-size:14px;}        .message{ margin-bottom:10px; background:#eeeeee }</style>     <script>      // 创建 Socket.IO 客户端连接      var socket = io("http://localhost:5000");      // 监听服务器发送的消息      socket.on('message', function(message) {          $("#board").append("<p class='message'>" + message.replace(/\n/g, "<br />")+ "</p>");         document.getElementById("board").scrollTop = document.getElementById("board").scrollHeight;    });      // 参与活动    function issue_act(src) {           var userid = $("#userid").val();           var points = $(src).attr("points");          $.get("http://localhost:5000/act?userid=" + userid + "&points=" + points, {}, function(resp){           $("#resp").html("<p>" + JSON.stringify(resp) + "</p>");        });    }          // 查询用户积分    function query_points() {           var userid = $("#userid").val();            $.get("http://localhost:5000/get_points?userid=" + userid, {}, function(resp){           $("#resp").html("<p>" + JSON.stringify(resp) + "</p>");           if(resp.err_code == 200){              $("#info").html("<p>" + userid + " 当前积分:<b style='color:orange;'>" + resp.points + "</b></p>");           }        });        // 记录一下填写的用户名下次加载        localStorage.setItem("userid", userid);    }       $(function(){        if(localStorage.getItem("userid")){            $("#userid").val(localStorage.getItem("userid"));        }        // 定时查分        setInterval(query_points, 2000);    });</script>  </head>  <body>      <h3>参与积分活动</h3>      <div id="board"></div>         <div id="act">            <input type="text" id="userid" value="Bob" placeholder="用户名">        参与积分数(点击参与):        <button onclick="issue_act(this)" points="10">10</button>        <button onclick="issue_act(this)" points="20">20</button>        <button onclick="issue_act(this)" points="50">50</button>                <button onclick="issue_act(this)" points="75">75</button>          <div id="info"></div>        <div id="resp"></div>    </div>    </body>  </html>

在浏览器中访问 http://localhost:5000/ 打开页面。

或者直接双击打开 index.html 也可以,因为代码中设定了允许跨域访问。

效果如下图所示。

至此,演示案例完结。