diff --git a/deliverer/README.md b/deliverer/README.md deleted file mode 100644 index ec43b26..0000000 --- a/deliverer/README.md +++ /dev/null @@ -1,13 +0,0 @@ -# deliverer - -This library is completely optional, provided only for convenience. - -An extra utility that provides a simple mechanism to asynchronously deliver -federated messages from a `pub.Pubber` available from the `go-fed/activity/pub` -library. - -It implements the `pub.Deliverer` interface. - -The parent application may provide a way to persist delivery attempts in a way -that survives shutdown by implementing the new `DeliveryPersister ` interface. -The sky is the limit. diff --git a/deliverer/deliverer.go b/deliverer/deliverer.go deleted file mode 100644 index 54fccb2..0000000 --- a/deliverer/deliverer.go +++ /dev/null @@ -1,233 +0,0 @@ -package deliverer - -import ( - "context" - "fmt" - "github.com/go-fed/activity/pub" - "golang.org/x/time/rate" - "math" - "net/url" - "sync" - "time" -) - -// DeliveryPersister allows applications to keep track of delivery states of -// the messages being sent, including during retries. This permits clients to -// also resume delivery of messages that were in the process of being delivered -// when the application server was shut down. -type DeliveryPersister interface { - // Sending informs the delivery persister that the provided bytes are - // being delivered to the specified url. It must return a unique id for - // this delivery. - Sending(b []byte, to *url.URL) string - // Cancel informs the delivery persister that the provided delivery was - // interrupted by the server cancelling. These should be retried once - // the server is back online. - Cancel(id string) - // Successful informs the delivery persister that the request has been - // successfully delivered and no further retries are needed. - Successful(id string) - // Retrying indicates the specified delivery is being retried. - Retrying(id string) - // Undeliverable indicates the specified delivery has failed and is no - // longer being retried. - Undeliverable(id string) -} - -// DeliveryOptions provides options when delivering messages to federated -// servers. All are required unless explicitly stated otherwise. -type DeliveryOptions struct { - // Initial amount of time to wait before retrying delivery. - InitialRetryTime time.Duration - // The longest amount of time to wait before retrying delivery. - MaximumRetryTime time.Duration - // Rate of backing off retries. Must be at least 1. - BackoffFactor float64 - // Maximum number of retries to do when delivering a message. Must be at - // least 1. - MaxRetries int - // Global rate limiter across all deliveries, to prevent spamming - // outbound messages. - RateLimit *rate.Limiter - // Persister allows implementations to save messages that are enqueued - // for delivery between downtimes. It also permits metrics gathering and - // monitoring of outbound messages. - // - // This field is optional. - Persister DeliveryPersister -} - -var _ pub.Deliverer = &DelivererPool{} - -type DelivererPool struct { - // When present, permits clients to be notified of all state changes - // when delivering a request to another federated server. - // - // Optional. - persister DeliveryPersister - // Limit speed of retries. - initialRetryTime time.Duration - maxRetryTime time.Duration - retryTimeFactor float64 - // Limit total number of retries. - maxNumberRetries int - // Enforces speed limit of retries - limiter *rate.Limiter - // Allow graceful cancelling - ctx context.Context - cancel context.CancelFunc - timerId uint64 - timerMap map[uint64]*time.Timer - mu sync.Mutex // Limits concurrent access to timerId and timerMap - // Allow graceful error handling - errChan chan error -} - -func NewDelivererPool(d DeliveryOptions) *DelivererPool { - ctx, cancel := context.WithCancel(context.Background()) - return &DelivererPool{ - persister: d.Persister, - initialRetryTime: d.InitialRetryTime, - maxRetryTime: d.MaximumRetryTime, - retryTimeFactor: d.BackoffFactor, - maxNumberRetries: d.MaxRetries, - limiter: d.RateLimit, - ctx: ctx, - cancel: cancel, - timerId: 0, - timerMap: make(map[uint64]*time.Timer, 0), - mu: sync.Mutex{}, - errChan: make(chan error, 0), - } -} - -type retryData struct { - nextWait time.Duration - n int - f func() error - id string -} - -func (r retryData) NextRetry(factor float64, max time.Duration) retryData { - w := time.Duration(int64(math.Floor((float64(r.nextWait) * factor) + 0.5))) - if w > max { - w = max - } - return retryData{ - nextWait: w, - n: r.n + 1, - f: r.f, - id: r.id, - } -} - -func (r retryData) ShouldRetry(max int) bool { - return r.n < max -} - -// Do spawns a goroutine that retries f until it returns no error. Retry -// behavior is determined by the DeliveryOptions passed to the DelivererPool -// upon construction. -func (d *DelivererPool) Do(b []byte, to *url.URL, sendFn func([]byte, *url.URL) error) { - f := func() error { - return sendFn(b, to) - } - go func() { - id := "" - if d.persister != nil { - id = d.persister.Sending(b, to) - } - d.do(retryData{ - nextWait: d.initialRetryTime, - n: 0, - f: f, - id: id, - }) - }() -} - -// Restart resumes a previous attempt at delivering a payload to the specified -// URL. Retry behavior is determined by the DeliveryOptions passed to this -// DelivererPool upon construction, and is not governed by the previous -// DelivererPool that attempted to deliver the message. -func (d *DelivererPool) Restart(b []byte, to *url.URL, id string, sendFn func([]byte, *url.URL) error) { - f := func() error { - return sendFn(b, to) - } - go func() { - d.do(retryData{ - nextWait: d.initialRetryTime, - n: 0, - f: f, - id: id, - }) - }() -} - -// Stop turns down and stops any in-flight requests or retries. -func (d *DelivererPool) Stop() { - d.cancel() - d.closeTimers() -} - -// Provides a channel streaming any errors the pool encounters, including errors -// that it retries on. -func (d *DelivererPool) Errors() <-chan error { - return d.errChan -} - -func (d *DelivererPool) do(r retryData) { - if err := d.limiter.Wait(d.ctx); err != nil { - if d.persister != nil { - d.persister.Cancel(r.id) - } - d.errChan <- err - return - } - if err := r.f(); err != nil { - d.errChan <- err - if r.ShouldRetry(d.maxNumberRetries) { - if d.persister != nil { - d.persister.Retrying(r.id) - } - d.addClosableTimer(r) - } else { - d.errChan <- fmt.Errorf("delivery tried maximum number of times") - if d.persister != nil { - d.persister.Undeliverable(r.id) - } - } - return - } - if d.persister != nil { - d.persister.Successful(r.id) - } -} - -func (d *DelivererPool) addClosableTimer(r retryData) { - d.mu.Lock() - defer d.mu.Unlock() - id := d.timerId - d.timerId++ - d.timerMap[id] = time.AfterFunc(r.nextWait, func() { - d.do(r.NextRetry(d.retryTimeFactor, d.maxRetryTime)) - d.removeTimer(id) - }) -} - -func (d *DelivererPool) removeTimer(id uint64) { - d.mu.Lock() - defer d.mu.Unlock() - if _, ok := d.timerMap[id]; ok { - delete(d.timerMap, id) - } -} - -func (d *DelivererPool) closeTimers() { - d.mu.Lock() - defer d.mu.Unlock() - for _, v := range d.timerMap { - v.Stop() - } - d.timerMap = make(map[uint64]*time.Timer, 0) -} diff --git a/deliverer/deliverer_test.go b/deliverer/deliverer_test.go deleted file mode 100644 index 0227845..0000000 --- a/deliverer/deliverer_test.go +++ /dev/null @@ -1,311 +0,0 @@ -package deliverer - -import ( - "fmt" - "github.com/go-test/deep" - "golang.org/x/time/rate" - "net/url" - "sync" - "testing" - "time" -) - -const ( - id1 = "id1" - id2 = "id2" - sending = "sending" - cancel = "cancel" - successful = "successful" - retrying = "retrying" - undeliverable = "undeliverable" - noState = "noState" -) - -var ( - testBytes []byte = []byte{0, 1, 2, 3} - testURL *url.URL -) - -func init() { - var err error - testURL, err = url.Parse("example.com") - if err != nil { - panic(err) - } -} - -var _ DeliveryPersister = &mockDeliveryPersister{} - -type mockDeliveryPersister struct { - t *testing.T - i int - mu *sync.Mutex - id1State string - id2State string -} - -func newMockDeliveryPersister(t *testing.T) *mockDeliveryPersister { - return &mockDeliveryPersister{ - t: t, - mu: &sync.Mutex{}, - id1State: noState, - id2State: noState, - } -} - -func (m *mockDeliveryPersister) Sending(b []byte, to *url.URL) string { - m.mu.Lock() - defer m.mu.Unlock() - if m.i == 0 { - m.i++ - return id1 - } else if m.i == 1 { - m.i++ - return id2 - } else { - m.t.Fatal("too many calls to Sending") - } - return "" -} - -func (m *mockDeliveryPersister) Cancel(id string) { - m.mu.Lock() - defer m.mu.Unlock() - if id == id1 { - m.id1State = cancel - } else if id == id2 { - m.id2State = cancel - } else { - m.t.Fatalf("unknown Cancel id: %s", id) - } -} - -func (m *mockDeliveryPersister) Successful(id string) { - m.mu.Lock() - defer m.mu.Unlock() - if id == id1 { - m.id1State = successful - } else if id == id2 { - m.id2State = successful - } else { - m.t.Fatalf("unknown Successful id: %s", id) - } -} - -func (m *mockDeliveryPersister) Retrying(id string) { - m.mu.Lock() - defer m.mu.Unlock() - if id == id1 { - m.id1State = retrying - } else if id == id2 { - m.id2State = retrying - } else { - m.t.Fatalf("unknown Retrying id: %s", id) - } -} - -func (m *mockDeliveryPersister) Undeliverable(id string) { - m.mu.Lock() - defer m.mu.Unlock() - if id == id1 { - m.id1State = undeliverable - } else if id == id2 { - m.id2State = undeliverable - } else { - m.t.Fatalf("unknown Retrying id: %s", id) - } -} - -func TestDelivererPoolSuccessNoPersister(t *testing.T) { - testSendFn := func(b []byte, u *url.URL) error { - if diff := deep.Equal(b, testBytes); diff != nil { - t.Fatal(diff) - } else if u != testURL { - t.Fatal("wrong testURL") - } - return nil - } - pool := NewDelivererPool(DeliveryOptions{ - InitialRetryTime: time.Microsecond, - MaximumRetryTime: time.Microsecond, - BackoffFactor: 2, - MaxRetries: 1, - RateLimit: rate.NewLimiter(1, 1), - }) - pool.Do(testBytes, testURL, testSendFn) - time.Sleep(time.Microsecond * 500) -} - -func TestDelivererPoolSuccessPersister(t *testing.T) { - testSendFn := func(b []byte, u *url.URL) error { - if diff := deep.Equal(b, testBytes); diff != nil { - t.Fatal(diff) - } else if u != testURL { - t.Fatal("wrong testURL") - } - return nil - } - p := newMockDeliveryPersister(t) - pool := NewDelivererPool(DeliveryOptions{ - InitialRetryTime: time.Microsecond, - MaximumRetryTime: time.Microsecond, - BackoffFactor: 2, - MaxRetries: 1, - RateLimit: rate.NewLimiter(1, 1), - Persister: p, - }) - pool.Do(testBytes, testURL, testSendFn) - time.Sleep(time.Microsecond * 500) - if p.id1State != successful { - t.Fatalf("want: %s, got %s", successful, p.id1State) - } -} - -func TestRestartSuccess(t *testing.T) { - testSendFn := func(b []byte, u *url.URL) error { - if diff := deep.Equal(b, testBytes); diff != nil { - t.Fatal(diff) - } else if u != testURL { - t.Fatal("wrong testURL") - } - return nil - } - p := newMockDeliveryPersister(t) - pool := NewDelivererPool(DeliveryOptions{ - InitialRetryTime: time.Microsecond, - MaximumRetryTime: time.Microsecond, - BackoffFactor: 2, - MaxRetries: 1, - RateLimit: rate.NewLimiter(1, 1), - Persister: p, - }) - pool.Restart(testBytes, testURL, id2, testSendFn) - time.Sleep(time.Microsecond * 500) - if p.id2State != successful { - t.Fatalf("want: %s, got %s", successful, p.id1State) - } -} - -func TestDelivererPoolRetrying(t *testing.T) { - testSendFn := func(b []byte, u *url.URL) error { - if diff := deep.Equal(b, testBytes); diff != nil { - t.Fatal(diff) - } else if u != testURL { - t.Fatal("wrong testURL") - } - return fmt.Errorf("expected") - } - p := newMockDeliveryPersister(t) - pool := NewDelivererPool(DeliveryOptions{ - InitialRetryTime: time.Microsecond, - MaximumRetryTime: time.Microsecond, - BackoffFactor: 2, - MaxRetries: 1, - RateLimit: rate.NewLimiter(1000000, 10000000), - Persister: p, - }) - pool.Do(testBytes, testURL, testSendFn) - time.Sleep(time.Microsecond * 500) - select { - case <-pool.Errors(): - default: - t.Fatal("expected error") - } - time.Sleep(time.Microsecond * 500) - if p.id1State != retrying { - t.Fatalf("want: %s, got %s", retrying, p.id1State) - } -} - -func TestDelivererPoolUndeliverable(t *testing.T) { - testSendFn := func(b []byte, u *url.URL) error { - if diff := deep.Equal(b, testBytes); diff != nil { - t.Fatal(diff) - } else if u != testURL { - t.Fatal("wrong testURL") - } - return fmt.Errorf("expected") - } - p := newMockDeliveryPersister(t) - pool := NewDelivererPool(DeliveryOptions{ - InitialRetryTime: time.Microsecond, - MaximumRetryTime: time.Microsecond, - BackoffFactor: 2, - MaxRetries: 1, - RateLimit: rate.NewLimiter(1000000, 10000000), - Persister: p, - }) - pool.Do(testBytes, testURL, testSendFn) - time.Sleep(time.Microsecond * 500) - <-pool.Errors() - time.Sleep(time.Microsecond * 500) - <-pool.Errors() - time.Sleep(time.Microsecond * 500) - <-pool.Errors() - time.Sleep(time.Microsecond * 500) - if p.id1State != undeliverable { - t.Fatalf("want: %s, got %s", undeliverable, p.id1State) - } -} - -func TestRestartRetrying(t *testing.T) { - testSendFn := func(b []byte, u *url.URL) error { - if diff := deep.Equal(b, testBytes); diff != nil { - t.Fatal(diff) - } else if u != testURL { - t.Fatal("wrong testURL") - } - return fmt.Errorf("expected") - } - p := newMockDeliveryPersister(t) - pool := NewDelivererPool(DeliveryOptions{ - InitialRetryTime: time.Microsecond, - MaximumRetryTime: time.Microsecond, - BackoffFactor: 2, - MaxRetries: 1, - RateLimit: rate.NewLimiter(1000000, 10000000), - Persister: p, - }) - pool.Restart(testBytes, testURL, id2, testSendFn) - time.Sleep(time.Microsecond * 500) - select { - case <-pool.Errors(): - default: - t.Fatal("expected error") - } - time.Sleep(time.Microsecond * 500) - if p.id2State != retrying { - t.Fatalf("want: %s, got %s", retrying, p.id2State) - } -} - -func TestRestartUndeliverable(t *testing.T) { - testSendFn := func(b []byte, u *url.URL) error { - if diff := deep.Equal(b, testBytes); diff != nil { - t.Fatal(diff) - } else if u != testURL { - t.Fatal("wrong testURL") - } - return fmt.Errorf("expected") - } - p := newMockDeliveryPersister(t) - pool := NewDelivererPool(DeliveryOptions{ - InitialRetryTime: time.Microsecond, - MaximumRetryTime: time.Microsecond, - BackoffFactor: 2, - MaxRetries: 1, - RateLimit: rate.NewLimiter(1000000, 10000000), - Persister: p, - }) - pool.Restart(testBytes, testURL, id2, testSendFn) - time.Sleep(time.Microsecond * 500) - <-pool.Errors() - time.Sleep(time.Microsecond * 500) - <-pool.Errors() - time.Sleep(time.Microsecond * 500) - <-pool.Errors() - time.Sleep(time.Microsecond * 500) - if p.id2State != undeliverable { - t.Fatalf("want: %s, got %s", undeliverable, p.id2State) - } -}