当前位置: 首页 > news >正文

网站建设的请示中国互联网公司排名100强

网站建设的请示,中国互联网公司排名100强,wordpress装了英文版怎么转中文,网站建设的岗位职责分布锁之RedLock 锁住你的心我的爱 #x1f682;为什么需要使用 RedLock锁被误释放时钟不一致问题锁的“延迟释放”而不是死锁Redlock是啥redlock 存在什么问题惊群效应时钟漂移Redisson 实现 RedLock在 Redisson 中, RedLock的实现类是哪一个类?这一招叫抛砖引玉springboot … 分布锁之RedLock 锁住你的心我的爱 为什么需要使用 RedLock锁被误释放时钟不一致问题锁的“延迟释放”而不是死锁Redlock是啥redlock 存在什么问题惊群效应时钟漂移Redisson 实现 RedLock在 Redisson 中, RedLock的实现类是哪一个类?这一招叫抛砖引玉springboot 中呢?源码分析我是一个在热爱生活的人啊不想随随便便找一个人搭伙过日子我想找一个就算喝了酒眼睛里也有光会给我讲远方和诗的人 为什么需要使用 RedLock 为什么需要 redis 分布式锁使用 RedLock, 原来的使用 SetNX 实现分布式锁有什么问题 在 Redis 中使用 SETNX 命令可以实现基本的分布式锁但是它存在以下问题 SETNX 命令只能实现单节点的分布式锁无法支持分布式部署的 Redis 集群。 SETNX 命令在锁释放时存在问题例如锁的过期时间设置过短可能会导致锁被误释放多个客户端同时获取锁的情况也存在竞争问题。 SETNX 命令在锁释放时没有考虑当前持有锁的客户端与其他客户端的时钟不一致问题这可能会导致锁被错误地释放或者无法被释放的情况。 为了解决上述问题需要使用分布式锁的 RedLock 实现它采用了多个 Redis 节点之间的协作来实现锁的分布式控制更加可靠和健壮。 假设某个客户端获得锁后在锁的有效期内由于一些原因其时钟与其他客户端的时钟不同步或者与 Redis 服务器的时钟不同步。这时如果使用 SETNX 命令来尝试释放锁就可能会出现以下两种情况 如果客户端的时钟比 Redis 服务器的时钟快则客户端在尝试释放锁时可能会误认为锁已经过期从而错误地释放锁导致其他客户端可以获得锁从而可能会出现数据不一致等问题。 如果客户端的时钟比 Redis 服务器的时钟慢则客户端在尝试释放锁时可能会因为认为锁还没有到期而不释放锁从而导致其他客户端一直无法获得锁从而可能会出现锁的“延迟释放”等问题。 锁被误释放 假设客户端 A 和客户端 B 都需要获取 Redis 中的同一个锁并且客户端 A 的时钟比 Redis 服务器的时钟快 10 秒钟而客户端 B 的时钟与 Redis 服务器的时钟保持一致。 客户端 A 获取锁 客户端 A 使用 SETNX 命令获取了锁并设置了过期时间为 30 秒。 客户端 B 获取锁 由于客户端 B 的时钟与 Redis 服务器的时钟保持一致因此客户端 B 获取锁时发现锁已经被客户端 A 获取了于是等待锁释放。 客户端 A 释放锁 当锁的过期时间到达时客户端 A 会尝试释放锁。由于客户端 A 的时钟比 Redis 服务器的时钟快 10 秒钟客户端 A 会认为锁已经过期从而错误地释放了锁。 客户端 B 获取锁成功 由于客户端 A 错误地释放了锁因此客户端 B 成功地获取了锁并开始执行相应的业务逻辑。 这个例子说明了如果客户端的时钟比 Redis 服务器的时钟快则可能会误释放锁导致其他客户端可以获得锁从而可能会出现数据不一致等问题。 时钟不一致问题 在使用 SETNX 实现 Redis 分布式锁时确实可能因为客户端时钟与 Redis 服务器时钟不一致导致死锁等问题。死锁的原因主要是因为客户端在判断锁是否过期时依赖于自己的时钟。如果客户端的时钟比 Redis 服务器的时钟慢可能会出现客户端错误地认为锁未过期的情况。 以下是一个例子来说明这种情况 客户端 A 获得锁设置锁的过期时间为 30 秒。客户端 A 的时钟比 Redis 服务器的时钟慢 10 秒。在锁过期之前的 20 秒时客户端 A 完成任务并尝试释放锁。由于客户端 A 的时钟慢此时它认为锁还有 10 秒才过期因此它不会释放锁。同时客户端 B 试图获得锁但由于客户端 A 尚未释放锁客户端 B 无法获得锁。客户端 A 最终在自己的时钟上等待了 30 秒后释放锁但此时已经过了 40 秒Redis 服务器的时钟导致客户端 B 等待了很长时间。 在这个例子中我们可以看到客户端 A 的时钟比 Redis 服务器慢导致锁的释放时间被延迟。这可能会导致其他客户端如客户端 B长时间等待锁从而影响整个系统的性能。 为了避免这种情况可以考虑使用 Redlock 算法。Redlock 算法是 Redis 官方推荐的一种分布式锁实现它可以在一定程度上解决客户端时钟与 Redis 服务器时钟不一致的问题。具体来说Redlock 算法要求在多个独立的 Redis 实例上同时获得锁并在大多数实例上成功获得锁时才认为锁已成功获得。这种方式能够降低单个客户端时钟不同步的影响。 锁的“延迟释放”而不是死锁 死锁是指多个进程或线程在争夺资源时相互等待的状态导致整个系统无法继续进行。在您的问题中虽然客户端 A 未正确释放锁导致客户端 B 无法获取锁但最终客户端 A 还是会释放锁所以不能严格地称为死锁。我们可以将这种情况称为锁的“延迟释放”或“拖延”导致其他客户端需要等待更长时间才能获得锁。 判断死锁的必要条件通常包括以下几点 互斥条件资源只能被一个进程或线程占有无法被其他进程共享。请求与保持条件一个进程在请求新资源的同时保持对已分配资源的占有。不可剥夺条件资源不能被强制从占有者手中回收只能由占有者自愿释放。循环等待条件存在一个进程或线程等待序列其中每个进程都在等待下一个进程释放资源。 在您提到的场景中尽管存在资源竞争和等待但由于客户端 A 最终还是会释放锁所以不满足死锁的循环等待条件。 Redlock是啥 Redlock是Redis官方提供的一种分布式锁算法它基于Paxos算法和Quorum原理可以在Redis集群环境下保证互斥性和可用性。下面是Redlock算法的基本原理 获取当前时间戳T1。 依次尝试在N个Redis节点上获取锁并记录获取锁的节点数M和最小的锁超时时间min_expire_time。 计算获取锁的时间cost_time如果cost_time小于锁的超时时间且M大于等于Q则认为锁获取成功。 如果锁获取成功则返回锁的value值和锁的超时时间。 如果锁获取失败则依次在获取锁的节点上释放锁。 在上面的流程中Redlock算法首先获取当前时间戳T1然后在N个Redis节点上尝试获取锁并记录获取锁的节点数M和最小的锁超时时间min_expire_time。如果获取锁的时间cost_time小于锁的超时时间并且M大于等于Q则认为锁获取成功返回锁的value值和锁的超时时间否则依次在获取锁的节点上释放锁。 在Redlock算法中Q是一个足够大的数值用于保证锁获取的节点数足够多从而避免因为某些节点故障或网络分区等原因导致的锁失效。min_expire_time是获取锁的节点中锁超时时间最小的节点用于保证锁超时时间的一致性。在实际使用中可以通过调整N、Q、min_expire_time等参数来平衡锁的可用性和性能开销。 redlock 存在什么问题 Redlock算法虽然能够在Redis集群环境下实现分布式锁但它并不是一个完美的解决方案仍然存在一些风险和限制。例如Redlock算法无法避免“惊群效应”和“时钟漂移”等问题因此在使用Redlock算法时需要根据具体情况进行评估和优化。 watchdog 业务延期 容易受到网络分区的影响如果多个 Redis 节点之间出现网络分区即节点之间无法正常通信则可能导致多个客户端同时持有锁从而导致数据不一致等问题。 实现困难Redlock 的实现需要确保多个 Redis 节点之间的时钟同步并且需要考虑各种异常情况例如网络故障、节点故障等这对于实现和维护来说都是一项比较困难的任务。 性能问题为了确保锁的可靠性Redlock 需要在多个节点上进行多次锁定和解锁操作这会导致一定的性能开销特别是在高并发的场景下可能会影响系统的响应性能。 因此在使用 Redlock 算法时需要认真评估其适用性并根据实际情况选择合适的分布式锁实现方式。 惊群效应 “惊群效应”Thundering Herd Effect是指在并发请求下由于访问同一个资源例如数据库而产生的一种现象当这个资源处于繁忙状态时多个并发请求同时到达导致资源的过度竞争进而导致大量的请求被延迟或者超时。 在 Redlock 算法中由于在锁失效时需要去解锁多个客户端会同时尝试解锁同一份数据可能会导致类似的惊群效应影响到 Redis 的性能。 时钟漂移 “时钟漂移”Clock Drift是指系统时钟因为硬件、温度等因素而导致的时间误差问题也就是时钟的误差率。 在 Redlock 算法中由于需要使用 Redis 服务器的时钟来计算锁的过期时间而不同 Redis 服务器的时钟可能会有不同程度的误差这会导致一些 Redis 节点计算的锁失效时间与其他节点不一致进而导致锁被错误地释放或者未被释放从而可能会出现数据不一致等问题。 Redisson 实现 RedLock 在 Redisson 中, RedLock的实现类是哪一个类? 在 Redisson 中RedLock 的实现类是 org.redisson.RedissonRedLock。该类是 Redisson 实现 RedLock 分布式锁算法的核心类通过尝试获取多个独立的 Redis 实例上的锁来实现分布式锁的功能。具体实现方式是使用多个 Redisson 的 RedissonClient 对象分别连接到不同的 Redis 实例上并在每个实例上尝试获取相同名称的分布式锁。如果在大部分 Redis 实例上都成功获取到了锁则认为获取分布式锁成功否则认为获取分布式锁失败。 RedissonRedLock 类中的 tryLock() 方法是获取 RedLock 分布式锁的核心方法该方法会依次尝试获取多个 Redis 实例上的锁并在指定的等待时间内等待获取锁的过程。如果在指定的等待时间内获取到大部分 Redis 实例上的锁则认为获取 RedLock 分布式锁成功否则释放已经获取的锁。 在 RedissonRedLock 类的实现中使用了多个 RedissonClient 对象来连接到不同的 Redis 实例上同时使用了 RPermitExpirableSemaphore 类来实现分布式信号量防止分布式锁过期时间内客户端宕机导致的死锁问题。 这一招叫抛砖引玉 如果您想使用 Redisson 实现分布式锁可以使用 org.redisson.Redisson 类来创建 Redisson 客户端并使用其 getLock() 方法获取分布式锁实例。以下是使用 Redisson 实现分布式锁的示例代码 import org.redisson.Redisson; import org.redisson.api.RLock;public class DistributedLockExample {public static void main(String[] args) {// 创建 Redisson 客户端Redisson redisson Redisson.create();// 获取分布式锁实例RLock lock redisson.getLock(myLock);try {// 尝试获取锁等待时间为 10 秒锁的过期时间为 60 秒boolean locked lock.tryLock(10, 60, TimeUnit.SECONDS);if (locked) {// 获取锁成功执行业务逻辑System.out.println(Lock acquired, executing critical section...);} else {// 获取锁失败执行相应逻辑System.out.println(Lock not acquired, exiting...);}} catch (InterruptedException e) {// 线程被中断执行相应逻辑e.printStackTrace();} finally {// 释放锁lock.unlock();redisson.shutdown();}} }在上面的代码中我们使用 Redisson.create() 方法创建 Redisson 客户端并使用其 getLock() 方法获取一个名为 “myLock” 的分布式锁实例。在 tryLock() 方法中我们尝试获取该分布式锁等待时间为 10 秒锁的过期时间为 60 秒。如果获取锁成功则执行业务逻辑并在结束后调用 unlock() 方法释放锁。如果获取锁失败则执行相应逻辑。最后在 finally 块中关闭 Redisson 客户端。 springboot 中呢? 首先需要在 pom.xml 文件中添加 Redisson 和 Jedis 的依赖 dependencygroupIdorg.redisson/groupIdartifactIdredisson/artifactIdversion3.16.1/version /dependency dependencygroupIdredis.clients/groupIdartifactIdjedis/artifactIdversion3.6.3/version /dependency然后创建一个 Redisson 配置类 RedissonConfig配置 Redisson 的连接信息和 RedLock 分布式锁的实现 import java.util.Arrays; import java.util.List;import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.redisson.config.ReadMode; import org.redisson.config.RedissonNodeConfig; import org.redisson.config.SubscriptionMode; import org.redisson.connection.balancer.LoadBalancer; import org.redisson.connection.balancer.RandomLoadBalancer; import org.redisson.connection.balancer.RoundRobinLoadBalancer; import org.redisson.connection.balancer.WeightedRoundRobinBalancer; import org.redisson.connection.balancer.WeightedRoundRobinBalancer.WeightedRoundRobinEntry; import org.redisson.connection.balancer.WeightedRoundRobinBalancer.WeightedRoundRobinLoadBalancerEntry; import org.redisson.connection.balancer.WeightedRoundRobinBalancer.WeightedRoundRobinServers; import org.redisson.connection.balancer.WeightedRoundRobinBalancer.WeightedRoundRobinServers.OrderType; import org.redisson.spring.starter.RedissonAutoConfiguration; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;Configuration AutoConfigureAfter(RedissonAutoConfiguration.class) public class RedissonConfig {Autowiredprivate RedissonClient redissonClient;Value(${redisson.redlock.addresses})private String[] redlockAddresses;Beanpublic RedissonClient redissonClient() {Config config new Config();config.useClusterServers().addNodeAddress(redlockAddresses).setScanInterval(2000);return Redisson.create(config);}Beanpublic RedissonRedLock redissonRedLock() {LoadBalancer loadBalancer new RoundRobinLoadBalancer();ListString nodeAddresses Arrays.asList(redlockAddresses);ListRedissonNodeConfig nodeConfigs RedissonNodeConfig.fromAddresses(nodeAddresses);WeightedRoundRobinBalancer balancedLoadBalancer new WeightedRoundRobinBalancer(loadBalancer, nodeConfigs);return new RedissonRedLock(balancedLoadBalancer);} }在上述配置类中我们使用了 Redisson 的 Config 类来配置 Redisson 的连接信息和 useClusterServers() 方法来连接到 Redis Cluster。我们还定义了 redissonRedLock() 方法使用 RoundRobinLoadBalancer 实现负载均衡并将多个 Redis 实例传入 RedissonRedLock 类中创建 RedLock 分布式锁实例。 然后在 application.properties 文件中添加以下配置 # Redis Cluster 地址 redisson.redlock.addressesredis://localhost:7001,redis://localhost:7002,redis://localhost:7003# RedLock 分布式锁名称 redlock.namemyLock接下来我们可以在一个 Spring Boot 的 Controller 类中使用 Autowired 注解来注入 RedissonRedLock 实例然后使用 tryLock() 方法获取分布式锁并执行业务逻辑 import java.util.concurrent.TimeUnit;import org.redisson.RedissonRedLock; import org.redisson.api.RLock; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController;RestController public class MyController {Autowiredprivate RedissonRedLock redissonRedLock;GetMapping(/test)public String test() {RLock lock redissonRedLock.getLock(myLock);try {// 尝试获取 RedLock 分布式锁等待时间为 10 秒锁的过期时间为 60 秒boolean locked lock.tryLock(10, 60, TimeUnit.SECONDS);if (locked) {// 获取锁成功执行业务逻辑System.out.println(Lock acquired, executing critical section...);// TODO: 执行业务逻辑} else {// 获取锁失败执行相应逻辑System.out.println(Lock not acquired, exiting...);}} catch (InterruptedException e) {// 线程被中断执行相应逻辑e.printStackTrace();} finally {// 释放锁lock.unlock();}return Done;} }在上述示例代码中我们使用 Autowired 注解注入了 RedissonRedLock 实例然后在 test() 方法中获取 RedLock 分布式锁并执行业务逻辑。在 tryLock() 方法中我们尝试获取 RedLock 分布式锁等待时间为 10 秒锁的过期时间为 60 秒。如果获取锁成功则执行业务逻辑并在结束后调用 unlock() 方法释放锁。如果获取锁失败则执行相应逻辑。 最后启动 Spring Boot 应用程序并访问 http://localhost:8080/test 路径即可执行 RedLock 分布式锁实现的业务逻辑。 源码分析 /*** - 尝试获取红锁* p* - param waitTime 最大等待时间* p* - param leaseTime 锁持有时间* p* - param unit 时间单位* p* - return 是否成功获取到锁* p* - throws InterruptedException 当线程被中断时抛出*/public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {// 如果是异步获取锁则调用 tryLockAsync 方法并等待返回结果//try {// return tryLockAsync(waitTime, leaseTime, unit).get();//} catch (ExecutionException e) {// throw new IllegalStateException(e);//}// 根据 leaseTime 计算新的锁持有时间long newLeaseTime -1;if (leaseTime ! -1) {newLeaseTime unit.toMillis(waitTime) * 2;}// 获取当前时间和最大等待时间long time System.currentTimeMillis();long remainTime -1;if (waitTime ! -1) {remainTime unit.toMillis(waitTime);}// 计算获取锁的最大等待时间long lockWaitTime calcLockWaitTime(remainTime);// 失败的锁个数限制int failedLocksLimit failedLocksLimit();// 已获取到锁的集合ListRLock acquiredLocks new ArrayListRLock(locks.size());// 遍历锁集合尝试获取锁for (ListIteratorRLock iterator locks.listIterator(); iterator.hasNext(); ) {RLock lock iterator.next();boolean lockAcquired;try {// 如果最大等待时间和锁持有时间均为默认值则使用 tryLock 方法获取锁if (waitTime -1 leaseTime -1) {lockAcquired lock.tryLock();} else { // 否则使用带超时参数的 tryLock 方法获取锁long awaitTime Math.min(lockWaitTime, remainTime);lockAcquired lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);}} catch (RedisResponseTimeoutException e) {// 如果获取锁超时则释放已获取到的锁unlockInner(Arrays.asList(lock));lockAcquired false;} catch (Exception e) {lockAcquired false;}// 如果获取锁成功则将锁添加到已获取锁的集合中 if (lockAcquired) {acquiredLocks.add(lock);} else { // 否则判断是否达到失败的锁个数限制如果达到则直接退出循环 if (locks.size() - acquiredLocks.size() failedLocksLimit()) {break;}if (failedLocksLimit 0) { // 重试次数已达到failedLocksLimit释放已获取的锁并返回false unlockInner(acquiredLocks);if (waitTime -1 leaseTime -1) {return false;}failedLocksLimit failedLocksLimit();acquiredLocks.clear(); // reset iterator while (iterator.hasPrevious()) {iterator.previous();}} else { // 减少重试次数并继续尝试获取锁 failedLocksLimit--;}}if (leaseTime ! -1) { // 如果设置了过期时间ListRFutureBoolean futures new ArrayListRFutureBoolean(acquiredLocks.size());for (RLock rLock : acquiredLocks) { // 遍历所有获取到的锁RFutureBoolean future rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS); // 为每个锁设置过期时间futures.add(future);}for (RFutureBoolean rFuture : futures) { // 同步等待每个设置过期时间的操作完成rFuture.syncUninterruptibly();}}return true; // 获取锁成功返回true}}这段代码实现了 Redisson 的红锁算法具体的实现逻辑如下 如果是异步获取锁则调用 tryLockAsync 方法并等待返回结果但是当前代码中这段注释掉了。 根据 leaseTime 计算新的锁持有时间。 获取当前时间和最大等待时间。 计算获取锁的最大等待时间。 失败的锁个数限制。 已获取到锁的集合。 遍历锁集合尝试获取锁。 如果获取锁成功则将锁添加到已获取锁的集合中。 如果获取锁失败判断是否达到失败的锁个数限制如果达到则直接退出循环。 如果重试次数已达到 failedLocksLimit释放已获取的锁并返回 false。 减少重试次数并继续尝试获取锁。 如果设置了过期时间为每个锁设置过期时间并同步等待每个设置过期时间的操作完成。 获取锁成功返回 true。 lockAcquired lock.tryLock(); 获取锁使用 org.redisson.command.CommandAsyncService#get /*** 获取Future的执行结果** param future Future对象* return Future的执行结果*/Overridepublic V V get(RFutureV future) {if (!future.isDone()) { // 如果Future未完成final CountDownLatch l new CountDownLatch(1);future.addListener(new FutureListenerV() { // 为Future添加监听器Overridepublic void operationComplete(FutureV future) throws Exception {l.countDown(); // 计数器减1表示Future执行完成}});boolean interrupted false;while (!future.isDone()) { // 等待Future执行完成try {l.await();} catch (InterruptedException e) {interrupted true;break;}}if (interrupted) { // 如果当前线程被中断Thread.currentThread().interrupt(); // 标记中断状态}}// commented out due to blocking issues up to 200 ms per minute for each thread // future.awaitUninterruptibly();if (future.isSuccess()) { // 如果Future执行成功return future.getNow(); // 返回Future的执行结果}throw convertException(future); // 抛出Future执行的异常}这段代码是 CommandAsyncExecutor 类中的 get 方法。该方法的作用是阻塞当前线程并等待 Future 对象执行完成然后返回执行结果或者抛出异常。 首先判断 Future 对象是否已经完成执行如果没有完成执行则使用 CountDownLatch 进行等待直到 Future 执行完成或者当前线程被中断。 然后判断 Future 对象是否执行成功如果成功则返回执行结果否则抛出异常。在抛出异常之前会调用 convertException 方法将异常转换为 RedisException 或者其他类型的异常以便上层代码能够更好地处理异常情况。
http://www.hkea.cn/news/14535878/

