Changkun's Blog欧长坤的博客

Science and art, life in between.科学与艺术,生活在其间。

  • Home首页
  • Ideas想法
  • Posts文章
  • Tags标签
  • Bio关于
Changkun Ou

Changkun Ou

Human-AI interaction researcher, engineer, and writer.人机交互研究者、工程师、写作者。

Bridging HCI, AI, and systems programming. Building intelligent human-in-the-loop optimization systems. Informed by psychology, philosophy, and social science.连接人机交互、AI 与系统编程。构建智能的人在环优化系统。融合心理学、哲学与社会科学。

Science and art, life in between.科学与艺术,生活在其间。

276 Blogs博客
165 Tags标签
Changkun's Blog欧长坤的博客

Stability Patterns稳定性模式

Published at发布于: 2020-12-26

Circuit Breaker automatically degrades service functions in response to a likely fault, preventing larger or cascading failures by eliminating recurring errors and providing reasonable error responses.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Circuit is a context-aware operation that yields a string result or an error.
type Circuit func(context.Context) (string, error)

// Breaker wraps circuit with a circuit breaker that opens after
// failureThreshold consecutive failures.
//
// While the breaker is open, calls return an error immediately without
// invoking circuit, until a back-off period measured from the last real
// attempt has elapsed. The back-off starts at two seconds and doubles with
// every further failed attempt. Any successful call resets the failure count
// and closes the breaker.
//
// The returned Circuit keeps its state in unsynchronized closure variables,
// so it must not be called from multiple goroutines at once.
func Breaker(circuit Circuit, failureThreshold uint64) Circuit {
    // maxBackoffLevel keeps 2s<<level within the range of time.Duration.
    const maxBackoffLevel = 32

    var consecutiveFailures uint64
    var lastAttempt time.Time

    return func(ctx context.Context) (string, error) {
        // Requiring at least one failure keeps a zero threshold from opening
        // the breaker before circuit has ever been tried.
        if consecutiveFailures > 0 && consecutiveFailures >= failureThreshold {
            backoffLevel := consecutiveFailures - failureThreshold
            if backoffLevel > maxBackoffLevel {
                backoffLevel = maxBackoffLevel
            }
            shouldRetryAt := lastAttempt.Add(2 * time.Second << backoffLevel)

            if !time.Now().After(shouldRetryAt) {
                return "", errors.New("circuit open -- service unreachable")
            }
        }

        lastAttempt = time.Now()
        response, err := circuit(ctx)
        if err != nil {
            // Every failure counts, including the first one after a success.
            consecutiveFailures++
            return response, err
        }

        consecutiveFailures = 0
        return response, nil
    }
}

Debounce limits the frequency of a function invocation so that only the first or the last call in a cluster of calls is actually performed.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// Circuit is a context-aware operation that yields a string result or an error.
type Circuit func(context.Context) (string, error)

// DebounceFirst wraps circuit so that only the first call of a cluster is
// forwarded. A call is forwarded when the quiet period set by the previous
// call has already passed (always true for the very first call); otherwise
// the cached result of the last forwarded call is returned. Every call,
// forwarded or not, restarts the quiet period of length d.
func DebounceFirst(circuit Circuit, d time.Duration) Circuit {
    var (
        quietUntil time.Time
        lastResult string
        lastErr    error
    )

    // rearm starts a fresh quiet period beginning now.
    rearm := func() { quietUntil = time.Now().Add(d) }

    return func(ctx context.Context) (string, error) {
        if !quietUntil.Before(time.Now()) {
            // Still inside the quiet period: serve the cached outcome.
            rearm()
            return lastResult, lastErr
        }

        lastResult, lastErr = circuit(ctx)
        rearm()
        return lastResult, lastErr
    }
}
// DebounceLast wraps circuit so that only the last call of a cluster triggers
// an invocation: circuit runs once no call has arrived for at least d.
//
// Every call pushes the deadline to now+d and immediately returns the most
// recently stored result and error (the zero values until circuit has run).
// The first call of a cluster starts a goroutine that checks the deadline
// every 100ms and, once it has passed, invokes circuit with that call's ctx
// and stores the outcome. If that ctx is done first, the stored outcome
// becomes ("", ctx.Err()). In both cases the goroutine stops its ticker and
// exits, and the next call starts a new cluster.
//
// NOTE(review): the state shared between callers and the polling goroutine is
// not synchronized; guard it with a mutex before relying on this under
// concurrent use.
func DebounceLast(circuit Circuit, d time.Duration) Circuit {
    var (
        threshold time.Time
        ticker    *time.Ticker
        result    string
        err       error
    )

    return func(ctx context.Context) (string, error) {
        threshold = time.Now().Add(d)

        if ticker == nil {
            t := time.NewTicker(100 * time.Millisecond)
            ticker = t

            go func() {
                // Runs on every exit path so the ticker is released and a
                // later call can start a new polling goroutine.
                defer func() {
                    t.Stop()
                    ticker = nil
                }()

                for {
                    select {
                    case <-t.C:
                        if threshold.Before(time.Now()) {
                            result, err = circuit(ctx)
                            // A bare break would only leave the select.
                            return
                        }
                    case <-ctx.Done():
                        result, err = "", ctx.Err()
                        return
                    }
                }
            }()
        }

        return result, err
    }
}

