1. 参考
flea-cache使用之Redis集群模式接入 源代码
2. 依赖
jedis-3.0.1.jar
1 2 3 4 5 6
| <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.0.1</version> </dependency>
|
spring-context-4.3.18.RELEASE.jar
1 2 3 4 5 6
| <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.3.18.RELEASE</version> </dependency>
|
spring-context-support-4.3.18.RELEASE.jar
1 2 3 4 5
| <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> <version>4.3.18.RELEASE</version> </dependency>
|
3. 基础接入
3.1 定义Flea缓存接口
IFleaCache 可参考笔者的这篇博文 Memcached接入,不再赘述。
3.2 定义抽象Flea缓存类
AbstractFleaCache 可参考笔者的这篇博文 Memcached接入,不再赘述。
3.3 定义Redis客户端接口类
RedisClient 定义了 读、写、删除 Redis缓存的基本操作方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
|
public interface RedisClient {
String set(final String key, final Object value);
String set(final byte[] key, final byte[] value);
String set(final String key, final Object value, final int expiry);
String set(final byte[] key, final byte[] value, final int expiry);
String set(final String key, final Object value, final long expiry);
String set(final byte[] key, final byte[] value, final long expiry);
String set(final String key, final Object value, final SetParams params);
String set(final byte[] key, final byte[] value, final SetParams params);
Object get(final String key);
byte[] get(final byte[] key);
Long del(final String key);
String getLocation(final String key); String getLocation(final byte[] key);
String getHost(final String key);
String getHost(final byte[] key);
Integer getPort(final String key);
Integer getPort(final byte[] key);
Client getClient(final String key);
Client getClient(final byte[] key);
String getPoolName(); }
|
3.4 定义集群模式Redis客户端实现类
FleaRedisClusterClient 即Flea集群模式Redis客户端实现,封装了Flea框架操作Redis缓存的基本操作。它内部具体操作Redis集群缓存的功能,由Jedis集群实例对象 JedisCluster
完成, 包含读、写、删除Redis缓存的基本操作方法。
集群模式下,单个缓存接入场景,可通过如下方式使用:
1 2 3
| RedisClient redisClient = new FleaRedisClusterClient.Builder().build();
redisClient.set("key", "value");
|
集群模式下,整合缓存接入场景,可通过如下方式使用:
1 2 3
| RedisClient redisClient = new FleaRedisClusterClient.Builder(poolName).build();
redisClient.set("key", "value");
|
当然每次都新建Redis客户端显然不可取,我们可通过Redis客户端工厂获取Redis客户端。
集群模式下,单个缓存接入场景,可通过如下方式使用:
1
| RedisClient redisClient = RedisClientFactory.getInstance(CacheModeEnum.CLUSTER);
|
集群模式下,整合缓存接入场景,可通过如下方式使用:
1
| RedisClient redisClient = RedisClientFactory.getInstance(poolName, CacheModeEnum.CLUSTER);
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
| public class FleaRedisClusterClient extends FleaRedisClient {
private JedisCluster jedisCluster;
private FleaRedisClusterClient() { this(CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME); }
private FleaRedisClusterClient(String poolName) { super(poolName); init(); }
private void init() { if (CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME.equals(getPoolName())) { jedisCluster = RedisClusterPool.getInstance().getJedisCluster(); } else { jedisCluster = RedisClusterPool.getInstance(getPoolName()).getJedisCluster(); }
}
@Override public String set(String key, Object value) { if (value instanceof String) return jedisCluster.set(key, (String) value); else return jedisCluster.set(SafeEncoder.encode(key), ObjectUtils.serialize(value)); }
public static class Builder {
private String poolName;
public Builder() { }
public Builder(String poolName) { this.poolName = poolName; }
public RedisClient build() { if (StringUtils.isBlank(poolName)) { return new FleaRedisClusterClient(); } else { return new FleaRedisClusterClient(poolName); } } } }
|
该类的构造函数初始化逻辑,可以看出我们使用了 RedisClusterPool
, 下面来介绍一下。
3.5 定义Redis集群连接池
我们使用 RedisClusterPool 来初始化 Redis集群相关配置信息,其中重点是获取Jedis集群实例对象 JedisCluster
,该类其中一个构造方法如下:
1 2 3 4
| public JedisCluster(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout, int maxAttempts, String password, String clientName, final GenericObjectPoolConfig poolConfig) { super(jedisClusterNode, connectionTimeout, soTimeout, maxAttempts, password, clientName, poolConfig); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
|
public class RedisClusterPool {
private static final ConcurrentMap<String, RedisClusterPool> redisClusterPools = new ConcurrentHashMap<>();
private static final Object redisClusterPoolLock = new Object();
private String poolName;
private JedisCluster jedisCluster;
private RedisClusterPool(String poolName) { this.poolName = poolName; }
public static RedisClusterPool getInstance() { return getInstance(CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME); }
public static RedisClusterPool getInstance(String poolName) { if (!redisClusterPools.containsKey(poolName)) { synchronized (redisClusterPoolLock) { if (!redisClusterPools.containsKey(poolName)) { RedisClusterPool redisClusterPool = new RedisClusterPool(poolName); redisClusterPools.putIfAbsent(poolName, redisClusterPool); } } } return redisClusterPools.get(poolName); }
public void initialize() { }
public void initialize(List<CacheServer> cacheServerList) { }
public JedisCluster getJedisCluster() { if (ObjectUtils.isEmpty(jedisCluster)) { throw new FleaCacheConfigException("获取Jedis集群实例对象失败:请先调用initialize初始化"); } return jedisCluster; } }
|
3.6 定义Redis集群配置文件
flea-cache 读取 redis.cluster.properties(Redis集群配置文件),用作初始化 RedisClusterPool
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| redis.cluster.switch=0
redis.systemName=FleaFrame
redis.cluster.server=127.0.0.1:20011,127.0.0.1:20012,127.0.0.1:20021,127.0.0.1:20022,127.0.0.1:20031,127.0.0.1:20032
redis.cluster.password=huazie123
redis.cluster.connectionTimeout=2000
redis.cluster.soTimeout=2000
redis.pool.maxTotal=100
redis.pool.maxIdle=10
redis.pool.minIdle=0
redis.pool.maxWaitMillis=2000
redis.maxAttempts=5
redis.nullCacheExpiry=10
|
redis.cluster.switch
: Redis集群配置开关(1:开启 0:关闭),如果不配置也默认开启
redis.systemName
: Redis缓存所属系统名
redis.cluster.server
: Redis集群服务节点地址
redis.cluster.password
: Redis集群服务节点登录密码(集群各节点配置同一个)
redis.cluster.connectionTimeout
: Redis集群客户端socket连接超时时间(单位:ms)
redis.cluster.soTimeout
: Redis集群客户端socket读写超时时间(单位:ms)
redis.pool.maxTotal
: Jedis连接池最大连接数
redis.pool.maxIdle
: Jedis连接池最大空闲连接数
redis.pool.minIdle
: Jedis连接池最小空闲连接数
redis.pool.maxWaitMillis
: Jedis连接池获取连接时的最大等待时间(单位:ms)
redis.maxAttempts
: Redis客户端操作最大尝试次数【包含第一次操作】
redis.nullCacheExpiry
: 空缓存数据有效期(单位:s)
3.7 定义Redis Flea缓存类
RedisFleaCache 可参考笔者的这篇博文 Redis分片模式接入,不再赘述。
3.8 定义抽象Flea缓存管理类
AbstractFleaCacheManager 可参考笔者的这篇博文 Memcached接入,不再赘述。
3.9 定义Redis集群模式Flea缓存管理类
RedisClusterFleaCacheManager 继承抽象Flea缓存管理类 AbstractFleaCacheManager
,用于接入Flea框架管理Redis缓存。
它的默认构造方法,用于初始化集群模式下默认连接池的Redis客户端, 这里需要先初始化Redis连接池,默认连接池名为【default
】; 然后通过 RedisClientFactory
获取集群模式下默认连接池的Redis客户端 RedisClient
,可在 3.10 查看。
方法 newCache
用于创建一个 RedisFleaCache
的实例对象,它里面包含了 读、写、删除 和 清空 缓存的基本操作,每一类 Redis 缓存数据都对应了一个 RedisFleaCache
的实例对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class RedisClusterFleaCacheManager extends AbstractFleaCacheManager {
private RedisClient redisClient;
public RedisClusterFleaCacheManager() { RedisClusterPool.getInstance().initialize(); redisClient = RedisClientFactory.getInstance(CacheModeEnum.CLUSTER); }
@Override protected AbstractFleaCache newCache(String name, int expiry) { int nullCacheExpiry = RedisClusterConfig.getConfig().getNullCacheExpiry(); return new RedisFleaCache(name, expiry, nullCacheExpiry, CacheModeEnum.CLUSTER, redisClient); } }
|
3.10 定义Redis客户端工厂类
RedisClientFactory ,有四种方式获取 Redis 客户端:
- 一是获取分片模式下默认连接池的 Redis 客户端,应用在单个缓存接入场景;
- 二是获取指定模式下默认连接池的 Redis 客户端,应用在单个缓存接入场景【3.9 采用】;
- 三是获取分片模式下指定连接池的 Redis 客户端,应用在整合缓存接入场景;
- 四是获取指定模式下指定连接池的 Redis 客户端,应用在整合缓存接入场景。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
|
public class RedisClientFactory {
private static final ConcurrentMap<String, RedisClient> redisClients = new ConcurrentHashMap<>();
private static final Object redisClientLock = new Object();
private RedisClientFactory() { }
public static RedisClient getInstance() { return getInstance(CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME); }
public static RedisClient getInstance(CacheModeEnum mode) { return getInstance(CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME, mode); }
public static RedisClient getInstance(String poolName) { return getInstance(poolName, CacheModeEnum.SHARDED); }
public static RedisClient getInstance(String poolName, CacheModeEnum mode) { String key = StringUtils.strCat(poolName, CommonConstants.SymbolConstants.UNDERLINE, StringUtils.valueOf(mode.getMode())); if (!redisClients.containsKey(key)) { synchronized (redisClientLock) { if (!redisClients.containsKey(key)) { RedisClientStrategyContext context = new RedisClientStrategyContext(poolName); redisClients.putIfAbsent(key, FleaStrategyFacade.invoke(mode.name(), context)); } } } return redisClients.get(key); } }
|
在上面 的 getInstance(String poolName, CacheModeEnum mode)
方法中,使用了 RedisClientStrategyContext ,用于定义 Redis 客户端策略上下文。根据不同的缓存模式,就可以找到对应的 Redis 客户端策略。
3.11 定义Redis客户端策略上下文
RedisClientStrategyContext 可参考笔者的这篇博文 Redis分片模式接入,不再赘述。
3.12 定义集群模式Redis客户端策略
RedisClusterClientStrategy 用于新建一个 Flea Redis
集群客户端。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
public class RedisClusterClientStrategy implements IFleaStrategy<RedisClient, String> {
@Override public RedisClient execute(String poolName) throws FleaStrategyException { RedisClient originRedisClient; if (CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME.equals(poolName)) { originRedisClient = new FleaRedisClusterClient.Builder().build(); } else { originRedisClient = new FleaRedisClusterClient.Builder(poolName).build(); } return originRedisClient; } }
|
好了,到这里我们可以来测试 Redis 集群模式。
3.13 Redis集群模式接入自测
单元测试类 FleaCacheTest
首先,这里需要按照 Redis集群配置文件 中的地址部署相应的 Redis集群 服务,后续有机会我再出一篇简单的Redis主从集群搭建博文。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Test public void testRedisClusterFleaCache() { try { AbstractFleaCacheManager manager = FleaCacheManagerFactory.getFleaCacheManager(CacheEnum.RedisCluster.getName()); AbstractFleaCache cache = manager.getCache("fleamenufavorites"); LOGGER.debug("Cache={}", cache);
cache.clear(); cache.getCacheKey(); LOGGER.debug(cache.getCacheName() + ">>>" + cache.getCacheDesc()); } catch (Exception e) { LOGGER.error("Exception:", e); } }
|
4. 进阶接入
4.1 定义抽象Spring缓存
AbstractSpringCache 可参考笔者的这篇博文 Memcached接入,不再赘述。
4.2 定义Redis Spring缓存类
RedisSpringCache 可参考笔者的这篇博文 Redis分片模式接入,不再赘述。
4.3 定义抽象Spring缓存管理类
AbstractSpringCacheManager 可参考笔者的这篇博文 Memcached接入,不再赘述。
4.4 定义Redis集群模式Spring缓存管理类
RedisClusterSpringCacheManager 继承抽象 Spring 缓存管理类 AbstractSpringCacheManager
,用于接入Spring框架管理Redis缓存; 基本实现同 RedisClusterFleaCacheManager,唯一不同在于 newCache 的实现。
它的默认构造方法,用于初始化集群模式下默认连接池的Redis客户端, 这里需要先初始化Redis连接池,默认连接池名为【default
】; 然后通过Redis客户端工厂类来获取Redis客户端。
方法【newCache
】用于创建一个Redis Spring缓存, 而它内部是由Redis Flea缓存实现具体的 读、写、删除 和 清空 缓存的基本操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public class RedisClusterSpringCacheManager extends AbstractSpringCacheManager {
private RedisClient redisClient;
public RedisClusterSpringCacheManager() { RedisClusterPool.getInstance().initialize(); redisClient = RedisClientFactory.getInstance(CacheModeEnum.CLUSTER); }
@Override protected AbstractSpringCache newCache(String name, int expiry) { int nullCacheExpiry = RedisClusterConfig.getConfig().getNullCacheExpiry(); return new RedisSpringCache(name, expiry, nullCacheExpiry, CacheModeEnum.CLUSTER, redisClient); }
}
|
4.5 spring 配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
<bean id="redisClusterSpringCacheManager" class="com.huazie.fleaframework.cache.redis.manager.RedisClusterSpringCacheManager"> <property name="configMap"> <map> <entry key="fleamenufavorites" value="100"/> </map> </property> </bean>
<cache:annotation-driven cache-manager="redisClusterSpringCacheManager" proxy-target-class="true"/>
|
4.6 缓存自测
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| private ApplicationContext applicationContext;
@Before public void init() { applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml"); LOGGER.debug("ApplicationContext={}", applicationContext); }
@Test public void testRedisClusterSpringCache() { try { AbstractSpringCacheManager manager = (RedisClusterSpringCacheManager) applicationContext.getBean("redisClusterSpringCacheManager"); AbstractSpringCache cache = manager.getCache("fleamenufavorites"); LOGGER.debug("Cache = {}", cache);
} catch (Exception e) { LOGGER.error("Exception:", e); } }
|
结语
哇哇哇,Redis 集群模式接入终于搞定。到目前为止,不论是Memcached的接入还是 Redis分片模式接入亦或是本篇,都是单一的缓存接入,笔者的 下一篇博文 将介绍如何 整合Memcached和Redis接入,以应对日益复杂的业务需求。 敬请期待!!!