Remove obsolete deliverer package
このコミットが含まれているのは:
コミット
2d8651d95c
|
@ -1,13 +0,0 @@
|
|||
# deliverer
|
||||
|
||||
This library is completely optional, provided only for convenience.
|
||||
|
||||
An extra utility that provides a simple mechanism to asynchronously deliver
|
||||
federated messages from a `pub.Pubber` available from the `go-fed/activity/pub`
|
||||
library.
|
||||
|
||||
It implements the `pub.Deliverer` interface.
|
||||
|
||||
The parent application may provide a way to persist delivery attempts in a way
|
||||
that survives shutdown by implementing the new `DeliveryPersister ` interface.
|
||||
The sky is the limit.
|
|
@ -1,233 +0,0 @@
|
|||
package deliverer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/go-fed/activity/pub"
|
||||
"golang.org/x/time/rate"
|
||||
"math"
|
||||
"net/url"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// DeliveryPersister allows applications to keep track of delivery states of
|
||||
// the messages being sent, including during retries. This permits clients to
|
||||
// also resume delivery of messages that were in the process of being delivered
|
||||
// when the application server was shut down.
|
||||
type DeliveryPersister interface {
|
||||
// Sending informs the delivery persister that the provided bytes are
|
||||
// being delivered to the specified url. It must return a unique id for
|
||||
// this delivery.
|
||||
Sending(b []byte, to *url.URL) string
|
||||
// Cancel informs the delivery persister that the provided delivery was
|
||||
// interrupted by the server cancelling. These should be retried once
|
||||
// the server is back online.
|
||||
Cancel(id string)
|
||||
// Successful informs the delivery persister that the request has been
|
||||
// successfully delivered and no further retries are needed.
|
||||
Successful(id string)
|
||||
// Retrying indicates the specified delivery is being retried.
|
||||
Retrying(id string)
|
||||
// Undeliverable indicates the specified delivery has failed and is no
|
||||
// longer being retried.
|
||||
Undeliverable(id string)
|
||||
}
|
||||
|
||||
// DeliveryOptions provides options when delivering messages to federated
|
||||
// servers. All are required unless explicitly stated otherwise.
|
||||
type DeliveryOptions struct {
|
||||
// Initial amount of time to wait before retrying delivery.
|
||||
InitialRetryTime time.Duration
|
||||
// The longest amount of time to wait before retrying delivery.
|
||||
MaximumRetryTime time.Duration
|
||||
// Rate of backing off retries. Must be at least 1.
|
||||
BackoffFactor float64
|
||||
// Maximum number of retries to do when delivering a message. Must be at
|
||||
// least 1.
|
||||
MaxRetries int
|
||||
// Global rate limiter across all deliveries, to prevent spamming
|
||||
// outbound messages.
|
||||
RateLimit *rate.Limiter
|
||||
// Persister allows implementations to save messages that are enqueued
|
||||
// for delivery between downtimes. It also permits metrics gathering and
|
||||
// monitoring of outbound messages.
|
||||
//
|
||||
// This field is optional.
|
||||
Persister DeliveryPersister
|
||||
}
|
||||
|
||||
var _ pub.Deliverer = &DelivererPool{}
|
||||
|
||||
type DelivererPool struct {
|
||||
// When present, permits clients to be notified of all state changes
|
||||
// when delivering a request to another federated server.
|
||||
//
|
||||
// Optional.
|
||||
persister DeliveryPersister
|
||||
// Limit speed of retries.
|
||||
initialRetryTime time.Duration
|
||||
maxRetryTime time.Duration
|
||||
retryTimeFactor float64
|
||||
// Limit total number of retries.
|
||||
maxNumberRetries int
|
||||
// Enforces speed limit of retries
|
||||
limiter *rate.Limiter
|
||||
// Allow graceful cancelling
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
timerId uint64
|
||||
timerMap map[uint64]*time.Timer
|
||||
mu sync.Mutex // Limits concurrent access to timerId and timerMap
|
||||
// Allow graceful error handling
|
||||
errChan chan error
|
||||
}
|
||||
|
||||
func NewDelivererPool(d DeliveryOptions) *DelivererPool {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &DelivererPool{
|
||||
persister: d.Persister,
|
||||
initialRetryTime: d.InitialRetryTime,
|
||||
maxRetryTime: d.MaximumRetryTime,
|
||||
retryTimeFactor: d.BackoffFactor,
|
||||
maxNumberRetries: d.MaxRetries,
|
||||
limiter: d.RateLimit,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
timerId: 0,
|
||||
timerMap: make(map[uint64]*time.Timer, 0),
|
||||
mu: sync.Mutex{},
|
||||
errChan: make(chan error, 0),
|
||||
}
|
||||
}
|
||||
|
||||
type retryData struct {
|
||||
nextWait time.Duration
|
||||
n int
|
||||
f func() error
|
||||
id string
|
||||
}
|
||||
|
||||
func (r retryData) NextRetry(factor float64, max time.Duration) retryData {
|
||||
w := time.Duration(int64(math.Floor((float64(r.nextWait) * factor) + 0.5)))
|
||||
if w > max {
|
||||
w = max
|
||||
}
|
||||
return retryData{
|
||||
nextWait: w,
|
||||
n: r.n + 1,
|
||||
f: r.f,
|
||||
id: r.id,
|
||||
}
|
||||
}
|
||||
|
||||
func (r retryData) ShouldRetry(max int) bool {
|
||||
return r.n < max
|
||||
}
|
||||
|
||||
// Do spawns a goroutine that retries f until it returns no error. Retry
|
||||
// behavior is determined by the DeliveryOptions passed to the DelivererPool
|
||||
// upon construction.
|
||||
func (d *DelivererPool) Do(b []byte, to *url.URL, sendFn func([]byte, *url.URL) error) {
|
||||
f := func() error {
|
||||
return sendFn(b, to)
|
||||
}
|
||||
go func() {
|
||||
id := ""
|
||||
if d.persister != nil {
|
||||
id = d.persister.Sending(b, to)
|
||||
}
|
||||
d.do(retryData{
|
||||
nextWait: d.initialRetryTime,
|
||||
n: 0,
|
||||
f: f,
|
||||
id: id,
|
||||
})
|
||||
}()
|
||||
}
|
||||
|
||||
// Restart resumes a previous attempt at delivering a payload to the specified
|
||||
// URL. Retry behavior is determined by the DeliveryOptions passed to this
|
||||
// DelivererPool upon construction, and is not governed by the previous
|
||||
// DelivererPool that attempted to deliver the message.
|
||||
func (d *DelivererPool) Restart(b []byte, to *url.URL, id string, sendFn func([]byte, *url.URL) error) {
|
||||
f := func() error {
|
||||
return sendFn(b, to)
|
||||
}
|
||||
go func() {
|
||||
d.do(retryData{
|
||||
nextWait: d.initialRetryTime,
|
||||
n: 0,
|
||||
f: f,
|
||||
id: id,
|
||||
})
|
||||
}()
|
||||
}
|
||||
|
||||
// Stop turns down and stops any in-flight requests or retries.
|
||||
func (d *DelivererPool) Stop() {
|
||||
d.cancel()
|
||||
d.closeTimers()
|
||||
}
|
||||
|
||||
// Provides a channel streaming any errors the pool encounters, including errors
|
||||
// that it retries on.
|
||||
func (d *DelivererPool) Errors() <-chan error {
|
||||
return d.errChan
|
||||
}
|
||||
|
||||
func (d *DelivererPool) do(r retryData) {
|
||||
if err := d.limiter.Wait(d.ctx); err != nil {
|
||||
if d.persister != nil {
|
||||
d.persister.Cancel(r.id)
|
||||
}
|
||||
d.errChan <- err
|
||||
return
|
||||
}
|
||||
if err := r.f(); err != nil {
|
||||
d.errChan <- err
|
||||
if r.ShouldRetry(d.maxNumberRetries) {
|
||||
if d.persister != nil {
|
||||
d.persister.Retrying(r.id)
|
||||
}
|
||||
d.addClosableTimer(r)
|
||||
} else {
|
||||
d.errChan <- fmt.Errorf("delivery tried maximum number of times")
|
||||
if d.persister != nil {
|
||||
d.persister.Undeliverable(r.id)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
if d.persister != nil {
|
||||
d.persister.Successful(r.id)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DelivererPool) addClosableTimer(r retryData) {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
id := d.timerId
|
||||
d.timerId++
|
||||
d.timerMap[id] = time.AfterFunc(r.nextWait, func() {
|
||||
d.do(r.NextRetry(d.retryTimeFactor, d.maxRetryTime))
|
||||
d.removeTimer(id)
|
||||
})
|
||||
}
|
||||
|
||||
func (d *DelivererPool) removeTimer(id uint64) {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
if _, ok := d.timerMap[id]; ok {
|
||||
delete(d.timerMap, id)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DelivererPool) closeTimers() {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
for _, v := range d.timerMap {
|
||||
v.Stop()
|
||||
}
|
||||
d.timerMap = make(map[uint64]*time.Timer, 0)
|
||||
}
|
|
@ -1,311 +0,0 @@
|
|||
package deliverer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/go-test/deep"
|
||||
"golang.org/x/time/rate"
|
||||
"net/url"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
id1 = "id1"
|
||||
id2 = "id2"
|
||||
sending = "sending"
|
||||
cancel = "cancel"
|
||||
successful = "successful"
|
||||
retrying = "retrying"
|
||||
undeliverable = "undeliverable"
|
||||
noState = "noState"
|
||||
)
|
||||
|
||||
var (
|
||||
testBytes []byte = []byte{0, 1, 2, 3}
|
||||
testURL *url.URL
|
||||
)
|
||||
|
||||
func init() {
|
||||
var err error
|
||||
testURL, err = url.Parse("example.com")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
var _ DeliveryPersister = &mockDeliveryPersister{}
|
||||
|
||||
type mockDeliveryPersister struct {
|
||||
t *testing.T
|
||||
i int
|
||||
mu *sync.Mutex
|
||||
id1State string
|
||||
id2State string
|
||||
}
|
||||
|
||||
func newMockDeliveryPersister(t *testing.T) *mockDeliveryPersister {
|
||||
return &mockDeliveryPersister{
|
||||
t: t,
|
||||
mu: &sync.Mutex{},
|
||||
id1State: noState,
|
||||
id2State: noState,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockDeliveryPersister) Sending(b []byte, to *url.URL) string {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if m.i == 0 {
|
||||
m.i++
|
||||
return id1
|
||||
} else if m.i == 1 {
|
||||
m.i++
|
||||
return id2
|
||||
} else {
|
||||
m.t.Fatal("too many calls to Sending")
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *mockDeliveryPersister) Cancel(id string) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if id == id1 {
|
||||
m.id1State = cancel
|
||||
} else if id == id2 {
|
||||
m.id2State = cancel
|
||||
} else {
|
||||
m.t.Fatalf("unknown Cancel id: %s", id)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockDeliveryPersister) Successful(id string) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if id == id1 {
|
||||
m.id1State = successful
|
||||
} else if id == id2 {
|
||||
m.id2State = successful
|
||||
} else {
|
||||
m.t.Fatalf("unknown Successful id: %s", id)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockDeliveryPersister) Retrying(id string) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if id == id1 {
|
||||
m.id1State = retrying
|
||||
} else if id == id2 {
|
||||
m.id2State = retrying
|
||||
} else {
|
||||
m.t.Fatalf("unknown Retrying id: %s", id)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockDeliveryPersister) Undeliverable(id string) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if id == id1 {
|
||||
m.id1State = undeliverable
|
||||
} else if id == id2 {
|
||||
m.id2State = undeliverable
|
||||
} else {
|
||||
m.t.Fatalf("unknown Retrying id: %s", id)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelivererPoolSuccessNoPersister(t *testing.T) {
|
||||
testSendFn := func(b []byte, u *url.URL) error {
|
||||
if diff := deep.Equal(b, testBytes); diff != nil {
|
||||
t.Fatal(diff)
|
||||
} else if u != testURL {
|
||||
t.Fatal("wrong testURL")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
pool := NewDelivererPool(DeliveryOptions{
|
||||
InitialRetryTime: time.Microsecond,
|
||||
MaximumRetryTime: time.Microsecond,
|
||||
BackoffFactor: 2,
|
||||
MaxRetries: 1,
|
||||
RateLimit: rate.NewLimiter(1, 1),
|
||||
})
|
||||
pool.Do(testBytes, testURL, testSendFn)
|
||||
time.Sleep(time.Microsecond * 500)
|
||||
}
|
||||
|
||||
func TestDelivererPoolSuccessPersister(t *testing.T) {
|
||||
testSendFn := func(b []byte, u *url.URL) error {
|
||||
if diff := deep.Equal(b, testBytes); diff != nil {
|
||||
t.Fatal(diff)
|
||||
} else if u != testURL {
|
||||
t.Fatal("wrong testURL")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
p := newMockDeliveryPersister(t)
|
||||
pool := NewDelivererPool(DeliveryOptions{
|
||||
InitialRetryTime: time.Microsecond,
|
||||
MaximumRetryTime: time.Microsecond,
|
||||
BackoffFactor: 2,
|
||||
MaxRetries: 1,
|
||||
RateLimit: rate.NewLimiter(1, 1),
|
||||
Persister: p,
|
||||
})
|
||||
pool.Do(testBytes, testURL, testSendFn)
|
||||
time.Sleep(time.Microsecond * 500)
|
||||
if p.id1State != successful {
|
||||
t.Fatalf("want: %s, got %s", successful, p.id1State)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRestartSuccess(t *testing.T) {
|
||||
testSendFn := func(b []byte, u *url.URL) error {
|
||||
if diff := deep.Equal(b, testBytes); diff != nil {
|
||||
t.Fatal(diff)
|
||||
} else if u != testURL {
|
||||
t.Fatal("wrong testURL")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
p := newMockDeliveryPersister(t)
|
||||
pool := NewDelivererPool(DeliveryOptions{
|
||||
InitialRetryTime: time.Microsecond,
|
||||
MaximumRetryTime: time.Microsecond,
|
||||
BackoffFactor: 2,
|
||||
MaxRetries: 1,
|
||||
RateLimit: rate.NewLimiter(1, 1),
|
||||
Persister: p,
|
||||
})
|
||||
pool.Restart(testBytes, testURL, id2, testSendFn)
|
||||
time.Sleep(time.Microsecond * 500)
|
||||
if p.id2State != successful {
|
||||
t.Fatalf("want: %s, got %s", successful, p.id1State)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelivererPoolRetrying(t *testing.T) {
|
||||
testSendFn := func(b []byte, u *url.URL) error {
|
||||
if diff := deep.Equal(b, testBytes); diff != nil {
|
||||
t.Fatal(diff)
|
||||
} else if u != testURL {
|
||||
t.Fatal("wrong testURL")
|
||||
}
|
||||
return fmt.Errorf("expected")
|
||||
}
|
||||
p := newMockDeliveryPersister(t)
|
||||
pool := NewDelivererPool(DeliveryOptions{
|
||||
InitialRetryTime: time.Microsecond,
|
||||
MaximumRetryTime: time.Microsecond,
|
||||
BackoffFactor: 2,
|
||||
MaxRetries: 1,
|
||||
RateLimit: rate.NewLimiter(1000000, 10000000),
|
||||
Persister: p,
|
||||
})
|
||||
pool.Do(testBytes, testURL, testSendFn)
|
||||
time.Sleep(time.Microsecond * 500)
|
||||
select {
|
||||
case <-pool.Errors():
|
||||
default:
|
||||
t.Fatal("expected error")
|
||||
}
|
||||
time.Sleep(time.Microsecond * 500)
|
||||
if p.id1State != retrying {
|
||||
t.Fatalf("want: %s, got %s", retrying, p.id1State)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelivererPoolUndeliverable(t *testing.T) {
|
||||
testSendFn := func(b []byte, u *url.URL) error {
|
||||
if diff := deep.Equal(b, testBytes); diff != nil {
|
||||
t.Fatal(diff)
|
||||
} else if u != testURL {
|
||||
t.Fatal("wrong testURL")
|
||||
}
|
||||
return fmt.Errorf("expected")
|
||||
}
|
||||
p := newMockDeliveryPersister(t)
|
||||
pool := NewDelivererPool(DeliveryOptions{
|
||||
InitialRetryTime: time.Microsecond,
|
||||
MaximumRetryTime: time.Microsecond,
|
||||
BackoffFactor: 2,
|
||||
MaxRetries: 1,
|
||||
RateLimit: rate.NewLimiter(1000000, 10000000),
|
||||
Persister: p,
|
||||
})
|
||||
pool.Do(testBytes, testURL, testSendFn)
|
||||
time.Sleep(time.Microsecond * 500)
|
||||
<-pool.Errors()
|
||||
time.Sleep(time.Microsecond * 500)
|
||||
<-pool.Errors()
|
||||
time.Sleep(time.Microsecond * 500)
|
||||
<-pool.Errors()
|
||||
time.Sleep(time.Microsecond * 500)
|
||||
if p.id1State != undeliverable {
|
||||
t.Fatalf("want: %s, got %s", undeliverable, p.id1State)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRestartRetrying(t *testing.T) {
|
||||
testSendFn := func(b []byte, u *url.URL) error {
|
||||
if diff := deep.Equal(b, testBytes); diff != nil {
|
||||
t.Fatal(diff)
|
||||
} else if u != testURL {
|
||||
t.Fatal("wrong testURL")
|
||||
}
|
||||
return fmt.Errorf("expected")
|
||||
}
|
||||
p := newMockDeliveryPersister(t)
|
||||
pool := NewDelivererPool(DeliveryOptions{
|
||||
InitialRetryTime: time.Microsecond,
|
||||
MaximumRetryTime: time.Microsecond,
|
||||
BackoffFactor: 2,
|
||||
MaxRetries: 1,
|
||||
RateLimit: rate.NewLimiter(1000000, 10000000),
|
||||
Persister: p,
|
||||
})
|
||||
pool.Restart(testBytes, testURL, id2, testSendFn)
|
||||
time.Sleep(time.Microsecond * 500)
|
||||
select {
|
||||
case <-pool.Errors():
|
||||
default:
|
||||
t.Fatal("expected error")
|
||||
}
|
||||
time.Sleep(time.Microsecond * 500)
|
||||
if p.id2State != retrying {
|
||||
t.Fatalf("want: %s, got %s", retrying, p.id2State)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRestartUndeliverable(t *testing.T) {
|
||||
testSendFn := func(b []byte, u *url.URL) error {
|
||||
if diff := deep.Equal(b, testBytes); diff != nil {
|
||||
t.Fatal(diff)
|
||||
} else if u != testURL {
|
||||
t.Fatal("wrong testURL")
|
||||
}
|
||||
return fmt.Errorf("expected")
|
||||
}
|
||||
p := newMockDeliveryPersister(t)
|
||||
pool := NewDelivererPool(DeliveryOptions{
|
||||
InitialRetryTime: time.Microsecond,
|
||||
MaximumRetryTime: time.Microsecond,
|
||||
BackoffFactor: 2,
|
||||
MaxRetries: 1,
|
||||
RateLimit: rate.NewLimiter(1000000, 10000000),
|
||||
Persister: p,
|
||||
})
|
||||
pool.Restart(testBytes, testURL, id2, testSendFn)
|
||||
time.Sleep(time.Microsecond * 500)
|
||||
<-pool.Errors()
|
||||
time.Sleep(time.Microsecond * 500)
|
||||
<-pool.Errors()
|
||||
time.Sleep(time.Microsecond * 500)
|
||||
<-pool.Errors()
|
||||
time.Sleep(time.Microsecond * 500)
|
||||
if p.id2State != undeliverable {
|
||||
t.Fatalf("want: %s, got %s", undeliverable, p.id2State)
|
||||
}
|
||||
}
|
読み込み中…
新しいイシューから参照