2019-02-12 08:16:33 +09:00
|
|
|
package pub
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
|
|
|
"github.com/go-fed/activity/streams"
|
|
|
|
"github.com/go-fed/activity/streams/vocab"
|
|
|
|
"net/http"
|
|
|
|
"net/url"
|
|
|
|
"strings"
|
|
|
|
)
|
|
|
|
|
|
|
|
// sideEffectActor must satisfy the DelegateActor interface.
|
|
|
|
var _ DelegateActor = &sideEffectActor{}
|
|
|
|
|
|
|
|
// sideEffectActor is a DelegateActor that handles the ActivityPub
|
|
|
|
// implementation side effects, but requires a more opinionated application to
|
|
|
|
// be written.
|
|
|
|
//
|
|
|
|
// Note that when using the sideEffectActor with an application that good-faith
|
|
|
|
// implements its required interfaces, the ActivityPub specification is
|
|
|
|
// guaranteed to be correctly followed.
|
|
|
|
type sideEffectActor struct {
|
|
|
|
common CommonBehavior
|
|
|
|
s2s FederatingProtocol
|
|
|
|
c2s SocialProtocol
|
|
|
|
db Database
|
2019-02-15 05:51:57 +09:00
|
|
|
clock Clock
|
2019-02-12 08:16:33 +09:00
|
|
|
}
|
|
|
|
|
|
|
|
// AuthenticatePostInbox defers to the delegate to authenticate the request.
|
2019-02-20 04:40:26 +09:00
|
|
|
func (a *sideEffectActor) AuthenticatePostInbox(c context.Context, w http.ResponseWriter, r *http.Request) (authenticated bool, err error) {
|
2019-02-12 08:16:33 +09:00
|
|
|
return a.s2s.AuthenticatePostInbox(c, w, r)
|
|
|
|
}
|
|
|
|
|
|
|
|
// AuthenticateGetInbox defers to the delegate to authenticate the request.
|
2019-02-20 04:40:26 +09:00
|
|
|
func (a *sideEffectActor) AuthenticateGetInbox(c context.Context, w http.ResponseWriter, r *http.Request) (authenticated bool, err error) {
|
2019-02-12 08:16:33 +09:00
|
|
|
return a.common.AuthenticateGetInbox(c, w, r)
|
|
|
|
}
|
|
|
|
|
|
|
|
// AuthenticatePostOutbox defers to the delegate to authenticate the request.
|
2019-02-20 04:40:26 +09:00
|
|
|
func (a *sideEffectActor) AuthenticatePostOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (authenticated bool, err error) {
|
2019-02-12 08:16:33 +09:00
|
|
|
return a.c2s.AuthenticatePostOutbox(c, w, r)
|
|
|
|
}
|
|
|
|
|
|
|
|
// AuthenticateGetOutbox defers to the delegate to authenticate the request.
|
2019-02-20 04:40:26 +09:00
|
|
|
func (a *sideEffectActor) AuthenticateGetOutbox(c context.Context, w http.ResponseWriter, r *http.Request) (authenticated bool, err error) {
|
2019-02-12 08:16:33 +09:00
|
|
|
return a.common.AuthenticateGetOutbox(c, w, r)
|
|
|
|
}
|
|
|
|
|
|
|
|
// GetOutbox delegates to the SocialProtocol.
|
|
|
|
func (a *sideEffectActor) GetOutbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error) {
|
|
|
|
// Compiler bug? Cannot directly return here:
|
|
|
|
// cannot use <T> as type vocab.ActivityStreamsOrderedCollectionPage in return argument
|
|
|
|
v1, v2 := a.c2s.GetOutbox(c, r)
|
|
|
|
return v1, v2
|
|
|
|
}
|
|
|
|
|
|
|
|
// GetInbox delegates to the FederatingProtocol.
|
|
|
|
func (a *sideEffectActor) GetInbox(c context.Context, r *http.Request) (vocab.ActivityStreamsOrderedCollectionPage, error) {
|
|
|
|
return a.s2s.GetInbox(c, r)
|
|
|
|
}
|
|
|
|
|
|
|
|
// AuthorizePostInbox defers to the federating protocol whether the peer request
|
|
|
|
// is authorized based on the actors' ids.
|
2019-02-20 04:40:26 +09:00
|
|
|
func (a *sideEffectActor) AuthorizePostInbox(c context.Context, w http.ResponseWriter, activity Activity) (authorized bool, err error) {
|
|
|
|
authorized = false
|
2019-02-12 08:16:33 +09:00
|
|
|
actor := activity.GetActivityStreamsActor()
|
|
|
|
var iris []*url.URL
|
|
|
|
for i := 0; i < actor.Len(); i++ {
|
|
|
|
iter := actor.At(i)
|
|
|
|
if iter.IsIRI() {
|
|
|
|
iris = append(iris, iter.GetIRI())
|
|
|
|
} else if t := iter.GetType(); t != nil {
|
|
|
|
iris = append(iris, activity.GetActivityStreamsId().Get())
|
|
|
|
} else {
|
|
|
|
err = fmt.Errorf("actor at index %d is missing an id", i)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// Determine if the actor(s) sending this request are blocked.
|
2019-02-20 04:40:26 +09:00
|
|
|
var blocked bool
|
|
|
|
if blocked, err = a.s2s.Blocked(c, iris); err != nil {
|
2019-02-12 08:16:33 +09:00
|
|
|
return
|
2019-02-20 04:40:26 +09:00
|
|
|
} else if blocked {
|
2019-02-12 08:16:33 +09:00
|
|
|
w.WriteHeader(http.StatusForbidden)
|
|
|
|
return
|
|
|
|
}
|
2019-02-20 04:40:26 +09:00
|
|
|
authorized = true
|
2019-02-12 08:16:33 +09:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// PostInbox handles the side effects of determining whether to block the peer's
|
|
|
|
// request, adding the activity to the actor's inbox, and triggering side
|
|
|
|
// effects based on the activity's type.
|
|
|
|
func (a *sideEffectActor) PostInbox(c context.Context, inboxIRI *url.URL, activity Activity) error {
|
|
|
|
isNew, err := a.addToInboxIfNew(c, inboxIRI, activity)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if isNew {
|
2019-02-14 06:37:29 +09:00
|
|
|
wrapped, other := a.s2s.Callbacks(c)
|
|
|
|
// Populate side channels.
|
2019-02-13 06:43:01 +09:00
|
|
|
wrapped.db = a.db
|
2019-02-14 07:56:36 +09:00
|
|
|
wrapped.inboxIRI = inboxIRI
|
2019-02-16 05:42:56 +09:00
|
|
|
wrapped.newTransport = a.common.NewTransport
|
2019-02-21 05:31:11 +09:00
|
|
|
res, err := streams.NewTypeResolver(wrapped.callbacks(other)...)
|
2019-02-12 08:16:33 +09:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2019-02-21 05:31:11 +09:00
|
|
|
if err = res.Resolve(c, activity); err != nil && !streams.IsUnmatchedErr(err) {
|
2019-02-12 08:16:33 +09:00
|
|
|
return err
|
2019-02-21 05:31:11 +09:00
|
|
|
} else if streams.IsUnmatchedErr(err) {
|
|
|
|
err = a.s2s.DefaultCallback(c, activity)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2019-02-12 08:16:33 +09:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// InboxForwarding implements the 3-part inbox forwarding algorithm specified in
|
|
|
|
// the ActivityPub specification. Does not modify the Activity, but may send
|
|
|
|
// outbound requests as a side effect.
|
|
|
|
//
|
|
|
|
// InboxForwarding sets the federated data in the database.
|
|
|
|
func (a *sideEffectActor) InboxForwarding(c context.Context, inboxIRI *url.URL, activity Activity) error {
|
|
|
|
// 1. Must be first time we have seen this Activity.
|
|
|
|
//
|
|
|
|
// Obtain the id of the activity
|
|
|
|
id := activity.GetActivityStreamsId()
|
|
|
|
// Acquire a lock for the id. To be held for the rest of execution.
|
|
|
|
err := a.db.Lock(c, id.Get())
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2019-02-20 04:33:42 +09:00
|
|
|
// WARNING: Unlock is not deferred
|
|
|
|
//
|
2019-02-12 08:16:33 +09:00
|
|
|
// If the database already contains the activity, exit early.
|
|
|
|
exists, err := a.db.Exists(c, id.Get())
|
|
|
|
if err != nil {
|
2019-02-20 04:33:42 +09:00
|
|
|
a.db.Unlock(c, id.Get())
|
2019-02-12 08:16:33 +09:00
|
|
|
return err
|
|
|
|
} else if exists {
|
2019-02-20 04:33:42 +09:00
|
|
|
a.db.Unlock(c, id.Get())
|
2019-02-12 08:16:33 +09:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
// Attempt to create the activity entry.
|
|
|
|
err = a.db.Create(c, activity)
|
|
|
|
if err != nil {
|
2019-02-20 04:33:42 +09:00
|
|
|
a.db.Unlock(c, id.Get())
|
2019-02-12 08:16:33 +09:00
|
|
|
return err
|
|
|
|
}
|
2019-02-20 04:33:42 +09:00
|
|
|
a.db.Unlock(c, id.Get())
|
|
|
|
// Unlock by this point and in every branch above.
|
|
|
|
//
|
2019-02-12 08:16:33 +09:00
|
|
|
// 2. The values of 'to', 'cc', or 'audience' are Collections owned by
|
|
|
|
// this server.
|
|
|
|
var r []*url.URL
|
|
|
|
to := activity.GetActivityStreamsTo()
|
|
|
|
for iter := to.Begin(); iter != to.End(); iter = iter.Next() {
|
|
|
|
val, err := ToId(iter)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
r = append(r, val)
|
|
|
|
}
|
|
|
|
cc := activity.GetActivityStreamsCc()
|
|
|
|
for iter := cc.Begin(); iter != cc.End(); iter = iter.Next() {
|
|
|
|
val, err := ToId(iter)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
r = append(r, val)
|
|
|
|
}
|
|
|
|
audience := activity.GetActivityStreamsAudience()
|
|
|
|
for iter := audience.Begin(); iter != audience.End(); iter = iter.Next() {
|
|
|
|
val, err := ToId(iter)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
r = append(r, val)
|
|
|
|
}
|
|
|
|
// Find all IRIs owned by this server. We need to find all of them so
|
|
|
|
// that forwarding can properly occur.
|
|
|
|
var myIRIs []*url.URL
|
|
|
|
for _, iri := range r {
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2019-02-16 06:04:08 +09:00
|
|
|
err = a.db.Lock(c, iri)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// WARNING: Unlock is not deferred
|
2019-02-12 08:16:33 +09:00
|
|
|
if owns, err := a.db.Owns(c, iri); err != nil {
|
2019-02-16 06:04:08 +09:00
|
|
|
a.db.Unlock(c, iri)
|
2019-02-12 08:16:33 +09:00
|
|
|
return err
|
|
|
|
} else if !owns {
|
2019-02-16 06:04:08 +09:00
|
|
|
a.db.Unlock(c, iri)
|
2019-02-12 08:16:33 +09:00
|
|
|
continue
|
|
|
|
}
|
2019-02-16 06:04:08 +09:00
|
|
|
a.db.Unlock(c, iri)
|
|
|
|
// Unlock by this point and in every branch above.
|
2019-02-12 08:16:33 +09:00
|
|
|
myIRIs = append(myIRIs, iri)
|
|
|
|
}
|
|
|
|
// Finally, load our IRIs to determine if they are a Collection or
|
|
|
|
// OrderedCollection.
|
|
|
|
//
|
|
|
|
// Load the unfiltered IRIs.
|
|
|
|
var colIRIs []*url.URL
|
|
|
|
col := make(map[string]itemser)
|
|
|
|
oCol := make(map[string]orderedItemser)
|
|
|
|
for _, iri := range myIRIs {
|
|
|
|
err = a.db.Lock(c, iri)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
defer a.db.Unlock(c, iri)
|
|
|
|
t, err := a.db.Get(c, iri)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if streams.ActivityStreamsOrderedCollectionIsExtendedBy(t) {
|
|
|
|
if im, ok := t.(orderedItemser); ok {
|
|
|
|
oCol[iri.String()] = im
|
|
|
|
colIRIs = append(colIRIs, iri)
|
|
|
|
} else {
|
|
|
|
a.db.Unlock(c, iri)
|
|
|
|
}
|
|
|
|
} else if streams.ActivityStreamsCollectionIsExtendedBy(t) {
|
|
|
|
if im, ok := t.(itemser); ok {
|
|
|
|
col[iri.String()] = im
|
|
|
|
colIRIs = append(colIRIs, iri)
|
|
|
|
} else {
|
|
|
|
a.db.Unlock(c, iri)
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
a.db.Unlock(c, iri)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// If we own none of the Collection IRIs in 'to', 'cc', or 'audience'
|
|
|
|
// then no need to do inbox forwarding. We have nothing to forward to.
|
|
|
|
if len(colIRIs) == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
// 3. The values of 'inReplyTo', 'object', 'target', or 'tag' are owned
|
|
|
|
// by this server. This is only a boolean trigger: As soon as we get
|
|
|
|
// a hit that we own something, then we should do inbox forwarding.
|
2019-02-14 06:37:29 +09:00
|
|
|
maxDepth := a.s2s.MaxInboxForwardingRecursionDepth(c)
|
2019-02-16 05:59:02 +09:00
|
|
|
ownsValue, err := a.hasInboxForwardingValues(c, inboxIRI, activity, maxDepth, 0)
|
2019-02-12 08:16:33 +09:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// If we don't own any of the 'inReplyTo', 'object', 'target', or 'tag'
|
|
|
|
// values, then no need to do inbox forwarding.
|
|
|
|
if !ownsValue {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
// Do the inbox forwarding since the above conditions hold true. Support
|
|
|
|
// the behavior of letting the application filter out the resulting
|
|
|
|
// collections to be targeted.
|
|
|
|
toSend, err := a.s2s.FilterForwarding(c, colIRIs, activity)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
recipients := make([]*url.URL, 0, len(toSend))
|
|
|
|
for _, iri := range toSend {
|
|
|
|
if c, ok := col[iri.String()]; ok {
|
|
|
|
it := c.GetActivityStreamsItems()
|
|
|
|
for iter := it.Begin(); iter != it.End(); iter = iter.Next() {
|
|
|
|
id, err := ToId(iter)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
recipients = append(recipients, id)
|
|
|
|
}
|
|
|
|
} else if oc, ok := oCol[iri.String()]; ok {
|
|
|
|
oit := oc.GetActivityStreamsOrderedItems()
|
|
|
|
for iter := oit.Begin(); iter != oit.End(); iter = iter.Next() {
|
|
|
|
id, err := ToId(iter)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
recipients = append(recipients, id)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return a.deliverToRecipients(c, inboxIRI, activity, recipients)
|
|
|
|
}
|
|
|
|
|
|
|
|
// PostOutbox handles the side effects of adding the activity to the actor's
|
|
|
|
// outbox, and triggering side effects based on the activity's type.
|
|
|
|
//
|
|
|
|
// This implementation assumes all types are meant to be delivered except for
|
|
|
|
// the ActivityStreams Block type.
|
2019-02-21 05:31:11 +09:00
|
|
|
func (a *sideEffectActor) PostOutbox(c context.Context, activity Activity, outboxIRI *url.URL, rawJSON map[string]interface{}) (deliverable bool, err error) {
|
2019-02-14 06:37:29 +09:00
|
|
|
wrapped, other := a.c2s.Callbacks(c)
|
|
|
|
// Populate side channels.
|
2019-02-13 06:43:01 +09:00
|
|
|
wrapped.db = a.db
|
2019-02-15 05:51:57 +09:00
|
|
|
wrapped.outboxIRI = outboxIRI
|
|
|
|
wrapped.rawActivity = rawJSON
|
|
|
|
wrapped.clock = a.clock
|
2019-02-16 05:42:56 +09:00
|
|
|
wrapped.newTransport = a.common.NewTransport
|
2019-02-15 05:51:57 +09:00
|
|
|
wrapped.deliverable = &deliverable
|
2019-02-21 05:31:11 +09:00
|
|
|
var res *streams.TypeResolver
|
|
|
|
res, err = streams.NewTypeResolver(wrapped.callbacks(other)...)
|
2019-02-12 08:16:33 +09:00
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
2019-02-21 05:31:11 +09:00
|
|
|
if err = res.Resolve(c, activity); err != nil && !streams.IsUnmatchedErr(err) {
|
2019-02-12 08:16:33 +09:00
|
|
|
return
|
2019-02-21 05:31:11 +09:00
|
|
|
} else if streams.IsUnmatchedErr(err) {
|
|
|
|
deliverable = true
|
|
|
|
err = a.c2s.DefaultCallback(c, activity)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
2019-02-12 08:16:33 +09:00
|
|
|
}
|
|
|
|
err = a.addToOutbox(c, outboxIRI, activity)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// AddNewIds creates new 'id' entries on an activity and its objects if it is a
|
|
|
|
// Create activity.
|
|
|
|
func (a *sideEffectActor) AddNewIds(c context.Context, activity Activity) error {
|
|
|
|
id, err := a.db.NewId(c, activity)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
activityId := streams.NewActivityStreamsIdProperty()
|
|
|
|
activityId.Set(id)
|
|
|
|
activity.SetActivityStreamsId(activityId)
|
|
|
|
if streams.ActivityStreamsCreateIsExtendedBy(activity) {
|
|
|
|
o, ok := activity.(objecter)
|
|
|
|
if !ok {
|
|
|
|
return fmt.Errorf("cannot add new id for Create: %T has no object property", activity)
|
|
|
|
}
|
|
|
|
oProp := o.GetActivityStreamsObject()
|
|
|
|
for iter := oProp.Begin(); iter != oProp.End(); iter = iter.Next() {
|
|
|
|
t := iter.GetType()
|
|
|
|
if t == nil {
|
|
|
|
return fmt.Errorf("cannot add new id for object in Create: object is not embedded as a value literal")
|
|
|
|
}
|
|
|
|
id, err = a.db.NewId(c, t)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
objId := streams.NewActivityStreamsIdProperty()
|
|
|
|
objId.Set(id)
|
|
|
|
t.SetActivityStreamsId(objId)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// deliver will complete the peer-to-peer sending of a federated message to
|
|
|
|
// another server.
|
|
|
|
//
|
|
|
|
// Must only be called if both social and federated protocols are supported.
|
|
|
|
func (a *sideEffectActor) Deliver(c context.Context, outboxIRI *url.URL, activity Activity) error {
|
|
|
|
recipients, err := a.prepare(c, outboxIRI, activity)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return a.deliverToRecipients(c, outboxIRI, activity, recipients)
|
|
|
|
}
|
|
|
|
|
|
|
|
// WrapInCreate wraps an object with a Create activity.
|
|
|
|
func (a *sideEffectActor) WrapInCreate(c context.Context, obj vocab.Type, outboxIRI *url.URL) (create vocab.ActivityStreamsCreate, err error) {
|
2019-02-16 05:59:02 +09:00
|
|
|
err = a.db.Lock(c, outboxIRI)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
// WARNING: No deferring the Unlock
|
|
|
|
actorIRI, err := a.db.ActorForOutbox(c, outboxIRI)
|
2019-02-12 08:16:33 +09:00
|
|
|
if err != nil {
|
2019-02-16 05:59:02 +09:00
|
|
|
a.db.Unlock(c, outboxIRI)
|
2019-02-12 08:16:33 +09:00
|
|
|
return
|
|
|
|
}
|
2019-02-16 05:59:02 +09:00
|
|
|
a.db.Unlock(c, outboxIRI)
|
|
|
|
// Unlock the lock at this point and every branch above
|
|
|
|
return wrapInCreate(c, obj, actorIRI)
|
2019-02-12 08:16:33 +09:00
|
|
|
}
|
|
|
|
|
|
|
|
// deliverToRecipients will take a prepared Activity and send it to specific
|
|
|
|
// recipients on behalf of an actor.
|
|
|
|
func (a *sideEffectActor) deliverToRecipients(c context.Context, boxIRI *url.URL, activity Activity, recipients []*url.URL) error {
|
|
|
|
m, err := serialize(activity)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
b, err := json.Marshal(m)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2019-02-16 05:42:56 +09:00
|
|
|
tp, err := a.common.NewTransport(c, boxIRI, goFedUserAgent())
|
2019-02-12 08:16:33 +09:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
mErr := make(map[string]error)
|
|
|
|
for _, to := range recipients {
|
|
|
|
err := tp.Deliver(c, b, to)
|
|
|
|
if err != nil {
|
|
|
|
mErr[to.String()] = err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if len(mErr) > 0 {
|
|
|
|
s := make([]string, 0, len(mErr))
|
|
|
|
for k, v := range mErr {
|
|
|
|
s = append(s, fmt.Sprintf("%s=%s", k, v.Error()))
|
|
|
|
}
|
|
|
|
return fmt.Errorf("requests failed: %s", strings.Join(s, ";"))
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// addToOutbox adds the activity to the outbox and creates the activity in the
|
|
|
|
// internal database as its own entry.
|
|
|
|
func (a *sideEffectActor) addToOutbox(c context.Context, outboxIRI *url.URL, activity Activity) error {
|
|
|
|
// Set the activity in the database first.
|
|
|
|
id := activity.GetActivityStreamsId()
|
|
|
|
err := a.db.Lock(c, id.Get())
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// WARNING: Unlock not deferred
|
|
|
|
err = a.db.Create(c, activity)
|
|
|
|
if err != nil {
|
|
|
|
a.db.Unlock(c, id.Get())
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
a.db.Unlock(c, id.Get())
|
|
|
|
// WARNING: Unlock(c, id) should be called by this point and in every
|
|
|
|
// return before here.
|
2019-02-20 04:33:42 +09:00
|
|
|
//
|
2019-02-12 08:16:33 +09:00
|
|
|
// Acquire a lock to read the outbox. Defer release.
|
|
|
|
err = a.db.Lock(c, outboxIRI)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
defer a.db.Unlock(c, outboxIRI)
|
|
|
|
outbox, err := a.db.GetOutbox(c, outboxIRI)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// Prepend the activity to the list of 'orderedItems'.
|
|
|
|
oi := outbox.GetActivityStreamsOrderedItems()
|
|
|
|
if oi == nil {
|
|
|
|
oi = streams.NewActivityStreamsOrderedItemsProperty()
|
|
|
|
}
|
|
|
|
oi.PrependIRI(id.Get())
|
|
|
|
outbox.SetActivityStreamsOrderedItems(oi)
|
|
|
|
// Save in the database.
|
|
|
|
err = a.db.SetOutbox(c, outbox)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// addToInboxIfNew will add the activity to the inbox at the specified IRI if
|
|
|
|
// the activity's ID has not yet been added to the inbox.
|
|
|
|
//
|
|
|
|
// It does not add the activity to this database's know federated data.
|
|
|
|
//
|
|
|
|
// Returns true when the activity is novel.
|
|
|
|
func (a *sideEffectActor) addToInboxIfNew(c context.Context, inboxIRI *url.URL, activity Activity) (isNew bool, err error) {
|
|
|
|
// Acquire a lock to read the inbox. Defer release.
|
|
|
|
err = a.db.Lock(c, inboxIRI)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
defer a.db.Unlock(c, inboxIRI)
|
|
|
|
// Obtain the id of the activity
|
|
|
|
id := activity.GetActivityStreamsId()
|
|
|
|
// If the inbox already contains the URL, early exit.
|
|
|
|
contains, err := a.db.InboxContains(c, inboxIRI, id.Get())
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
} else if contains {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
// It is a new id, acquire the inbox.
|
|
|
|
isNew = true
|
|
|
|
inbox, err := a.db.GetInbox(c, inboxIRI)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
// Prepend the activity to the list of 'orderedItems'.
|
|
|
|
oi := inbox.GetActivityStreamsOrderedItems()
|
|
|
|
if oi == nil {
|
|
|
|
oi = streams.NewActivityStreamsOrderedItemsProperty()
|
|
|
|
}
|
|
|
|
oi.PrependIRI(id.Get())
|
|
|
|
inbox.SetActivityStreamsOrderedItems(oi)
|
|
|
|
// Save in the database.
|
|
|
|
err = a.db.SetInbox(c, inbox)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Given an ActivityStreams value, recursively examines ownership of the id or
|
|
|
|
// href and the ones on properties applicable to inbox forwarding.
|
|
|
|
//
|
|
|
|
// Recursion may be limited by providing a 'maxDepth' greater than zero. A
|
|
|
|
// value of zero or a negative number will result in infinite recursion.
|
2019-02-16 05:59:02 +09:00
|
|
|
func (a *sideEffectActor) hasInboxForwardingValues(c context.Context, inboxIRI *url.URL, val vocab.Type, maxDepth, currDepth int) (bool, error) {
|
2019-02-12 08:16:33 +09:00
|
|
|
// Stop recurring if we are exceeding the maximum depth and the maximum
|
|
|
|
// is a positive number.
|
|
|
|
if maxDepth > 0 && currDepth >= maxDepth {
|
|
|
|
return false, nil
|
|
|
|
}
|
|
|
|
// Determine if we own the 'id' for this value.
|
|
|
|
id, err := GetId(val)
|
|
|
|
if err != nil {
|
|
|
|
return false, err
|
|
|
|
}
|
2019-02-16 06:04:08 +09:00
|
|
|
err = a.db.Lock(c, id)
|
|
|
|
if err != nil {
|
|
|
|
return false, err
|
|
|
|
}
|
|
|
|
// WARNING: Unlock is not deferred
|
2019-02-12 08:16:33 +09:00
|
|
|
if owns, err := a.db.Owns(c, id); err != nil {
|
2019-02-16 06:04:08 +09:00
|
|
|
a.db.Unlock(c, id)
|
2019-02-12 08:16:33 +09:00
|
|
|
return false, err
|
|
|
|
} else if owns {
|
2019-02-16 06:04:08 +09:00
|
|
|
a.db.Unlock(c, id)
|
2019-02-12 08:16:33 +09:00
|
|
|
return true, nil
|
|
|
|
}
|
2019-02-16 06:04:08 +09:00
|
|
|
a.db.Unlock(c, id)
|
|
|
|
// Unlock by this point and in every branch above
|
|
|
|
//
|
2019-02-12 08:16:33 +09:00
|
|
|
// Determine if we own the 'id' of any values on the properties we care
|
|
|
|
// about.
|
|
|
|
types, iris := getInboxForwardingValues(val)
|
|
|
|
// For IRIs, simply check if we own them.
|
|
|
|
for _, iri := range iris {
|
2019-02-16 06:04:08 +09:00
|
|
|
err = a.db.Lock(c, iri)
|
|
|
|
if err != nil {
|
|
|
|
return false, err
|
|
|
|
}
|
|
|
|
// WARNING: Unlock is not deferred
|
2019-02-12 08:16:33 +09:00
|
|
|
if owns, err := a.db.Owns(c, iri); err != nil {
|
2019-02-16 06:04:08 +09:00
|
|
|
a.db.Unlock(c, iri)
|
2019-02-12 08:16:33 +09:00
|
|
|
return false, err
|
|
|
|
} else if owns {
|
2019-02-16 06:04:08 +09:00
|
|
|
a.db.Unlock(c, iri)
|
2019-02-12 08:16:33 +09:00
|
|
|
return true, nil
|
|
|
|
}
|
2019-02-16 06:04:08 +09:00
|
|
|
a.db.Unlock(c, iri)
|
|
|
|
// Unlock by this point and in every branch above
|
|
|
|
//
|
2019-02-16 05:59:02 +09:00
|
|
|
// Attempt to dereference the IRI instead, and add it to the
|
|
|
|
// types we need to examine.
|
|
|
|
tport, err := a.common.NewTransport(c, inboxIRI, goFedUserAgent())
|
|
|
|
if err != nil {
|
|
|
|
return false, err
|
|
|
|
}
|
|
|
|
b, err := tport.Dereference(c, iri)
|
|
|
|
if err != nil {
|
|
|
|
// Do not fail the entire process if the data is
|
|
|
|
// missing.
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
var m map[string]interface{}
|
|
|
|
if err = json.Unmarshal(b, &m); err != nil {
|
|
|
|
return false, err
|
|
|
|
}
|
2019-02-16 06:47:59 +09:00
|
|
|
t, err := streams.ToType(c, m)
|
2019-02-16 05:59:02 +09:00
|
|
|
if err != nil {
|
|
|
|
// Do not fail the entire process if we cannot handle
|
|
|
|
// the type.
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
types = append(types, t)
|
2019-02-12 08:16:33 +09:00
|
|
|
}
|
|
|
|
// For embedded literals, recur.
|
|
|
|
for _, nextVal := range types {
|
2019-02-16 05:59:02 +09:00
|
|
|
if has, err := a.hasInboxForwardingValues(c, inboxIRI, nextVal, maxDepth, currDepth+1); err != nil {
|
2019-02-12 08:16:33 +09:00
|
|
|
return false, err
|
|
|
|
} else if has {
|
|
|
|
return true, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return false, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// prepare takes a deliverableObject and returns a list of the proper recipient
|
|
|
|
// target URIs. Additionally, the deliverableObject will have any hidden
|
|
|
|
// hidden recipients ("bto" and "bcc") stripped from it.
|
|
|
|
//
|
|
|
|
// Only call if both the social and federated protocol are supported.
|
|
|
|
func (a *sideEffectActor) prepare(c context.Context, outboxIRI *url.URL, activity Activity) (r []*url.URL, err error) {
|
|
|
|
// Get inboxes of recipients
|
|
|
|
if to := activity.GetActivityStreamsTo(); to != nil {
|
|
|
|
for iter := to.Begin(); iter != to.End(); iter = iter.Next() {
|
|
|
|
var val *url.URL
|
|
|
|
val, err = ToId(iter)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
r = append(r, val)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if bto := activity.GetActivityStreamsBto(); bto != nil {
|
|
|
|
for iter := bto.Begin(); iter != bto.End(); iter = iter.Next() {
|
|
|
|
var val *url.URL
|
|
|
|
val, err = ToId(iter)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
r = append(r, val)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if cc := activity.GetActivityStreamsCc(); cc != nil {
|
|
|
|
for iter := cc.Begin(); iter != cc.End(); iter = iter.Next() {
|
|
|
|
var val *url.URL
|
|
|
|
val, err = ToId(iter)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
r = append(r, val)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if bcc := activity.GetActivityStreamsBcc(); bcc != nil {
|
|
|
|
for iter := bcc.Begin(); iter != bcc.End(); iter = iter.Next() {
|
|
|
|
var val *url.URL
|
|
|
|
val, err = ToId(iter)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
r = append(r, val)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if audience := activity.GetActivityStreamsAudience(); audience != nil {
|
|
|
|
for iter := audience.Begin(); iter != audience.End(); iter = iter.Next() {
|
|
|
|
var val *url.URL
|
|
|
|
val, err = ToId(iter)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
r = append(r, val)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// 1. When an object is being delivered to the originating actor's
|
|
|
|
// followers, a server MAY reduce the number of receiving actors
|
|
|
|
// delivered to by identifying all followers which share the same
|
|
|
|
// sharedInbox who would otherwise be individual recipients and
|
|
|
|
// instead deliver objects to said sharedInbox.
|
|
|
|
// 2. If an object is addressed to the Public special collection, a
|
|
|
|
// server MAY deliver that object to all known sharedInbox endpoints
|
|
|
|
// on the network.
|
|
|
|
r = filterURLs(r, IsPublic)
|
2019-02-16 05:42:56 +09:00
|
|
|
t, err := a.common.NewTransport(c, outboxIRI, goFedUserAgent())
|
2019-02-12 08:16:33 +09:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2019-02-14 06:37:29 +09:00
|
|
|
receiverActors, err := a.resolveInboxes(c, t, r, 0, a.s2s.MaxDeliveryRecursionDepth(c))
|
2019-02-12 08:16:33 +09:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
targets, err := getInboxes(receiverActors)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
// Get inboxes of sender.
|
2019-02-16 05:59:02 +09:00
|
|
|
err = a.db.Lock(c, outboxIRI)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
// WARNING: No deferring the Unlock
|
2019-02-14 06:37:29 +09:00
|
|
|
actorIRI, err := a.db.ActorForOutbox(c, outboxIRI)
|
2019-02-12 08:16:33 +09:00
|
|
|
if err != nil {
|
2019-02-16 05:59:02 +09:00
|
|
|
a.db.Unlock(c, outboxIRI)
|
|
|
|
return
|
2019-02-12 08:16:33 +09:00
|
|
|
}
|
2019-02-16 05:59:02 +09:00
|
|
|
a.db.Unlock(c, outboxIRI)
|
2019-02-12 08:16:33 +09:00
|
|
|
// Make sure this matches the 'attributedTo' on the activity.
|
|
|
|
attrTo := activity.GetActivityStreamsAttributedTo()
|
|
|
|
if attrTo.Len() != 1 {
|
|
|
|
return nil, fmt.Errorf("federated c2s object does not have exactly one attributedTo value: %d", attrTo.Len())
|
|
|
|
} else if attrToIRI, err := ToId(attrTo.At(0)); err != nil {
|
|
|
|
return nil, err
|
|
|
|
} else if attrToIRI.String() != actorIRI.String() {
|
|
|
|
return nil, fmt.Errorf("federated c2s object attributedTo value does not match this actor")
|
|
|
|
}
|
|
|
|
// Get the inbox on the sender.
|
|
|
|
err = a.db.Lock(c, actorIRI)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
// BEGIN LOCK
|
|
|
|
thisActor, err := a.db.Get(c, actorIRI)
|
|
|
|
a.db.Unlock(c, actorIRI)
|
|
|
|
// END LOCK -- Still need to handle err
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
// Post-processing
|
|
|
|
var ignore *url.URL
|
|
|
|
ignore, err = getInbox(thisActor)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
r = dedupeIRIs(targets, []*url.URL{ignore})
|
|
|
|
stripHiddenRecipients(activity)
|
|
|
|
return r, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// resolveInboxes takes a list of Actor id URIs and returns them as concrete
|
|
|
|
// instances of actorObject. It attempts to apply recursively when it encounters
|
|
|
|
// a target that is a Collection or OrderedCollection.
|
|
|
|
//
|
|
|
|
// If maxDepth is zero or negative, then recursion is infinitely applied.
|
|
|
|
//
|
|
|
|
// If a recipient is a Collection or OrderedCollection, then the server MUST
|
|
|
|
// dereference the collection, WITH the user's credentials.
|
|
|
|
//
|
|
|
|
// Note that this also applies to CollectionPage and OrderedCollectionPage.
|
|
|
|
func (a *sideEffectActor) resolveInboxes(c context.Context, t Transport, r []*url.URL, depth, maxDepth int) (actors []vocab.Type, err error) {
|
|
|
|
if maxDepth > 0 && depth >= maxDepth {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
for _, u := range r {
|
|
|
|
var act vocab.Type
|
|
|
|
var more []*url.URL
|
|
|
|
act, more, err = a.dereferenceForResolvingInboxes(c, t, u)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
var recurActors []vocab.Type
|
|
|
|
recurActors, err = a.resolveInboxes(c, t, more, depth+1, maxDepth)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
actors = append(actors, act)
|
|
|
|
actors = append(actors, recurActors...)
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// dereferenceForResolvingInboxes dereferences an IRI solely for finding an
|
|
|
|
// actor's inbox IRI to deliver to.
|
|
|
|
func (a *sideEffectActor) dereferenceForResolvingInboxes(c context.Context, t Transport, actorIRI *url.URL) (actor vocab.Type, moreActorIRIs []*url.URL, err error) {
|
|
|
|
var resp []byte
|
|
|
|
resp, err = t.Dereference(c, actorIRI)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
var m map[string]interface{}
|
|
|
|
if err = json.Unmarshal(resp, &m); err != nil {
|
|
|
|
return
|
|
|
|
}
|
2019-02-16 06:47:59 +09:00
|
|
|
actor, err = streams.ToType(c, m)
|
2019-02-12 08:16:33 +09:00
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
// Attempt to see if the 'actor' is really some sort of type that has
|
|
|
|
// an 'items' or 'orderedItems' property.
|
|
|
|
if v, ok := actor.(itemser); ok {
|
|
|
|
i := v.GetActivityStreamsItems()
|
|
|
|
for iter := i.Begin(); iter != i.End(); iter = iter.Next() {
|
|
|
|
var id *url.URL
|
|
|
|
id, err = ToId(iter)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
moreActorIRIs = append(moreActorIRIs, id)
|
|
|
|
}
|
|
|
|
} else if v, ok := actor.(orderedItemser); ok {
|
|
|
|
i := v.GetActivityStreamsOrderedItems()
|
|
|
|
for iter := i.Begin(); iter != i.End(); iter = iter.Next() {
|
|
|
|
var id *url.URL
|
|
|
|
id, err = ToId(iter)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
moreActorIRIs = append(moreActorIRIs, id)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|