准备
- 首先搭建两个SpringBoot的maven项目:template和template2(后续分别作为MQ的生产者和消费者,以及dubbo的生产者和消费者)。
- 本地安装redis、rabbitMQ和zookeeper。安装教程在百度和google上有很多,可自行查找。
- 在两个项目里引入相关jar包,如下:
<!-- Redis: Spring Boot starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
</dependency>
<!-- RabbitMQ: Spring Boot AMQP starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Dubbo; the org.springframework:spring artifact it pulls in is excluded -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
<version>2.5.3</version>
<exclusions>
<exclusion>
<artifactId>spring</artifactId>
<groupId>org.springframework</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- ZooKeeper and the zkclient library -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
redis
在template项目中创建一个RedisTemplateConfig.java。此文件主要是作redis的配置,如:
package com.saras.template.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import redis.clients.jedis.JedisPoolConfig;
/**
 * Redis wiring for the template project: a Jedis connection factory, a
 * string-based Redis template and a Redis-backed cache manager.
 */
@Configuration
@EnableAutoConfiguration
@EnableCaching
public class RedisTemplateConfig {
    private final static Logger logger = LoggerFactory.getLogger(RedisTemplateConfig.class);

    /**
     * Builds the Jedis connection factory for the Redis instance at
     * 127.0.0.1:6379 (database 1) together with its connection-pool limits.
     *
     * @return the configured connection factory
     */
    @Bean
    public JedisConnectionFactory getConnectionFactory() {
        // Pool limits.
        JedisPoolConfig poolSettings = new JedisPoolConfig();
        poolSettings.setMaxTotal(500);
        poolSettings.setMaxIdle(50);
        poolSettings.setMinIdle(0);
        poolSettings.setMaxWaitMillis(1000);

        // Connection target and credentials.
        JedisConnectionFactory jedisFactory = new JedisConnectionFactory();
        jedisFactory.setHostName("127.0.0.1");
        jedisFactory.setPort(6379);
        jedisFactory.setPassword("template123");
        jedisFactory.setDatabase(1);
        jedisFactory.setTimeout(1000);
        jedisFactory.setPoolConfig(poolSettings);

        logger.info("JedisConnectionFactory bean init success.");
        return jedisFactory;
    }

    /**
     * Exposes a {@link StringRedisTemplate} bound to the connection factory above.
     *
     * @return the Redis template bean
     */
    @Bean
    public RedisTemplate<?, ?> getRedisTemplate() {
        StringRedisTemplate stringTemplate = new StringRedisTemplate(getConnectionFactory());
        return stringTemplate;
    }

    /**
     * Creates the cache manager used by the caching annotations.
     *
     * @param redisTemplate the template the cache manager operates through
     * @return a Redis-backed cache manager
     */
    @Bean
    public CacheManager cacheManager(RedisTemplate redisTemplate) {
        RedisCacheManager manager = new RedisCacheManager(redisTemplate);
        // No default expiration is set; call manager.setDefaultExpiration(seconds)
        // here if cached entries should expire.
        return manager;
    }
}
配置完成之后,redis即可放心食用,不过还是测试一下为好。创建一个TestController.java,内容如下:
package com.saras.template.test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
/**
 * Throw-away controller used to check the Redis configuration by hand:
 * {@code /redis} writes a value and {@code /get} reads it back.
 */
@Controller
public class TestController {
    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * Stores a fixed test value under the key {@code template}.
     *
     * @return a fixed greeting string
     */
    @RequestMapping("redis")
    @ResponseBody
    public Object redis() {
        ValueOperations<String, String> ops = redisTemplate.opsForValue();
        ops.set("template", "这里是测试redis的小用例");
        return "hello redis";
    }

    /**
     * Reads back the value stored under the key {@code template}.
     *
     * @return the stored value as returned by Redis
     */
    @RequestMapping("get")
    @ResponseBody
    public String get() {
        ValueOperations<String, String> ops = redisTemplate.opsForValue();
        String stored = ops.get("template");
        return stored;
    }
}
PS:太懒,不想写测试用例,怎么简单怎么来
写完之后启动项目,我将template的端口号配置为9096,所以访问localhost:9096/redis 如图:
然后再在新的标签页访问localhost:9096/get 如图:
这样基本上redis就完成了!
RabbitMQ
同样,在template项目上创建RabbitMQConfig.java,内容如下:
package com.saras.template.config;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
/**
 * RabbitMQ wiring: connection factory, a JSON-converting template for sending
 * and a listener container factory for {@code @RabbitListener} methods.
 */
