From db08c01e8548688245a8004810413a711d307489 Mon Sep 17 00:00:00 2001 From: Cory Slep Date: Wed, 13 Jun 2018 22:52:19 +0200 Subject: [PATCH] Initial deliverer tests --- deliverer/deliverer.go | 12 +- deliverer/deliverer_test.go | 311 ++++++++++++++++++++++++++++++++++++ 2 files changed, 322 insertions(+), 1 deletion(-) create mode 100644 deliverer/deliverer_test.go diff --git a/deliverer/deliverer.go b/deliverer/deliverer.go index 8aacc55..54fccb2 100644 --- a/deliverer/deliverer.go +++ b/deliverer/deliverer.go @@ -125,7 +125,9 @@ func (r retryData) ShouldRetry(max int) bool { return r.n < max } -// Do spawns a goroutine that retries f until it returns no error. +// Do spawns a goroutine that retries f until it returns no error. Retry +// behavior is determined by the DeliveryOptions passed to the DelivererPool +// upon construction. func (d *DelivererPool) Do(b []byte, to *url.URL, sendFn func([]byte, *url.URL) error) { f := func() error { return sendFn(b, to) @@ -144,6 +146,10 @@ func (d *DelivererPool) Do(b []byte, to *url.URL, sendFn func([]byte, *url.URL) }() } +// Restart resumes a previous attempt at delivering a payload to the specified +// URL. Retry behavior is determined by the DeliveryOptions passed to this +// DelivererPool upon construction, and is not governed by the previous +// DelivererPool that attempted to deliver the message. func (d *DelivererPool) Restart(b []byte, to *url.URL, id string, sendFn func([]byte, *url.URL) error) { f := func() error { return sendFn(b, to) @@ -158,11 +164,14 @@ func (d *DelivererPool) Restart(b []byte, to *url.URL, id string, sendFn func([] }() } +// Stop turns down and stops any in-flight requests or retries. func (d *DelivererPool) Stop() { d.cancel() d.closeTimers() } +// Provides a channel streaming any errors the pool encounters, including errors +// that it retries on. func (d *DelivererPool) Errors() <-chan error { return d.errChan } @@ -188,6 +197,7 @@ func (d *DelivererPool) do(r retryData) { d.persister.Undeliverable(r.id) } } + return } if d.persister != nil { d.persister.Successful(r.id) diff --git a/deliverer/deliverer_test.go b/deliverer/deliverer_test.go new file mode 100644 index 0000000..0227845 --- /dev/null +++ b/deliverer/deliverer_test.go @@ -0,0 +1,311 @@ +package deliverer + +import ( + "fmt" + "github.com/go-test/deep" + "golang.org/x/time/rate" + "net/url" + "sync" + "testing" + "time" +) + +const ( + id1 = "id1" + id2 = "id2" + sending = "sending" + cancel = "cancel" + successful = "successful" + retrying = "retrying" + undeliverable = "undeliverable" + noState = "noState" +) + +var ( + testBytes []byte = []byte{0, 1, 2, 3} + testURL *url.URL +) + +func init() { + var err error + testURL, err = url.Parse("example.com") + if err != nil { + panic(err) + } +} + +var _ DeliveryPersister = &mockDeliveryPersister{} + +type mockDeliveryPersister struct { + t *testing.T + i int + mu *sync.Mutex + id1State string + id2State string +} + +func newMockDeliveryPersister(t *testing.T) *mockDeliveryPersister { + return &mockDeliveryPersister{ + t: t, + mu: &sync.Mutex{}, + id1State: noState, + id2State: noState, + } +} + +func (m *mockDeliveryPersister) Sending(b []byte, to *url.URL) string { + m.mu.Lock() + defer m.mu.Unlock() + if m.i == 0 { + m.i++ + return id1 + } else if m.i == 1 { + m.i++ + return id2 + } else { + m.t.Fatal("too many calls to Sending") + } + return "" +} + +func (m *mockDeliveryPersister) Cancel(id string) { + m.mu.Lock() + defer m.mu.Unlock() + if id == id1 { + m.id1State = cancel + } else if id == id2 { + m.id2State = cancel + } else { + m.t.Fatalf("unknown Cancel id: %s", id) + } +} + +func (m *mockDeliveryPersister) Successful(id string) { + m.mu.Lock() + defer m.mu.Unlock() + if id == id1 { + m.id1State = successful + } else if id == id2 { + m.id2State = successful + } else { + m.t.Fatalf("unknown Successful id: %s", id) + } +} + +func (m *mockDeliveryPersister) Retrying(id string) { + m.mu.Lock() + defer m.mu.Unlock() + if id == id1 { + m.id1State = retrying + } else if id == id2 { + m.id2State = retrying + } else { + m.t.Fatalf("unknown Retrying id: %s", id) + } +} + +func (m *mockDeliveryPersister) Undeliverable(id string) { + m.mu.Lock() + defer m.mu.Unlock() + if id == id1 { + m.id1State = undeliverable + } else if id == id2 { + m.id2State = undeliverable + } else { + m.t.Fatalf("unknown Retrying id: %s", id) + } +} + +func TestDelivererPoolSuccessNoPersister(t *testing.T) { + testSendFn := func(b []byte, u *url.URL) error { + if diff := deep.Equal(b, testBytes); diff != nil { + t.Fatal(diff) + } else if u != testURL { + t.Fatal("wrong testURL") + } + return nil + } + pool := NewDelivererPool(DeliveryOptions{ + InitialRetryTime: time.Microsecond, + MaximumRetryTime: time.Microsecond, + BackoffFactor: 2, + MaxRetries: 1, + RateLimit: rate.NewLimiter(1, 1), + }) + pool.Do(testBytes, testURL, testSendFn) + time.Sleep(time.Microsecond * 500) +} + +func TestDelivererPoolSuccessPersister(t *testing.T) { + testSendFn := func(b []byte, u *url.URL) error { + if diff := deep.Equal(b, testBytes); diff != nil { + t.Fatal(diff) + } else if u != testURL { + t.Fatal("wrong testURL") + } + return nil + } + p := newMockDeliveryPersister(t) + pool := NewDelivererPool(DeliveryOptions{ + InitialRetryTime: time.Microsecond, + MaximumRetryTime: time.Microsecond, + BackoffFactor: 2, + MaxRetries: 1, + RateLimit: rate.NewLimiter(1, 1), + Persister: p, + }) + pool.Do(testBytes, testURL, testSendFn) + time.Sleep(time.Microsecond * 500) + if p.id1State != successful { + t.Fatalf("want: %s, got %s", successful, p.id1State) + } +} + +func TestRestartSuccess(t *testing.T) { + testSendFn := func(b []byte, u *url.URL) error { + if diff := deep.Equal(b, testBytes); diff != nil { + t.Fatal(diff) + } else if u != testURL { + t.Fatal("wrong testURL") + } + return nil + } + p := newMockDeliveryPersister(t) + pool := NewDelivererPool(DeliveryOptions{ + InitialRetryTime: time.Microsecond, + MaximumRetryTime: time.Microsecond, + BackoffFactor: 2, + MaxRetries: 1, + RateLimit: rate.NewLimiter(1, 1), + Persister: p, + }) + pool.Restart(testBytes, testURL, id2, testSendFn) + time.Sleep(time.Microsecond * 500) + if p.id2State != successful { + t.Fatalf("want: %s, got %s", successful, p.id1State) + } +} + +func TestDelivererPoolRetrying(t *testing.T) { + testSendFn := func(b []byte, u *url.URL) error { + if diff := deep.Equal(b, testBytes); diff != nil { + t.Fatal(diff) + } else if u != testURL { + t.Fatal("wrong testURL") + } + return fmt.Errorf("expected") + } + p := newMockDeliveryPersister(t) + pool := NewDelivererPool(DeliveryOptions{ + InitialRetryTime: time.Microsecond, + MaximumRetryTime: time.Microsecond, + BackoffFactor: 2, + MaxRetries: 1, + RateLimit: rate.NewLimiter(1000000, 10000000), + Persister: p, + }) + pool.Do(testBytes, testURL, testSendFn) + time.Sleep(time.Microsecond * 500) + select { + case <-pool.Errors(): + default: + t.Fatal("expected error") + } + time.Sleep(time.Microsecond * 500) + if p.id1State != retrying { + t.Fatalf("want: %s, got %s", retrying, p.id1State) + } +} + +func TestDelivererPoolUndeliverable(t *testing.T) { + testSendFn := func(b []byte, u *url.URL) error { + if diff := deep.Equal(b, testBytes); diff != nil { + t.Fatal(diff) + } else if u != testURL { + t.Fatal("wrong testURL") + } + return fmt.Errorf("expected") + } + p := newMockDeliveryPersister(t) + pool := NewDelivererPool(DeliveryOptions{ + InitialRetryTime: time.Microsecond, + MaximumRetryTime: time.Microsecond, + BackoffFactor: 2, + MaxRetries: 1, + RateLimit: rate.NewLimiter(1000000, 10000000), + Persister: p, + }) + pool.Do(testBytes, testURL, testSendFn) + time.Sleep(time.Microsecond * 500) + <-pool.Errors() + time.Sleep(time.Microsecond * 500) + <-pool.Errors() + time.Sleep(time.Microsecond * 500) + <-pool.Errors() + time.Sleep(time.Microsecond * 500) + if p.id1State != undeliverable { + t.Fatalf("want: %s, got %s", undeliverable, p.id1State) + } +} + +func TestRestartRetrying(t *testing.T) { + testSendFn := func(b []byte, u *url.URL) error { + if diff := deep.Equal(b, testBytes); diff != nil { + t.Fatal(diff) + } else if u != testURL { + t.Fatal("wrong testURL") + } + return fmt.Errorf("expected") + } + p := newMockDeliveryPersister(t) + pool := NewDelivererPool(DeliveryOptions{ + InitialRetryTime: time.Microsecond, + MaximumRetryTime: time.Microsecond, + BackoffFactor: 2, + MaxRetries: 1, + RateLimit: rate.NewLimiter(1000000, 10000000), + Persister: p, + }) + pool.Restart(testBytes, testURL, id2, testSendFn) + time.Sleep(time.Microsecond * 500) + select { + case <-pool.Errors(): + default: + t.Fatal("expected error") + } + time.Sleep(time.Microsecond * 500) + if p.id2State != retrying { + t.Fatalf("want: %s, got %s", retrying, p.id2State) + } +} + +func TestRestartUndeliverable(t *testing.T) { + testSendFn := func(b []byte, u *url.URL) error { + if diff := deep.Equal(b, testBytes); diff != nil { + t.Fatal(diff) + } else if u != testURL { + t.Fatal("wrong testURL") + } + return fmt.Errorf("expected") + } + p := newMockDeliveryPersister(t) + pool := NewDelivererPool(DeliveryOptions{ + InitialRetryTime: time.Microsecond, + MaximumRetryTime: time.Microsecond, + BackoffFactor: 2, + MaxRetries: 1, + RateLimit: rate.NewLimiter(1000000, 10000000), + Persister: p, + }) + pool.Restart(testBytes, testURL, id2, testSendFn) + time.Sleep(time.Microsecond * 500) + <-pool.Errors() + time.Sleep(time.Microsecond * 500) + <-pool.Errors() + time.Sleep(time.Microsecond * 500) + <-pool.Errors() + time.Sleep(time.Microsecond * 500) + if p.id2State != undeliverable { + t.Fatalf("want: %s, got %s", undeliverable, p.id2State) + } +}