Address several TODOs in side_effect_actor
このコミットが含まれているのは:
コミット
2099e89851
|
@ -226,7 +226,7 @@ func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL,
|
||||||
// by this server. This is only a boolean trigger: As soon as we get
|
// by this server. This is only a boolean trigger: As soon as we get
|
||||||
// a hit that we own something, then we should do inbox forwarding.
|
// a hit that we own something, then we should do inbox forwarding.
|
||||||
maxDepth := a.s2s.MaxInboxForwardingRecursionDepth(c)
|
maxDepth := a.s2s.MaxInboxForwardingRecursionDepth(c)
|
||||||
ownsValue, err := a.hasInboxForwardingValues(c, activity, maxDepth, 0)
|
ownsValue, err := a.hasInboxForwardingValues(c, inboxIRI, activity, maxDepth, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -339,12 +339,19 @@ func (a *sideEffectActor) Deliver(c context.Context, outboxIRI *url.URL, activit
|
||||||
|
|
||||||
// WrapInCreate wraps an object with a Create activity.
|
// WrapInCreate wraps an object with a Create activity.
|
||||||
func (a *sideEffectActor) WrapInCreate(c context.Context, obj vocab.Type, outboxIRI *url.URL) (create vocab.ActivityStreamsCreate, err error) {
|
func (a *sideEffectActor) WrapInCreate(c context.Context, obj vocab.Type, outboxIRI *url.URL) (create vocab.ActivityStreamsCreate, err error) {
|
||||||
// TODO: Acquire a lock.
|
err = a.db.Lock(c, outboxIRI)
|
||||||
actorIri, err := a.db.ActorForOutbox(c, outboxIRI)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
return wrapInCreate(c, obj, actorIri)
|
// WARNING: No deferring the Unlock
|
||||||
|
actorIRI, err := a.db.ActorForOutbox(c, outboxIRI)
|
||||||
|
if err != nil {
|
||||||
|
a.db.Unlock(c, outboxIRI)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
a.db.Unlock(c, outboxIRI)
|
||||||
|
// Unlock the lock at this point and every branch above
|
||||||
|
return wrapInCreate(c, obj, actorIRI)
|
||||||
}
|
}
|
||||||
|
|
||||||
// deliverToRecipients will take a prepared Activity and send it to specific
|
// deliverToRecipients will take a prepared Activity and send it to specific
|
||||||
|
@ -465,7 +472,7 @@ func (a *sideEffectActor) addToInboxIfNew(c context.Context, inboxIRI *url.URL,
|
||||||
//
|
//
|
||||||
// Recursion may be limited by providing a 'maxDepth' greater than zero. A
|
// Recursion may be limited by providing a 'maxDepth' greater than zero. A
|
||||||
// value of zero or a negative number will result in infinite recursion.
|
// value of zero or a negative number will result in infinite recursion.
|
||||||
func (a *sideEffectActor) hasInboxForwardingValues(c context.Context, val vocab.Type, maxDepth, currDepth int) (bool, error) {
|
func (a *sideEffectActor) hasInboxForwardingValues(c context.Context, inboxIRI *url.URL, val vocab.Type, maxDepth, currDepth int) (bool, error) {
|
||||||
// Stop recurring if we are exceeding the maximum depth and the maximum
|
// Stop recurring if we are exceeding the maximum depth and the maximum
|
||||||
// is a positive number.
|
// is a positive number.
|
||||||
if maxDepth > 0 && currDepth >= maxDepth {
|
if maxDepth > 0 && currDepth >= maxDepth {
|
||||||
|
@ -485,17 +492,39 @@ func (a *sideEffectActor) hasInboxForwardingValues(c context.Context, val vocab.
|
||||||
// about.
|
// about.
|
||||||
types, iris := getInboxForwardingValues(val)
|
types, iris := getInboxForwardingValues(val)
|
||||||
// For IRIs, simply check if we own them.
|
// For IRIs, simply check if we own them.
|
||||||
// TODO: Dereference and recur.
|
|
||||||
for _, iri := range iris {
|
for _, iri := range iris {
|
||||||
if owns, err := a.db.Owns(c, iri); err != nil {
|
if owns, err := a.db.Owns(c, iri); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
} else if owns {
|
} else if owns {
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
// Attempt to dereference the IRI instead, and add it to the
|
||||||
|
// types we need to examine.
|
||||||
|
tport, err := a.common.NewTransport(c, inboxIRI, goFedUserAgent())
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
b, err := tport.Dereference(c, iri)
|
||||||
|
if err != nil {
|
||||||
|
// Do not fail the entire process if the data is
|
||||||
|
// missing.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var m map[string]interface{}
|
||||||
|
if err = json.Unmarshal(b, &m); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
t, err := toType(c, m)
|
||||||
|
if err != nil {
|
||||||
|
// Do not fail the entire process if we cannot handle
|
||||||
|
// the type.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
types = append(types, t)
|
||||||
}
|
}
|
||||||
// For embedded literals, recur.
|
// For embedded literals, recur.
|
||||||
for _, nextVal := range types {
|
for _, nextVal := range types {
|
||||||
if has, err := a.hasInboxForwardingValues(c, nextVal, maxDepth, currDepth+1); err != nil {
|
if has, err := a.hasInboxForwardingValues(c, inboxIRI, nextVal, maxDepth, currDepth+1); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
} else if has {
|
} else if has {
|
||||||
return true, nil
|
return true, nil
|
||||||
|
@ -561,7 +590,6 @@ func (a *sideEffectActor) prepare(c context.Context, outboxIRI *url.URL, activit
|
||||||
r = append(r, val)
|
r = append(r, val)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// TODO: Support delivery to shared inbox
|
|
||||||
// 1. When an object is being delivered to the originating actor's
|
// 1. When an object is being delivered to the originating actor's
|
||||||
// followers, a server MAY reduce the number of receiving actors
|
// followers, a server MAY reduce the number of receiving actors
|
||||||
// delivered to by identifying all followers which share the same
|
// delivered to by identifying all followers which share the same
|
||||||
|
@ -584,11 +612,17 @@ func (a *sideEffectActor) prepare(c context.Context, outboxIRI *url.URL, activit
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// Get inboxes of sender.
|
// Get inboxes of sender.
|
||||||
// TODO: Acquire a lock.
|
err = a.db.Lock(c, outboxIRI)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// WARNING: No deferring the Unlock
|
||||||
actorIRI, err := a.db.ActorForOutbox(c, outboxIRI)
|
actorIRI, err := a.db.ActorForOutbox(c, outboxIRI)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
a.db.Unlock(c, outboxIRI)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
a.db.Unlock(c, outboxIRI)
|
||||||
// Make sure this matches the 'attributedTo' on the activity.
|
// Make sure this matches the 'attributedTo' on the activity.
|
||||||
attrTo := activity.GetActivityStreamsAttributedTo()
|
attrTo := activity.GetActivityStreamsAttributedTo()
|
||||||
if attrTo.Len() != 1 {
|
if attrTo.Len() != 1 {
|
||||||
|
@ -631,8 +665,8 @@ func (a *sideEffectActor) prepare(c context.Context, outboxIRI *url.URL, activit
|
||||||
// dereference the collection, WITH the user's credentials.
|
// dereference the collection, WITH the user's credentials.
|
||||||
//
|
//
|
||||||
// Note that this also applies to CollectionPage and OrderedCollectionPage.
|
// Note that this also applies to CollectionPage and OrderedCollectionPage.
|
||||||
// TODO: Handle Page types by paginating.
|
|
||||||
func (a *sideEffectActor) resolveInboxes(c context.Context, t Transport, r []*url.URL, depth, maxDepth int) (actors []vocab.Type, err error) {
|
func (a *sideEffectActor) resolveInboxes(c context.Context, t Transport, r []*url.URL, depth, maxDepth int) (actors []vocab.Type, err error) {
|
||||||
|
// TODO: Handle Page types by fetching additional pages.
|
||||||
if maxDepth > 0 && depth >= maxDepth {
|
if maxDepth > 0 && depth >= maxDepth {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
読み込み中…
新しいイシューから参照