@Configuration
public class RabbitMQConfig {
    public static final String EXCHANGE = "template-exchange";
    public static final String ROUTINGKEY = "template-routingKey";
    public static final String QUEUE = "template-queue";

    /**
     * Connection factory for the broker at 127.0.0.1:5672 using the
     * {@code guest} account on virtual host {@code /}, with publisher
     * confirms switched on.
     *
     * @return the caching connection factory
     */
    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingFactory = new CachingConnectionFactory();
        cachingFactory.setAddresses("127.0.0.1:5672");
        cachingFactory.setVirtualHost("/");
        cachingFactory.setUsername("guest");
        cachingFactory.setPassword("guest");
        cachingFactory.setPublisherConfirms(true);
        return cachingFactory;
    }

    /**
     * Prototype-scoped template, so every injection point receives its own
     * instance. Messages are converted with Jackson.
     *
     * @return a new template bound to {@link #connectionFactory()}
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate sender = new RabbitTemplate(connectionFactory());
        sender.setMessageConverter(new Jackson2JsonMessageConverter());
        return sender;
    }

    /**
     * Listener container factory referenced by name from the
     * {@code @RabbitListener} methods.
     *
     * @return the listener container factory
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory listenerFactory = new SimpleRabbitListenerContainerFactory();
        listenerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
        listenerFactory.setConnectionFactory(connectionFactory());
        return listenerFactory;
    }
}
这样RabbitMQ的基本配置就完成,同样来测试一下
创建RabbitMQSendService.java
package com.saras.template.core.rabbitmq;
/**
 * Contract for publishing a message to RabbitMQ.
 */
public interface RabbitMQSendService {
/**
 * Sends the given message.
 *
 * @param message the message payload to publish
 */
void sendMsg(String message);
}
创建RabbitMQSendServiceImpl.java
package com.saras.template.core.rabbitmq.impl;
import com.saras.template.config.RabbitMQConfig;
import com.saras.template.core.rabbitmq.RabbitMQSendService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
 * Publishes messages to the template exchange and logs the broker's
 * publisher confirms.
 */
@Component
public class RabbitMQSendServiceImpl implements RabbitTemplate.ConfirmCallback, RabbitMQSendService {
    private final static Logger logger = LoggerFactory.getLogger(RabbitMQSendServiceImpl.class);

    // Injected through the constructor only. RabbitMQConfig declares the
    // RabbitTemplate bean with prototype scope, so an additional @Autowired on
    // this field would replace it with a second instance that never had the
    // confirm callback registered.
    private final RabbitTemplate rabbitTemplate;

