diff --git a/client/middleware/default.go b/client/middleware/default.go index 8e6dcaf..200e427 100644 --- a/client/middleware/default.go +++ b/client/middleware/default.go @@ -15,7 +15,7 @@ import ( // https://github.com/iyear/tdl/blob/master/core/tclient/tclient.go func NewDefaultMiddlewares(ctx context.Context, timeout time.Duration) []telegram.Middleware { return []telegram.Middleware{ - recovery.New(ctx, newBackoff(timeout)), + recovery.New(ctx, func() backoff.BackOff { return newBackoff(timeout) }), retry.New(config.C().Telegram.RpcRetry), floodwait.NewSimpleWaiter(), } diff --git a/client/middleware/recovery/recovery.go b/client/middleware/recovery/recovery.go index d5565b1..92330c6 100644 --- a/client/middleware/recovery/recovery.go +++ b/client/middleware/recovery/recovery.go @@ -14,19 +14,28 @@ import ( ) type recovery struct { - ctx context.Context - backoff backoff.BackOff + ctx context.Context + newBackoff func() backoff.BackOff } -func New(ctx context.Context, backoff backoff.BackOff) telegram.Middleware { +// New returns a recovery middleware. +// +// newBackoff is a factory that must return a fresh backoff.BackOff on every call: backoff implementations in +// cenkalti/backoff/v4 (notably ExponentialBackOff) are not safe for concurrent +// use, and the Telegram client invokes RPCs from many goroutines in parallel. +// +// Sharing a single instance corrupts its internal counters, breaks the +// exponential interval, and defeats MaxElapsedTime - see issue #218. +func New(ctx context.Context, newBackoff func() backoff.BackOff) telegram.Middleware { return &recovery{ - ctx: ctx, - backoff: backoff, + ctx: ctx, + newBackoff: newBackoff, } } func (r *recovery) Handle(next tg.Invoker) telegram.InvokeFunc { return func(ctx context.Context, input bin.Encoder, output bin.Decoder) error { + b := r.newBackoff() return backoff.RetryNotify(func() error { if err := next.Invoke(ctx, input, output); err != nil { @@ -38,7 +47,7 @@ func (r *recovery) Handle(next tg.Invoker) telegram.InvokeFunc { } return nil - }, r.backoff, func(err error, duration time.Duration) { + }, b, func(err error, duration time.Duration) { log.FromContext(ctx).Debug("Wait for connection recovery", "error", err, "duration", duration) }) }