准备

  1. 首先搭建两个SpringBoot的maven项目:一个template项目和一个template2项目(后续分别作为MQ的生产者和消费者,以及dubbo的生产者和消费者)。
  2. 在本地安装redis、rabbitMQ和zookeeper。安装方法在百度和google上有很多教程,可自行查找。
  3. 在两个项目里引入相关jar包,如下:
	 	<!-- Redis: Spring Boot Redis starter.
	 	     NOTE(review): newer Spring Boot releases publish this starter as
	 	     spring-boot-starter-data-redis - confirm against the Boot version in use. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-redis</artifactId>
        </dependency>
        <!-- RabbitMQ: Spring Boot AMQP starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
 		<!-- Dubbo dependency. The org.springframework:spring artifact it pulls in is excluded;
 		     presumably to avoid clashing with the Spring version managed by Spring Boot - confirm. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>dubbo</artifactId>
            <version>2.5.3</version>
            <exclusions>
                <exclusion>
                    <artifactId>spring</artifactId>
                    <groupId>org.springframework</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- ZooKeeper client library -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
        </dependency>
        <!-- zkclient (com.github.sgroschupf); presumably the ZooKeeper client used by Dubbo's registry - confirm -->
        <dependency>
            <groupId>com.github.sgroschupf</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.1</version>
        </dependency>	

redis

在template项目中创建一个RedisTemplateConfig.java。此文件主要是作redis的配置,如:

package com.saras.template.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import redis.clients.jedis.JedisPoolConfig;

/**
 * Redis configuration: declares the Jedis connection factory, a string-based
 * {@link RedisTemplate} and a Redis-backed {@link CacheManager}.
 */
@Configuration
@EnableAutoConfiguration
@EnableCaching
public class RedisTemplateConfig {
    private final static Logger logger = LoggerFactory.getLogger(RedisTemplateConfig.class);

    /**
     * Creates the pooled Jedis connection factory.
     *
     * <p>Host, port, password, database index and pool limits are hard-coded here.
     *
     * @return the configured connection factory
     */
    @Bean
    public JedisConnectionFactory getConnectionFactory() {
        // Connection pool limits.
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(500);
        poolConfig.setMaxIdle(50);
        poolConfig.setMinIdle(0);
        poolConfig.setMaxWaitMillis(1000);

        // Connection target and credentials.
        JedisConnectionFactory connectionFactory = new JedisConnectionFactory();
        connectionFactory.setHostName("127.0.0.1");
        connectionFactory.setPort(6379);
        connectionFactory.setPassword("template123");
        connectionFactory.setDatabase(1);
        connectionFactory.setTimeout(1000);
        connectionFactory.setPoolConfig(poolConfig);

        logger.info("JedisConnectionFactory bean init success.");
        return connectionFactory;
    }

    /**
     * Exposes a {@link StringRedisTemplate} built on the connection factory above.
     *
     * @return the template used for Redis access
     */
    @Bean
    public RedisTemplate<?, ?> getRedisTemplate() {
        JedisConnectionFactory connectionFactory = getConnectionFactory();
        return new StringRedisTemplate(connectionFactory);
    }

    /**
     * Creates the cache manager that backs Spring's caching annotations with Redis.
     *
     * @param redisTemplate the template the cache manager operates on
     * @return the Redis cache manager
     */
    @Bean
    public CacheManager cacheManager(RedisTemplate redisTemplate) {
        RedisCacheManager manager = new RedisCacheManager(redisTemplate);
        // A default expiration can be set here if required, e.g.
        // manager.setDefaultExpiration(60); // seconds
        return manager;
    }
}

配置完成之后,redis即可放心食用,不过还是测试一下为好。创建一个TestController.java,内容如下:

package com.saras.template.test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * Minimal controller used to smoke-test the Redis configuration by hand.
 */
@Controller
public class TestController {
    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * Stores a fixed sample value under the key {@code template}.
     *
     * @return a fixed greeting
     */
    @RequestMapping("redis")
    @ResponseBody
    public Object redis() {
        stringValues().set("template", "这里是测试redis的小用例");
        return "hello redis";
    }

    /**
     * Reads back the value stored under the key {@code template}.
     *
     * @return the value returned by Redis for that key
     */
    @RequestMapping("get")
    @ResponseBody
    public String get() {
        return stringValues().get("template");
    }

