Netty多人与群组聊天
消息设计
图示:
/**
 * Base class for every object exchanged between client and server.
 *
 * <p>It carries the protocol version and requires each concrete packet to
 * report the command code that identifies it.
 */
public abstract class Packet {
    /**
     * Protocol version; defaults to 1. Marked as excluded from both JSON
     * serialization and deserialization.
     */
    @JSONField(deserialize = false, serialize = false)
    private Byte version = 1;

    /**
     * Returns the protocol version of this packet.
     *
     * @return the protocol version (1 unless changed)
     */
    public Byte getVersion() {
        return version;
    }

    /**
     * Returns the command code identifying the concrete packet type.
     * Marked as excluded from JSON serialization.
     *
     * @return the command code of this packet
     */
    @JSONField(serialize = false)
    public abstract Byte getCommand();
}
以上是通信过程中 Java 对象的抽象类,定义了一个版本号(默认值为 1)以及一个获取指令的抽象方法。所有的指令数据包都必须实现这个方法,这样就可以知道每个数据包对应的是哪一种指令。
如客户端请求登录消息:
LoginRequestPacket:
/**
 * Login request sent by the client, carrying the user's credentials.
 */
public class LoginRequestPacket extends Packet {
    private String userName;
    private String password;

    /**
     * @return the user name supplied for login
     */
    public String getUserName() {
        return userName;
    }

    /**
     * @param userName the user name to log in with
     */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /**
     * @return the password supplied for login
     */
    public String getPassword() {
        return password;
    }

    /**
     * @param password the password to log in with
     */
    public void setPassword(String password) {
        this.password = password;
    }

    /**
     * @return the {@code LOGIN_REQUEST} command code
     */
    @Override
    public Byte getCommand() {
        return LOGIN_REQUEST;
    }
}
登录请求数据包继承自 Packet,然后定义了两个字段,分别是用户名和密码,最为重要的就是覆盖了父类的 getCommand() 方法,值为常量 LOGIN_REQUEST。
其余消息图示:
编码以及解码
图示:
PacketCodec:
/**
 * Encodes {@link Packet} objects into the binary wire format and decodes them back.
 *
 * <p>Frame layout, in write order: magic number (4 bytes), version (1 byte),
 * serializer algorithm (1 byte), command (1 byte), body length (4 bytes), body.
 */
public class PacketCodec {
    public static final int MAGIC_NUMBER = 0x12345678;
    public static final PacketCodec INSTANCE = new PacketCodec();

    /** Command code to concrete packet class. */
    private final Map<Byte, Class<? extends Packet>> packetTypeMap;
    /** Serializer algorithm id to serializer implementation. */
    private final Map<Byte, Serializer> serializerMap;

    private PacketCodec() {
        packetTypeMap = new HashMap<>();
        packetTypeMap.put(LOGIN_REQUEST, LoginRequestPacket.class);
        packetTypeMap.put(LOGIN_RESPONSE, LoginResponsePacket.class);
        packetTypeMap.put(MESSAGE_REQUEST, MessageRequestPacket.class);
        packetTypeMap.put(MESSAGE_RESPONSE, MessageResponsePacket.class);
        packetTypeMap.put(LOGOUT_REQUEST, LogoutRequestPacket.class);
        packetTypeMap.put(LOGOUT_RESPONSE, LogoutResponsePacket.class);
        packetTypeMap.put(CREATE_GROUP_REQUEST, CreateGroupRequestPacket.class);
        packetTypeMap.put(CREATE_GROUP_RESPONSE, CreateGroupResponsePacket.class);
        packetTypeMap.put(JOIN_GROUP_REQUEST, JoinGroupRequestPacket.class);
        packetTypeMap.put(JOIN_GROUP_RESPONSE, JoinGroupResponsePacket.class);
        packetTypeMap.put(QUIT_GROUP_REQUEST, QuitGroupRequestPacket.class);
        packetTypeMap.put(QUIT_GROUP_RESPONSE, QuitGroupResponsePacket.class);
        packetTypeMap.put(LIST_GROUP_MEMBERS_REQUEST, ListGroupMembersRequestPacket.class);
        packetTypeMap.put(LIST_GROUP_MEMBERS_RESPONSE, ListGroupMembersResponsePacket.class);

        serializerMap = new HashMap<>();
        Serializer serializer = new JSONSerializer();
        serializerMap.put(serializer.getSerializerAlgorithm(), serializer);
    }