    /**
     * Stores the template and registers this object as its confirm callback.
     *
     * @param rabbitTemplate the template used for sending
     */
    @Autowired
    public RabbitMQSendServiceImpl(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * Sends the message to {@link RabbitMQConfig#EXCHANGE} with
     * {@link RabbitMQConfig#ROUTINGKEY}, tagging it with a random UUID as
     * correlation id so the confirm can be matched to it.
     *
     * @param message the payload to publish
     */
    @Override
    public void sendMsg(String message) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        logger.debug("correlationId:{}", correlationId);
        logger.info("message:{}", message);
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, RabbitMQConfig.ROUTINGKEY, message, correlationId);
    }

    /**
     * Publisher-confirm callback. An ack reports that the broker accepted the
     * message; it says nothing about whether a consumer has processed it.
     *
     * @param correlationData the correlation data passed to the send call
     * @param ack             {@code true} for ack, {@code false} for nack
     * @param cause           the failure reason supplied with a nack, may be null
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        logger.debug("回调ID:{},ack:{},cause:{}", correlationData, ack, cause);
        if (ack) {
            logger.debug("消息发送成功(Broker已确认)");
        } else {
            logger.debug("消息发送失败:{}", cause);
        }
    }
}
创建User.java
package com.saras.template.entity;
import com.saras.template.utils.ToString;
/**
 * User entity. The {@code @mbggenerated} tags are kept from the original
 * comments as-is.
 */
public class User {
/**
 * Physical primary key.
 *
 * @mbggenerated 2017-04-02 12:17:02
 */
private Long id;
/**
 * User ID.
 *
 * @mbggenerated 2017-04-02 12:17:02
 */
private String user_id;
/**
 * User name.
 *
 * @mbggenerated 2017-04-02 12:17:02
 */
private String user_name;
/**
 * Password.
 *
 * @mbggenerated 2017-04-02 12:17:02
 */
private String pwd;
/**
 * Description / memo.
 *
 * @mbggenerated 2017-04-02 12:17:02
 */
private String memo;
/**
 * Documented only as "yyy" in the original; meaning unclear - TODO confirm.
 *
 * @mbggenerated 2017-04-02 12:17:02
 */
private String ddd;
/**
 * Returns the physical primary key.
 *
 * @mbggenerated 2017-04-02 12:17:02
 */
public Long getId() {
return id;
}
/**
 * Sets the physical primary key.
 *
 * @mbggenerated 2017-04-02 12:17:02
 */
public void setId(Long id) {
this.id = id;
}
/**
 * Returns the user ID.
 *
 * @mbggenerated 2017-04-02 12:17:02
 */
public String getUser_id() {
return user_id;
}
/**
 * Sets the user ID.
 *
 * @mbggenerated 2017-04-02 12:17:02
 */
public void setUser_id(String user_id) {
this.user_id = user_id;
}
/**
 * Returns the user name.
 *
 * @mbggenerated 2017-04-02 12:17:02
 */
public String getUser_name() {
return user_name;
}
/**
 * Sets the user name.
 *
 * @mbggenerated 2017-04-02 12:17:02
 */
public void setUser_name(String user_name) {
this.user_name = user_name;
}
/**
 * Returns the password.
 *
 * @mbggenerated 2017-04-02 12:17:02
 */
public String getPwd() {
return pwd;
}
/**
 * Sets the password.
 *
 * @mbggenerated 2017-04-02 12:17:02
 */
public void setPwd(String pwd) {
this.pwd = pwd;
}
/**
 * Returns the description.
 *
 * @mbggenerated 2017-04-02 12:17:02
 */
public String getMemo() {
return memo;
}
/**
 * Sets the description.
 *
 * @mbggenerated 2017-04-02 12:17:02
 */
public void setMemo(String memo) {
this.memo = memo;
}
/**
 * Returns the "yyy" field (see the note on the field).
 *
 * @mbggenerated 2017-04-02 12:17:02
 */
public String getDdd() {
return ddd;
}
/**
 * Sets the "yyy" field (see the note on the field).
 *
 * @mbggenerated 2017-04-02 12:17:02
 */
public void setDdd(String ddd) {
this.ddd = ddd;
}
/**
 * Delegates to the project's {@code ToString} helper.
 */
@Override
public String toString() {
return ToString.toString(this);
}
}
再创建TestRabbitMQ.java
package com.saras.template.test;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.saras.template.config.RabbitMQConfig;
import com.saras.template.entity.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Test consumer in the template project: listens on the template queue bound
 * to the template fanout exchange and logs every received user.
 */
@Component
public class TestRabbitMQ {
    private final static Logger logger = LoggerFactory.getLogger(TestRabbitMQ.class);

    /**
     * Handles one message. The body is first decoded to a String and that
     * String is then parsed as a {@link User}; any failure is logged and
     * swallowed.
     *
     * @param msg the raw AMQP message
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = RabbitMQConfig.QUEUE, durable = "true"),
                    exchange = @Exchange(value = RabbitMQConfig.EXCHANGE, durable = "true", type = ExchangeTypes.FANOUT)),
            containerFactory = "rabbitListenerContainerFactory")
    public void handle(Message msg) {
        try {
            byte[] payload = msg.getBody();
            String json = JSONObject.parseObject(payload, String.class, Feature.OrderedField);
            User received = JSON.parseObject(json, User.class);
            logger.info("获取到RabbitMQ消息:{}", received);
        } catch (Exception e) {
            logger.error("消息转换失败:", e);
        }
    }
}
在TestController.java中添加如下内容:
@Autowired
private RabbitMQSendService rabbitMQSendService;
/**
 * Manual RabbitMQ check: serialises a sample user to JSON and publishes it
 * through the send service.
 *
 * @return a fixed confirmation string
 */
@RequestMapping("rabbitmq")
@ResponseBody
public Object rabbitmq() {
    User sample = new User();
    sample.setUser_id("123456789");
    sample.setUser_name("rabbitMq");
    sample.setPwd("mq123456");
    sample.setMemo("MQtest");

    rabbitMQSendService.sendMsg(JSONObject.toJSONString(sample));
    return "测试MQ";
}
完成这些之后启动项目,访问localhost:9096/rabbitmq,如图:
同时在控制台可看见,如图:
这样rabbitMQ的配置和测试就基本完成了。不过一个消息可能有多个消费者,所以再在template2项目中创建RabbitMQConfig.java,内容如上;然后创建User.java,内容同样如上;最后创建TestRabbitMQ.java:
package com.saras.template2;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Second consumer, living in the template2 project. It binds its own queue
 * ({@code template2-queue}) to the same fanout exchange the template project
 * publishes to.
 */
@Component
public class TestRabbitMQ {
    private final static Logger logger = LoggerFactory.getLogger(TestRabbitMQ.class);

    /**
     * Handles one message. The body is first decoded to a String and that
     * String is then parsed as a {@code User}; any failure is logged and
     * swallowed.
     *
     * @param msg the raw AMQP message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "template2-queue", durable = "true"),
            exchange = @Exchange(value = "template-exchange", durable = "true", type = ExchangeTypes.FANOUT)),
            containerFactory = "rabbitListenerContainerFactory")
    public void handle(Message msg) {
        try {
            byte[] bytes = msg.getBody();
            String userString = JSONObject.parseObject(bytes, String.class, Feature.OrderedField);
            User user = JSON.parseObject(userString, User.class);
            logger.info("这里是template2-获取到RabbitMQ消息:{}", user);
        } catch (Exception e) {
            logger.error("消息转换失败:", e);
        }
    }
}
exchange的value和template项目中的一样 启动项目template2。访问之前的localhost:9096/rabbitmq
这样在template2的控制台可见:
到目前为止rabbitMQ的配置和应用测试全部完成
dubbo+zookeeper
在template建立一个module,template-facade,这个module主要是对外提供的服务的模块。
在这个模块中新建一个 TestFacade.java
package com.saras.template;
/**
 * Service interface published from the template-facade module.
 */
public interface TestFacade {
/**
 * Test call.
 *
 * @param message the message sent by the caller
 * @return the reply from the service implementation
 */
String test(String message);
}
然后将facade这个module deploy到maven私库(maven私库的搭建请自行百度或google,教程很多也不难)
接下来创建DubboBaseConfig.java
package com.saras.template.config;
import com.alibaba.dubbo.config.*;
import com.alibaba.dubbo.config.spring.AnnotationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Dubbo base configuration for the template project, declared in Java
 * instead of XML.
 */
@Configuration
public class DubboBaseConfig {

    /**
     * Registers the package that dubbo scans for its annotations.
     */
    @Bean
    public static AnnotationBean annotationBean() {
        AnnotationBean scanner = new AnnotationBean();
        scanner.setPackage("com.saras.template");
        return scanner;
    }

    /**
     * Application-level dubbo settings; only the application name is set.
     */
    @Bean
    public ApplicationConfig applicationConfig() {
        ApplicationConfig application = new ApplicationConfig();
        application.setName("template");
        return application;
    }

    /**
     * Registry settings: ZooKeeper at 127.0.0.1:2181.
     */
    @Bean
    public RegistryConfig registryConfig() {
        RegistryConfig zookeeperRegistry = new RegistryConfig();
        zookeeperRegistry.setAddress("127.0.0.1:2181");
        zookeeperRegistry.setProtocol("zookeeper");
        return zookeeperRegistry;
    }

    /**
     * Protocol settings: the dubbo protocol on port 20880 with 200 threads.
     *
     * @return the protocol configuration
     */
    @Bean
    public ProtocolConfig protocolConfig() {
        ProtocolConfig dubboProtocol = new ProtocolConfig();
        dubboProtocol.setName("dubbo");
        dubboProtocol.setPort(20880);
        dubboProtocol.setThreads(200);
        return dubboProtocol;
    }

    /**
     * Provider settings, tying together the application, registry and
     * protocol beans declared above.
     */
    @Bean
    public ProviderConfig providerConfig(ApplicationConfig applicationConfig, RegistryConfig registryConfig, ProtocolConfig protocolConfig) {
        ProviderConfig provider = new ProviderConfig();
        // Wiring to the other dubbo beans.
        provider.setApplication(applicationConfig);
        provider.setRegistry(registryConfig);
        provider.setProtocol(protocolConfig);
        // Call behaviour.
        provider.setTimeout(3000);
        provider.setRetries(1);
        provider.setDelay(-1);
        return provider;
    }
}
创建TestFacadeImpl.java实现facade的接口TestFacade
package com.saras.template;
import com.alibaba.dubbo.config.annotation.Service;
import com.saras.template.core.dubbo.DubboRemoteProxyFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
/**
 * Dubbo provider implementation of {@link TestFacade}, exported as version 1.0.
 * The {@code @Service} annotation here is dubbo's, not Spring's.
 */
@Service(version = "1.0")
public class TestFacadeImpl implements TestFacade {
    private final static Logger logger = LoggerFactory.getLogger(TestFacadeImpl.class);

    // Type name spelled as the class is declared: DubboRemoteProxyFactory.
    @Autowired
    private DubboRemoteProxyFactory dubboRemoteProxyFactory;

    /**
     * Logs the incoming message and answers with a fixed acknowledgement.
     *
     * @param message the message sent by the consumer
     * @return a fixed acknowledgement string
     */
    @Override
    public String test(String message) {
        logger.info("收到dubbo请求测试信息:{}", message);
        return "已收到";
    }
}
PS:这里的@Service 不是Spring的是dubbo的
然后在template2项目中,引入之前deploy到maven私库的template-facade jar包
创建DubboBaseConfig.java
package com.saras.template2.config;
import com.alibaba.dubbo.config.*;
import com.alibaba.dubbo.config.spring.AnnotationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Dubbo base configuration for the template2 project, declared in Java
 * instead of XML.
 */
@Configuration
public class DubboBaseConfig {

    /**
     * Registers the package that dubbo scans for its annotations.
     */
    @Bean
    public static AnnotationBean annotationBean() {
        AnnotationBean scanner = new AnnotationBean();
        scanner.setPackage("com.saras.template2");
        return scanner;
    }

    /**
     * Application-level dubbo settings; only the application name is set.
     */
    @Bean
    public ApplicationConfig applicationConfig() {
        ApplicationConfig application = new ApplicationConfig();
        application.setName("template2");
        return application;
    }

    /**
     * Registry settings: ZooKeeper at 127.0.0.1:2181.
     */
    @Bean
    public RegistryConfig registryConfig() {
        RegistryConfig zookeeperRegistry = new RegistryConfig();
        zookeeperRegistry.setAddress("127.0.0.1:2181");
        zookeeperRegistry.setProtocol("zookeeper");
        return zookeeperRegistry;
    }

    /**
     * Protocol settings: the dubbo protocol on port 20881 with 200 threads.
     *
     * @return the protocol configuration
     */
    @Bean
    public ProtocolConfig protocolConfig() {
        ProtocolConfig dubboProtocol = new ProtocolConfig();
        dubboProtocol.setName("dubbo");
        dubboProtocol.setPort(20881);
        dubboProtocol.setThreads(200);
        return dubboProtocol;
    }

    /**
     * Provider settings, tying together the application, registry and
     * protocol beans declared above.
     */
    @Bean
    public ProviderConfig providerConfig(ApplicationConfig applicationConfig, RegistryConfig registryConfig, ProtocolConfig protocolConfig) {
        ProviderConfig provider = new ProviderConfig();
        // Wiring to the other dubbo beans.
        provider.setApplication(applicationConfig);
        provider.setRegistry(registryConfig);
        provider.setProtocol(protocolConfig);
        // Call behaviour.
        provider.setTimeout(3000);
        provider.setRetries(1);
        provider.setDelay(-1);
        return provider;
    }
}
内容基本和template中的差不多,只是改变了扫描路径和dubbo端口号
创建TestController.java
package com.saras.template2;
import com.alibaba.dubbo.config.annotation.Reference;
import com.saras.template.TestFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
/**
 * Test controller of the template2 project that calls the template project
 * over dubbo. It lives under {@code com.saras.template2}, the package that
 * DubboBaseConfig registers for dubbo annotation scanning.
 */
@Controller
public class TestController {
    private final static Logger logger = LoggerFactory.getLogger(TestController.class);

    // Dubbo consumer reference to version 1.0 of the facade.
    @Reference(version = "1.0")
    private TestFacade testFacade;

    /**
     * Calls {@link TestFacade#test(String)} remotely and returns the reply.
     *
     * @return the reply received from the provider
     */
    @RequestMapping("dubbo")
    @ResponseBody
    public Object dubbo() {
        String message = testFacade.test("dubbo请求测试");
        logger.info("dubbo接受结果信息:{}", message);
        return message;
    }
}
先启动本地的zookeeper
再启动template和template2,template2端口号为9095,所以访问localhost:9095/dubbo,如图所见:
同时在template控制台可见:
dubbo+zookeeper的配置和测试基本完成,系统间的交互也完成
不过我们还可以利用dubbo+zookeeper来通过group和version做异步通知
创建DubboRemoteProxyFactory.java
package com.saras.template.core.dubbo;
import com.alibaba.dubbo.config.ApplicationConfig;
import com.alibaba.dubbo.config.ReferenceConfig;
import com.alibaba.dubbo.config.RegistryConfig;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.saras.template.utils.ShutdownHooks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.Map;
/**
 * Creates dubbo consumer proxies at runtime for a given interface, group and
 * version, and caches the underlying {@link ReferenceConfig} so each
 * combination is only built once.
 */
@Configuration
public class DubboRemoteProxyFactory implements ApplicationContextAware {
    private static final Logger logger = LoggerFactory.getLogger(DubboRemoteProxyFactory.class.getName());
    /** One reference per (interface, group, version); shared by all instances. */
    private static final Map<Key, ReferenceConfig<?>> cache = Maps.newConcurrentMap();
    /** Sentinel timeout value meaning "do not set a timeout on the reference". */
    private static final int PROVIDER_TIME_OUT = -1;

    private ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Returns a proxy for the given service, creating and caching the
     * reference on first use.
     *
     * <p>The timeout is not part of the cache key: it only takes effect when
     * the reference for this interface/group/version is first created.
     *
     * @param clazz   the service interface, must not be null
     * @param group   the dubbo group, may be null
     * @param version the dubbo version, must not be null or empty
     * @param timeout the call timeout, or {@code -1} to leave it unset
     * @param <T>     the service interface type
     * @return the service proxy
     * @throws RuntimeException if an argument is invalid, this factory has no
     *                          application context, no registry is configured,
     *                          or the reference cannot be created
     */
    @SuppressWarnings("unchecked") // the cache key includes clazz, so the cached reference was built for T
    public <T> T getProxy(Class<T> clazz, String group, String version, int timeout) {
        if (Strings.isNullOrEmpty(version) || clazz == null) {
            throw new IllegalArgumentException("获取dubbo服务代理时版本号和类不能为空");
        }
        Key key = new Key(clazz, group, version);
        ReferenceConfig<?> cached = cache.get(key);
        if (cached != null) {
            return (T) cached.get();
        }
        if (this.applicationContext == null) {
            throw new IllegalStateException("请配置DubboRemoteProxyFactory到spring容器中");
        }
        synchronized (DubboRemoteProxyFactory.class) {
            // Re-check under the lock: another thread may have created it meanwhile.
            cached = cache.get(key);
            if (cached == null) {
                cached = createReference(clazz, group, version, timeout);
                cache.put(key, cached);
            }
            return (T) cached.get();
        }
    }

    /**
     * Same as {@link #getProxy(Class, String, String, int)} without setting a timeout.
     */
    public <T> T getProxy(Class<T> clazz, String group, String version) {
        return getProxy(clazz, group, version, PROVIDER_TIME_OUT);
    }

    /**
     * Builds a reference from the ApplicationConfig and RegistryConfig beans
     * and resolves it once; a reference that fails to resolve is destroyed
     * before the failure is reported.
     */
    private <T> ReferenceConfig<T> createReference(Class<T> clazz, String group, String version, int timeout) {
        ApplicationConfig applicationConfig = this.applicationContext.getBean(ApplicationConfig.class);
        Map<String, RegistryConfig> registryConfigMap = this.applicationContext.getBeansOfType(RegistryConfig.class);
        if (registryConfigMap == null || registryConfigMap.isEmpty()) {
            throw new IllegalStateException("请配置dubbo基本配置");
        }
        ReferenceConfig<T> reference = new ReferenceConfig<T>();
        reference.setApplication(applicationConfig);
        reference.setRegistries(new ArrayList<RegistryConfig>(registryConfigMap.values()));
        reference.setInterface(clazz);
        reference.setVersion(version);
        reference.setGroup(group);
        if (PROVIDER_TIME_OUT != timeout) {
            reference.setTimeout(timeout);
        }
        try {
            reference.get();
        } catch (Exception e) {
            logger.error("获取dubbo服务失败:{}", e.getMessage(), e);
            RuntimeException failure = new RuntimeException("获取dubbo服务失败:" + e.getMessage(), e);
            try {
                reference.destroy();
            } catch (Exception e1) {
                logger.error("获取dubbo服务失败,销毁Invoker失败:{}", e1.getMessage(), e1);
                // Report the destroy failure first and the original failure as its cause.
                failure = new RuntimeException(
                        "获取dubbo服务失败,销毁Invoker失败:" + e1.getMessage() + ",cause:" + e.getMessage(), e);
                failure.addSuppressed(e1);
            }
            throw failure;
        }
        return reference;
    }

    static {
        // Destroy every cached reference when the shutdown hook runs.
        ShutdownHooks.addShutdownHook(() -> {
            for (ReferenceConfig<?> referenceConfig : DubboRemoteProxyFactory.cache.values()) {
                try {
                    referenceConfig.destroy();
                } catch (Exception e) {
                    DubboRemoteProxyFactory.logger.error("{}销毁异常", referenceConfig, e);
                }
            }
            DubboRemoteProxyFactory.cache.clear();
        }, "DubboRemoteProxyFacotryShutdownHook");
    }

    /**
     * Immutable cache key: service interface, group and version.
     */
    private static final class Key {
        private final Class<?> clazz;
        private final String group;
        private final String version;

        private Key(Class<?> clazz, String group, String version) {
            this.clazz = clazz;
            this.group = group;
            this.version = version;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            Key other = (Key) o;
            return same(this.clazz, other.clazz)
                    && same(this.group, other.group)
                    && same(this.version, other.version);
        }

        @Override
        public int hashCode() {
            int result = this.clazz != null ? this.clazz.hashCode() : 0;
            result = 31 * result + (this.group != null ? this.group.hashCode() : 0);
            result = 31 * result + (this.version != null ? this.version.hashCode() : 0);
            return result;
        }

        /** Null-safe equality. */
        private static boolean same(Object a, Object b) {
            return a == null ? b == null : a.equals(b);
        }
    }
}
在template中的TestFacade加入一个方法,如下:
package com.saras.template;
/**
 * Service interface published from the template-facade module.
 */
public interface TestFacade {
/**
 * Test call.
 *
 * @param message the message sent by the caller
 * @return the reply from the service implementation
 */
String test(String message);
/**
 * Test call that also carries the dubbo group and version of a
 * {@code TestNotifyFacade} service on the caller's side.
 *
 * @param message the message sent by the caller
 * @param group   dubbo group of the caller's notify service
 * @param version dubbo version of the caller's notify service
 * @return the reply from the service implementation
 */
String testAync(String message, String group, String version);
}
在TestFacadeImpl中实现
package com.saras.template;
import com.alibaba.dubbo.config.annotation.Service;
import com.saras.template.core.dubbo.DubboRemoteProxyFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
/**
 * Dubbo provider implementation of {@link TestFacade}, exported as version 1.0.
 */
@Service(version = "1.0")
public class TestFacadeImpl implements TestFacade {
    private final static Logger logger = LoggerFactory.getLogger(TestFacadeImpl.class);

    @Autowired
    private DubboRemoteProxyFactory dubboRemoteProxyFactory;

    /**
     * Logs the incoming message and answers with a fixed acknowledgement.
     *
     * @param message the message sent by the consumer
     * @return a fixed acknowledgement string
     */
    @Override
    public String test(String message) {
        logger.info("收到dubbo请求测试信息:{}", message);
        return "template已收到";
    }

    /**
     * Logs the incoming message, looks up the caller's
     * {@link TestNotifyFacade} by group and version, sends it a notification
     * and logs the reply.
     *
     * @param message the message sent by the consumer
     * @param group   dubbo group of the consumer's notify service
     * @param version dubbo version of the consumer's notify service
     * @return a fixed string
     */
    @Override
    public String testAync(String message, String group, String version) {
        logger.info("收到dubbo请求测试信息:{}", message);
        TestNotifyFacade notifier = dubboRemoteProxyFactory.getProxy(TestNotifyFacade.class, group, version);
        String reply = notifier.notify("通知template2消息:天天向上!");
        logger.info("异步通知结果:{}", reply);
        return "等待异步通知消息";
    }
}
再创建TestNotifyFacade.java
package com.saras.template;
/**
 * Notification interface declared in the facade module and implemented in
 * the template2 project.
 */
public interface TestNotifyFacade {
/**
 * Delivers a notification.
 *
 * @param message the notification text
 * @return the receiver's reply
 */
String notify(String message);
}
然后重新将facade deploy到maven私库,template2重新拉包一下
在template2中创建TestAyncImpl.java 实现 facade中的接口 TestNotifyFacade
package com.saras.template2;
import com.alibaba.dubbo.config.annotation.Service;
import com.saras.template.TestNotifyFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Dubbo provider in template2 that receives notifications from the template
 * project. It is exported as version 1.0 under a group named after this class.
 */
@Service(version = "1.0", group = "com.saras.template2.TestAyncImpl")
public class TestAyncImpl implements TestNotifyFacade {
    private final static Logger logger = LoggerFactory.getLogger(TestAyncImpl.class);

    /**
     * Logs the notification and answers with a fixed reply.
     *
     * @param s the notification text
     * @return a fixed reply string
     */
    @Override
    public String notify(String s) {
        logger.info("收到异步通知:{}", s);
        String reply = "返回处理结果告知template";
        return reply;
    }
}
最后在TestController中加入
/**
 * Calls {@code testAync} on the template project, passing the group and
 * version under which this project's notify service is exported.
 *
 * @return the reply received from the provider
 */
@RequestMapping("dubboAync")
@ResponseBody
public Object dubboAync() {
    String notifyGroup = "com.saras.template2.TestAyncImpl";
    String notifyVersion = "1.0";
    String message = testFacade.testAync("dubbo异步通知测试请求", notifyGroup, notifyVersion);
    logger.info("dubbo同步接受结果信息:{}", message);
    return message;
}
重启template和template2,访问localhost:9095/dubboAync 可如图所见:
在template控制台可见:
在template2控制台可见:
这个机制适用于这样的场景:template2请求template后,template对该请求的处理可能还有后续变化。此时可以先把group和version保存下来,等template处理完成之后,再据此通知template2该请求的后续处理结果,从而实现异步通知。因为懒,这里只做了看似同步的处理;实际使用时可以用redis保存group和version,再去执行异步通知。
结尾
基本上到现在SpringBoot集成redis、rabbitMQ、dubbo+zookeeper就完成了。内容不是很多,但可以慢慢食用。
template项目地址: https://github.com/SarasXu/ProjectTemplate