引言 在开始本篇的讲解之前,我先来说下之前写过的两篇博文【现在已弃用】:flea-frame-db使用之基于EntityManager实现JPA分表的数据库操作【旧】 flea-frame-db使用之基于FleaJPAQuery实现JPA分表查询【旧】
这两篇都与分表相关,之所以被弃用,是因为在并发场景中这一版的分表存在问题。虽然并发场景有问题,但与之相关的分表配置、分表实现也确实为本篇的分库分表提供了一些基础能力,这些不能被忽视,将会在本篇中一一介绍。
经过重构之后,目前 flea-db 模块的结构如下图所示:
模块
描述
flea-db-common
分库配置、分表配置、SQL模板配置、异常 和 工具类等代码
flea-db-eclipselink
基于EclipseLink版的JPA实现而定制化的代码
flea-db-jdbc
基于 JDBC 开发的通用代码
flea-db-jpa
基于 JPA 开发的通用代码
1. 名词解释
名词
解释
模板库名
用作模板的数据库名
模板库持久化单元名
模板库下的持久化单元名,一般和模板库相同
模板库事务名
模板库下的事务管理器名 ,分库配置中可查看 <transaction>
标签
分库名
以模板库名为基础,根据分库规则得到的数据库名
分库持久化单元名
以模板库持久化单元名为基础,根据分库规则得到的持久化单元名,一般和分库名相同
分库事务名
以模板库事务名为基础,根据分库规则得到的事务名
分库转换
以模板库名为基础,根据分库规则得到数据库名的过程
分库序列键
分库规则中<split>
标签中 seq 的值,组成分库名表达式的一部分; 如果是分库分表,也对应着分表规则中<split>
标签中 seq 的值
分库序列值
分库序列键对应的值,在分库转换中使用
模板表名
用作模板的表名
分表名
以模板表名为基础,根据分表规则得到的表名
分表转换
以模板表名为基础,根据分表规则得到表名的过程
2. 配置讲解 2.1 分库配置 分库配置文件默认路径:flea/db/flea-lib-split.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 <?xml version="1.0" encoding="UTF-8" ?> <flea-lib-split > <libs > <lib name ="fleaorder" count ="2" exp ="(FLEA_LIB_NAME)(SEQ)" desc ="flea订单库分库规则" > <transaction name ="fleaOrderTransactionManager" exp ="(FLEA_TRANSACTION_NAME)(SEQ)" /> <splits > <split key ="DEC_NUM" seq ="SEQ" /> </splits > </lib > </libs > </flea-lib-split >
分库规则相关实现代码,可以移步 GitHub 查看 FleaSplitUtils##getSplitLib
2.2 分表配置 分表配置文件默认路径:flea/db/flea-table-split.xml 分库分表案例中,实体类中 @Table 注解定义的表名,我们可以理解为模板表名;实际的分表,根据模板表名和分表规则确定,后面将慢慢讲解。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 <?xml version="1.0" encoding="UTF-8" ?> <flea-table-split > <tables > <table name ="order" lib ="fleaorder" exp ="(FLEA_TABLE_NAME)_(ORDER_ID)" desc ="Flea订单信息表分表规则" > <splits > <split key ="ONE" column ="order_id" seq ="SEQ" /> </splits > </table > <table name ="order_attr" lib ="fleaorder" exp ="(FLEA_TABLE_NAME)_(ORDER_ID)" desc ="Flea订单属性表分表规则" > <splits > <split key ="ONE" column ="order_id" seq ="SEQ" /> </splits > </table > </tables > </flea-table-split >
分表规则相关实现代码,可以移步 GitHub 查看 FleaSplitUtils##getSplitTable
2.3 JPA持久化单元配置 JPA持久化单元 ,包含了一组实体类的命名配置 和 数据源配置。实际使用中,一个 JPA持久化单元 一般对应一个数据库,其中<properties>
标签指定具体的数据库配置,包含驱动名、地址、用户和密码;<class>
标签指定该数据库下的表对应的实体类。<exclude-unlisted-classes>
标签,当设置为 true 时,只有列出的类和 jars 将被扫描持久类,否则封闭 jar 或目录也将被扫描。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 <?xml version="1.0" encoding="UTF-8" ?> <persistence version ="2.0" xmlns ="http://java.sun.com/xml/ns/persistence" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd" > <persistence-unit name ="fleaorder" transaction-type ="RESOURCE_LOCAL" > <provider > org.eclipse.persistence.jpa.PersistenceProvider</provider > <class > com.huazie.fleadbtest.jpa.split.entity.Order</class > <class > com.huazie.fleadbtest.jpa.split.entity.OrderAttr</class > <class > com.huazie.fleadbtest.jpa.split.entity.OldOrder</class > <class > com.huazie.fleadbtest.jpa.split.entity.OldOrderAttr</class > <exclude-unlisted-classes > true</exclude-unlisted-classes > <properties > <property name ="javax.persistence.jdbc.driver" value ="com.mysql.jdbc.Driver" /> <property name ="javax.persistence.jdbc.url" value ="jdbc:mysql://localhost:3306/fleaorder?useUnicode=true& characterEncoding=UTF-8" /> <property name ="javax.persistence.jdbc.user" value ="root" /> <property name ="javax.persistence.jdbc.password" value ="root" /> </properties > </persistence-unit > </persistence >
分库场景,模板库和分库都需要有一个对应的持久化单元配置,详见 接入演示的持久化单元配置 。
2.4 JPA相关Spring Bean配置 首先是JPA 固定的Spring Bean 配置,可查看 fleajpabeans-spring.xml ,配置如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 <?xml version="1.0" encoding="UTF-8" ?> <beans xmlns ="http://www.springframework.org/schema/beans" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd" > <bean id ="defaultPersistenceManager" class ="org.springframework.orm.jpa.persistenceunit.DefaultPersistenceUnitManager" > <property name ="persistenceXmlLocations" > <list > <value > classpath:META-INF/fleajpa-persistence.xml</value > <value > classpath:META-INF/fleaorder-persistence.xml</value > <value > classpath:META-INF/fleaorder1-persistence.xml</value > <value > classpath:META-INF/fleaorder2-persistence.xml</value > </list > </property > </bean > <bean id ="defaultPersistenceProvider" class ="org.eclipse.persistence.jpa.PersistenceProvider" /> <bean id ="defaultVendorAdapter" class ="org.springframework.orm.jpa.vendor.EclipseLinkJpaVendorAdapter" > <property name ="showSql" value ="true" /> </bean > <bean id ="defaultJpaDialect" class ="org.springframework.orm.jpa.vendor.EclipseLinkJpaDialect" /> </beans >
与持久化单元对应的 Bean 配置,可查看 fleaorder-spring.xml ,配置 如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 <?xml version="1.0" encoding="UTF-8" ?> <beans xmlns ="http://www.springframework.org/schema/beans" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx ="http://www.springframework.org/schema/tx" xsi:schemaLocation =" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd" > <bean id ="fleaOrderEntityManagerFactory" class ="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean" > <property name ="persistenceUnitManager" ref ="defaultPersistenceManager" /> <property name ="persistenceUnitName" value ="fleaorder" /> <property name ="persistenceProvider" ref ="defaultPersistenceProvider" /> <property name ="jpaVendorAdapter" ref ="defaultVendorAdapter" /> <property name ="jpaDialect" ref ="defaultJpaDialect" /> <property name ="jpaPropertyMap" > <map > <entry key ="eclipselink.weaving" value ="false" /> <entry key ="eclipselink.logging.thread" value ="true" /> </map > </property > </bean > <bean id ="fleaOrderTransactionManager" class ="org.springframework.orm.jpa.JpaTransactionManager" > <property name ="entityManagerFactory" ref ="fleaOrderEntityManagerFactory" /> </bean > <tx:annotation-driven transaction-manager ="fleaOrderTransactionManager" /> </beans >
3. 实现讲解 3.1 Flea自定义事务切面 Flea自定义事务切面 FleaTransactionalAspect ,拦截由自定义事务注解标记的 Spring注入 的方法, 实现在方法调用之前开启事务,调用成功后提交事务,出现异常回滚事务。
Flea自定义事务注解主要标记在两类方法上:
一类方法是,AbstractFleaJPADAOImpl 的子类的增删改方法;这些方法一般在 某某数据源DAO层实现类 中,注解中需要指定事务名。
另一类方法是,除了上一类方法的其他 Spring注入 的方法上;需要特别注意的是,自定义事务注解上不仅需要指定事务名、而且还需要指定持久化单元名;
如果存在分库的场景,在调用之前,需要设置当前线程下的分库序列值。
1 2 3 FleaLibUtil.setSplitLibSeqValue("SEQ" , "123123123" );
下面我贴出Flea 自定义事务切面的代码,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 @Aspect @Component public class FleaTransactionalAspect { private static final String METHOD_NAME_GET_ENTITY_MANAGER = "getEntityManager" ; @Around("@annotation(com.huazie.fleaframework.db.jpa.transaction.FleaTransactional)") public Object invokeWithinTransaction (final ProceedingJoinPoint joinPoint) throws CommonException, FleaException, NoSuchMethodException { Method method = FleaAspectUtils.getTargetMethod(joinPoint); String transactionName = FleaEntityManager.getTransactionName(method); Object[] args = joinPoint.getArgs(); Object tObj = joinPoint.getTarget(); FleaEntity fleaEntity = null ; if (ArrayUtils.isNotEmpty(args)) { fleaEntity = getFleaEntityFromLastParam(args); } EntityManager entityManager; if (ObjectUtils.isNotEmpty(fleaEntity) && tObj instanceof AbstractFleaJPADAOImpl) { entityManager = (EntityManager) ReflectUtils.invoke(tObj, METHOD_NAME_GET_ENTITY_MANAGER, fleaEntity, Object.class); SplitTable splitTable = fleaEntity.get(DBConstants.LibTableSplitConstants.SPLIT_TABLE, SplitTable.class); SplitLib splitLib = fleaEntity.get(DBConstants.LibTableSplitConstants.SPLIT_LIB, SplitLib.class); if (ObjectUtils.isNotEmpty(splitTable)) { splitLib = splitTable.getSplitLib(); } if (ObjectUtils.isNotEmpty(splitLib) && splitLib.isExistSplitLib()) { transactionName = splitLib.getSplitLibTxName(); } } else { String unitName = FleaEntityManager.getUnitName(method); SplitLib splitLib = FleaSplitUtils.getSplitLib(unitName, FleaLibUtil.getSplitLibSeqValues()); if (splitLib.isExistSplitLib()) { transactionName = splitLib.getSplitLibTxName(); unitName = splitLib.getSplitLibName(); } entityManager = FleaEntityManager.getEntityManager(unitName, transactionName); } PlatformTransactionManager transactionManager = (PlatformTransactionManager) FleaApplicationContext.getBean(transactionName); ObjectUtils.checkEmpty(transactionManager, DaoException.class, "ERROR-DB-DAO0000000015" , transactionName); FleaTransactionTemplate transactionTemplate = new FleaTransactionTemplate (transactionManager, entityManager); return transactionTemplate.execute(new TransactionCallback <Object>() { @Override public Object doInTransaction (TransactionStatus status) { try { return joinPoint.proceed(); } catch (Throwable throwable) { ExceptionUtils.throwFleaException(FleaDBException.class, "Proceed with the next advice or target method invocation occurs exception : \n" , throwable); } return null ; } }); } }
在上述代码中,事务名 和 实体管理器 的获取是重点,因Flea 自定义事务注解标记在两类不同的方法上,这两者的获取也不一样。通过事务名可直接从Spring 配置中获取定义的事务管理器,事务名对应着Spring 配置中 transaction-manager
对应的属性值,详见 2.4 中 fleaorder-spring.xml 。
最后使用 Flea 事务模板,来实现标记 @FleaTransactional
的方法调用之前开启事务,调用成功后提交事务,出现异常回滚事务。
3.2 Flea事务模板 Flea事务模板 FleaTransactionTemplate ,参考 Spring 的 TransactionTemplate ,它是简化程序化事务划分和事务异常处理的模板类。其核心方法是 execute
, 参数是实现事务回调接口的事务代码。此模板收敛了处理事务生命周期和可能的异常的逻辑,因此事务回调接口的实现和调用代码都不需要显式处理事务。
下面将贴出其核心方法 execute
,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Override public <T> T execute (TransactionCallback<T> action) throws TransactionException { if (this .transactionManager instanceof CallbackPreferringPlatformTransactionManager) { return ((CallbackPreferringPlatformTransactionManager) this .transactionManager).execute(this , action); } else { TransactionStatus status = FleaJPASplitHelper.getHandler().getTransaction(this , transactionManager, entityManager); T result; try { result = action.doInTransaction(status); } catch (RuntimeException | Error ex) { rollbackOnException(status, ex); throw ex; } catch (Throwable ex) { rollbackOnException(status, ex); throw new UndeclaredThrowableException (ex, "TransactionCallback threw undeclared checked exception" ); } this .transactionManager.commit(status); return result; } } private void rollbackOnException (TransactionStatus status, Throwable ex) throws TransactionException { try { this .transactionManager.rollback(status); } catch (TransactionSystemException ex2) { ex2.initApplicationException(ex); throw ex2; } catch (RuntimeException ex2) { throw ex2; } catch (Error err) { throw err; } }
3.3 Flea实体管理器 Flea 实体管理器工具类 FleaEntityManager ,提供了获取持久化上下文交互的实体管理器接口、持久化单元名、事务名、分表信息、各持久化上下文交互接口的静态方法【如: getFleaNextValue ,find ,remove ,merge ,persist ,flush 】。
下面我们来看下整体的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 public class FleaEntityManager { private static final ConcurrentMap<String, EntityManager> entityManagerMap = new ConcurrentHashMap <>(); private static final Object entityManagerMapLock = new Object (); private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal <>("EntityManager resources" ); private FleaEntityManager () { } public static EntityManager getEntityManager (String unitName, String transactionName) throws CommonException { StringUtils.checkBlank(unitName, DaoException.class, "ERROR-DB-DAO0000000002" , "libName" ); StringUtils.checkBlank(transactionName, DaoException.class, "ERROR-DB-DAO0000000002" , "transactionName" ); if (!entityManagerMap.containsKey(unitName)) { synchronized (entityManagerMapLock) { if (!entityManagerMap.containsKey(unitName)) { JpaTransactionManager manger = (JpaTransactionManager) FleaApplicationContext.getBean(transactionName); ObjectUtils.checkEmpty(manger, DaoException.class, "ERROR-DB-DAO0000000015" , transactionName); EntityManagerFactory entityManagerFactory = manger.getEntityManagerFactory(); EntityManager entityManager = SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory); entityManagerMap.put(unitName, entityManager); } } } return entityManagerMap.get(unitName); } public static String getPersistenceUnitName (Class<?> daoImplClazz) { } public static String getTransactionName (Class<?> daoImplClazz) { } public static String getTransactionName (Method method) { } public static String getUnitName (Method method) { } public static SplitTable getSplitTable (Object entity) throws CommonException { } public static Map<Object, Object> getResourceMap () { } public static boolean hasResource (Object key) { } public static Object getResource (Object key) { } private static Object doGetResource (Object actualKey) { } public static void bindResource (Object key, Object value) throws IllegalStateException { } public static Object unbindResource (Object key) throws IllegalStateException { } public static Object unbindResourceIfPossible (Object key) { return doUnbindResource(key); } private static Object doUnbindResource (Object actualKey) { } public static <T> Number getFleaNextValue (EntityManager entityManager, Class<T> entityClass, T entity) { return FleaJPASplitHelper.getHandler().getNextValue(entityManager, entityClass, entity); } public static <T> T find (EntityManager entityManager, Object primaryKey, Class<T> entityClass, T entity) { return FleaJPASplitHelper.getHandler().find(entityManager, primaryKey, entityClass, entity); } public static <T> boolean remove (EntityManager entityManager, T entity) { return FleaJPASplitHelper.getHandler().remove(entityManager, entity); } public static <T> T merge (EntityManager entityManager, T entity) { return FleaJPASplitHelper.getHandler().merge(entityManager, entity); } public static <T> void persist (EntityManager entityManager, T entity) { FleaJPASplitHelper.getHandler().persist(entityManager, entity); } public static <T> void flush (EntityManager entityManager, T entity) { FleaJPASplitHelper.getHandler().flush(entityManager, entity); } }
3.4 Flea JPA分库分表处理接口 IFleaJPASplitHandler ,从上面 3.3中,我们可以看到 Flea实体管理器中的各持久化上下文交互接口的静态方法【如: getFleaNextValue ,find ,remove ,merge ,persist ,flush 】都是调用 FleaJPASplitHelper.getHandler() 的对应方法实现的,也就是 IFleaJPASplitHandler 的对应方法。
Flea JPA 分库分表处理者接口,包含分库分表相关的处理接口方法、增删改查的数据操作接口方法。
下面我们来看看 Flea JPA分库分表处理接口 都有哪些处理方法?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public interface IFleaJPASplitHandler { void handle (FleaJPAQuery query, Object entity) throws CommonException; void handle (FleaJPAQuery query, TypedQuery typedQuery) throws CommonException; EntityManager handle (EntityManager entityManager, Object entity, boolean flag) throws CommonException; TransactionStatus getTransaction (TransactionDefinition definition, PlatformTransactionManager transactionManager, EntityManager entityManager) ; <T> Number getNextValue (EntityManager entityManager, Class<T> entityClass, T entity) ; <T> T find (EntityManager entityManager, Object primaryKey, Class<T> entityClass, T entity) ; <T> boolean remove (EntityManager entityManager, T entity) ; <T> T merge (EntityManager entityManager, T entity) ; <T> void persist (EntityManager entityManager, T entity) ; <T> void flush (EntityManager entityManager, T entity) ; }
3.5 EclipseLink分库分表处理实现 EclipseLink 分库分表处理者 EclipseLinkLibTableSplitHandler ,由自定义的实体管理器实现类处理增删改查等操作。
在讲解 EclipseLink 分库分表处理者之前,我们先了解下其父类 FleaLibTableSplitHandler ,该类实现了通用的分库分表处理 和 增删改查操作,同时定义了抽象的内部方法由子类实现具体的操作。
下面我们来看一下具体的代码,如下:
public abstract class FleaLibTableSplitHandler implements IFleaJPASplitHandler { @Override public void handle (FleaJPAQuery query, Object entity) throws CommonException { if (ObjectUtils.isEmpty(query) || ObjectUtils.isEmpty(entity) || !(entity instanceof FleaEntity)) { return ; } FleaEntity fleaEntity = (FleaEntity) entity; SplitTable splitTable = FleaEntityManager.getSplitTable(entity); SplitLib splitLib; if (splitTable.isExistSplitTable()) { splitLib = splitTable.getSplitLib(); fleaEntity.put(DBConstants.LibTableSplitConstants.SPLIT_TABLE, splitTable); } else { String libName = query.getPoolName(); if (ObjectUtils.isEmpty(libName)) { return ; } splitLib = FleaSplitUtils.getSplitLib(libName, FleaLibUtil.getSplitLibSeqValues()); } EntityManager splitEntityManager = handleInner(splitLib); EntityManager entityManager; if (ObjectUtils.isEmpty(splitEntityManager)) { entityManager = query.getEntityManager(); } else { entityManager = splitEntityManager; } if (isFleaEntityManagerImpl(entityManager, splitTable, splitLib)) { splitEntityManager = getFleaEntityMangerImpl(entityManager); } if (ObjectUtils.isNotEmpty(splitEntityManager)) { query.init(splitEntityManager, query.getSourceClazz(), query.getResultClazz()); } } @Override public void handle (FleaJPAQuery query, TypedQuery typedQuery) { SplitTable splitTable = getSplitTableFromEntity(query.getEntity()); if (!splitTable.isExistSplitTable()) { return ; } handleInner(query, typedQuery, splitTable); } @Override public EntityManager handle (EntityManager entityManager, Object entity, boolean flag) throws CommonException { if (ObjectUtils.isEmpty(entityManager) || ObjectUtils.isEmpty(entity) || !(entity instanceof FleaEntity)) { return entityManager; } SplitTable splitTable = FleaEntityManager.getSplitTable(entity); FleaEntity fleaEntity = (FleaEntity) entity; SplitLib splitLib; if (splitTable.isExistSplitTable()) { splitLib = splitTable.getSplitLib(); fleaEntity.put(DBConstants.LibTableSplitConstants.SPLIT_TABLE, splitTable); } else { String libName = fleaEntity.get(DBConstants.LibTableSplitConstants.FLEA_LIB_NAME, String.class); if (ObjectUtils.isEmpty(libName)) { return entityManager; } splitLib = FleaSplitUtils.getSplitLib(libName, FleaLibUtil.getSplitLibSeqValues()); fleaEntity.put(DBConstants.LibTableSplitConstants.SPLIT_LIB, splitLib); } if (flag && splitTable.isGeneratorFlag()) { return entityManager; } EntityManager splitEntityManager = handleInner(splitLib); if (ObjectUtils.isNotEmpty(splitEntityManager)) { entityManager = splitEntityManager; } return entityManager; } private EntityManager handleInner (SplitLib splitLib) throws CommonException { EntityManager entityManager = null ; if (ObjectUtils.isNotEmpty(splitLib) && splitLib.isExistSplitLib()) { String unitName = splitLib.getSplitLibName(); String transactionName = splitLib.getSplitLibTxName(); entityManager = FleaEntityManager.getEntityManager(unitName, transactionName); } return entityManager; } @Override public TransactionStatus getTransaction (TransactionDefinition definition, PlatformTransactionManager transactionManager, EntityManager entityManager) { JpaTransactionManager jpaTransactionManager = (JpaTransactionManager) transactionManager; Object obj = TransactionSynchronizationManager.getResource(jpaTransactionManager.getEntityManagerFactory()); if (ObjectUtils.isEmpty(obj)) { EntityManager fleaEntityManagerImpl = getFleaEntityMangerImpl(entityManager); EntityManagerHolder entityManagerHolder = new EntityManagerHolder (fleaEntityManagerImpl); TransactionSynchronizationManager.bindResource(jpaTransactionManager.getEntityManagerFactory(), entityManagerHolder); } return jpaTransactionManager.getTransaction(definition); } @Override public <T> Number getNextValue (EntityManager entityManager, Class<T> entityClass, T entity) { SplitTable splitTable = getSplitTableFromEntity(entity); return getNextValueInner(entityManager, entityClass, splitTable); } @Override public <T> T find (EntityManager entityManager, Object primaryKey, Class<T> entityClass, T entity) { SplitTable splitTable = getSplitTableFromEntity(entity); SplitLib splitLib = getSplitLibFromEntity(entity); T t; if (isFleaEntityManagerImpl(entityManager, splitTable, splitLib)) { t = findInner(entityManager, primaryKey, entityClass, splitTable); } else { t = entityManager.find(entityClass, primaryKey); } return t; } @Override public <T> boolean remove (EntityManager entityManager, T entity) { SplitTable splitTable = getSplitTableFromEntity(entity); SplitLib splitLib = getSplitLibFromEntity(entity); if (isFleaEntityManagerImpl(entityManager, splitTable, splitLib)) { removeInner(entityManager, entity); } else { if (!entityManager.contains(entity)) { entity = registerObject(entityManager, entity); } entityManager.remove(entity); } return true ; } @Override public <T> T merge (EntityManager entityManager, T entity) { SplitTable splitTable = getSplitTableFromEntity(entity); SplitLib splitLib = getSplitLibFromEntity(entity); if (isFleaEntityManagerImpl(entityManager, splitTable, splitLib)) { return mergeInner(entityManager, entity); } else { return entityManager.merge(entity); } } @Override public <T> void persist (EntityManager entityManager, T entity) { SplitTable splitTable = getSplitTableFromEntity(entity); SplitLib splitLib = getSplitLibFromEntity(entity); if (isFleaEntityManagerImpl(entityManager, splitTable, splitLib)) { persistInner(entityManager, entity); } else { entityManager.persist(entity); } } @Override public <T> void flush (EntityManager entityManager, T entity) { SplitTable splitTable = getSplitTableFromEntity(entity); SplitLib splitLib = getSplitLibFromEntity(entity); if (isFleaEntityManagerImpl(entityManager, splitTable, splitLib)) { flushInner(entityManager); } else { entityManager.flush(); } } private boolean isFleaEntityManagerImpl (EntityManager entityManager, SplitTable splitTable, SplitLib splitLib) { return (splitTable.isExistSplitTable() || splitLib.isExistSplitLib() || FleaEntityManager.hasResource(entityManager.getEntityManagerFactory())); } private SplitTable getSplitTableFromEntity (Object entity) { SplitTable splitTable = null ; if (ObjectUtils.isNotEmpty(entity) && (entity instanceof FleaEntity)) { FleaEntity fleaEntity = (FleaEntity) entity; splitTable = fleaEntity.get(DBConstants.LibTableSplitConstants.SPLIT_TABLE, SplitTable.class); } if (ObjectUtils.isEmpty(splitTable)) { splitTable = new SplitTable (); splitTable.setExistSplitTable(false ); } return splitTable; } private SplitLib getSplitLibFromEntity (Object entity) { SplitLib splitLib = null ; if (ObjectUtils.isNotEmpty(entity) && (entity instanceof FleaEntity)) { FleaEntity fleaEntity = (FleaEntity) entity; splitLib = fleaEntity.get(DBConstants.LibTableSplitConstants.SPLIT_LIB, SplitLib.class); } if (ObjectUtils.isEmpty(splitLib)) { splitLib = new SplitLib (); splitLib.setExistSplitLib(false ); } return splitLib; } protected abstract EntityManager getFleaEntityMangerImpl (EntityManager entityManager) ; protected abstract void handleInner (FleaJPAQuery query, TypedQuery typedQuery, SplitTable splitTable) ; protected abstract <T> Number getNextValueInner (EntityManager entityManager, Class<T> entityClass, SplitTable splitTable) ; protected abstract <T> T findInner (EntityManager entityManager, Object primaryKey, Class<T> entityClass, SplitTable splitTable) ; protected abstract <T> void removeInner (EntityManager entityManager, T entity) ; protected abstract <T> T mergeInner (EntityManager entityManager, T entity) ; protected abstract <T> void persistInner (EntityManager entityManager, T entity) ; protected abstract void flushInner (EntityManager entityManager) ; protected abstract <T> T registerObject (EntityManager entityManager, T entity) ; }
上面具体的 inner
方法实现,我们可以看到都使用了 FleaEntityManagerImpl ,这就是下面将要介绍的 Flea 实体管理器 EclipseLink 版实现。
下面我们来看看相关代码,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 public class EclipseLinkLibTableSplitHandler extends FleaLibTableSplitHandler { @Override protected void handleInner (FleaJPAQuery query, TypedQuery typedQuery, SplitTable splitTable) { EntityType entityType = query.getRoot().getModel(); ClassDescriptor classDescriptor = ((EntityTypeImpl) entityType).getDescriptor(); AbstractSession abstractSession = ((FleaEntityManagerImpl) query.getEntityManager()).getDatabaseSession(); classDescriptor = ClassDescriptorUtils.getSplitDescriptor(classDescriptor, abstractSession, splitTable); ReadAllQuery readAllQuery = typedQuery.unwrap(ReadAllQuery.class); readAllQuery.setDescriptor(classDescriptor); readAllQuery.getExpressionBuilder().setQueryClassAndDescriptor(classDescriptor.getJavaClass(), classDescriptor); } @Override protected EntityManager getFleaEntityMangerImpl (EntityManager entityManager) { return FleaEntityManagerImpl.getFleaEntityManagerImpl(entityManager); } @Override protected <T> Number getNextValueInner (EntityManager entityManager, Class<T> entityClass, SplitTable splitTable) { return FleaEntityManagerImpl.getFleaEntityManagerImpl(entityManager).getNextValue(entityClass, splitTable); } @Override protected <T> T findInner (EntityManager entityManager, Object primaryKey, Class<T> entityClass, SplitTable splitTable) { return FleaEntityManagerImpl.getFleaEntityManagerImpl(entityManager).find(entityClass, primaryKey, splitTable); } @Override protected <T> void removeInner (EntityManager entityManager, T entity) { FleaEntityManagerImpl.getFleaEntityManagerImpl(entityManager).remove(entity); } @Override protected <T> T mergeInner (EntityManager entityManager, T entity) { return FleaEntityManagerImpl.getFleaEntityManagerImpl(entityManager).merge(entity); } @Override protected <T> void persistInner (EntityManager entityManager, T entity) { FleaEntityManagerImpl.getFleaEntityManagerImpl(entityManager).persist(entity); } @Override protected void flushInner (EntityManager entityManager) { FleaEntityManagerImpl.getFleaEntityManagerImpl(entityManager).flush(); } @Override @SuppressWarnings("unchecked") protected <T> T registerObject (EntityManager entityManager, T entity) { if (entityManager.contains(entity) || ObjectUtils.isEmpty(entity)) { return entity; } return (T) entityManager.unwrap(UnitOfWork.class).registerObject(entity); } }
上面具体的 inner 方法实现,我们可以看到都使用了 FleaEntityManagerImpl ,这就是下面将要介绍的 Flea 实体管理器 EclipseLink 版实现。
3.6 Flea实体管理器EclipseLink版实现 Flea 实体管理器 EclipseLink 版实现 FleaEntityManagerImpl ,继承了 EclipseLink 的 EntityManagerImpl ,它需要有 一个 EntityManager 入参来构造。
下面我们来看一下相关代码,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 public final class FleaEntityManagerImpl extends EntityManagerImpl { public static FleaEntityManagerImpl getFleaEntityManagerImpl (EntityManager entityManager) { EntityManagerFactory entityManagerFactory = entityManager.getEntityManagerFactory(); FleaEntityManagerImpl fleaEntityManagerImpl = (FleaEntityManagerImpl) FleaEntityManager.getResource(entityManagerFactory); if (ObjectUtils.isEmpty(fleaEntityManagerImpl)) { fleaEntityManagerImpl = new FleaEntityManagerImpl (entityManager); FleaEntityManager.bindResource(entityManagerFactory, fleaEntityManagerImpl); } return fleaEntityManagerImpl; } private FleaEntityManagerImpl (EntityManager entityManager) { super (entityManager.getEntityManagerFactory().unwrap(JpaEntityManagerFactory.class).unwrap(), entityManager.getProperties(), null ); } public <T> T find (Class<T> entityClass, Object primaryKey, SplitTable splitTable) { return find(entityClass, primaryKey, null , getQueryHints(entityClass, OperationType.FIND), splitTable); } public <T> T find (Class<T> entityClass, Object primaryKey, Map<String, Object> properties, SplitTable splitTable) { return find(entityClass, primaryKey, null , properties, splitTable); } public <T> T find (Class<T> entityClass, Object primaryKey, LockModeType lockMode, SplitTable splitTable) { return find(entityClass, primaryKey, lockMode, getQueryHints(entityClass, OperationType.FIND), splitTable); } @SuppressWarnings({"unchecked"}) public <T> T find (Class<T> entityClass, Object primaryKey, LockModeType lockMode, Map<String, Object> properties, SplitTable splitTable) { try { verifyOpen(); if (ObjectUtils.isNotEmpty(lockMode) && !lockMode.equals(LockModeType.NONE)) { checkForTransaction(true ); } AbstractSession session = this .databaseSession; ClassDescriptor descriptor = session.getDescriptor(entityClass); if (descriptor == null || descriptor.isDescriptorTypeAggregate()) { throw new IllegalArgumentException (ExceptionLocalization.buildMessage("unknown_bean_class" , new Object []{entityClass})); } if (!descriptor.shouldBeReadOnly() || !descriptor.isSharedIsolation()) { session = (AbstractSession) getActiveSession(); } else { session = (AbstractSession) getReadOnlySession(); } if (descriptor.hasTablePerMultitenantPolicy()) { descriptor = session.getDescriptor(entityClass); } descriptor = ClassDescriptorUtils.getSplitDescriptor(descriptor, session, splitTable); return (T) findInternal(descriptor, session, primaryKey, lockMode, properties); } catch (LockTimeoutException e) { throw e; } catch (RuntimeException e) { setRollbackOnly(); throw e; } } public <T> Number getNextValue (Class<T> entityClass, SplitTable splitTable) { AbstractSession session = this .databaseSession; ClassDescriptor descriptor = session.getDescriptor(entityClass); if (ObjectUtils.isEmpty(descriptor)) { throw new IllegalArgumentException (ExceptionLocalization.buildMessage("unknown_bean_class" , new Object []{entityClass})); } descriptor = ClassDescriptorUtils.getSplitDescriptor(descriptor, session, splitTable); Number nextValue; if (ObjectUtils.isNotEmpty(splitTable) && splitTable.isExistSplitTablePkColumn()) { String sequenceName = splitTable.getSplitTablePkColumnValue(); Sequencing sequencing = session.getSequencing(); FleaSequencingManager fleaSequencingManager = FleaSequencingManager.getFleaSequencingManager(sequenceName, sequencing, descriptor); nextValue = fleaSequencingManager.getNextValue(sequenceName); } else { nextValue = session.getNextSequenceNumberValue(entityClass); } return nextValue; } @Override public RepeatableWriteUnitOfWork getActivePersistenceContext (Object txn) { } }
4. 接入讲解 4.1 数据库和表 4.1.1 模板库 flea_id_generator 为主键生成器表,可查看笔者的这篇博文《flea-db使用之主键生成器表介绍》 ,不再赘述。
4.1.2 分库1
4.1.3 分库2
具体的SQL文件,请参考 fleaorder.sql ,fleaorder1.sql ,fleaorder2.sql
4.2 各实体类
4.3 FleaOrder数据源DAO层父类 FleaOrderDAOImpl ,该类继承 AbstractFleaJPADAOImpl ,成员变量 entityManager ,由 PersistenceContext 注解标记 持久化单元名,这里为模板持久化单元名。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 public class FleaOrderDAOImpl <T> extends AbstractFleaJPADAOImpl <T> { @PersistenceContext(unitName="fleaorder") private EntityManager entityManager; @Override @FleaTransactional("fleaOrderTransactionManager") public Number getFleaNextValue (T entity) throws CommonException { return super .getFleaNextValue(entity); } @Override @FleaTransactional(value = "fleaOrderTransactionManager", unitName = "fleaorder") public boolean remove (long entityId) throws CommonException { return super .remove(entityId); } @Override @FleaTransactional(value = "fleaOrderTransactionManager", unitName = "fleaorder") public boolean remove (String entityId) throws CommonException { return super .remove(entityId); } @Override @FleaTransactional("fleaOrderTransactionManager") public boolean remove (T entity) throws CommonException { return super .remove(entity); } @Override @FleaTransactional("fleaOrderTransactionManager") public boolean remove (long entityId, T entity) throws CommonException { return super .remove(entityId, entity); } @Override @FleaTransactional("fleaOrderTransactionManager") public boolean remove (String entityId, T entity) throws CommonException { return super .remove(entityId, entity); } @Override @FleaTransactional("fleaOrderTransactionManager") public T update (T entity) throws CommonException { return super .update(entity); } @Override @FleaTransactional("fleaOrderTransactionManager") public List<T> batchUpdate (List<T> entities) throws CommonException { return super .batchUpdate(entities); } @Override @FleaTransactional("fleaOrderTransactionManager") public void save (T entity) throws CommonException { super .save(entity); } @Override @FleaTransactional("fleaOrderTransactionManager") public void batchSave (List<T> entities) throws CommonException { super .batchSave(entities); } @Override @FleaTransactional("fleaOrderTransactionManager") public int insert (String relationId, T entity) throws CommonException { return super .insert(relationId, entity); } @Override @FleaTransactional("fleaOrderTransactionManager") public int update (String relationId, T entity) throws CommonException { return super .update(relationId, entity); } @Override @FleaTransactional("fleaOrderTransactionManager") public int delete (String relationId, T entity) throws CommonException { return super .delete(relationId, entity); } @Override protected EntityManager getEntityManager () { return entityManager; } }
4.4 各实体的DAO层接口和实现 可至 GitHub 查看如下 DAO层代码 :
4.5 各实体的SV层接口和实现 可至 GitHub 查看如下 SV层代码 :
5. 单元测试 测试之前,先添加主键生成器表中的数据如下:
id_generator_key
id_generator_value
pk_old_order
999999999
pk_order
999999999
5.1 分库分表测试 分库分表测试类 OrderTest
下面我们来看下,分库分表的新增、查询、更新 和 查询,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = {"classpath:applicationContext.xml"}) public class OrderTest { private static final Logger LOGGER = FleaLoggerProxy.getProxyInstance(OrderTest.class); @Resource(name = "orderSV") private IOrderSV orderSV; @Test public void testInsertOrder () throws Exception { Order order = new Order (); order.setOrderName("测试订单" ); order.setOrderPrice(0L ); order.setOrderState(0 ); order.setOrderDate(DateUtils.getCurrentTime()); Long orderId = (Long) orderSV.getFleaNextValue(null ); order.setOrderId(orderId); orderSV.save(order); } @Test public void testQueryOrder () throws Exception { long orderId = 1000000000L ; Order order = new Order (); order.setOrderId(orderId); order = orderSV.query(orderId, order); LOGGER.debug("Order = {}" , order); } @Test public void testUpdateOrder () throws Exception { long orderId = 1000000000L ; Order order = new Order (); order.setOrderId(orderId); Set<String> attrNames = new HashSet <>(); attrNames.add("orderId" ); List<Order> orderList = orderSV.query(attrNames, order); if (CollectionUtils.isNotEmpty(orderList)) { order = orderList.get(0 ); LOGGER.debug("Before : {}" , order); order.setOrderName("修改订单" ); order.setOrderPrice(100L ); order.setOrderState(1 ); orderSV.update(order); } Order order1 = new Order (); order1.setOrderId(orderId); order1 = orderSV.query(orderId, order1); LOGGER.debug("After : {}" , order1); } @Test public void testDeleteOrder () throws Exception { long orderId = 1000000000L ; Order order = new Order (); order.setOrderId(orderId); Set<String> attrNames = new HashSet <>(); attrNames.add("orderId" ); List<Order> orderList = orderSV.query(attrNames, order); if (CollectionUtils.isNotEmpty(orderList)) { Order order1 = orderList.get(0 ); LOGGER.error("Before : {}" , order1); orderSV.remove(order1); } Order order2 = orderSV.query(orderId, order); LOGGER.error("After : {}" , order2); } }
5.2 分库测试 如果只分库,不分表,需要再执行具体的增删改查之前,设置分库序列值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = {"classpath:applicationContext.xml"}) public class OldOrderTest { private static final Logger LOGGER = FleaLoggerProxy.getProxyInstance(OldOrderTest.class); @Resource(name = "oldOrderSV") private IOldOrderSV oldOrderSV; @Test public void testInsertOldOrder () throws Exception { OldOrder oldOrder = new OldOrder (); oldOrder.setOrderName("测试旧订单1" ); oldOrder.setOrderPrice(200L ); oldOrder.setOrderState(0 ); oldOrder.setOrderDate(DateUtils.getCurrentTime()); Long orderId = (Long) oldOrderSV.getFleaNextValue(null ); oldOrder.setOrderId(orderId); FleaLibUtil.setSplitLibSeqValue("SEQ" , orderId); oldOrderSV.save(oldOrder); } @Test public void testQueryOldOrder () throws Exception { long orderId = 1000000000L ; OldOrder oldOrder = new OldOrder (); FleaLibUtil.setSplitLibSeqValue("SEQ" , orderId); oldOrder = oldOrderSV.query(orderId, oldOrder); LOGGER.debug("Order = {}" , oldOrder); } @Test public void testUpdateOldOrder () throws Exception { long orderId = 1000000000L ; FleaLibUtil.setSplitLibSeqValue("SEQ" , orderId); OldOrder oldOrder = new OldOrder (); oldOrder.setOrderId(orderId); Set<String> attrNames = new HashSet <>(); attrNames.add("orderId" ); List<OldOrder> oldOrderList = oldOrderSV.query(attrNames, oldOrder); if (CollectionUtils.isNotEmpty(oldOrderList)) { oldOrder = oldOrderList.get(0 ); LOGGER.debug("Before : {}" , oldOrder); oldOrder.setOrderName("修改旧订单1" ); oldOrder.setOrderPrice(200L ); oldOrder.setOrderState(2 ); oldOrderSV.update(oldOrder); } OldOrder oldOrder1 = new OldOrder (); oldOrder1 = oldOrderSV.query(orderId, oldOrder1); LOGGER.debug("After : {}" , oldOrder1); } @Test public void testDeleteOldOrder () throws Exception { long orderId = 1000000000L ; FleaLibUtil.setSplitLibSeqValue("SEQ" , orderId); OldOrder oldOrder = new OldOrder (); oldOrder.setOrderId(orderId); Set<String> attrNames = new HashSet <>(); attrNames.add("orderId" ); List<OldOrder> orderList = oldOrderSV.query(attrNames, oldOrder); if (CollectionUtils.isNotEmpty(orderList)) { OldOrder oldOrder1 = orderList.get(0 ); LOGGER.error("Before : {}" , oldOrder1); oldOrderSV.remove(oldOrder1); } OldOrder oldOrder2 = new OldOrder (); oldOrder2 = oldOrderSV.query(orderId, oldOrder2); LOGGER.error("After : {}" , oldOrder2); } }
5.3 JPA事务演示 首先我们先看下如何在 除了数据源DAO层实现类之外 的方法上使用自定的事务注解 @FleaTransactional
, 可至 GitHub 查看如下代码 : 这里贴出关键使用代码如下:
其中,value 的值为 模板库事务名 ,unitName 的值为 模板库持久化单元名 【也为对应 模板库名 】
1 2 3 4 5 @Override @FleaTransactional(value = "fleaOrderTransactionManager", unitName = "fleaorder") public void orderTransaction (Long orderId) throws CommonException { }
那上面该如何调用呢?
1 2 3 4 5 6 7 8 9 10 @Test public void testTransaction () throws Exception { long orderId = 1000000000L ; FleaLibUtil.setSplitLibSeqValue("SEQ" , orderId); fleaOrderModuleSV.orderTransaction(orderId); }
总结 经历了几版的重构,分库分表的实现终于可以和大家见面了,后续我将继续优化和完善 Flea Framework 这个框架,希望看到的朋友,可以多多支持!!!这篇博文内容有点多,能看到最后属实不容易,再次感谢大家的支持!!!