Retry accounts for a possible transient fault in a distributed system by transparently retrying a failed operation.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// Effector is a context-aware operation that yields a string result or an error.
type Effector func(context.Context) (string, error)

// Retry wraps effector so that a failing call is repeated up to retries more
// times, waiting delay between attempts. The first successful result is
// returned; once the retries are used up the last result and error are
// returned as-is. If ctx is done while waiting, the wait is abandoned and
// ctx.Err() is returned with an empty string.
func Retry(effector Effector, retries int, delay time.Duration) Effector {
    return func(ctx context.Context) (string, error) {
        failed := 0
        for {
            out, err := effector(ctx)
            switch {
            case err == nil, failed >= retries:
                return out, err
            }

            failed++
            log.Printf("Attempt %d failed; retrying in %v", failed, delay)

            wait := time.After(delay)
            select {
            case <-ctx.Done():
                return "", ctx.Err()
            case <-wait:
                // Delay elapsed; go around for the next attempt.
            }
        }
    }
}

Throttle limits the frequency of a function call to some maximum number of invocations per unit of time.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// Effector is a context-aware operation that yields a string result or an error.
type Effector func(context.Context) (string, error)

// Throttle wraps e with a token bucket that limits how often e is invoked.
//
// The bucket starts with max tokens. Starting from the first call, refill
// tokens are added for every full interval d that elapses, never exceeding
// max. A call that finds a token spends it and invokes e; a call that finds
// the bucket empty does not invoke e and returns the result of the most
// recent invocation instead (the zero values if e has never run). A call
// whose ctx is already done returns ("", ctx.Err()) without spending a token.
//
// Tokens are topped up lazily from the clock on each call, so no background
// goroutine or ticker is needed and refilling does not depend on any single
// caller's context. A non-positive d disables refilling.
//
// The returned Effector keeps its state in unsynchronized closure variables,
// so it must not be called from multiple goroutines at once.
func Throttle(e Effector, max uint, refill uint, d time.Duration) Effector {
    var (
        tokens     = max
        lastRefill time.Time

        lastReturnString string
        lastReturnError  error
    )

    return func(ctx context.Context) (string, error) {
        if err := ctx.Err(); err != nil {
            return "", err
        }

        now := time.Now()
        switch {
        case lastRefill.IsZero():
            // The refill clock starts ticking at the first call.
            lastRefill = now
        case d > 0:
            if ticks := now.Sub(lastRefill) / d; ticks > 0 {
                // Advance by whole intervals only, keeping the tick phase.
                lastRefill = lastRefill.Add(ticks * d)

                // Compare against the remaining room first so that
                // ticks*refill can never overflow.
                room := max - tokens
                if refill > 0 && uint64(ticks) > uint64(room/refill) {
                    tokens = max
                } else {
                    tokens += uint(ticks) * refill
                }
            }
        }

        if tokens > 0 {
            tokens--
            lastReturnString, lastReturnError = e(ctx)
        }

        return lastReturnString, lastReturnError
    }
}

Timeout allows a process to stop waiting for an answer once it’s clear that an answer may not be coming.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// Timeout starts slowfunc(arg) in its own goroutine right away and returns a
// function that waits for the outcome under a context.
//
// The returned function yields slowfunc's result and error once they are
// available; it may be called any number of times and always reports the same
// outcome. If ctx is done before slowfunc has finished, it returns
// ("", ctx.Err()) instead. The empty string rather than nil is kept as the
// placeholder result for compatibility with existing callers.
//
// slowfunc itself is not interrupted when ctx is done; it keeps running in the
// background, but its goroutine always terminates once slowfunc returns.
func Timeout(slowfunc func(interface{}) (interface{}, error),
    arg interface{}) func(context.Context) (interface{}, error) {
    var (
        res interface{}
        err error
    )
    // done is closed after res and err are written, so a receive on it
    // guarantees both are visible and lets every caller observe completion.
    done := make(chan struct{})

    go func() {
        defer close(done)
        res, err = slowfunc(arg)
    }()

    return func(ctx context.Context) (interface{}, error) {
        select {
        case <-done:
            return res, err
        case <-ctx.Done():
            return "", ctx.Err()
        }
    }
}

