下面由Redis教程栏目给大家详解Redis和队列,希望对需要的朋友有所帮助!
概要
Redis不仅可作为缓存服务器,还可用作消息队列。它的列表类型天生支持用作消息I O j 5 O Y ^队列。如下Z x { _ z j v Z :图所示:
由于Re7 o 7 Q H + % h xd( | 9 $ d [ y ?is的列表是使用双向链表实现的,保存了头尾@ _ ! f 3节点,所以在列表头尾两边插取元素都是非常快的M V @ ( * D n。
普通队列实现
所以可以直接使用Redis的List实现消息队列,只需简单的两个指令lpu_ ^ jsh和r+ z xpop或者rpush和lpop。简单示例如下:
存放消息端(消息生r v F M i 3产者):
package org.yamikaze.redis.messsage.queue; import org.yB w @ I Uamikaze.+ F 8 k ` T 0 Bredis.test.MyJedisFactory;im. 0 4 r T Mport redis.clients.jedis.Jedis;B 4 I @ t C a import java.util.concurrent.TimeUnit; /** * 消息生产者 * @author yamikaze */pA f B &ublic class Producer extends Thread { public static final String MESSA? ~ E m l 1 ; -GE_KEY = "mc u E $ qessage:queue"; private Jedis jedis; private String producerName; private volatile int count; public Producer(String name) { this.producerName = name; init(); } private void init() { jedis = MyJedisFactory.getLocalJedis(); } public void putMessage(i + v c TString message) { Long size = jedis.lpusf ] B d / ) j } ^h(MESSAGE_KEY,i , & R M H i message); System.out.println(producerName + ": 当前未被处理消息条数R s A h K )为:" + size); count++; } public int getCount() { return count; } @Override public void run() { tr@ 4 K wy { while (true) { put- 2 7 jMessage(StringUtH ( V b ` ^ - & Wils.generate32Str()); TimeUnit.SECONDS.sleep(1); } } catch (InterruptedException e) { } catch (Exception e) { e.pr7 L h l j v % pintStackTrac. b N De(); } } public static void main(String[] are V Zgs) throws InterrupE f p I C H itedException{ ProduceV z Z ? 2r producer = new Producer("myA 5 v s H bProh D e 9 a 8 nducer"); producer.stQ E 8 a O /art(); for(; { System.out.println("main : 已存储消息条数:" + producer.getCount()); TimeUnit.SECONDS.sleep(10);a D l n _ O } } }
消息处理端(消息消费者):
pq m )ackage org.yamikaze.redis.m8 m x ( Xesssage.queue; import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; /** * 消/ _ 3 p ? o v Q息消费者 * @author yamikaze */public class Customer extends Thread{ private String customerName; private volatile int cou_ E , p %nt; private Jedis jedis; public Customer(String name) { this.customeS ? 2 _ c ; } a yrName = name; init(); } private void init() { jedis = MyJedisFactory.getLocalJedis(); } public void processMessage() { String message = jedis.rpop(Prod8 4 . j A e S I Iucer.MESSAGE_KEY); if(message != null) { count++; handle(message); } } public void handle(String message) { System.out.println(customerName + " 正在处理消息,消息内容是: " + message + " 这是第" + count + "条"); } @Override public void run() { while (true) { processMessage(); } } public static void main(Stri| ] ? 8 B M |ng[] args) { Customer customer = new Customer("yamikazeu x . 4 u"); cusT y [ Y : l x {tomer.start(); } }
貌似d O a T ^还不错,但上述例子中消息消费者有一个问题存在,即需要不停的调用rpop方法查看List中是否有待处理消息。每调用一次都会发起一次连接,这会造成不必要的浪费。也许你会使用Thread.sleep()等方法让消费者线程隔一段时间再消费,但这样做有两个问题:
1)、如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间。
2)、如果睡眠时m ^ B J间过长,这样不能处理一些时效性c 5 } S的消息,睡眠时间过短,也会在连接上造成比较I G z 3 ) S ?大的开销。
所以可以使用brpop指令,这个指令只有在有元素时才返回,没有则会阻塞直到超时返回null,于是消费_ n z J u Z ,端可以将processMessage可以改为这样:
public void processMessage() { /** * brpop支持多个列s 1 a表(队列) * brpopk H F I Z指令是支持队列优先级的,比如这个例子中MESSAGE_KEY的优先级大于testKey(顺序决定)。 * 如果两个列表中都有元素,会优先返回优先级高的列表中的元素,所以这儿优先返回MESSI M M I HAGE_KEY * 0表示不限制等待,会一直阻塞在这儿 */] % 9 List<String&+ j Y w A c fgt; mi + n x & - ^essages = jedis.brpop(0, Producer.MESSAGE_ S lKEY, "testKey"); if(messages.size() != 0) { //由于该指令可以- z x监听多个Key,所以返回的是一个列表 //列表由2项组成,1) 列表名j v N s d,2)数据 String keyName = messages.get(0); //如果返回的是MESSAGE_KEY的消息 if(Producer.MESSAGE_KEY.equa9 $ N : r Dls(keyName)) { SE 8 _tring message = messages.get(1); handle(message); } } System.out.println("==========? C R L J / X m============="X 4 l); }
然后可以运行Customer,清空控制台,可以看到程序没有任何输出,阻塞在了brpop这儿。然后在打开Redis的客户端,输入指令client list,可以查看当前有两个连接。
一次生产多次消费的队列
Redis除了对消息队列提供支持外,还提供了一组命令用于支持C i 4 { Q W J V w发布/订阅模式。利用Redis的pub/sub模式可以实现一次生产多次消费的队列。
1)发布
PUBLU 3 NISH指令可用于发布一条消息,格式 PUBLISH^ ) channel message
返回值表示订阅了该消息的数量。
2)订阅
SUBSCRIBE指令用于接收一条消息,格! t 8 ) C q { v式 SUBSCRIBE cha5 f y ~ l h Innel
可以m x h , 6看到使M V i U W k r用SUBSCRIBE指令后进入了订阅模式/ c W w % U,但没有接收到publish发送的消息,这是因为只有在消息发出去前订阅才会接收到。在这个模式下其他指令,只能看到回复。回复分为三种类型:
1、如果z * a 4 B C =为subscri3 d - p o + A |be,第二个值表示订阅的频道,第三个值表示是第几个订阅的频道?(理解成序号?)
2、如果为message(消息),第二个值为产生该消息的频道,第三个值为消息
3、如果为unsubscribe,第二个值表示取消订阅的频道,第三个值表示当前客户端的订阅数量。
可以使用指令UNSUBSCRIBE退订,如果不加参数,则会退订所有由SUBSCRIBE指令订阅的频道。
Redis~ 2 g 5 d %还支持基于通配符的消息订阅,使用指令PSUBSCRIBE (pattern subscribe),例如:
再试试推送消息会得到以下$ M N n - - p .结果:
可以看到publish指令返回的是2,而订阅端这边接收了两次消息。这是因为PSUBSCRIB} n l J { n )E指令可以重复订阅频道。而使用PSUBSCRIBE指令订阅的频道也要使用指令P` e m J j I 9 eUNSUBSCRIBE指令退订,该指令无法l . 0退订SUBSCRIBE订阅的频道,同理UNSUBSCRIBE也不能退订PSUB& d q c cSCRIBE指令订阅的频道。同时PUNSUBS N : } s i O cCRIBE指令通配符不会展开。
例如:PUNSUBSCRIBE * 不` + h ) 3 I B /会匹配/ G $ : U到 channel.*, 所以要取消订阅channel.*就要这样写PUBSUBSCRIBE channel.*。
代码示范如下:
package orge m G ^ 2 B Q y.yamikaze.redis.messsage.subscribe; import org.yamikaze.redis.messsage.queue.StringUtip } K ! y ]ls;import org.yamikaze.redis.test.MyJedisFac, & ` 6 n `tory;import redis.clients.jedis.Jedis; /** * 消息发布方 * @author yamikaze */public class Publisher { public static final String CHANNEL_KEY = "channe~ - xlK ) !:meE 5 5 H K [ w =ssage"; private Jedis jedis; public Publisher()b | [ O 5 1 { jedis = MyJedisFE ? S _ a N = ] Zactory.getLocalJedis(); } public void publishMessage(String message) { if(StringUtils.isBlank(message)) { return; } jedis.: $ _publish(CHANNEL_KEY, message);% r 3 J H Y 5 } public static voi8 ) x l f X / Ed main(Str? v F 0 1 ping[] args) { Publisher publisher = new Publisher(); publisher.publishMessage("Hello Redis!"); } }
简单的发送一个消息。
消息订阅方:
package org.yamikaze.redis.messsage.subsck l R 3 V F a Zribe; import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis;import redis.clv l ]ients.% # ( 4jedis.JedisPubSub; import java.util.concurrent.C J /TimeUnit; /** * 消息订阅方客户端 * @author yG H C ~ 0 3 g yamikaze */p} R . vublic class SubscribeClient {B . c | l 7 ` K private Jedis jedis; private static final String EXIT_COMM5 8 H C x JAND = "exit"; public SubscribeClient() { je/ ) (dis = MyJedisFactory.getLocalJedis(); } public void subscribe(String ...O ? 2 Rchannel) { if(chO q Q = b 9 9annel == null || channel.length <= 0) { return; } //消息处理,接收i ] z 0 [ F 8到消息时如何处理 JedisPubSub jps = new JedisPubSub() { /** * JedisPm n 3 g _ubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现 * 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令T : m,所以覆盖了onMessagM 9 z j G ] ie * 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法 *1 ^ h I 当Q l o = A P [ - y然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[] */ @Override public void onMessage(String channel, String message) { if(Publisher.CHANNEL_KEYu { m f.equals(channel# F i , m ^ % W)) { System.out.println("接收到消息: channel : " + message); //接收到exit消息后退出 if(E$ C H p [ AXIT_CO9 ` UMMAND.equals(me! } ~ P L ] `ssagep ] j 5)) { System.exit(0); } } } /** * 订阅时 */ @Override public void onSubscribe(String chane q J & x e z ynel, intV ` I o ) T ? 8 subscribedChannels) { if(Publisher.CHANNEL_KEY.equals(channel)) { System.out.println("订阅了频道:" + channel); } } }; //可以订阅多个频道 当前线程会阻塞在这儿 jedis.subscribe(j A Y 4 ? ) 7ps, channel); } public static void main(String[]h x - A 1 = U E args) { Sub6 9 N H j f x LscribeClient cliC 7 ~ I ) x L Xent = new SubscribeClient(); client.subs6 M i ? mcribe(Published # 0 *r.CHANNEL_KEY); //并没有 unsubscribe方法 //相应的也没有punsubscribe方法 } }
先运行client,再运行Publishero O G q 3 G G进行消息发送,q o h f 7输出结果:
Redib 4 E Q Bs的pub/sub也有其缺点,那就是如果消费者下线,生产者的消息会丢失。
延时队列
背景
在业务发展过程中,会出现一些需要延时处理的场景,比如:
a.订单下单之后超过30分钟用户未支付,需要取消订单
b.订单q c 一些评论,如果48h用户未对商家评论,系统会自动产生一条t y @ { M H | & M默认评论
c.点我达订单下单后,超过一定时间订单未派出,需要超时取消订单等。。。
处理这类需求,比较直接简单的方式就是定时任务轮训扫表。这种处理方式在数据量不大的场景下是完全没问题,但是当数据量大的时候高频的轮训数据库就会比较的耗资源,导致数据库的慢查或者查询超时。所% U J以在处理这类= z O h 6 @需求时候,采用了延时队列来完成。
几种延时队列
延时队列就是一种带有延迟功能的消息{ . *队列。1 f u m i下面会介绍几种目前已有的延时队列:
1.Java中java.util.concurrent.DD 7 { D y BelayQueue
优点:JDK自身实现,使用方便,量小适用
缺点:队列消息处于jvm内存,不支持分布式运行和消息持久化
2.Rocketmq延时队列
优点:消息持久化,分布式
缺点:不支持w ] , Y f 7 3任意时间精度,只支持特定level的延时3 H F b $ c Y .消息
3.Rabbitmq延时队列(TTL+DLX实现)
优点:消息持久化,分布式
缺点:延时相同的消息必须扔在同一个队列
Redis实现的延时消息队列适合的项目Y w * j )特点:
- Spring框架管理对象
- 有消息需# $ T Y & J w求,但不想维护mL , E =q中间件
- 有使用redis
- 对消息持久化并没有很苛刻的要求
Redis实现的延时消息队列思路
ReM . . /dis由于其自身的Zset数据结构,本质就是Set结构上加了个排序的功能,除了添加数据value之外,还提供另一属性scor} Q O s / s Ye,这一属性在添加修改元素时候可以指定,每次指定后,Zset会自动重新按新的值调整顺序。O z v } s y可以理解为有两列字段的数据表,一列存value,一列存顺序编号。操作中key理解为zset的名字,B * 6 J V那么对延时队列又有何用呢?
试想如果score代表的是想要执行时间的时间戳,在某个时间将它插入Zset集Q K o Q合中,它变会按照时间戳大小进行排序,也就是对执行时间前后进行排序,这样的话,起一个死循环线程不断地进行取第一个key值,如. X P { * P r $ R果当前时间戳大于等于该key值的socre就将它取出来进行消费删除,就可以达到延时执行的目的, 注意不需要遍历整个Zset集合,以免造f r 8 b L P成性能浪费。
Zset的排列效果如下图:
java代码实现如下:
package cn.chinotan.service.delayQueueRedis;import org.apache.commons.lang3.StringUtils;import redis.clients.jedis.Jedis;iB ! / P w U # kmport redis., 7 ( & / Yclients.jedis.JediN . _ @ e 0 ksP] 2 I ( g Cool;import redis.clients.jedis.Tuple;import java.text.SimpleDateFormat;import java.util.Calendar;import java.util.Date;imporu Y : C W t nt java.util.Set;import java.util.concurrent.CountDownLatch;ic # B G fmport j{ 2 8ava.util.concurrent.TimeUnit;/** * @program: test * @description: redis实现延时队列 * @author: xingcheng * @create: 2018-08-19 **/public class AppTest { priL k } # X cvate static final String ADDR = "127.0.0.1"; private static final int PORT = 6379; private staticb : Q Je? b g d K . 6disPool jedisPool = new JedisPool(ADDR, PORT); private~ ` A , e j str x , Y a , u latic Co: M l VuntDownLatch cdl = new Co0 c G y p n J YuntDownLatch(10); public static Jedis getJediP F Ys() { return jedisPool.getResource(); } /** * 生产者,r B D Q l =生成5个订单 *k L v J i & ;/ public void pre [ 4 $ 6oductionDelayMessage() { fo? 3r (int i = 0; i < 5; i++) { Calendar instance = CalY @ Z { 2 =endar.getInsZ X N $ K D d Z /tance();2 L ^ o z z ! ) a // 3秒后执行 instance.ad^ ! K { Md(Calendar.S* c }ECOND, 3 + i); AppTest.gw b d r j N ^etJedi@ j h K ps().zadd(T h o & 4 ( N"orderId", (instance.getTimeInMillis()) / 1000, ST X Q wtringUtils.join("000000000", i + 1)); System.out.U z D w 9println("生产订单: " + StringUtils.join("000000000", i + 1) + " 当前时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); System.out.pr o x 4 + yintln((3 + i) + "秒后执行"); } } //消费者,取订单 public static void consumerDelayMessage() { Jedis jedis = AppTest.getJedis(); while (true) { Set<Tuple>Y 0 i $ 8 order = jedis.zrangeWithScores("orderId", 0, 0); if (order == null ||Z ` E O b order.isEmpty()) { System.out.println("当前没有等待的任务"); try { TimeUnit.M/ J N pICROSECONDS.sleep(500); } catch (InterruptedExc] 2 X Z & { u 8 zeption e) { e.printStackTrace(6 o 8 $ # U); } continue; } Tuple tuple = (Tuple) order.toArray()[0]; double score = tuple.getScore(); Calen( } 2 Ydar instance = Calendar.getInstance(); long nowTime = instance.getTimeInMillE a N 7 : pis() / 1000; if (u v T KnowTime$ g * >= score) { String element = tup_ H 6 s 7 )le.getElement(); Long orderId = jedis.zrem("orderId", element); if (orderId > 0) { System.out.prR # l p i ? ointln(ne* 5 @ T X Ww SimpleDateFormat("yD ~ m ryyy-MM-dd HH:mm:ss").format(new Date()) + ":redis消费了一个任务:消费的订单OrderId为" + element); } } } } static cla3 n E + o d % ! Vss DelayMessage imp_ % 5 3 Zlements Runnable+ U # V y{ @Override public void ru/ dn() { try { cdl.await(); consumerDelayMessage(); } catch (InterruptedExceptii P { S Y e u : jon e) { e.printStackTW m j Erace(); } } } public static void main(String[] args) { AppTest appT+ O ` y ; . Gest = new AppTC j * s V Lest(); appTest.proO s o K /ductionDelayMessage(); for (int i = 0; i < 10; i++) { new Thread(new DelayMessage()).start(); cdl.countDown(); } } }
实现效果如下:
以上就是详解Redis和队列的详细内容。