Redis+threading实现多线程消息队列的使用示例
列表
lpush左插入、rpush右插入、lrange查询集合
127.0.0.1:6379> lpush list v1 (integer) 1 127.0.0.1:6379> lpush list v2 (integer) 2 127.0.0.1:6379> lpush list v3 (integer) 3 127.0.0.1:6379> LRANGE list 0 -1 1) "v3" 2) "v2" 3) "v1" 127.0.0.1:6379> LRANGE list 0 1 1) "v3" 2) "v2" 127.0.0.1:6379> LRANGE list 0 0 1) "v3" 127.0.0.1:6379> rpush list rv0 (integer) 4 127.0.0.1:6379> lrange list 0 -1 1) "v3" 2) "v2" 3) "v1" 4) "rv0"
lpop左移除、rpop右移除
127.0.0.1:6379> lrange list 0 -1 1) "v3" 2) "v2" 3) "v1" 4) "rv0" 127.0.0.1:6379> lpop list "v3" 127.0.0.1:6379> lrange list 0 -1 1) "v2" 2) "v1" 3) "rv0" 127.0.0.1:6379> rpop list "rv0" 127.0.0.1:6379> lrange list 0 -1 1) "v2" 2) "v1"
lindex下标查询、llen长度查询
127.0.0.1:6379> lrange list 0 -1 1) "v4" 2) "v3" 3) "v2" 4) "v1" 127.0.0.1:6379> lindex list 1 "v3" 127.0.0.1:6379> lindex list 0 "v4" 127.0.0.1:6379> llen list (integer) 4
blpop、brpop
BRPOP 是 Redis 的一个阻塞式列表弹出命令,用于从指定的一个或多个列表中弹出最后一个元素。它和BLPOP 不同之处在于它是从列表的尾部弹出元素,而不是从头部。
这种阻塞式弹出操作通常用于实现消息队列。如果列表为空,就会阻塞等待直到有消息可供处理。timeout: 阻塞超时时间,如果所有指定的列表都为空,命令将阻塞直到有元素可弹出或超时。
# 连接到本地 Redis 服务器 r = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True) a = ['item1','item2','item3','item4','item5','item6'] # 将元素推入列表 r.rpush('my_queue',*a) # 使用 blpop 弹出元素 result = r.blpop('my_queue', timeout=5) # 设置超时时间为 5 秒
['item1', 'item2', 'item3', 'item4', 'item5', 'item6']
['item2', 'item3', 'item4', 'item5', 'item6']
字符串
set、incr递增、decr递减
- 虽然输入是int类型,但是set会自动转换为string
r.set('my_queue', 5) # 对 key 为 'my_queue' 的值执行递减操作 value = r.decr('my_queue') # 获取递减后的值 print(f'New value: {value}') new_value = r.incr('my_queue') print(f'New value: {new_value}')
New value: 4
New value: 5
可以看到就算是赋值也已经改变了my_queue键的值。
keys取键、get取值、delete
# 将元素推入列表 r.setnx('my_queue:ddd:count',123) r.setnx('my_queue:aaa:count',456) r.setnx('my_queue',789) r.setnx('my_queue',101) r.set('my_queue:kkk:count',159) print(r.keys('*')) keys = r.keys('my_queue:*:count') # 打印匹配的键列表 print(keys) value = r.get(keys[0]) print(value) r.delete('my_queue:ddd:count') print(r.keys('*'))
['my_queue:kkk:count', 'my_queue', 'my_queue:aaa:count', 'my_queue:ddd:count']
['my_queue:kkk:count', 'my_queue:aaa:count', 'my_queue:ddd:count']
159
['my_queue:kkk:count', 'my_queue', 'my_queue:aaa:count']
关于为什么delete了还能取到值(
delete只是删除了redis队列的键值对,keys是已经赋过值了所以不受影响。
r.setnx(f'rtp_task:{1}:{123}:count',123) r.setnx(f'rtp_task:{2}:{456}:count',456) r.setnx(f'rtp_task:{3}:{789}:count',789) r.setnx(f'rtp_task:{4}:{101}:count',101) r.setnx(f'rtp_task:{5}:{159}:count',159) keys = r.keys('rtp_task:*:count') r.delete('rtp_task:1:123:count') print(r.keys("*")) print(keys)
['rtp_task:4:101:count', 'rtp_task:2:456:count', 'rtp_task:3:789:count', 'rtp_task:5:159:count']
['rtp_task:4:101:count', 'rtp_task:2:456:count', 'rtp_task:3:789:count', 'rtp_task:1:123:count', 'rtp_task:5:159:count']
setnx
含义(setnx = SET if Not eXists):如果不存在,则set。
r.setnx('my_queue:ddd:count',123) print(r.get('my_queue:ddd:count')) 123
r.setnx('my_queue:ddd:count',123) r.setnx('my_queue:ddd:count',456) print(r.get('my_queue:ddd:count')) 123
threading
Thread
创建线程
在创建线程时,传递参数需要是一个可迭代的对象,如果只有一个参数,需要在参数后面添加逗号,以表示它是一个元组而不是一个单一的值。
如果写成 args=(a,),它会被解释为一个包含单一元素的元组,而如果你写成 args=(a),它会被解释为 args=a,这样就不再是一个元组。
a = "this is message" def iptest(message): print(message) t = threading.Thread(target=iptest, args=(a,))
start、join
a = "this is message" def iptest(message): print(message) t = threading.Thread(target=iptest, args=(a,)) t.start()
this is message
join方法的作用是确保thread子线程执行完毕后才能执行下一个线程。
没加join前:
def iptest(): print("message1\n") for i in range(10): # time.sleep() 函数推迟调用线程的运行,可通过参数secs指秒数,表示进程挂起的时间。 time.sleep(0.1) print('message2') def main(): add_thread = threading.Thread(target=iptest, name="T2") add_thread.start() print("done") if __name__ == '__main__': main()
message1
donemessage2
加join后
message1
message2
done
消息队列
import redis import threading import time import json # 连接到本地 Redis 服务器 r = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True) def producer(queue_name): # 生产者线程,模拟向队列中推送任务 for i in range(5): message = {'task_id': i, 'data': f'Task {i}'} r.rpush(queue_name, json.dumps(message)) time.sleep(1) def consumer(queue_name, worker_id): while True: # 消费者线程,使用 blpop 从队列中阻塞获取任务 message = r.blpop(queue_name, timeout=10) if message: task = json.loads(message[1]) print(f"Worker {worker_id} processing task: {task} \n") # 模拟任务处理时间 time.sleep(2) # 模拟任务处理完成后,更新任务计数 r.decr(f'rtp_task:{task["task_id"]}:count') num_task = r.get(f'rtp_task:{task["task_id"]}:count') print(f'taskid{task["task_id"]},num_task{num_task}') if __name__ == '__main__': # 设置初始任务计数 for i in range(5): r.setnx(f'rtp_task:{i}:count', 3) # 创建一个生产者线程 producer_thread = threading.Thread(target=producer, args=('product',)) producer_thread.start() # 创建多个消费者线程 num_consumers = 3 consumer_threads = [] for i in range(num_consumers): consumer_thread = threading.Thread(target=consumer, args=('product', i + 1)) consumer_threads.append(consumer_thread) consumer_thread.start() # 等待生产者线程和消费者线程完成 producer_thread.join() for consumer_thread in consumer_threads: consumer_thread.join()
Worker 2 processing task: {'task_id': 0, 'data': 'Task 0'}
Worker 1 processing task: {'task_id': 1, 'data': 'Task 1'}
Worker 3 processing task: {'task_id': 2, 'data': 'Task 2'}
taskid0,num_task2
taskid1,num_task2
Worker 2 processing task: {'task_id': 3, 'data': 'Task 3'}taskid2,num_task2
Worker 1 processing task: {'task_id': 4, 'data': 'Task 4'}taskid3,num_task2
taskid4,num_task2
到此这篇关于Redis+threading实现多线程消息队列的使用示例的文章就介绍到这了,更多相关Redis threading多线程消息队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
详解redis大幅性能提升之使用管道(PipeLine)和批量(Batch)操作
这篇文章主要介绍了详解redis大幅性能提升之使用管道(PipeLine)和批量(Batch)操作 ,具有一定的参考价值,感兴趣的小伙伴们可以参考一下。2016-12-12
最新评论