    /**
     * Serializes {@code packet} with {@code Serializer.DEFAULT} and writes the
     * complete frame into {@code byteBuf}.
     *
     * @param byteBuf destination buffer
     * @param packet  packet to encode
     */
    public void encode(ByteBuf byteBuf, Packet packet) {
        // 1. Serialize the Java object.
        byte[] bytes = Serializer.DEFAULT.serialize(packet);

        // 2. Write the header fields followed by the body.
        byteBuf.writeInt(MAGIC_NUMBER);
        byteBuf.writeByte(packet.getVersion());
        byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm());
        byteBuf.writeByte(packet.getCommand());
        byteBuf.writeInt(bytes.length);
        byteBuf.writeBytes(bytes);
    }

    /**
     * Reads one frame from {@code byteBuf} and turns it back into a packet.
     *
     * @param byteBuf buffer positioned at the start of a frame
     * @return the decoded packet, or {@code null} when the command code or the
     *         serializer algorithm is not registered
     * @throws IndexOutOfBoundsException if the declared body length is negative
     *         or exceeds the bytes remaining in the buffer
     */
    public Packet decode(ByteBuf byteBuf) {
        // Skip the magic number.
        // NOTE(review): the value is not compared with MAGIC_NUMBER here;
        // presumably an upstream handler validates it - confirm.
        byteBuf.skipBytes(4);
        // Skip the version.
        byteBuf.skipBytes(1);
        // Serializer algorithm id.
        byte serializeAlgorithm = byteBuf.readByte();
        // Command code.
        byte command = byteBuf.readByte();
        // Body length as declared by the sender.
        int length = byteBuf.readInt();
        // Reject a bad length before allocating: the value comes off the wire,
        // so a negative or oversized number must not reach "new byte[length]".
        if (length < 0 || length > byteBuf.readableBytes()) {
            throw new IndexOutOfBoundsException(
                    "invalid packet body length: " + length
                            + ", readable bytes: " + byteBuf.readableBytes());
        }
        byte[] bytes = new byte[length];
        byteBuf.readBytes(bytes);

        Class<? extends Packet> requestType = getRequestType(command);
        Serializer serializer = getSerializer(serializeAlgorithm);
        if (requestType != null && serializer != null) {
            return serializer.deserialize(requestType, bytes);
        }
        return null;
    }

    /** Looks up the serializer registered for the given algorithm id, or {@code null}. */
    private Serializer getSerializer(byte serializeAlgorithm) {
        return serializerMap.get(serializeAlgorithm);
    }

    /** Looks up the packet class registered for the given command code, or {@code null}. */
    private Class<? extends Packet> getRequestType(byte command) {
        return packetTypeMap.get(command);
    }
}
用户状态以及群组的处理
通过Session来对应具体用户
/**
 * Identifies the user bound to a connection.
 */
public class Session {
    // Unique identifier of the user.
    private String userId;
    private String userName;

    /**
     * @param userId   unique identifier of the user
     * @param userName display name of the user
     */
    public Session(String userId, String userName) {
        this.userId = userId;
        this.userName = userName;
    }

    /**
     * @return the unique identifier of the user
     */
    public String getUserId() {
        return userId;
    }

    /**
     * @return the display name of the user
     */
    public String getUserName() {
        return userName;
    }

    /**
     * @return the session rendered as {@code userId:userName}
     */
    @Override
    public String toString() {
        return userId + ":" + userName;
    }
}
SessionUtil:
/**
 * Static registry that ties user sessions to channels and group ids to channel groups.
 */
public class SessionUtil {
    /** User id to the channel that user is bound to. */
    private static final Map<String, Channel> userIdChannelMap = new ConcurrentHashMap<>();
    /** Group id to the channel group registered for it. */
    private static final Map<String, ChannelGroup> groupIdChannelGroupMap = new ConcurrentHashMap<>();

    /**
     * Registers {@code channel} under the session's user id and stores the
     * session as an attribute on the channel.
     *
     * @param session session of the user
     * @param channel channel the user is connected through
     */
    public static void bindSession(Session session, Channel channel) {
        userIdChannelMap.put(session.getUserId(), channel);
        channel.attr(Attributes.SESSION).set(session);
    }

    /**
     * Removes the session bound to {@code channel}, if any.
     *
     * @param channel channel whose session should be removed
     */
    public static void unBindSession(Channel channel) {
        // Read the session once so the null check and the use see the same value.
        Session session = getSession(channel);
        if (session == null) {
            return;
        }
        // Remove the mapping only while it still points at this channel, so a
        // newer channel bound under the same user id is not evicted.
        userIdChannelMap.remove(session.getUserId(), channel);
        channel.attr(Attributes.SESSION).set(null);
        System.out.println(session + " 退出登录!");
    }

    /**
     * @param channel channel to inspect
     * @return {@code true} if a session is stored on the channel
     */
    public static boolean hasLogin(Channel channel) {
        return getSession(channel) != null;
    }

    /**
     * @param channel channel to inspect
     * @return the session stored on the channel, or {@code null} if none
     */
    public static Session getSession(Channel channel) {
        return channel.attr(Attributes.SESSION).get();
    }

    /**
     * @param userId user id to look up
     * @return the channel registered for that user, or {@code null} if none
     */
    public static Channel getChannel(String userId) {
        return userIdChannelMap.get(userId);
    }

    /**
     * Registers {@code channelGroup} under {@code groupId}, replacing any previous entry.
     *
     * @param groupId      group identifier
     * @param channelGroup channel group to register
     */
    public static void bindChannelGroup(String groupId, ChannelGroup channelGroup) {
        groupIdChannelGroupMap.put(groupId, channelGroup);
    }