断路器(Circuit Breaker) 在可能发生故障时自动降级服务功能,通过消除重复错误并提供合理的错误响应来防止更大的或级联的故障。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Circuit is a context-aware operation that yields a string result or an error.
type Circuit func(context.Context) (string, error)

// Breaker wraps circuit with a circuit breaker that opens after
// failureThreshold consecutive failures.
//
// While the breaker is open, calls return an error immediately without
// invoking circuit, until a back-off period measured from the last real
// attempt has elapsed. The back-off starts at two seconds and doubles with
// every further failed attempt. Any successful call resets the failure count
// and closes the breaker.
//
// The returned Circuit keeps its state in unsynchronized closure variables,
// so it must not be called from multiple goroutines at once.
func Breaker(circuit Circuit, failureThreshold uint64) Circuit {
    // maxBackoffLevel keeps 2s<<level within the range of time.Duration.
    const maxBackoffLevel = 32

    var consecutiveFailures uint64
    var lastAttempt time.Time

    return func(ctx context.Context) (string, error) {
        // Requiring at least one failure keeps a zero threshold from opening
        // the breaker before circuit has ever been tried.
        if consecutiveFailures > 0 && consecutiveFailures >= failureThreshold {
            backoffLevel := consecutiveFailures - failureThreshold
            if backoffLevel > maxBackoffLevel {
                backoffLevel = maxBackoffLevel
            }
            shouldRetryAt := lastAttempt.Add(2 * time.Second << backoffLevel)

            if !time.Now().After(shouldRetryAt) {
                return "", errors.New("circuit open -- service unreachable")
            }
        }

        lastAttempt = time.Now()
        response, err := circuit(ctx)
        if err != nil {
            // Every failure counts, including the first one after a success.
            consecutiveFailures++
            return response, err
        }

        consecutiveFailures = 0
        return response, nil
    }
}

防抖(Debounce) 将一组调用中的函数调用频率限制为仅执行一次。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// Circuit is a context-aware operation that yields a string result or an error.
type Circuit func(context.Context) (string, error)

// DebounceFirst wraps circuit so that only the first call of a cluster is
// forwarded. A call is forwarded when the quiet period set by the previous
// call has already passed (always true for the very first call); otherwise
// the cached result of the last forwarded call is returned. Every call,
// forwarded or not, restarts the quiet period of length d.
func DebounceFirst(circuit Circuit, d time.Duration) Circuit {
    var (
        quietUntil time.Time
        lastResult string
        lastErr    error
    )

    // rearm starts a fresh quiet period beginning now.
    rearm := func() { quietUntil = time.Now().Add(d) }

    return func(ctx context.Context) (string, error) {
        if !quietUntil.Before(time.Now()) {
            // Still inside the quiet period: serve the cached outcome.
            rearm()
            return lastResult, lastErr
        }

        lastResult, lastErr = circuit(ctx)
        rearm()
        return lastResult, lastErr
    }
}
// DebounceLast wraps circuit so that only the last call of a cluster triggers
// an invocation: circuit runs once no call has arrived for at least d.
//
// Every call pushes the deadline to now+d and immediately returns the most
// recently stored result and error (the zero values until circuit has run).
// The first call of a cluster starts a goroutine that checks the deadline
// every 100ms and, once it has passed, invokes circuit with that call's ctx
// and stores the outcome. If that ctx is done first, the stored outcome
// becomes ("", ctx.Err()). In both cases the goroutine stops its ticker and
// exits, and the next call starts a new cluster.
//
// NOTE(review): the state shared between callers and the polling goroutine is
// not synchronized; guard it with a mutex before relying on this under
// concurrent use.
func DebounceLast(circuit Circuit, d time.Duration) Circuit {
    var (
        threshold time.Time
        ticker    *time.Ticker
        result    string
        err       error
    )

    return func(ctx context.Context) (string, error) {
        threshold = time.Now().Add(d)

        if ticker == nil {
            t := time.NewTicker(100 * time.Millisecond)
            ticker = t

            go func() {
                // Runs on every exit path so the ticker is released and a
                // later call can start a new polling goroutine.
                defer func() {
                    t.Stop()
                    ticker = nil
                }()

                for {
                    select {
                    case <-t.C:
                        if threshold.Before(time.Now()) {
                            result, err = circuit(ctx)
                            // A bare break would only leave the select.
                            return
                        }
                    case <-ctx.Done():
                        result, err = "", ctx.Err()
                        return
                    }
                }
            }()
        }

        return result, err
    }
}

