想要了解更多线程和锁相关的知识,可以看下面这个文章,了解线程和锁知识有助于理解本文内容
JAVA:创建线程,线程安全,线程锁,线程的生命周期,线程池https://blog.csdn.net/dxh9231028/article/details/140676316?spm=1001.2014.3001.5501
Mysql实现分布式锁
Mysql实现分布式锁有四种方式,分别是get_lock函数,for update尾缀,定义一个锁表,手动实现锁逻辑实现悲观锁,以及通过定义时间戳的锁表实现乐观锁。
1.get_lock函数
get_lock函数是mysql中内置的函数,可以通过select get_lock(lockname,timeout)来实现锁的功能,其中lockname是锁的名字,timeout为获取锁的超时时间,当想要限制多个线程无法同时操作同一个资源时,只需要在这些线程在执行sql之前,使用相同的lockname先执行select get_lock(lockname,timeout),第一个执行线程,mysql会创建一个lockname和当前这个会话的id的对应关系,并返回1,而其他线程执行select get_lock(lockname,timeout)时则会返回0,线程在执行完对应的操作后,需要显示的调用select release_lock(lockname)来释放锁。
该方法有一个非常致命的缺陷,当一个线程由于某些原因突然失联,没有来的及执行release_lock来释放锁,那么该资源将处于长时间的不能访问状态。
Java代码实现:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class MySQLDistributedLock {
private Connection connection;
//构造函数,new的时候创建连接
public MySQLDistributedLock(String url, String user, String password) throws Exception {
connection = DriverManager.getConnection(url, user, password);
}
// 获取锁
public boolean acquireLock(String lockName, int timeout) throws Exception {
//创建sql模板
String sql = "SELECT GET_LOCK(?, ?)";
//准备连接,sql模板预解析
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
//传入锁名
stmt.setString(1, lockName);
stmt.setInt(2, timeout);
//执行sql
try (ResultSet rs = stmt.executeQuery()) {
if (rs.next()) {
//如果结果为1则返回true,证明获取锁成功
return rs.getInt(1) == 1;
}
}
}catch (Exception e){
//发生异常释放锁
releaseLock(lockName);
}
return false;
}
// 释放锁
public boolean releaseLock(String lockName) throws Exception {
String sql = "SELECT RELEASE_LOCK(?)";
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
stmt.setString(1, lockName);
try (ResultSet rs = stmt.executeQuery()) {
if (rs.next()) {
return rs.getInt(1) == 1;
}
}
}
return false;
}
public static void main(String[] args) {
MySQLDistributedLock lock = null;
//定义锁名
String lockName = "my_distributed_lock";
try {
//创建连接
lock = new MySQLDistributedLock("jdbc:mysql://localhost:3306/test", "root", "password");
//获取锁
if (lock.acquireLock(lockName, 10)) {
System.out.println("Lock acquired!");
// 执行需要同步的业务逻辑
Thread.sleep(5000);
//释放锁
lock.releaseLock(lockName);
System.out.println("Lock released!");
} else {
System.out.println("Failed to acquire lock.");
}
} catch (Exception e) {
//发生异常释放锁
try{
lock.releaseLock(lockName);
}catch(Exception ex){
ex.printStackTrace();
}
e.printStackTrace();
}
}
}
2.for update尾缀
在执行mysql语句时,在后面添加上for update,那么当前事务操作的行,在当前事务执行完之前,不允许其他事务来进行修改操作,或读操作但是sql语句后面也拼接了for update,通过这个机制,也可以实现分布式锁。
我们可以通过创建一个锁表,多个线程通过select * where lockname = lockname for update,通过这种方式,只有一个线程可以成功获取到锁表中的锁。
相较于get_lock方法,for update当会话意外关闭时,事务会马上回滚,不会造成get_lock方法那样长时间的无法获取锁,并且for update不仅可以获取锁的方法来实现多个进程不操作同一资源,也可以直接操作目标资源,不过没有定义锁表获取锁更灵活。
Java代码实现:
/*
创建如下锁表
CREATE TABLE distributed_lock (
lock_name VARCHAR(255) NOT NULL PRIMARY KEY,
lock_value INT,
lock_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB;
*/
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class MySQLInnoDBDistributedLock {
private Connection connection;
public MySQLInnoDBDistributedLock(String url, String user, String password) throws Exception {
connection = DriverManager.getConnection(url, user, password);
connection.setAutoCommit(false); // 开启事务管理
}
// 获取锁
public boolean acquireLock(String lockName) throws Exception {
String sql = "SELECT * FROM distributed_lock WHERE lock_name = ? FOR UPDATE";
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
stmt.setString(1, lockName);
try (ResultSet rs = stmt.executeQuery()) {
return rs.next()
}
}
}
// 释放锁
public void releaseLock() throws Exception {
connection.commit(); // 提交事务释放锁
}
//关闭数据库连接
public void close() throws Exception {
if (connection != null) {
connection.close();
}
}
public static void main(String[] args) {
try {
MySQLInnoDBDistributedLock lock = new MySQLInnoDBDistributedLock("jdbc:mysql://localhost:3306/test", "root", "password");
String lockName = "my_distributed_lock";
if (lock.acquireLock(lockName)) {
System.out.println("Lock acquired!");
// 执行需要同步的业务逻辑...
lock.releaseLock();
System.out.println("Lock released!");
} else {
System.out.println("Failed to acquire lock.");
}
lock.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.自己定义锁表
通过自己定义一个锁表,并且定义一个不可重复的列,然后多个线程通过插入同一个数据来判断是否获取到锁,而释放锁的操作则是删除这条数据,通过自己实现这一过程,可以实现更灵活的分布式锁机制。
Java代码实现:
/*
CREATE TABLE distributed_lock (
lock_name VARCHAR(255) NOT NULL PRIMARY KEY,
locked_by VARCHAR(255),
lock_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
*/
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class MySQLTableDistributedLock {
private Connection connection;
public MySQLTableDistributedLock(String url, String user, String password) throws Exception {
connection = DriverManager.getConnection(url, user, password);
}
// 获取锁
public boolean acquireLock(String lockName, String clientId) throws Exception {
String sql = "INSERT INTO distributed_lock (lock_name, locked_by) VALUES (?, ?)";
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
stmt.setString(1, lockName);
stmt.setString(2, clientId);
return stmt.executeUpdate() == 1;
} catch (Exception e) {
// 插入失败意味着锁已被其他客户端持有
return false;
}
}
// 释放锁
public boolean releaseLock(String lockName, String clientId) throws Exception {
String sql = "DELETE FROM distributed_lock WHERE lock_name = ? AND locked_by = ?";
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
stmt.setString(1, lockName);
stmt.setString(2, clientId);
return stmt.executeUpdate() == 1;
}
}
public static void main(String[] args) {
try {
MySQLTableDistributedLock lock = new MySQLTableDistributedLock("jdbc:mysql://localhost:3306/test", "root", "password");
String lockName = "my_distributed_lock";
String clientId = "client_1";
if (lock.acquireLock(lockName, clientId)) {
System.out.println("Lock acquired!");
// 执行需要同步的业务逻辑
Thread.sleep(5000);
lock.releaseLock(lockName, clientId);
System.out.println("Lock released!");
} else {
System.out.println("Failed to acquire lock.");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
4.时间戳列实现乐观锁
通过在需要限制多个线程同时访问的资源表中加入一个时间戳列保存最后的编辑时间,在执行修改操作之前先获取目标的时间戳,并且执行操作时在原有的条件的基础上添加一个时间戳等于之前获取的时间戳的条件,并且在修改后更新时间戳。通过这种方式,可以验证当前操作的数据是否在获取之前是否被篡改过,如果篡改过则会导致无法选择目标资源。
乐观锁在低并发情况下性能更好,但在高并发的情况下可能会导致某些线程可以更快的操作成功,有些线程则长时间无法操作成功,操作是否成功具有随机性。
Java代码实现:
/*
CREATE TABLE users (
id INT PRIMARY KEY,
username VARCHAR(100),
email VARCHAR(100),
last_modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
*/
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
public class OptimisticLockingExample {
private static final String JDBC_URL = "jdbc:mysql://localhost:3306/test";
private static final String JDBC_USER = "root";
private static final String JDBC_PASSWORD = "password";
public static void main(String[] args) {
Connection connection = null;
try {
// 1. 建立数据库连接
connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
// 2. 读取记录及其时间戳
int userId = 1;
UserRecord userRecord = getUserRecord(connection, userId);
if (userRecord == null) {
System.out.println("User not found.");
return;
}
System.out.println("Original User Record: " + userRecord);
// 3. 尝试使用乐观锁更新记录
boolean updateSuccess = updateUserEmail(connection, userId, "[email protected]", userRecord.getLastModified());
if (updateSuccess) {
System.out.println("Update successful.");
} else {
System.out.println("Update failed due to concurrent modification. Please retry.");
}
} catch (SQLException e) {
e.printStackTrace();
} finally {
// 关闭数据库连接
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
// 获取用户记录及其时间戳
private static UserRecord getUserRecord(Connection connection, int userId) throws SQLException {
String selectSql = "SELECT id, username, email, last_modified FROM users WHERE id = ?";
try (PreparedStatement stmt = connection.prepareStatement(selectSql)) {
stmt.setInt(1, userId);
try (ResultSet rs = stmt.executeQuery()) {
if (rs.next()) {
int id = rs.getInt("id");
String username = rs.getString("username");
String email = rs.getString("email");
Timestamp lastModified = rs.getTimestamp("last_modified");
return new UserRecord(id, username, email, lastModified);
}
}
}
return null;
}
// 使用乐观锁更新用户的邮箱地址
private static boolean updateUserEmail(Connection connection, int userId, String newEmail, Timestamp lastModified) throws SQLException {
String updateSql = "UPDATE users SET email = ?, last_modified = CURRENT_TIMESTAMP WHERE id = ? AND last_modified = ?";
try (PreparedStatement stmt = connection.prepareStatement(updateSql)) {
stmt.setString(1, newEmail);
stmt.setInt(2, userId);
stmt.setTimestamp(3, lastModified);
int rowsAffected = stmt.executeUpdate();
return rowsAffected > 0;
}
}
// 简单的 UserRecord 类,用于存储用户记录
private static class UserRecord {
private int id;
private String username;
private String email;
private Timestamp lastModified;
public UserRecord(int id, String username, String email, Timestamp lastModified) {
this.id = id;
this.username = username;
this.email = email;
this.lastModified = lastModified;
}
public int getId() {
return id;
}
public String getUsername() {
return username;
}
public String getEmail() {
return email;
}
public Timestamp getLastModified() {
return lastModified;
}
@Override
public String toString() {
return "UserRecord{" +
"id=" + id +
", username='" + username + '\'' +
", email='" + email + '\'' +
", lastModified=" + lastModified +
'}';
}
}
}
Redis实现分布式锁
redis通过其内置的setnx指令实现,setnx指令在目标不存在会创建成功,存在时创建失败,这样可以让多个线程创建同一个key来实现分布式锁的功能,创建成功则为获取到锁。创建完key后还要设置过期时间,防止会话意外关闭导致无法释放锁。
redis大多以集群模式出现,当多个多个主机接收到不同的写操作时可能会造成同时写入成功,对此redis官方提出了redlock算法,其规定在获取锁时需要对每个主机都下发一条创建指令,只有当超过半数主机认为创建成功时,才会成功获取锁,不过这个算法并没有被redis实现,但是可以使用一些第三方驱动,或者手动实现也不算复杂,以下是一种实现方式:
import redis.clients.jedis.Jedis;
import java.util.UUID;
public class Redlock {
//保存所有redis实例。
private Jedis[] redisInstances;
// 锁的过期时间 (10秒)
private long lockTimeout = 10000;
// 假设有五个实例,那么则成功数则需要达到3个
private int quorum = 3;
//构造函数,传入redis实例
public Redlock(Jedis... redisInstances) {
this.redisInstances = redisInstances;
}
//尝试获取传入的锁,也就是创建传入的key
public String tryLock(String lockKey) {
//获取一个唯一id作为value
String lockValue = UUID.randomUUID().toString();
//获取当前时间时间戳
long startTime = System.currentTimeMillis();
//记录成功的数量
int lockCount = 0;
//开始在各个redis实例上创建key。
for (Jedis jedis : redisInstances) {
if (jedis.set(lockKey, lockValue, "NX", "PX", lockTimeout) != null) {
lockCount++;
}
}
// 判断是否获取锁成功(成功数量大于实力数量的一般,并且整个过程没有超过超时时间)
long elapsedTime = System.currentTimeMillis() - startTime;
if (lockCount >= quorum && elapsedTime < lockTimeout) {
return lockValue;
} else {
// 获取失败,释放已获取到的锁
for (Jedis jedis : redisInstances) {
// 只删除value是自己的UUID的key,也就是只删除自己创建的key
if (lockValue.equals(jedis.get(lockKey))) {
jedis.del(lockKey);
}
}
return null;
}
}
public void unlock(String lockKey, String lockValue) {
for (Jedis jedis : redisInstances) {
if (lockValue.equals(jedis.get(lockKey))) {
jedis.del(lockKey);
}
}
}
public static void main(String[] args) {
// 假设你有5个独立的 Redis 实例,实际的IP和端口是在Nacos或Zookeeper中获取
Jedis redis1 = new Jedis("localhost", 6379);
Jedis redis2 = new Jedis("localhost", 6380);
Jedis redis3 = new Jedis("localhost", 6381);
Jedis redis4 = new Jedis("localhost", 6382);
Jedis redis5 = new Jedis("localhost", 6383);
Redlock redlock = new Redlock(redis1, redis2, redis3, redis4, redis5);
String lockKey = "myLock";
String lockValue = redlock.tryLock(lockKey);
if (lockValue != null) {
System.out.println("Lock acquired!");
// 执行需要保护的操作
redlock.unlock(lockKey, lockValue);
System.out.println("Lock released!");
} else {
System.out.println("Failed to acquire lock.");
}
}
}
Zookeeper实现分布式锁:
zookeeper实现分布式锁有着绝对的优势,因为其四种节点类型中有一种节点类型是临时有序的节点。在创建相同key的有序节点时,创建并不会失败,而是会在key后面加上一串数字,这个数字是递增的,而临时节点的特性是在会话关闭时临时节点会删除这个key,这就使zookeeper在实现分布式锁时可以创建临时有序节点,客户端只需要判断自己创建的key后面的序号是不是其中最小的即可,这样不仅可以做到有序的获取锁,还能让会话意外断开时自动删除key,释放锁。
实际操作中可以创建一个/lock目录,然后所有进程都创建/lock/lockname,并且在内部判断lockname后面的数字是不是最小的,如果是最小的则开始操作目标资源,操作完毕后删除自己的key。了解zookeeper知识,可以看下面这篇文章:
Zookeeper使用快速入门:基础命令,wacth监控,权限控制https://blog.csdn.net/dxh9231028/article/details/141105185?spm=1001.2014.3001.5501
在客户端正常断开会话前仍需手动释放,因为zookeeper判断会话断开需要时间,所以这个特性只能防止会话意外断开时锁长时间无法释放的情况,不能作为锁释放的正常途径。
Java代码实现:
import org.apache.zookeeper.*;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class ZookeeperDistributedLock {
//zookeeper的ip,实际项目中写在配置文件中
private static final String ZK_ADDRESS = "localhost:2181";
//锁的根目录,在这个目录下创建子节点锁
private static final String LOCK_PATH = "/locks";
//要创建的锁的路径
private static final String LOCK_NODE_PREFIX = LOCK_PATH + "/lockname";
private ZooKeeper zooKeeper;
private String currentNode;
private String lockNode;
public static void main(String[] args) throws Exception {
ZookeeperDistributedLock lock = new ZookeeperDistributedLock();
lock.connect();
try {
lock.acquireLock();
System.out.println("Lock acquired!");
// 执行需要保护的操作
Thread.sleep(5000); // 模拟操作
} finally {
lock.releaseLock();
System.out.println("Lock released!");
lock.close();
}
}
//连接zookeeper客户都安,并将连接赋予zookeeper变量
public void connect() throws IOException {
zooKeeper = new ZooKeeper(ZK_ADDRESS, 3000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeDeleted && event.getPath().equals(lockNode)) {
synchronized (this) {
notify();
}
}
}
});
}
//获取锁,也就是创建key的过程
public void acquireLock() throws Exception {
//检查父目录/lock是否存在,如果不存在则创建
if (zooKeeper.exists(LOCK_PATH, false) == null) {
zooKeeper.create(LOCK_PATH, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
//在/lock下创建/lock/lockname
currentNode = zooKeeper.create(LOCK_NODE_PREFIX, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//循环判断当下最小的节点是不是本线程创建的
while (true) {
List<String> nodes = zooKeeper.getChildren(LOCK_PATH, false);
Collections.sort(nodes);
if (currentNode.endsWith(nodes.get(0))) {
// This is the smallest node, thus the lock is acquired
lockNode = currentNode;
break;
}
// Listen for changes to the previous node
String previousNode = findPreviousNode(nodes);
if (previousNode != null) {
String previousNodePath = LOCK_PATH + "/" + previousNode;
synchronized (this) {
zooKeeper.exists(previousNodePath, true);
wait();
}
}
}
}
private String findPreviousNode(List<String> nodes) {
String myNode = currentNode.substring(LOCK_PATH.length() + 1);
for (int i = nodes.size() - 1; i >= 0; i--) {
String node = nodes.get(i);
if (node.compareTo(myNode) < 0) {
return node;
}
}
return null;
}
//删除key,释放锁
public void releaseLock() throws KeeperException, InterruptedException {
zooKeeper.delete(currentNode, -1);
}
//关闭连接,释放资源
public void close() throws InterruptedException {
zooKeeper.close();
}
}
版权归原作者 不止会JS 所有, 如有侵权,请联系我们删除。