    /** Returns the template's value operations viewed as String-to-String. */
    private ValueOperations<String, String> stringValues() {
        ValueOperations<String, String> operations = redisTemplate.opsForValue();
        return operations;
    }
}

PS:太懒,不想写测试用例,怎么简单怎么来

写完之后启动项目,我将template的端口号配置为9096,所以访问localhost:9096/redis 如图:

Alt text

然后再在新的标签页访问localhost:9096/get 如图:

Alt text

这样基本上redis就完成了!

RabbitMQ

同样,在template项目上创建RabbitMQConfig.java,内容如下:


package com.saras.template.config;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;


/**
 * RabbitMQ configuration: connection factory, sending template and listener
 * container factory, plus the exchange/routing-key/queue names used by the demo.
 */
@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE = "template-exchange";
    public static final String ROUTINGKEY = "template-routingKey";
    public static final String QUEUE = "template-queue";

    /**
     * Creates the caching connection factory for the local broker with
     * publisher confirms switched on.
     *
     * @return the configured connection factory
     */
    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setAddresses("127.0.0.1:5672");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setPublisherConfirms(true);
        return factory;
    }

    /**
     * Creates a {@link RabbitTemplate} that serialises messages as JSON.
     * The bean is prototype-scoped, so each injection point gets its own instance.
     *
     * @return a new template
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    /**
     * Creates the container factory referenced by the {@code @RabbitListener}
     * methods; it shares the connection factory and uses a JSON message converter.
     *
     * @return the listener container factory
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
        containerFactory.setConnectionFactory(connectionFactory());
        return containerFactory;
    }
}

这样RabbitMQ的基本配置就完成,同样来测试一下

创建RabbitMQSendService.java


package com.saras.template.core.rabbitmq;

/**
 * Service for sending messages to RabbitMQ.
 */
public interface RabbitMQSendService {

    /**
     * Sends the given message.
     *
     * @param message the message text to send
     */
    void sendMsg(String message);
}

创建RabbitMQSendServiceImpl.java

package com.saras.template.core.rabbitmq.impl;

import com.saras.template.config.RabbitMQConfig;
import com.saras.template.core.rabbitmq.RabbitMQSendService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * Sends messages to the demo exchange and receives the broker's publisher
 * confirms for them.
 */
@Component
public class RabbitMQSendServiceImpl implements RabbitTemplate.ConfirmCallback, RabbitMQSendService {
    private final static Logger logger = LoggerFactory.getLogger(RabbitMQSendServiceImpl.class);

    // Injected through the constructor only. RabbitMQConfig declares RabbitTemplate as a
    // prototype bean, so an additional @Autowired on this field would replace the instance
    // the confirm callback was registered on with a fresh one that has no callback.
    private final RabbitTemplate rabbitTemplate;

