Finish porting the core of pub.

このコミットが含まれているのは:
Cory Slep 2019-02-14 21:51:57 +01:00
コミット d3b0afef5e
10個のファイルの変更589行の追加560行の削除

ファイルの表示

@ -31,6 +31,9 @@ type Activity interface {
// GetActivityStreamsAttributedTo returns the "attributedTo" property if
// it exists, and nil otherwise.
GetActivityStreamsAttributedTo() vocab.ActivityStreamsAttributedToProperty
// GetActivityStreamsObject returns the "object" property if it exists,
// and nil otherwise.
GetActivityStreamsObject() vocab.ActivityStreamsObjectProperty
// SetActivityStreamsActor sets the "actor" property.
SetActivityStreamsActor(i vocab.ActivityStreamsActorProperty)
// SetActivityStreamsObject sets the "object" property.

ファイルの表示

@ -57,6 +57,7 @@ func NewSocialActor(c CommonBehavior,
common: c,
c2s: c2s,
db: db,
clock: clock,
},
enableSocialProtocol: true,
clock: clock,
@ -84,6 +85,7 @@ func NewFederatingActor(c CommonBehavior,
common: c,
s2s: s2s,
db: db,
clock: clock,
},
enableFederatedProtocol: true,
clock: clock,
@ -110,6 +112,7 @@ func NewActor(c CommonBehavior,
c2s: c2s,
s2s: s2s,
db: db,
clock: clock,
},
enableSocialProtocol: true,
enableFederatedProtocol: true,
@ -325,7 +328,7 @@ func (b *baseActor) PostOutbox(c context.Context, w http.ResponseWriter, r *http
}
// Post the activity to the actor's outbox and trigger side effects for
// that particular Activity type.
deliverable, err := b.delegate.PostOutbox(c, activity, r.URL)
deliverable, err := b.delegate.PostOutbox(c, activity, r.URL, m)
if err != nil {
// Special case: We know it is a bad request if the object or
// target properties needed to be populated, but weren't.

ファイルの表示

@ -126,4 +126,11 @@ type Database interface {
//
// The library makes this call only after acquiring a lock first.
Following(c context.Context, actorIRI *url.URL) (followers vocab.ActivityStreamsCollection, err error)
// Liked obtains the Liked Collection for an actor with the
// given id.
//
// If modified, the library will then call Update.
//
// The library makes this call only after acquiring a lock first.
Liked(c context.Context, actorIRI *url.URL) (followers vocab.ActivityStreamsCollection, err error)
}

ファイルの表示

@ -120,7 +120,13 @@ type DelegateActor interface {
//
// If the error is ErrObjectRequired or ErrTargetRequired, then a Bad
// Request status is sent in the response.
PostOutbox(c context.Context, a Activity, outboxIRI *url.URL) (deliverable bool, e error)
//
// Note that 'rawJSON' is an unfortunate consequence where an 'Update'
// Activity is the only one that explicitly cares about 'null' values in
// JSON. Since go-fed does not differentiate between 'null' values and
// values that are simply not present, the 'rawJSON' map is ONLY needed
// for this narrow and specific use case.
PostOutbox(c context.Context, a Activity, outboxIRI *url.URL, rawJSON map[string]interface{}) (deliverable bool, e error)
// AddNewIds sets new URL ids on the activity. It also does so for all
// 'object' properties if the Activity is a Create type.
//

ファイルの表示

@ -67,14 +67,16 @@ type FederatingWrappedCallbacks struct {
// The wrapping function determines if this 'Accept' is in response to a
// 'Follow'. If so, then the 'actor' is added to the original 'actor's
// 'following' collection.
//
// Otherwise, no side effects are done by go-fed.
Accept func(context.Context, vocab.ActivityStreamsAccept) error
// Reject handles additional side effects for the Reject ActivityStreams
// type, specific to the application using go-fed.
//
// The wrapping function does nothing. However, if this 'Reject' is in
// response to a 'Follow' then the client MUST NOT go forward with
// adding the 'actor' to the original 'actor's 'following' collection by
// the client application.
// The wrapping function has no default side effects. However, if this
// 'Reject' is in response to a 'Follow' then the client MUST NOT go
// forward with adding the 'actor' to the original 'actor's 'following'
// collection by the client application.
Reject func(context.Context, vocab.ActivityStreamsReject) error
// Add handles additional side effects for the Add ActivityStreams
// type, specific to the application using go-fed.
@ -109,14 +111,17 @@ type FederatingWrappedCallbacks struct {
// is be the same as the 'actor' on all Activities being undone.
// It enforces that the actors on the Undo must correspond to all of the
// 'object' actors in some manner.
//
// It is expected that the application will implement the proper
// reversal of activities that are being undone.
Undo func(context.Context, vocab.ActivityStreamsUndo) error
// Block handles additional side effects for the Block ActivityStreams
// type, specific to the application using go-fed.
//
// The wrapping function does nothing. It simply calls this wrapped
// function. However, note that Blocks should not be received from a
// federated peer, as delivering Blocks explicitly deviates from the
// original ActivityPub specification.
// The wrapping function provides no default side effects. It simply
// calls the wrapped function. However, note that Blocks should not be
// received from a federated peer, as delivering Blocks explicitly
// deviates from the original ActivityPub specification.
Block func(context.Context, vocab.ActivityStreamsBlock) error
// Sidechannel data -- this is set at request handling time. These must
@ -195,11 +200,13 @@ func (w FederatingWrappedCallbacks) create(c context.Context, a vocab.ActivitySt
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
// Create anonymous loop function to be able to properly scope the defer
// for the database lock at each iteration.
loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
t := iter.GetType()
if t == nil {
// TODO: Dereference the IRI and store it.
continue
return nil
}
id, err := GetId(t)
if err != nil {
@ -209,14 +216,16 @@ func (w FederatingWrappedCallbacks) create(c context.Context, a vocab.ActivitySt
if err != nil {
return err
}
// WARNING: Unlock is not deferred.
defer w.db.Unlock(c, id)
if err := w.db.Create(c, t); err != nil {
w.db.Unlock(c, id)
return err
}
w.db.Unlock(c, id)
// At this point, Unlock should be called and in every above
// branch.
return nil
}
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
if err := loopFn(iter); err != nil {
return err
}
}
if w.Create != nil {
return w.Create(c, a)
@ -233,7 +242,9 @@ func (w FederatingWrappedCallbacks) update(c context.Context, a vocab.ActivitySt
if err := mustHaveActivityOriginMatchObjects(a); err != nil {
return err
}
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
// Create anonymous loop function to be able to properly scope the defer
// for the database lock at each iteration.
loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
t := iter.GetType()
if t == nil {
return fmt.Errorf("update requires an object to be wholly provided")
@ -246,14 +257,16 @@ func (w FederatingWrappedCallbacks) update(c context.Context, a vocab.ActivitySt
if err != nil {
return err
}
// WARNING: Unlock is not deferred.
defer w.db.Unlock(c, id)
if err := w.db.Update(c, t); err != nil {
w.db.Unlock(c, id)
return err
}
w.db.Unlock(c, id)
// At this point, Unlock should be called and in every above
// branch.
return nil
}
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
if err := loopFn(iter); err != nil {
return err
}
}
if w.Update != nil {
return w.Update(c, a)
@ -270,7 +283,9 @@ func (w FederatingWrappedCallbacks) deleteFn(c context.Context, a vocab.Activity
if err := mustHaveActivityOriginMatchObjects(a); err != nil {
return err
}
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
// Create anonymous loop function to be able to properly scope the defer
// for the database lock at each iteration.
loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
id, err := ToId(iter)
if err != nil {
return err
@ -279,14 +294,16 @@ func (w FederatingWrappedCallbacks) deleteFn(c context.Context, a vocab.Activity
if err != nil {
return err
}
// WARNING: Unlock is not deferred.
defer w.db.Unlock(c, id)
if err := w.db.Delete(c, id); err != nil {
w.db.Unlock(c, id)
return err
}
w.db.Unlock(c, id)
// At this point, Unlock should be called and in every above
// branch.
return nil
}
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
if err := loopFn(iter); err != nil {
return err
}
}
if w.Delete != nil {
return w.Delete(c, a)
@ -516,75 +533,8 @@ func (w FederatingWrappedCallbacks) add(c context.Context, a vocab.ActivityStrea
if target == nil || target.Len() == 0 {
return ErrTargetRequired
}
opIds := make([]*url.URL, 0, op.Len())
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
opIds = append(opIds, id)
}
targetIds := make([]*url.URL, 0, op.Len())
for iter := target.Begin(); iter != target.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
targetIds = append(targetIds, id)
}
for _, t := range targetIds {
if err := w.db.Lock(c, t); err != nil {
return err
}
// WARNING: Unlock not deferred.
if owns, err := w.db.Owns(c, t); err != nil {
w.db.Unlock(c, t)
return err
} else if !owns {
w.db.Unlock(c, t)
continue
}
tp, err := w.db.Get(c, t)
if err != nil {
w.db.Unlock(c, t)
return err
}
if streams.ActivityStreamsOrderedCollectionIsExtendedBy(tp) {
oi, ok := tp.(orderedItemser)
if !ok {
w.db.Unlock(c, t)
return fmt.Errorf("type extending from OrderedCollection cannot convert to orderedItemser interface")
}
oiProp := oi.GetActivityStreamsOrderedItems()
if oiProp == nil {
oiProp = streams.NewActivityStreamsOrderedItemsProperty()
oi.SetActivityStreamsOrderedItems(oiProp)
}
for _, objId := range opIds {
oiProp.AppendIRI(objId)
}
} else if streams.ActivityStreamsCollectionIsExtendedBy(tp) {
i, ok := tp.(itemser)
if !ok {
w.db.Unlock(c, t)
return fmt.Errorf("type extending from Collection cannot convert to itemser interface")
}
iProp := i.GetActivityStreamsItems()
if iProp == nil {
iProp = streams.NewActivityStreamsItemsProperty()
i.SetActivityStreamsItems(iProp)
}
for _, objId := range opIds {
iProp.AppendIRI(objId)
}
}
err = w.db.Update(c, tp)
if err != nil {
w.db.Unlock(c, t)
return err
}
w.db.Unlock(c, t)
// Unlock must be called by now and every branch above.
if err := add(c, op, target, w.db); err != nil {
return err
}
if w.Add != nil {
return w.Add(c, a)
@ -602,89 +552,8 @@ func (w FederatingWrappedCallbacks) remove(c context.Context, a vocab.ActivitySt
if target == nil || target.Len() == 0 {
return ErrTargetRequired
}
opIds := make(map[string]bool, op.Len())
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
opIds[id.String()] = true
}
targetIds := make([]*url.URL, 0, op.Len())
for iter := target.Begin(); iter != target.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
targetIds = append(targetIds, id)
}
for _, t := range targetIds {
if err := w.db.Lock(c, t); err != nil {
return err
}
// WARNING: Unlock not deferred.
if owns, err := w.db.Owns(c, t); err != nil {
w.db.Unlock(c, t)
return err
} else if !owns {
w.db.Unlock(c, t)
continue
}
tp, err := w.db.Get(c, t)
if err != nil {
w.db.Unlock(c, t)
return err
}
if streams.ActivityStreamsOrderedCollectionIsExtendedBy(tp) {
oi, ok := tp.(orderedItemser)
if !ok {
w.db.Unlock(c, t)
return fmt.Errorf("type extending from OrderedCollection cannot convert to orderedItemser interface")
}
oiProp := oi.GetActivityStreamsOrderedItems()
if oiProp != nil {
for i := 0; i < oiProp.Len(); /*Conditional*/ {
id, err := ToId(oiProp.At(i))
if err != nil {
w.db.Unlock(c, t)
return err
}
if opIds[id.String()] {
oiProp.Remove(i)
} else {
i++
}
}
}
} else if streams.ActivityStreamsCollectionIsExtendedBy(tp) {
i, ok := tp.(itemser)
if !ok {
w.db.Unlock(c, t)
return fmt.Errorf("type extending from Collection cannot convert to itemser interface")
}
iProp := i.GetActivityStreamsItems()
if iProp != nil {
for i := 0; i < iProp.Len(); /*Conditional*/ {
id, err := ToId(iProp.At(i))
if err != nil {
w.db.Unlock(c, t)
return err
}
if opIds[id.String()] {
iProp.Remove(i)
} else {
i++
}
}
}
}
err = w.db.Update(c, tp)
if err != nil {
w.db.Unlock(c, t)
return err
}
w.db.Unlock(c, t)
// Unlock must be called by now and every branch above.
if err := remove(c, op, target, w.db); err != nil {
return err
}
if w.Remove != nil {
return w.Remove(c, a)
@ -702,7 +571,9 @@ func (w FederatingWrappedCallbacks) like(c context.Context, a vocab.ActivityStre
if err != nil {
return err
}
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
// Create anonymous loop function to be able to properly scope the defer
// for the database lock at each iteration.
loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
objId, err := ToId(iter)
if err != nil {
return err
@ -710,22 +581,18 @@ func (w FederatingWrappedCallbacks) like(c context.Context, a vocab.ActivityStre
if err := w.db.Lock(c, objId); err != nil {
return err
}
// WARNING: Unlock not deferred.
defer w.db.Unlock(c, objId)
if owns, err := w.db.Owns(c, objId); err != nil {
w.db.Unlock(c, objId)
return err
} else if !owns {
w.db.Unlock(c, objId)
continue
return nil
}
t, err := w.db.Get(c, objId)
if err != nil {
w.db.Unlock(c, objId)
return err
}
l, ok := t.(likeser)
if !ok {
w.db.Unlock(c, objId)
return fmt.Errorf("cannot add Like to likes collection for type %T", t)
}
// Get 'likes' property on the object, creating default if
@ -759,16 +626,18 @@ func (w FederatingWrappedCallbacks) like(c context.Context, a vocab.ActivityStre
}
oItems.PrependIRI(id)
} else {
w.db.Unlock(c, objId)
return fmt.Errorf("likes type is neither a Collection nor an OrderedCollection: %T", likesT)
}
err = w.db.Update(c, t)
if err != nil {
w.db.Unlock(c, objId)
return err
}
w.db.Unlock(c, objId)
// Unlock must be called by now and every branch above.
return nil
}
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
if err := loopFn(iter); err != nil {
return err
}
}
if w.Like != nil {
return w.Like(c, a)
@ -783,7 +652,9 @@ func (w FederatingWrappedCallbacks) announce(c context.Context, a vocab.Activity
return err
}
op := a.GetActivityStreamsObject()
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
// Create anonymous loop function to be able to properly scope the defer
// for the database lock at each iteration.
loopFn := func(iter vocab.ActivityStreamsObjectPropertyIterator) error {
objId, err := ToId(iter)
if err != nil {
return err
@ -791,22 +662,18 @@ func (w FederatingWrappedCallbacks) announce(c context.Context, a vocab.Activity
if err := w.db.Lock(c, objId); err != nil {
return err
}
// WARNING: Unlock not deferred.
defer w.db.Unlock(c, objId)
if owns, err := w.db.Owns(c, objId); err != nil {
w.db.Unlock(c, objId)
return err
} else if !owns {
w.db.Unlock(c, objId)
continue
return nil
}
t, err := w.db.Get(c, objId)
if err != nil {
w.db.Unlock(c, objId)
return err
}
s, ok := t.(shareser)
if !ok {
w.db.Unlock(c, objId)
return fmt.Errorf("cannot add Announce to Shares collection for type %T", t)
}
// Get 'shares' property on the object, creating default if
@ -840,16 +707,18 @@ func (w FederatingWrappedCallbacks) announce(c context.Context, a vocab.Activity
}
oItems.PrependIRI(id)
} else {
w.db.Unlock(c, objId)
return fmt.Errorf("shares type is neither a Collection nor an OrderedCollection: %T", sharesT)
}
err = w.db.Update(c, t)
if err != nil {
w.db.Unlock(c, objId)
return err
}
w.db.Unlock(c, objId)
// Unlock must be called by now and every branch above.
return nil
}
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
if err := loopFn(iter); err != nil {
return err
}
}
if w.Announce != nil {
return w.Announce(c, a)
@ -864,34 +733,8 @@ func (w FederatingWrappedCallbacks) undo(c context.Context, a vocab.ActivityStre
return ErrObjectRequired
}
actors := a.GetActivityStreamsActor()
activityActorMap := make(map[string]bool, actors.Len())
for iter := actors.Begin(); iter != actors.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
activityActorMap[id.String()] = true
}
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
t := iter.GetType()
if t == nil {
// TODO: Fetch the IRI
continue
}
ac, ok := t.(actorer)
if !ok {
return fmt.Errorf("cannot undo an object with no 'actor' property")
}
objActors := ac.GetActivityStreamsActor()
for iter := objActors.Begin(); iter != objActors.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
if !activityActorMap[id.String()] {
return fmt.Errorf("activity Undoing another does not have all actors on original activities")
}
}
if err := mustHaveActivityActorsMatchObjectActors(actors, op); err != nil {
return err
}
if w.Undo != nil {
return w.Undo(c, a)

ファイルの表示

@ -47,6 +47,11 @@ type publisheder interface {
GetActivityStreamsPublished() vocab.ActivityStreamsPublishedProperty
}
// updateder is an ActivityStreams type with a 'updateder' property
type updateder interface {
GetActivityStreamsUpdated() vocab.ActivityStreamsUpdatedProperty
}
// toer is an ActivityStreams type with a 'to' property
type toer interface {
GetActivityStreamsTo() vocab.ActivityStreamsToProperty

ファイルの表示

@ -26,6 +26,7 @@ type sideEffectActor struct {
s2s FederatingProtocol
c2s SocialProtocol
db Database
clock Clock
}
// AuthenticatePostInbox defers to the delegate to authenticate the request.
@ -274,14 +275,17 @@ func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL,
//
// This implementation assumes all types are meant to be delivered except for
// the ActivityStreams Block type.
func (a *sideEffectActor) PostOutbox(c context.Context, activity Activity, outboxIRI *url.URL) (deliverable bool, e error) {
func (a *sideEffectActor) PostOutbox(c context.Context, activity Activity, outboxIRI *url.URL, rawJSON map[string]interface{}) (deliverable bool, e error) {
wrapped, other := a.c2s.Callbacks(c)
// Populate side channels.
wrapped.db = a.db
wrapped.outboxIRI = outboxIRI
wrapped.rawActivity = rawJSON
wrapped.clock = a.clock
wrapped.deliverable = &deliverable
if e = wrapped.disjoint(other); e != nil {
return
}
// TODO: populate deliverable
res, err := streams.NewTypeResolver(append(wrapped.callbacks(), other...))
if err != nil {
return

ファイルの表示

@ -16,24 +16,28 @@ type SocialWrappedCallbacks struct {
// Create handles additional side effects for the Create ActivityStreams
// type.
//
// The wrapping callback for the Social Protocol copies the actor(s) to
// the 'attributedTo' property, copying recipients between the Create
// activity and all objects. It then saves the entry in the database.
// The wrapping callback copies the actor(s) to the 'attributedTo'
// property and copies recipients between the Create activity and all
// objects. It then saves the entry in the database.
Create func(context.Context, vocab.ActivityStreamsCreate) error
// Update handles additional side effects for the Update ActivityStreams
// type.
//
// TODO: Describe
// The wrapping callback applies new top-level values on an object to
// the stored objects. Any top-level null literals will be deleted on
// the stored objects as well.
Update func(context.Context, vocab.ActivityStreamsUpdate) error
// Delete handles additional side effects for the Delete ActivityStreams
// type.
//
// TODO: Describe
// The wrapping callback replaces the object(s) with tombstones in the
// database.
Delete func(context.Context, vocab.ActivityStreamsDelete) error
// Follow handles additional side effects for the Follow ActivityStreams
// type.
//
// TODO: Describe
// The wrapping callback only ensures the 'Follow' has at least one
// 'object' entry, but otherwise has no default side effect.
Follow func(context.Context, vocab.ActivityStreamsFollow) error
// Add handles additional side effects for the Add ActivityStreams
// type.
@ -58,7 +62,13 @@ type SocialWrappedCallbacks struct {
// Block handles additional side effects for the Block ActivityStreams
// type.
//
// TODO: Describe
// The wrapping callback only ensures the 'Block' has at least one
// 'object' entry, but otherwise has no default side effect. It is up
// to the wrapped application function to properly enforce the new
// blocking behavior.
//
// Note that go-fed does not federate 'Block' activities received in the
// Social Protocol.
Block func(context.Context, vocab.ActivityStreamsBlock) error
// Sidechannel data -- this is set at request handling time. These must
@ -67,9 +77,16 @@ type SocialWrappedCallbacks struct {
// db is the Database the SocialWrappedCallbacks should use. It must be
// set before calling the callbacks.
db Database
// outboxIRI is the outboxIRI that is handling this callback.
outboxIRI *url.URL
// rawActivity is the JSON map literal received when deserializing the
// request body.
rawActivity map[string]interface{}
// clock is the server's clock.
clock Clock
// deliverable is a sidechannel out, indicating if the handled activity
// should be delivered to a peer.
deliverable bool
deliverable *bool
}
// disjoint ensures that the functions given do not share a type signature with
@ -123,7 +140,7 @@ func (w SocialWrappedCallbacks) callbacks() []interface{} {
// create implements the social Create activity side effects.
func (w SocialWrappedCallbacks) create(c context.Context, a vocab.ActivityStreamsCreate) error {
w.deliverable = true
*w.deliverable = true
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
@ -189,12 +206,28 @@ func (w SocialWrappedCallbacks) create(c context.Context, a vocab.ActivityStream
if err := normalizeRecipients(a); err != nil {
return err
}
// Create anonymous loop function to be able to properly scope the defer
// for the database lock at each iteration.
loopFn := func(i int) error {
obj := op.At(i).GetType()
id, err := GetId(obj)
if err != nil {
return err
}
err = w.db.Lock(c, id)
if err != nil {
return err
}
defer w.db.Unlock(c, id)
if err := w.db.Create(c, obj); err != nil {
return err
}
return nil
}
// Persist all objects we've created, which will include sensitive
// recipients such as 'bcc' and 'bto'.
for i := 0; i < op.Len(); i++ {
obj := op.At(i).GetType()
// TODO: Lock
if err := w.db.Create(c, obj); err != nil {
if err := loopFn(i); err != nil {
return err
}
}
@ -206,296 +239,249 @@ func (w SocialWrappedCallbacks) create(c context.Context, a vocab.ActivityStream
// update implements the social Update activity side effects.
func (w SocialWrappedCallbacks) update(c context.Context, a vocab.ActivityStreamsUpdate) error {
*deliverable = true
if s.LenObject() == 0 {
return errObjectRequired
*w.deliverable = true
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
// Update should partially replace the 'object' with only the
// changed top-level fields.
ids, err := getObjectIds(s.Raw())
if err != nil {
return err
} else if len(ids) == 0 {
return fmt.Errorf("update has no id: %v", s)
// Obtain all object ids, which should be owned by this server.
objIds := make([]*url.URL, 0, op.Len())
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
objIds = append(objIds, id)
}
for idx, id := range ids {
pObj, err := f.App.Get(c, id, ReadWrite)
// Create anonymous loop function to be able to properly scope the defer
// for the database lock at each iteration.
loopFn := func(idx int, loopId *url.URL) error {
err := w.db.Lock(c, loopId)
if err != nil {
return err
}
m, err := pObj.Serialize()
defer w.db.Unlock(c, loopId)
t, err := w.db.Get(c, loopId)
if err != nil {
return err
}
if !s.Raw().IsObject(idx) {
return fmt.Errorf("update requires object to be wholly provided at index %d", idx)
}
updated, err := s.Raw().GetObject(idx).Serialize()
m, err := t.Serialize()
if err != nil {
return err
}
for k, v := range updated {
// Copy over new top-level values.
objType := op.At(idx).GetType()
if objType == nil {
return fmt.Errorf("object at index %d is not a literal type value", idx)
}
newM, err := objType.Serialize()
if err != nil {
return err
}
for k, v := range newM {
m[k] = v
}
if rawUpdatedObject := getRawObject(rawJson, id.String()); rawUpdatedObject != nil {
recursivelyApplyDeletes(m, rawUpdatedObject)
// Delete top-level values where the raw Activity had nils.
for k, v := range w.rawActivity {
if _, ok := m[k]; v == nil && ok {
delete(m, k)
}
}
p, err := ToPubObject(m)
newT, err := toType(c, m)
if err != nil {
return err
}
for _, elem := range p {
if err := f.App.Set(c, elem); err != nil {
return err
}
if err = w.db.Update(c, newT); err != nil {
return err
}
return nil
}
for i, id := range objIds {
if err := loopFn(i, id); err != nil {
return err
}
}
return f.ClientCallbacker.Update(c, s)
if w.Update != nil {
return w.Update(c, a)
}
return nil
}
// deleteFn implements the social Delete activity side effects.
func (w SocialWrappedCallbacks) deleteFn(c context.Context, a vocab.ActivityStreamsDelete) error {
*deliverable = true
if s.LenObject() == 0 {
return errObjectRequired
*w.deliverable = true
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
ids, err := getObjectIds(s.Raw())
if err != nil {
return err
} else if len(ids) == 0 {
return fmt.Errorf("delete has no id: %v", s)
}
for _, id := range ids {
pObj, err := f.App.Get(c, id, ReadWrite)
// Obtain all object ids, which should be owned by this server.
objIds := make([]*url.URL, 0, op.Len())
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
obj, ok := pObj.(vocab.ObjectType)
if !ok {
return fmt.Errorf("cannot delete non-ObjectType: %T", pObj)
objIds = append(objIds, id)
}
// Create anonymous loop function to be able to properly scope the defer
// for the database lock at each iteration.
loopFn := func(idx int, loopId *url.URL) error {
err := w.db.Lock(c, loopId)
if err != nil {
return err
}
tomb := toTombstone(obj, id, f.Clock.Now())
if err := f.App.Set(c, tomb); err != nil {
defer w.db.Unlock(c, loopId)
t, err := w.db.Get(c, loopId)
if err != nil {
return err
}
tomb := toTombstone(t, loopId, w.clock.Now())
if err := w.db.Update(c, tomb); err != nil {
return err
}
return nil
}
for i, id := range objIds {
if err := loopFn(i, id); err != nil {
return err
}
}
return f.ClientCallbacker.Delete(c, s)
if w.Delete != nil {
return w.Delete(c, a)
}
return nil
}
// follow implements the social Follow activity side effects.
func (w SocialWrappedCallbacks) follow(c context.Context, a vocab.ActivityStreamsFollow) error {
*deliverable = true
if s.LenObject() == 0 {
return errObjectRequired
*w.deliverable = true
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
return f.ClientCallbacker.Follow(c, s)
if w.Follow != nil {
return w.Follow(c, a)
}
return nil
}
// add implements the social Add activity side effects.
func (w SocialWrappedCallbacks) add(c context.Context, a vocab.ActivityStreamsAdd) error {
// TODO: Dedupe with FederatingWrappedCallbacks
*deliverable = true
if s.LenObject() == 0 {
return errObjectRequired
} else if s.LenTarget() == 0 {
return errTargetRequired
*w.deliverable = true
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
raw := s.Raw()
ids, err := getTargetIds(raw)
if err != nil {
target := a.GetActivityStreamsTarget()
if target == nil || target.Len() == 0 {
return ErrTargetRequired
}
if err := add(c, op, target, w.db); err != nil {
return err
} else if len(ids) == 0 {
return fmt.Errorf("add target has no ids: %v", s)
}
objIds, err := getObjectIds(s.Raw())
if err != nil {
return err
} else if len(objIds) == 0 {
return fmt.Errorf("add object has no ids: %v", s)
if w.Add != nil {
return w.Add(c, a)
}
var targets []vocab.ObjectType
for _, id := range ids {
if !f.App.Owns(c, id) {
continue
}
target, err := f.App.Get(c, id, ReadWrite)
if err != nil {
return err
}
ct, okCollection := target.(vocab.CollectionType)
oct, okOrdered := target.(vocab.OrderedCollectionType)
if !okCollection && !okOrdered {
return fmt.Errorf("cannot add to type that is not Collection and not OrderedCollection: %v", target)
} else if okCollection {
targets = append(targets, ct)
} else {
targets = append(targets, oct)
}
}
for i := 0; i < raw.ObjectLen(); i++ {
var obj vocab.ObjectType
var objId *url.URL
if raw.IsObjectIRI(i) {
objId = raw.GetObjectIRI(i)
if f.App.Owns(c, objId) {
pObj, err := f.App.Get(c, objId, Read)
var ok bool
if obj, ok = pObj.(vocab.ObjectType); !ok {
return fmt.Errorf("add object must be an activitypub object: %v", raw)
}
if err != nil {
return err
}
} else {
obj, err = f.dereferenceAsUser(outboxURL, objId)
if err != nil {
return err
}
}
} else if raw.IsObject(i) {
obj = raw.GetObject(i)
if !obj.HasId() {
return fmt.Errorf("add object missing iri")
}
objId = obj.GetId()
} else {
return fmt.Errorf("add object must be of object or iri type: %v", raw)
}
for _, target := range targets {
if !f.App.CanAdd(c, obj, target) {
continue
}
if ct, ok := target.(vocab.CollectionType); ok {
ct.AppendItemsIRI(objId)
} else if oct, ok := target.(vocab.OrderedCollectionType); ok {
oct.AppendOrderedItemsIRI(objId)
}
if err := f.App.Set(c, target); err != nil {
return err
}
}
}
return f.ClientCallbacker.Add(c, s)
return nil
}
// remove implements the social Remove activity side effects.
func (w SocialWrappedCallbacks) remove(c context.Context, a vocab.ActivityStreamsRemove) error {
// TODO: Dedupe with FederatingWrappedCallbacks
*deliverable = true
if s.LenObject() == 0 {
return errObjectRequired
} else if s.LenTarget() == 0 {
return errTargetRequired
*w.deliverable = true
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
raw := s.Raw()
ids, err := getTargetIds(raw)
if err != nil {
target := a.GetActivityStreamsTarget()
if target == nil || target.Len() == 0 {
return ErrTargetRequired
}
if err := remove(c, op, target, w.db); err != nil {
return err
} else if len(ids) == 0 {
return fmt.Errorf("remove target has no ids: %v", s)
}
objIds, err := getObjectIds(s.Raw())
if err != nil {
return err
} else if len(objIds) == 0 {
return fmt.Errorf("remove object has no ids: %v", s)
if w.Remove != nil {
return w.Remove(c, a)
}
var targets []vocab.ObjectType
for _, id := range ids {
if !f.App.Owns(c, id) {
continue
}
target, err := f.App.Get(c, id, ReadWrite)
if err != nil {
return err
}
ct, okCollection := target.(vocab.CollectionType)
oct, okOrdered := target.(vocab.OrderedCollectionType)
if !okCollection && !okOrdered {
return fmt.Errorf("cannot remove from type that is not Collection and not OrderedCollection: %v", target)
} else if okCollection {
targets = append(targets, ct)
} else {
targets = append(targets, oct)
}
}
for i := 0; i < raw.ObjectLen(); i++ {
if !raw.IsObject(i) {
// TODO: Fetch IRIs as well
return fmt.Errorf("remove object must be object type: %v", raw)
}
obj := raw.GetObject(i)
for _, target := range targets {
if !f.App.CanRemove(c, obj, target) {
continue
}
if ct, ok := target.(vocab.CollectionType); ok {
removeCollectionItemWithId(ct, obj.GetId())
} else if oct, ok := target.(vocab.OrderedCollectionType); ok {
removeOrderedCollectionItemWithId(oct, obj.GetId())
}
if err := f.App.Set(c, target); err != nil {
return err
}
}
}
return f.ClientCallbacker.Remove(c, s)
return nil
}
// like implements the social Like activity side effects.
func (w SocialWrappedCallbacks) like(c context.Context, a vocab.ActivityStreamsLike) error {
*deliverable = true
if s.LenObject() == 0 {
return errObjectRequired
*w.deliverable = true
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
getter := func(actor vocab.ObjectType, lc *vocab.CollectionType, loc *vocab.OrderedCollectionType) (bool, error) {
if actor.IsLikedAnyURI() {
pObj, err := f.App.Get(ctx, actor.GetLikedAnyURI(), ReadWrite)
if err != nil {
return true, err
}
ok := false
if *lc, ok = pObj.(vocab.CollectionType); !ok {
if *loc, ok = pObj.(vocab.OrderedCollectionType); !ok {
return true, fmt.Errorf("actors liked collection not CollectionType nor OrderedCollectionType")
}
}
return true, nil
} else if actor.IsLikedCollection() {
*lc = actor.GetLikedCollection()
return false, nil
} else if actor.IsLikedOrderedCollection() {
*loc = actor.GetLikedOrderedCollection()
return false, nil
}
*loc = &vocab.OrderedCollection{}
actor.SetLikedOrderedCollection(*loc)
return false, nil
}
if err := f.addAllObjectsToActorCollection(ctx, getter, s.Raw(), true); err != nil {
// Get this actor's IRI.
if err := w.db.Lock(c, w.outboxIRI); err != nil {
return err
}
return f.ClientCallbacker.Like(ctx, s)
// WARNING: Unlock not deferred.
actorIRI, err := w.db.ActorForInbox(c, w.outboxIRI)
if err != nil {
w.db.Unlock(c, w.outboxIRI)
return err
}
w.db.Unlock(c, w.outboxIRI)
// Unlock must be called by now and every branch above.
//
// Now obtain this actor's 'liked' collection.
if err := w.db.Lock(c, actorIRI); err != nil {
return err
}
defer w.db.Unlock(c, actorIRI)
liked, err := w.db.Liked(c, actorIRI)
if err != nil {
return err
}
likedItems := liked.GetActivityStreamsItems()
if likedItems == nil {
likedItems = streams.NewActivityStreamsItemsProperty()
liked.SetActivityStreamsItems(likedItems)
}
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
objId, err := ToId(iter)
if err != nil {
return err
}
likedItems.PrependIRI(objId)
}
err = w.db.Update(c, liked)
if err != nil {
return err
}
if w.Like != nil {
return w.Like(c, a)
}
return nil
}
// undo implements the social Undo activity side effects.
func (w SocialWrappedCallbacks) undo(c context.Context, a vocab.ActivityStreamsUndo) error {
*deliverable = true
if s.LenObject() == 0 {
return errObjectRequired
*w.deliverable = true
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
raw := s.Raw()
if err := f.ensureActivityActorsMatchObjectActors(raw); err != nil {
actors := a.GetActivityStreamsActor()
if err := mustHaveActivityActorsMatchObjectActors(actors, op); err != nil {
return err
}
return f.ClientCallbacker.Undo(c, s)
if w.Undo != nil {
return w.Undo(c, a)
}
return nil
}
// block implements the social Block activity side effects.
func (w SocialWrappedCallbacks) block(c context.Context, a vocab.ActivityStreamsBlock) error {
*deliverable = false
if s.LenObject() == 0 {
return errObjectRequired
*w.deliverable = false
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return ErrObjectRequired
}
return f.ClientCallbacker.Block(c, s)
if w.Block != nil {
return w.Block(c, a)
}
return nil
}

ファイルの表示

@ -9,6 +9,7 @@ import (
"io/ioutil"
"net/http"
"net/url"
"strings"
"sync"
)

ファイルの表示

@ -12,6 +12,7 @@ import (
"net/http"
"net/url"
"strings"
"time"
)
var (
@ -452,89 +453,22 @@ func stripHiddenRecipients(activity Activity) {
// mustHaveActivityOriginMatchObjects ensures that the Host in the activity id
// IRI matches all of the Hosts in the object id IRIs.
func mustHaveActivityOriginMatchObjects(a Activity) error {
if !a.HasId() {
return fmt.Errorf("activity has no iri")
originIRI, err := GetId(a)
if err != nil {
return err
}
originIRI := a.GetId()
originHost := originIRI.Host
for i := 0; i < a.ObjectLen(); i++ {
if a.IsObject(i) {
obj := a.GetObject(i)
if !obj.HasId() {
return fmt.Errorf("object at index %d has no id", i)
}
iri := obj.GetId()
if originHost != iri.Host {
return fmt.Errorf("object %q: not in activity origin", iri)
}
} else if a.IsObjectIRI(i) {
iri := a.GetObjectIRI(i)
if originHost != iri.Host {
return fmt.Errorf("object %q: not in activity origin", iri)
}
}
op := a.GetActivityStreamsObject()
if op == nil || op.Len() == 0 {
return nil
}
return nil
}
// mustHaveActivityActorsMatchObjectActors ensures that the set of actors on all
// objects in an activity are a subset of actor(s) on the activity.
func mustHaveActivityActorsMatchObjectActors(a vocab.ActivityType) error {
actorSet := make(map[string]bool, a.ActorLen())
for i := 0; i < a.ActorLen(); i++ {
if a.IsActorObject(i) {
obj := a.GetActorObject(i)
if !obj.HasId() {
return fmt.Errorf("actor object at index %d has no id", i)
}
actorSet[obj.GetId().String()] = true
} else if a.IsActorLink(i) {
l := a.GetActorLink(i)
if !l.HasHref() {
return fmt.Errorf("actor link at index %d has no href", i)
}
actorSet[l.GetHref().String()] = true
} else if a.IsActorIRI(i) {
actorSet[a.GetActorIRI(i).String()] = true
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
iri, err := ToId(iter)
if err != nil {
return err
}
}
objectActors := make(map[string]bool, a.ObjectLen())
for i := 0; i < a.ObjectLen(); i++ {
if a.IsObject(i) {
obj := a.GetObject(i)
if !obj.HasId() {
return fmt.Errorf("object at index %d has no id", i)
}
objectActivity, ok := obj.(vocab.ActivityType)
if !ok {
return fmt.Errorf("object at index %d is not an activity", i)
}
for j := 0; j < objectActivity.ActorLen(); j++ {
if objectActivity.IsActorObject(j) {
obj := objectActivity.GetActorObject(j)
if !obj.HasId() {
return fmt.Errorf("actor object at index (%d,%d) has no id", i, j)
}
objectActors[obj.GetId().String()] = true
} else if objectActivity.IsActorLink(j) {
l := objectActivity.GetActorLink(j)
if !l.HasHref() {
return fmt.Errorf("actor link at index (%d,%d) has no href", i, j)
}
objectActors[l.GetHref().String()] = true
} else if objectActivity.IsActorIRI(j) {
objectActors[objectActivity.GetActorIRI(j).String()] = true
}
}
} else if a.IsObjectIRI(i) {
// TODO: Dereference IRI
iri := a.GetObjectIRI(i)
return fmt.Errorf("unimplemented: fetching IRI for UNDO verification of ownership of %q", iri)
}
}
for k := range objectActors {
if !actorSet[k] {
return fmt.Errorf("at least 1 activity actors missing: %q", k)
if originHost != iri.Host {
return fmt.Errorf("object %q: not in activity origin", iri)
}
}
return nil
@ -548,7 +482,7 @@ func normalizeRecipients(a vocab.ActivityStreamsCreate) error {
// Phase 0: Acquire all recipients on the activity.
//
// Obtain the actorTo map
actorToMap := make(map[string]interface{})
actorToMap := make(map[string]*url.URL)
actorTo := a.GetActivityStreamsTo()
if actorTo == nil {
actorTo = streams.NewActivityStreamsToProperty()
@ -562,7 +496,7 @@ func normalizeRecipients(a vocab.ActivityStreamsCreate) error {
actorToMap[id.String()] = id
}
// Obtain the actorBto map
actorBtoMap := make(map[string]interface{})
actorBtoMap := make(map[string]*url.URL)
actorBto := a.GetActivityStreamsBto()
if actorBto == nil {
actorBto = streams.NewActivityStreamsBtoProperty()
@ -576,7 +510,7 @@ func normalizeRecipients(a vocab.ActivityStreamsCreate) error {
actorBtoMap[id.String()] = id
}
// Obtain the actorCc map
actorCcMap := make(map[string]interface{})
actorCcMap := make(map[string]*url.URL)
actorCc := a.GetActivityStreamsCc()
if actorCc == nil {
actorCc = streams.NewActivityStreamsCcProperty()
@ -590,7 +524,7 @@ func normalizeRecipients(a vocab.ActivityStreamsCreate) error {
actorCcMap[id.String()] = id
}
// Obtain the actorBcc map
actorBccMap := make(map[string]interface{})
actorBccMap := make(map[string]*url.URL)
actorBcc := a.GetActivityStreamsBcc()
if actorBcc == nil {
actorBcc = streams.NewActivityStreamsBccProperty()
@ -604,7 +538,7 @@ func normalizeRecipients(a vocab.ActivityStreamsCreate) error {
actorBccMap[id.String()] = id
}
// Obtain the actorAudience map
actorAudienceMap := make(map[string]interface{})
actorAudienceMap := make(map[string]*url.URL)
actorAudience := a.GetActivityStreamsAudience()
if actorAudience == nil {
actorAudience = streams.NewActivityStreamsAudienceProperty()
@ -619,16 +553,16 @@ func normalizeRecipients(a vocab.ActivityStreamsCreate) error {
}
// Obtain the objects maps for each recipient type.
o := a.GetActivityStreamsObject()
objsTo := make([]map[string]interface{}, o.Len())
objsBto := make([]map[string]interface{}, o.Len())
objsCco := make([]map[string]interface{}, o.Len())
objsBcc := make([]map[string]interface{}, o.Len())
objsAudience := make([]map[string]interface{}, o.Len())
objsTo := make([]map[string]*url.URL, o.Len())
objsBto := make([]map[string]*url.URL, o.Len())
objsCc := make([]map[string]*url.URL, o.Len())
objsBcc := make([]map[string]*url.URL, o.Len())
objsAudience := make([]map[string]*url.URL, o.Len())
for i := 0; i < o.Len(); i++ {
// Phase 1: Acquire all existing recipients on the object.
//
// Object to
objsTo[i] = make(map[string]interface{})
objsTo[i] = make(map[string]*url.URL)
var oTo vocab.ActivityStreamsToProperty
if tr, ok := o.At(i).(toer); !ok {
return fmt.Errorf("the Create object at %d has no 'to' property", i)
@ -647,7 +581,7 @@ func normalizeRecipients(a vocab.ActivityStreamsCreate) error {
objsTo[i][id.String()] = id
}
// Object bto
objsBto[i] = make(map[string]interface{})
objsBto[i] = make(map[string]*url.URL)
var oBto vocab.ActivityStreamsBtoProperty
if tr, ok := o.At(i).(btoer); !ok {
return fmt.Errorf("the Create object at %d has no 'bto' property", i)
@ -666,7 +600,7 @@ func normalizeRecipients(a vocab.ActivityStreamsCreate) error {
objsBto[i][id.String()] = id
}
// Object cc
objsCc[i] = make(map[string]interface{})
objsCc[i] = make(map[string]*url.URL)
var oCc vocab.ActivityStreamsCcProperty
if tr, ok := o.At(i).(ccer); !ok {
return fmt.Errorf("the Create object at %d has no 'cc' property", i)
@ -685,7 +619,7 @@ func normalizeRecipients(a vocab.ActivityStreamsCreate) error {
objsCc[i][id.String()] = id
}
// Object bcc
objsBcc[i] = make(map[string]interface{})
objsBcc[i] = make(map[string]*url.URL)
var oBcc vocab.ActivityStreamsBccProperty
if tr, ok := o.At(i).(bccer); !ok {
return fmt.Errorf("the Create object at %d has no 'bcc' property", i)
@ -704,7 +638,7 @@ func normalizeRecipients(a vocab.ActivityStreamsCreate) error {
objsBcc[i][id.String()] = id
}
// Object audience
objsAudience[i] = make(map[string]interface{})
objsAudience[i] = make(map[string]*url.URL)
var oAudience vocab.ActivityStreamsAudienceProperty
if tr, ok := o.At(i).(audiencer); !ok {
return fmt.Errorf("the Create object at %d has no 'audience' property", i)
@ -800,3 +734,240 @@ func normalizeRecipients(a vocab.ActivityStreamsCreate) error {
}
return nil
}
// toTombstone creates a Tombstone object for the given ActivityStreams value.
func toTombstone(obj vocab.Type, id *url.URL, now time.Time) vocab.ActivityStreamsTombstone {
tomb := streams.NewActivityStreamsTombstone()
// id property
idProp := streams.NewActivityStreamsIdProperty()
idProp.Set(id)
tomb.SetActivityStreamsId(idProp)
// formerType property
former := streams.NewActivityStreamsFormerTypeProperty()
tomb.SetActivityStreamsFormerType(former)
// Populate Former Type
former.AppendXMLSchemaString(obj.GetName())
// Copy over the published property if it existed
if pubber, ok := obj.(publisheder); ok {
if pub := pubber.GetActivityStreamsPublished(); pub != nil {
tomb.SetActivityStreamsPublished(pub)
}
}
// Copy over the updated property if it existed
if upder, ok := obj.(updateder); ok {
if upd := upder.GetActivityStreamsUpdated(); upd != nil {
tomb.SetActivityStreamsUpdated(upd)
}
}
// Set deleted time to now.
deleted := streams.NewActivityStreamsDeletedProperty()
deleted.Set(now)
tomb.SetActivityStreamsDeleted(deleted)
return tomb
}
// mustHaveActivityActorsMatchObjectActors ensures that the actors on types in
// the 'object' property are all listed in the 'actor' property.
func mustHaveActivityActorsMatchObjectActors(actors vocab.ActivityStreamsActorProperty,
op vocab.ActivityStreamsObjectProperty) error {
activityActorMap := make(map[string]bool, actors.Len())
for iter := actors.Begin(); iter != actors.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
activityActorMap[id.String()] = true
}
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
t := iter.GetType()
if t == nil {
// TODO: Fetch the IRI
continue
}
ac, ok := t.(actorer)
if !ok {
return fmt.Errorf("cannot verify actors: object value has no 'actor' property")
}
objActors := ac.GetActivityStreamsActor()
for iter := objActors.Begin(); iter != objActors.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
if !activityActorMap[id.String()] {
return fmt.Errorf("activity does not have all actors from its object's actors")
}
}
}
return nil
}
// add implements the logic of adding object ids to a target Collection or
// OrderedCollection. This logic is shared by both the C2S and S2S protocols.
func add(c context.Context,
op vocab.ActivityStreamsObjectProperty,
target vocab.ActivityStreamsTargetProperty,
db Database) error {
opIds := make([]*url.URL, 0, op.Len())
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
opIds = append(opIds, id)
}
targetIds := make([]*url.URL, 0, op.Len())
for iter := target.Begin(); iter != target.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
targetIds = append(targetIds, id)
}
// Create anonymous loop function to be able to properly scope the defer
// for the database lock at each iteration.
loopFn := func(t *url.URL) error {
if err := db.Lock(c, t); err != nil {
return err
}
defer db.Unlock(c, t)
if owns, err := db.Owns(c, t); err != nil {
return err
} else if !owns {
return nil
}
tp, err := db.Get(c, t)
if err != nil {
return err
}
if streams.ActivityStreamsOrderedCollectionIsExtendedBy(tp) {
oi, ok := tp.(orderedItemser)
if !ok {
return fmt.Errorf("type extending from OrderedCollection cannot convert to orderedItemser interface")
}
oiProp := oi.GetActivityStreamsOrderedItems()
if oiProp == nil {
oiProp = streams.NewActivityStreamsOrderedItemsProperty()
oi.SetActivityStreamsOrderedItems(oiProp)
}
for _, objId := range opIds {
oiProp.AppendIRI(objId)
}
} else if streams.ActivityStreamsCollectionIsExtendedBy(tp) {
i, ok := tp.(itemser)
if !ok {
return fmt.Errorf("type extending from Collection cannot convert to itemser interface")
}
iProp := i.GetActivityStreamsItems()
if iProp == nil {
iProp = streams.NewActivityStreamsItemsProperty()
i.SetActivityStreamsItems(iProp)
}
for _, objId := range opIds {
iProp.AppendIRI(objId)
}
}
err = db.Update(c, tp)
if err != nil {
return err
}
return nil
}
for _, t := range targetIds {
if err := loopFn(t); err != nil {
return err
}
}
return nil
}
// remove implements the logic of removing object ids to a target Collection or
// OrderedCollection. This logic is shared by both the C2S and S2S protocols.
func remove(c context.Context,
op vocab.ActivityStreamsObjectProperty,
target vocab.ActivityStreamsTargetProperty,
db Database) error {
opIds := make(map[string]bool, op.Len())
for iter := op.Begin(); iter != op.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
opIds[id.String()] = true
}
targetIds := make([]*url.URL, 0, op.Len())
for iter := target.Begin(); iter != target.End(); iter = iter.Next() {
id, err := ToId(iter)
if err != nil {
return err
}
targetIds = append(targetIds, id)
}
// Create anonymous loop function to be able to properly scope the defer
// for the database lock at each iteration.
loopFn := func(t *url.URL) error {
if err := db.Lock(c, t); err != nil {
return err
}
defer db.Unlock(c, t)
if owns, err := db.Owns(c, t); err != nil {
return err
} else if !owns {
return nil
}
tp, err := db.Get(c, t)
if err != nil {
return err
}
if streams.ActivityStreamsOrderedCollectionIsExtendedBy(tp) {
oi, ok := tp.(orderedItemser)
if !ok {
return fmt.Errorf("type extending from OrderedCollection cannot convert to orderedItemser interface")
}
oiProp := oi.GetActivityStreamsOrderedItems()
if oiProp != nil {
for i := 0; i < oiProp.Len(); /*Conditional*/ {
id, err := ToId(oiProp.At(i))
if err != nil {
return err
}
if opIds[id.String()] {
oiProp.Remove(i)
} else {
i++
}
}
}
} else if streams.ActivityStreamsCollectionIsExtendedBy(tp) {
i, ok := tp.(itemser)
if !ok {
return fmt.Errorf("type extending from Collection cannot convert to itemser interface")
}
iProp := i.GetActivityStreamsItems()
if iProp != nil {
for i := 0; i < iProp.Len(); /*Conditional*/ {
id, err := ToId(iProp.At(i))
if err != nil {
return err
}
if opIds[id.String()] {
iProp.Remove(i)
} else {
i++
}
}
}
}
err = db.Update(c, tp)
if err != nil {
return err
}
return nil
}
for _, t := range targetIds {
if err := loopFn(t); err != nil {
return err
}
}
return nil
}