1.with 查询
2.批量插入
3.returning 返回修改数据
4.upsert
5.数据抽样
6.聚合函数
7.窗口函数
with是PostgreSQL支持的高级SQL特性,简称为CTE(公共表表达式),with查询在复杂查询中定义一个辅助语句(可以理解为定义临时表),常用于复杂查询或者递归查询场景
mytest=> with cte as (
mytest(> select generate_series(1,10)
mytest(> )
mytest-> select * from cte ;
generate_series
-----------------
1
2
3
4
5
6
7
8
9
10
(10 rows)
-- 创建测试表
-- Test table: bonus records (demo data only, no constraints).
CREATE TABLE BONUS
(
    ename VARCHAR(10),  -- employee name
    job   VARCHAR(9),   -- job title
    sal   int4,         -- salary
    comm  int4          -- commission
);
-- Test table: departments.
CREATE TABLE DEPT
(
    deptno int4 NOT NULL,  -- department number
    dname  VARCHAR(14),    -- department name
    loc    VARCHAR(13)     -- location
);
-- Test table: employees (classic EMP demo schema).
CREATE TABLE EMP
(
    empno    int4 NOT NULL,  -- employee number
    ename    VARCHAR(10),    -- employee name
    job      VARCHAR(9),     -- job title
    mgr      int4,           -- manager's empno
    hiredate DATE,           -- hire date
    sal      int4,           -- salary
    comm     int4,           -- commission
    deptno   int4            -- department number
);
-- Test table: salary grade bands (losal..hisal per grade).
CREATE TABLE SALGRADE
(
    grade int4,  -- grade number
    losal int4,  -- lower salary bound
    hisal int4   -- upper salary bound
);
-- 测试数据
{"deptno":10,"dname":"ACCOUNTING","loc":"NEW YORK"}
{"deptno":20,"dname":"RESEARCH","loc":"DALLAS"}
{"deptno":30,"dname":"SALES","loc":"CHICAGO"}
{"deptno":40,"dname":"OPERATIONS","loc":"BOSTON"}
{"empno":7369,"ename":"SMITH","job":"CLERK","mgr":7902,"hiredate":"1980-12-17","sal":800,"comm":null,"deptno":20}
{"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":7698,"hiredate":"1981-02-20","sal":1600,"comm":300,"deptno":30}
{"empno":7521,"ename":"WARD","job":"SALESMAN","mgr":7698,"hiredate":"1981-02-22","sal":1250,"comm":500,"deptno":30}
{"empno":7566,"ename":"JONES","job":"MANAGER","mgr":7839,"hiredate":"1981-04-02","sal":2975,"comm":null,"deptno":20}
{"empno":7654,"ename":"MARTIN","job":"SALESMAN","mgr":7698,"hiredate":"1981-09-28","sal":1250,"comm":1400,"deptno":30}
{"empno":7698,"ename":"BLAKE","job":"MANAGER","mgr":7839,"hiredate":"1981-05-01","sal":2850,"comm":null,"deptno":30}
{"empno":7782,"ename":"CLARK","job":"MANAGER","mgr":7839,"hiredate":"1981-06-09","sal":2450,"comm":null,"deptno":10}
{"empno":7788,"ename":"SCOTT","job":"ANALYST","mgr":7566,"hiredate":"1987-04-19","sal":3000,"comm":null,"deptno":20}
{"empno":7839,"ename":"KING","job":"PRESIDENT","mgr":null,"hiredate":"1981-11-17","sal":5000,"comm":null,"deptno":10}
{"empno":7844,"ename":"TURNER","job":"SALESMAN","mgr":7698,"hiredate":"1981-09-08","sal":1500,"comm":0,"deptno":30}
{"empno":7876,"ename":"ADAMS","job":"CLERK","mgr":7788,"hiredate":"1987-05-23","sal":1100,"comm":null,"deptno":20}
{"empno":7900,"ename":"JAMES","job":"CLERK","mgr":7698,"hiredate":"1981-12-03","sal":950,"comm":null,"deptno":30}
{"empno":7902,"ename":"FORD","job":"ANALYST","mgr":7566,"hiredate":"1981-12-03","sal":3000,"comm":null,"deptno":20}
{"empno":7934,"ename":"MILLER","job":"CLERK","mgr":7782,"hiredate":"1982-01-23","sal":1300,"comm":null,"deptno":10}
{"grade":1,"losal":700,"hisal":1200}
{"grade":2,"losal":1201,"hisal":1400}
{"grade":3,"losal":1401,"hisal":2000}
{"grade":4,"losal":2001,"hisal":3000}
{"grade":5,"losal":3001,"hisal":9999}
-- Per-department salary totals joined with department names.
-- Fixes: the aggregate column was unnamed (showed up as "sum"), and
-- SELECT * returned deptno twice (once from each CTE).
WITH cte1 AS (
    -- total salary per department that has employees
    SELECT deptno, SUM(sal) AS sum_sal
    FROM emp
    GROUP BY deptno
),
cte2 AS (
    -- department number -> name lookup
    SELECT deptno, dname
    FROM dept
)
-- LEFT JOIN keeps departments appearing in emp even if missing from dept.
SELECT cte1.deptno, cte1.sum_sal, cte2.dname
FROM cte1
LEFT JOIN cte2 ON cte1.deptno = cte2.deptno;
with查询的一个重要属性是recursive ,使用recursive属性可以引用自己的输出,从而实现递归,一般用于层次结构或者树状结构
-- Recursive CTE: generate the integers 1..10 and sum them (result: 55).
WITH RECURSIVE t(x) AS (
    SELECT 1           -- anchor member: start at 1
    UNION
    SELECT x + 1       -- recursive member: stops once x reaches 10
    FROM t
    WHERE x < 10
)
SELECT SUM(x) FROM t;
mytest=# with recursive t (x) as (
mytest(# select 1
mytest(# union
mytest(# select x + 1 from t where x < 10
mytest(# )
mytest-#
mytest-# select sum(x) from t;
sum
-----
55
(1 row)
说明:x从1开始,递归部分每次将x加1并通过union合并结果,循环直到x < 10不再满足时结束,然后对所有x值求和
-- 创建测试表
-- Test table: area hierarchy (pid = parent id; pid 0 marks the root node).
CREATE TABLE test_area (
    id   int4,
    name varchar(32),
    pid  int4
);
--测试数据
[root@dongjj-pc-3 file]# cat insert.txt
1 中国 0
2 辽宁 1
3 山东 1
4 沈阳 2
5 大连 2
6 济南 3
7 和平区 4
8 沈河区 4
--导入测试数据
mytest=# COPY test_area from '/file/insert.txt';
COPY 8
mytest=# select * from test_area;
id | name | pid
----+--------+-----
1 | 中国 | 0
2 | 辽宁 | 1
3 | 山东 | 1
4 | 沈阳 | 2
5 | 大连 | 2
6 | 济南 | 3
7 | 和平区 | 4
8 | 沈河区 | 4
(8 rows)
-- 查询出所有的结构树
-- Walk the whole hierarchy: every row seeds the recursion, and each step
-- adds the parent of every row found so far (hence duplicate ancestors).
WITH RECURSIVE r AS (
    SELECT * FROM test_area
    UNION ALL
    SELECT t.*
    FROM test_area t
    INNER JOIN r ON t.id = r.pid
)
SELECT *
FROM r
ORDER BY id;
mytest=# with recursive r as (
mytest(# select * from test_area
mytest(# union all
mytest(# select t.* from test_area t ,r where t.id = r.pid
mytest(# )
mytest-# select * from r
mytest-# order by id;
id | name | pid
----+--------+-----
1 | 中国 | 0
1 | 中国 | 0
1 | 中国 | 0
1 | 中国 | 0
1 | 中国 | 0
1 | 中国 | 0
1 | 中国 | 0
1 | 中国 | 0
2 | 辽宁 | 1
2 | 辽宁 | 1
2 | 辽宁 | 1
2 | 辽宁 | 1
2 | 辽宁 | 1
3 | 山东 | 1
3 | 山东 | 1
4 | 沈阳 | 2
4 | 沈阳 | 2
4 | 沈阳 | 2
5 | 大连 | 2
6 | 济南 | 3
7 | 和平区 | 4
8 | 沈河区 | 4
(22 rows)
--查询和平区(id = 7)及其所有上级节点的结构树
mytest=# with recursive r as (
mytest(# select * from test_area where id = 7
mytest(# union all
mytest(# select t.* from test_area t ,r where t.id = r.pid
mytest(# )
mytest-# select * from r
mytest-# order by id;
id | name | pid
----+--------+-----
1 | 中国 | 0
2 | 辽宁 | 1
4 | 沈阳 | 2
7 | 和平区 | 4
(4 rows)
-- Concatenate the ancestor chain of node 7 into a single string,
-- ordered from the root downward.
WITH RECURSIVE r AS (
    SELECT * FROM test_area WHERE id = 7   -- anchor: the leaf node
    UNION ALL
    SELECT t.*
    FROM test_area t
    INNER JOIN r ON t.id = r.pid           -- step up to each parent
)
SELECT string_agg(name, '')
FROM (SELECT name FROM r ORDER BY id) n;
mytest=# with recursive r as (
mytest(# select * from test_area where id = 7
mytest(# union all
mytest(# select t.* from test_area t ,r where t.id = r.pid
mytest(# )
mytest-# select string_agg(name,'') from (select name from r order by id) n;
string_agg
--------------------
中国辽宁沈阳和平区
查找当前节点及当前节点的父节点,也可以查询当前节点及当前节点的子节点,只需改下where 条件
-- Find node 4 and all of its descendants: the recursive member matches
-- child rows (t.pid = r.id) instead of parents.
WITH RECURSIVE r AS (
    SELECT * FROM test_area WHERE id = 4   -- anchor: the subtree root
    UNION ALL
    SELECT t.*
    FROM test_area t
    INNER JOIN r ON t.pid = r.id           -- step down to each child
)
SELECT *
FROM r
ORDER BY id;
mytest=# with recursive r as (
mytest(# select * from test_area where id = 4
mytest(# union all
mytest(# select t.* from test_area t ,r where t.pid = r.id
mytest(# )
mytest-# select * from r
mytest-# order by id;
id | name | pid
----+--------+-----
4 | 沈阳 | 2
7 | 和平区 | 4
8 | 沈河区 | 4
(3 rows)
可以简化SQL代码,减少SQL嵌套层数,提高SQL代码可读性
cte语句,已经预处理,减少计算次数,在主查询中可以多次使用
批量插入就是一次性插入多条数据,主要用于提高插入的效率
--创建测试表
-- Test table for the batch-insert examples.
CREATE TABLE test_betch1 (
    id   int4,
    name varchar(20)
);
mytest=# select count(*) from test_1;
count
--------
500000
(1 row)
mytest=# select * from test_1 limit 10;;
id | name | create_time
----+-----------+----------------------------
1 | 1_francs | 2022-11-09 12:22:04.733582
2 | 2_francs | 2022-11-09 12:22:04.73376
3 | 3_francs | 2022-11-09 12:22:04.733765
4 | 4_francs | 2022-11-09 12:22:04.733767
5 | 5_francs | 2022-11-09 12:22:04.733769
6 | 6_francs | 2022-11-09 12:22:04.73377
7 | 7_francs | 2022-11-09 12:22:04.733772
8 | 8_francs | 2022-11-09 12:22:04.733773
9 | 9_francs | 2022-11-09 12:22:04.733775
10 | 10_francs | 2022-11-09 12:22:04.733776
(10 rows)
mytest=# insert into test_betch1(id ,name)
mytest-# select id ,name from test_1 limit 10000;
INSERT 0 10000
mytest=# select * from test_betch1 limit 10;
id | name
----+-----------
1 | 1_francs
2 | 2_francs
3 | 3_francs
4 | 4_francs
5 | 5_francs
6 | 6_francs
7 | 7_francs
8 | 8_francs
9 | 9_francs
10 | 10_francs
(10 rows)
--复制表
mytest=# create table test_betch2 as select * from test_betch1 where 1=2;
SELECT 0
--插入数据
-- Multi-row VALUES list: four rows inserted by a single statement.
INSERT INTO test_betch2 (id, name)
VALUES (1, 'a'),
       (2, 'b'),
       (3, 'c'),
       (4, 'd');
mytest=# insert into test_betch2 (id,name) values (1,'a'),(2,'b'),(3,'c'),(4,'d');
INSERT 0 4
mytest=# select * from test_betch2;
id | name
----+------
1 | a
2 | b
3 | c
4 | d
(4 rows)
此方式已经讲过,可以参考
https://blog.csdn.net/u010438126/article/details/127772311
pg的returning特性可以返回DML修改的数据。
有以下的场景:
--创建测试表
mytest=# create table test_returning as select * from test_betch1 where 1=2;
SELECT 0
mytest=# insert into test_returning(id,name) values (1,'a') returning *;
id | name
----+------
1 | a
(1 row)
--可以指定返回的字段,只要把* 改为指定的字段就行,* 表是返回所有的字段
mytest=# insert into test_returning(id,name) values (2,'b') returning id;
id
----
2
(1 row)
mytest=# update test_returning set name = 'c' where id =1 returning *;
id | name
----+------
1 | c
(1 row)
mytest=# update test_returning set name = 'd' where id =2 returning name;
name
------
d
(1 row)
mytest=# delete from test_returning where id =1 returning *;
id | name
----+------
1 | c
(1 row)
mytest=# delete from test_returning where id =2 returning name;
name
------
d
(1 row)
pg的upsert特性是指insert ... on conflict update ,用来解决在数据插入过程中发生数据冲突的情况。
比如在发生违反用户自定义约束的情况时:在日志数据应用场景中,通常会在事务中批量插入日志数据,如果有一条数据违反约束,会导致整个事务失败、发生回滚,upsert特性可以解决此类问题。
--创建测试表
-- Login tracking table; the PRIMARY KEY on user_name is what makes
-- ON CONFLICT (user_name) possible in the upsert examples below.
CREATE TABLE user_logins (
    user_name       text PRIMARY KEY,
    login_cnt       int4,
    last_login_time TIMESTAMP(0) WITHOUT TIME ZONE
);
-- Seed one row; any later insert with the same user_name will conflict.
INSERT INTO user_logins (user_name, login_cnt)
VALUES ('zhangsan', 1);
因为在user_name上有主键,所以插入重复数据会报错
-- Demonstrates the failure case: 'zhangsan' violates the primary key, so the
-- whole statement rolls back and the 'lisi' row is not inserted either.
INSERT INTO user_logins (user_name, login_cnt)
VALUES ('lisi', 1), ('zhangsan', 2);
mytest=> insert into user_logins (user_name,login_cnt) VALUES ('lisi',1),('zhangsan',2);
ERROR: duplicate key value violates unique constraint "user_logins_pkey"
DETAIL: Key (user_name)=(zhangsan) already exists.
所以为避免此类情况,可使用upsert特性处理冲突数据,同时更新冲突数据
-- Upsert: insert new users; on a user_name conflict, accumulate the login
-- count and refresh the last login time. EXCLUDED refers to the row that
-- was proposed for insertion.
INSERT INTO user_logins (user_name, login_cnt)
VALUES ('lisi', 1), ('zhangsan', 2)
ON CONFLICT (user_name)
DO UPDATE SET
    login_cnt       = user_logins.login_cnt + EXCLUDED.login_cnt,
    last_login_time = now();
mytest=> select * from user_logins;
user_name | login_cnt | last_login_time
-----------+-----------+-----------------
zhangsan | 1 |
mytest=> insert into user_logins (user_name,login_cnt) VALUES ('lisi',1),('zhangsan',2)
mytest-> on conflict(user_name)
mytest-> do update set
mytest-> login_cnt=user_logins.login_cnt+excluded.login_cnt,last_login_time=now();
INSERT 0 2
mytest=> select * from user_logins;
user_name | login_cnt | last_login_time
-----------+-----------+---------------------
lisi | 1 |
zhangsan | 3 | 2022-11-11 18:30:35
(2 rows)
也可以指定当发生数据冲突后什么都不做,只需要指定do nothing 就行了
-- DO NOTHING: conflicting rows are silently skipped, so 'wangwu' is
-- inserted and the existing 'zhangsan' row is left untouched.
INSERT INTO user_logins (user_name, login_cnt)
VALUES ('wangwu', 1), ('zhangsan', 1)
ON CONFLICT (user_name) DO NOTHING;
mytest=> insert into user_logins (user_name,login_cnt) values ('wangwu',1),('zhangsan',1)
mytest-> on conflict(user_name) do nothing;
INSERT 0 1
mytest=> select * from user_logins;
user_name | login_cnt | last_login_time
-----------+-----------+---------------------
lisi | 1 |
zhangsan | 3 | 2022-11-11 18:30:35
wangwu | 1 |
(3 rows)
具体的upsert语法可以参考
https://www.jb51.net/article/203506.htm
数据抽样在数据处理方面经常用到,特别是数据量特别大的时候,随机查询一定量数据的操作很常见。9.5版本之前通过order by random()方式实现数据抽样,但是性能很低
mytest=> \timing
Timing is on.
mytest=> select * from test_1 order by random() limit 1;
id | name | create_time
--------+---------------+----------------------------
380165 | 380165_francs | 2022-11-09 12:22:05.822893
(1 row)
Time: 109.904 ms
mytest=> select * from test_1 order by random() limit 1;
id | name | create_time
--------+---------------+----------------------------
229878 | 229878_francs | 2022-11-09 12:22:05.428547
(1 row)
Time: 135.264 ms
mytest=> explain ANALYZE select * from test_1 order by random() limit 1;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------
Limit (cost=12417.00..12417.00 rows=1 width=33) (actual time=120.778..120.780 rows=1 loops=1)
-> Sort (cost=12417.00..13667.00 rows=500000 width=33) (actual time=120.776..120.777 rows=1 loops=1)
Sort Key: (random())
Sort Method: top-N heapsort Memory: 25kB
-> Seq Scan on test_1 (cost=0.00..9917.00 rows=500000 width=33) (actual time=0.010..53.321 rows=500000 loops=1)
Planning Time: 0.086 ms
Execution Time: 120.805 ms
(7 rows)
Time: 123.638 ms
通过执行计划已经知道,这种方式进行了全表扫描和排序,效率非常低,当数据量大的时候,性能很低。
9.5版本以后的数据抽样,主要有两种方法:
--创建测试表
-- Sampling test table, loaded with 1.5 million rows of random md5 payloads.
CREATE TABLE test_sample (
    id          int4,
    message     text,
    create_time TIMESTAMP(6) WITHOUT TIME ZONE DEFAULT clock_timestamp()
);

INSERT INTO test_sample (id, message)
SELECT n, md5(random()::text)
FROM generate_series(1, 1500000) n;
mytest=> create table test_sample(
mytest(> id int4,
mytest(> message text,
mytest(> create_time TIMESTAMP(6) WITHOUT TIME ZONE DEFAULT clock_timestamp()
mytest(> );
CREATE TABLE
mytest=> insert into test_sample (id ,message)
mytest-> select n , md5(random()::text) from generate_series(1,1500000) n;
INSERT 0 1500000
mytest=> select * from test_sample limit 1;
id | message | create_time
----+----------------------------------+----------------------------
1 | a8f7f7c931c82e8dd4e85542c74209db | 2022-11-11 18:53:55.281162
(1 row)
--设置抽样因子为0.01 ,也就是预计返回count(*)*0.01% = 150条记录
-- select * from test_sample tablesample system(0.01);
--执行计划如下
--explain ANALYZE select * from test_sample tablesample system(0.01);
mytest=> explain ANALYZE select * from test_sample tablesample system(0.01);
QUERY PLAN
-------------------------------------------------------------------------------------------------------------
Sample Scan on test_sample (cost=0.00..5.50 rows=150 width=45) (actual time=0.027..0.151 rows=214 loops=1)
Sampling: system ('0.01'::real)
Planning Time: 0.045 ms
Execution Time: 0.170 ms
(4 rows)
从执行计划可以看出进行了Sample Scan扫描方式,抽样方法是system,规划时间0.045 ms、执行时间0.170 ms,性能比较好,实际返回214条数据。
为什么是214条数据?
--查看表占用的数据块数量
--select relname ,relpages from pg_class where relname = 'test_sample';
mytest=> select relname ,relpages from pg_class where relname = 'test_sample';
relname | relpages
-------------+----------
test_sample | 14019
每个数据块约有1500000/14019 ≈ 107条记录,system抽样以数据块为单位抽取,本次选中了2个数据块,因此返回2×107 = 214条记录
mytest=> select count(*) from test_sample tablesample system(0.01);
count
-------
214
(1 row)
mytest=> select count(*) from test_sample tablesample system(0.01);
count
-------
107
(1 row)
mytest=> select count(*) from test_sample tablesample system(0.01);
count
-------
321
(1 row)
mytest=> select count(*) from test_sample tablesample system(0.01);
count
-------
107
(1 row)
bernoulli抽样按行抽取,需要扫描全表,因此比上述按数据块抽取的system方式效率低
--select count(*) from test_sample tablesample bernoulli(0.01);
mytest=> explain analyze select count(*) from test_sample tablesample bernoulli(0.01);
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------
Aggregate (cost=14020.88..14020.89 rows=1 width=8) (actual time=23.333..23.334 rows=1 loops=1)
-> Sample Scan on test_sample (cost=0.00..14020.50 rows=150 width=0) (actual time=0.224..23.290 rows=162 loops=1)
Sampling: bernoulli ('0.01'::real)
Planning Time: 0.160 ms
Execution Time: 23.373 ms
(5 rows)
聚合函数主要有 AVG,SUM,MIN,MAX,COUNT
--创建测试表
-- Aggregate-function test table: parent name / child name pairs.
CREATE TABLE test_jh (
    p_name text,
    c_name text
);
mytest=# COPY test_jh from '/file/insert.txt';
COPY 5
mytest=#
-- string_agg: collapse each group's child names into one comma-separated string.
SELECT p_name,
       string_agg(c_name, ',') AS c_names
FROM test_jh
GROUP BY p_name;
mytest=# select p_name ,string_agg(c_name,',') c_names
mytest-# from test_jh
mytest-# GROUP by p_name ;
p_name | c_names
--------+----------------
日本 | 东京,大阪
中国 | 台北,香港,上海
(2 rows)
-- array_agg: collect each group's child names into a single array value.
SELECT p_name,
       array_agg(c_name) AS c_names
FROM test_jh
GROUP BY p_name;
mytest=# select p_name ,array_agg(c_name) c_names
mytest-# from test_jh
mytest-# GROUP by p_name ;
p_name | c_names
--------+------------------
日本 | {东京,大阪}
中国 | {台北,香港,上海}
(2 rows)
pg内置的窗口函数有:
row_number(),
rank(),
lag()
还可以接over属性
avg() over()
row_number() over(partition by ... order by ...)
rank() over(partition by ... order by ...)
dense_rank() over(partition by ... order by ...)
lag(字段,+/-行偏移值)over()
first_value(计算字段)over(partition by 分组字段 order by 计算字段 asc/desc) -- 用来取结果集每个分组的第一行数据的字段值
last_value(计算字段)over(partition by 分组字段 order by 计算字段 asc/desc) -- 用来取结果集每个分组的最后一行数据的字段值
nth_value(计算字段,指定行数)over(partition by 分组字段) -- 用来取结果集每个分组中第N行(指定行数)数据的字段值