diff --git a/pub/activity.go b/pub/activity.go index e96d6cc..c1de39f 100644 --- a/pub/activity.go +++ b/pub/activity.go @@ -40,4 +40,6 @@ type Activity interface { SetActivityStreamsObject(i vocab.ActivityStreamsObjectProperty) // SetActivityStreamsTo sets the "to" property. SetActivityStreamsTo(i vocab.ActivityStreamsToProperty) + // SetActivityStreamsAttributedTo sets the "attributedTo" property. + SetActivityStreamsAttributedTo(i vocab.ActivityStreamsAttributedToProperty) } diff --git a/pub/common_behavior.go b/pub/common_behavior.go index 504b514..5d602ee 100644 --- a/pub/common_behavior.go +++ b/pub/common_behavior.go @@ -2,6 +2,7 @@ package pub import ( "context" + "github.com/go-fed/activity/streams/vocab" "net/http" "net/url" ) @@ -52,6 +53,15 @@ type CommonBehavior interface { // authenticated must be true and error nil. The request will continue // to be processed. AuthenticateGetOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (authenticated bool, err error) + // GetOutbox returns the OrderedCollection inbox of the actor for this + // context. It is up to the implementation to provide the correct + // collection for the kind of authorization given in the request. + // + // AuthenticateGetOutbox will be called prior to this. + // + // Always called, regardless whether the Federated Protocol or Social + // API is enabled. + GetOutbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error) // NewTransport returns a new Transport on behalf of a specific actor. // // The actorBoxIRI will be either the inbox or outbox of an actor who is diff --git a/pub/database.go b/pub/database.go index a6dff4f..10b8484 100644 --- a/pub/database.go +++ b/pub/database.go @@ -53,6 +53,11 @@ type Database interface { // // The library makes this call only after acquiring a lock first. ActorForInbox(c context.Context, inboxIRI *url.URL) (actorIRI *url.URL, err error) + // OutboxForInbox fetches the corresponding actor's outbox IRI for the + // actor's inbox IRI. + // + // The library makes this call only after acquiring a lock first. + OutboxForInbox(c context.Context, inboxIRI *url.URL) (outboxIRI *url.URL, err error) // Exists returns true if the database has an entry for the specified // id. It may not be owned by this application instance. // diff --git a/pub/federating_wrapped_callbacks.go b/pub/federating_wrapped_callbacks.go index 294108d..01cb9c0 100644 --- a/pub/federating_wrapped_callbacks.go +++ b/pub/federating_wrapped_callbacks.go @@ -131,6 +131,8 @@ type FederatingWrappedCallbacks struct { db Database // inboxIRI is the inboxIRI that is handling this callback. inboxIRI *url.URL + // deliver delivers an outgoing message. + deliver func(c context.Context, outboxIRI *url.URL, activity Activity) error // newTransport creates a new Transport. newTransport func(c context.Context, actorBoxIRI *url.URL, gofedAgent string) (t Transport, err error) } @@ -422,6 +424,10 @@ func (w FederatingWrappedCallbacks) follow(c context.Context, a vocab.ActivitySt to.AppendIRI(id) recipients = append(recipients, id) } + // Set the 'attributedTo' property on the activity. + attrTo := streams.NewActivityStreamsAttributedToProperty() + attrTo.AppendIRI(actorIRI) + response.SetActivityStreamsAttributedTo(attrTo) if w.OnFollow == OnFollowAutomaticallyAccept { // If automatically accepting, then also update our // followers collection with the new actors. @@ -448,19 +454,16 @@ func (w FederatingWrappedCallbacks) follow(c context.Context, a vocab.ActivitySt w.db.Unlock(c, actorIRI) // Unlock must be called by now and every branch above. } - m, err := serialize(response) + // Lock without defer! + w.db.Lock(c, w.inboxIRI) + outboxIRI, err := w.db.OutboxForInbox(c, w.inboxIRI) if err != nil { + w.db.Unlock(c, w.inboxIRI) return err } - b, err := json.Marshal(m) - if err != nil { - return err - } - t, err := w.newTransport(c, w.inboxIRI, goFedUserAgent()) - if err != nil { - return err - } - if err := t.BatchDeliver(c, b, recipients); err != nil { + w.db.Unlock(c, w.inboxIRI) + // Everything must be unlocked by now. + if err := w.deliver(c, outboxIRI, response); err != nil { return err } } diff --git a/pub/mock_common_behavior_test.go b/pub/mock_common_behavior_test.go index 8c0e993..c07ae41 100644 --- a/pub/mock_common_behavior_test.go +++ b/pub/mock_common_behavior_test.go @@ -6,6 +6,7 @@ package pub import ( context "context" + vocab "github.com/go-fed/activity/streams/vocab" gomock "github.com/golang/mock/gomock" http "net/http" url "net/url" @@ -65,6 +66,21 @@ func (mr *MockCommonBehaviorMockRecorder) AuthenticateGetOutbox(c, w, r interfac return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AuthenticateGetOutbox", reflect.TypeOf((*MockCommonBehavior)(nil).AuthenticateGetOutbox), c, w, r) } +// GetOutbox mocks base method +func (m *MockCommonBehavior) GetOutbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetOutbox", c, r) + ret0, _ := ret[0].(vocab.ActivityStreamsOrderedCollectionPage) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetOutbox indicates an expected call of GetOutbox +func (mr *MockCommonBehaviorMockRecorder) GetOutbox(c, r interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOutbox", reflect.TypeOf((*MockCommonBehavior)(nil).GetOutbox), c, r) +} + // NewTransport mocks base method func (m *MockCommonBehavior) NewTransport(c context.Context, actorBoxIRI *url.URL, gofedAgent string) (Transport, error) { m.ctrl.T.Helper() diff --git a/pub/mock_database_test.go b/pub/mock_database_test.go index 985609d..35162e3 100644 --- a/pub/mock_database_test.go +++ b/pub/mock_database_test.go @@ -152,6 +152,21 @@ func (mr *MockDatabaseMockRecorder) ActorForInbox(c, inboxIRI interface{}) *gomo return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ActorForInbox", reflect.TypeOf((*MockDatabase)(nil).ActorForInbox), c, inboxIRI) } +// OutboxForInbox mocks base method +func (m *MockDatabase) OutboxForInbox(c context.Context, inboxIRI *url.URL) (*url.URL, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "OutboxForInbox", c, inboxIRI) + ret0, _ := ret[0].(*url.URL) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// OutboxForInbox indicates an expected call of OutboxForInbox +func (mr *MockDatabaseMockRecorder) OutboxForInbox(c, inboxIRI interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OutboxForInbox", reflect.TypeOf((*MockDatabase)(nil).OutboxForInbox), c, inboxIRI) +} + // Exists mocks base method func (m *MockDatabase) Exists(c context.Context, id *url.URL) (bool, error) { m.ctrl.T.Helper() diff --git a/pub/mock_social_protocol_test.go b/pub/mock_social_protocol_test.go index fd7ab7f..7920773 100644 --- a/pub/mock_social_protocol_test.go +++ b/pub/mock_social_protocol_test.go @@ -6,7 +6,6 @@ package pub import ( context "context" - vocab "github.com/go-fed/activity/streams/vocab" gomock "github.com/golang/mock/gomock" http "net/http" reflect "reflect" @@ -78,18 +77,3 @@ func (mr *MockSocialProtocolMockRecorder) DefaultCallback(c, activity interface{ mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DefaultCallback", reflect.TypeOf((*MockSocialProtocol)(nil).DefaultCallback), c, activity) } - -// GetOutbox mocks base method -func (m *MockSocialProtocol) GetOutbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetOutbox", c, r) - ret0, _ := ret[0].(vocab.ActivityStreamsOrderedCollectionPage) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetOutbox indicates an expected call of GetOutbox -func (mr *MockSocialProtocolMockRecorder) GetOutbox(c, r interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOutbox", reflect.TypeOf((*MockSocialProtocol)(nil).GetOutbox), c, r) -} diff --git a/pub/pub_test.go b/pub/pub_test.go index 3e4b945..a4ff700 100644 --- a/pub/pub_test.go +++ b/pub/pub_test.go @@ -229,7 +229,7 @@ func setupData() { oi.AppendIRI(mustParse(testNoteId1)) oi.AppendIRI(mustParse(testNoteId2)) testOrderedCollectionUniqueElems.SetActivityStreamsOrderedItems(oi) - testOrderedCollectionUniqueElemsString = `{"@context":"https://www.w3.org/TR/activitystreams-vocabulary","orderedItems":["https://example.com/note/1","https://example.com/note/2"],"type":"OrderedCollectionPage"}` + testOrderedCollectionUniqueElemsString = `{"@context":"https://www.w3.org/ns/activitystreams","orderedItems":["https://example.com/note/1","https://example.com/note/2"],"type":"OrderedCollectionPage"}` }() // testOrderedCollectionDupedElems and // testOrderedCollectionDedupedElemsString @@ -239,7 +239,7 @@ func setupData() { oi.AppendIRI(mustParse(testNoteId1)) oi.AppendIRI(mustParse(testNoteId1)) testOrderedCollectionDupedElems.SetActivityStreamsOrderedItems(oi) - testOrderedCollectionDedupedElemsString = `{"@context":"https://www.w3.org/TR/activitystreams-vocabulary","orderedItems":"https://example.com/note/1","type":"OrderedCollectionPage"}` + testOrderedCollectionDedupedElemsString = `{"@context":"https://www.w3.org/ns/activitystreams","orderedItems":"https://example.com/note/1","type":"OrderedCollectionPage"}` }() // testEmptyOrderedCollection func() { diff --git a/pub/side_effect_actor.go b/pub/side_effect_actor.go index b1054c7..5a1d9cc 100644 --- a/pub/side_effect_actor.go +++ b/pub/side_effect_actor.go @@ -50,7 +50,7 @@ func (a *sideEffectActor) AuthenticateGetOutbox(c context.Context, w http.Respon // GetOutbox delegates to the SocialProtocol. func (a *sideEffectActor) GetOutbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error) { - return a.c2s.GetOutbox(c, r) + return a.common.GetOutbox(c, r) } // GetInbox delegates to the FederatingProtocol. @@ -105,6 +105,7 @@ func (a *sideEffectActor) PostInbox(c context.Context, inboxIRI *url.URL, activi wrapped.db = a.db wrapped.inboxIRI = inboxIRI wrapped.newTransport = a.common.NewTransport + wrapped.deliver = a.Deliver res, err := streams.NewTypeResolver(wrapped.callbacks(other)...) if err != nil { return err @@ -224,7 +225,7 @@ func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL, if err != nil { return err } - defer a.db.Unlock(c, iri) + // WARNING: Not Unlocked t, err := a.db.Get(c, iri) if err != nil { return err @@ -233,6 +234,7 @@ func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL, if im, ok := t.(orderedItemser); ok { oCol[iri.String()] = im colIRIs = append(colIRIs, iri) + defer a.db.Unlock(c, iri) } else { a.db.Unlock(c, iri) } @@ -240,6 +242,7 @@ func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL, if im, ok := t.(itemser); ok { col[iri.String()] = im colIRIs = append(colIRIs, iri) + defer a.db.Unlock(c, iri) } else { a.db.Unlock(c, iri) } @@ -275,22 +278,24 @@ func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL, recipients := make([]*url.URL, 0, len(toSend)) for _, iri := range toSend { if c, ok := col[iri.String()]; ok { - it := c.GetActivityStreamsItems() - for iter := it.Begin(); iter != it.End(); iter = iter.Next() { - id, err := ToId(iter) - if err != nil { - return err + if it := c.GetActivityStreamsItems(); it != nil { + for iter := it.Begin(); iter != it.End(); iter = iter.Next() { + id, err := ToId(iter) + if err != nil { + return err + } + recipients = append(recipients, id) } - recipients = append(recipients, id) } } else if oc, ok := oCol[iri.String()]; ok { - oit := oc.GetActivityStreamsOrderedItems() - for iter := oit.Begin(); iter != oit.End(); iter = iter.Next() { - id, err := ToId(iter) - if err != nil { - return err + if oit := oc.GetActivityStreamsOrderedItems(); oit != nil { + for iter := oit.Begin(); iter != oit.End(); iter = iter.Next() { + id, err := ToId(iter) + if err != nil { + return err + } + recipients = append(recipients, id) } - recipients = append(recipients, id) } } } @@ -351,19 +356,20 @@ func (a *sideEffectActor) AddNewIds(c context.Context, activity Activity) error if !ok { return fmt.Errorf("cannot add new id for Create: %T has no object property", activity) } - oProp := o.GetActivityStreamsObject() - for iter := oProp.Begin(); iter != oProp.End(); iter = iter.Next() { - t := iter.GetType() - if t == nil { - return fmt.Errorf("cannot add new id for object in Create: object is not embedded as a value literal") + if oProp := o.GetActivityStreamsObject(); oProp != nil { + for iter := oProp.Begin(); iter != oProp.End(); iter = iter.Next() { + t := iter.GetType() + if t == nil { + return fmt.Errorf("cannot add new id for object in Create: object is not embedded as a value literal") + } + id, err = a.db.NewId(c, t) + if err != nil { + return err + } + objId := streams.NewActivityStreamsIdProperty() + objId.Set(id) + t.SetActivityStreamsId(objId) } - id, err = a.db.NewId(c, t) - if err != nil { - return err - } - objId := streams.NewActivityStreamsIdProperty() - objId.Set(id) - t.SetActivityStreamsId(objId) } } return nil @@ -675,15 +681,6 @@ func (a *sideEffectActor) prepare(c context.Context, outboxIRI *url.URL, activit return } a.db.Unlock(c, outboxIRI) - // Make sure this matches the 'attributedTo' on the activity. - attrTo := activity.GetActivityStreamsAttributedTo() - if attrTo.Len() != 1 { - return nil, fmt.Errorf("federated c2s object does not have exactly one attributedTo value: %d", attrTo.Len()) - } else if attrToIRI, err := ToId(attrTo.At(0)); err != nil { - return nil, err - } else if attrToIRI.String() != actorIRI.String() { - return nil, fmt.Errorf("federated c2s object attributedTo value does not match this actor") - } // Get the inbox on the sender. err = a.db.Lock(c, actorIRI) if err != nil { @@ -724,6 +721,8 @@ func (a *sideEffectActor) resolveInboxes(c context.Context, t Transport, r []*ur for _, u := range r { var act vocab.Type var more []*url.URL + // TODO: Determine if more logic is needed here for inaccessible + // collections owned by peer servers. act, more, err = a.dereferenceForResolvingInboxes(c, t, u) if err != nil { return @@ -733,7 +732,9 @@ func (a *sideEffectActor) resolveInboxes(c context.Context, t Transport, r []*ur if err != nil { return } - actors = append(actors, act) + if act != nil { + actors = append(actors, act) + } actors = append(actors, recurActors...) } return @@ -741,6 +742,9 @@ func (a *sideEffectActor) resolveInboxes(c context.Context, t Transport, r []*ur // dereferenceForResolvingInboxes dereferences an IRI solely for finding an // actor's inbox IRI to deliver to. +// +// The returned actor could be nil, if it wasn't an actor (ex: a Collection or +// OrderedCollection). func (a *sideEffectActor) dereferenceForResolvingInboxes(c context.Context, t Transport, actorIRI *url.URL) (actor vocab.Type, moreActorIRIs []*url.URL, err error) { var resp []byte resp, err = t.Dereference(c, actorIRI) @@ -758,25 +762,29 @@ func (a *sideEffectActor) dereferenceForResolvingInboxes(c context.Context, t Tr // Attempt to see if the 'actor' is really some sort of type that has // an 'items' or 'orderedItems' property. if v, ok := actor.(itemser); ok { - i := v.GetActivityStreamsItems() - for iter := i.Begin(); iter != i.End(); iter = iter.Next() { - var id *url.URL - id, err = ToId(iter) - if err != nil { - return + if i := v.GetActivityStreamsItems(); i != nil { + for iter := i.Begin(); iter != i.End(); iter = iter.Next() { + var id *url.URL + id, err = ToId(iter) + if err != nil { + return + } + moreActorIRIs = append(moreActorIRIs, id) } - moreActorIRIs = append(moreActorIRIs, id) } + actor = nil } else if v, ok := actor.(orderedItemser); ok { - i := v.GetActivityStreamsOrderedItems() - for iter := i.Begin(); iter != i.End(); iter = iter.Next() { - var id *url.URL - id, err = ToId(iter) - if err != nil { - return + if i := v.GetActivityStreamsOrderedItems(); i != nil { + for iter := i.Begin(); iter != i.End(); iter = iter.Next() { + var id *url.URL + id, err = ToId(iter) + if err != nil { + return + } + moreActorIRIs = append(moreActorIRIs, id) } - moreActorIRIs = append(moreActorIRIs, id) } + actor = nil } return } diff --git a/pub/side_effect_actor_test.go b/pub/side_effect_actor_test.go index 1feba4b..94fa9ee 100644 --- a/pub/side_effect_actor_test.go +++ b/pub/side_effect_actor_test.go @@ -87,9 +87,9 @@ func TestPassThroughMethods(t *testing.T) { // Setup ctl := gomock.NewController(t) defer ctl.Finish() - _, _, sp, _, _, a := setupFn(ctl) + c, _, _, _, _, a := setupFn(ctl) req := toAPRequest(toGetOutboxRequest()) - sp.EXPECT().GetOutbox(ctx, req).Return(testOrderedCollectionUniqueElems, testErr) + c.EXPECT().GetOutbox(ctx, req).Return(testOrderedCollectionUniqueElems, testErr) // Run p, err := a.GetOutbox(ctx, req) // Verify @@ -469,9 +469,6 @@ func TestInboxForwarding(t *testing.T) { db.EXPECT().Lock(ctx, mustParse(testToIRI2)), db.EXPECT().Get(ctx, mustParse(testToIRI2)).Return(testService, nil), db.EXPECT().Unlock(ctx, mustParse(testToIRI2)), - // Deferred - db.EXPECT().Unlock(ctx, mustParse(testToIRI2)), - db.EXPECT().Unlock(ctx, mustParse(testToIRI)), ) // Run err := a.InboxForwarding(ctx, mustParse(testMyInboxIRI), input) @@ -501,9 +498,6 @@ func TestInboxForwarding(t *testing.T) { db.EXPECT().Lock(ctx, mustParse(testCcIRI2)), db.EXPECT().Get(ctx, mustParse(testCcIRI2)).Return(testService, nil), db.EXPECT().Unlock(ctx, mustParse(testCcIRI2)), - // Deferred - db.EXPECT().Unlock(ctx, mustParse(testCcIRI2)), - db.EXPECT().Unlock(ctx, mustParse(testCcIRI)), ) // Run err := a.InboxForwarding(ctx, mustParse(testMyInboxIRI), input) @@ -533,9 +527,6 @@ func TestInboxForwarding(t *testing.T) { db.EXPECT().Lock(ctx, mustParse(testAudienceIRI2)), db.EXPECT().Get(ctx, mustParse(testAudienceIRI2)).Return(testService, nil), db.EXPECT().Unlock(ctx, mustParse(testAudienceIRI2)), - // Deferred - db.EXPECT().Unlock(ctx, mustParse(testAudienceIRI2)), - db.EXPECT().Unlock(ctx, mustParse(testAudienceIRI)), ) // Run err := a.InboxForwarding(ctx, mustParse(testMyInboxIRI), input) diff --git a/pub/social_protocol.go b/pub/social_protocol.go index f929a21..d87ca2a 100644 --- a/pub/social_protocol.go +++ b/pub/social_protocol.go @@ -2,7 +2,6 @@ package pub import ( "context" - "github.com/go-fed/activity/streams/vocab" "net/http" ) @@ -62,13 +61,4 @@ type SocialProtocol interface { // type and extension, so the unhandled ones are passed to // DefaultCallback. DefaultCallback(c context.Context, activity Activity) error - // GetOutbox returns the OrderedCollection inbox of the actor for this - // context. It is up to the implementation to provide the correct - // collection for the kind of authorization given in the request. - // - // AuthenticateGetOutbox will be called prior to this. - // - // Always called, regardless whether the Federated Protocol or Social - // API is enabled. - GetOutbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error) } diff --git a/pub/transport.go b/pub/transport.go index 1353541..e58c68a 100644 --- a/pub/transport.go +++ b/pub/transport.go @@ -56,8 +56,8 @@ type HttpSigTransport struct { appAgent string gofedAgent string clock Clock - getSigner httpsig.Signer - postSigner httpsig.Signer + getSigner httpsig.Signer + postSigner httpsig.Signer pubKeyId string privKey crypto.PrivateKey } @@ -89,8 +89,8 @@ func NewHttpSigTransport( appAgent: appAgent, gofedAgent: goFedUserAgent(), clock: clock, - getSigner: getSigner, - postSigner: postSigner, + getSigner: getSigner, + postSigner: postSigner, pubKeyId: pubKeyId, privKey: privKey, } @@ -140,7 +140,7 @@ func (h HttpSigTransport) Deliver(c context.Context, b []byte, to *url.URL) erro sum := sha256.Sum256(b) req.Header.Add("Digest", fmt.Sprintf("SHA-256=%s", - base64.StdEncoding.EncodeToString(sum[:]))) + base64.StdEncoding.EncodeToString(sum[:]))) err = h.postSigner.SignRequest(h.privKey, h.pubKeyId, req) if err != nil { return err @@ -150,6 +150,7 @@ func (h HttpSigTransport) Deliver(c context.Context, b []byte, to *url.URL) erro return err } defer resp.Body.Close() + // TODO: Other 20X response codes could be OK, too. if resp.StatusCode != http.StatusOK { return fmt.Errorf("POST request to %s failed (%d): %s", to.String(), resp.StatusCode, resp.Status) } @@ -172,12 +173,13 @@ func (h HttpSigTransport) BatchDeliver(c context.Context, b []byte, recipients [ } wg.Wait() errs := make([]string, 0, len(recipients)) +outer: for { select { case e := <-errCh: errs = append(errs, e.Error()) default: - break + break outer } } if len(errs) > 0 { diff --git a/pub/util.go b/pub/util.go index b9a523b..e6c3f01 100644 --- a/pub/util.go +++ b/pub/util.go @@ -157,6 +157,7 @@ func serialize(a vocab.Type) (m map[string]interface{}, e error) { } contextValue = append(arr, aliases) } + // TODO: Update the context instead if it already exists m[jsonLDContext] = contextValue return }