探索PostgreSQL的高级SQL特性

发表时间: 2022-12-22 17:05

目录

1.with 查询

2.批量插入

3.returning 返回修改数据

4.upsert

5.数据抽样

6.聚合函数

7.窗口函数

1.with 查询

with是pg支持的高级sql特性,简称为CTE,with查询在复杂查询中定义一个辅助语句(可以理解为定义临时表),常用语复杂查询或者递归查询场景

mytest=> with cte as (

mytest(> select generate_series(1,10)

mytest(> )

mytest-> select * from cte ;

generate_series

-----------------

1

2

3

4

5

6

7

8

9

10

(10 rows)

-- 创建测试表

create table BONUS

(

ename VARCHAR(10),

job VARCHAR(9),

sal int4,

comm int4

);

create table DEPT

(

deptno int4 not null,

dname VARCHAR(14),

loc VARCHAR(13)

);

create table EMP

(

empno int4 not null,

ename VARCHAR(10),

job VARCHAR(9),

mgr int4,

hiredate DATE,

sal int4,

comm int4,

deptno int4

);

create table SALGRADE

(

grade int4,

losal int4,

hisal int4

);

-- 测试数据

{"deptno":10,"dname":"ACCOUNTING","loc":"NEW YORK"}

{"deptno":20,"dname":"RESEARCH","loc":"DALLAS"}

{"deptno":30,"dname":"SALES","loc":"CHICAGO"}

{"deptno":40,"dname":"OPERATIONS","loc":"BOSTON"}

{"empno":7369,"ename":"SMITH","job":"CLERK","mgr":7902,"hiredate":"1980-12-17","sal":800,"comm":null,"deptno":20}

{"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":7698,"hiredate":"1981-02-20","sal":1600,"comm":300,"deptno":30}

{"empno":7521,"ename":"WARD","job":"SALESMAN","mgr":7698,"hiredate":"1981-02-22","sal":1250,"comm":500,"deptno":30}

{"empno":7566,"ename":"JONES","job":"MANAGER","mgr":7839,"hiredate":"1981-04-02","sal":2975,"comm":null,"deptno":20}

{"empno":7654,"ename":"MARTIN","job":"SALESMAN","mgr":7698,"hiredate":"1981-09-28","sal":1250,"comm":1400,"deptno":30}

{"empno":7698,"ename":"BLAKE","job":"MANAGER","mgr":7839,"hiredate":"1981-05-01","sal":2850,"comm":null,"deptno":30}

{"empno":7782,"ename":"CLARK","job":"MANAGER","mgr":7839,"hiredate":"1981-06-09","sal":2450,"comm":null,"deptno":10}

{"empno":7788,"ename":"SCOTT","job":"ANALYST","mgr":7566,"hiredate":"1987-04-19","sal":3000,"comm":null,"deptno":20}

{"empno":7839,"ename":"KING","job":"PRESIDENT","mgr":null,"hiredate":"1981-11-17","sal":5000,"comm":null,"deptno":10}

{"empno":7844,"ename":"TURNER","job":"SALESMAN","mgr":7698,"hiredate":"1981-09-08","sal":1500,"comm":0,"deptno":30}

{"empno":7876,"ename":"ADAMS","job":"CLERK","mgr":7788,"hiredate":"1987-05-23","sal":1100,"comm":null,"deptno":20}

{"empno":7900,"ename":"JAMES","job":"CLERK","mgr":7698,"hiredate":"1981-12-03","sal":950,"comm":null,"deptno":30}

{"empno":7902,"ename":"FORD","job":"ANALYST","mgr":7566,"hiredate":"1981-12-03","sal":3000,"comm":null,"deptno":20}

{"empno":7934,"ename":"MILLER","job":"CLERK","mgr":7782,"hiredate":"1982-01-23","sal":1300,"comm":null,"deptno":10}

{"grade":1,"losal":700,"hisal":1200}

{"grade":2,"losal":1201,"hisal":1400}

{"grade":3,"losal":1401,"hisal":2000}

{"grade":4,"losal":2001,"hisal":3000}

{"grade":5,"losal":3001,"hisal":9999}

with cte1 as (

select deptno,sum(sal) from emp

group by deptno

),

cte2 as (

select deptno,dname from dept

)

select * from cte1

left join cte2 on cte1.deptno= cte2.deptno

;

1.1 递归查询使用CTE

with查询的一个重要属性是recursive ,使用recursive属性可以引用自己的输出,从而实现递归,一般用于层次结构或者树状结构

with recursive t (x) as (

select 1

union

select x + 1 from t where x < 10

)

select sum(x) from t;

mytest=# with recursive t (x) as (

mytest(# select 1

mytest(# union

mytest(# select x + 1 from t where x < 10

mytest(# )

mytest-#

mytest-# select sum(x) from t;

sum

-----

55

(1 row)

说明:x从1开始,union加1 后的值,循环知道x < 10 结束,然后x值求和

1.2 简单的递归查询

-- 创建测试表

create table test_area(id int4,name varchar(32),pid int4);

--测试数据

[root@dongjj-pc-3 file]# cat insert.txt

1 中国 0

2 辽宁 1

3 山东 1

4 沈阳 2

5 大连 2

6 济南 3

7 和平区 4

8 沈河区 4

--导入册数数据

mytest=# COPY test_area from '/file/insert.txt';

COPY 8

mytest=# select * from test_area;

id | name | pid

----+--------+-----

1 | 中国 | 0

2 | 辽宁 | 1

3 | 山东 | 1

4 | 沈阳 | 2

5 | 大连 | 2

6 | 济南 | 3

7 | 和平区 | 4

8 | 沈河区 | 4

(8 rows)

-- 查询出所有的结构树

with recursive r as (

select * from test_area

union all

select t.* from test_area t ,r where t.id = r.pid

)

select * from r

order by id;

mytest=# with recursive r as (

mytest(# select * from test_area

mytest(# union all

mytest(# select t.* from test_area t ,r where t.id = r.pid

mytest(# )

mytest-# select * from r

mytest-# order by id;

id | name | pid

----+--------+-----

1 | 中国 | 0

1 | 中国 | 0

1 | 中国 | 0

1 | 中国 | 0

1 | 中国 | 0

1 | 中国 | 0

1 | 中国 | 0

1 | 中国 | 0

2 | 辽宁 | 1

2 | 辽宁 | 1

2 | 辽宁 | 1

2 | 辽宁 | 1

2 | 辽宁 | 1

3 | 山东 | 1

3 | 山东 | 1

4 | 沈阳 | 2

4 | 沈阳 | 2

4 | 沈阳 | 2

5 | 大连 | 2

6 | 济南 | 3

7 | 和平区 | 4

8 | 沈河区 | 4

(22 rows)

--查询辽宁的结构树

mytest=# with recursive r as (

mytest(# select * from test_area where id = 7

mytest(# union all

mytest(# select t.* from test_area t ,r where t.id = r.pid

mytest(# )

mytest-# select * from r

mytest-# order by id;

id | name | pid

----+--------+-----

1 | 中国 | 0

2 | 辽宁 | 1

4 | 沈阳 | 2

7 | 和平区 | 4

(4 rows)

--层级name拼接

with recursive r as (

select * from test_area where id = 7

union all

select t.* from test_area t ,r where t.id = r.pid

)

select string_agg(name,'') from (select name from r order by id) n;

mytest=# with recursive r as (

mytest(# select * from test_area where id = 7

mytest(# union all

mytest(# select t.* from test_area t ,r where t.id = r.pid

mytest(# )

mytest-# select string_agg(name,'') from (select name from r order by id) n;

string_agg

--------------------

中国辽宁沈阳和平区

查找当前节点及当前节点的父节点,也可以查询当前节点及当前节点的子节点,只需改下where 条件

with recursive r as (

select * from test_area where id = 4

union all

select t.* from test_area t ,r where t.pid = r.id

)

select * from r

order by id;

mytest=# with recursive r as (

mytest(# select * from test_area where id = 4

mytest(# union all

mytest(# select t.* from test_area t ,r where t.pid = r.id

mytest(# )

mytest-# select * from r

mytest-# order by id;

id | name | pid

----+--------+-----

4 | 沈阳 | 2

7 | 和平区 | 4

8 | 沈河区 | 4

(3 rows)

CTE的优点:

可以简化SQL代码,减少SQL嵌套层数,提高SQL代码可读性

cte语句,已经预处理,减少计算次数,在主查询中可以多次使用

2.批量插入

批量插入就是一次性插入多条数据,主要用于提高插入的效率


2.1 使用insert into select 方式

--创建测试表

create table test_betch1 (id int4,name varchar(20));

mytest=# select count(*) from test_1;

count

--------

500000

(1 row)

mytest=# select * from test_1 limit 10;;

id | name | create_time

----+-----------+----------------------------

1 | 1_francs | 2022-11-09 12:22:04.733582

2 | 2_francs | 2022-11-09 12:22:04.73376

3 | 3_francs | 2022-11-09 12:22:04.733765

4 | 4_francs | 2022-11-09 12:22:04.733767

5 | 5_francs | 2022-11-09 12:22:04.733769

6 | 6_francs | 2022-11-09 12:22:04.73377

7 | 7_francs | 2022-11-09 12:22:04.733772

8 | 8_francs | 2022-11-09 12:22:04.733773

9 | 9_francs | 2022-11-09 12:22:04.733775

10 | 10_francs | 2022-11-09 12:22:04.733776

(10 rows)

mytest=# insert into test_betch1(id ,name)

mytest-# select id ,name from test_1 limit 10000;

INSERT 0 10000

mytest=# select * from test_betch1 limit 10;

id | name

----+-----------

1 | 1_francs

2 | 2_francs

3 | 3_francs

4 | 4_francs

5 | 5_francs

6 | 6_francs

7 | 7_francs

8 | 8_francs

9 | 9_francs

10 | 10_francs

(10 rows)

2.2 使用 insert into values (),()...()方式

--复制表

mytest=# create table test_betch2 as select * from test_betch1 where 1=2;

SELECT 0

--插入数据

insert into test_betch2 (id,name) values (1,'a'),(2,'b'),(3,'c'),(4,'d');

mytest=# insert into test_betch2 (id,name) values (1,'a'),(2,'b'),(3,'c'),(4,'d');

INSERT 0 4

mytest=# select * from test_betch2;

id | name

----+------

1 | a

2 | b

3 | c

4 | d

(4 rows)

2.3 使用COPY或者\copy元命令的方式

此方式已经讲过,可以参考
https://blog.csdn.net/u010438126/article/details/127772311

3.returning 返回修改数据

pg的returning特性可以返回DML修改的数据。

有以下的场景:

3.1 insert语句后接returning属性返回插入数据

--创建测试表

mytest=# create table test_returning as select * from test_betch1 where 1=2;

SELECT 0

mytest=# insert into test_returning(id,name) values (1,'a') returning *;

id | name

----+------

1 | a

(1 row)


--可以指定返回的字段,只要把* 改为指定的字段就行,* 表是返回所有的字段

mytest=# insert into test_returning(id,name) values (2,'b') returning id;

id

----

2

(1 row)

3.2 update语句后接returning属性返回更新后新值

mytest=# update test_returning set name = 'c' where id =1 returning *;

id | name

----+------

1 | c

(1 row)

mytest=# update test_returning set name = 'd' where id =2 returning name;

name

------

d

(1 row)

3.3 delete语句后接returning属性返回删除的数据

mytest=# delete from test_returning where id =1 returning *;

id | name

----+------

1 | c

(1 row)

mytest=# delete from test_returning where id =2 returning name;

name

------

d

(1 row)

4.upsert

pg的upsert特性是指insert ... on conflict update ,用来解决在数据插入过程中发生数据冲突的情况。

比如在发生违反用户自定义约束,在日志数据应用场景中,通常会在事物中批量插入日志数据,如果有一条数据违反约束,会导致整个事物失败,反生回滚,则upsert特性解决此类问题。

--创建测试表

create table user_logins(

user_name text primary key ,

login_cnt int4,

last_login_time TIMESTAMP(0) WITHOUT TIME ZONE

);

insert into user_logins (user_name,login_cnt) VALUES ('zhangsan',1);

因为在user_name上有主键,所以插入重复数据会报错

insert into user_logins (user_name,login_cnt) VALUES ('lisi',1),('zhangsan',2);

mytest=> insert into user_logins (user_name,login_cnt) VALUES ('lisi',1),('zhangsan',2);

ERROR: duplicate key value violates unique constraint "user_logins_pkey"

DETAIL: Key (user_name)=(zhangsan) already exists.

所以为避免此类情况,使用upsert特性处理冲突数据,同事更新冲突数据

insert into user_logins (user_name,login_cnt) VALUES ('lisi',1),('zhangsan',2)

on conflict(user_name)

do update set

login_cnt=user_logins.login_cnt+excluded.login_cnt,last_login_time=now();

mytest=> select * from user_logins;

user_name | login_cnt | last_login_time

-----------+-----------+-----------------

zhangsan | 1 |

mytest=> insert into user_logins (user_name,login_cnt) VALUES ('lisi',1),('zhangsan',2)

mytest-> on conflict(user_name)

mytest-> do update set

mytest-> login_cnt=user_logins.login_cnt+excluded.login_cnt,last_login_time=now();

INSERT 0 2

mytest=> select * from user_logins;

user_name | login_cnt | last_login_time

-----------+-----------+---------------------

lisi | 1 |

zhangsan | 3 | 2022-11-11 18:30:35

(2 rows)

也可以指定当发生数据冲突后什么都不做,只需要指定do nothing 就行了

insert into user_logins (user_name,login_cnt) values ('wangwu',1),('zhangsan',1)

on conflict(user_name) do nothing;

mytest=> insert into user_logins (user_name,login_cnt) values ('wangwu',1),('zhangsan',1)

mytest-> on conflict(user_name) do nothing;

INSERT 0 1

mytest=> select * from user_logins;

user_name | login_cnt | last_login_time

-----------+-----------+---------------------

lisi | 1 |

zhangsan | 3 | 2022-11-11 18:30:35

wangwu | 1 |

(3 rows)

具体的upsert语法可以参考
https://www.jb51.net/article/203506.htm

5.数据抽样

数据抽样在数据处理方面经常用到,特别是数据量特别大的时候,随机查询一定量的数据操作很场景,9.5版本之前通过order by random()方式实现数据抽样,但是性能很低

mytest=> \timing

Timing is on.

mytest=> select * from test_1 order by random() limit 1;

id | name | create_time

--------+---------------+----------------------------

380165 | 380165_francs | 2022-11-09 12:22:05.822893

(1 row)

Time: 109.904 ms

mytest=> select * from test_1 order by random() limit 1;

id | name | create_time

--------+---------------+----------------------------

229878 | 229878_francs | 2022-11-09 12:22:05.428547

(1 row)

Time: 135.264 ms

mytest=> explain ANALYZE select * from test_1 order by random() limit 1;

QUERY PLAN

---------------------------------------------------------------------------------------------------------------------------

Limit (cost=12417.00..12417.00 rows=1 width=33) (actual time=120.778..120.780 rows=1 loops=1)

-> Sort (cost=12417.00..13667.00 rows=500000 width=33) (actual time=120.776..120.777 rows=1 loops=1)

Sort Key: (random())

Sort Method: top-N heapsort Memory: 25kB

-> Seq Scan on test_1 (cost=0.00..9917.00 rows=500000 width=33) (actual time=0.010..53.321 rows=500000 loops=1)

Planning Time: 0.086 ms

Execution Time: 120.805 ms

(7 rows)

Time: 123.638 ms

通过执行计划已经知道,这种方式进行了全表扫描和排序,效率非常低,当数据量大的时候,性能很低。

9.5版本以后的数据抽样,主要有两种方法:

5.1 system抽样方式

--创建测试表

create table test_sample(

id int4,

message text,

create_time TIMESTAMP(6) WITHOUT TIME ZONE DEFAULT clock_timestamp()

);

insert into test_sample (id ,message)

select n , md5(random()::text) from generate_series(1,1500000) n;

mytest=> create table test_sample(

mytest(> id int4,

mytest(> message text,

mytest(> create_time TIMESTAMP(6) WITHOUT TIME ZONE DEFAULT clock_timestamp()

mytest(> );

CREATE TABLE

mytest=> insert into test_sample (id ,message)

mytest-> select n , md5(random()::text) from generate_series(1,1500000) n;

INSERT 0 1500000

mytest=> select * from test_sample limit 1;

id | message | create_time

----+----------------------------------+----------------------------

1 | a8f7f7c931c82e8dd4e85542c74209db | 2022-11-11 18:53:55.281162

(1 row)

--设置抽样因子为0.01 ,也就是放回count(*)*0.01% = 150条记录

-- select * from test_sample tablesample system(0.01);

--执行计划如下

--explain ANALYZE select * from test_sample tablesample system(0.01);

mytest=> explain ANALYZE select * from test_sample tablesample system(0.01);

QUERY PLAN

-------------------------------------------------------------------------------------------------------------

Sample Scan on test_sample (cost=0.00..5.50 rows=150 width=45) (actual time=0.027..0.151 rows=214 loops=1)

Sampling: system ('0.01'::real)

Planning Time: 0.045 ms

Execution Time: 0.170 ms

(4 rows)

从执行计划可以看出进行了Sample Scan扫描方式,抽样是system,执行计划0.045 ms,性能比较好,实际返回214条数据。

为神马是214条数据?

--查看表占用的数据块数量

--select relname ,relpages from pg_class where relname = 'test_sample';

mytest=> select relname ,relpages from pg_class where relname = 'test_sample';

relname | relpages

-------------+----------

test_sample | 14019

也就是1000000/14019 =214 条记录

mytest=> select count(*) from test_sample tablesample system(0.01);

count

-------

214

(1 row)

mytest=> select count(*) from test_sample tablesample system(0.01);

count

-------

107

(1 row)

mytest=> select count(*) from test_sample tablesample system(0.01);

count

-------

321

(1 row)

mytest=> select count(*) from test_sample tablesample system(0.01);

count

-------

107

(1 row)

5.2 bernoulli

此种方式比上述的方式效率低

--select count(*) from test_sample tablesample bernoulli(0.01);

mytest=> explain analyze select count(*) from test_sample tablesample bernoulli(0.01);

QUERY PLAN

-----------------------------------------------------------------------------------------------------------------------

Aggregate (cost=14020.88..14020.89 rows=1 width=8) (actual time=23.333..23.334 rows=1 loops=1)

-> Sample Scan on test_sample (cost=0.00..14020.50 rows=150 width=0) (actual time=0.224..23.290 rows=162 loops=1)

Sampling: bernoulli ('0.01'::real)

Planning Time: 0.160 ms

Execution Time: 23.373 ms

(5 rows)

6.聚合函数

聚合函数主要有 AVG,SUM,MIN,MAX,COUNT

--创建测试表

create table test_jh (p_name text,c_name text);

mytest=# COPY test_jh from '/file/insert.txt';

COPY 5

mytest=#

select p_name ,string_agg(c_name,',') c_names

from test_jh

GROUP by p_name ;

mytest=# select p_name ,string_agg(c_name,',') c_names

mytest-# from test_jh

mytest-# GROUP by p_name ;

p_name | c_names

--------+----------------

日本 | 东京,大阪

中国 | 台北,香港,上海

(2 rows)

select p_name ,array_agg(c_name) c_names

from test_jh

GROUP by p_name ;

mytest=# select p_name ,array_agg(c_name) c_names

mytest-# from test_jh

mytest-# GROUP by p_name ;

p_name | c_names

--------+------------------

日本 | {东京,大阪}

中国 | {台北,香港,上海}

(2 rows)

7.窗口函数

pg内置的窗口函数有:

row_num(),

rank(),

lag()

还可以接over属性

avg() over()

row_number() over(partition by ... order by ...)

rank() over(partition by ... order by ...)

dense_rank() over(partition by ... order by ...)

lag(字段,+/-行偏移值)over()

first_value(计算字段)over(partition by 分组字段 order by 计算字段 asc/desc) -- 用来取结果集每个分组的第一行数据的字段值

last_value(计算字段)over(partition by 分组字段 order by 计算字段 asc/desc) -- 用来取结果集每个分组的最后一行数据的字段值

nth_value(计算字段,指定行数)over(partition by 分组字段) -- 用来取结果集每个分组的最后一行数据的字段值