diff --git a/README.md b/README.md index d03eb2a0..c069c84e 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,7 @@ Using `CIM`, you can achieve the following requirements: | [群聊](https://youtu.be/_9a4lIkQ5_o) [私聊](https://youtu.be/kfEfQFPLBTQ) | [群聊](https://www.bilibili.com/video/av39405501) [私聊](https://www.bilibili.com/video/av39405821) | | | +![demo.gif](pic/demo.gif) ## TODO LIST diff --git a/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/impl/ClientConfigurationData.java b/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/impl/ClientConfigurationData.java index a9b95562..b76883c9 100644 --- a/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/impl/ClientConfigurationData.java +++ b/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/impl/ClientConfigurationData.java @@ -38,7 +38,7 @@ public static class Auth{ @JsonIgnore private MessageListener messageListener = - (client, msg) -> System.out.printf("id:[%s] msg:[%s]%n \n", client.getAuth(), msg); + (client, properties, msg) -> System.out.printf("id:[%s] msg:[%s]%n \n", client.getAuth(), msg); @JsonIgnore private OkHttpClient okHttpClient = new OkHttpClient(); diff --git a/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/impl/ClientImpl.java b/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/impl/ClientImpl.java index 9d4571b5..5071f627 100644 --- a/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/impl/ClientImpl.java +++ b/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/impl/ClientImpl.java @@ -6,10 +6,10 @@ import com.crossoverjie.cim.client.sdk.ReConnectManager; import com.crossoverjie.cim.client.sdk.RouteManager; import com.crossoverjie.cim.client.sdk.io.CIMClientHandleInitializer; -import com.crossoverjie.cim.common.constant.Constants; import com.crossoverjie.cim.common.exception.CIMException; import com.crossoverjie.cim.common.kit.HeartBeatHandler; import com.crossoverjie.cim.common.pojo.CIMUserInfo; +import com.crossoverjie.cim.common.protocol.BaseCommand; import com.crossoverjie.cim.common.protocol.Request; import com.crossoverjie.cim.route.api.vo.req.ChatReqVO; import com.crossoverjie.cim.route.api.vo.req.LoginReqVO; @@ -82,7 +82,7 @@ public ClientImpl(ClientConfigurationData conf) { heartBeatPacket = Request.newBuilder() .setRequestId(this.conf.getAuth().getUserId()) .setReqMsg("ping") - .setType(Constants.CommandType.PING) + .setCmd(BaseCommand.PING) .build(); client = this; @@ -177,7 +177,7 @@ private void loginServer() { Request login = Request.newBuilder() .setRequestId(this.conf.getAuth().getUserId()) .setReqMsg(this.conf.getAuth().getUserName()) - .setType(Constants.CommandType.LOGIN) + .setCmd(BaseCommand.LOGIN_REQUEST) .build(); channel.writeAndFlush(login) .addListener((ChannelFutureListener) channelFuture -> diff --git a/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/io/CIMClientHandle.java b/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/io/CIMClientHandle.java index 772639eb..f49de355 100644 --- a/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/io/CIMClientHandle.java +++ b/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/io/CIMClientHandle.java @@ -2,7 +2,7 @@ import com.crossoverjie.cim.client.sdk.ClientState; import com.crossoverjie.cim.client.sdk.impl.ClientImpl; -import com.crossoverjie.cim.common.constant.Constants; +import com.crossoverjie.cim.common.protocol.BaseCommand; import com.crossoverjie.cim.common.protocol.Response; import com.crossoverjie.cim.common.util.NettyAttrUtil; import io.netty.channel.ChannelFutureListener; @@ -60,15 +60,15 @@ public void channelInactive(ChannelHandlerContext ctx) { protected void channelRead0(ChannelHandlerContext ctx, Response msg) { - if (msg.getType() == Constants.CommandType.PING) { + if (msg.getCmd() == BaseCommand.PING) { ClientImpl.getClient().getConf().getEvent().debug("received ping from server"); NettyAttrUtil.updateReaderTime(ctx.channel(), System.currentTimeMillis()); } - if (msg.getType() != Constants.CommandType.PING) { + if (msg.getCmd() != BaseCommand.PING) { // callback ClientImpl.getClient().getConf().getCallbackThreadPool().execute(() -> { - ClientImpl.getClient().getConf().getMessageListener().received(ClientImpl.getClient(), msg.getResMsg()); + ClientImpl.getClient().getConf().getMessageListener().received(ClientImpl.getClient(), msg.getPropertiesMap(), msg.getResMsg()); }); } diff --git a/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/io/MessageListener.java b/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/io/MessageListener.java index 7fa21c87..51375dee 100644 --- a/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/io/MessageListener.java +++ b/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/io/MessageListener.java @@ -1,12 +1,14 @@ package com.crossoverjie.cim.client.sdk.io; import com.crossoverjie.cim.client.sdk.Client; +import java.util.Map; public interface MessageListener { /** - * @param client client - * @param msg msgs + * @param client client + * @param properties meta data + * @param msg msgs */ - void received(Client client, String msg); + void received(Client client, Map properties, String msg); } diff --git a/cim-client-sdk/src/test/java/com/crossoverjie/cim/client/sdk/ClientTest.java b/cim-client-sdk/src/test/java/com/crossoverjie/cim/client/sdk/ClientTest.java index 54dca1ea..e97a9ed9 100644 --- a/cim-client-sdk/src/test/java/com/crossoverjie/cim/client/sdk/ClientTest.java +++ b/cim-client-sdk/src/test/java/com/crossoverjie/cim/client/sdk/ClientTest.java @@ -3,6 +3,7 @@ import com.crossoverjie.cim.client.sdk.impl.ClientConfigurationData; import com.crossoverjie.cim.client.sdk.io.backoff.RandomBackoff; import com.crossoverjie.cim.client.sdk.route.AbstractRouteBaseTest; +import com.crossoverjie.cim.common.constant.Constants; import com.crossoverjie.cim.common.pojo.CIMUserInfo; import com.crossoverjie.cim.route.api.vo.req.P2PReqVO; import com.crossoverjie.cim.route.api.vo.res.CIMServerResVO; @@ -63,7 +64,11 @@ public void groupChat() throws Exception { Client client2 = Client.builder() .auth(auth2) .routeUrl(routeUrl) - .messageListener((client, message) -> client2Receive.set(message)) + .messageListener((client, properties, message) -> { + client2Receive.set(message); + Assertions.assertEquals(properties.get(Constants.MetaKey.USER_ID), String.valueOf(auth1.getUserId())); + Assertions.assertEquals(properties.get(Constants.MetaKey.USER_NAME), auth1.getUserName()); + }) .build(); TimeUnit.SECONDS.sleep(3); ClientState.State state2 = client2.getState(); @@ -91,7 +96,7 @@ public void groupChat() throws Exception { }); Awaitility.await().untilAsserted( - () -> Assertions.assertEquals(String.format("crossoverJie:%s", msg), client2Receive.get())); + () -> Assertions.assertEquals(msg, client2Receive.get())); super.stopSingle(); } @@ -139,7 +144,7 @@ public void testP2PChat() throws Exception { Client client2 = Client.builder() .auth(auth2) .routeUrl(routeUrl) - .messageListener((client, message) -> client2Receive.set(message)) + .messageListener((client, properties, message) -> client2Receive.set(message)) .build(); TimeUnit.SECONDS.sleep(3); ClientState.State state2 = client2.getState(); @@ -156,7 +161,7 @@ public void testP2PChat() throws Exception { Client client3 = Client.builder() .auth(auth3) .routeUrl(routeUrl) - .messageListener((client, message) -> { + .messageListener((client, properties, message) -> { log.info("client3 receive message = {}", message); client3Receive.set(message); }) @@ -192,7 +197,7 @@ public void testP2PChat() throws Exception { }); Awaitility.await().untilAsserted( - () -> Assertions.assertEquals(String.format("%s:%s", cj, msg), client3Receive.get())); + () -> Assertions.assertEquals(msg, client3Receive.get())); Awaitility.await().untilAsserted( () -> Assertions.assertNull(client2Receive.get())); super.stopSingle(); @@ -244,7 +249,7 @@ public void testReconnect() throws Exception { Client client2 = Client.builder() .auth(auth2) .routeUrl(routeUrl) - .messageListener((client, message) -> client2Receive.set(message)) + .messageListener((client, properties, message) -> client2Receive.set(message)) .backoffStrategy(backoffStrategy) .build(); TimeUnit.SECONDS.sleep(3); @@ -260,7 +265,7 @@ public void testReconnect() throws Exception { String msg = "hello"; client1.sendGroup(msg); Awaitility.await() - .untilAsserted(() -> Assertions.assertEquals(String.format("cj:%s", msg), client2Receive.get())); + .untilAsserted(() -> Assertions.assertEquals(msg, client2Receive.get())); client2Receive.set(""); @@ -287,7 +292,7 @@ public void testReconnect() throws Exception { log.info("send message again, client2Receive = {}", client2Receive.get()); client1.sendGroup(msg); Awaitility.await() - .untilAsserted(() -> Assertions.assertEquals(String.format("cj:%s", msg), client2Receive.get())); + .untilAsserted(() -> Assertions.assertEquals(msg, client2Receive.get())); super.stopTwoServer(); } @@ -327,7 +332,7 @@ public void offLineAndOnline() throws Exception { Client client2 = Client.builder() .auth(auth2) .routeUrl(routeUrl) - .messageListener((client, message) -> client2Receive.set(message)) + .messageListener((client, properties, message) -> client2Receive.set(message)) // Avoid auto reconnect, this test will manually close client. .reconnectCheck((client) -> false) .build(); @@ -344,7 +349,7 @@ public void offLineAndOnline() throws Exception { String msg = "hello"; client1.sendGroup(msg); Awaitility.await().untilAsserted( - () -> Assertions.assertEquals(String.format("crossoverJie:%s", msg), client2Receive.get())); + () -> Assertions.assertEquals(msg, client2Receive.get())); client2Receive.set(""); // Manually offline @@ -353,7 +358,7 @@ public void offLineAndOnline() throws Exception { client2 = Client.builder() .auth(auth2) .routeUrl(routeUrl) - .messageListener((client, message) -> client2Receive.set(message)) + .messageListener((client, properties, message) -> client2Receive.set(message)) // Avoid to auto reconnect, this test will manually close client. .reconnectCheck((client) -> false) .build(); @@ -364,7 +369,7 @@ public void offLineAndOnline() throws Exception { // send msg again client1.sendGroup(msg); Awaitility.await().untilAsserted( - () -> Assertions.assertEquals(String.format("crossoverJie:%s", msg), client2Receive.get())); + () -> Assertions.assertEquals(msg, client2Receive.get())); super.stopSingle(); } diff --git a/cim-client/src/main/java/com/crossoverjie/cim/client/service/impl/MsgCallBackListener.java b/cim-client/src/main/java/com/crossoverjie/cim/client/service/impl/MsgCallBackListener.java index b31df4c0..040ef0fd 100644 --- a/cim-client/src/main/java/com/crossoverjie/cim/client/service/impl/MsgCallBackListener.java +++ b/cim-client/src/main/java/com/crossoverjie/cim/client/service/impl/MsgCallBackListener.java @@ -4,6 +4,8 @@ import com.crossoverjie.cim.client.sdk.Event; import com.crossoverjie.cim.client.sdk.io.MessageListener; import com.crossoverjie.cim.client.service.MsgLogger; +import com.crossoverjie.cim.common.constant.Constants; +import java.util.Map; /** * Function:自定义收到消息回调 @@ -25,8 +27,9 @@ public MsgCallBackListener(MsgLogger msgLogger, Event event) { @Override - public void received(Client client, String msg) { - this.msgLogger.log(msg); - this.event.info(msg); + public void received(Client client, Map properties, String msg) { + String sendUserName = properties.getOrDefault(Constants.MetaKey.USER_NAME, "nobody"); + this.msgLogger.log(sendUserName + ":" + msg); + this.event.info(sendUserName + ":" + msg); } } diff --git a/cim-common/src/main/java/com/crossoverjie/cim/common/constant/Constants.java b/cim-common/src/main/java/com/crossoverjie/cim/common/constant/Constants.java index e4191eda..f128b46c 100644 --- a/cim-common/src/main/java/com/crossoverjie/cim/common/constant/Constants.java +++ b/cim-common/src/main/java/com/crossoverjie/cim/common/constant/Constants.java @@ -21,25 +21,9 @@ public class Constants { */ public static final String COUNTER_CLIENT_PUSH_COUNT = "counter.client.push.count" ; - - /** - * 自定义报文类型 - */ - public static class CommandType{ - /** - * 登录 - */ - public static final int LOGIN = 1 ; - /** - * 业务消息 - */ - public static final int MSG = 2 ; - - /** - * ping - */ - public static final int PING = 3 ; + public static class MetaKey { + public static final String USER_ID = "userId" ; + public static final String USER_NAME = "userName" ; } - } diff --git a/cim-common/src/main/proto/cim.proto b/cim-common/src/main/proto/cim.proto index 7b229cd2..3b38289b 100644 --- a/cim-common/src/main/proto/cim.proto +++ b/cim-common/src/main/proto/cim.proto @@ -4,14 +4,21 @@ option java_package = "com.crossoverjie.cim.common.protocol"; option java_multiple_files = true; message Request{ - // todo source user info - int64 requestId = 2; - string reqMsg = 1; - int32 type = 3; + int64 requestId = 2; + string reqMsg = 1; + BaseCommand cmd = 3; + map properties = 4; } message Response{ - int64 responseId = 2; - string resMsg = 1; - int32 type = 3; + int64 responseId = 2; + string resMsg = 1; + BaseCommand cmd = 3; + map properties = 4; +} + +enum BaseCommand{ + LOGIN_REQUEST = 0; + MESSAGE = 1; + PING = 2; } \ No newline at end of file diff --git a/cim-common/src/test/java/com/crossoverjie/cim/common/util/ProtocolTest.java b/cim-common/src/test/java/com/crossoverjie/cim/common/util/ProtocolTest.java index 5f87c01f..bcdd9194 100644 --- a/cim-common/src/test/java/com/crossoverjie/cim/common/util/ProtocolTest.java +++ b/cim-common/src/test/java/com/crossoverjie/cim/common/util/ProtocolTest.java @@ -1,5 +1,6 @@ package com.crossoverjie.cim.common.util; +import com.crossoverjie.cim.common.protocol.BaseCommand; import com.crossoverjie.cim.common.protocol.Request; import com.google.protobuf.InvalidProtocolBufferException; import org.junit.Test; @@ -11,7 +12,7 @@ public void testProtocol() throws InvalidProtocolBufferException { Request protocol = Request.newBuilder() .setRequestId(123L) .setReqMsg("你好啊") - .setType(1) + .setCmd(BaseCommand.LOGIN_REQUEST) .build(); byte[] encode = encode(protocol); diff --git a/cim-forward-route/src/main/java/com/crossoverjie/cim/route/service/impl/AccountServiceRedisImpl.java b/cim-forward-route/src/main/java/com/crossoverjie/cim/route/service/impl/AccountServiceRedisImpl.java index 3ab5300b..d62548d3 100644 --- a/cim-forward-route/src/main/java/com/crossoverjie/cim/route/service/impl/AccountServiceRedisImpl.java +++ b/cim-forward-route/src/main/java/com/crossoverjie/cim/route/service/impl/AccountServiceRedisImpl.java @@ -1,5 +1,6 @@ package com.crossoverjie.cim.route.service.impl; +import com.crossoverjie.cim.common.constant.Constants; import com.crossoverjie.cim.common.core.proxy.RpcProxyManager; import com.crossoverjie.cim.common.enums.StatusEnum; import com.crossoverjie.cim.common.exception.CIMException; @@ -10,6 +11,7 @@ import com.crossoverjie.cim.route.api.vo.req.LoginReqVO; import com.crossoverjie.cim.route.api.vo.res.CIMServerResVO; import com.crossoverjie.cim.route.api.vo.res.RegisterInfoResVO; +import com.crossoverjie.cim.route.constant.Constant; import com.crossoverjie.cim.route.service.AccountService; import com.crossoverjie.cim.route.service.UserInfoCacheService; import com.crossoverjie.cim.server.api.ServerApi; @@ -159,7 +161,11 @@ public void pushMsg(CIMServerResVO cimServerResVO, long sendUserId, ChatReqVO gr cimUserInfo.ifPresent(userInfo -> { String url = "http://" + cimServerResVO.getIp() + ":" + cimServerResVO.getHttpPort(); SendMsgReqVO vo = - new SendMsgReqVO(userInfo.getUserName() + ":" + groupReqVO.getMsg(), groupReqVO.getUserId()); + new SendMsgReqVO(groupReqVO.getMsg(), groupReqVO.getUserId()); + vo.setProperties(Map.of( + Constants.MetaKey.USER_ID, String.valueOf(sendUserId), + Constants.MetaKey.USER_NAME, userInfo.getUserName()) + ); serverApi.sendMsg(vo, url); }); diff --git a/cim-forward-route/src/main/java/com/crossoverjie/cim/route/service/impl/UserInfoCacheServiceImpl.java b/cim-forward-route/src/main/java/com/crossoverjie/cim/route/service/impl/UserInfoCacheServiceImpl.java index 76a1e8b7..336afb32 100644 --- a/cim-forward-route/src/main/java/com/crossoverjie/cim/route/service/impl/UserInfoCacheServiceImpl.java +++ b/cim-forward-route/src/main/java/com/crossoverjie/cim/route/service/impl/UserInfoCacheServiceImpl.java @@ -41,8 +41,7 @@ public class UserInfoCacheServiceImpl implements UserInfoCacheService { @Override public Optional loadUserInfoByUserId(Long userId) { //Retrieve user information using a second-level cache. - Optional cimUserInfo = userInfoMap.getUnchecked(userId); - return cimUserInfo; + return userInfoMap.getUnchecked(userId); } @Override diff --git a/cim-server-api/src/main/java/com/crossoverjie/cim/server/api/vo/req/SendMsgReqVO.java b/cim-server-api/src/main/java/com/crossoverjie/cim/server/api/vo/req/SendMsgReqVO.java index 247d49f4..730588b2 100644 --- a/cim-server-api/src/main/java/com/crossoverjie/cim/server/api/vo/req/SendMsgReqVO.java +++ b/cim-server-api/src/main/java/com/crossoverjie/cim/server/api/vo/req/SendMsgReqVO.java @@ -4,6 +4,9 @@ import io.swagger.v3.oas.annotations.media.Schema; import jakarta.validation.constraints.NotNull; +import java.util.Map; +import lombok.Getter; +import lombok.Setter; /** * Function: @@ -22,6 +25,10 @@ public class SendMsgReqVO extends BaseRequest { @Schema(requiredMode = Schema.RequiredMode.REQUIRED, description = "userId", example = "11") private Long userId ; + @Setter + @Getter + private Map properties; + public SendMsgReqVO() { } diff --git a/cim-server/src/main/java/com/crossoverjie/cim/server/config/BeanConfig.java b/cim-server/src/main/java/com/crossoverjie/cim/server/config/BeanConfig.java index 0acec955..f73bc49f 100644 --- a/cim-server/src/main/java/com/crossoverjie/cim/server/config/BeanConfig.java +++ b/cim-server/src/main/java/com/crossoverjie/cim/server/config/BeanConfig.java @@ -1,9 +1,9 @@ package com.crossoverjie.cim.server.config; -import com.crossoverjie.cim.common.constant.Constants; import com.crossoverjie.cim.common.core.proxy.RpcProxyManager; import com.crossoverjie.cim.common.metastore.MetaStore; import com.crossoverjie.cim.common.metastore.ZkMetaStoreImpl; +import com.crossoverjie.cim.common.protocol.BaseCommand; import com.crossoverjie.cim.common.protocol.Request; import com.crossoverjie.cim.route.api.RouteApi; import jakarta.annotation.Resource; @@ -54,7 +54,7 @@ public Request heartBeat() { return Request.newBuilder() .setRequestId(0L) .setReqMsg("pong") - .setType(Constants.CommandType.PING) + .setCmd(BaseCommand.PING) .build(); } diff --git a/cim-server/src/main/java/com/crossoverjie/cim/server/handle/CIMServerHandle.java b/cim-server/src/main/java/com/crossoverjie/cim/server/handle/CIMServerHandle.java index f8896e9c..13636af0 100644 --- a/cim-server/src/main/java/com/crossoverjie/cim/server/handle/CIMServerHandle.java +++ b/cim-server/src/main/java/com/crossoverjie/cim/server/handle/CIMServerHandle.java @@ -1,9 +1,9 @@ package com.crossoverjie.cim.server.handle; -import com.crossoverjie.cim.common.constant.Constants; import com.crossoverjie.cim.common.exception.CIMException; import com.crossoverjie.cim.common.kit.HeartBeatHandler; import com.crossoverjie.cim.common.pojo.CIMUserInfo; +import com.crossoverjie.cim.common.protocol.BaseCommand; import com.crossoverjie.cim.common.protocol.Request; import com.crossoverjie.cim.common.util.NettyAttrUtil; import com.crossoverjie.cim.server.kit.RouteHandler; @@ -74,7 +74,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc protected void channelRead0(ChannelHandlerContext ctx, Request msg) throws Exception { log.info("received msg=[{}]", msg.toString()); - if (msg.getType() == Constants.CommandType.LOGIN) { + if (msg.getCmd() == BaseCommand.LOGIN_REQUEST) { //保存客户端与 Channel 之间的关系 SessionSocketHolder.put(msg.getRequestId(), (NioSocketChannel) ctx.channel()); SessionSocketHolder.saveSession(msg.getRequestId(), msg.getReqMsg()); @@ -82,7 +82,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Request msg) throws Excep } //心跳更新时间 - if (msg.getType() == Constants.CommandType.PING){ + if (msg.getCmd() == BaseCommand.PING){ NettyAttrUtil.updateReaderTime(ctx.channel(),System.currentTimeMillis()); //向客户端响应 pong 消息 Request heartBeat = SpringBeanFactory.getBean("heartBeat", Request.class); diff --git a/cim-server/src/main/java/com/crossoverjie/cim/server/server/CIMServer.java b/cim-server/src/main/java/com/crossoverjie/cim/server/server/CIMServer.java index 6ea5848a..89a3e73a 100644 --- a/cim-server/src/main/java/com/crossoverjie/cim/server/server/CIMServer.java +++ b/cim-server/src/main/java/com/crossoverjie/cim/server/server/CIMServer.java @@ -1,6 +1,6 @@ package com.crossoverjie.cim.server.server; -import com.crossoverjie.cim.common.constant.Constants; +import com.crossoverjie.cim.common.protocol.BaseCommand; import com.crossoverjie.cim.common.protocol.Request; import com.crossoverjie.cim.server.api.vo.req.SendMsgReqVO; import com.crossoverjie.cim.server.init.CIMServerInitializer; @@ -89,7 +89,8 @@ public void sendMsg(SendMsgReqVO sendMsgReqVO){ Request protocol = Request.newBuilder() .setRequestId(sendMsgReqVO.getUserId()) .setReqMsg(sendMsgReqVO.getMsg()) - .setType(Constants.CommandType.MSG) + .putAllProperties(sendMsgReqVO.getProperties()) + .setCmd(BaseCommand.MESSAGE) .build(); ChannelFuture future = socketChannel.writeAndFlush(protocol); diff --git a/pic/demo.gif b/pic/demo.gif new file mode 100644 index 00000000..c0bd7cf0 Binary files /dev/null and b/pic/demo.gif differ