java基于mongodb實現分布式鎖的示例代碼
通過線程安全findAndModify 實現鎖
實現定義鎖存儲對象:
/** * mongodb 分布式鎖 */@Data@NoArgsConstructor@AllArgsConstructor@Document(collection = 'distributed-lock-doc')public class LockDocument { @Id private String id; private long expireAt; private String token;}
定義Lock API:
public interface LockService { String acquire(String key, long expiration); boolean release(String key, String token); boolean refresh(String key, String token, long expiration);}
獲取鎖:
@Override public String acquire(String key, long expiration) {Query query = Query.query(Criteria.where('_id').is(key));String token = this.generateToken();Update update = new Update() .setOnInsert('_id', key) .setOnInsert('expireAt', System.currentTimeMillis() + expiration) .setOnInsert('token', token);FindAndModifyOptions options = new FindAndModifyOptions().upsert(true) .returnNew(true);LockDocument doc = mongoTemplate.findAndModify(query, update, options, LockDocument.class);boolean locked = doc.getToken() != null && doc.getToken().equals(token);// 如果已過期if (!locked && doc.getExpireAt() < System.currentTimeMillis()) { DeleteResult deleted = this.mongoTemplate.remove(Query.query(Criteria.where('_id').is(key) .and('token').is(doc.getToken()) .and('expireAt').is(doc.getExpireAt())),LockDocument.class); if (deleted.getDeletedCount() >= 1) {// 成功釋放鎖, 再次嘗試獲取鎖return this.acquire(key, expiration); }}log.debug('Tried to acquire lock for key {} with token {} . Locked: {}', key, token, locked);return locked ? token : null; }
原理:
先嘗試upsert鎖對象,如果成功且token一致,說明拿到鎖 否則加鎖失敗 如果未拿到鎖,但是鎖已過期,嘗試刪除鎖 如果刪除成功,再次嘗試拿鎖如果失敗,說明鎖可能已經續期了釋放和續期鎖:
@Override public boolean release(String key, String token) { Query query = Query.query(Criteria.where('_id').is(key) .and('token').is(token)); DeleteResult deleted = mongoTemplate.remove(query, LockDocument.class); boolean released = deleted.getDeletedCount() == 1; if (released) { log.debug('Remove query successfully affected 1 record for key {} with token {}', key, token); } else if (deleted.getDeletedCount() > 0) { log.error('Unexpected result from release for key {} with token {}, released {}', key, token, deleted); } else { log.error('Remove query did not affect any records for key {} with token {}', key, token); } return released; } @Override public boolean refresh(String key, String token, long expiration) { Query query = Query.query(Criteria.where('_id').is(key) .and('token').is(token)); Update update = Update.update('expireAt', System.currentTimeMillis() + expiration); UpdateResult updated = mongoTemplate.updateFirst(query, update, LockDocument.class); final boolean refreshed = updated.getModifiedCount() == 1; if (refreshed) { log.debug('Refresh query successfully affected 1 record for key {} ' + 'with token {}', key, token); } else if (updated.getModifiedCount() > 0) { log.error('Unexpected result from refresh for key {} with token {}, ' + 'released {}', key, token, updated); } else { log.warn('Refresh query did not affect any records for key {} with token {}. ' + 'This is possible when refresh interval fires for the final time ' + 'after the lock has been released', key, token); } return refreshed; }使用
private LockService lockService;private void tryAcquireLockAndSchedule() {while (!this.stopSchedule) { // 嘗試拿鎖 this.token = this.lockService.acquire(SCHEDULER_LOCK, 20000); if (this.token != null) { // 拿到鎖 } else {// 等待LOCK_EXPIRATION, 再次嘗試Thread.sleep(LOCK_EXPIRATION); }} } 先嘗試拿鎖,如果獲取到token,說明拿鎖成功 否則可以sleep一段時間后再拿鎖
完整代碼,可到github查看 https://github.com/jadepeng/docker-pipeline/blob/main/pipeline-master/src/main/java/com/github/jadepeng/pipeline/service/impl/MongoLockService.java
到此這篇關于java基于mongodb實現分布式鎖的示例代碼的文章就介紹到這了,更多相關java mongodb實現分布式鎖內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!
相關文章:
