Fix numerous bugs. Identify more.

このコミットが含まれているのは:
Cory Slep 2019-05-03 21:44:18 +02:00
コミット 3bc578e7dd
13個のファイルの変更132行の追加105行の削除

ファイルの表示

@ -40,4 +40,6 @@ type Activity interface {
SetActivityStreamsObject(i vocab.ActivityStreamsObjectProperty)
// SetActivityStreamsTo sets the "to" property.
SetActivityStreamsTo(i vocab.ActivityStreamsToProperty)
// SetActivityStreamsAttributedTo sets the "attributedTo" property.
SetActivityStreamsAttributedTo(i vocab.ActivityStreamsAttributedToProperty)
}

ファイルの表示

@ -2,6 +2,7 @@ package pub
import (
"context"
"github.com/go-fed/activity/streams/vocab"
"net/http"
"net/url"
)
@ -52,6 +53,15 @@ type CommonBehavior interface {
// authenticated must be true and error nil. The request will continue
// to be processed.
AuthenticateGetOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (authenticated bool, err error)
// GetOutbox returns the OrderedCollection inbox of the actor for this
// context. It is up to the implementation to provide the correct
// collection for the kind of authorization given in the request.
//
// AuthenticateGetOutbox will be called prior to this.
//
// Always called, regardless whether the Federated Protocol or Social
// API is enabled.
GetOutbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error)
// NewTransport returns a new Transport on behalf of a specific actor.
//
// The actorBoxIRI will be either the inbox or outbox of an actor who is

ファイルの表示

