淘宝做海淘产品 网站折扣变化快,重庆建设部网站,wordpress nickname,把网站扒下来以后怎么做在单体的应用开发场景中涉及并发同步时#xff0c;大家往往采用Synchronized#xff08;同步#xff09;或同一个JVM内Lock机制来解决多线程间的同步问题。而在分布式集群工作的开发场景中#xff0c;就需要一种更加高级的锁机制来处理跨机器的进程之间的数据同步问题…在单体的应用开发场景中涉及并发同步时大家往往采用Synchronized同步或同一个JVM内Lock机制来解决多线程间的同步问题。而在分布式集群工作的开发场景中就需要一种更加高级的锁机制来处理跨机器的进程之间的数据同步问题这种跨机器的锁就是分布式锁。接下来本文将为大家分享分布式锁的最佳实践。
一、超卖问题复现
1.1 现象
存在如下的几张表 商品表 订单表 订单item表 商品的库存为1但是并发高的时候有多笔订单。
错误案例一数据库update相互覆盖 直接在内存中判断是否有库存计算扣减之后的值更新数据库并发的情况下会导致相互覆盖发生
Transactional(rollbackFor Exception.class)
public Long createOrder() throws Exception {Product product productMapper.selectByPrimaryKey(purchaseProductId);// ... 忽略校验逻辑//商品当前库存Integer currentCount product.getCount();//校验库存if (purchaseProductNum currentCount) {throw new Exception(商品 purchaseProductId 仅剩 currentCount 件无法购买);}// 计算剩余库存Integer leftCount currentCount - purchaseProductNum;// 更新库存product.setCount(leftCount);product.setGmtModified(new Date());productMapper.updateByPrimaryKeySelective(product);Order order new Order();// ... 省略 SetorderMapper.insertSelective(order);OrderItem orderItem new OrderItem();orderItem.setOrderId(order.getId());// ... 省略 Setreturn order.getId();
}错误案例二扣减串行执行但是库存被扣减为负数
在 SQL 中加入运算避免值的相互覆盖但是库存的数量变为负数因为校验库存是否足够还是在内存中执行的并发情况下都会读到有库存 Transactional(rollbackFor Exception.class)
public Long createOrder() throws Exception {Product product productMapper.selectByPrimaryKey(purchaseProductId);// ... 忽略校验逻辑//商品当前库存Integer currentCount product.getCount();//校验库存if (purchaseProductNum currentCount) {throw new Exception(商品 purchaseProductId 仅剩 currentCount 件无法购买);}// 使用 set count count - #{purchaseProductNum,jdbcTypeINTEGER}, 更新库存productMapper.updateProductCount(purchaseProductNum,new Date(),product.getId());Order order new Order();// ... 省略 SetorderMapper.insertSelective(order);OrderItem orderItem new OrderItem();orderItem.setOrderId(order.getId());// ... 省略 Setreturn order.getId();
}错误案例三使用 synchronized 实现内存中串行校验但是依旧扣减为负数 因为我们使用的是事务的注解synchronized加在方法上方法执行结束的时候锁就会释放此时的事务还没有提交另一个线程拿到这把锁之后就会有一次扣减导致负数。
Transactional(rollbackFor Exception.class)
public synchronized Long createOrder() throws Exception {Product product productMapper.selectByPrimaryKey(purchaseProductId);// ... 忽略校验逻辑//商品当前库存Integer currentCount product.getCount();//校验库存if (purchaseProductNum currentCount) {throw new Exception(商品 purchaseProductId 仅剩 currentCount 件无法购买);}// 使用 set count count - #{purchaseProductNum,jdbcTypeINTEGER}, 更新库存productMapper.updateProductCount(purchaseProductNum,new Date(),product.getId());Order order new Order();// ... 省略 SetorderMapper.insertSelective(order);OrderItem orderItem new OrderItem();orderItem.setOrderId(order.getId());// ... 省略 Setreturn order.getId();
}1.2 解决办法
从上面造成问题的原因来看只要是扣减库存的动作不是原子性的。多个线程同时操作就会有问题。
单体应用使用本地锁 数据库中的行锁解决分布式应用使用数据库中的乐观锁加一个 version 字段利用CAS来实现会导致大量的 update 失败使用数据库维护一张锁的表 悲观锁 select使用 select for update 实现使用Redis 的 setNX实现分布式锁使用zookeeper的watcher 有序临时节点来实现可阻塞的分布式锁使用Redisson框架内的分布式锁来实现使用curator 框架内的分布式锁来实现二、单体应用解决超卖的问题
正确示例将事务包含在锁的控制范围内
保证在锁释放之前事务已经提交。
//Transactional(rollbackFor Exception.class)
public synchronized Long createOrder() throws Exception {TransactionStatus transaction1 platformTransactionManager.getTransaction(transactionDefinition);Product product productMapper.selectByPrimaryKey(purchaseProductId);if (product null) {platformTransactionManager.rollback(transaction1);throw new Exception(购买商品 purchaseProductId 不存在);}//商品当前库存Integer currentCount product.getCount();//校验库存if (purchaseProductNum currentCount) {platformTransactionManager.rollback(transaction1);throw new Exception(商品 purchaseProductId 仅剩 currentCount 件无法购买);}productMapper.updateProductCount(purchaseProductNum, new Date(), product.getId());Order order new Order();// ... 省略 SetorderMapper.insertSelective(order);OrderItem orderItem new OrderItem();orderItem.setOrderId(order.getId());// ... 省略 Setreturn order.getId();platformTransactionManager.commit(transaction1);
}正确示例使用synchronized的代码块
public Long createOrder() throws Exception {Product product null;//synchronized (this) {//synchronized (object) {synchronized (DBOrderService2.class) {TransactionStatus transaction1 platformTransactionManager.getTransaction(transactionDefinition);product productMapper.selectByPrimaryKey(purchaseProductId);if (product null) {platformTransactionManager.rollback(transaction1);throw new Exception(购买商品 purchaseProductId 不存在);}//商品当前库存Integer currentCount product.getCount();System.out.println(Thread.currentThread().getName() 库存数 currentCount);//校验库存if (purchaseProductNum currentCount) {platformTransactionManager.rollback(transaction1);throw new Exception(商品 purchaseProductId 仅剩 currentCount 件无法购买);}productMapper.updateProductCount(purchaseProductNum, new Date(), product.getId());platformTransactionManager.commit(transaction1);}TransactionStatus transaction2 platformTransactionManager.getTransaction(transactionDefinition);Order order new Order();// ... 省略 SetorderMapper.insertSelective(order);OrderItem orderItem new OrderItem();// ... 省略 SetorderItemMapper.insertSelective(orderItem);platformTransactionManager.commit(transaction2);return order.getId();正确示例使用Lock
private Lock lock new ReentrantLock();public Long createOrder() throws Exception{ Product product null;lock.lock();TransactionStatus transaction1 platformTransactionManager.getTransaction(transactionDefinition);try {product productMapper.selectByPrimaryKey(purchaseProductId);if (productnull){throw new Exception(购买商品purchaseProductId不存在);}//商品当前库存Integer currentCount product.getCount();System.out.println(Thread.currentThread().getName()库存数currentCount);//校验库存if (purchaseProductNum currentCount){throw new Exception(商品purchaseProductId仅剩currentCount件无法购买);}productMapper.updateProductCount(purchaseProductNum,new Date(),product.getId());platformTransactionManager.commit(transaction1);} catch (Exception e) {platformTransactionManager.rollback(transaction1);} finally {// 注意抛异常的时候锁释放不掉分布式锁也一样都要在这里删掉lock.unlock();}TransactionStatus transaction platformTransactionManager.getTransaction(transactionDefinition);Order order new Order();// ... 省略 SetorderMapper.insertSelective(order);OrderItem orderItem new OrderItem();// ... 省略 SetorderItemMapper.insertSelective(orderItem);platformTransactionManager.commit(transaction);return order.getId();
}
三、常见分布式锁的使用
上面使用的方法只能解决单体项目当部署多台机器的时候就会失效因为锁本身就是单机的锁所以需要使用分布式锁来实现。 3.1 数据库乐观锁
数据库中的乐观锁加一个version字段利用CAS来实现乐观锁的方式支持多台机器并发安全。但是并发量大的时候会导致大量的update失败 3.2 数据库分布式锁
db操作性能较差并且有锁表的风险一般不考虑。 3.2.1 简单的数据库锁 select for update 直接在数据库新建一张表 锁的code预先写到数据库中抢锁的时候使用select for update查询锁对应的key也就是这里的code阻塞就说明别人在使用锁。
// 加上事务就是为了 for update 的锁可以一直生效到事务执行结束
// 默认回滚的是 RunTimeException
Transactional(rollbackFor Exception.class)
public String singleLock() throws Exception {log.info(我进入了方法);DistributeLock distributeLock distributeLockMapper.selectDistributeLock(demo);if (distributeLocknull) {throw new Exception(分布式锁找不到);}log.info(我进入了锁);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}return 我已经执行完成;
}select idselectDistributeLock resultTypecom.deltaqin.distribute.model.DistributeLockselect * from distribute_lockwhere businessCode #{businessCode,jdbcTypeVARCHAR}for update
/select使用唯一键作为限制插入一条数据其他待执行的SQL就会失败当数据删除之后再去获取锁 这是利用了唯一索引的排他性。 insert lock 直接维护一张锁表
Autowired
private MethodlockMapper methodlockMapper;Override
public boolean tryLock() {try {//插入一条数据 insert intomethodlockMapper.insert(new Methodlock(lock));}catch (Exception e){//插入失败return false;}return true;
}Override
public void waitLock() {try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}
}Override
public void unlock() {//删除数据 deletemethodlockMapper.deleteByMethodlock(lock);System.out.println(-------释放锁------);3.3 Redis setNx
Redis 原生支持的保证只有一个会话可以设置成功因为Redis自己就是单线程串行执行的。
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-redis/artifactId
/dependency
spring.redis.hostlocalhost封装一个锁对象
Slf4j
public class RedisLock implements AutoCloseable {private RedisTemplate redisTemplate;private String key;private String value;//单位秒private int expireTime;/*** 没有传递 value因为直接使用的是随机值*/public RedisLock(RedisTemplate redisTemplate,String key,int expireTime){this.redisTemplate redisTemplate;this.key key;this.expireTimeexpireTime;this.value UUID.randomUUID().toString();}/*** JDK 1.7 之后的自动关闭的功能*/Overridepublic void close() throws Exception {unLock();}/*** 获取分布式锁* SET resource_name my_random_value NX PX 30000* 每一个线程对应的随机值 my_random_value 不一样用于释放锁的时候校验* NX 表示 key 不存在的时候成功key 存在的时候设置不成功Redis 自己是单线程串行执行的第一个执行的才可以设置成功* PX 表示过期时间没有设置的话忘记删除就会永远不过期*/public boolean getLock(){RedisCallbackBoolean redisCallback connection - {//设置NXRedisStringCommands.SetOption setOption RedisStringCommands.SetOption.ifAbsent();//设置过期时间Expiration expiration Expiration.seconds(expireTime);//序列化keybyte[] redisKey redisTemplate.getKeySerializer().serialize(key);//序列化valuebyte[] redisValue redisTemplate.getValueSerializer().serialize(value);//执行setnx操作Boolean result connection.set(redisKey, redisValue, expiration, setOption);return result;};//获取分布式锁Boolean lock (Boolean)redisTemplate.execute(redisCallback);return lock;}/*** 释放锁的时候随机数相同的时候才可以释放避免释放了别人设置的锁自己的已经过期了所以别人才可以设置成功* 释放的时候采用 LUA 脚本因为 delete 没有原生支持删除的时候校验值证明是当前线程设置进去的值* 脚本是在官方文档里面有的*/public boolean unLock() {// key 是自己才可以释放不是就不能释放别人的锁String script if redis.call(\get\,KEYS[1]) ARGV[1] then\n return redis.call(\del\,KEYS[1])\n else\n return 0\n end;RedisScriptBoolean redisScript RedisScript.of(script,Boolean.class);ListString keys Arrays.asList(key);// 执行脚本的时候传递的 value 就是对应的值Boolean result (Boolean)redisTemplate.execute(redisScript, keys, value);log.info(释放锁的结果result);return result;}
}每次获取的时候自己线程需要new对应的RedisLock
public String redisLock(){log.info(我进入了方法);try (RedisLock redisLock new RedisLock(redisTemplate,redisKey,30)){if (redisLock.getLock()) {log.info(我进入了锁);Thread.sleep(15000);}} catch (InterruptedException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();}log.info(方法执行完成);return 方法执行完成;
}3.4 zookeeper 瞬时znode节点 watcher监听机制
临时节点具备数据自动删除的功能。当client与ZooKeeper连接和session断掉时相应的临时节点就会被删除。zk有瞬时和持久节点瞬时节点不可以有子节点。会话结束之后瞬时节点就会消失基于zk的瞬时有序节点实现分布式锁 多线程并发创建瞬时节点的时候得到有序的序列序号最小的线程可以获得锁
其他的线程监听自己序号的前一个序号。前一个线程执行结束之后删除自己序号的节点
下一个序号的线程得到通知继续执行
以此类推创建节点的时候就确认了线程执行的顺序。
dependencygroupIdorg.apache.zookeeper/groupIdartifactIdzookeeper/artifactIdversion3.4.14/versionexclusionsexclusiongroupIdorg.slf4j/groupIdartifactIdslf4j-log4j12/artifactId/exclusion/exclusions
/dependencyzk 的观察器只可以监控一次数据发生变化之后可以发送给客户端之后需要再次设置监控。exists、create、getChildren三个方法都可以添加watcher 也就是在调用方法的时候传递true就是添加监听。注意这里Lock 实现了Watcher和AutoCloseable
当前线程创建的节点是第一个节点就获得锁否则就监听自己的前一个节点的事件
/*** 自己本身就是一个 watcher可以得到通知* AutoCloseable 实现自动关闭资源不使用的时候*/
Slf4j
public class ZkLock implements AutoCloseable, Watcher {private ZooKeeper zooKeeper;/*** 记录当前锁的名字*/private String znode;public ZkLock() throws IOException {this.zooKeeper new ZooKeeper(localhost:2181,10000,this);}public boolean getLock(String businessCode) {try {//创建业务 根节点Stat stat zooKeeper.exists(/ businessCode, false);if (statnull){zooKeeper.create(/ businessCode,businessCode.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}//创建瞬时有序节点 /order/order_00000001znode zooKeeper.create(/ businessCode / businessCode _, businessCode.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);//获取业务节点下 所有的子节点ListString childrenNodes zooKeeper.getChildren(/ businessCode, false);//获取序号最小的第一个子节点Collections.sort(childrenNodes);String firstNode childrenNodes.get(0);//如果创建的节点是第一个子节点则获得锁if (znode.endsWith(firstNode)){return true;}//如果不是第一个子节点则监听前一个节点String lastNode firstNode;for (String node:childrenNodes){if (znode.endsWith(node)){zooKeeper.exists(/businessCode/lastNode,true);break;}else {lastNode node;}}synchronized (this){wait();}return true;} catch (Exception e) {e.printStackTrace();}return false;}Overridepublic void close() throws Exception {zooKeeper.delete(znode,-1);zooKeeper.close();log.info(我已经释放了锁);}Overridepublic void process(WatchedEvent event) {if (event.getType() Event.EventType.NodeDeleted){synchronized (this){notify();}}}
}3.5 zookeeper curator
在实际的开发中不建议去自己“重复造轮子”而建议直接使用Curator客户端中的各种官方实现的分布式锁例如其中的InterProcessMutex可重入锁。
dependencygroupIdorg.apache.curator/groupIdartifactIdcurator-recipes/artifactIdversion4.2.0/versionexclusionsexclusionartifactIdslf4j-api/artifactIdgroupIdorg.slf4j/groupId/exclusion/exclusions
/dependency
Bean(initMethodstart,destroyMethod close)
public CuratorFramework getCuratorFramework() {RetryPolicy retryPolicy new ExponentialBackoffRetry(1000, 3);CuratorFramework client CuratorFrameworkFactory.newClient(localhost:2181, retryPolicy);return client;
}框架已经实现了分布式锁。zk的Java客户端升级版。使用的时候直接指定重试的策略就可以。
官网中分布式锁的实现是在curator-recipes依赖中不要引用错了。
Autowired
private CuratorFramework client;Test
public void testCuratorLock(){InterProcessMutex lock new InterProcessMutex(client, /order);try {if ( lock.acquire(30, TimeUnit.SECONDS) ) {try {log.info(我获得了锁);}finally {lock.release();}}} catch (Exception e) {e.printStackTrace();}client.close();
}3.6 Redission
重新实现了Java并发包下处理并发的类让其可以跨JVM使用例如CHM等。 3.6.1 非SpringBoot项目引入 https://redisson.org/ 引入Redisson的依赖然后配置对应的XML即可
dependencygroupIdorg.redisson/groupIdartifactIdredisson/artifactIdversion3.11.2/versionexclusionsexclusionartifactIdslf4j-api/artifactIdgroupIdorg.slf4j/groupId/exclusion/exclusions
/dependency编写相应的redisson.xml
beans xmlnshttp://www.springframework.org/schema/beansxmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexmlns:contexthttp://www.springframework.org/schema/contextxmlns:redissonhttp://redisson.org/schema/redissonxsi:schemaLocationhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context.xsdhttp://redisson.org/schema/redissonhttp://redisson.org/schema/redisson/redisson.xsd
redisson:clientredisson:single-server addressredis://127.0.0.1:6379//redisson:client
/beans配置对应ImportResource(“classpath*:redisson.xml”)资源文件。
3.6.2 SpringBoot项目引入 或者直接使用springBoot的starter即可。 https://github.com/redisson/redisson/tree/master/redisson-spring-boot-starter
dependencygroupIdorg.redisson/groupIdartifactIdredisson-spring-boot-starter/artifactIdversion3.19.1/version
/dependency修改application.properties即可#spring.redis.host 3.6.3 设置配置类
Bean
public RedissonClient getRedissonClient() {Config config new Config();config.useSingleServer().setAddress(redis://127.0.0.1:6379);return Redisson.create(config);
}3.6.4 使用
Test
public void testRedissonLock() {RLock rLock redisson.getLock(order);try {rLock.lock(30, TimeUnit.SECONDS);log.info(我获得了锁);Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}finally {log.info(我释放了锁);rLock.unlock();}
}