Netty 作为 websocket 客户端连接多个地址,如何处理不同 url

开源之夏第三届火热来袭,高校学生参与赢万元奖金!>>>

Netty 作为 websocket 客户端连接多个地址,如何处理不同 url

// Goal: open several WebSocket connections from one client.
// A single Bootstrap is configured once and then reused for every URL in `urls`.
Bootstrap bootstrap = new Bootstrap()
.group(workGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.handler(initializer);
for (URI url : urls) {
// Connect to this URL's host/port and block until the connect attempt has finished.
ChannelFuture channelFuture = bootstrap.connect(url.getHost(), getPort(url)).sync();
// Mark the channel: hand the URL string to Monitor, looked up by the channel's long-text id.
String channelId = channelFuture.channel().id().asLongText();
Monitor.getStatus(channelId).setUrl(url.toString());
// NOTE(review): this waits for the channel to close before the loop continues, so the next
// URL is only connected after this connection has ended — confirm that is intended.
channelFuture.channel().closeFuture().sync();
}
// Problem: while building the WebSocket handshake handler, the URL used in the connect loop
// is not available; remoteAddress is reported to be null here, which ends in a
// NullPointerException.
// NOTE(review): presumably this code runs while the pipeline is being initialized, i.e. before
// the connect has completed — confirm against the initializer.
InetSocketAddress remoteAddress = socketChannel.remoteAddress();
URI uri = URIUtil.of(remoteAddress);
WebSocketClientProtocolHandler webSocketClientProtocolHandler = new WebSocketClientProtocolHandler(
WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()
        )
);

哪位大佬有好的解决方案呀

回答

package com.example.nettydemo;

import cn.hutool.json.JSONObject;
import io.netty.channel.*;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;

import java.net.URI;
import java.net.URISyntaxException;


/**
 * @author ASUS
 */ //客户端业务处理类
@ChannelHandler.Sharable
@Slf4j
public class ClientHandler extends SimpleChannelInboundHandler<Object> {


   private WebSocketClientHandshaker handshaker;


   /**
    * 当客户端主动链接服务端的链接后,调用此方法
    *
    * @param channelHandlerContext ChannelHandlerContext
    */
   @Override
   public void channelActive(ChannelHandlerContext channelHandlerContext) throws URISyntaxException {


      System.out.println("客户端Active .....");
      final Channel channel = channelHandlerContext.channel();
      StringBuffer sb = new StringBuffer();
      sb.append("ws://xxxx:port/ws?");
      sb.append("tenantId=100&appId=huike&");
      sb.append("username=user" + channel.id() + "&clientType=WXMP-TRTC&");
      sb.append("token=eyJhbGciOiJIUzUxMiJ9..-Un9sGFhKdhBmq8NYVDmcu7XJJJXYRVdmQAynu-DYGjYFc_ogHr5qfbUa0XtTt-yJqqLB3r-rX2Rwbv24z1Swg");

      URI uri = new URI(sb.toString());
      HttpHeaders httpHeaders = new DefaultHttpHeaders();
      handshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, (String) null, true, httpHeaders);
      final ChannelFuture handshake;
      try {
         handshake = handshaker.handshake(channel).sync();
         handshake.addListener(e -> {
            final boolean success = e.isSuccess();
            if (success) {
               System.out.println("握手成功!");
               //发送消息
               JSONObject clientJson = new JSONObject();
               clientJson.set("type", "command");
               JSONObject dataJson = new JSONObject();
               dataJson.set("requestId", "请求唯一ID");
               clientJson.set("data", dataJson);
               //channel.writeAndFlush(new TextWebSocketFrame(clientJson.toString()));

            } else {
               System.err.println("握手失败");
            }
         });
      } catch (Exception e) {
         e.printStackTrace();
      }
      try {
         //channel.closeFuture();
      } catch (Exception e) {
         e.printStackTrace();
      }
   }

   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
      System.out.println("\n\t⌜⎓⎓⎓⎓⎓⎓exception⎓⎓⎓⎓⎓⎓⎓⎓⎓\n" +
            cause.getMessage());
      cause.printStackTrace();
      ctx.close();
   }


   @Override
   protected void channelRead0(ChannelHandlerContext ctx, Object o) throws Exception {
      System.out.println("22222");

      if (!handshaker.isHandshakeComplete()) {
         try {
            handshaker.finishHandshake(ctx.channel(), (FullHttpResponse) o);
            log.info("websocket Handshake 完成!");
         } catch (WebSocketHandshakeException e) {
            log.error("websocket连接失败!", e);
         }
      }

      // 握手协议返回,设置结束握手
      if (o instanceof TextWebSocketFrame) {
         TextWebSocketFrame textFrame = (TextWebSocketFrame) o;
         System.out.println("textFrame: " + textFrame.text());
      } else if (o instanceof CloseWebSocketFrame) {
         System.out.println("CloseWebSocketFrame");
      }

   }

   @Override
   public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
      if (evt instanceof IdleStateEvent) {
         IdleState state = ((IdleStateEvent) evt).state();
         if (state == IdleState.WRITER_IDLE) {
            //发送消息
            JSONObject clientJson = new JSONObject();
            clientJson.set("type", "command");
            JSONObject dataJson = new JSONObject();
            dataJson.set("requestId", "请求唯一ID");
            clientJson.set("data", dataJson);
            final String json = clientJson.toString();
            ctx.channel().writeAndFlush(new TextWebSocketFrame(json)).addListener(e -> {
               final boolean success = e.isSuccess();
               if (success) {
                  log.info("发送成功");
               } else {
                  log.info("发送失败");
               }
            });
            log.info("发送心跳数据:{}", json);
         }
      } else {
         super.userEventTriggered(ctx, evt);
      }

   }
}
在 channelActive 回调中发起 WebSocket 握手就可以了(此时连接已经建立,可以直接拿到对应的 channel)。