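Introduction

Before starting, a note on two earlier posts of mine, both now deprecated:

- "Using flea-frame-db: JPA table-sharded database operations based on EntityManager" [old]
- "Using flea-frame-db: JPA table-sharded queries based on FleaJPAQuery" [old]

Both posts deal with table sharding. They were retired because that version of table sharding misbehaves under concurrency. Even so, the sharding configuration and implementation from that version provide some of the groundwork for the database and table sharding described here, so they are introduced one by one in this post.

After the refactoring, the flea-db module is organized as shown in the table below: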
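
| Module | Description |
| --- | --- |
| flea-db-common | Database sharding configuration, table sharding configuration, SQL template configuration, exceptions, utility classes, and related code |
| flea-db-eclipselink | Code customized for the EclipseLink implementation of JPA |
| flea-db-jdbc | Common code built on JDBC |
| flea-db-jpa | Common code built on JPA |
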
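1. Glossary

| Term | Meaning |
| --- | --- |
| Template database name | The name of the database used as the template |
| Template persistence unit name | The persistence unit name under the template database; usually the same as the template database name |
| Template transaction name | The transaction manager name under the template database; see the <transaction> tag in the database sharding configuration |
| Sharded database name | The database name derived from the template database name by the database sharding rule |
| Sharded persistence unit name | The persistence unit name derived from the template persistence unit name by the database sharding rule; usually the same as the sharded database name |
| Sharded transaction name | The transaction name derived from the template transaction name by the database sharding rule |
| Database sharding conversion | The process of deriving a database name from the template database name by the database sharding rule |
| Database sharding sequence key | The value of seq in the <split> tag of the database sharding rule; it forms part of the database name expression. With combined database and table sharding, it also corresponds to the value of seq in the <split> tag of the table sharding rule |
| Database sharding sequence value | The value associated with the sequence key; used during database sharding conversion |
| Template table name | The name of the table used as the template |
| Sharded table name | The table name derived from the template table name by the table sharding rule |
| Table sharding conversion | The process of deriving a table name from the template table name by the table sharding rule |
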
2. Configuration

2.1 Database sharding configuration

Default path of the database sharding configuration file: flea/db/flea-lib-split.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <flea-lib-split>
        <libs>
            <lib name="fleaorder" count="2" exp="(FLEA_LIB_NAME)(SEQ)" desc="Database sharding rule for the flea order database">
                <transaction name="fleaOrderTransactionManager" exp="(FLEA_TRANSACTION_NAME)(SEQ)"/>
                <splits>
                    <split key="DEC_NUM" seq="SEQ"/>
                </splits>
            </lib>
        </libs>
    </flea-lib-split>

The implementation of the database sharding rule is available on GitHub; see FleaSplitUtils#getSplitLib.
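
To make the expression (FLEA_LIB_NAME)(SEQ) more concrete, here is a minimal, self-contained sketch of how such an expression could be expanded. It is not the code of FleaSplitUtils#getSplitLib. The class SplitLibNameSketch and both of its methods are hypothetical, and the modulo mapping of the sequence value onto 1..count is an assumption about what DEC_NUM does, not something the configuration above states.

```java
import java.util.Map;

/** Hypothetical sketch: expands a database name expression such as "(FLEA_LIB_NAME)(SEQ)". */
final class SplitLibNameSketch {

    /**
     * ASSUMPTION: the sequence value is mapped onto 1..count with a modulo.
     * The real DEC_NUM rule lives in FleaSplitUtils#getSplitLib and may differ.
     */
    static String toSuffix(long seqValue, int count) {
        return String.valueOf(Math.floorMod(seqValue, (long) count) + 1);
    }

    /** Replaces every "(KEY)" placeholder in the expression with its value. */
    static String expand(String exp, Map<String, String> values) {
        String result = exp;
        for (Map.Entry<String, String> entry : values.entrySet()) {
            result = result.replace("(" + entry.getKey() + ")", entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        String suffix = toSuffix(123123123L, 2);
        // Same expression shape as the exp attribute of <lib> and <transaction>
        System.out.println(expand("(FLEA_LIB_NAME)(SEQ)",
                Map.of("FLEA_LIB_NAME", "fleaorder", "SEQ", suffix)));
        System.out.println(expand("(FLEA_TRANSACTION_NAME)(SEQ)",
                Map.of("FLEA_TRANSACTION_NAME", "fleaOrderTransactionManager", "SEQ", suffix)));
    }
}
```

Under that assumption, the same suffix feeds both the database name and the transaction name, which is why the <transaction> tag carries its own exp attribute.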
2.2 Table sharding configuration

Default path of the table sharding configuration file: flea/db/flea-table-split.xml

In the database and table sharding example, the table name defined by the @Table annotation on an entity class can be understood as the template table name. The actual sharded table is determined from the template table name and the table sharding rule, as explained step by step later.

    <?xml version="1.0" encoding="UTF-8"?>
    <flea-table-split>
        <tables>
            <table name="order" lib="fleaorder" exp="(FLEA_TABLE_NAME)_(ORDER_ID)" desc="Table sharding rule for the Flea order table">
                <splits>
                    <split key="ONE" column="order_id" seq="SEQ"/>
                </splits>
            </table>
            <table name="order_attr" lib="fleaorder" exp="(FLEA_TABLE_NAME)_(ORDER_ID)" desc="Table sharding rule for the Flea order attribute table">
                <splits>
                    <split key="ONE" column="order_id" seq="SEQ"/>
                </splits>
            </table>
        </tables>
    </flea-table-split>

The implementation of the table sharding rule is available on GitHub; see FleaSplitUtils#getSplitTable.
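
The table name expression (FLEA_TABLE_NAME)_(ORDER_ID) can be illustrated in the same spirit. The sketch below is hypothetical and is not the code of FleaSplitUtils#getSplitTable. It assumes that the ONE rule keeps only the last character of the column value and that the placeholder name is the upper-cased column name; neither is confirmed by the configuration above.

```java
import java.util.Locale;

/** Hypothetical sketch: derives a sharded table name from "(FLEA_TABLE_NAME)_(ORDER_ID)". */
final class SplitTableNameSketch {

    /** ASSUMPTION: the ONE rule keeps only the last character of the (non-empty) column value. */
    static String lastOne(String columnValue) {
        return columnValue.substring(columnValue.length() - 1);
    }

    /** Fills the template table name and the column placeholder into the expression. */
    static String tableName(String exp, String templateTable, String column, String columnValue) {
        String placeholder = "(" + column.toUpperCase(Locale.ROOT) + ")";
        return exp.replace("(FLEA_TABLE_NAME)", templateTable)
                  .replace(placeholder, lastOne(columnValue));
    }

    public static void main(String[] args) {
        System.out.println(tableName("(FLEA_TABLE_NAME)_(ORDER_ID)", "order", "order_id", "1000000000"));
        System.out.println(tableName("(FLEA_TABLE_NAME)_(ORDER_ID)", "order_attr", "order_id", "1000000007"));
    }
}
```

Because order and order_attr share the same column and rule, rows for one order id would land in tables with the same suffix under this assumption.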
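2.3 JPA persistence unit configuration

A JPA persistence unit contains the declarations of a set of entity classes plus the data source configuration. In practice, one persistence unit usually maps to one database. The <properties> tag holds the concrete database settings: driver name, URL, user, and password. The <class> tags list the entity classes that map to tables in that database. When <exclude-unlisted-classes> is set to true, only the listed classes and jars are scanned for persistent classes; otherwise the enclosing jar or directory is scanned as well.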

    <?xml version="1.0" encoding="UTF-8"?>
    <persistence version="2.0"
                 xmlns="http://java.sun.com/xml/ns/persistence"
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                 xsi:schemaLocation="http://java.sun.com/xml/ns/persistence
                     http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd">
        <persistence-unit name="fleaorder" transaction-type="RESOURCE_LOCAL">
            <provider>org.eclipse.persistence.jpa.PersistenceProvider</provider>
            <class>com.huazie.fleadbtest.jpa.split.entity.Order</class>
            <class>com.huazie.fleadbtest.jpa.split.entity.OrderAttr</class>
            <class>com.huazie.fleadbtest.jpa.split.entity.OldOrder</class>
            <class>com.huazie.fleadbtest.jpa.split.entity.OldOrderAttr</class>
            <exclude-unlisted-classes>true</exclude-unlisted-classes>
            <properties>
                <property name="javax.persistence.jdbc.driver" value="com.mysql.jdbc.Driver"/>
                <!-- "&" must be escaped as "&amp;" inside an XML attribute value -->
                <property name="javax.persistence.jdbc.url"
                          value="jdbc:mysql://localhost:3306/fleaorder?useUnicode=true&amp;characterEncoding=UTF-8"/>
                <property name="javax.persistence.jdbc.user" value="root"/>
                <property name="javax.persistence.jdbc.password" value="root"/>
            </properties>
        </persistence-unit>
    </persistence>

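In a sharding scenario, the template database and every sharded database each need their own persistence unit configuration; see the persistence unit configuration in the integration demo.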
2.4 Spring bean configuration for JPA

First comes the fixed Spring bean configuration for JPA, in fleajpabeans-spring.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans.xsd">

        <bean id="defaultPersistenceManager" class="org.springframework.orm.jpa.persistenceunit.DefaultPersistenceUnitManager">
            <property name="persistenceXmlLocations">
                <list>
                    <value>classpath:META-INF/fleajpa-persistence.xml</value>
                    <value>classpath:META-INF/fleaorder-persistence.xml</value>
                    <value>classpath:META-INF/fleaorder1-persistence.xml</value>
                    <value>classpath:META-INF/fleaorder2-persistence.xml</value>
                </list>
            </property>
        </bean>

        <bean id="defaultPersistenceProvider" class="org.eclipse.persistence.jpa.PersistenceProvider"/>

        <bean id="defaultVendorAdapter" class="org.springframework.orm.jpa.vendor.EclipseLinkJpaVendorAdapter">
            <property name="showSql" value="true"/>
        </bean>

        <bean id="defaultJpaDialect" class="org.springframework.orm.jpa.vendor.EclipseLinkJpaDialect"/>
    </beans>

The bean configuration that corresponds to a persistence unit is in fleaorder-spring.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:tx="http://www.springframework.org/schema/tx"
           xsi:schemaLocation="
               http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans.xsd
               http://www.springframework.org/schema/tx
               http://www.springframework.org/schema/tx/spring-tx.xsd">

        <bean id="fleaOrderEntityManagerFactory" class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
            <property name="persistenceUnitManager" ref="defaultPersistenceManager"/>
            <property name="persistenceUnitName" value="fleaorder"/>
            <property name="persistenceProvider" ref="defaultPersistenceProvider"/>
            <property name="jpaVendorAdapter" ref="defaultVendorAdapter"/>
            <property name="jpaDialect" ref="defaultJpaDialect"/>
            <property name="jpaPropertyMap">
                <map>
                    <entry key="eclipselink.weaving" value="false"/>
                    <entry key="eclipselink.logging.thread" value="true"/>
                </map>
            </property>
        </bean>

        <bean id="fleaOrderTransactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
            <property name="entityManagerFactory" ref="fleaOrderEntityManagerFactory"/>
        </bean>

        <tx:annotation-driven transaction-manager="fleaOrderTransactionManager"/>
    </beans>

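3. Implementation

3.1 Flea custom transaction aspect

The Flea custom transaction aspect, FleaTransactionalAspect, intercepts methods of Spring-injected beans that carry the custom transaction annotation. It begins a transaction before the method is invoked, commits it when the invocation succeeds, and rolls it back when an exception is thrown.

The Flea custom transaction annotation is placed on two kinds of methods:

- The insert, update, and delete methods of subclasses of AbstractFleaJPADAOImpl. These usually live in the DAO implementation class of a given data source, and the annotation must specify the transaction name.
- Any other method of a Spring-injected bean. Here the annotation must specify both the transaction name and the persistence unit name. If database sharding is involved, the database sharding sequence value for the current thread must be set before the call: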

    // Set the database sharding sequence value for the current thread
    FleaLibUtil.setSplitLibSeqValue("SEQ", "123123123");

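
Why the value has to be set on the calling thread becomes clearer with a small sketch of a thread-bound holder. This is not the source of FleaLibUtil. The class SplitSeqHolderSketch and its methods are hypothetical, and the use of a ThreadLocal map is an assumption based only on the phrase "for the current thread".

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a per-thread store for sharding sequence values. */
final class SplitSeqHolderSketch {

    // Each thread sees its own map, so concurrent requests cannot overwrite each other
    private static final ThreadLocal<Map<String, Object>> SEQ_VALUES =
            ThreadLocal.withInitial(HashMap::new);

    private SplitSeqHolderSketch() {
    }

    /** Stores the value for a sequence key (for example "SEQ") on the current thread. */
    static void set(String seqKey, Object seqValue) {
        SEQ_VALUES.get().put(seqKey, seqValue);
    }

    /** Returns the sequence values visible to the current thread. */
    static Map<String, Object> get() {
        return SEQ_VALUES.get();
    }

    /** Drops the values; worth calling when a pooled thread finishes its request. */
    static void clear() {
        SEQ_VALUES.remove();
    }
}
```

With a design like this, a value set by one request thread is invisible to others, and pooled threads should clear it once the request completes so that a stale value cannot route the next request to the wrong database.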
The code of the Flea custom transaction aspect is shown below:

    @Aspect
    @Component
    public class FleaTransactionalAspect {

        private static final String METHOD_NAME_GET_ENTITY_MANAGER = "getEntityManager";

        @Around("@annotation(com.huazie.fleaframework.db.jpa.transaction.FleaTransactional)")
        public Object invokeWithinTransaction(final ProceedingJoinPoint joinPoint) throws CommonException, FleaException, NoSuchMethodException {
            // Target method and the transaction name declared on its annotation
            Method method = FleaAspectUtils.getTargetMethod(joinPoint);
            String transactionName = FleaEntityManager.getTransactionName(method);
            Object[] args = joinPoint.getArgs();
            Object tObj = joinPoint.getTarget();

            // The last argument may be a Flea entity (this helper is not shown in the excerpt)
            FleaEntity fleaEntity = null;
            if (ArrayUtils.isNotEmpty(args)) {
                fleaEntity = getFleaEntityFromLastParam(args);
            }

            EntityManager entityManager;
            if (ObjectUtils.isNotEmpty(fleaEntity) && tObj instanceof AbstractFleaJPADAOImpl) {
                // First kind of method: the sharding information travels on the entity
                entityManager = (EntityManager) ReflectUtils.invoke(tObj, METHOD_NAME_GET_ENTITY_MANAGER, fleaEntity, Object.class);
                SplitTable splitTable = fleaEntity.get(DBConstants.LibTableSplitConstants.SPLIT_TABLE, SplitTable.class);
                SplitLib splitLib = fleaEntity.get(DBConstants.LibTableSplitConstants.SPLIT_LIB, SplitLib.class);
                if (ObjectUtils.isNotEmpty(splitTable)) {
                    splitLib = splitTable.getSplitLib();
                }
                if (ObjectUtils.isNotEmpty(splitLib) && splitLib.isExistSplitLib()) {
                    transactionName = splitLib.getSplitLibTxName();
                }
            } else {
                // Second kind of method: resolve the shard from the unit name and the thread-bound sequence values
                String unitName = FleaEntityManager.getUnitName(method);
                SplitLib splitLib = FleaSplitUtils.getSplitLib(unitName, FleaLibUtil.getSplitLibSeqValues());
                if (splitLib.isExistSplitLib()) {
                    transactionName = splitLib.getSplitLibTxName();
                    unitName = splitLib.getSplitLibName();
                }
                entityManager = FleaEntityManager.getEntityManager(unitName, transactionName);
            }

            // Look up the transaction manager bean by transaction name
            PlatformTransactionManager transactionManager = (PlatformTransactionManager) FleaApplicationContext.getBean(transactionName);
            ObjectUtils.checkEmpty(transactionManager, DaoException.class, "ERROR-DB-DAO0000000015", transactionName);

            FleaTransactionTemplate transactionTemplate = new FleaTransactionTemplate(transactionManager, entityManager);
            return transactionTemplate.execute(new TransactionCallback<Object>() {
                @Override
                public Object doInTransaction(TransactionStatus status) {
                    try {
                        return joinPoint.proceed();
                    } catch (Throwable throwable) {
                        ExceptionUtils.throwFleaException(FleaDBException.class, "Proceed with the next advice or target method invocation occurs exception : \n", throwable);
                    }
                    return null;
                }
            });
        }
    }

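In the code above, obtaining the transaction name and the entity manager is the key part. Because the Flea custom transaction annotation marks two different kinds of methods, the two values are obtained differently in each case. The transaction name is used to fetch the transaction manager defined in the Spring configuration directly; it matches the value of the transaction-manager attribute there (see fleaorder-spring.xml in section 2.4).

Finally, the Flea transaction template ensures that a method marked with @FleaTransactional has a transaction begun before it is invoked, committed when the invocation succeeds, and rolled back when an exception occurs.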
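3.2 Flea transaction template

The Flea transaction template, FleaTransactionTemplate, is modeled on Spring's TransactionTemplate. It is a template class that simplifies programmatic transaction demarcation and transaction exception handling. Its core method is execute, whose argument is the transactional code, supplied as an implementation of the transaction callback interface. The template centralizes the handling of the transaction lifecycle and of possible exceptions, so neither the callback implementation nor the calling code needs to handle transactions explicitly.

The core method, execute, is shown below: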

    @Override
    public <T> T execute(TransactionCallback<T> action) throws TransactionException {
        if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
            return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
        } else {
            TransactionStatus status = FleaJPASplitHelper.getHandler().getTransaction(this, transactionManager, entityManager);
            T result;
            try {
                result = action.doInTransaction(status);
            } catch (RuntimeException | Error ex) {
                rollbackOnException(status, ex);
                throw ex;
            } catch (Throwable ex) {
                rollbackOnException(status, ex);
                throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
            }
            this.transactionManager.commit(status);
            return result;
        }
    }

    private void rollbackOnException(TransactionStatus status, Throwable ex) throws TransactionException {
        try {
            this.transactionManager.rollback(status);
        } catch (TransactionSystemException ex2) {
            ex2.initApplicationException(ex);
            throw ex2;
        } catch (RuntimeException ex2) {
            throw ex2;
        } catch (Error err) {
            throw err;
        }
    }

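
The control flow of execute boils down to begin, run the callback, then commit or roll back. The sketch below isolates that flow with plain Java so it can be run without Spring. TxTemplateSketch and RecordingTx are hypothetical stand-ins for the template and the transaction manager; they are not part of flea-db or Spring.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical sketch of the begin / callback / commit-or-rollback flow. */
final class TxTemplateSketch {

    /** Minimal stand-in for a transaction manager; it only records what was called. */
    static final class RecordingTx {
        final List<String> events = new ArrayList<>();

        void begin() {
            events.add("begin");
        }

        void commit() {
            events.add("commit");
        }

        void rollback() {
            events.add("rollback");
        }
    }

    /** Runs the action inside a transaction: rollback on failure, commit on success. */
    static <T> T execute(RecordingTx tx, Supplier<T> action) {
        tx.begin();
        T result;
        try {
            result = action.get();
        } catch (RuntimeException | Error ex) {
            tx.rollback();
            throw ex; // the caller still sees the original failure
        }
        tx.commit();
        return result;
    }

    public static void main(String[] args) {
        RecordingTx tx = new RecordingTx();
        System.out.println(execute(tx, () -> "saved") + " " + tx.events);
    }
}
```

The point of the pattern is that the callback contains only business code; every exit path through the transaction is decided in one place.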
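3.3 Flea entity manager

The Flea entity manager utility class, FleaEntityManager, provides static methods for obtaining the entity manager that interacts with the persistence context, the persistence unit name, the transaction name, and the table sharding information, along with static wrappers for the persistence context operations (getFleaNextValue, find, remove, merge, persist, flush).

The overall code is as follows: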

    public class FleaEntityManager {

        private static final ConcurrentMap<String, EntityManager> entityManagerMap = new ConcurrentHashMap<>();

        private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("EntityManager resources");

        private FleaEntityManager() {
        }

        public static EntityManager getEntityManager(String unitName, String transactionName) throws CommonException {
            StringUtils.checkBlank(unitName, DaoException.class, "ERROR-DB-DAO0000000002", "libName");
            StringUtils.checkBlank(transactionName, DaoException.class, "ERROR-DB-DAO0000000002", "transactionName");
            // Double-checked creation: one shared entity manager per persistence unit
            if (!entityManagerMap.containsKey(unitName)) {
                synchronized (entityManagerMap) {
                    if (!entityManagerMap.containsKey(unitName)) {
                        JpaTransactionManager manger = (JpaTransactionManager) FleaApplicationContext.getBean(transactionName);
                        ObjectUtils.checkEmpty(manger, DaoException.class, "ERROR-DB-DAO0000000015", transactionName);
                        EntityManagerFactory entityManagerFactory = manger.getEntityManagerFactory();
                        EntityManager entityManager = SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);
                        entityManagerMap.put(unitName, entityManager);
                    }
                }
            }
            return entityManagerMap.get(unitName);
        }

        // The bodies of the following methods are omitted in the original post

        public static String getPersistenceUnitName(Class<?> daoImplClazz) { /* omitted */ }

        public static String getTransactionName(Class<?> daoImplClazz) { /* omitted */ }

        public static String getTransactionName(Method method) { /* omitted */ }

        public static String getUnitName(Method method) { /* omitted */ }

        public static SplitTable getSplitTable(Object entity) throws CommonException { /* omitted */ }

        public static Map<Object, Object> getResourceMap() { /* omitted */ }

        public static boolean hasResource(Object key) { /* omitted */ }

        public static Object getResource(Object key) { /* omitted */ }

        private static Object doGetResource(Object actualKey) { /* omitted */ }

        public static void bindResource(Object key, Object value) throws IllegalStateException { /* omitted */ }

        public static Object unbindResource(Object key) throws IllegalStateException { /* omitted */ }

        public static Object unbindResourceIfPossible(Object key) {
            return doUnbindResource(key);
        }

        private static Object doUnbindResource(Object actualKey) { /* omitted */ }

        public static <T> Number getFleaNextValue(EntityManager entityManager, Class<T> entityClass, T entity) {
            return FleaJPASplitHelper.getHandler().getNextValue(entityManager, entityClass, entity);
        }

        public static <T> T find(EntityManager entityManager, Object primaryKey, Class<T> entityClass, T entity) {
            return FleaJPASplitHelper.getHandler().find(entityManager, primaryKey, entityClass, entity);
        }

        public static <T> boolean remove(EntityManager entityManager, T entity) {
            return FleaJPASplitHelper.getHandler().remove(entityManager, entity);
        }

        public static <T> T merge(EntityManager entityManager, T entity) {
            return FleaJPASplitHelper.getHandler().merge(entityManager, entity);
        }

        public static <T> void persist(EntityManager entityManager, T entity) {
            FleaJPASplitHelper.getHandler().persist(entityManager, entity);
        }

        public static <T> void flush(EntityManager entityManager, T entity) {
            FleaJPASplitHelper.getHandler().flush(entityManager, entity);
        }
    }

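
getEntityManager keeps one shared entity manager per persistence unit name and creates it lazily under a lock. The same caching idea can be sketched with ConcurrentHashMap.computeIfAbsent, which performs the check and the creation atomically for each key. This is an alternative illustration, not the author's implementation; PerUnitCacheSketch and its factory parameter are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Hypothetical sketch: lazily creates and caches one value per persistence unit name. */
final class PerUnitCacheSketch<V> {

    private final ConcurrentMap<String, V> cache = new ConcurrentHashMap<>();
    private final Function<String, V> factory;

    PerUnitCacheSketch(Function<String, V> factory) {
        this.factory = factory;
    }

    /** Returns the cached value for the unit, creating it at most once per key. */
    V get(String unitName) {
        if (unitName == null || unitName.trim().isEmpty()) {
            throw new IllegalArgumentException("unitName must not be blank");
        }
        // computeIfAbsent runs the factory atomically for a missing key
        return cache.computeIfAbsent(unitName, factory);
    }
}
```

One trade-off of this variant: the factory passed to computeIfAbsent cannot throw checked exceptions, so code that reports failures through a checked exception such as CommonException would need to wrap them.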
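3.4 Flea JPA database and table sharding handler interface

As section 3.3 shows, the static persistence context methods of the Flea entity manager (getFleaNextValue, find, remove, merge, persist, flush) all delegate to the corresponding method of FleaJPASplitHelper.getHandler(), that is, of IFleaJPASplitHandler.

The Flea JPA sharding handler interface, IFleaJPASplitHandler, declares the sharding-related handling methods together with the data operations for insert, delete, update, and query.

Its methods are listed below: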

    public interface IFleaJPASplitHandler {

        void handle(FleaJPAQuery query, Object entity) throws CommonException;

        void handle(FleaJPAQuery query, TypedQuery typedQuery) throws CommonException;

        EntityManager handle(EntityManager entityManager, Object entity, boolean flag) throws CommonException;

        TransactionStatus getTransaction(TransactionDefinition definition, PlatformTransactionManager transactionManager, EntityManager entityManager);

        <T> Number getNextValue(EntityManager entityManager, Class<T> entityClass, T entity);

        <T> T find(EntityManager entityManager, Object primaryKey, Class<T> entityClass, T entity);

        <T> boolean remove(EntityManager entityManager, T entity);

        <T> T merge(EntityManager entityManager, T entity);

        <T> void persist(EntityManager entityManager, T entity);

        <T> void flush(EntityManager entityManager, T entity);
    }

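3.5 EclipseLink sharding handler implementation

The EclipseLink database and table sharding handler, EclipseLinkLibTableSplitHandler, performs insert, delete, update, and query operations through a custom entity manager implementation.

Before looking at it, consider its parent class, FleaLibTableSplitHandler. This class implements the generic sharding handling and the insert, delete, update, and query operations, and declares abstract inner methods that subclasses implement with provider-specific logic.

The code is as follows: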

    public abstract class FleaLibTableSplitHandler implements IFleaJPASplitHandler {

        @Override
        public void handle(FleaJPAQuery query, Object entity) throws CommonException {
            if (ObjectUtils.isEmpty(query) || ObjectUtils.isEmpty(entity) || !(entity instanceof FleaEntity)) {
                return;
            }
            FleaEntity fleaEntity = (FleaEntity) entity;
            SplitTable splitTable = FleaEntityManager.getSplitTable(entity);
            SplitLib splitLib;
            if (splitTable.isExistSplitTable()) {
                splitLib = splitTable.getSplitLib();
                fleaEntity.put(DBConstants.LibTableSplitConstants.SPLIT_TABLE, splitTable);
            } else {
                String libName = query.getPoolName();
                if (ObjectUtils.isEmpty(libName)) {
                    return;
                }
                splitLib = FleaSplitUtils.getSplitLib(libName, FleaLibUtil.getSplitLibSeqValues());
            }
            EntityManager splitEntityManager = handleInner(splitLib);
            EntityManager entityManager;
            if (ObjectUtils.isEmpty(splitEntityManager)) {
                entityManager = query.getEntityManager();
            } else {
                entityManager = splitEntityManager;
            }
            if (isFleaEntityManagerImpl(entityManager, splitTable, splitLib)) {
                splitEntityManager = getFleaEntityMangerImpl(entityManager);
            }
            if (ObjectUtils.isNotEmpty(splitEntityManager)) {
                query.init(splitEntityManager, query.getSourceClazz(), query.getResultClazz());
            }
        }

        @Override
        public void handle(FleaJPAQuery query, TypedQuery typedQuery) {
            SplitTable splitTable = getSplitTableFromEntity(query.getEntity());
            if (!splitTable.isExistSplitTable()) {
                return;
            }
            handleInner(query, typedQuery, splitTable);
        }

        @Override
        public EntityManager handle(EntityManager entityManager, Object entity, boolean flag) throws CommonException {
            if (ObjectUtils.isEmpty(entityManager) || ObjectUtils.isEmpty(entity) || !(entity instanceof FleaEntity)) {
                return entityManager;
            }
            SplitTable splitTable = FleaEntityManager.getSplitTable(entity);
            FleaEntity fleaEntity = (FleaEntity) entity;
            SplitLib splitLib;
            if (splitTable.isExistSplitTable()) {
                splitLib = splitTable.getSplitLib();
                fleaEntity.put(DBConstants.LibTableSplitConstants.SPLIT_TABLE, splitTable);
            } else {
                String libName = fleaEntity.get(DBConstants.LibTableSplitConstants.FLEA_LIB_NAME, String.class);
                if (ObjectUtils.isEmpty(libName)) {
                    return entityManager;
                }
                splitLib = FleaSplitUtils.getSplitLib(libName, FleaLibUtil.getSplitLibSeqValues());
                fleaEntity.put(DBConstants.LibTableSplitConstants.SPLIT_LIB, splitLib);
            }
            if (flag && splitTable.isGeneratorFlag()) {
                return entityManager;
            }
            EntityManager splitEntityManager = handleInner(splitLib);
            if (ObjectUtils.isNotEmpty(splitEntityManager)) {
                entityManager = splitEntityManager;
            }
            return entityManager;
        }

        private EntityManager handleInner(SplitLib splitLib) throws CommonException {
            EntityManager entityManager = null;
            if (ObjectUtils.isNotEmpty(splitLib) && splitLib.isExistSplitLib()) {
                String unitName = splitLib.getSplitLibName();
                String transactionName = splitLib.getSplitLibTxName();
                entityManager = FleaEntityManager.getEntityManager(unitName, transactionName);
            }
            return entityManager;
        }

        @Override
        public TransactionStatus getTransaction(TransactionDefinition definition, PlatformTransactionManager transactionManager, EntityManager entityManager) {
            JpaTransactionManager jpaTransactionManager = (JpaTransactionManager) transactionManager;
            Object obj = TransactionSynchronizationManager.getResource(jpaTransactionManager.getEntityManagerFactory());
            if (ObjectUtils.isEmpty(obj)) {
                EntityManager fleaEntityManagerImpl = getFleaEntityMangerImpl(entityManager);
                EntityManagerHolder entityManagerHolder = new EntityManagerHolder(fleaEntityManagerImpl);
                TransactionSynchronizationManager.bindResource(jpaTransactionManager.getEntityManagerFactory(), entityManagerHolder);
            }
            return jpaTransactionManager.getTransaction(definition);
        }

        @Override
        public <T> Number getNextValue(EntityManager entityManager, Class<T> entityClass, T entity) {
            SplitTable splitTable = getSplitTableFromEntity(entity);
            return getNextValueInner(entityManager, entityClass, splitTable);
        }

        @Override
        public <T> T find(EntityManager entityManager, Object primaryKey, Class<T> entityClass, T entity) {
            SplitTable splitTable = getSplitTableFromEntity(entity);
            SplitLib splitLib = getSplitLibFromEntity(entity);
            T t;
            if (isFleaEntityManagerImpl(entityManager, splitTable, splitLib)) {
                t = findInner(entityManager, primaryKey, entityClass, splitTable);
            } else {
                t = entityManager.find(entityClass, primaryKey);
            }
            return t;
        }

        @Override
        public <T> boolean remove(EntityManager entityManager, T entity) {
            SplitTable splitTable = getSplitTableFromEntity(entity);
            SplitLib splitLib = getSplitLibFromEntity(entity);
            if (isFleaEntityManagerImpl(entityManager, splitTable, splitLib)) {
                removeInner(entityManager, entity);
            } else {
                if (!entityManager.contains(entity)) {
                    entity = registerObject(entityManager, entity);
                }
                entityManager.remove(entity);
            }
            return true;
        }

        @Override
        public <T> T merge(EntityManager entityManager, T entity) {
            SplitTable splitTable = getSplitTableFromEntity(entity);
            SplitLib splitLib = getSplitLibFromEntity(entity);
            if (isFleaEntityManagerImpl(entityManager, splitTable, splitLib)) {
                return mergeInner(entityManager, entity);
            } else {
                return entityManager.merge(entity);
            }
        }

        @Override
        public <T> void persist(EntityManager entityManager, T entity) {
            SplitTable splitTable = getSplitTableFromEntity(entity);
            SplitLib splitLib = getSplitLibFromEntity(entity);
            if (isFleaEntityManagerImpl(entityManager, splitTable, splitLib)) {
                persistInner(entityManager, entity);
            } else {
                entityManager.persist(entity);
            }
        }

        @Override
        public <T> void flush(EntityManager entityManager, T entity) {
            SplitTable splitTable = getSplitTableFromEntity(entity);
            SplitLib splitLib = getSplitLibFromEntity(entity);
            if (isFleaEntityManagerImpl(entityManager, splitTable, splitLib)) {
                flushInner(entityManager);
            } else {
                entityManager.flush();
            }
        }

        private boolean isFleaEntityManagerImpl(EntityManager entityManager, SplitTable splitTable, SplitLib splitLib) {
            return (splitTable.isExistSplitTable() || splitLib.isExistSplitLib()
                    || FleaEntityManager.hasResource(entityManager.getEntityManagerFactory()));
        }

        private SplitTable getSplitTableFromEntity(Object entity) {
            SplitTable splitTable = null;
            if (ObjectUtils.isNotEmpty(entity) && (entity instanceof FleaEntity)) {
                FleaEntity fleaEntity = (FleaEntity) entity;
                splitTable = fleaEntity.get(DBConstants.LibTableSplitConstants.SPLIT_TABLE, SplitTable.class);
            }
            if (ObjectUtils.isEmpty(splitTable)) {
                splitTable = new SplitTable();
                splitTable.setExistSplitTable(false);
            }
            return splitTable;
        }

        private SplitLib getSplitLibFromEntity(Object entity) {
            SplitLib splitLib = null;
            if (ObjectUtils.isNotEmpty(entity) && (entity instanceof FleaEntity)) {
                FleaEntity fleaEntity = (FleaEntity) entity;
                splitLib = fleaEntity.get(DBConstants.LibTableSplitConstants.SPLIT_LIB, SplitLib.class);
            }
            if (ObjectUtils.isEmpty(splitLib)) {
                splitLib = new SplitLib();
                splitLib.setExistSplitLib(false);
            }
            return splitLib;
        }

        protected abstract EntityManager getFleaEntityMangerImpl(EntityManager entityManager);

        protected abstract void handleInner(FleaJPAQuery query, TypedQuery typedQuery, SplitTable splitTable);

        protected abstract <T> Number getNextValueInner(EntityManager entityManager, Class<T> entityClass, SplitTable splitTable);

        protected abstract <T> T findInner(EntityManager entityManager, Object primaryKey, Class<T> entityClass, SplitTable splitTable);

        protected abstract <T> void removeInner(EntityManager entityManager, T entity);

        protected abstract <T> T mergeInner(EntityManager entityManager, T entity);

        protected abstract <T> void persistInner(EntityManager entityManager, T entity);

        protected abstract void flushInner(EntityManager entityManager);

        protected abstract <T> T registerObject(EntityManager entityManager, T entity);
    }

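
The parent class follows the template method pattern: each public operation decides whether sharding applies and, if so, hands off to an abstract inner method. The sketch below reduces that shape to a single operation. SplitAwareHandlerSketch and ProviderHandlerSketch are hypothetical names, and the boolean parameter stands in for the isFleaEntityManagerImpl check.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the dispatch in the parent handler, reduced to one operation. */
abstract class SplitAwareHandlerSketch {

    final List<String> log = new ArrayList<>();

    /** Public entry point: takes the sharding-aware path only when a split applies. */
    final void persist(String entity, boolean splitExists) {
        if (splitExists) {
            persistInner(entity); // provider-specific path, supplied by a subclass
        } else {
            log.add("default persist: " + entity); // plain JPA path
        }
    }

    /** Implemented per JPA provider, as the inner methods are in the real class. */
    protected abstract void persistInner(String entity);
}

/** Hypothetical provider-specific subclass. */
final class ProviderHandlerSketch extends SplitAwareHandlerSketch {

    @Override
    protected void persistInner(String entity) {
        log.add("split-aware persist: " + entity);
    }
}
```

Keeping the decision in the parent means another JPA provider would only need to supply the inner methods; the routing logic would stay untouched.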
The code of the EclipseLink handler is as follows:

    public class EclipseLinkLibTableSplitHandler extends FleaLibTableSplitHandler {

        @Override
        protected void handleInner(FleaJPAQuery query, TypedQuery typedQuery, SplitTable splitTable) {
            EntityType entityType = query.getRoot().getModel();
            ClassDescriptor classDescriptor = ((EntityTypeImpl) entityType).getDescriptor();
            AbstractSession abstractSession = ((FleaEntityManagerImpl) query.getEntityManager()).getDatabaseSession();
            classDescriptor = ClassDescriptorUtils.getSplitDescriptor(classDescriptor, abstractSession, splitTable);
            ReadAllQuery readAllQuery = typedQuery.unwrap(ReadAllQuery.class);
            readAllQuery.setDescriptor(classDescriptor);
            readAllQuery.getExpressionBuilder().setQueryClassAndDescriptor(classDescriptor.getJavaClass(), classDescriptor);
        }

        @Override
        protected EntityManager getFleaEntityMangerImpl(EntityManager entityManager) {
            return FleaEntityManagerImpl.getFleaEntityManagerImpl(entityManager);
        }

        @Override
        protected <T> Number getNextValueInner(EntityManager entityManager, Class<T> entityClass, SplitTable splitTable) {
            return FleaEntityManagerImpl.getFleaEntityManagerImpl(entityManager).getNextValue(entityClass, splitTable);
        }

        @Override
        protected <T> T findInner(EntityManager entityManager, Object primaryKey, Class<T> entityClass, SplitTable splitTable) {
            return FleaEntityManagerImpl.getFleaEntityManagerImpl(entityManager).find(entityClass, primaryKey, splitTable);
        }

        @Override
        protected <T> void removeInner(EntityManager entityManager, T entity) {
            FleaEntityManagerImpl.getFleaEntityManagerImpl(entityManager).remove(entity);
        }

        @Override
        protected <T> T mergeInner(EntityManager entityManager, T entity) {
            return FleaEntityManagerImpl.getFleaEntityManagerImpl(entityManager).merge(entity);
        }

        @Override
        protected <T> void persistInner(EntityManager entityManager, T entity) {
            FleaEntityManagerImpl.getFleaEntityManagerImpl(entityManager).persist(entity);
        }

        @Override
        protected void flushInner(EntityManager entityManager) {
            FleaEntityManagerImpl.getFleaEntityManagerImpl(entityManager).flush();
        }

        @Override
        @SuppressWarnings("unchecked")
        protected <T> T registerObject(EntityManager entityManager, T entity) {
            if (entityManager.contains(entity) || ObjectUtils.isEmpty(entity)) {
                return entity;
            }
            return (T) entityManager.unwrap(UnitOfWork.class).registerObject(entity);
        }
    }

As the inner method implementations above show, they all use FleaEntityManagerImpl, the EclipseLink implementation of the Flea entity manager, which is introduced next.
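3.6 EclipseLink implementation of the Flea entity manager

FleaEntityManagerImpl, the EclipseLink implementation of the Flea entity manager, extends EclipseLink's EntityManagerImpl and is constructed from an EntityManager argument.

The related code is as follows: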

    public final class FleaEntityManagerImpl extends EntityManagerImpl {

        public static FleaEntityManagerImpl getFleaEntityManagerImpl(EntityManager entityManager) {
            EntityManagerFactory entityManagerFactory = entityManager.getEntityManagerFactory();
            // Reuse the instance bound for this factory, or create and bind a new one
            FleaEntityManagerImpl fleaEntityManagerImpl = (FleaEntityManagerImpl) FleaEntityManager.getResource(entityManagerFactory);
            if (ObjectUtils.isEmpty(fleaEntityManagerImpl)) {
                fleaEntityManagerImpl = new FleaEntityManagerImpl(entityManager);
                FleaEntityManager.bindResource(entityManagerFactory, fleaEntityManagerImpl);
            }
            return fleaEntityManagerImpl;
        }

        private FleaEntityManagerImpl(EntityManager entityManager) {
            super(entityManager.getEntityManagerFactory().unwrap(JpaEntityManagerFactory.class).unwrap(), entityManager.getProperties(), null);
        }

        public <T> T find(Class<T> entityClass, Object primaryKey, SplitTable splitTable) {
            return find(entityClass, primaryKey, null, getQueryHints(entityClass, OperationType.FIND), splitTable);
        }

        public <T> T find(Class<T> entityClass, Object primaryKey, Map<String, Object> properties, SplitTable splitTable) {
            return find(entityClass, primaryKey, null, properties, splitTable);
        }

        public <T> T find(Class<T> entityClass, Object primaryKey, LockModeType lockMode, SplitTable splitTable) {
            return find(entityClass, primaryKey, lockMode, getQueryHints(entityClass, OperationType.FIND), splitTable);
        }

        @SuppressWarnings({"unchecked"})
        public <T> T find(Class<T> entityClass, Object primaryKey, LockModeType lockMode, Map<String, Object> properties, SplitTable splitTable) {
            try {
                verifyOpen();
                if (ObjectUtils.isNotEmpty(lockMode) && !lockMode.equals(LockModeType.NONE)) {
                    checkForTransaction(true);
                }
                AbstractSession session = this.databaseSession;
                ClassDescriptor descriptor = session.getDescriptor(entityClass);
                if (descriptor == null || descriptor.isDescriptorTypeAggregate()) {
                    throw new IllegalArgumentException(ExceptionLocalization.buildMessage("unknown_bean_class", new Object[]{entityClass}));
                }
                if (!descriptor.shouldBeReadOnly() || !descriptor.isSharedIsolation()) {
                    session = (AbstractSession) getActiveSession();
                } else {
                    session = (AbstractSession) getReadOnlySession();
                }
                if (descriptor.hasTablePerMultitenantPolicy()) {
                    descriptor = session.getDescriptor(entityClass);
                }
                // Swap in the descriptor that points at the sharded table
                descriptor = ClassDescriptorUtils.getSplitDescriptor(descriptor, session, splitTable);
                return (T) findInternal(descriptor, session, primaryKey, lockMode, properties);
            } catch (LockTimeoutException e) {
                throw e;
            } catch (RuntimeException e) {
                setRollbackOnly();
                throw e;
            }
        }

        public <T> Number getNextValue(Class<T> entityClass, SplitTable splitTable) {
            AbstractSession session = this.databaseSession;
            ClassDescriptor descriptor = session.getDescriptor(entityClass);
            if (ObjectUtils.isEmpty(descriptor)) {
                throw new IllegalArgumentException(ExceptionLocalization.buildMessage("unknown_bean_class", new Object[]{entityClass}));
            }
            descriptor = ClassDescriptorUtils.getSplitDescriptor(descriptor, session, splitTable);
            Number nextValue;
            if (ObjectUtils.isNotEmpty(splitTable) && splitTable.isExistSplitTablePkColumn()) {
                String sequenceName = splitTable.getSplitTablePkColumnValue();
                Sequencing sequencing = session.getSequencing();
                FleaSequencingManager fleaSequencingManager = FleaSequencingManager.getFleaSequencingManager(sequenceName, sequencing, descriptor);
                nextValue = fleaSequencingManager.getNextValue(sequenceName);
            } else {
                nextValue = session.getNextSequenceNumberValue(entityClass);
            }
            return nextValue;
        }

        @Override
        public RepeatableWriteUnitOfWork getActivePersistenceContext(Object txn) { /* body omitted in the original post */ }
    }

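
getFleaEntityManagerImpl follows a get-or-bind pattern: look up a resource keyed by the entity manager factory, and create and bind it only when nothing is bound yet. The sketch below shows that pattern on its own. ThreadBoundResourcesSketch is hypothetical; that the real bindResource and getResource are backed by the ThreadLocal field named resources in FleaEntityManager is an assumption, since their bodies are omitted above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical sketch of a thread-bound get-or-bind resource registry. */
final class ThreadBoundResourcesSketch {

    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    private ThreadBoundResourcesSketch() {
    }

    /** Returns the resource bound to the key on this thread, binding a new one on first use. */
    @SuppressWarnings("unchecked")
    static <T> T getOrBind(Object key, Supplier<T> creator) {
        Map<Object, Object> bound = RESOURCES.get();
        Object resource = bound.get(key);
        if (resource == null) {
            resource = creator.get();
            bound.put(key, resource);
        }
        return (T) resource;
    }

    /** Removes and returns the binding for the key, or null when nothing was bound. */
    static Object unbind(Object key) {
        return RESOURCES.get().remove(key);
    }
}
```

Keying by factory rather than by name means every sharded database, which has its own factory, gets its own bound instance on a given thread.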
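4. Integration

4.1 Databases and tables

4.1.1 Template database

flea_id_generator is the primary key generator table; see my post "Using flea-db: an introduction to the primary key generator table" for details, which are not repeated here.

4.1.2 Sharded database 1

4.1.3 Sharded database 2

For the SQL files, see fleaorder.sql, fleaorder1.sql, and fleaorder2.sql.

4.2 Entity classes

4.3 Parent class of the FleaOrder data source DAO layer

FleaOrderDAOImpl extends AbstractFleaJPADAOImpl. Its member variable entityManager is annotated with @PersistenceContext, which specifies the persistence unit name; here that is the template persistence unit name.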
import java.util.List;

import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;

// Flea Framework imports (AbstractFleaJPADAOImpl, FleaTransactional, CommonException) are omitted here.

/**
 * Data-source DAO implementation for the template database "fleaorder".
 * Every write method is annotated with the template transaction name.
 */
public class FleaOrderDAOImpl<T> extends AbstractFleaJPADAOImpl<T> {

    // Entity manager bound to the template persistence unit
    @PersistenceContext(unitName = "fleaorder")
    private EntityManager entityManager;

    @Override
    @FleaTransactional("fleaOrderTransactionManager")
    public Number getFleaNextValue(T entity) throws CommonException {
        return super.getFleaNextValue(entity);
    }

    // Removal by primary key only; unitName is specified explicitly on this overload
    @Override
    @FleaTransactional(value = "fleaOrderTransactionManager", unitName = "fleaorder")
    public boolean remove(long entityId) throws CommonException {
        return super.remove(entityId);
    }

    // Removal by primary key only; unitName is specified explicitly on this overload
    @Override
    @FleaTransactional(value = "fleaOrderTransactionManager", unitName = "fleaorder")
    public boolean remove(String entityId) throws CommonException {
        return super.remove(entityId);
    }

    @Override
    @FleaTransactional("fleaOrderTransactionManager")
    public boolean remove(T entity) throws CommonException {
        return super.remove(entity);
    }

    @Override
    @FleaTransactional("fleaOrderTransactionManager")
    public boolean remove(long entityId, T entity) throws CommonException {
        return super.remove(entityId, entity);
    }

    @Override
    @FleaTransactional("fleaOrderTransactionManager")
    public boolean remove(String entityId, T entity) throws CommonException {
        return super.remove(entityId, entity);
    }

    @Override
    @FleaTransactional("fleaOrderTransactionManager")
    public T update(T entity) throws CommonException {
        return super.update(entity);
    }

    @Override
    @FleaTransactional("fleaOrderTransactionManager")
    public List<T> batchUpdate(List<T> entities) throws CommonException {
        return super.batchUpdate(entities);
    }

    @Override
    @FleaTransactional("fleaOrderTransactionManager")
    public void save(T entity) throws CommonException {
        super.save(entity);
    }

    @Override
    @FleaTransactional("fleaOrderTransactionManager")
    public void batchSave(List<T> entities) throws CommonException {
        super.batchSave(entities);
    }

    @Override
    @FleaTransactional("fleaOrderTransactionManager")
    public int insert(String relationId, T entity) throws CommonException {
        return super.insert(relationId, entity);
    }

    @Override
    @FleaTransactional("fleaOrderTransactionManager")
    public int update(String relationId, T entity) throws CommonException {
        return super.update(relationId, entity);
    }

    @Override
    @FleaTransactional("fleaOrderTransactionManager")
    public int delete(String relationId, T entity) throws CommonException {
        return super.delete(relationId, entity);
    }

    @Override
    protected EntityManager getEntityManager() {
        return entityManager;
    }
}
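The annotations in this class carry only template names ("fleaorder" and "fleaOrderTransactionManager"). The expressions configured in section 2.1, `(FLEA_LIB_NAME)(SEQ)` and `(FLEA_TRANSACTION_NAME)(SEQ)`, describe how those template names become split names. The following is a minimal sketch of that placeholder substitution only. The class `SplitNameSketch` and its `resolve` method are hypothetical and are not part of flea-db, and the sequence string "1" is assumed to have already been produced by the split rule. For the real logic, see FleaSplitUtils#getSplitLib.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SplitNameSketch {

    /**
     * Replaces every "(KEY)" placeholder in the expression with its value.
     * Hypothetical helper for illustration; not the flea-db implementation.
     */
    public static String resolve(String exp, Map<String, String> values) {
        String result = exp;
        for (Map.Entry<String, String> entry : values.entrySet()) {
            result = result.replace("(" + entry.getKey() + ")", entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumption: the split rule has already converted the sequence value to "1".
        String seq = "1";

        Map<String, String> libValues = new LinkedHashMap<>();
        libValues.put("FLEA_LIB_NAME", "fleaorder");
        libValues.put("SEQ", seq);
        // prints "fleaorder1"
        System.out.println(resolve("(FLEA_LIB_NAME)(SEQ)", libValues));

        Map<String, String> txValues = new LinkedHashMap<>();
        txValues.put("FLEA_TRANSACTION_NAME", "fleaOrderTransactionManager");
        txValues.put("SEQ", seq);
        // prints "fleaOrderTransactionManager1"
        System.out.println(resolve("(FLEA_TRANSACTION_NAME)(SEQ)", txValues));
    }
}
```

The same substitution would apply to the persistence unit name, which the terminology table says is usually identical to the database name.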
4.4 DAO-layer interfaces and implementations for each entity
The DAO-layer code for each entity is available on GitHub.
4.5 SV-layer interfaces and implementations for each entity
The SV-layer code for each entity is available on GitHub.
5. Unit tests
Before running the tests, add the following rows to the primary key generator table:
id_generator_key    id_generator_value
pk_old_order        999999999
pk_order            999999999
5.1 Testing database and table sharding
The test class for combined database and table sharding is OrderTest.
Let's look at insert, query, update and delete under database and table sharding. The code is as follows:
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import javax.annotation.Resource;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

// Project imports (Order, IOrderSV, Logger, FleaLoggerProxy, DateUtils, CollectionUtils) are omitted here.

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:applicationContext.xml"})
public class OrderTest {

    private static final Logger LOGGER = FleaLoggerProxy.getProxyInstance(OrderTest.class);

    @Resource(name = "orderSV")
    private IOrderSV orderSV;

    @Test
    public void testInsertOrder() throws Exception {
        Order order = new Order();
        order.setOrderName("测试订单");
        order.setOrderPrice(0L);
        order.setOrderState(0);
        order.setOrderDate(DateUtils.getCurrentTime());
        // Fetch the next primary key value from the ID generator
        Long orderId = (Long) orderSV.getFleaNextValue(null);
        order.setOrderId(orderId);
        orderSV.save(order);
    }

    @Test
    public void testQueryOrder() throws Exception {
        long orderId = 1000000000L;
        // The entity carries the order ID used by the sharding rules
        Order order = new Order();
        order.setOrderId(orderId);
        order = orderSV.query(orderId, order);
        LOGGER.debug("Order = {}", order);
    }

    @Test
    public void testUpdateOrder() throws Exception {
        long orderId = 1000000000L;
        Order order = new Order();
        order.setOrderId(orderId);
        // Query by the orderId attribute
        Set<String> attrNames = new HashSet<>();
        attrNames.add("orderId");
        List<Order> orderList = orderSV.query(attrNames, order);
        if (CollectionUtils.isNotEmpty(orderList)) {
            order = orderList.get(0);
            LOGGER.debug("Before : {}", order);
            order.setOrderName("修改订单");
            order.setOrderPrice(100L);
            order.setOrderState(1);
            orderSV.update(order);
        }
        // Query again to verify the update
        Order order1 = new Order();
        order1.setOrderId(orderId);
        order1 = orderSV.query(orderId, order1);
        LOGGER.debug("After : {}", order1);
    }

    @Test
    public void testDeleteOrder() throws Exception {
        long orderId = 1000000000L;
        Order order = new Order();
        order.setOrderId(orderId);
        // Query by the orderId attribute
        Set<String> attrNames = new HashSet<>();
        attrNames.add("orderId");
        List<Order> orderList = orderSV.query(attrNames, order);
        if (CollectionUtils.isNotEmpty(orderList)) {
            Order order1 = orderList.get(0);
            LOGGER.error("Before : {}", order1);
            orderSV.remove(order1);
        }
        // Query again to verify the removal
        Order order2 = orderSV.query(orderId, order);
        LOGGER.error("After : {}", order2);
    }
}
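The tests above only set orderId on the entity and never set a sequence value explicitly. According to the terminology in section 1, with combined database and table sharding the database-sharding sequence key also corresponds to the seq attribute of the table rule's `<split>` tag, which is bound to a column (here `column="order_id" seq="SEQ"`). The sketch below illustrates that idea under stated assumptions: `SeqFromColumnSketch` and `seqValues` are hypothetical names, the entity is modelled as a plain column map, and nothing here is the actual flea-db code.

```java
import java.util.HashMap;
import java.util.Map;

public class SeqFromColumnSketch {

    /**
     * For each table split rule (column -> sequence key), copies the entity's
     * column value under the sequence key. Hypothetical helper for illustration.
     */
    public static Map<String, Object> seqValues(Map<String, String> columnToSeqKey,
                                                Map<String, Object> entityColumns) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, String> rule : columnToSeqKey.entrySet()) {
            Object value = entityColumns.get(rule.getKey());
            if (value == null) {
                // Without the split column value there is nothing to shard on
                throw new IllegalArgumentException("Split column has no value: " + rule.getKey());
            }
            result.put(rule.getValue(), value);
        }
        return result;
    }

    public static void main(String[] args) {
        // Mirrors <split key="ONE" column="order_id" seq="SEQ" />
        Map<String, String> columnToSeqKey = new HashMap<>();
        columnToSeqKey.put("order_id", "SEQ");

        // Assumed stand-in for an Order entity with its order ID set
        Map<String, Object> entityColumns = new HashMap<>();
        entityColumns.put("order_id", 1000000000L);
        entityColumns.put("order_name", "test order");

        // prints "{SEQ=1000000000}"
        System.out.println(seqValues(columnToSeqKey, entityColumns));
    }
}
```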
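5.2 Testing database sharding only
If you shard by database only, without table sharding, you must set the database-sharding sequence value before performing any insert, delete, update or query.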
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import javax.annotation.Resource;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

// Project imports (OldOrder, IOldOrderSV, FleaLibUtil, Logger, FleaLoggerProxy, DateUtils, CollectionUtils) are omitted here.

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:applicationContext.xml"})
public class OldOrderTest {

    private static final Logger LOGGER = FleaLoggerProxy.getProxyInstance(OldOrderTest.class);

    @Resource(name = "oldOrderSV")
    private IOldOrderSV oldOrderSV;

    @Test
    public void testInsertOldOrder() throws Exception {
        OldOrder oldOrder = new OldOrder();
        oldOrder.setOrderName("测试旧订单1");
        oldOrder.setOrderPrice(200L);
        oldOrder.setOrderState(0);
        oldOrder.setOrderDate(DateUtils.getCurrentTime());
        // Fetch the next primary key value from the ID generator
        Long orderId = (Long) oldOrderSV.getFleaNextValue(null);
        oldOrder.setOrderId(orderId);
        // Set the database-sharding sequence value before saving
        FleaLibUtil.setSplitLibSeqValue("SEQ", orderId);
        oldOrderSV.save(oldOrder);
    }

    @Test
    public void testQueryOldOrder() throws Exception {
        long orderId = 1000000000L;
        OldOrder oldOrder = new OldOrder();
        // Set the database-sharding sequence value before querying
        FleaLibUtil.setSplitLibSeqValue("SEQ", orderId);
        oldOrder = oldOrderSV.query(orderId, oldOrder);
        LOGGER.debug("Order = {}", oldOrder);
    }

    @Test
    public void testUpdateOldOrder() throws Exception {
        long orderId = 1000000000L;
        // Set the database-sharding sequence value before querying and updating
        FleaLibUtil.setSplitLibSeqValue("SEQ", orderId);
        OldOrder oldOrder = new OldOrder();
        oldOrder.setOrderId(orderId);
        // Query by the orderId attribute
        Set<String> attrNames = new HashSet<>();
        attrNames.add("orderId");
        List<OldOrder> oldOrderList = oldOrderSV.query(attrNames, oldOrder);
        if (CollectionUtils.isNotEmpty(oldOrderList)) {
            oldOrder = oldOrderList.get(0);
            LOGGER.debug("Before : {}", oldOrder);
            oldOrder.setOrderName("修改旧订单1");
            oldOrder.setOrderPrice(200L);
            oldOrder.setOrderState(2);
            oldOrderSV.update(oldOrder);
        }
        // Query again to verify the update
        OldOrder oldOrder1 = new OldOrder();
        oldOrder1 = oldOrderSV.query(orderId, oldOrder1);
        LOGGER.debug("After : {}", oldOrder1);
    }

    @Test
    public void testDeleteOldOrder() throws Exception {
        long orderId = 1000000000L;
        // Set the database-sharding sequence value before querying and removing
        FleaLibUtil.setSplitLibSeqValue("SEQ", orderId);
        OldOrder oldOrder = new OldOrder();
        oldOrder.setOrderId(orderId);
        // Query by the orderId attribute
        Set<String> attrNames = new HashSet<>();
        attrNames.add("orderId");
        List<OldOrder> orderList = oldOrderSV.query(attrNames, oldOrder);
        if (CollectionUtils.isNotEmpty(orderList)) {
            OldOrder oldOrder1 = orderList.get(0);
            LOGGER.error("Before : {}", oldOrder1);
            oldOrderSV.remove(oldOrder1);
        }
        // Query again to verify the removal
        OldOrder oldOrder2 = new OldOrder();
        oldOrder2 = oldOrderSV.query(orderId, oldOrder2);
        LOGGER.error("After : {}", oldOrder2);
    }
}
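Each test above calls FleaLibUtil.setSplitLibSeqValue before touching the service, so the value has to be kept somewhere until the DAO layer needs it. One plausible way to do that is a per-thread store, which keeps concurrent requests from seeing each other's values. The sketch below shows that technique only; it is an assumption for illustration, `SplitSeqHolderSketch` and its methods are hypothetical, and the real FleaLibUtil may work differently.

```java
import java.util.HashMap;
import java.util.Map;

public class SplitSeqHolderSketch {

    // Hypothetical per-thread store of sequence key -> sequence value
    private static final ThreadLocal<Map<String, Object>> SEQ_VALUES =
            ThreadLocal.withInitial(HashMap::new);

    /** Stores a sequence value for the current thread. */
    public static void setSeqValue(String seqKey, Object seqValue) {
        SEQ_VALUES.get().put(seqKey, seqValue);
    }

    /** Returns the current thread's sequence value, or null if none was set. */
    public static Object getSeqValue(String seqKey) {
        return SEQ_VALUES.get().get(seqKey);
    }

    /** Drops all sequence values held by the current thread. */
    public static void clear() {
        SEQ_VALUES.remove();
    }

    public static void main(String[] args) throws InterruptedException {
        setSeqValue("SEQ", 1000000000L);
        // prints "main thread: 1000000000"
        System.out.println("main thread: " + getSeqValue("SEQ"));

        // A different thread has its own empty store
        Thread other = new Thread(() ->
                System.out.println("other thread: " + getSeqValue("SEQ")));
        other.start();
        other.join();
        // the other thread prints "other thread: null"

        clear();
        // prints "after clear: null"
        System.out.println("after clear: " + getSeqValue("SEQ"));
    }
}
```

With a store like this, clearing the value once the operation finishes matters when threads are pooled, because a reused thread would otherwise still hold the previous request's value.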
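5.3 JPA transaction demo
First, let's see how to use the custom transaction annotation @FleaTransactional on methods outside the data-source DAO implementation classes. The full code is available on GitHub; the key usage is shown here.
In the annotation, value is the template database transaction name, and unitName is the template database persistence unit name (which is also the corresponding template database name).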
@Override
@FleaTransactional(value = "fleaOrderTransactionManager", unitName = "fleaorder")
public void orderTransaction(Long orderId) throws CommonException {
    // transactional business logic (omitted)
}
So how is the method above invoked?
@Test
public void testTransaction() throws Exception {
    long orderId = 1000000000L;
    // Set the database-sharding sequence value before calling the transactional method
    FleaLibUtil.setSplitLibSeqValue("SEQ", orderId);
    fleaOrderModuleSV.orderTransaction(orderId);
}
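Summary
After several rounds of refactoring, the database and table sharding implementation is finally ready to share. I will keep optimizing and improving Flea Framework, and I hope readers who find it useful will lend their support. This post runs long, so thank you for reading all the way to the end.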