Published on

etcd分布式锁

Authors
  • avatar
    Name
    yushenw
    Linkedin

实现分布式锁需要注意的事项

  • 锁的独占性: 确保任何时候只有一个客户端可以持有锁。

  • 死锁的预防: 如果持有锁的进程崩溃或失去连接,需要有机制释放锁,防止死锁。

  • 锁的可靠性: 确保在网络分区或节点故障的情况下,锁的行为是可预测和正确的。

  • 锁的公平性: 确保锁的分配是公平的,避免某些进程长时间无法获取锁。

  • 性能和可伸缩性: 锁服务应当能够高效处理请求,且能随着系统扩展而伸缩。

  • 重入性: 根据需求,考虑是否需要支持重入锁(同一线程可重复获得锁)。

使用 etcd 实现分布式锁的步骤

etcd 提供了强一致性的键值存储,可以用于实现分布式锁。以下是使用 etcd 实现分布式锁的基本步骤:

  • 创建租约: 首先为锁创建一个租约(lease)。租约具有时间限制,如果客户端崩溃,租约到期后锁会自动释放。

  • 尝试获取锁: 通过在 etcd 中写入一个带有租约的键来尝试获取锁。如果键已存在(已被其他客户端持有),则获取锁失败。

  • 监视锁: 如果获取锁失败,客户端可以监视这个键,等待它被删除或过期,然后再次尝试获取锁。

  • 维持租约: 一旦获取到锁,客户端需要定期续约以保持锁的持有状态。

  • 释放锁: 当完成任务后,客户端应删除对应的键,释放锁。

使用示例

package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"sync"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

const (
	LOCK_KEY = "/tmp/go/lock"
)

var wg sync.WaitGroup

func watchLock(ctx context.Context, client *clientv3.Client) {
	wChan := client.Watch(ctx, LOCK_KEY)
	for wResp := range wChan {
		for _, ev := range wResp.Events {
			log.Printf("Type: %s, Key: %s, Value: %s", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}

func tryLockWithRetry(client *clientv3.Client, retryCount int, retryInterval time.Duration, id int) {
	// lease, err := cli.Grant(context.TODO(), 30)
	// session, err := concurrency.NewSession(cli, concurrency.WithLease(lease.ID))
	session, err := concurrency.NewSession(client)
	if err != nil {
		log.Fatalf("%d New Session Error: %v\n", id, err)
	}
	defer session.Close()
	mutex := concurrency.NewMutex(session, LOCK_KEY)
	defer wg.Done()
	for i := 0; i < retryCount; i++ {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		err = mutex.Lock(ctx)
		cancel()

		if err != nil {
			if err == context.DeadlineExceeded {
				fmt.Printf("%d 尝试 %d 次后加锁超时,等待重试...\n", id, i+1)
				time.Sleep(retryInterval)
				continue
			} else {
				wg.Done()
				log.Fatal(err)
			}
		}

		min, max := 2, 8
		src := rand.NewSource(time.Now().UnixNano())
		r := rand.New(src)
		// rand.Intn(max-min+1) 生成一个介于 0(包含)和 max-min+1(不包含)之间的随机整数。
		sleepTime := time.Duration(r.Intn(max-min+1)+min) * time.Second

		fmt.Printf("%d 成功获取锁,执行业务逻辑\n", id)
		time.Sleep(sleepTime)
		fmt.Printf("%d 执行完毕,耗时: %.2f s\n", id, sleepTime.Seconds())

		if err = mutex.Unlock(context.Background()); err != nil {
			wg.Done()
			log.Fatal(err)
		} else {
			fmt.Printf("%d 释放锁成功\n", id)
		}
		return
	}
}

func main() {
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})

	if err != nil {
		log.Fatal(err)
	}

	defer client.Close()

	const N = 5

	wg.Add(N)

	for i := 0; i < N; i++ {
		go tryLockWithRetry(client, 3, 3*time.Second, i)
	}

	fmt.Println("等待子程序执行...")
	wg.Wait()
	fmt.Println("执行完毕 程序退出")

}

运行结果