    /**
     * Stores the template and registers this object as its confirm callback.
     *
     * @param rabbitTemplate the template used for sending
     */
    @Autowired
    public RabbitMQSendServiceImpl(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * Publishes the message to {@link RabbitMQConfig#EXCHANGE} with
     * {@link RabbitMQConfig#ROUTINGKEY}, tagged with a random UUID as correlation id.
     *
     * @param message the message text to send
     */
    @Override
    public void sendMsg(String message) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        logger.debug("correlationId:{}", correlationId);
        logger.info("message:{}", message);
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE, RabbitMQConfig.ROUTINGKEY, message, correlationId);
    }

    /**
     * Publisher-confirm callback.
     *
     * @param correlationData the correlation data passed when the message was sent
     * @param ack             {@code true} for a positive confirm, {@code false} for a negative one
     * @param cause           the failure cause reported with a negative confirm
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // The original format string had no placeholder for the cause, so it was never logged.
        logger.debug("回调ID:{},ack:{},cause:{}", correlationData, ack, cause);
        if (ack) {
            logger.debug("消息消费成功");
        } else {
            logger.debug("消息消费失败:{}", cause);
        }
    }
}

创建User.java

package com.saras.template.entity;

import com.saras.template.utils.ToString;

/**
 * User entity carried as the message payload in this tutorial.
 */
public class User {
	/**
	 * Physical primary key.
	 *
	 * @mbggenerated 2017-04-02 12:17:02
	 */
	private Long id;
	
	/**
	 * User ID.
	 *
	 * @mbggenerated 2017-04-02 12:17:02
	 */
	private String user_id;
	
	/**
	 * User name.
	 *
	 * @mbggenerated 2017-04-02 12:17:02
	 */
	private String user_name;
	
	/**
	 * Password.
	 *
	 * @mbggenerated 2017-04-02 12:17:02
	 */
	private String pwd;
	
	/**
	 * Description.
	 *
	 * @mbggenerated 2017-04-02 12:17:02
	 */
	private String memo;
	
	/**
	 * yyy
	 *
	 * @mbggenerated 2017-04-02 12:17:02
	 */
	private String ddd;
	
	/**
	 * Returns the physical primary key.
	 *
	 * @mbggenerated 2017-04-02 12:17:02
	 */
	public Long getId() {
		return this.id;
	}
	
	/**
	 * Sets the physical primary key.
	 *
	 * @mbggenerated 2017-04-02 12:17:02
	 */
	public void setId(Long id) {
		this.id = id;
	}
	
	/**
	 * Returns the user ID.
	 *
	 * @mbggenerated 2017-04-02 12:17:02
	 */
	public String getUser_id() {
		return this.user_id;
	}
	
	/**
	 * Sets the user ID.
	 *
	 * @mbggenerated 2017-04-02 12:17:02
	 */
	public void setUser_id(String user_id) {
		this.user_id = user_id;
	}
	
	/**
	 * Returns the user name.
	 *
	 * @mbggenerated 2017-04-02 12:17:02
	 */
	public String getUser_name() {
		return this.user_name;
	}
	
	/**
	 * Sets the user name.
	 *
	 * @mbggenerated 2017-04-02 12:17:02
	 */
	public void setUser_name(String user_name) {
		this.user_name = user_name;
	}
	
	/**
	 * Returns the password.
	 *
	 * @mbggenerated 2017-04-02 12:17:02
	 */
	public String getPwd() {
		return this.pwd;
	}
	
	/**
	 * Sets the password.
	 *
	 * @mbggenerated 2017-04-02 12:17:02
	 */
	public void setPwd(String pwd) {
		this.pwd = pwd;
	}
	
	/**
	 * Returns the description.
	 *
	 * @mbggenerated 2017-04-02 12:17:02
	 */
	public String getMemo() {
		return this.memo;
	}
	
	/**
	 * Sets the description.
	 *
	 * @mbggenerated 2017-04-02 12:17:02
	 */
	public void setMemo(String memo) {
		this.memo = memo;
	}
	
	/**
	 * Returns yyy.
	 *
	 * @mbggenerated 2017-04-02 12:17:02
	 */
	public String getDdd() {
		return this.ddd;
	}
	
	/**
	 * Sets yyy.
	 *
	 * @mbggenerated 2017-04-02 12:17:02
	 */
	public void setDdd(String ddd) {
		this.ddd = ddd;
	}
	
	/** Delegates the string rendering to the project's {@code ToString} helper. */
	@Override
	public String toString() {
		return ToString.toString(this);
	}
}

再创建TestRabbitMQ.java

package com.saras.template.test;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.saras.template.config.RabbitMQConfig;
import com.saras.template.entity.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Demo consumer: listens on {@link RabbitMQConfig#QUEUE}, bound to the fanout
 * exchange {@link RabbitMQConfig#EXCHANGE}, and logs each received user.
 */
@Component
public class TestRabbitMQ {
    private final static Logger logger = LoggerFactory.getLogger(TestRabbitMQ.class);

    /**
     * Handles one incoming message. Any conversion failure is logged and not rethrown.
     *
     * @param msg the raw AMQP message
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = RabbitMQConfig.QUEUE, durable = "true"),
                    exchange = @Exchange(value = RabbitMQConfig.EXCHANGE, durable = "true", type = ExchangeTypes.FANOUT)),
            containerFactory = "rabbitListenerContainerFactory")
    public void handle(Message msg) {
        try {
            User received = decodeUser(msg.getBody());
            logger.info("获取到RabbitMQ消息:{}", received);
        } catch (Exception e) {
            logger.error("消息转换失败:", e);
        }
    }

    /**
     * Decodes the message body in two steps: the bytes are parsed as a JSON
     * string, and that string is then parsed as a {@link User}.
     */
    private static User decodeUser(byte[] body) {
        String userJson = JSONObject.parseObject(body, String.class, Feature.OrderedField);
        return JSON.parseObject(userJson, User.class);
    }

}


在TestController.java中添加如下内容:

	@Autowired
    private RabbitMQSendService rabbitMQSendService;

    /**
     * RabbitMQ smoke test: builds a sample user, serialises it to JSON and
     * sends it through {@code rabbitMQSendService}.
     *
     * @return a fixed response text
     */
    @RequestMapping("rabbitmq")
    @ResponseBody
    public Object rabbitmq() {
        User sample = new User();
        sample.setUser_id("123456789");
        sample.setUser_name("rabbitMq");
        sample.setPwd("mq123456");
        sample.setMemo("MQtest");

        rabbitMQSendService.sendMsg(JSONObject.toJSONString(sample));
        return "测试MQ";
    }

完成这些之后启动项目,访问localhost:9096/rabbitmq,如图:

Alt text

同时在控制台可看见,如图:

Alt text

这样rabbitMQ的配置和测试就基本完成了。不过一个消息可能有多个消费者,所以再在template2项目中创建RabbitMQConfig.java,内容如上;然后创建User.java,内容同样如上;最后创建TestRabbitMQ.java:


// Package name fixed (was "tesmplate2") so the class sits in the same package tree
// as the other template2 classes in this tutorial.
package com.saras.template2;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Second demo consumer (template2): listens on its own queue
 * {@code template2-queue}, bound to the fanout exchange {@code template-exchange}.
 *
 * <p>NOTE(review): {@code User} is referenced without an import, so it is
 * assumed to live in this package - confirm.
 */
@Component
public class TestRabbitMQ {
    private final static Logger logger = LoggerFactory.getLogger(TestRabbitMQ.class);


    /**
     * Handles one incoming message: the body is parsed as a JSON string, that
     * string is parsed as a {@code User}, and the result is logged. Any failure
     * is logged and not rethrown.
     *
     * @param msg the raw AMQP message
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "template2-queue", durable = "true"),
            exchange = @Exchange(value = "template-exchange", durable = "true", type = ExchangeTypes.FANOUT)),
            containerFactory = "rabbitListenerContainerFactory")
    public void handle(Message msg) {
        try {
            byte[] bytes = msg.getBody();
            String userString = JSONObject.parseObject(bytes, String.class, Feature.OrderedField);
            User user = JSON.parseObject(userString, User.class);
            logger.info("这里是template2-获取到RabbitMQ消息:{}", user);
        } catch (Exception e) {
            logger.error("消息转换失败:", e);
        }
    }

}

这里exchange的value和template项目中的一样。启动项目template2,然后访问之前的localhost:9096/rabbitmq。

这样在template2的控制台可见:

Alt text

到目前为止rabbitMQ的配置和应用测试全部完成

dubbo+zookeeper

在template建立一个module,template-facade,这个module主要是对外提供的服务的模块。

在这个模块中新建一个 TestFacade.java


package com.saras.template;

/**
 * Facade interface exposed to other systems as a Dubbo service.
 */
public interface TestFacade {

    /**
     * Test call.
     *
     * @param message the test message sent by the caller
     * @return the reply text produced by the provider
     */
    String test(String message);

}

然后将facade这个module deploy到maven私库(maven私库的搭建请自行百度或google,教程很多也不难)

接下来创建DubboBaseConfig.java

package com.saras.template.config;

import com.alibaba.dubbo.config.*;
import com.alibaba.dubbo.config.spring.AnnotationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Dubbo base configuration of the {@code template} application: annotation
 * scanning, application name, ZooKeeper registry, protocol and provider defaults.
 */
@Configuration
public class DubboBaseConfig {

    /**
     * Sets the package Dubbo scans for its annotations.
     *
     * @return the annotation scanner bean
     */
    @Bean
    public static AnnotationBean annotationBean() {
        AnnotationBean scanner = new AnnotationBean();
        scanner.setPackage("com.saras.template");
        return scanner;
    }

    /**
     * Declares the Dubbo application configuration (application name {@code template}).
     *
     * @return the application configuration
     */
    @Bean
    public ApplicationConfig applicationConfig() {
        ApplicationConfig application = new ApplicationConfig();
        application.setName("template");
        return application;
    }

    /**
     * Declares the registry configuration, pointing at the local ZooKeeper.
     *
     * @return the registry configuration
     */
    @Bean
    public RegistryConfig registryConfig() {
        RegistryConfig zookeeperRegistry = new RegistryConfig();
        zookeeperRegistry.setAddress("127.0.0.1:2181");
        zookeeperRegistry.setProtocol("zookeeper");
        return zookeeperRegistry;
    }

    /**
     * Declares the provider protocol: the {@code dubbo} protocol on port 20880
     * with 200 threads.
     *
     * @return the protocol configuration
     */
    @Bean
    public ProtocolConfig protocolConfig() {
        ProtocolConfig dubboProtocol = new ProtocolConfig();
        dubboProtocol.setName("dubbo");
        dubboProtocol.setThreads(200);
        dubboProtocol.setPort(20880);
        return dubboProtocol;
    }

    /**
     * Declares the provider defaults and ties them to the application,
     * registry and protocol beans.
     *
     * @param applicationConfig the application configuration bean
     * @param registryConfig    the registry configuration bean
     * @param protocolConfig    the protocol configuration bean
     * @return the provider configuration
     */
    @Bean
    public ProviderConfig providerConfig(ApplicationConfig applicationConfig, RegistryConfig registryConfig, ProtocolConfig protocolConfig) {
        ProviderConfig provider = new ProviderConfig();
        provider.setApplication(applicationConfig);
        provider.setRegistry(registryConfig);
        provider.setProtocol(protocolConfig);
        provider.setTimeout(3000);
        provider.setRetries(1);
        provider.setDelay(-1);
        return provider;
    }
}

创建TestFacadeImpl.java实现facade的接口TestFacade


package com.saras.template;

import com.alibaba.dubbo.config.annotation.Service;
import com.saras.template.core.dubbo.DubboRemoteProxyFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Dubbo provider implementation of {@link TestFacade}, exported as version 1.0.
 */
@Service(version = "1.0")
public class TestFacadeImpl implements TestFacade {
    private final static Logger logger = LoggerFactory.getLogger(TestFacadeImpl.class);

    // Spelling fixed (was "DubboRemoteProxyFacotry") to match the actual class name
    // DubboRemoteProxyFactory. The field is not used by any method of this version of the class.
    @Autowired
    private DubboRemoteProxyFactory dubboRemoteProxyFactory;

    /**
     * Logs the received message and acknowledges it.
     *
     * @param message the test message sent by the caller
     * @return a fixed acknowledgement text
     */
    @Override
    public String test(String message) {
        logger.info("收到dubbo请求测试信息:{}", message);
        return "已收到";
    }
}


PS:这里的@Service 不是Spring的是dubbo的

然后在template2项目中,引入之前deploy到maven私库的template-facade jar包

创建DubboBaseConfig.java

package com.saras.template2.config;

import com.alibaba.dubbo.config.*;
import com.alibaba.dubbo.config.spring.AnnotationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Dubbo base configuration of the {@code template2} application: annotation
 * scanning, application name, ZooKeeper registry, protocol and provider defaults.
 */
@Configuration
public class DubboBaseConfig {

    /**
     * Sets the package Dubbo scans for its annotations.
     *
     * @return the annotation scanner bean
     */
    @Bean
    public static AnnotationBean annotationBean() {
        AnnotationBean scanner = new AnnotationBean();
        scanner.setPackage("com.saras.template2");
        return scanner;
    }

    /**
     * Declares the Dubbo application configuration (application name {@code template2}).
     *
     * @return the application configuration
     */
    @Bean
    public ApplicationConfig applicationConfig() {
        ApplicationConfig application = new ApplicationConfig();
        application.setName("template2");
        return application;
    }

    /**
     * Declares the registry configuration, pointing at the local ZooKeeper.
     *
     * @return the registry configuration
     */
    @Bean
    public RegistryConfig registryConfig() {
        RegistryConfig zookeeperRegistry = new RegistryConfig();
        zookeeperRegistry.setAddress("127.0.0.1:2181");
        zookeeperRegistry.setProtocol("zookeeper");
        return zookeeperRegistry;
    }

    /**
     * Declares the provider protocol: the {@code dubbo} protocol on port 20881
     * with 200 threads.
     *
     * @return the protocol configuration
     */
    @Bean
    public ProtocolConfig protocolConfig() {
        ProtocolConfig dubboProtocol = new ProtocolConfig();
        dubboProtocol.setName("dubbo");
        dubboProtocol.setThreads(200);
        dubboProtocol.setPort(20881);
        return dubboProtocol;
    }

    /**
     * Declares the provider defaults and ties them to the application,
     * registry and protocol beans.
     *
     * @param applicationConfig the application configuration bean
     * @param registryConfig    the registry configuration bean
     * @param protocolConfig    the protocol configuration bean
     * @return the provider configuration
     */
    @Bean
    public ProviderConfig providerConfig(ApplicationConfig applicationConfig, RegistryConfig registryConfig, ProtocolConfig protocolConfig) {
        ProviderConfig provider = new ProviderConfig();
        provider.setApplication(applicationConfig);
        provider.setRegistry(registryConfig);
        provider.setProtocol(protocolConfig);
        provider.setTimeout(3000);
        provider.setRetries(1);
        provider.setDelay(-1);
        return provider;
    }


}


内容基本和template中的差不多,只是改变了扫描路径和dubbo端口号

创建TestController.java


// Package name fixed (was "com.sras.template2"): the class must sit under
// com.saras.template2, the package DubboBaseConfig tells Dubbo to scan, otherwise
// the @Reference field below is not processed.
package com.saras.template2;

import com.alibaba.dubbo.config.annotation.Reference;
import com.saras.template.TestFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * Controller in template2 that calls the {@link TestFacade} Dubbo service.
 */
@Controller
public class TestController {
    private final static Logger logger = LoggerFactory.getLogger(TestController.class);
    @Reference(version = "1.0")
    private TestFacade testFacade;

    /**
     * Calls {@link TestFacade#test(String)} over Dubbo and returns its reply.
     *
     * <p>The original snippet also called {@code testFacadeClient.testAync(...)}
     * here; no {@code testFacadeClient} field is declared and {@code TestFacade}
     * has no one-argument {@code testAync}, so that call could not compile and
     * has been removed. The asynchronous-notification call has its own
     * {@code dubboAync} endpoint.
     *
     * @return the reply received from the provider
     */
    @RequestMapping("dubbo")
    @ResponseBody
    public Object dubbo() {
        String message = testFacade.test("dubbo请求测试");
        logger.info("dubbo接受结果信息:{}", message);
        return message;
    }
}


先启动本地的zookeeper

再启动template和template2,template2端口号为9095,所以访问localhost:9095/dubbo,如图所见:

Alt text

同时在template控制台可见:

Alt text

dubbo+zookeeper的配置和测试基本完成,系统间的交互也完成

不过我们还可以利用dubbo+zookeeper来通过group和version做异步通知

创建DubboRemoteProxyFactory.java


package com.saras.template.core.dubbo;

import com.alibaba.dubbo.config.ApplicationConfig;
import com.alibaba.dubbo.config.ReferenceConfig;
import com.alibaba.dubbo.config.RegistryConfig;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.saras.template.utils.ShutdownHooks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.Map;

/**
 * Creates Dubbo consumer proxies on demand and caches the underlying
 * {@link ReferenceConfig} per (interface, group, version).
 *
 * <p>The application and registry settings are looked up from the Spring
 * context, so this class must be registered as a Spring bean. All cached
 * references are destroyed by a shutdown hook.
 */
@Configuration
public class DubboRemoteProxyFactory implements ApplicationContextAware {
    private static final Logger logger = LoggerFactory.getLogger(DubboRemoteProxyFactory.class.getName());
    /** Timeout value meaning "do not set a timeout on the reference". */
    private static final int PROVIDER_TIME_OUT = -1;
    /** Successfully initialised references, shared by all instances of this class. */
    private static final Map<Key, ReferenceConfig<?>> cache = Maps.newConcurrentMap();

    private ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Returns a proxy for the given service interface, creating and caching the
     * reference on first use.
     *
     * @param clazz   the service interface; must not be {@code null}
     * @param group   the Dubbo group of the target service
     * @param version the Dubbo version of the target service; must not be empty
     * @param timeout the call timeout to set on the reference, or {@code -1} to set none
     * @param <T>     the service interface type
     * @return the service proxy
     * @throws RuntimeException if an argument is missing, this factory is not
     *                          managed by Spring, no registry is configured, or
     *                          the reference cannot be initialised
     */
    @SuppressWarnings("unchecked") // the cache key contains clazz, so the cached reference was created for T
    public <T> T getProxy(Class<T> clazz, String group, String version, int timeout) {
        if (Strings.isNullOrEmpty(version) || clazz == null) {
            throw new IllegalArgumentException("获取dubbo服务代理时版本号和类不能为空");
        }
        Key key = new Key(clazz, group, version);
        ReferenceConfig<?> cached = cache.get(key);
        if (cached != null) {
            return (T) cached.get();
        }
        if (this.applicationContext == null) {
            throw new IllegalStateException("请配置DubboRemoteProxyFactory到spring容器中");
        }
        synchronized (DubboRemoteProxyFactory.class) {
            // Re-check under the lock: another thread may have created the reference meanwhile.
            cached = cache.get(key);
            if (cached == null) {
                cached = createReference(clazz, group, version, timeout);
                cache.put(key, cached);
            }
            return (T) cached.get();
        }
    }

    /**
     * Same as {@link #getProxy(Class, String, String, int)} without setting a timeout.
     */
    public <T> T getProxy(Class<T> clazz, String group, String version) {
        return getProxy(clazz, group, version, PROVIDER_TIME_OUT);
    }

    /**
     * Builds a reference from the Spring-managed Dubbo configuration and
     * initialises it; on failure the reference is destroyed before rethrowing.
     */
    private <T> ReferenceConfig<T> createReference(Class<T> clazz, String group, String version, int timeout) {
        ApplicationConfig applicationConfig = this.applicationContext.getBean(ApplicationConfig.class);
        Map<String, RegistryConfig> registryConfigMap = this.applicationContext.getBeansOfType(RegistryConfig.class);
        if (registryConfigMap == null || registryConfigMap.isEmpty()) {
            throw new IllegalStateException("请配置dubbo基本配置");
        }

        ReferenceConfig<T> reference = new ReferenceConfig<>();
        reference.setApplication(applicationConfig);
        reference.setRegistries(new ArrayList<>(registryConfigMap.values()));
        reference.setInterface(clazz);
        reference.setVersion(version);
        reference.setGroup(group);
        if (PROVIDER_TIME_OUT != timeout) {
            reference.setTimeout(timeout);
        }

        try {
            // Initialise eagerly so that a broken reference is never cached.
            reference.get();
        } catch (Exception e) {
            logger.error("获取dubbo服务失败:{}", e.getMessage(), e);
            try {
                reference.destroy();
            } catch (Exception e1) {
                logger.error("获取dubbo服务失败,销毁Invoker失败:{}", e1.getMessage(), e1);
                // Report the destroy failure and the original failure separately
                // (the original code printed the same message twice).
                RuntimeException failure = new RuntimeException(
                        "获取dubbo服务失败,销毁Invoker失败:" + e1.getMessage() + ",cause:" + e.getMessage(), e);
                failure.addSuppressed(e1);
                throw failure;
            }
            throw new RuntimeException("获取dubbo服务失败:" + e.getMessage(), e);
        }
        return reference;
    }

    static {
        ShutdownHooks.addShutdownHook(() -> {
            for (ReferenceConfig<?> referenceConfig : DubboRemoteProxyFactory.cache.values()) {
                try {
                    referenceConfig.destroy();
                } catch (Exception e) {
                    // Keep destroying the remaining references; just record the failure with its stack trace.
                    DubboRemoteProxyFactory.logger.error("{}销毁异常", referenceConfig, e);
                }
            }
            DubboRemoteProxyFactory.cache.clear();
        }, "DubboRemoteProxyFacotryShutdownHook");
    }

    /**
     * Immutable cache key made of service interface, group and version.
     */
    private static final class Key {
        private final Class<?> clazz;
        private final String group;
        private final String version;

        private Key(Class<?> clazz, String group, String version) {
            this.clazz = clazz;
            this.group = group;
            this.version = version;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            Key other = (Key) o;
            return (this.clazz != null ? this.clazz.equals(other.clazz) : other.clazz == null)
                    && (this.group != null ? this.group.equals(other.group) : other.group == null)
                    && (this.version != null ? this.version.equals(other.version) : other.version == null);
        }

        @Override
        public int hashCode() {
            int result = this.clazz != null ? this.clazz.hashCode() : 0;
            result = 31 * result + (this.group != null ? this.group.hashCode() : 0);
            result = 31 * result + (this.version != null ? this.version.hashCode() : 0);
            return result;
        }
    }
}


在template中的TestFacade加入一个方法,如下:


package com.saras.template;

/**
 * Facade interface exposed to other systems as a Dubbo service.
 */
public interface TestFacade {
    /**
     * Test call.
     *
     * @param message the test message sent by the caller
     * @return the reply text produced by the provider
     */
    String test(String message);

    /**
     * Test call that also carries the coordinates of a callback service.
     *
     * @param message the test message sent by the caller
     * @param group   the Dubbo group of the caller's notification service
     * @param version the Dubbo version of the caller's notification service
     * @return the reply text produced by the provider
     */
    String testAync(String message, String group, String version);

}


在TestFacadeImpl中实现

package com.saras.template;

import com.alibaba.dubbo.config.annotation.Service;
import com.saras.template.core.dubbo.DubboRemoteProxyFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Dubbo provider implementation of {@link TestFacade}, exported as version 1.0.
 */
@Service(version = "1.0")
public class TestFacadeImpl implements TestFacade {
    private final static Logger logger = LoggerFactory.getLogger(TestFacadeImpl.class);

    @Autowired
    private DubboRemoteProxyFactory dubboRemoteProxyFactory;

    /**
     * Logs the received message and acknowledges it.
     *
     * @param message the test message sent by the caller
     * @return a fixed acknowledgement text
     */
    @Override
    public String test(String message) {
        logger.info("收到dubbo请求测试信息:{}", message);
        return "template已收到";
    }

    /**
     * Logs the received message, then looks up the caller's
     * {@link TestNotifyFacade} by group and version and sends it a notification.
     *
     * @param message the test message sent by the caller
     * @param group   the Dubbo group of the caller's notification service
     * @param version the Dubbo version of the caller's notification service
     * @return a fixed text telling the caller to wait for the notification
     */
    @Override
    public String testAync(String message, String group, String version) {
        logger.info("收到dubbo请求测试信息:{}", message);

        TestNotifyFacade notifier = dubboRemoteProxyFactory.getProxy(TestNotifyFacade.class, group, version);
        String notifyResult = notifier.notify("通知template2消息:天天向上!");
        logger.info("异步通知结果:{}", notifyResult);

        return "等待异步通知消息";
    }
}


再创建TestNotifyFacade.java


package com.saras.template;

/**
 * Callback interface through which a notification message is delivered.
 */
public interface TestNotifyFacade {

    /**
     * Delivers a notification.
     *
     * @param message the notification text
     * @return the receiver's reply text
     */
    String notify(String message);
}

然后重新将facade deploy到maven私库,template2重新拉包一下

在template2中创建TestAyncImpl.java 实现 facade中的接口 TestNotifyFacade

package com.saras.template2;

import com.alibaba.dubbo.config.annotation.Service;
import com.saras.template.TestNotifyFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * template2's implementation of {@link TestNotifyFacade}, exported as a Dubbo
 * service with version 1.0 under a group named after this class.
 */
@Service(version = "1.0", group = "com.saras.template2.TestAyncImpl")
public class TestAyncImpl implements TestNotifyFacade {
    private final static Logger logger = LoggerFactory.getLogger(TestAyncImpl.class);

    /**
     * Logs the received notification and replies with a fixed text.
     *
     * @param s the notification text
     * @return the fixed reply text
     */
    @Override
    public String notify(String s) {
        logger.info("收到异步通知:{}", s);
        String reply = "返回处理结果告知template";
        return reply;
    }
}


最后在TestController中加入


 /**
     * Calls {@code testAync} on the facade, passing the group and version under
     * which template2 exports its notification service, and returns the reply.
     */
    @RequestMapping("dubboAync")
    @ResponseBody
    public Object dubboAync() {
        String callbackGroup = "com.saras.template2.TestAyncImpl";
        String callbackVersion = "1.0";
        String reply = testFacade.testAync("dubbo异步通知测试请求", callbackGroup, callbackVersion);
        logger.info("dubbo同步接受结果信息:{}", reply);
        return reply;
    }

重启template和template2,访问localhost:9095/dubboAync 可如图所见:

Alt text

在template控制台可见:

Alt text

在template2控制台可见:

Alt text

这个机制适用于这样的场景:template2请求template时,template对这个请求的处理可能不会立即结束,后续还会有变化。这时template可以先把group和version保存下来,等到处理完成之后,再据此去告知template2这个请求的后续处理结果,从而达到异步的效果。因为懒,这里只做了看似同步的处理;其实可以用redis保存group和version,再去执行异步通知。

结尾

基本上到现在SpringBoot集成redis、rabbitMQ、dubbo+zookeeper就完成了。内容不是很多,但可以慢慢食用。

template项目地址: https://github.com/SarasXu/ProjectTemplate