相关文章:

  • 长沙网站建设哪里好汝阳县建设局网站
  • 网络知识网站wordpress建图片网站
  • 个人博客网页设计代码上海网站搜索排名优化哪家好
  • 用网站源码怎么做网站全国城建培训中心官网查询证书
  • 汕头网页网站制作找人做自建房图纸去哪个网站
  • 延吉建设局网站一个网站可以有几个关键词
  • 注册网站在哪里创建建设网站 深圳
  • 平面设计师常用网站爱客crm软件
  • 聊城网站建设推广厦门开企网
  • 昆明快速做网站哈尔滨快速制作网站
  • 网站建设制作需求嘉鱼网站建设优化
  • 网站建设如何报价保定知名网站建设公司
  • 手机网站 qq代码网站里的地图定位怎么做的
  • 河北网站建设中心海口建设公司网站
  • 北京王府井攻略网站关键字优化地点
  • 无锡建设网站的公司关键词库在网站上怎么体现
  • 如何建网站老鱼网服务器两个域名一个ip做两个网站
  • 网站开发基本过程购物网站建设实训心得体会
  • 现在流行的网站开发工具网站首页设计思路
  • 网站a记录吗宽带营销策略
  • 站酷网首页谁给推荐一个免费的好网站
  • 淮安 网站建设高级网站设计师手写代码篇
  • qq网站 直接登录移动互联网开发记事本项目告别
  • 改则网站建设推荐几个的网站
  • 电子商务网站系统设计WordPress如何迁移数据
  • 山西省建设注册中心网站网站建设哪个比较好
  • 宝坻做网站哪家好wordpress 3.6 下载
  • 做网站哪个语言强汉南公司网站建设
  • 电子商务网站 注意网络规划设计师(高级)
  • 学校网站的页头图片做wordpress绑定七牛