    /**
     * @param groupId group identifier to look up
     * @return the channel group registered for that id, or {@code null} if none
     */
    public static ChannelGroup getChannelGroup(String groupId) {
        return groupIdChannelGroupMap.get(groupId);
    }
}
处理用户的登录登出以及群组的管理
聊天界面的管理
通过命令行输入具体的操作字符串来进行操作,图示:
整体处理流程:
/**
 * Chat client entry point: configures the pipeline, connects with exponential
 * back-off, and starts a console thread once connected.
 */
public class NettyClient {
    private static final int MAX_RETRY = 5;
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 9977;

    public static void main(String[] args) {
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap
                .group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new Spliter());
                        ch.pipeline().addLast(new PacketDecoder());
                        // Login response handler.
                        ch.pipeline().addLast(new LoginResponseHandler());
                        // Incoming message handler.
                        ch.pipeline().addLast(new MessageResponseHandler());
                        // Create-group response handler.
                        ch.pipeline().addLast(new CreateGroupResponseHandler());
                        // Join-group response handler.
                        ch.pipeline().addLast(new JoinGroupResponseHandler());
                        // Quit-group response handler.
                        ch.pipeline().addLast(new QuitGroupResponseHandler());
                        // List-group-members response handler.
                        ch.pipeline().addLast(new ListGroupMembersResponseHandler());
                        // Logout response handler.
                        ch.pipeline().addLast(new LogoutResponseHandler());
                        ch.pipeline().addLast(new PacketEncoder());
                    }
                });
        connect(bootstrap, HOST, PORT, MAX_RETRY);
    }

    /**
     * Attempts to connect, rescheduling itself on failure until {@code retry}
     * reaches zero.
     *
     * @param bootstrap configured client bootstrap
     * @param host      server host
     * @param port      server port
     * @param retry     number of retries still allowed
     */
    private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
        bootstrap.connect(host, port).addListener(future -> {
            if (future.isSuccess()) {
                System.out.println(new Date() + ": 连接成功,启动控制台线程……");
                Channel channel = ((ChannelFuture) future).channel();
                startConsoleThread(channel);
            } else if (retry == 0) {
                System.err.println("重试次数已用完,放弃连接!");
                // No further attempts will be scheduled, so shut the event
                // loop group down instead of leaving its threads running.
                bootstrap.config().group().shutdownGracefully();
            } else {
                // Ordinal of this reconnect attempt (1-based).
                int order = (MAX_RETRY - retry) + 1;
                // Delay before this attempt, doubling each time: 2, 4, 8, ... seconds.
                int delay = 1 << order;
                System.err.println(new Date() + ": 连接失败,第" + order + "次重连……");
                bootstrap.config().group().schedule(
                        () -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS);
            }
        });
    }

    /**
     * Starts a thread that reads console input and runs the login command
     * until the channel has a session, then the general command manager.
     *
     * @param channel connected channel the commands operate on
     */
    private static void startConsoleThread(Channel channel) {
        ConsoleCommandManager consoleCommandManager = new ConsoleCommandManager();
        LoginConsoleCommand loginConsoleCommand = new LoginConsoleCommand();
        Scanner scanner = new Scanner(System.in);

        new Thread(() -> {
            while (!Thread.interrupted()) {
                if (!SessionUtil.hasLogin(channel)) {
                    loginConsoleCommand.exec(scanner, channel);
                } else {
                    consoleCommandManager.exec(scanner, channel);
                }
            }
        }).start();
    }
}
public class NettyServer {
private static final int PORT = 9977;
public static void main(String[] args) {
NioEventLoopGroup boosGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(boosGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new Spliter());
ch.pipeline().addLast(new PacketDecoder());
// 登录请求处理器
ch.pipeline().addLast(new LoginRequestHandler());
ch.pipeline().addLast(new AuthHandler());
// 单聊消息请求处理器
ch.pipeline().addLast(new MessageRequestHandler());
// 创建群请求处理器
ch.pipeline().addLast(new CreateGroupRequestHandler());
// 加群请求处理器
ch.pipeline().addLast(new JoinGroupRequestHandler());
// 退群请求处理器
ch.pipeline().addLast(new QuitGroupRequestHandler());
// 获取群成员请求处理器
ch.pipeline().addLast(new ListGroupMembersRequestHandler());
// 登出请求处理器
ch.pipeline().addLast(new LogoutRequestHandler());
ch.pipeline().addLast(new PacketEncoder());
}
});
bind(serverBootstrap, PORT);
}
private static void bind(final ServerBootstrap serverBootstrap, final int port) {
serverBootstrap.bind(port).addListener(future -> {
if (future.isSuccess()) {
System.out.println(new Date() + ": 端口[" + port + "]绑定成功!");
} else {
System.err.println("端口[" + port + "]绑定失败!");
}
});
}
}