From 2099e89851b28d591ba47f92d307f1f7e2167785 Mon Sep 17 00:00:00 2001 From: Cory Slep Date: Fri, 15 Feb 2019 21:59:02 +0100 Subject: [PATCH] Address several TODOs in side_effect_actor --- pub/side_effect_actor.go | 56 ++++++++++++++++++++++++++++++++-------- 1 file changed, 45 insertions(+), 11 deletions(-) diff --git a/pub/side_effect_actor.go b/pub/side_effect_actor.go index 1bb95cc..ebfb7f4 100644 --- a/pub/side_effect_actor.go +++ b/pub/side_effect_actor.go @@ -226,7 +226,7 @@ func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL, // by this server. This is only a boolean trigger: As soon as we get // a hit that we own something, then we should do inbox forwarding. maxDepth := a.s2s.MaxInboxForwardingRecursionDepth(c) - ownsValue, err := a.hasInboxForwardingValues(c, activity, maxDepth, 0) + ownsValue, err := a.hasInboxForwardingValues(c, inboxIRI, activity, maxDepth, 0) if err != nil { return err } @@ -339,12 +339,19 @@ func (a *sideEffectActor) Deliver(c context.Context, outboxIRI *url.URL, activit // WrapInCreate wraps an object with a Create activity. func (a *sideEffectActor) WrapInCreate(c context.Context, obj vocab.Type, outboxIRI *url.URL) (create vocab.ActivityStreamsCreate, err error) { - // TODO: Acquire a lock. - actorIri, err := a.db.ActorForOutbox(c, outboxIRI) + err = a.db.Lock(c, outboxIRI) if err != nil { return } - return wrapInCreate(c, obj, actorIri) + // WARNING: No deferring the Unlock + actorIRI, err := a.db.ActorForOutbox(c, outboxIRI) + if err != nil { + a.db.Unlock(c, outboxIRI) + return + } + a.db.Unlock(c, outboxIRI) + // Unlock the lock at this point and every branch above + return wrapInCreate(c, obj, actorIRI) } // deliverToRecipients will take a prepared Activity and send it to specific @@ -465,7 +472,7 @@ func (a *sideEffectActor) addToInboxIfNew(c context.Context, inboxIRI *url.URL, // // Recursion may be limited by providing a 'maxDepth' greater than zero. A // value of zero or a negative number will result in infinite recursion. -func (a *sideEffectActor) hasInboxForwardingValues(c context.Context, val vocab.Type, maxDepth, currDepth int) (bool, error) { +func (a *sideEffectActor) hasInboxForwardingValues(c context.Context, inboxIRI *url.URL, val vocab.Type, maxDepth, currDepth int) (bool, error) { // Stop recurring if we are exceeding the maximum depth and the maximum // is a positive number. if maxDepth > 0 && currDepth >= maxDepth { @@ -485,17 +492,39 @@ func (a *sideEffectActor) hasInboxForwardingValues(c context.Context, val vocab. // about. types, iris := getInboxForwardingValues(val) // For IRIs, simply check if we own them. - // TODO: Dereference and recur. for _, iri := range iris { if owns, err := a.db.Owns(c, iri); err != nil { return false, err } else if owns { return true, nil } + // Attempt to dereference the IRI instead, and add it to the + // types we need to examine. + tport, err := a.common.NewTransport(c, inboxIRI, goFedUserAgent()) + if err != nil { + return false, err + } + b, err := tport.Dereference(c, iri) + if err != nil { + // Do not fail the entire process if the data is + // missing. + continue + } + var m map[string]interface{} + if err = json.Unmarshal(b, &m); err != nil { + return false, err + } + t, err := toType(c, m) + if err != nil { + // Do not fail the entire process if we cannot handle + // the type. + continue + } + types = append(types, t) } // For embedded literals, recur. for _, nextVal := range types { - if has, err := a.hasInboxForwardingValues(c, nextVal, maxDepth, currDepth+1); err != nil { + if has, err := a.hasInboxForwardingValues(c, inboxIRI, nextVal, maxDepth, currDepth+1); err != nil { return false, err } else if has { return true, nil @@ -561,7 +590,6 @@ func (a *sideEffectActor) prepare(c context.Context, outboxIRI *url.URL, activit r = append(r, val) } } - // TODO: Support delivery to shared inbox // 1. When an object is being delivered to the originating actor's // followers, a server MAY reduce the number of receiving actors // delivered to by identifying all followers which share the same @@ -584,11 +612,17 @@ func (a *sideEffectActor) prepare(c context.Context, outboxIRI *url.URL, activit return nil, err } // Get inboxes of sender. - // TODO: Acquire a lock. + err = a.db.Lock(c, outboxIRI) + if err != nil { + return + } + // WARNING: No deferring the Unlock actorIRI, err := a.db.ActorForOutbox(c, outboxIRI) if err != nil { - return nil, err + a.db.Unlock(c, outboxIRI) + return } + a.db.Unlock(c, outboxIRI) // Make sure this matches the 'attributedTo' on the activity. attrTo := activity.GetActivityStreamsAttributedTo() if attrTo.Len() != 1 { @@ -631,8 +665,8 @@ func (a *sideEffectActor) prepare(c context.Context, outboxIRI *url.URL, activit // dereference the collection, WITH the user's credentials. // // Note that this also applies to CollectionPage and OrderedCollectionPage. -// TODO: Handle Page types by paginating. func (a *sideEffectActor) resolveInboxes(c context.Context, t Transport, r []*url.URL, depth, maxDepth int) (actors []vocab.Type, err error) { + // TODO: Handle Page types by fetching additional pages. if maxDepth > 0 && depth >= maxDepth { return }