From d3b0afef5e95a29b3d36019982b7868f2607e165 Mon Sep 17 00:00:00 2001 From: Cory Slep Date: Thu, 14 Feb 2019 21:51:57 +0100 Subject: [PATCH] Finish porting the core of pub. --- pub/activity.go | 3 + pub/base_actor.go | 5 +- pub/database.go | 7 + pub/delegate_actor.go | 8 +- pub/federating_wrapped_callbacks.go | 301 +++++------------- pub/property_interfaces.go | 5 + pub/side_effect_actor.go | 8 +- pub/social_wrapped_callbacks.go | 452 ++++++++++++++-------------- pub/transport.go | 1 + pub/util.go | 359 ++++++++++++++++------ 10 files changed, 589 insertions(+), 560 deletions(-) diff --git a/pub/activity.go b/pub/activity.go index a1fbecb..e96d6cc 100644 --- a/pub/activity.go +++ b/pub/activity.go @@ -31,6 +31,9 @@ type Activity interface { // GetActivityStreamsAttributedTo returns the "attributedTo" property if // it exists, and nil otherwise. GetActivityStreamsAttributedTo() vocab.ActivityStreamsAttributedToProperty + // GetActivityStreamsObject returns the "object" property if it exists, + // and nil otherwise. + GetActivityStreamsObject() vocab.ActivityStreamsObjectProperty // SetActivityStreamsActor sets the "actor" property. SetActivityStreamsActor(i vocab.ActivityStreamsActorProperty) // SetActivityStreamsObject sets the "object" property. diff --git a/pub/base_actor.go b/pub/base_actor.go index 8b939a9..63a7e9c 100644 --- a/pub/base_actor.go +++ b/pub/base_actor.go @@ -57,6 +57,7 @@ func NewSocialActor(c CommonBehavior, common: c, c2s: c2s, db: db, + clock: clock, }, enableSocialProtocol: true, clock: clock, @@ -84,6 +85,7 @@ func NewFederatingActor(c CommonBehavior, common: c, s2s: s2s, db: db, + clock: clock, }, enableFederatedProtocol: true, clock: clock, @@ -110,6 +112,7 @@ func NewActor(c CommonBehavior, c2s: c2s, s2s: s2s, db: db, + clock: clock, }, enableSocialProtocol: true, enableFederatedProtocol: true, @@ -325,7 +328,7 @@ func (b *baseActor) PostOutbox(c context.Context, w http.ResponseWriter, r *http } // Post the activity to the actor's outbox and trigger side effects for // that particular Activity type. - deliverable, err := b.delegate.PostOutbox(c, activity, r.URL) + deliverable, err := b.delegate.PostOutbox(c, activity, r.URL, m) if err != nil { // Special case: We know it is a bad request if the object or // target properties needed to be populated, but weren't. diff --git a/pub/database.go b/pub/database.go index 8242eee..7f46a6c 100644 --- a/pub/database.go +++ b/pub/database.go @@ -126,4 +126,11 @@ type Database interface { // // The library makes this call only after acquiring a lock first. Following(c context.Context, actorIRI *url.URL) (followers vocab.ActivityStreamsCollection, err error) + // Liked obtains the Liked Collection for an actor with the + // given id. + // + // If modified, the library will then call Update. + // + // The library makes this call only after acquiring a lock first. + Liked(c context.Context, actorIRI *url.URL) (followers vocab.ActivityStreamsCollection, err error) } diff --git a/pub/delegate_actor.go b/pub/delegate_actor.go index 90b185e..6b7f07c 100644 --- a/pub/delegate_actor.go +++ b/pub/delegate_actor.go @@ -120,7 +120,13 @@ type DelegateActor interface { // // If the error is ErrObjectRequired or ErrTargetRequired, then a Bad // Request status is sent in the response. - PostOutbox(c context.Context, a Activity, outboxIRI *url.URL) (deliverable bool, e error) + // + // Note that 'rawJSON' is an unfortunate consequence where an 'Update' + // Activity is the only one that explicitly cares about 'null' values in + // JSON. Since go-fed does not differentiate between 'null' values and + // values that are simply not present, the 'rawJSON' map is ONLY needed + // for this narrow and specific use case. + PostOutbox(c context.Context, a Activity, outboxIRI *url.URL, rawJSON map[string]interface{}) (deliverable bool, e error) // AddNewIds sets new URL ids on the activity. It also does so for all // 'object' properties if the Activity is a Create type. // diff --git a/pub/federating_wrapped_callbacks.go b/pub/federating_wrapped_callbacks.go index 0f3a39a..9d3b050 100644 --- a/pub/federating_wrapped_callbacks.go +++ b/pub/federating_wrapped_callbacks.go @@ -67,14 +67,16 @@ type FederatingWrappedCallbacks struct { // The wrapping function determines if this 'Accept' is in response to a // 'Follow'. If so, then the 'actor' is added to the original 'actor's // 'following' collection. + // + // Otherwise, no side effects are done by go-fed. Accept func(context.Context, vocab.ActivityStreamsAccept) error // Reject handles additional side effects for the Reject ActivityStreams // type, specific to the application using go-fed. // - // The wrapping function does nothing. However, if this 'Reject' is in - // response to a 'Follow' then the client MUST NOT go forward with - // adding the 'actor' to the original 'actor's 'following' collection by - // the client application. + // The wrapping function has no default side effects. However, if this + // 'Reject' is in response to a 'Follow' then the client MUST NOT go + // forward with adding the 'actor' to the original 'actor's 'following' + // collection by the client application. Reject func(context.Context, vocab.ActivityStreamsReject) error // Add handles additional side effects for the Add ActivityStreams // type, specific to the application using go-fed. @@ -109,14 +111,17 @@ type FederatingWrappedCallbacks struct { // is be the same as the 'actor' on all Activities being undone. // It enforces that the actors on the Undo must correspond to all of the // 'object' actors in some manner. + // + // It is expected that the application will implement the proper + // reversal of activities that are being undone. Undo func(context.Context, vocab.ActivityStreamsUndo) error // Block handles additional side effects for the Block ActivityStreams // type, specific to the application using go-fed. // - // The wrapping function does nothing. It simply calls this wrapped - // function. However, note that Blocks should not be received from a - // federated peer, as delivering Blocks explicitly deviates from the - // original ActivityPub specification. + // The wrapping function provides no default side effects. It simply + // calls the wrapped function. However, note that Blocks should not be + // received from a federated peer, as delivering Blocks explicitly + // deviates from the original ActivityPub specification. Block func(context.Context, vocab.ActivityStreamsBlock) error // Sidechannel data -- this is set at request handling time. These must @@ -195,11 +200,13 @@ func (w FederatingWrappedCallbacks) create(c context.Context, a vocab.ActivitySt if op == nil || op.Len() == 0 { return ErrObjectRequired } - for iter := op.Begin(); iter != op.End(); iter = iter.Next() { + // Create anonymous loop function to be able to properly scope the defer + // for the database lock at each iteration. + loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error { t := iter.GetType() if t == nil { // TODO: Dereference the IRI and store it. - continue + return nil } id, err := GetId(t) if err != nil { @@ -209,14 +216,16 @@ func (w FederatingWrappedCallbacks) create(c context.Context, a vocab.ActivitySt if err != nil { return err } - // WARNING: Unlock is not deferred. + defer w.db.Unlock(c, id) if err := w.db.Create(c, t); err != nil { - w.db.Unlock(c, id) return err } - w.db.Unlock(c, id) - // At this point, Unlock should be called and in every above - // branch. + return nil + } + for iter := op.Begin(); iter != op.End(); iter = iter.Next() { + if err := loopFn(iter); err != nil { + return err + } } if w.Create != nil { return w.Create(c, a) @@ -233,7 +242,9 @@ func (w FederatingWrappedCallbacks) update(c context.Context, a vocab.ActivitySt if err := mustHaveActivityOriginMatchObjects(a); err != nil { return err } - for iter := op.Begin(); iter != op.End(); iter = iter.Next() { + // Create anonymous loop function to be able to properly scope the defer + // for the database lock at each iteration. + loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error { t := iter.GetType() if t == nil { return fmt.Errorf("update requires an object to be wholly provided") @@ -246,14 +257,16 @@ func (w FederatingWrappedCallbacks) update(c context.Context, a vocab.ActivitySt if err != nil { return err } - // WARNING: Unlock is not deferred. + defer w.db.Unlock(c, id) if err := w.db.Update(c, t); err != nil { - w.db.Unlock(c, id) return err } - w.db.Unlock(c, id) - // At this point, Unlock should be called and in every above - // branch. + return nil + } + for iter := op.Begin(); iter != op.End(); iter = iter.Next() { + if err := loopFn(iter); err != nil { + return err + } } if w.Update != nil { return w.Update(c, a) @@ -270,7 +283,9 @@ func (w FederatingWrappedCallbacks) deleteFn(c context.Context, a vocab.Activity if err := mustHaveActivityOriginMatchObjects(a); err != nil { return err } - for iter := op.Begin(); iter != op.End(); iter = iter.Next() { + // Create anonymous loop function to be able to properly scope the defer + // for the database lock at each iteration. + loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error { id, err := ToId(iter) if err != nil { return err @@ -279,14 +294,16 @@ func (w FederatingWrappedCallbacks) deleteFn(c context.Context, a vocab.Activity if err != nil { return err } - // WARNING: Unlock is not deferred. + defer w.db.Unlock(c, id) if err := w.db.Delete(c, id); err != nil { - w.db.Unlock(c, id) return err } - w.db.Unlock(c, id) - // At this point, Unlock should be called and in every above - // branch. + return nil + } + for iter := op.Begin(); iter != op.End(); iter = iter.Next() { + if err := loopFn(iter); err != nil { + return err + } } if w.Delete != nil { return w.Delete(c, a) @@ -516,75 +533,8 @@ func (w FederatingWrappedCallbacks) add(c context.Context, a vocab.ActivityStrea if target == nil || target.Len() == 0 { return ErrTargetRequired } - opIds := make([]*url.URL, 0, op.Len()) - for iter := op.Begin(); iter != op.End(); iter = iter.Next() { - id, err := ToId(iter) - if err != nil { - return err - } - opIds = append(opIds, id) - } - targetIds := make([]*url.URL, 0, op.Len()) - for iter := target.Begin(); iter != target.End(); iter = iter.Next() { - id, err := ToId(iter) - if err != nil { - return err - } - targetIds = append(targetIds, id) - } - for _, t := range targetIds { - if err := w.db.Lock(c, t); err != nil { - return err - } - // WARNING: Unlock not deferred. - if owns, err := w.db.Owns(c, t); err != nil { - w.db.Unlock(c, t) - return err - } else if !owns { - w.db.Unlock(c, t) - continue - } - tp, err := w.db.Get(c, t) - if err != nil { - w.db.Unlock(c, t) - return err - } - if streams.ActivityStreamsOrderedCollectionIsExtendedBy(tp) { - oi, ok := tp.(orderedItemser) - if !ok { - w.db.Unlock(c, t) - return fmt.Errorf("type extending from OrderedCollection cannot convert to orderedItemser interface") - } - oiProp := oi.GetActivityStreamsOrderedItems() - if oiProp == nil { - oiProp = streams.NewActivityStreamsOrderedItemsProperty() - oi.SetActivityStreamsOrderedItems(oiProp) - } - for _, objId := range opIds { - oiProp.AppendIRI(objId) - } - } else if streams.ActivityStreamsCollectionIsExtendedBy(tp) { - i, ok := tp.(itemser) - if !ok { - w.db.Unlock(c, t) - return fmt.Errorf("type extending from Collection cannot convert to itemser interface") - } - iProp := i.GetActivityStreamsItems() - if iProp == nil { - iProp = streams.NewActivityStreamsItemsProperty() - i.SetActivityStreamsItems(iProp) - } - for _, objId := range opIds { - iProp.AppendIRI(objId) - } - } - err = w.db.Update(c, tp) - if err != nil { - w.db.Unlock(c, t) - return err - } - w.db.Unlock(c, t) - // Unlock must be called by now and every branch above. + if err := add(c, op, target, w.db); err != nil { + return err } if w.Add != nil { return w.Add(c, a) @@ -602,89 +552,8 @@ func (w FederatingWrappedCallbacks) remove(c context.Context, a vocab.ActivitySt if target == nil || target.Len() == 0 { return ErrTargetRequired } - opIds := make(map[string]bool, op.Len()) - for iter := op.Begin(); iter != op.End(); iter = iter.Next() { - id, err := ToId(iter) - if err != nil { - return err - } - opIds[id.String()] = true - } - targetIds := make([]*url.URL, 0, op.Len()) - for iter := target.Begin(); iter != target.End(); iter = iter.Next() { - id, err := ToId(iter) - if err != nil { - return err - } - targetIds = append(targetIds, id) - } - for _, t := range targetIds { - if err := w.db.Lock(c, t); err != nil { - return err - } - // WARNING: Unlock not deferred. - if owns, err := w.db.Owns(c, t); err != nil { - w.db.Unlock(c, t) - return err - } else if !owns { - w.db.Unlock(c, t) - continue - } - tp, err := w.db.Get(c, t) - if err != nil { - w.db.Unlock(c, t) - return err - } - if streams.ActivityStreamsOrderedCollectionIsExtendedBy(tp) { - oi, ok := tp.(orderedItemser) - if !ok { - w.db.Unlock(c, t) - return fmt.Errorf("type extending from OrderedCollection cannot convert to orderedItemser interface") - } - oiProp := oi.GetActivityStreamsOrderedItems() - if oiProp != nil { - for i := 0; i < oiProp.Len(); /*Conditional*/ { - id, err := ToId(oiProp.At(i)) - if err != nil { - w.db.Unlock(c, t) - return err - } - if opIds[id.String()] { - oiProp.Remove(i) - } else { - i++ - } - } - } - } else if streams.ActivityStreamsCollectionIsExtendedBy(tp) { - i, ok := tp.(itemser) - if !ok { - w.db.Unlock(c, t) - return fmt.Errorf("type extending from Collection cannot convert to itemser interface") - } - iProp := i.GetActivityStreamsItems() - if iProp != nil { - for i := 0; i < iProp.Len(); /*Conditional*/ { - id, err := ToId(iProp.At(i)) - if err != nil { - w.db.Unlock(c, t) - return err - } - if opIds[id.String()] { - iProp.Remove(i) - } else { - i++ - } - } - } - } - err = w.db.Update(c, tp) - if err != nil { - w.db.Unlock(c, t) - return err - } - w.db.Unlock(c, t) - // Unlock must be called by now and every branch above. + if err := remove(c, op, target, w.db); err != nil { + return err } if w.Remove != nil { return w.Remove(c, a) @@ -702,7 +571,9 @@ func (w FederatingWrappedCallbacks) like(c context.Context, a vocab.ActivityStre if err != nil { return err } - for iter := op.Begin(); iter != op.End(); iter = iter.Next() { + // Create anonymous loop function to be able to properly scope the defer + // for the database lock at each iteration. + loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error { objId, err := ToId(iter) if err != nil { return err @@ -710,22 +581,18 @@ func (w FederatingWrappedCallbacks) like(c context.Context, a vocab.ActivityStre if err := w.db.Lock(c, objId); err != nil { return err } - // WARNING: Unlock not deferred. + defer w.db.Unlock(c, objId) if owns, err := w.db.Owns(c, objId); err != nil { - w.db.Unlock(c, objId) return err } else if !owns { - w.db.Unlock(c, objId) - continue + return nil } t, err := w.db.Get(c, objId) if err != nil { - w.db.Unlock(c, objId) return err } l, ok := t.(likeser) if !ok { - w.db.Unlock(c, objId) return fmt.Errorf("cannot add Like to likes collection for type %T", t) } // Get 'likes' property on the object, creating default if @@ -759,16 +626,18 @@ func (w FederatingWrappedCallbacks) like(c context.Context, a vocab.ActivityStre } oItems.PrependIRI(id) } else { - w.db.Unlock(c, objId) return fmt.Errorf("likes type is neither a Collection nor an OrderedCollection: %T", likesT) } err = w.db.Update(c, t) if err != nil { - w.db.Unlock(c, objId) return err } - w.db.Unlock(c, objId) - // Unlock must be called by now and every branch above. + return nil + } + for iter := op.Begin(); iter != op.End(); iter = iter.Next() { + if err := loopFn(iter); err != nil { + return err + } } if w.Like != nil { return w.Like(c, a) @@ -783,7 +652,9 @@ func (w FederatingWrappedCallbacks) announce(c context.Context, a vocab.Activity return err } op := a.GetActivityStreamsObject() - for iter := op.Begin(); iter != op.End(); iter = iter.Next() { + // Create anonymous loop function to be able to properly scope the defer + // for the database lock at each iteration. + loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error { objId, err := ToId(iter) if err != nil { return err @@ -791,22 +662,18 @@ func (w FederatingWrappedCallbacks) announce(c context.Context, a vocab.Activity if err := w.db.Lock(c, objId); err != nil { return err } - // WARNING: Unlock not deferred. + defer w.db.Unlock(c, objId) if owns, err := w.db.Owns(c, objId); err != nil { - w.db.Unlock(c, objId) return err } else if !owns { - w.db.Unlock(c, objId) - continue + return nil } t, err := w.db.Get(c, objId) if err != nil { - w.db.Unlock(c, objId) return err } s, ok := t.(shareser) if !ok { - w.db.Unlock(c, objId) return fmt.Errorf("cannot add Announce to Shares collection for type %T", t) } // Get 'shares' property on the object, creating default if @@ -840,16 +707,18 @@ func (w FederatingWrappedCallbacks) announce(c context.Context, a vocab.Activity } oItems.PrependIRI(id) } else { - w.db.Unlock(c, objId) return fmt.Errorf("shares type is neither a Collection nor an OrderedCollection: %T", sharesT) } err = w.db.Update(c, t) if err != nil { - w.db.Unlock(c, objId) return err } - w.db.Unlock(c, objId) - // Unlock must be called by now and every branch above. + return nil + } + for iter := op.Begin(); iter != op.End(); iter = iter.Next() { + if err := loopFn(iter); err != nil { + return err + } } if w.Announce != nil { return w.Announce(c, a) @@ -864,34 +733,8 @@ func (w FederatingWrappedCallbacks) undo(c context.Context, a vocab.ActivityStre return ErrObjectRequired } actors := a.GetActivityStreamsActor() - activityActorMap := make(map[string]bool, actors.Len()) - for iter := actors.Begin(); iter != actors.End(); iter = iter.Next() { - id, err := ToId(iter) - if err != nil { - return err - } - activityActorMap[id.String()] = true - } - for iter := op.Begin(); iter != op.End(); iter = iter.Next() { - t := iter.GetType() - if t == nil { - // TODO: Fetch the IRI - continue - } - ac, ok := t.(actorer) - if !ok { - return fmt.Errorf("cannot undo an object with no 'actor' property") - } - objActors := ac.GetActivityStreamsActor() - for iter := objActors.Begin(); iter != objActors.End(); iter = iter.Next() { - id, err := ToId(iter) - if err != nil { - return err - } - if !activityActorMap[id.String()] { - return fmt.Errorf("activity Undoing another does not have all actors on original activities") - } - } + if err := mustHaveActivityActorsMatchObjectActors(actors, op); err != nil { + return err } if w.Undo != nil { return w.Undo(c, a) diff --git a/pub/property_interfaces.go b/pub/property_interfaces.go index 265d310..343bf9e 100644 --- a/pub/property_interfaces.go +++ b/pub/property_interfaces.go @@ -47,6 +47,11 @@ type publisheder interface { GetActivityStreamsPublished() vocab.ActivityStreamsPublishedProperty } +// updateder is an ActivityStreams type with a 'updateder' property +type updateder interface { + GetActivityStreamsUpdated() vocab.ActivityStreamsUpdatedProperty +} + // toer is an ActivityStreams type with a 'to' property type toer interface { GetActivityStreamsTo() vocab.ActivityStreamsToProperty diff --git a/pub/side_effect_actor.go b/pub/side_effect_actor.go index f85ce2a..b22523e 100644 --- a/pub/side_effect_actor.go +++ b/pub/side_effect_actor.go @@ -26,6 +26,7 @@ type sideEffectActor struct { s2s FederatingProtocol c2s SocialProtocol db Database + clock Clock } // AuthenticatePostInbox defers to the delegate to authenticate the request. @@ -274,14 +275,17 @@ func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL, // // This implementation assumes all types are meant to be delivered except for // the ActivityStreams Block type. -func (a *sideEffectActor) PostOutbox(c context.Context, activity Activity, outboxIRI *url.URL) (deliverable bool, e error) { +func (a *sideEffectActor) PostOutbox(c context.Context, activity Activity, outboxIRI *url.URL, rawJSON map[string]interface{}) (deliverable bool, e error) { wrapped, other := a.c2s.Callbacks(c) // Populate side channels. wrapped.db = a.db + wrapped.outboxIRI = outboxIRI + wrapped.rawActivity = rawJSON + wrapped.clock = a.clock + wrapped.deliverable = &deliverable if e = wrapped.disjoint(other); e != nil { return } - // TODO: populate deliverable res, err := streams.NewTypeResolver(append(wrapped.callbacks(), other...)) if err != nil { return diff --git a/pub/social_wrapped_callbacks.go b/pub/social_wrapped_callbacks.go index a436caa..642afb5 100644 --- a/pub/social_wrapped_callbacks.go +++ b/pub/social_wrapped_callbacks.go @@ -16,24 +16,28 @@ type SocialWrappedCallbacks struct { // Create handles additional side effects for the Create ActivityStreams // type. // - // The wrapping callback for the Social Protocol copies the actor(s) to - // the 'attributedTo' property, copying recipients between the Create - // activity and all objects. It then saves the entry in the database. + // The wrapping callback copies the actor(s) to the 'attributedTo' + // property and copies recipients between the Create activity and all + // objects. It then saves the entry in the database. Create func(context.Context, vocab.ActivityStreamsCreate) error // Update handles additional side effects for the Update ActivityStreams // type. // - // TODO: Describe + // The wrapping callback applies new top-level values on an object to + // the stored objects. Any top-level null literals will be deleted on + // the stored objects as well. Update func(context.Context, vocab.ActivityStreamsUpdate) error // Delete handles additional side effects for the Delete ActivityStreams // type. // - // TODO: Describe + // The wrapping callback replaces the object(s) with tombstones in the + // database. Delete func(context.Context, vocab.ActivityStreamsDelete) error // Follow handles additional side effects for the Follow ActivityStreams // type. // - // TODO: Describe + // The wrapping callback only ensures the 'Follow' has at least one + // 'object' entry, but otherwise has no default side effect. Follow func(context.Context, vocab.ActivityStreamsFollow) error // Add handles additional side effects for the Add ActivityStreams // type. @@ -58,7 +62,13 @@ type SocialWrappedCallbacks struct { // Block handles additional side effects for the Block ActivityStreams // type. // - // TODO: Describe + // The wrapping callback only ensures the 'Block' has at least one + // 'object' entry, but otherwise has no default side effect. It is up + // to the wrapped application function to properly enforce the new + // blocking behavior. + // + // Note that go-fed does not federate 'Block' activities received in the + // Social Protocol. Block func(context.Context, vocab.ActivityStreamsBlock) error // Sidechannel data -- this is set at request handling time. These must @@ -67,9 +77,16 @@ type SocialWrappedCallbacks struct { // db is the Database the SocialWrappedCallbacks should use. It must be // set before calling the callbacks. db Database + // outboxIRI is the outboxIRI that is handling this callback. + outboxIRI *url.URL + // rawActivity is the JSON map literal received when deserializing the + // request body. + rawActivity map[string]interface{} + // clock is the server's clock. + clock Clock // deliverable is a sidechannel out, indicating if the handled activity // should be delivered to a peer. - deliverable bool + deliverable *bool } // disjoint ensures that the functions given do not share a type signature with @@ -123,7 +140,7 @@ func (w SocialWrappedCallbacks) callbacks() []interface{} { // create implements the social Create activity side effects. func (w SocialWrappedCallbacks) create(c context.Context, a vocab.ActivityStreamsCreate) error { - w.deliverable = true + *w.deliverable = true op := a.GetActivityStreamsObject() if op == nil || op.Len() == 0 { return ErrObjectRequired @@ -189,12 +206,28 @@ func (w SocialWrappedCallbacks) create(c context.Context, a vocab.ActivityStream if err := normalizeRecipients(a); err != nil { return err } + // Create anonymous loop function to be able to properly scope the defer + // for the database lock at each iteration. + loopFn := func(i int) error { + obj := op.At(i).GetType() + id, err := GetId(obj) + if err != nil { + return err + } + err = w.db.Lock(c, id) + if err != nil { + return err + } + defer w.db.Unlock(c, id) + if err := w.db.Create(c, obj); err != nil { + return err + } + return nil + } // Persist all objects we've created, which will include sensitive // recipients such as 'bcc' and 'bto'. for i := 0; i < op.Len(); i++ { - obj := op.At(i).GetType() - // TODO: Lock - if err := w.db.Create(c, obj); err != nil { + if err := loopFn(i); err != nil { return err } } @@ -206,296 +239,249 @@ func (w SocialWrappedCallbacks) create(c context.Context, a vocab.ActivityStream // update implements the social Update activity side effects. func (w SocialWrappedCallbacks) update(c context.Context, a vocab.ActivityStreamsUpdate) error { - *deliverable = true - if s.LenObject() == 0 { - return errObjectRequired + *w.deliverable = true + op := a.GetActivityStreamsObject() + if op == nil || op.Len() == 0 { + return ErrObjectRequired } - // Update should partially replace the 'object' with only the - // changed top-level fields. - ids, err := getObjectIds(s.Raw()) - if err != nil { - return err - } else if len(ids) == 0 { - return fmt.Errorf("update has no id: %v", s) + // Obtain all object ids, which should be owned by this server. + objIds := make([]*url.URL, 0, op.Len()) + for iter := op.Begin(); iter != op.End(); iter = iter.Next() { + id, err := ToId(iter) + if err != nil { + return err + } + objIds = append(objIds, id) } - for idx, id := range ids { - pObj, err := f.App.Get(c, id, ReadWrite) + // Create anonymous loop function to be able to properly scope the defer + // for the database lock at each iteration. + loopFn := func(idx int, loopId *url.URL) error { + err := w.db.Lock(c, loopId) if err != nil { return err } - m, err := pObj.Serialize() + defer w.db.Unlock(c, loopId) + t, err := w.db.Get(c, loopId) if err != nil { return err } - if !s.Raw().IsObject(idx) { - return fmt.Errorf("update requires object to be wholly provided at index %d", idx) - } - updated, err := s.Raw().GetObject(idx).Serialize() + m, err := t.Serialize() if err != nil { return err } - for k, v := range updated { + // Copy over new top-level values. + objType := op.At(idx).GetType() + if objType == nil { + return fmt.Errorf("object at index %d is not a literal type value", idx) + } + newM, err := objType.Serialize() + if err != nil { + return err + } + for k, v := range newM { m[k] = v } - if rawUpdatedObject := getRawObject(rawJson, id.String()); rawUpdatedObject != nil { - recursivelyApplyDeletes(m, rawUpdatedObject) + // Delete top-level values where the raw Activity had nils. + for k, v := range w.rawActivity { + if _, ok := m[k]; v == nil && ok { + delete(m, k) + } } - p, err := ToPubObject(m) + newT, err := toType(c, m) if err != nil { return err } - for _, elem := range p { - if err := f.App.Set(c, elem); err != nil { - return err - } + if err = w.db.Update(c, newT); err != nil { + return err + } + return nil + } + for i, id := range objIds { + if err := loopFn(i, id); err != nil { + return err } } - return f.ClientCallbacker.Update(c, s) + if w.Update != nil { + return w.Update(c, a) + } + return nil } // deleteFn implements the social Delete activity side effects. func (w SocialWrappedCallbacks) deleteFn(c context.Context, a vocab.ActivityStreamsDelete) error { - *deliverable = true - if s.LenObject() == 0 { - return errObjectRequired + *w.deliverable = true + op := a.GetActivityStreamsObject() + if op == nil || op.Len() == 0 { + return ErrObjectRequired } - ids, err := getObjectIds(s.Raw()) - if err != nil { - return err - } else if len(ids) == 0 { - return fmt.Errorf("delete has no id: %v", s) - } - for _, id := range ids { - pObj, err := f.App.Get(c, id, ReadWrite) + // Obtain all object ids, which should be owned by this server. + objIds := make([]*url.URL, 0, op.Len()) + for iter := op.Begin(); iter != op.End(); iter = iter.Next() { + id, err := ToId(iter) if err != nil { return err } - obj, ok := pObj.(vocab.ObjectType) - if !ok { - return fmt.Errorf("cannot delete non-ObjectType: %T", pObj) + objIds = append(objIds, id) + } + // Create anonymous loop function to be able to properly scope the defer + // for the database lock at each iteration. + loopFn := func(idx int, loopId *url.URL) error { + err := w.db.Lock(c, loopId) + if err != nil { + return err } - tomb := toTombstone(obj, id, f.Clock.Now()) - if err := f.App.Set(c, tomb); err != nil { + defer w.db.Unlock(c, loopId) + t, err := w.db.Get(c, loopId) + if err != nil { + return err + } + tomb := toTombstone(t, loopId, w.clock.Now()) + if err := w.db.Update(c, tomb); err != nil { + return err + } + return nil + } + for i, id := range objIds { + if err := loopFn(i, id); err != nil { return err } } - return f.ClientCallbacker.Delete(c, s) + if w.Delete != nil { + return w.Delete(c, a) + } + return nil } // follow implements the social Follow activity side effects. func (w SocialWrappedCallbacks) follow(c context.Context, a vocab.ActivityStreamsFollow) error { - *deliverable = true - if s.LenObject() == 0 { - return errObjectRequired + *w.deliverable = true + op := a.GetActivityStreamsObject() + if op == nil || op.Len() == 0 { + return ErrObjectRequired } - return f.ClientCallbacker.Follow(c, s) + if w.Follow != nil { + return w.Follow(c, a) + } + return nil } // add implements the social Add activity side effects. func (w SocialWrappedCallbacks) add(c context.Context, a vocab.ActivityStreamsAdd) error { - // TODO: Dedupe with FederatingWrappedCallbacks - *deliverable = true - if s.LenObject() == 0 { - return errObjectRequired - } else if s.LenTarget() == 0 { - return errTargetRequired + *w.deliverable = true + op := a.GetActivityStreamsObject() + if op == nil || op.Len() == 0 { + return ErrObjectRequired } - raw := s.Raw() - ids, err := getTargetIds(raw) - if err != nil { + target := a.GetActivityStreamsTarget() + if target == nil || target.Len() == 0 { + return ErrTargetRequired + } + if err := add(c, op, target, w.db); err != nil { return err - } else if len(ids) == 0 { - return fmt.Errorf("add target has no ids: %v", s) } - objIds, err := getObjectIds(s.Raw()) - if err != nil { - return err - } else if len(objIds) == 0 { - return fmt.Errorf("add object has no ids: %v", s) + if w.Add != nil { + return w.Add(c, a) } - var targets []vocab.ObjectType - for _, id := range ids { - if !f.App.Owns(c, id) { - continue - } - target, err := f.App.Get(c, id, ReadWrite) - if err != nil { - return err - } - ct, okCollection := target.(vocab.CollectionType) - oct, okOrdered := target.(vocab.OrderedCollectionType) - if !okCollection && !okOrdered { - return fmt.Errorf("cannot add to type that is not Collection and not OrderedCollection: %v", target) - } else if okCollection { - targets = append(targets, ct) - } else { - targets = append(targets, oct) - } - } - for i := 0; i < raw.ObjectLen(); i++ { - var obj vocab.ObjectType - var objId *url.URL - if raw.IsObjectIRI(i) { - objId = raw.GetObjectIRI(i) - if f.App.Owns(c, objId) { - pObj, err := f.App.Get(c, objId, Read) - var ok bool - if obj, ok = pObj.(vocab.ObjectType); !ok { - return fmt.Errorf("add object must be an activitypub object: %v", raw) - } - if err != nil { - return err - } - } else { - obj, err = f.dereferenceAsUser(outboxURL, objId) - if err != nil { - return err - } - } - } else if raw.IsObject(i) { - obj = raw.GetObject(i) - if !obj.HasId() { - return fmt.Errorf("add object missing iri") - } - objId = obj.GetId() - } else { - return fmt.Errorf("add object must be of object or iri type: %v", raw) - } - for _, target := range targets { - if !f.App.CanAdd(c, obj, target) { - continue - } - if ct, ok := target.(vocab.CollectionType); ok { - ct.AppendItemsIRI(objId) - } else if oct, ok := target.(vocab.OrderedCollectionType); ok { - oct.AppendOrderedItemsIRI(objId) - } - if err := f.App.Set(c, target); err != nil { - return err - } - } - } - return f.ClientCallbacker.Add(c, s) + return nil } // remove implements the social Remove activity side effects. func (w SocialWrappedCallbacks) remove(c context.Context, a vocab.ActivityStreamsRemove) error { - // TODO: Dedupe with FederatingWrappedCallbacks - *deliverable = true - if s.LenObject() == 0 { - return errObjectRequired - } else if s.LenTarget() == 0 { - return errTargetRequired + *w.deliverable = true + op := a.GetActivityStreamsObject() + if op == nil || op.Len() == 0 { + return ErrObjectRequired } - raw := s.Raw() - ids, err := getTargetIds(raw) - if err != nil { + target := a.GetActivityStreamsTarget() + if target == nil || target.Len() == 0 { + return ErrTargetRequired + } + if err := remove(c, op, target, w.db); err != nil { return err - } else if len(ids) == 0 { - return fmt.Errorf("remove target has no ids: %v", s) } - objIds, err := getObjectIds(s.Raw()) - if err != nil { - return err - } else if len(objIds) == 0 { - return fmt.Errorf("remove object has no ids: %v", s) + if w.Remove != nil { + return w.Remove(c, a) } - var targets []vocab.ObjectType - for _, id := range ids { - if !f.App.Owns(c, id) { - continue - } - target, err := f.App.Get(c, id, ReadWrite) - if err != nil { - return err - } - ct, okCollection := target.(vocab.CollectionType) - oct, okOrdered := target.(vocab.OrderedCollectionType) - if !okCollection && !okOrdered { - return fmt.Errorf("cannot remove from type that is not Collection and not OrderedCollection: %v", target) - } else if okCollection { - targets = append(targets, ct) - } else { - targets = append(targets, oct) - } - } - for i := 0; i < raw.ObjectLen(); i++ { - if !raw.IsObject(i) { - // TODO: Fetch IRIs as well - return fmt.Errorf("remove object must be object type: %v", raw) - } - obj := raw.GetObject(i) - for _, target := range targets { - if !f.App.CanRemove(c, obj, target) { - continue - } - if ct, ok := target.(vocab.CollectionType); ok { - removeCollectionItemWithId(ct, obj.GetId()) - } else if oct, ok := target.(vocab.OrderedCollectionType); ok { - removeOrderedCollectionItemWithId(oct, obj.GetId()) - } - if err := f.App.Set(c, target); err != nil { - return err - } - } - } - return f.ClientCallbacker.Remove(c, s) + return nil } // like implements the social Like activity side effects. func (w SocialWrappedCallbacks) like(c context.Context, a vocab.ActivityStreamsLike) error { - *deliverable = true - if s.LenObject() == 0 { - return errObjectRequired + *w.deliverable = true + op := a.GetActivityStreamsObject() + if op == nil || op.Len() == 0 { + return ErrObjectRequired } - getter := func(actor vocab.ObjectType, lc *vocab.CollectionType, loc *vocab.OrderedCollectionType) (bool, error) { - if actor.IsLikedAnyURI() { - pObj, err := f.App.Get(ctx, actor.GetLikedAnyURI(), ReadWrite) - if err != nil { - return true, err - } - ok := false - if *lc, ok = pObj.(vocab.CollectionType); !ok { - if *loc, ok = pObj.(vocab.OrderedCollectionType); !ok { - return true, fmt.Errorf("actors liked collection not CollectionType nor OrderedCollectionType") - } - } - return true, nil - } else if actor.IsLikedCollection() { - *lc = actor.GetLikedCollection() - return false, nil - } else if actor.IsLikedOrderedCollection() { - *loc = actor.GetLikedOrderedCollection() - return false, nil - } - *loc = &vocab.OrderedCollection{} - actor.SetLikedOrderedCollection(*loc) - return false, nil - } - if err := f.addAllObjectsToActorCollection(ctx, getter, s.Raw(), true); err != nil { + // Get this actor's IRI. + if err := w.db.Lock(c, w.outboxIRI); err != nil { return err } - return f.ClientCallbacker.Like(ctx, s) + // WARNING: Unlock not deferred. + actorIRI, err := w.db.ActorForInbox(c, w.outboxIRI) + if err != nil { + w.db.Unlock(c, w.outboxIRI) + return err + } + w.db.Unlock(c, w.outboxIRI) + // Unlock must be called by now and every branch above. + // + // Now obtain this actor's 'liked' collection. + if err := w.db.Lock(c, actorIRI); err != nil { + return err + } + defer w.db.Unlock(c, actorIRI) + liked, err := w.db.Liked(c, actorIRI) + if err != nil { + return err + } + likedItems := liked.GetActivityStreamsItems() + if likedItems == nil { + likedItems = streams.NewActivityStreamsItemsProperty() + liked.SetActivityStreamsItems(likedItems) + } + for iter := op.Begin(); iter != op.End(); iter = iter.Next() { + objId, err := ToId(iter) + if err != nil { + return err + } + likedItems.PrependIRI(objId) + } + err = w.db.Update(c, liked) + if err != nil { + return err + } + if w.Like != nil { + return w.Like(c, a) + } + return nil } // undo implements the social Undo activity side effects. func (w SocialWrappedCallbacks) undo(c context.Context, a vocab.ActivityStreamsUndo) error { - *deliverable = true - if s.LenObject() == 0 { - return errObjectRequired + *w.deliverable = true + op := a.GetActivityStreamsObject() + if op == nil || op.Len() == 0 { + return ErrObjectRequired } - raw := s.Raw() - if err := f.ensureActivityActorsMatchObjectActors(raw); err != nil { + actors := a.GetActivityStreamsActor() + if err := mustHaveActivityActorsMatchObjectActors(actors, op); err != nil { return err } - return f.ClientCallbacker.Undo(c, s) + if w.Undo != nil { + return w.Undo(c, a) + } + return nil } // block implements the social Block activity side effects. func (w SocialWrappedCallbacks) block(c context.Context, a vocab.ActivityStreamsBlock) error { - *deliverable = false - if s.LenObject() == 0 { - return errObjectRequired + *w.deliverable = false + op := a.GetActivityStreamsObject() + if op == nil || op.Len() == 0 { + return ErrObjectRequired } - return f.ClientCallbacker.Block(c, s) + if w.Block != nil { + return w.Block(c, a) + } + return nil } diff --git a/pub/transport.go b/pub/transport.go index bc5f7e2..0eace82 100644 --- a/pub/transport.go +++ b/pub/transport.go @@ -9,6 +9,7 @@ import ( "io/ioutil" "net/http" "net/url" + "strings" "sync" ) diff --git a/pub/util.go b/pub/util.go index 8938386..4648352 100644 --- a/pub/util.go +++ b/pub/util.go @@ -12,6 +12,7 @@ import ( "net/http" "net/url" "strings" + "time" ) var ( @@ -452,89 +453,22 @@ func stripHiddenRecipients(activity Activity) { // mustHaveActivityOriginMatchObjects ensures that the Host in the activity id // IRI matches all of the Hosts in the object id IRIs. func mustHaveActivityOriginMatchObjects(a Activity) error { - if !a.HasId() { - return fmt.Errorf("activity has no iri") + originIRI, err := GetId(a) + if err != nil { + return err } - originIRI := a.GetId() originHost := originIRI.Host - for i := 0; i < a.ObjectLen(); i++ { - if a.IsObject(i) { - obj := a.GetObject(i) - if !obj.HasId() { - return fmt.Errorf("object at index %d has no id", i) - } - iri := obj.GetId() - if originHost != iri.Host { - return fmt.Errorf("object %q: not in activity origin", iri) - } - } else if a.IsObjectIRI(i) { - iri := a.GetObjectIRI(i) - if originHost != iri.Host { - return fmt.Errorf("object %q: not in activity origin", iri) - } - } + op := a.GetActivityStreamsObject() + if op == nil || op.Len() == 0 { + return nil } - return nil -} - -// mustHaveActivityActorsMatchObjectActors ensures that the set of actors on all -// objects in an activity are a subset of actor(s) on the activity. -func mustHaveActivityActorsMatchObjectActors(a vocab.ActivityType) error { - actorSet := make(map[string]bool, a.ActorLen()) - for i := 0; i < a.ActorLen(); i++ { - if a.IsActorObject(i) { - obj := a.GetActorObject(i) - if !obj.HasId() { - return fmt.Errorf("actor object at index %d has no id", i) - } - actorSet[obj.GetId().String()] = true - } else if a.IsActorLink(i) { - l := a.GetActorLink(i) - if !l.HasHref() { - return fmt.Errorf("actor link at index %d has no href", i) - } - actorSet[l.GetHref().String()] = true - } else if a.IsActorIRI(i) { - actorSet[a.GetActorIRI(i).String()] = true + for iter := op.Begin(); iter != op.End(); iter = iter.Next() { + iri, err := ToId(iter) + if err != nil { + return err } - } - objectActors := make(map[string]bool, a.ObjectLen()) - for i := 0; i < a.ObjectLen(); i++ { - if a.IsObject(i) { - obj := a.GetObject(i) - if !obj.HasId() { - return fmt.Errorf("object at index %d has no id", i) - } - objectActivity, ok := obj.(vocab.ActivityType) - if !ok { - return fmt.Errorf("object at index %d is not an activity", i) - } - for j := 0; j < objectActivity.ActorLen(); j++ { - if objectActivity.IsActorObject(j) { - obj := objectActivity.GetActorObject(j) - if !obj.HasId() { - return fmt.Errorf("actor object at index (%d,%d) has no id", i, j) - } - objectActors[obj.GetId().String()] = true - } else if objectActivity.IsActorLink(j) { - l := objectActivity.GetActorLink(j) - if !l.HasHref() { - return fmt.Errorf("actor link at index (%d,%d) has no href", i, j) - } - objectActors[l.GetHref().String()] = true - } else if objectActivity.IsActorIRI(j) { - objectActors[objectActivity.GetActorIRI(j).String()] = true - } - } - } else if a.IsObjectIRI(i) { - // TODO: Dereference IRI - iri := a.GetObjectIRI(i) - return fmt.Errorf("unimplemented: fetching IRI for UNDO verification of ownership of %q", iri) - } - } - for k := range objectActors { - if !actorSet[k] { - return fmt.Errorf("at least 1 activity actors missing: %q", k) + if originHost != iri.Host { + return fmt.Errorf("object %q: not in activity origin", iri) } } return nil @@ -548,7 +482,7 @@ func normalizeRecipients(a vocab.ActivityStreamsCreate) error { // Phase 0: Acquire all recipients on the activity. // // Obtain the actorTo map - actorToMap := make(map[string]interface{}) + actorToMap := make(map[string]*url.URL) actorTo := a.GetActivityStreamsTo() if actorTo == nil { actorTo = streams.NewActivityStreamsToProperty() @@ -562,7 +496,7 @@ func normalizeRecipients(a vocab.ActivityStreamsCreate) error { actorToMap[id.String()] = id } // Obtain the actorBto map - actorBtoMap := make(map[string]interface{}) + actorBtoMap := make(map[string]*url.URL) actorBto := a.GetActivityStreamsBto() if actorBto == nil { actorBto = streams.NewActivityStreamsBtoProperty() @@ -576,7 +510,7 @@ func normalizeRecipients(a vocab.ActivityStreamsCreate) error { actorBtoMap[id.String()] = id } // Obtain the actorCc map - actorCcMap := make(map[string]interface{}) + actorCcMap := make(map[string]*url.URL) actorCc := a.GetActivityStreamsCc() if actorCc == nil { actorCc = streams.NewActivityStreamsCcProperty() @@ -590,7 +524,7 @@ func normalizeRecipients(a vocab.ActivityStreamsCreate) error { actorCcMap[id.String()] = id } // Obtain the actorBcc map - actorBccMap := make(map[string]interface{}) + actorBccMap := make(map[string]*url.URL) actorBcc := a.GetActivityStreamsBcc() if actorBcc == nil { actorBcc = streams.NewActivityStreamsBccProperty() @@ -604,7 +538,7 @@ func normalizeRecipients(a vocab.ActivityStreamsCreate) error { actorBccMap[id.String()] = id } // Obtain the actorAudience map - actorAudienceMap := make(map[string]interface{}) + actorAudienceMap := make(map[string]*url.URL) actorAudience := a.GetActivityStreamsAudience() if actorAudience == nil { actorAudience = streams.NewActivityStreamsAudienceProperty() @@ -619,16 +553,16 @@ func normalizeRecipients(a vocab.ActivityStreamsCreate) error { } // Obtain the objects maps for each recipient type. o := a.GetActivityStreamsObject() - objsTo := make([]map[string]interface{}, o.Len()) - objsBto := make([]map[string]interface{}, o.Len()) - objsCco := make([]map[string]interface{}, o.Len()) - objsBcc := make([]map[string]interface{}, o.Len()) - objsAudience := make([]map[string]interface{}, o.Len()) + objsTo := make([]map[string]*url.URL, o.Len()) + objsBto := make([]map[string]*url.URL, o.Len()) + objsCc := make([]map[string]*url.URL, o.Len()) + objsBcc := make([]map[string]*url.URL, o.Len()) + objsAudience := make([]map[string]*url.URL, o.Len()) for i := 0; i < o.Len(); i++ { // Phase 1: Acquire all existing recipients on the object. // // Object to - objsTo[i] = make(map[string]interface{}) + objsTo[i] = make(map[string]*url.URL) var oTo vocab.ActivityStreamsToProperty if tr, ok := o.At(i).(toer); !ok { return fmt.Errorf("the Create object at %d has no 'to' property", i) @@ -647,7 +581,7 @@ func normalizeRecipients(a vocab.ActivityStreamsCreate) error { objsTo[i][id.String()] = id } // Object bto - objsBto[i] = make(map[string]interface{}) + objsBto[i] = make(map[string]*url.URL) var oBto vocab.ActivityStreamsBtoProperty if tr, ok := o.At(i).(btoer); !ok { return fmt.Errorf("the Create object at %d has no 'bto' property", i) @@ -666,7 +600,7 @@ func normalizeRecipients(a vocab.ActivityStreamsCreate) error { objsBto[i][id.String()] = id } // Object cc - objsCc[i] = make(map[string]interface{}) + objsCc[i] = make(map[string]*url.URL) var oCc vocab.ActivityStreamsCcProperty if tr, ok := o.At(i).(ccer); !ok { return fmt.Errorf("the Create object at %d has no 'cc' property", i) @@ -685,7 +619,7 @@ func normalizeRecipients(a vocab.ActivityStreamsCreate) error { objsCc[i][id.String()] = id } // Object bcc - objsBcc[i] = make(map[string]interface{}) + objsBcc[i] = make(map[string]*url.URL) var oBcc vocab.ActivityStreamsBccProperty if tr, ok := o.At(i).(bccer); !ok { return fmt.Errorf("the Create object at %d has no 'bcc' property", i) @@ -704,7 +638,7 @@ func normalizeRecipients(a vocab.ActivityStreamsCreate) error { objsBcc[i][id.String()] = id } // Object audience - objsAudience[i] = make(map[string]interface{}) + objsAudience[i] = make(map[string]*url.URL) var oAudience vocab.ActivityStreamsAudienceProperty if tr, ok := o.At(i).(audiencer); !ok { return fmt.Errorf("the Create object at %d has no 'audience' property", i) @@ -800,3 +734,240 @@ func normalizeRecipients(a vocab.ActivityStreamsCreate) error { } return nil } + +// toTombstone creates a Tombstone object for the given ActivityStreams value. +func toTombstone(obj vocab.Type, id *url.URL, now time.Time) vocab.ActivityStreamsTombstone { + tomb := streams.NewActivityStreamsTombstone() + // id property + idProp := streams.NewActivityStreamsIdProperty() + idProp.Set(id) + tomb.SetActivityStreamsId(idProp) + // formerType property + former := streams.NewActivityStreamsFormerTypeProperty() + tomb.SetActivityStreamsFormerType(former) + // Populate Former Type + former.AppendXMLSchemaString(obj.GetName()) + // Copy over the published property if it existed + if pubber, ok := obj.(publisheder); ok { + if pub := pubber.GetActivityStreamsPublished(); pub != nil { + tomb.SetActivityStreamsPublished(pub) + } + } + // Copy over the updated property if it existed + if upder, ok := obj.(updateder); ok { + if upd := upder.GetActivityStreamsUpdated(); upd != nil { + tomb.SetActivityStreamsUpdated(upd) + } + } + // Set deleted time to now. + deleted := streams.NewActivityStreamsDeletedProperty() + deleted.Set(now) + tomb.SetActivityStreamsDeleted(deleted) + return tomb +} + +// mustHaveActivityActorsMatchObjectActors ensures that the actors on types in +// the 'object' property are all listed in the 'actor' property. +func mustHaveActivityActorsMatchObjectActors(actors vocab.ActivityStreamsActorProperty, + op vocab.ActivityStreamsObjectProperty) error { + activityActorMap := make(map[string]bool, actors.Len()) + for iter := actors.Begin(); iter != actors.End(); iter = iter.Next() { + id, err := ToId(iter) + if err != nil { + return err + } + activityActorMap[id.String()] = true + } + for iter := op.Begin(); iter != op.End(); iter = iter.Next() { + t := iter.GetType() + if t == nil { + // TODO: Fetch the IRI + continue + } + ac, ok := t.(actorer) + if !ok { + return fmt.Errorf("cannot verify actors: object value has no 'actor' property") + } + objActors := ac.GetActivityStreamsActor() + for iter := objActors.Begin(); iter != objActors.End(); iter = iter.Next() { + id, err := ToId(iter) + if err != nil { + return err + } + if !activityActorMap[id.String()] { + return fmt.Errorf("activity does not have all actors from its object's actors") + } + } + } + return nil +} + +// add implements the logic of adding object ids to a target Collection or +// OrderedCollection. This logic is shared by both the C2S and S2S protocols. +func add(c context.Context, + op vocab.ActivityStreamsObjectProperty, + target vocab.ActivityStreamsTargetProperty, + db Database) error { + opIds := make([]*url.URL, 0, op.Len()) + for iter := op.Begin(); iter != op.End(); iter = iter.Next() { + id, err := ToId(iter) + if err != nil { + return err + } + opIds = append(opIds, id) + } + targetIds := make([]*url.URL, 0, op.Len()) + for iter := target.Begin(); iter != target.End(); iter = iter.Next() { + id, err := ToId(iter) + if err != nil { + return err + } + targetIds = append(targetIds, id) + } + // Create anonymous loop function to be able to properly scope the defer + // for the database lock at each iteration. + loopFn := func(t *url.URL) error { + if err := db.Lock(c, t); err != nil { + return err + } + defer db.Unlock(c, t) + if owns, err := db.Owns(c, t); err != nil { + return err + } else if !owns { + return nil + } + tp, err := db.Get(c, t) + if err != nil { + return err + } + if streams.ActivityStreamsOrderedCollectionIsExtendedBy(tp) { + oi, ok := tp.(orderedItemser) + if !ok { + return fmt.Errorf("type extending from OrderedCollection cannot convert to orderedItemser interface") + } + oiProp := oi.GetActivityStreamsOrderedItems() + if oiProp == nil { + oiProp = streams.NewActivityStreamsOrderedItemsProperty() + oi.SetActivityStreamsOrderedItems(oiProp) + } + for _, objId := range opIds { + oiProp.AppendIRI(objId) + } + } else if streams.ActivityStreamsCollectionIsExtendedBy(tp) { + i, ok := tp.(itemser) + if !ok { + return fmt.Errorf("type extending from Collection cannot convert to itemser interface") + } + iProp := i.GetActivityStreamsItems() + if iProp == nil { + iProp = streams.NewActivityStreamsItemsProperty() + i.SetActivityStreamsItems(iProp) + } + for _, objId := range opIds { + iProp.AppendIRI(objId) + } + } + err = db.Update(c, tp) + if err != nil { + return err + } + return nil + } + for _, t := range targetIds { + if err := loopFn(t); err != nil { + return err + } + } + return nil +} + +// remove implements the logic of removing object ids to a target Collection or +// OrderedCollection. This logic is shared by both the C2S and S2S protocols. +func remove(c context.Context, + op vocab.ActivityStreamsObjectProperty, + target vocab.ActivityStreamsTargetProperty, + db Database) error { + opIds := make(map[string]bool, op.Len()) + for iter := op.Begin(); iter != op.End(); iter = iter.Next() { + id, err := ToId(iter) + if err != nil { + return err + } + opIds[id.String()] = true + } + targetIds := make([]*url.URL, 0, op.Len()) + for iter := target.Begin(); iter != target.End(); iter = iter.Next() { + id, err := ToId(iter) + if err != nil { + return err + } + targetIds = append(targetIds, id) + } + // Create anonymous loop function to be able to properly scope the defer + // for the database lock at each iteration. + loopFn := func(t *url.URL) error { + if err := db.Lock(c, t); err != nil { + return err + } + defer db.Unlock(c, t) + if owns, err := db.Owns(c, t); err != nil { + return err + } else if !owns { + return nil + } + tp, err := db.Get(c, t) + if err != nil { + return err + } + if streams.ActivityStreamsOrderedCollectionIsExtendedBy(tp) { + oi, ok := tp.(orderedItemser) + if !ok { + return fmt.Errorf("type extending from OrderedCollection cannot convert to orderedItemser interface") + } + oiProp := oi.GetActivityStreamsOrderedItems() + if oiProp != nil { + for i := 0; i < oiProp.Len(); /*Conditional*/ { + id, err := ToId(oiProp.At(i)) + if err != nil { + return err + } + if opIds[id.String()] { + oiProp.Remove(i) + } else { + i++ + } + } + } + } else if streams.ActivityStreamsCollectionIsExtendedBy(tp) { + i, ok := tp.(itemser) + if !ok { + return fmt.Errorf("type extending from Collection cannot convert to itemser interface") + } + iProp := i.GetActivityStreamsItems() + if iProp != nil { + for i := 0; i < iProp.Len(); /*Conditional*/ { + id, err := ToId(iProp.At(i)) + if err != nil { + return err + } + if opIds[id.String()] { + iProp.Remove(i) + } else { + i++ + } + } + } + } + err = db.Update(c, tp) + if err != nil { + return err + } + return nil + } + for _, t := range targetIds { + if err := loopFn(t); err != nil { + return err + } + } + return nil +}