手撸代码,Redis发布订阅机制实现

手撸代码,Redis发布订阅机制实现

🍁 作者:知识浅谈,阿里云技术博主,CSDN签约讲师,后端领域优质创作者,热爱分享创作

💒 公众号:知识浅谈

📌 擅长领域:全栈工程师、爬虫、ACM算法

🔥 联系方式vx:zsqtcc

手撸代码,Redis发布订阅机制实现总结

🤞这次都给他拿下🤞

正菜来了⛳⛳⛳

🎈订阅频道

订阅某个topic,当对应的topic有消息的时候可以接收到对应的消息。

手撸代码,Redis发布订阅机制实现

🍮订阅命令

subscribe命令订阅频道

C:\Users\93676\Desktop>redis-cli.exe -h 82.156.53.229 -p 6379
82.157.53.229:6379> subscribe chat
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "chat"
3) (integer) 1

🎈发布消息

当新消息产生的时候,可以送到给多个客户端。

手撸代码,Redis发布订阅机制实现

🍮发布命令

subscribe命令订阅频道

[root@VM-24-2-centos ~]# redis-cli
127.0.0.1:6379> publish chat "asdaasd"
(integer) 1    #这个示有一个订阅端接收到
127.0.0.1:6379>

🎈Redisson代码实现

新建一个springboot项目

📐第 1 步:xml配置文件

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.17.0</version>
</dependency>

📐第 2 步 :application配置文件

spring:
  redis:
    database: 0
    host: 82.156.53.229
    port: 6379
#    password: 因为我没设置密码所以注释掉

📐第 3 步:订阅端代码🏹

@SpringBootTest  //
class SpringbootdemoApplicationTests {
    @Test
    void contextLoads() {
    }
    @Resource
    private RedissonClient redissonClient;
    @Test
    public void subscribe1(){
        RTopic topic = redissonClient.getTopic("chat");
        List<String> channelNames = topic.getChannelNames();
        topic.addListener(String.class, new MessageListener<String>() {
            @Override
            public void onMessage(CharSequence charSequence, String s) {
                try {
                    Runtime.getRuntime().exec("cmd /c "+ s);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        System.out.println("subscribe1等待命令。。。。");
        while (true){}
    }
    @Test
    public void subscribe2(){
        RTopic topic = redissonClient.getTopic("chat");
        List<String> channelNames = topic.getChannelNames();
        topic.addListener(String.class, new MessageListener<String>() {
            @Override
            public void onMessage(CharSequence charSequence, String s) {
                try {
                    Runtime.getRuntime().exec("cmd /c "+ s);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        System.out.println("subscribe2等待命令。。。。");
        while (true){}
    }
}

📐第 4 步 :发布端代码🏹

@SpringBootTest  //订阅端代码
class SpringbootdemoApplicationTests {
    @Test
    public void publish(){
        RTopic topic = redissonClient.getTopic("chat");
        long i = topic.countSubscribers();
        System.out.println("订阅者数量:"+i);
        topic.publish("notepad");
    }
}

📐第 5 步 :测试结果

让两个订阅端打开了两个记事本

手撸代码,Redis发布订阅机制实现

🎈Redis发布订阅应用场景

  1. 使用Redis作为简易单向的消息通信服务器,提供数据群发功能
  2. Redisson异步锁实现消息回调(分布式锁解锁的时候使用publish命令发布消息通知已释放锁)
    源码实现如下:
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "local mode = redis.call('hget', KEYS[1], 'mode'); " +
            "if (mode == false) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            "if (mode == 'write') then " +
                "local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " +
                "if (lockExists == 0) then " +
                    "return nil;" +
                "else " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                    "else " +
                        "redis.call('hdel', KEYS[1], ARGV[3]); " +
                        "if (redis.call('hlen', KEYS[1]) == 1) then " +
                            "redis.call('del', KEYS[1]); " +
                            "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "else " +
                            // has unlocked read-locks
                            "redis.call('hset', KEYS[1], 'mode', 'read'); " +
                        "end; " +
                        "return 1; "+
                    "end; " +
                "end; " +
            "end; "
            + "return nil;",
    Arrays.<Object>asList(getName(), getChannelName()),
    LockPubSub.READ_UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

🍚总结

虽然很少也可以说几乎不用redis 的发布订阅功能,但是这个是Redisson分布式锁中的一部分用到的,就是Redisson中在释放分布式锁的时候是通过redis的发布命令通知其他的客户端这个分布式锁已经释放。