@ -53,6 +53,11 @@ type Database interface {
//
// The library makes this call only after acquiring a lock first.
ActorForInbox(c context.Context, inboxIRI *url.URL) (actorIRI *url.URL, err error)
// OutboxForInbox fetches the corresponding actor's outbox IRI for the
// actor's inbox IRI.
//
// The library makes this call only after acquiring a lock first.
OutboxForInbox(c context.Context, inboxIRI *url.URL) (outboxIRI *url.URL, err error)
// Exists returns true if the database has an entry for the specified
// id. It may not be owned by this application instance.
//

ファイルの表示

@ -131,6 +131,8 @@ type FederatingWrappedCallbacks struct {
db Database
// inboxIRI is the inboxIRI that is handling this callback.
inboxIRI *url.URL
// deliver delivers an outgoing message.
deliver func(c context.Context, outboxIRI *url.URL, activity Activity) error
// newTransport creates a new Transport.
newTransport func(c context.Context, actorBoxIRI *url.URL, gofedAgent string) (t Transport, err error)
}
@ -422,6 +424,10 @@ func (w FederatingWrappedCallbacks) follow(c context.Context, a vocab.ActivitySt
to.AppendIRI(id)
recipients = append(recipients, id)
}
// Set the 'attributedTo' property on the activity.
attrTo := streams.NewActivityStreamsAttributedToProperty()
attrTo.AppendIRI(actorIRI)
response.SetActivityStreamsAttributedTo(attrTo)
if w.OnFollow == OnFollowAutomaticallyAccept {
// If automatically accepting, then also update our
// followers collection with the new actors.
@ -448,19 +454,16 @@ func (w FederatingWrappedCallbacks) follow(c context.Context, a vocab.ActivitySt
w.db.Unlock(c, actorIRI)
// Unlock must be called by now and every branch above.
}
m, err := serialize(response)
// Lock without defer!
w.db.Lock(c, w.inboxIRI)
outboxIRI, err := w.db.OutboxForInbox(c, w.inboxIRI)
if err != nil {
w.db.Unlock(c, w.inboxIRI)
return err
}
b, err := json.Marshal(m)
if err != nil {
return err
}
t, err := w.newTransport(c, w.inboxIRI, goFedUserAgent())
if err != nil {
return err
}
if err := t.BatchDeliver(c, b, recipients); err != nil {
w.db.Unlock(c, w.inboxIRI)
// Everything must be unlocked by now.
if err := w.deliver(c, outboxIRI, response); err != nil {
return err
}
}

ファイルの表示

@ -6,6 +6,7 @@ package pub
import (
context "context"
vocab "github.com/go-fed/activity/streams/vocab"
gomock "github.com/golang/mock/gomock"
http "net/http"
url "net/url"
@ -65,6 +66,21 @@ func (mr *MockCommonBehaviorMockRecorder) AuthenticateGetOutbox(c, w, r interfac
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AuthenticateGetOutbox", reflect.TypeOf((*MockCommonBehavior)(nil).AuthenticateGetOutbox), c, w, r)
}
// GetOutbox mocks base method
func (m *MockCommonBehavior) GetOutbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetOutbox", c, r)
ret0, _ := ret[0].(vocab.ActivityStreamsOrderedCollectionPage)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetOutbox indicates an expected call of GetOutbox
func (mr *MockCommonBehaviorMockRecorder) GetOutbox(c, r interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOutbox", reflect.TypeOf((*MockCommonBehavior)(nil).GetOutbox), c, r)
}
// NewTransport mocks base method
func (m *MockCommonBehavior) NewTransport(c context.Context, actorBoxIRI *url.URL, gofedAgent string) (Transport, error) {
m.ctrl.T.Helper()

ファイルの表示

@ -152,6 +152,21 @@ func (mr *MockDatabaseMockRecorder) ActorForInbox(c, inboxIRI interface{}) *gomo
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ActorForInbox", reflect.TypeOf((*MockDatabase)(nil).ActorForInbox), c, inboxIRI)
}
// OutboxForInbox mocks base method
func (m *MockDatabase) OutboxForInbox(c context.Context, inboxIRI *url.URL) (*url.URL, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "OutboxForInbox", c, inboxIRI)
ret0, _ := ret[0].(*url.URL)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// OutboxForInbox indicates an expected call of OutboxForInbox
func (mr *MockDatabaseMockRecorder) OutboxForInbox(c, inboxIRI interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OutboxForInbox", reflect.TypeOf((*MockDatabase)(nil).OutboxForInbox), c, inboxIRI)
}
// Exists mocks base method
func (m *MockDatabase) Exists(c context.Context, id *url.URL) (bool, error) {
m.ctrl.T.Helper()

ファイルの表示

@ -6,7 +6,6 @@ package pub
import (
context "context"
vocab "github.com/go-fed/activity/streams/vocab"
gomock "github.com/golang/mock/gomock"
http "net/http"
reflect "reflect"
@ -78,18 +77,3 @@ func (mr *MockSocialProtocolMockRecorder) DefaultCallback(c, activity interface{
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DefaultCallback", reflect.TypeOf((*MockSocialProtocol)(nil).DefaultCallback), c, activity)
}
// GetOutbox mocks base method
func (m *MockSocialProtocol) GetOutbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetOutbox", c, r)
ret0, _ := ret[0].(vocab.ActivityStreamsOrderedCollectionPage)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetOutbox indicates an expected call of GetOutbox
func (mr *MockSocialProtocolMockRecorder) GetOutbox(c, r interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOutbox", reflect.TypeOf((*MockSocialProtocol)(nil).GetOutbox), c, r)
}

ファイルの表示

@ -229,7 +229,7 @@ func setupData() {
oi.AppendIRI(mustParse(testNoteId1))
oi.AppendIRI(mustParse(testNoteId2))
testOrderedCollectionUniqueElems.SetActivityStreamsOrderedItems(oi)
testOrderedCollectionUniqueElemsString = `{"@context":"https://www.w3.org/TR/activitystreams-vocabulary","orderedItems":["https://example.com/note/1","https://example.com/note/2"],"type":"OrderedCollectionPage"}`
testOrderedCollectionUniqueElemsString = `{"@context":"https://www.w3.org/ns/activitystreams","orderedItems":["https://example.com/note/1","https://example.com/note/2"],"type":"OrderedCollectionPage"}`
}()
// testOrderedCollectionDupedElems and
// testOrderedCollectionDedupedElemsString
@ -239,7 +239,7 @@ func setupData() {
oi.AppendIRI(mustParse(testNoteId1))
oi.AppendIRI(mustParse(testNoteId1))
testOrderedCollectionDupedElems.SetActivityStreamsOrderedItems(oi)
testOrderedCollectionDedupedElemsString = `{"@context":"https://www.w3.org/TR/activitystreams-vocabulary","orderedItems":"https://example.com/note/1","type":"OrderedCollectionPage"}`
testOrderedCollectionDedupedElemsString = `{"@context":"https://www.w3.org/ns/activitystreams","orderedItems":"https://example.com/note/1","type":"OrderedCollectionPage"}`
}()
// testEmptyOrderedCollection
func() {

ファイルの表示

@ -50,7 +50,7 @@ func (a *sideEffectActor) AuthenticateGetOutbox(c context.Context, w http.Respon
// GetOutbox delegates to the SocialProtocol.
func (a *sideEffectActor) GetOutbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error) {
return a.c2s.GetOutbox(c, r)
return a.common.GetOutbox(c, r)
}
// GetInbox delegates to the FederatingProtocol.
@ -105,6 +105,7 @@ func (a *sideEffectActor) PostInbox(c context.Context, inboxIRI *url.URL, activi
wrapped.db = a.db
wrapped.inboxIRI = inboxIRI
wrapped.newTransport = a.common.NewTransport
wrapped.deliver = a.Deliver
res, err := streams.NewTypeResolver(wrapped.callbacks(other)...)
if err != nil {
return err
@ -224,7 +225,7 @@ func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL,
if err != nil {
return err
}
defer a.db.Unlock(c, iri)
// WARNING: Not Unlocked
t, err := a.db.Get(c, iri)
if err != nil {
return err
@ -233,6 +234,7 @@ func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL,
if im, ok := t.(orderedItemser); ok {
oCol[iri.String()] = im
colIRIs = append(colIRIs, iri)
defer a.db.Unlock(c, iri)
} else {
a.db.Unlock(c, iri)
}
@ -240,6 +242,7 @@ func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL,
if im, ok := t.(itemser); ok {
col[iri.String()] = im
colIRIs = append(colIRIs, iri)
defer a.db.Unlock(c, iri)
} else {
a.db.Unlock(c, iri)
}
@ -275,22 +278,24 @@ func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL,
recipients := make([]*url.URL, 0, len(toSend))
for _, iri := range toSend {
if c, ok := col[iri.String()]; ok {
it := c.GetActivityStreamsItems()
for iter := it.Begin(); iter != it.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
if it := c.GetActivityStreamsItems(); it != nil {
for iter := it.Begin(); iter != it.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
recipients = append(recipients, id)
}
recipients = append(recipients, id)
}
} else if oc, ok := oCol[iri.String()]; ok {
oit := oc.GetActivityStreamsOrderedItems()
for iter := oit.Begin(); iter != oit.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
if oit := oc.GetActivityStreamsOrderedItems(); oit != nil {
for iter := oit.Begin(); iter != oit.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
recipients = append(recipients, id)
}
recipients = append(recipients, id)
}
}
}
@ -351,19 +356,20 @@ func (a *sideEffectActor) AddNewIds(c context.Context, activity Activity) error
if !ok {
return fmt.Errorf("cannot add new id for Create: %T has no object property", activity)
}
oProp := o.GetActivityStreamsObject()
for iter := oProp.Begin(); iter != oProp.End(); iter = iter.Next() {
t := iter.GetType()
if t == nil {
return fmt.Errorf("cannot add new id for object in Create: object is not embedded as a value literal")
if oProp := o.GetActivityStreamsObject(); oProp != nil {
for iter := oProp.Begin(); iter != oProp.End(); iter = iter.Next() {
t := iter.GetType()
if t == nil {
return fmt.Errorf("cannot add new id for object in Create: object is not embedded as a value literal")
}
id, err = a.db.NewId(c, t)
if err != nil {
return err
}
objId := streams.NewActivityStreamsIdProperty()
objId.Set(id)
t.SetActivityStreamsId(objId)
}
id, err = a.db.NewId(c, t)
if err != nil {
return err
}
objId := streams.NewActivityStreamsIdProperty()
objId.Set(id)
t.SetActivityStreamsId(objId)
}
}
return nil
@ -675,15 +681,6 @@ func (a *sideEffectActor) prepare(c context.Context, outboxIRI *url.URL, activit
return
}
a.db.Unlock(c, outboxIRI)
// Make sure this matches the 'attributedTo' on the activity.
attrTo := activity.GetActivityStreamsAttributedTo()
if attrTo.Len() != 1 {
return nil, fmt.Errorf("federated c2s object does not have exactly one attributedTo value: %d", attrTo.Len())
} else if attrToIRI, err := ToId(attrTo.At(0)); err != nil {
return nil, err
} else if attrToIRI.String() != actorIRI.String() {
return nil, fmt.Errorf("federated c2s object attributedTo value does not match this actor")
}
// Get the inbox on the sender.
err = a.db.Lock(c, actorIRI)
if err != nil {
@ -724,6 +721,8 @@ func (a *sideEffectActor) resolveInboxes(c context.Context, t Transport, r []*ur
for _, u := range r {
var act vocab.Type
var more []*url.URL
// TODO: Determine if more logic is needed here for inaccessible
// collections owned by peer servers.
act, more, err = a.dereferenceForResolvingInboxes(c, t, u)
if err != nil {
return
@ -733,7 +732,9 @@ func (a *sideEffectActor) resolveInboxes(c context.Context, t Transport, r []*ur
if err != nil {
return
}
actors = append(actors, act)
if act != nil {
actors = append(actors, act)
}
actors = append(actors, recurActors...)
}
return
@ -741,6 +742,9 @@ func (a *sideEffectActor) resolveInboxes(c context.Context, t Transport, r []*ur
// dereferenceForResolvingInboxes dereferences an IRI solely for finding an
// actor's inbox IRI to deliver to.
//
// The returned actor could be nil, if it wasn't an actor (ex: a Collection or
// OrderedCollection).
func (a *sideEffectActor) dereferenceForResolvingInboxes(c context.Context, t Transport, actorIRI *url.URL) (actor vocab.Type, moreActorIRIs []*url.URL, err error) {
var resp []byte
resp, err = t.Dereference(c, actorIRI)
@ -758,25 +762,29 @@ func (a *sideEffectActor) dereferenceForResolvingInboxes(c context.Context, t Tr
// Attempt to see if the 'actor' is really some sort of type that has
// an 'items' or 'orderedItems' property.
if v, ok := actor.(itemser); ok {
i := v.GetActivityStreamsItems()
for iter := i.Begin(); iter != i.End(); iter = iter.Next() {
var id *url.URL
id, err = ToId(iter)
if err != nil {
return
if i := v.GetActivityStreamsItems(); i != nil {
for iter := i.Begin(); iter != i.End(); iter = iter.Next() {
var id *url.URL
id, err = ToId(iter)
if err != nil {
return
}
moreActorIRIs = append(moreActorIRIs, id)
}
moreActorIRIs = append(moreActorIRIs, id)
}
actor = nil
} else if v, ok := actor.(orderedItemser); ok {
i := v.GetActivityStreamsOrderedItems()
for iter := i.Begin(); iter != i.End(); iter = iter.Next() {
var id *url.URL
id, err = ToId(iter)
if err != nil {
return
if i := v.GetActivityStreamsOrderedItems(); i != nil {
for iter := i.Begin(); iter != i.End(); iter = iter.Next() {
var id *url.URL
id, err = ToId(iter)
if err != nil {
return
}
moreActorIRIs = append(moreActorIRIs, id)
}
moreActorIRIs = append(moreActorIRIs, id)
}
actor = nil
}
return
}

ファイルの表示

@ -87,9 +87,9 @@ func TestPassThroughMethods(t *testing.T) {
// Setup
ctl := gomock.NewController(t)
defer ctl.Finish()
_, _, sp, _, _, a := setupFn(ctl)
c, _, _, _, _, a := setupFn(ctl)
req := toAPRequest(toGetOutboxRequest())
sp.EXPECT().GetOutbox(ctx, req).Return(testOrderedCollectionUniqueElems, testErr)
c.EXPECT().GetOutbox(ctx, req).Return(testOrderedCollectionUniqueElems, testErr)
// Run
p, err := a.GetOutbox(ctx, req)
// Verify
@ -469,9 +469,6 @@ func TestInboxForwarding(t *testing.T) {
db.EXPECT().Lock(ctx, mustParse(testToIRI2)),
db.EXPECT().Get(ctx, mustParse(testToIRI2)).Return(testService, nil),
db.EXPECT().Unlock(ctx, mustParse(testToIRI2)),
// Deferred
db.EXPECT().Unlock(ctx, mustParse(testToIRI2)),
db.EXPECT().Unlock(ctx, mustParse(testToIRI)),
)
// Run
err := a.InboxForwarding(ctx, mustParse(testMyInboxIRI), input)
@ -501,9 +498,6 @@ func TestInboxForwarding(t *testing.T) {
db.EXPECT().Lock(ctx, mustParse(testCcIRI2)),
db.EXPECT().Get(ctx, mustParse(testCcIRI2)).Return(testService, nil),
db.EXPECT().Unlock(ctx, mustParse(testCcIRI2)),
// Deferred
db.EXPECT().Unlock(ctx, mustParse(testCcIRI2)),
db.EXPECT().Unlock(ctx, mustParse(testCcIRI)),
)
// Run
err := a.InboxForwarding(ctx, mustParse(testMyInboxIRI), input)
@ -533,9 +527,6 @@ func TestInboxForwarding(t *testing.T) {
db.EXPECT().Lock(ctx, mustParse(testAudienceIRI2)),
db.EXPECT().Get(ctx, mustParse(testAudienceIRI2)).Return(testService, nil),
db.EXPECT().Unlock(ctx, mustParse(testAudienceIRI2)),
// Deferred
db.EXPECT().Unlock(ctx, mustParse(testAudienceIRI2)),
db.EXPECT().Unlock(ctx, mustParse(testAudienceIRI)),
)
// Run
err := a.InboxForwarding(ctx, mustParse(testMyInboxIRI), input)

ファイルの表示

@ -2,7 +2,6 @@ package pub
import (
"context"
"github.com/go-fed/activity/streams/vocab"
"net/http"
)
@ -62,13 +61,4 @@ type SocialProtocol interface {
// type and extension, so the unhandled ones are passed to
// DefaultCallback.
DefaultCallback(c context.Context, activity Activity) error
// GetOutbox returns the OrderedCollection inbox of the actor for this
// context. It is up to the implementation to provide the correct
// collection for the kind of authorization given in the request.
//
// AuthenticateGetOutbox will be called prior to this.
//
// Always called, regardless whether the Federated Protocol or Social
// API is enabled.
GetOutbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error)
}

ファイルの表示

@ -56,8 +56,8 @@ type HttpSigTransport struct {
appAgent string
gofedAgent string
clock Clock
getSigner httpsig.Signer
postSigner httpsig.Signer
getSigner httpsig.Signer
postSigner httpsig.Signer
pubKeyId string
privKey crypto.PrivateKey
}
@ -89,8 +89,8 @@ func NewHttpSigTransport(
appAgent: appAgent,
gofedAgent: goFedUserAgent(),
clock: clock,
getSigner: getSigner,
postSigner: postSigner,
getSigner: getSigner,
postSigner: postSigner,
pubKeyId: pubKeyId,
privKey: privKey,
}
@ -140,7 +140,7 @@ func (h HttpSigTransport) Deliver(c context.Context, b []byte, to *url.URL) erro
sum := sha256.Sum256(b)
req.Header.Add("Digest",
fmt.Sprintf("SHA-256=%s",
base64.StdEncoding.EncodeToString(sum[:])))
base64.StdEncoding.EncodeToString(sum[:])))
err = h.postSigner.SignRequest(h.privKey, h.pubKeyId, req)
if err != nil {
return err
@ -150,6 +150,7 @@ func (h HttpSigTransport) Deliver(c context.Context, b []byte, to *url.URL) erro
return err
}
defer resp.Body.Close()
// TODO: Other 20X response codes could be OK, too.
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("POST request to %s failed (%d): %s", to.String(), resp.StatusCode, resp.Status)
}
@ -172,12 +173,13 @@ func (h HttpSigTransport) BatchDeliver(c context.Context, b []byte, recipients [
}
wg.Wait()
errs := make([]string, 0, len(recipients))
outer:
for {
select {
case e := <-errCh:
errs = append(errs, e.Error())
default:
break
break outer
}
}
if len(errs) > 0 {

ファイルの表示

@ -157,6 +157,7 @@ func serialize(a vocab.Type) (m map[string]interface{}, e error) {
}
contextValue = append(arr, aliases)
}
// TODO: Update the context instead if it already exists
m[jsonLDContext] = contextValue
return
}