Redis是一种开源的内存数据结构存储,用作数据库、缓存和消息代理。 它支持多种数据结构,如字符串、哈希、列表、集合、有序集合等。'
Redis 的字符串(String)数据类型是最基本和最常用的数据类型之一。它可以存储各种类型的数据,如文本、数字、二进制数据等。以下是字符串类型的一些常见应用场景及其示例代码。
import redis# 连接到 Redis 服务器r = redis.Redis(host='localhost', port=6379, db=0)user_id = "user:1000"user_info = { "name": "Alice", "email": "alice@example.com", "age": "30"}# 设置用户信息缓存,设置过期时间为 3600 秒(1 小时)r.setex(user_id, 3600, str(user_info))# 获取用户信息cached_user_info = r.get(user_id)if cached_user_info: print(f"缓存的用户信息: {cached_user_info.decode()}")else: print("缓存已过期或不存在")
article_id = "article:123"# 初始化阅读次数r.set(article_id, 0)# 每次读取文章时,增加阅读次数r.incr(article_id)# 获取当前阅读次数read_count = r.get(article_id)print(f"文章 {article_id} 的阅读次数为: {read_count.decode()}")
config_key = "site:config"config = { "site_name": "My Awesome Site", "maintenance_mode": "off"}# 存储配置信息r.set(config_key, str(config))# 获取配置信息site_config = r.get(config_key)if site_config: print(f"网站配置信息: {site_config.decode()}")else: print("配置信息不存在")
import timelock_key = "resource:lock"lock_value = "unique_lock_id"# 获取锁,设置锁的过期时间为 10 秒if r.set(lock_key, lock_value, nx=True, ex=10): print("成功获取锁") try: # 执行需要加锁的操作 print("正在执行操作...") time.sleep(5) # 模拟操作 finally: # 释放锁 if r.get(lock_key) == lock_value: r.delete(lock_key) print("成功释放锁")else: print("获取锁失败,资源正在被使用")
Redis 的哈希(Hash)数据类型是一个键值对集合,适合存储对象及其属性。以下是哈希类型的一些常见应用场景及其示例代码。
import redis# 连接到 Redis 服务器r = redis.Redis(host='localhost', port=6379, db=0)user_id = "user:1000"user_info = { "name": "Alice", "email": "alice@example.com", "age": "30"}# 存储用户信息r.hmset(user_id, user_info)# 获取用户信息retrieved_user_info = r.hgetall(user_id)print("用户信息:")for key, value in retrieved_user_info.items(): print(f"{key.decode()}: {value.decode()}")
product_id = "product:123"product_info = { "name": "Laptop", "price": "999.99", "stock": "50"}# 存储商品属性r.hmset(product_id, product_info)# 获取商品属性retrieved_product_info = r.hgetall(product_id)print("商品属性:")for key, value in retrieved_product_info.items(): print(f"{key.decode()}: {value.decode()}")
session_id = "session:456"session_data = { "user_id": "1000", "login_time": "2023-05-29 10:00:00", "status": "active"}# 存储会话数据r.hmset(session_id, session_data)# 获取会话数据retrieved_session_data = r.hgetall(session_id)print("会话数据:")for key, value in retrieved_session_data.items(): print(f"{key.decode()}: {value.decode()}")
config_key = "site:config"site_config = { "site_name": "My Awesome Site", "maintenance_mode": "off", "max_users": "10000"}# 存储网站配置信息r.hmset(config_key, site_config)# 获取网站配置信息retrieved_site_config = r.hgetall(config_key)print("网站配置信息:")for key, value in retrieved_site_config.items(): print(f"{key.decode()}: {value.decode()}")
以下是使用 hset 方法的示例:
# 存储用户信息for key, value in user_info.items(): r.hset(user_id, key, value)# 存储商品属性for key, value in product_info.items(): r.hset(product_id, key, value)# 存储会话数据for key, value in session_data.items(): r.hset(session_id, key, value)# 存储网站配置信息for key, value in site_config.items(): r.hset(config_key, key, value)
Redis 列表(List)是一个按插入顺序排序的字符串列表,支持从列表的两端推入和弹出元素,并且支持阻塞操作。以下是 Redis 列表的一些常见应用场景及其示例代码。
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)queue_key = 'task_queue'# 推送任务到队列r.rpush(queue_key, 'task1')r.rpush(queue_key, 'task2')r.rpush(queue_key, 'task3')# 从队列中取出任务并处理while True: task = r.lpop(queue_key) if task is None: break print(f'Processing {task.decode()}')
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)chat_key = 'chat:messages'# 存储消息r.rpush(chat_key, 'Hello!')r.rpush(chat_key, 'How are you?')r.rpush(chat_key, 'Goodbye!')# 获取最新的消息(从列表的右侧)latest_messages = r.lrange(chat_key, -3, -1)print('Latest messages:')for message in latest_messages: print(message.decode())
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)todo_list_key = 'todo:list'# 添加待办事项r.rpush(todo_list_key, 'Buy milk')r.rpush(todo_list_key, 'Walk the dog')r.rpush(todo_list_key, 'Read a book')# 获取所有待办事项todo_items = r.lrange(todo_list_key, 0, -1)print('To-Do List:')for item in todo_items: print(f'- {item.decode()}')# 完成第一个待办事项completed_item = r.lpop(todo_list_key)print(f'Completed: {completed_item.decode()}')
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)leaderboard_key = 'game:leaderboard'# 添加玩家分数r.rpush(leaderboard_key, 'Alice:1000')r.rpush(leaderboard_key, 'Bob:1500')r.rpush(leaderboard_key, 'Charlie:1200')# 获取排行榜leaderboard = r.lrange(leaderboard_key, 0, -1)print('Leaderboard:')for rank, player in enumerate(leaderboard, start=1): print(f'{rank}. {player.decode()}')
Redis 集合(Set)是一种无序的字符串集合,集合中的元素是唯一的,不允许重复。集合类型支持多种集合操作,如交集、并集、差集等。以下是 Redis 集合的一些常见应用场景及其示例代码。
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)user_id = 'user:1000'tags_key = f'{user_id}:tags'# 添加标签r.sadd(tags_key, 'music', 'movies', 'sports')# 获取所有标签tags = r.smembers(tags_key)print('User tags:')for tag in tags: print(tag.decode())# 检查是否有某个标签has_music_tag = r.sismember(tags_key, 'music')print(f'User has music tag: {has_music_tag}')
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)unique_ips_key = 'unique:ips'# 添加访问 IPr.sadd(unique_ips_key, '192.168.1.1', '192.168.1.2', '192.168.1.1') # 192.168.1.1 重复# 获取所有唯一 IPunique_ips = r.smembers(unique_ips_key)print('Unique IPs:')for ip in unique_ips: print(ip.decode())
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)user1_friends_key = 'user:1000:friends'user2_friends_key = 'user:2000:friends'# 添加用户好友r.sadd(user1_friends_key, 'friend1', 'friend2', 'friend3')r.sadd(user2_friends_key, 'friend2', 'friend3', 'friend4')# 获取共同好友common_friends = r.sinter(user1_friends_key, user2_friends_key)print('Common friends:')for friend in common_friends: print(friend.decode())
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)role_key = 'role:admin:permissions'# 添加权限r.sadd(role_key, 'read', 'write', 'delete')# 获取所有权限permissions = r.smembers(role_key)print('Admin permissions:')for permission in permissions: print(permission.decode())# 检查是否有某个权限has_delete_permission = r.sismember(role_key, 'delete')print(f'Admin has delete permission: {has_delete_permission}')
Redis 有序集合(Sorted Set)是一种带有分数的集合,集合中的每个成员都有一个分数,Redis 会按照分数的大小进行排序。以下是 Redis 有序集合的一些常见应用场景及其示例代码。
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)leaderboard_key = 'game:leaderboard'# 添加玩家分数r.zadd(leaderboard_key, {'Alice': 1000, 'Bob': 1500, 'Charlie': 1200})# 获取排行榜前 N 名top_players = r.zrevrange(leaderboard_key, 0, 2, withscores=True)print('Top players:')for player, score in top_players: print(f'{player.decode()}: {score}')
import redisimport time# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)delay_queue_key = 'delay:queue'# 添加任务到延时队列,任务的执行时间为当前时间加上延迟时间(秒)delay = 10 # 延迟10秒task_id = 'task1'execution_time = time.time() + delayr.zadd(delay_queue_key, {task_id: execution_time})# 获取并执行到期的任务while True: now = time.time() tasks = r.zrangebyscore(delay_queue_key, 0, now) if tasks: for task in tasks: print(f'Executing {task.decode()}') r.zrem(delay_queue_key, task) time.sleep(1)
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)article_clicks_key = 'articles:clicks'# 添加文章点击率r.zadd(article_clicks_key, {'article1': 100, 'article2': 150, 'article3': 120})# 获取点击率最高的前 N 篇文章top_articles = r.zrevrange(article_clicks_key, 0, 2, withscores=True)print('Top articles:')for article, clicks in top_articles: print(f'{article.decode()}: {clicks}')
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)priority_queue_key = 'priority:queue'# 添加任务到优先级队列,分数代表优先级,分数越高优先级越高r.zadd(priority_queue_key, {'task1': 1, 'task2': 3, 'task3': 2})# 获取并处理优先级最高的任务while True: task = r.zrevrange(priority_queue_key, 0, 0) if task: task_id = task[0] print(f'Processing {task_id.decode()}') r.zrem(priority_queue_key, task_id) else: break
位图(Bitmap)在 Redis 中是一种高效的按位存储数据的方式,主要用于处理需要进行按位操作的场景。以下是位图的一些常见应用场景及其示例代码。
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)user_id = 'user:1000'sign_key = f'{user_id}:sign'# 用户在某天签到,例如第5天day = 5r.setbit(sign_key, day, 1)# 检查用户在某天是否签到is_signed = r.getbit(sign_key, day)print(f'用户在第{day}天是否签到:{"是" if is_signed else "否"}')# 统计用户总签到天数total_sign_days = r.bitcount(sign_key)print(f'用户总共签到天数:{total_sign_days}')
import redisimport time# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)online_key = 'user:online_status'user_id = 1000# 假设每个时间片为10分钟time_slot = int(time.time() // 600)# 设置用户在线状态r.setbit(f'{online_key}:{user_id}', time_slot, 1)# 检查用户在某个时间片是否在线is_online = r.getbit(f'{online_key}:{user_id}', time_slot)print(f'用户在时间片{time_slot}是否在线:{"是" if is_online else "否"}')# 统计用户总在线时间片数total_online_slots = r.bitcount(f'{online_key}:{user_id}')print(f'用户总共在线时间片数:{total_online_slots}')
import redisimport time# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)activity_key = 'user:activity'user_id = 1000# 假设每个时间片为1小时time_slot = int(time.time() // 3600)# 设置用户活动状态r.setbit(f'{activity_key}:{user_id}', time_slot, 1)# 检查用户在某个时间片是否有活动is_active = r.getbit(f'{activity_key}:{user_id}', time_slot)print(f'用户在时间片{time_slot}是否有活动:{"是" if is_active else "否"}')# 统计用户总活动时间片数total_active_slots = r.bitcount(f'{activity_key}:{user_id}')print(f'用户总共活动时间片数:{total_active_slots}')
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)ab_test_key = 'ab_test:group'user_id = 1000# 将用户分配到组A(0)或组B(1)import randomgroup = random.choice([0, 1])r.setbit(ab_test_key, user_id, group)# 检查用户所属的组user_group = r.getbit(ab_test_key, user_id)print(f'用户 {user_id} 属于组 {"A" if user_group == 0 else "B"}')
HyperLogLog 是 Redis 中的一种用于基数(唯一值)统计的数据结构,能够在极小的内存占用下提供高精度的基数估计。它特别适用于需要快速计算大量数据集唯一元素数量的场景。
以下示例展示了如何使用 Redis 的 HyperLogLog 进行基数统计。
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)uv_key = 'website:uv'# 模拟添加每日访客daily_visitors = ['user1', 'user2', 'user3', 'user4', 'user5']for visitor in daily_visitors: r.pfadd(uv_key, visitor)# 获取独立访客数量uv_count = r.pfcount(uv_key)print(f'独立访客数量:{uv_count}')
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)unique_events_key = 'events:unique'# 模拟添加实时事件events = ['event1', 'event2', 'event3', 'event4', 'event1']for event in events: r.pfadd(unique_events_key, event)# 获取唯一事件数量unique_events_count = r.pfcount(unique_events_key)print(f'唯一事件数量:{unique_events_count}')
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)user_id = 'user:1000'friends_key = f'{user_id}:friends'# 模拟添加好友friends = ['friend1', 'friend2', 'friend3', 'friend4', 'friend5', 'friend1']for friend in friends: r.pfadd(friends_key, friend)# 获取唯一好友数量unique_friends_count = r.pfcount(friends_key)print(f'用户 {user_id} 的唯一好友数量:{unique_friends_count}')
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)ad_key = 'ad:display:unique'# 模拟添加广告展示displays = ['user1', 'user2', 'user3', 'user4', 'user2']for display in displays: r.pfadd(ad_key, display)# 获取广告的独立展示数量unique_displays_count = r.pfcount(ad_key)print(f'广告的独立展示数量:{unique_displays_count}')
Redis 的地理空间(Geospatial)数据类型允许存储地理位置信息,并且可以执行各种地理空间操作,如半径查询和位置距离计算。以下是地理空间的一些常见应用场景及其示例代码。
以下示例展示了如何使用 Redis 的地理空间数据类型进行地理位置存储、附近地点搜索和距离计算。
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)geo_key = 'locations'# 添加地理位置信息r.geoadd(geo_key, 13.361389, 38.115556, 'Palermo')r.geoadd(geo_key, 15.087269, 37.502669, 'Catania')# 获取地理位置信息locations = r.geopos(geo_key, 'Palermo', 'Catania')print('Locations:')for loc in locations: print(loc)
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)geo_key = 'locations'# 添加地理位置信息r.geoadd(geo_key, 13.361389, 38.115556, 'Palermo')r.geoadd(geo_key, 15.087269, 37.502669, 'Catania')r.geoadd(geo_key, 13.583333, 37.316667, 'Agrigento')# 查找某个位置附近的地点nearby_locations = r.georadius(geo_key, 15.0, 37.5, 100, unit='km')print('Nearby locations within 100 km:')for loc in nearby_locations: print(loc.decode())
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)geo_key = 'locations'# 添加地理位置信息r.geoadd(geo_key, 13.361389, 38.115556, 'Palermo')r.geoadd(geo_key, 15.087269, 37.502669, 'Catania')# 计算两个位置之间的距离distance = r.geodist(geo_key, 'Palermo', 'Catania', unit='km')print(f'Distance between Palermo and Catania: {distance} km')
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)geo_key = 'locations'# 添加地理位置信息r.geoadd(geo_key, 13.361389, 38.115556, 'Palermo')r.geoadd(geo_key, 15.087269, 37.502669, 'Catania')# 检查某个位置是否在指定半径内radius = 100 # 单位:千米center_location = (15.0, 37.5)locations_within_radius = r.georadius(geo_key, center_location[0], center_location[1], radius, unit='km')# 检查某个位置是否在结果列表中target_location = 'Catania'is_within_radius = target_location.encode() in locations_within_radiusprint(f'{target_location} 是否在 {radius} km 半径内: {"是" if is_within_radius else "否"}')
Redis 的流(Streams)数据类型是一种高效的日志和消息存储数据结构,适用于实时数据处理。以下是流的一些常见应用场景及其示例代码。
以下示例展示了如何使用 Redis 的流数据类型进行数据添加、读取和消费。
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)stream_key = 'mystream'# 添加日志消息到流r.xadd(stream_key, {'event': 'login', 'user': 'alice'})r.xadd(stream_key, {'event': 'purchase', 'user': 'bob', 'amount': '100'})r.xadd(stream_key, {'event': 'logout', 'user': 'alice'})print('日志消息已添加到流中。')
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)stream_key = 'mystream'# 读取流中的消息entries = r.xrange(stream_key, count=10)print('读取流中的消息:')for entry in entries: entry_id, fields = entry print(f'ID: {entry_id}') for field, value in fields.items(): print(f' {field.decode()}: {value.decode()}')
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)stream_key = 'mystream'group_name = 'mygroup'consumer_name = 'consumer1'# 创建消费者组try: r.xgroup_create(stream_key, group_name, id='0', mkstream=True)except redis.exceptions.ResponseError as e: if 'BUSYGROUP Consumer Group name already exists' not in str(e): raise# 消费流中的消息entries = r.xreadgroup(group_name, consumer_name, {stream_key: '>'}, count=10)print('消费流中的消息:')for stream, messages in entries: for message in messages: message_id, fields = message print(f'ID: {message_id}') for field, value in fields.items(): print(f' {field.decode()}: {value.decode()}')# 确认消息处理完毕for stream, messages in entries: for message in messages: message_id, _ = message r.xack(stream_key, group_name, message_id)
import redis# 连接到本地的 Redis 服务r = redis.Redis(host='localhost', port=6379, db=0)stream_key = 'mystream'group_name = 'mygroup'consumer_name = 'consumer1'# 获取未确认的消息pending_entries = r.xpending(stream_key, group_name)print('未确认的消息:')for entry in pending_entries['consumers']: print(f'消费者: {entry["name"]}, 未确认的消息数: {entry["pending"]}')# 读取并处理未确认的消息entries = r.xreadgroup(group_name, consumer_name, {stream_key: '0'}, count=10)print('处理未确认的消息:')for stream, messages in entries: for message in messages: message_id, fields = message print(f'ID: {message_id}') for field, value in fields.items(): print(f' {field.decode()}: {value.decode()}') # 确认消息处理完毕 r.xack(stream_key, group_name, message_id)