重试(Retry) 通过透明地重试失败的操作来处理分布式系统中可能出现的瞬时故障。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// Effector is a context-aware operation that yields a string result or an error.
type Effector func(context.Context) (string, error)

// Retry wraps effector so that a failing call is repeated up to retries more
// times, waiting delay between attempts. The first successful result is
// returned; once the retries are used up the last result and error are
// returned as-is. If ctx is done while waiting, the wait is abandoned and
// ctx.Err() is returned with an empty string.
func Retry(effector Effector, retries int, delay time.Duration) Effector {
    return func(ctx context.Context) (string, error) {
        failed := 0
        for {
            out, err := effector(ctx)
            switch {
            case err == nil, failed >= retries:
                return out, err
            }

            failed++
            log.Printf("Attempt %d failed; retrying in %v", failed, delay)

            wait := time.After(delay)
            select {
            case <-ctx.Done():
                return "", ctx.Err()
            case <-wait:
                // Delay elapsed; go around for the next attempt.
            }
        }
    }
}

节流(Throttle) 将函数调用的频率限制在每单位时间内的最大调用次数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// Effector is a context-aware operation that yields a string result or an error.
type Effector func(context.Context) (string, error)

// Throttle wraps e with a token bucket that limits how often e is invoked.
//
// The bucket starts with max tokens. Starting from the first call, refill
// tokens are added for every full interval d that elapses, never exceeding
// max. A call that finds a token spends it and invokes e; a call that finds
// the bucket empty does not invoke e and returns the result of the most
// recent invocation instead (the zero values if e has never run). A call
// whose ctx is already done returns ("", ctx.Err()) without spending a token.
//
// Tokens are topped up lazily from the clock on each call, so no background
// goroutine or ticker is needed and refilling does not depend on any single
// caller's context. A non-positive d disables refilling.
//
// The returned Effector keeps its state in unsynchronized closure variables,
// so it must not be called from multiple goroutines at once.
func Throttle(e Effector, max uint, refill uint, d time.Duration) Effector {
    var (
        tokens     = max
        lastRefill time.Time

        lastReturnString string
        lastReturnError  error
    )

    return func(ctx context.Context) (string, error) {
        if err := ctx.Err(); err != nil {
            return "", err
        }

        now := time.Now()
        switch {
        case lastRefill.IsZero():
            // The refill clock starts ticking at the first call.
            lastRefill = now
        case d > 0:
            if ticks := now.Sub(lastRefill) / d; ticks > 0 {
                // Advance by whole intervals only, keeping the tick phase.
                lastRefill = lastRefill.Add(ticks * d)

                // Compare against the remaining room first so that
                // ticks*refill can never overflow.
                room := max - tokens
                if refill > 0 && uint64(ticks) > uint64(room/refill) {
                    tokens = max
                } else {
                    tokens += uint(ticks) * refill
                }
            }
        }

        if tokens > 0 {
            tokens--
            lastReturnString, lastReturnError = e(ctx)
        }

        return lastReturnString, lastReturnError
    }
}

超时(Timeout) 允许进程在明确答案可能不会到来时停止等待。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// Timeout starts slowfunc(arg) in its own goroutine right away and returns a
// function that waits for the outcome under a context.
//
// The returned function yields slowfunc's result and error once they are
// available; it may be called any number of times and always reports the same
// outcome. If ctx is done before slowfunc has finished, it returns
// ("", ctx.Err()) instead. The empty string rather than nil is kept as the
// placeholder result for compatibility with existing callers.
//
// slowfunc itself is not interrupted when ctx is done; it keeps running in the
// background, but its goroutine always terminates once slowfunc returns.
func Timeout(slowfunc func(interface{}) (interface{}, error),
    arg interface{}) func(context.Context) (interface{}, error) {
    var (
        res interface{}
        err error
    )
    // done is closed after res and err are written, so a receive on it
    // guarantees both are visible and lets every caller observe completion.
    done := make(chan struct{})

    go func() {
        defer close(done)
        res, err = slowfunc(arg)
    }()

    return func(ctx context.Context) (interface{}, error) {
        select {
        case <-done:
            return res, err
        case <-ctx.Done():
            return "", ctx.Err()
        }
    }
}
© 2008 - 2026 Changkun Ou. All rights reserved.保留所有权利。 | PV/UV: /
0%