开源之夏第三届火热来袭,高校学生参与赢万元奖金!>>>

//现在要在客户端连接多个 websocket 连接 Bootstrap bootstrap = new Bootstrap() .group(workGroup) .channel(NioSocketChannel.class) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .handler(initializer); for (URI url : urls) { ChannelFuture channelFuture = bootstrap.connect(url.getHost(), getPort(url)).sync(); //market of channel String channelId = channelFuture.channel().id().asLongText(); Monitor.getStatus(channelId).setUrl(url.toString()); channelFuture.channel().closeFuture().sync(); }
//在设置websocket握手处理器时 无法获取上面的 url remoteAddress 报空指针 InetSocketAddress remoteAddress = socketChannel.remoteAddress(); URI uri = URIUtil.of(remoteAddress); WebSocketClientProtocolHandler webSocketClientProtocolHandler = new WebSocketClientProtocolHandler( WebSocketClientHandshakerFactory.newHandshaker( uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders() ) );
哪位大佬有好的解决方案呀
回答
package com.example.nettydemo; import cn.hutool.json.JSONObject; import io.netty.channel.*; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.websocketx.*; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j; import java.net.URI; import java.net.URISyntaxException; /** * @author ASUS */ //客户端业务处理类 @ChannelHandler.Sharable @Slf4j public class ClientHandler extends SimpleChannelInboundHandler<Object> { private WebSocketClientHandshaker handshaker; /** * 当客户端主动链接服务端的链接后,调用此方法 * * @param channelHandlerContext ChannelHandlerContext */ @Override public void channelActive(ChannelHandlerContext channelHandlerContext) throws URISyntaxException { System.out.println("客户端Active ....."); final Channel channel = channelHandlerContext.channel(); StringBuffer sb = new StringBuffer(); sb.append("ws://xxxx:port/ws?"); sb.append("tenantId=100&appId=huike&"); sb.append("username=user" + channel.id() + "&clientType=WXMP-TRTC&"); sb.append("token=eyJhbGciOiJIUzUxMiJ9..-Un9sGFhKdhBmq8NYVDmcu7XJJJXYRVdmQAynu-DYGjYFc_ogHr5qfbUa0XtTt-yJqqLB3r-rX2Rwbv24z1Swg"); URI uri = new URI(sb.toString()); HttpHeaders httpHeaders = new DefaultHttpHeaders(); handshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, (String) null, true, httpHeaders); final ChannelFuture handshake; try { handshake = handshaker.handshake(channel).sync(); handshake.addListener(e -> { final boolean success = e.isSuccess(); if (success) { System.out.println("握手成功!"); //发送消息 JSONObject clientJson = new JSONObject(); clientJson.set("type", "command"); JSONObject dataJson = new JSONObject(); dataJson.set("requestId", "请求唯一ID"); clientJson.set("data", dataJson); //channel.writeAndFlush(new TextWebSocketFrame(clientJson.toString())); } else { System.err.println("握手失败"); } }); } catch (Exception e) { e.printStackTrace(); } try { //channel.closeFuture(); } catch (Exception e) { e.printStackTrace(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { System.out.println("\n\t⌜⎓⎓⎓⎓⎓⎓exception⎓⎓⎓⎓⎓⎓⎓⎓⎓\n" + cause.getMessage()); cause.printStackTrace(); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, Object o) throws Exception { System.out.println("22222"); if (!handshaker.isHandshakeComplete()) { try { handshaker.finishHandshake(ctx.channel(), (FullHttpResponse) o); log.info("websocket Handshake 完成!"); } catch (WebSocketHandshakeException e) { log.error("websocket连接失败!", e); } } // 握手协议返回,设置结束握手 if (o instanceof TextWebSocketFrame) { TextWebSocketFrame textFrame = (TextWebSocketFrame) o; System.out.println("textFrame: " + textFrame.text()); } else if (o instanceof CloseWebSocketFrame) { System.out.println("CloseWebSocketFrame"); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleState state = ((IdleStateEvent) evt).state(); if (state == IdleState.WRITER_IDLE) { //发送消息 JSONObject clientJson = new JSONObject(); clientJson.set("type", "command"); JSONObject dataJson = new JSONObject(); dataJson.set("requestId", "请求唯一ID"); clientJson.set("data", dataJson); final String json = clientJson.toString(); ctx.channel().writeAndFlush(new TextWebSocketFrame(json)).addListener(e -> { final boolean success = e.isSuccess(); if (success) { log.info("发送成功"); } else { log.info("发送失败"); } }); log.info("发送心跳数据:{}", json); } } else { super.userEventTriggered(ctx, evt); } } }
channelActive的时候握手就可以了
发表评论