198 行
4.8 KiB
Go
198 行
4.8 KiB
Go
package pub
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"golang.org/x/time/rate"
|
|
"math"
|
|
"net/url"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// DeliveryPersister lets an application record the delivery state of every
// message being sent, including while a message is being retried. With that
// record a client can also resume deliveries that were still in progress
// when the application server was shut down.
type DeliveryPersister interface {
	// Sending tells the persister that the given bytes are being
	// delivered to the given url. The returned string must be a unique
	// id for this delivery.
	Sending(b []byte, to url.URL) string

	// Retrying tells the persister that the delivery with this id is
	// being retried.
	Retrying(id string)
	// Successful tells the persister that the request was delivered
	// successfully and needs no further retries.
	Successful(id string)
	// Undeliverable tells the persister that the delivery with this id
	// has failed and will no longer be retried.
	Undeliverable(id string)

	// Cancel tells the persister that the delivery with this id was
	// interrupted because the server is cancelling. Such deliveries
	// should be retried once the server is back online.
	Cancel(id string)
}
|
|
|
|
type delivererPool struct {
|
|
// When present, permits clients to be notified of all state changes
|
|
// when delivering a request to another federated server.
|
|
//
|
|
// Optional.
|
|
persister DeliveryPersister
|
|
// Limit speed of retries.
|
|
initialRetryTime time.Duration
|
|
maxRetryTime time.Duration
|
|
retryTimeFactor float64
|
|
// Limit total number of retries.
|
|
maxNumberRetries int
|
|
// Enforces speed limit of retries
|
|
limiter *rate.Limiter
|
|
// Allow graceful cancelling
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
timerId uint64
|
|
timerMap map[uint64]*time.Timer
|
|
mu sync.Mutex // Limits concurrent access to timerId and timerMap
|
|
// Allow graceful error handling
|
|
errChan chan error
|
|
}
|
|
|
|
func newDelivererPool(initial, maxRetry time.Duration, factor float64, maxN int, qps *rate.Limiter, persister DeliveryPersister) *delivererPool {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
return &delivererPool{
|
|
persister: persister,
|
|
initialRetryTime: initial,
|
|
maxRetryTime: maxRetry,
|
|
retryTimeFactor: factor,
|
|
maxNumberRetries: maxN,
|
|
limiter: qps,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
timerId: 0,
|
|
timerMap: make(map[uint64]*time.Timer, 0),
|
|
mu: sync.Mutex{},
|
|
errChan: make(chan error, 0),
|
|
}
|
|
}
|
|
|
|
// retryData is the state carried from one delivery attempt to the next.
type retryData struct {
	nextWait time.Duration // delay before the next retry is attempted
	n        int           // number of retries taken so far
	f        func() error  // the delivery attempt itself
	id       string        // delivery id handed to the DeliveryPersister
}

// NextRetry returns the retry state that follows r: the wait is multiplied
// by factor (rounded to the nearest nanosecond) and capped at max, the retry
// count is incremented, and f and id are carried over unchanged.
//
// The cap is applied in floating point before converting back to an
// integer. Converting an out-of-range float64 to int64 is
// implementation-defined and can yield a negative duration, which would
// slip past an integer "w > max" check.
func (r retryData) NextRetry(factor float64, max time.Duration) retryData {
	scaled := math.Floor(float64(r.nextWait)*factor + 0.5)
	// Default to the cap; this also covers a NaN product, for which the
	// comparison below is false.
	w := max
	if scaled < float64(max) {
		w = time.Duration(int64(scaled))
	}
	return retryData{
		nextWait: w,
		n:        r.n + 1,
		f:        r.f,
		id:       r.id,
	}
}

// ShouldRetry reports whether fewer than max retries have been taken.
func (r retryData) ShouldRetry(max int) bool {
	return r.n < max
}
|
|
|
|
// Do spawns a goroutine that retries f until it returns no error.
|
|
func (d *delivererPool) Do(b []byte, to url.URL, sendFn func([]byte, url.URL) error) {
|
|
f := func() error {
|
|
return sendFn(b, to)
|
|
}
|
|
go func() {
|
|
id := ""
|
|
if d.persister != nil {
|
|
id = d.persister.Sending(b, to)
|
|
}
|
|
d.do(retryData{
|
|
nextWait: d.initialRetryTime,
|
|
n: 0,
|
|
f: f,
|
|
id: id,
|
|
})
|
|
}()
|
|
}
|
|
|
|
func (d *delivererPool) Restart(b []byte, to url.URL, id string, sendFn func([]byte, url.URL) error) {
|
|
f := func() error {
|
|
return sendFn(b, to)
|
|
}
|
|
go func() {
|
|
d.do(retryData{
|
|
nextWait: d.initialRetryTime,
|
|
n: 0,
|
|
f: f,
|
|
id: id,
|
|
})
|
|
}()
|
|
}
|
|
|
|
func (d *delivererPool) Stop() {
|
|
d.cancel()
|
|
d.closeTimers()
|
|
}
|
|
|
|
func (d *delivererPool) Errors() <-chan error {
|
|
return d.errChan
|
|
}
|
|
|
|
func (d *delivererPool) do(r retryData) {
|
|
if err := d.limiter.Wait(d.ctx); err != nil {
|
|
if d.persister != nil {
|
|
d.persister.Cancel(r.id)
|
|
}
|
|
d.errChan <- err
|
|
return
|
|
}
|
|
if err := r.f(); err != nil {
|
|
d.errChan <- err
|
|
if r.ShouldRetry(d.maxNumberRetries) {
|
|
if d.persister != nil {
|
|
d.persister.Retrying(r.id)
|
|
}
|
|
d.addClosableTimer(r)
|
|
} else {
|
|
d.errChan <- fmt.Errorf("delivery tried maximum number of times")
|
|
if d.persister != nil {
|
|
d.persister.Undeliverable(r.id)
|
|
}
|
|
}
|
|
}
|
|
if d.persister != nil {
|
|
d.persister.Successful(r.id)
|
|
}
|
|
}
|
|
|
|
func (d *delivererPool) addClosableTimer(r retryData) {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
id := d.timerId
|
|
d.timerId++
|
|
d.timerMap[id] = time.AfterFunc(r.nextWait, func() {
|
|
d.do(r.NextRetry(d.retryTimeFactor, d.maxRetryTime))
|
|
d.removeTimer(id)
|
|
})
|
|
}
|
|
|
|
func (d *delivererPool) removeTimer(id uint64) {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
if _, ok := d.timerMap[id]; ok {
|
|
delete(d.timerMap, id)
|
|
}
|
|
}
|
|
|
|
func (d *delivererPool) closeTimers() {
|
|
d.mu.Lock()
|
|
defer d.mu.Unlock()
|
|
for _, v := range d.timerMap {
|
|
v.Stop()
|
|
}
|
|
d.timerMap = make(map[uint64]*time.Timer, 0)
|
|
}
|