1. 参考
flea-frame-cache使用之Redis接入 源代码v1.0.0
2. 依赖
jedis-3.0.1.jar
1 2 3 4 5 6
| <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.0.1</version> </dependency>
|
spring-context-4.3.18.RELEASE.jar
1 2 3 4 5 6
| <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.3.18.RELEASE</version> </dependency>
|
spring-context-support-4.3.18.RELEASE.jar
1 2 3 4 5
| <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> <version>4.3.18.RELEASE</version> </dependency>
|
3. 基础接入
3.1 定义Flea缓存接口 — IFleaCache
可参考笔者的这篇博文 Memcached接入,不再赘述。
3.2 定义抽象Flea缓存类 — AbstractFleaCache
可参考笔者的这篇博文 Memcached接入,不再赘述。
3.3 定义Redis客户端接口类 — RedisClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
|
public interface RedisClient {
String set(final String key, final String value);
String set(final byte[] key, final byte[] value);
String set(final String key, final String value, final int expiry);
String set(final byte[] key, final byte[] value, final int expiry);
String set(final String key, final String value, final long expiry);
String set(final byte[] key, final byte[] value, final long expiry);
String set(final String key, final String value, SetParams params);
String set(final byte[] key, final byte[] value, SetParams params);
String get(final String key);
byte[] get(final byte[] key);
Long del(final String key);
String getLocation(final String key);
String getLocation(final byte[] key);
String getHost(final String key);
String getHost(final byte[] key);
Integer getPort(final String key);
Integer getPort(final byte[] key);
Client getClient(final String key);
Client getClient(final byte[] key);
ShardedJedisPool getJedisPool();
void setShardedJedis(ShardedJedis shardedJedis);
ShardedJedis getShardedJedis();
String getPoolName();
void setPoolName(String poolName); }
|
3.4 定义Redis客户端实现类 — FleaRedisClient
该类实现 RedisClient 接口, 其中分布式Jedis连接池 ShardedJedisPool 用于获取分布式Jedis对象 ShardedJedis, ShardedJedis可以自行根据初始化的算法,计算当前传入的数据键在某一台初始化的 Redis 服务器上,从而操作对数据的添加,查找,删除功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
|
public class FleaRedisClient implements RedisClient {
private ShardedJedisPool shardedJedisPool;
private ShardedJedis shardedJedis;
private String poolName;
private FleaRedisClient() { this(null); }
private FleaRedisClient(String poolName) { this.poolName = poolName; init(); }
private void init() { if (StringUtils.isBlank(poolName)) { poolName = CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME; shardedJedisPool = RedisPool.getInstance().getJedisPool(); } else { shardedJedisPool = RedisPool.getInstance(poolName).getJedisPool(); }
}
@Override public String set(final String key, final String value) { return shardedJedis.set(key, value); }
@Override public String set(byte[] key, byte[] value) { return shardedJedis.set(key, value); }
@Override public String set(final String key, final String value, final int expiry) { return shardedJedis.setex(key, expiry, value); }
@Override public String set(byte[] key, byte[] value, int expiry) { return shardedJedis.setex(key, expiry, value); }
@Override public String set(String key, String value, long expiry) { return shardedJedis.psetex(key, expiry, value); }
@Override public String set(byte[] key, byte[] value, long expiry) { return shardedJedis.psetex(key, expiry, value); }
@Override public String set(final String key, final String value, final SetParams params) { return shardedJedis.set(key, value, params); }
@Override public String set(byte[] key, byte[] value, SetParams params) { return shardedJedis.set(key, value, params); }
@Override public String get(final String key) { return shardedJedis.get(key); }
@Override public byte[] get(byte[] key) { return shardedJedis.get(key); }
@Override public Long del(final String key) { return shardedJedis.del(key); }
@Override public String getLocation(final String key) { return getLocationByKey(key); }
@Override public String getLocation(byte[] key) { return getLocationByKey(key); }
@Override public String getHost(final String key) { return getHostByKey(key); }
@Override public String getHost(byte[] key) { return getHostByKey(key); }
@Override public Integer getPort(final String key) { return getPortByKey(key); }
@Override public Integer getPort(byte[] key) { return getPortByKey(key); }
@Override public Client getClient(String key) { return getClientByKey(key); }
@Override public Client getClient(byte[] key) { return getClientByKey(key); }
private String getLocationByKey(Object key) { StringBuilder location = new StringBuilder(); Client client = getClientByKey(key); if (ObjectUtils.isNotEmpty(client)) { location.append(client.getHost()).append(CommonConstants.SymbolConstants.COLON).append(client.getPort()); } return location.toString(); }
private String getHostByKey(Object key) { Client client = getClientByKey(key); if (ObjectUtils.isNotEmpty(client)) { return client.getHost(); } return null; }
private Integer getPortByKey(Object key) { Client client = getClientByKey(key); if (ObjectUtils.isNotEmpty(client)) { return client.getPort(); } return null; }
private Client getClientByKey(Object key) { Client client = null; if (ObjectUtils.isNotEmpty(key)) { if (key instanceof String) { client = shardedJedis.getShard(key.toString()).getClient(); } else if (key instanceof byte[]) { client = shardedJedis.getShard((byte[]) key).getClient(); } } return client; }
@Override public ShardedJedisPool getJedisPool() { return shardedJedisPool; }
@Override public void setShardedJedis(ShardedJedis shardedJedis) { this.shardedJedis = shardedJedis; }
@Override public ShardedJedis getShardedJedis() { return shardedJedis; }
@Override public String getPoolName() { return poolName; }
@Override public void setPoolName(String poolName) { this.poolName = poolName; init(); }
static class Builder { } }
|
该类的构造函数初始化逻辑,可以看出我们使用了 RedisPool, 下面来介绍一下。
3.5 定义Redis连接池 — RedisPool
RedisPool 用于Redis相关配置信息的初始化,其中重点是获取分布式Jedis连接池 ShardedJedisPool ,该类其中一个构造方法如下:
1 2 3 4 5 6 7
|
public ShardedJedisPool(final GenericObjectPoolConfig poolConfig, List<JedisShardInfo> shards, Hashing algo)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
|
public class RedisPool {
private static final ConcurrentMap<String, RedisPool> redisPools = new ConcurrentHashMap<>();
private String poolName;
private ShardedJedisPool shardedJedisPool;
private RedisPool(String poolName) { this.poolName = poolName; }
public static RedisPool getInstance(String poolName) { if (!redisPools.containsKey(poolName)) { synchronized (redisPools) { if (!redisPools.containsKey(poolName)) { RedisPool redisPool = new RedisPool(poolName); redisPools.putIfAbsent(poolName, redisPool); } } } return redisPools.get(poolName); }
public static RedisPool getInstance() { return getInstance(CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME); }
void initialize() { }
void initialize(List<CacheServer> cacheServerList, CacheParams cacheParams) { }
public ShardedJedisPool getJedisPool() { if (ObjectUtils.isEmpty(shardedJedisPool)) { throw new RuntimeException("获取分布式Redis集群客户端连接池失败:请先调用initialize初始化"); } return shardedJedisPool; }
public String getPoolName() { return poolName; } }
|
3.6 Redis配置文件
flea-frame-cache 读取 redis.properties(Redis 配置文件),用作初始化 RedisPool
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
|
redis.systemName=FleaFrame
redis.server=127.0.0.1:10001,127.0.0.1:10002,127.0.0.1:10003
redis.password=huazie123,huazie123,huazie123
redis.weight=1,1,1
redis.connectionTimeout=2000
redis.soTimeout=2000
redis.hashingAlg=1
redis.pool.maxTotal=100
redis.pool.maxIdle=10
redis.pool.minIdle=0
redis.pool.maxWaitMillis=2000
|
3.7 定义Redis Flea缓存类 — RedisFleaCache
该类继承抽象Flea缓存类 AbstractFleaCache ,其构造方法可见如需要传入Redis客户端 RedisClient ,相关使用下面介绍:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
|
public class RedisFleaCache extends AbstractFleaCache {
private RedisClient redisClient;
public RedisFleaCache(String name, long expiry, RedisClient redisClient) { super(name, expiry); this.redisClient = redisClient; cache = CacheEnum.Redis; }
@Override public Object getNativeValue(String key) { if (LOGGER.isDebugEnabled()) { LOGGER.debug1(new Object() {}, "KEY = {}", key); } return ObjectUtils.deserialize(redisClient.get(key.getBytes())); }
@Override public void putNativeValue(String key, Object value, long expiry) { if (ObjectUtils.isNotEmpty(value)) { byte[] valueBytes = ObjectUtils.serialize(value); if (expiry == CommonConstants.NumeralConstants.ZERO) { redisClient.set(key.getBytes(), valueBytes); } else { redisClient.set(key.getBytes(), valueBytes, (int) expiry); } }
}
@Override public void deleteNativeValue(String key) { redisClient.del(key); }
@Override public String getSystemName() { return RedisConfig.getConfig().getSystemName(); } }
|
3.8 定义抽象Flea缓存管理类 — AbstractFleaCacheManager
可参考笔者的这篇博文 Memcached接入,不再赘述。
3.9 定义Redis Flea缓存管理类 — RedisFleaCacheManager
该类继承抽象Flea缓存管理类 AbstractFleaCacheManager,构造方法使用了Redis 客户端代理类 RedisClientProxy 获取Redis客户端 RedisClient,可在 3.10 查看。newCache 方法返回的是 RedisFleaCache 的实例对象,每一类 Redis 缓存数据都对应了一个 RedisFleaCache 的实例对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
|
public class RedisFleaCacheManager extends AbstractFleaCacheManager {
private RedisClient redisClient;
public RedisFleaCacheManager() { RedisPool.getInstance().initialize(); redisClient = RedisClientProxy.getProxyInstance(); }
@Override protected AbstractFleaCache newCache(String name, long expiry) { return new RedisFleaCache(name, expiry, redisClient); } }
|
3.10 定义Redis客户端代理类 — RedisClientProxy
Redis 客户端代理类 RedisClientProxy 中 getProxyInstance() 返回 默认连接池的 Redis 客户端,getProxyInstance(String poolName) 返回 指定连接池的 Redis 客户端。它们返回的都是 Redis 客户端接口类 RedisClient ,实际代理的是 Flea Redis 客户端 FleaRedisClient。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
|
public class RedisClientProxy extends FleaProxy<RedisClient> {
private final static ConcurrentMap<String, RedisClient> redisClients = new ConcurrentHashMap<String, RedisClient>();
public static RedisClient getProxyInstance() { return getProxyInstance(CacheConstants.FleaCacheConstants.DEFAUTL_POOL_NAME); }
public static RedisClient getProxyInstance(String poolName) { if (!redisClients.containsKey(poolName)) { synchronized (redisClients) { if (!redisClients.containsKey(poolName)) { RedisClient originRedisClient; if(CacheConstants.FleaCacheConstants.DEFAUTL_POOL_NAME.equals(poolName)) { originRedisClient = new FleaRedisClient(); } else { originRedisClient = new FleaRedisClient(poolName); } RedisClient proxyRedisClient = newProxyInstance(originRedisClient.getClass().getClassLoader(), originRedisClient.getClass().getInterfaces(), new RedisClientInvocationHandler(originRedisClient)); redisClients.put(poolName, proxyRedisClient); } } } return redisClients.get(poolName); } }
|
3.11 定义Redis客户端调用处理类 — RedisClientInvocationHandler
该类在 RedisClientProxy 中被调用,用于添加 Flea Redis 客户端类相应方法被代理调用前后的自定义操作,包含了代理拦截器 RedisClientProxyInterceptor 和 FleaDebugProxyInterceptor ,异常代理拦截器 FleaErrorProxyInterceptor。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
|
public class RedisClientInvocationHandler extends FleaProxyHandler {
private static List<IFleaProxyInterceptor> proxyInterceptors;
private static IFleaExceptionProxyInterceptor exceptionProxyInterceptor;
static { proxyInterceptors = new ArrayList<>(); proxyInterceptors.add(new RedisClientProxyInterceptor()); proxyInterceptors.add(new FleaDebugProxyInterceptor()); exceptionProxyInterceptor = new FleaErrorProxyInterceptor(); }
public RedisClientInvocationHandler(Object proxyObject) { super(proxyObject, proxyInterceptors, exceptionProxyInterceptor); } }
|
3.12 定义Redis客户端代理拦截器 — RedisClientProxyInterceptor
RedisClientProxyInterceptor 主要实现代理前的分布式 Jedis 对象 ShardedJedis 的获取,即方法 beforeHandle ;代理后的分布式 Jedis 对象 ShardedJedis 的关闭,归还相关资源给分布式 Jedis 连接池 ShardedJedisPool,即方法 afterHandle。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
|
public class RedisClientProxyInterceptor implements IFleaProxyInterceptor {
@Override public void beforeHandle(Object proxyObject, Method method, Object[] args) throws Exception { RedisClient redisClient = convertProxyObject(proxyObject); ShardedJedisPool jedisPool = redisClient.getJedisPool(); if (ObjectUtils.isNotEmpty(jedisPool)) { redisClient.setShardedJedis(jedisPool.getResource()); } }
@Override public void afterHandle(Object proxyObject, Method method, Object[] args, Object result, boolean hasException) throws Exception { RedisClient redisClient = convertProxyObject(proxyObject); ShardedJedis shardedJedis = redisClient.getShardedJedis(); if (ObjectUtils.isNotEmpty(shardedJedis)) { shardedJedis.close(); } }
private RedisClient convertProxyObject(Object proxyObject) throws Exception { if (!(proxyObject instanceof RedisClient)) { throw new Exception("The proxyObject must implement RedisClient interface"); } return (RedisClient) proxyObject; } }
|
哇,终于Redis接入差不多要完成了,下面一起开始启动单元测试吧
3.13 Redis接入自测 — FleaCacheTest
首先,这里需要按照 Redis 配置文件中的地址部署相应的 Redis 服务,可参考笔者的 这篇博文。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Test public void testRedisFleaCache() { try { AbstractFleaCacheManager manager = FleaCacheManagerFactory.getFleaCacheManager(CacheEnum.Redis.getName()); AbstractFleaCache cache = manager.getCache("fleaparadetail"); LOGGER.debug("Cache={}", cache);
cache.get("menu1");
cache.getCacheKey(); LOGGER.debug(cache.getCacheName() + ">>>" + cache.getCacheDesc()); } catch (Exception e) { LOGGER.error("Exception:", e); } }
|
4. 进阶接入
4.1 定义抽象Spring缓存 — AbstractSpringCache
可参考笔者的这篇博文 Memcached接入,不再赘述。
4.2 定义Redis Spring缓存类 — RedisSpringCache
该类继承抽象 Spring 缓存 AbstractSpringCache,用于对接 Spring; 从构造方法可见,该类初始化还是使用 RedisFleaCache。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
public class RedisSpringCache extends AbstractSpringCache {
public RedisSpringCache(String name, IFleaCache fleaCache) { super(name, fleaCache); }
public RedisSpringCache(String name, long expiry, RedisClient redisClient) { this(name, new RedisFleaCache(name, expiry, redisClient)); }
}
|
4.3 定义抽象Spring缓存管理类 — AbstractSpringCacheManager
可参考笔者的这篇博文 Memcached接入,不再赘述。
4.4 定义Redis Spring缓存管理类 — RedisSpringCacheManager
该类继承抽象 Spring 缓存管理类 AbstractSpringCacheManager,用于对接Spring; 基本实现同 RedisFleaCacheManager,唯一不同在于 newCache 的实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
|
public class RedisSpringCacheManager extends AbstractSpringCacheManager {
private RedisClient redisClient;
public RedisSpringCacheManager() { RedisPool.getInstance().initialize(); redisClient = RedisClientProxy.getProxyInstance(); }
@Override protected AbstractSpringCache newCache(String name, long expiry) { return new RedisSpringCache(name, expiry, redisClient); } }
|
4.5 spring 配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
<bean id="redisSpringCacheManager" class="com.huazie.frame.cache.redis.RedisSpringCacheManager"> <property name="configMap"> <map> <entry key="fleaparadetail" value="86400"/> </map> </property> </bean>
<cache:annotation-driven cache-manager="redisSpringCacheManager" proxy-target-class="true"/>
|
4.6 缓存自测
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| private ApplicationContext applicationContext;
@Before public void init() { applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml"); LOGGER.debug("ApplicationContext={}", applicationContext); }
@Test public void testRedisSpringCache() { try { AbstractSpringCacheManager manager = (RedisSpringCacheManager) applicationContext.getBean("redisSpringCacheManager"); LOGGER.debug("RedisCacheManager={}", manager);
AbstractSpringCache cache = manager.getCache("fleaparadetail"); LOGGER.debug("Cache={}", cache);
cache.clear();
} catch (Exception e) { LOGGER.error("Exception:", e); } }
|
结语
好了, Redis 的接入工作已经全部完成了。到目前为止,不论是Memcached的接入还是 Redis的接入,都是单一的缓存接入,笔者后续将介绍如何整合Memcached和Redis接入,以应对日益复杂的业务需求。 敬请期待!!!
更新
这一版 Redis接入 已进行重构,可详见 flea-cache使用之Redis分片模式接入