import time
import threading

from isc_video_record.db.redis import redis_connect


def test_setnx():
    """Attempt a SET-if-absent on key ``'b'`` and print the outcome.

    Calls ``set('b', '1', nx=True, ex=60)`` on the object yielded by
    ``redis_connect()``, waits two seconds, then prints the returned value.

    NOTE(review): ``redis_connect`` is a project helper; presumably it yields
    a redis-py style client, in which case ``nx=True`` means "only set when
    the key does not exist" and ``ex=60`` is a 60-second expiry -- confirm.
    """
    with redis_connect() as pipe:
        res = pipe.set('b', '1', nx=True, ex=60)
        # NOTE(review): the original indentation was lost; the sleep and the
        # print are assumed to belong inside the ``with`` block -- confirm.
        time.sleep(2)
        print('设置结果', res)


# Start 100 threads that all run test_setnx concurrently, then wait for
# every one of them to finish.
ths = []
for i in range(100):
    t = threading.Thread(target=test_setnx, name='thread_{}'.format(i))
    t.start()
    ths.append(t)

for t in ths:
    t.join()