From 15918cba8cbeafac069736f4c6aaefba6bbc2915 Mon Sep 17 00:00:00 2001 From: Daniel Supernault Date: Wed, 11 Sep 2024 00:42:42 -0600 Subject: [PATCH] Refactor MovePipeline follow/undo follow to their own separate jobs, outside of the job chain --- .../MoveMigrateFollowersPipeline.php | 56 ++------- .../MovePipeline/MoveSendFollowPipeline.php | 113 +++++++++++++++++ .../MoveSendUndoFollowPipeline.php | 119 ++++++++++++++++++ .../UnfollowLegacyAccountMovePipeline.php | 58 +-------- 4 files changed, 248 insertions(+), 98 deletions(-) create mode 100644 app/Jobs/MovePipeline/MoveSendFollowPipeline.php create mode 100644 app/Jobs/MovePipeline/MoveSendUndoFollowPipeline.php diff --git a/app/Jobs/MovePipeline/MoveMigrateFollowersPipeline.php b/app/Jobs/MovePipeline/MoveMigrateFollowersPipeline.php index 2f187ca70..1cde3818e 100644 --- a/app/Jobs/MovePipeline/MoveMigrateFollowersPipeline.php +++ b/app/Jobs/MovePipeline/MoveMigrateFollowersPipeline.php @@ -4,12 +4,9 @@ namespace App\Jobs\MovePipeline; use App\Follower; use App\Util\ActivityPub\Helpers; -use App\Util\ActivityPub\HttpSignature; use DateTime; use DB; use Exception; -use GuzzleHttp\Client; -use GuzzleHttp\Pool; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Queue\Queueable; use Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis; @@ -116,52 +113,19 @@ class MoveMigrateFollowersPipeline implements ShouldQueue ->whereNotNull('profiles.user_id') ->whereNull('profiles.deleted_at') ->select('profiles.id', 'profiles.user_id', 'profiles.username', 'profiles.private_key', 'profiles.status') - ->chunkById(100, function ($followers) use ($addlHeaders, $targetInbox, $targetPid, $target) { - $client = new Client([ - 'timeout' => config('federation.activitypub.delivery.timeout'), - ]); - $requests = function ($followers) use ($client, $target, $addlHeaders, $targetInbox, $targetPid) { - $activity = [ - '@context' => 'https://www.w3.org/ns/activitystreams', - 'type' => 'Follow', - 'actor' => null, - 'object' => $target, - ]; - foreach ($followers as $follower) { - if (! $follower->private_key || ! $follower->username || ! $follower->user_id || $follower->status === 'delete') { - continue; - } - $permalink = 'https://'.config('pixelfed.domain.app').'/users/'.$follower->username; - $activity['actor'] = $permalink; - $keyId = $permalink.'#main-key'; - $payload = json_encode($activity); - $url = $targetInbox; - $headers = HttpSignature::signRaw($follower->private_key, $keyId, $targetInbox, $activity, $addlHeaders); - Follower::updateOrCreate([ - 'profile_id' => $follower->id, - 'following_id' => $targetPid, - ]); - yield function () use ($client, $url, $headers, $payload) { - return $client->postAsync($url, [ - 'curl' => [ - CURLOPT_HTTPHEADER => $headers, - CURLOPT_POSTFIELDS => $payload, - CURLOPT_HEADER => true, - ], - ]); - }; + ->chunkById(100, function ($followers) use ($targetInbox, $targetPid, $target) { + foreach ($followers as $follower) { + if (! $follower->private_key || ! $follower->username || ! $follower->user_id || $follower->status === 'delete') { + continue; } - }; - $pool = new Pool($client, $requests($followers), [ - 'concurrency' => config('federation.activitypub.delivery.concurrency'), - 'fulfilled' => function ($response, $index) {}, - 'rejected' => function ($reason, $index) {}, - ]); + Follower::updateOrCreate([ + 'profile_id' => $follower->id, + 'following_id' => $targetPid, + ]); - $promise = $pool->promise(); - - $promise->wait(); + MoveSendFollowPipeline::dispatch($follower, $targetInbox, $targetPid, $target)->onQueue('follow'); + } }, 'id'); } } diff --git a/app/Jobs/MovePipeline/MoveSendFollowPipeline.php b/app/Jobs/MovePipeline/MoveSendFollowPipeline.php new file mode 100644 index 000000000..1bccf458b --- /dev/null +++ b/app/Jobs/MovePipeline/MoveSendFollowPipeline.php @@ -0,0 +1,113 @@ + + */ + public function middleware(): array + { + return [ + new WithoutOverlapping('move-send-follow:'.$this->follower->id.':target:'.$this->target), + (new ThrottlesExceptions(2, 5 * 60))->backoff(5), + ]; + } + + /** + * Create a new job instance. + */ + public function __construct($follower, $targetInbox, $targetPid, $target) + { + $this->follower = $follower; + $this->targetInbox = $targetInbox; + $this->targetPid = $targetPid; + $this->target = $target; + } + + /** + * Execute the job. + */ + public function handle(): void + { + $follower = $this->follower; + $targetPid = $this->targetPid; + $targetInbox = $this->targetInbox; + $target = $this->target; + + if (! $follower->username || ! $follower->private_key) { + return; + } + + $permalink = 'https://'.config('pixelfed.domain.app').'/users/'.$follower->username; + $version = config('pixelfed.version'); + $appUrl = config('app.url'); + $userAgent = "(Pixelfed/{$version}; +{$appUrl})"; + $addlHeaders = [ + 'Content-Type' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"', + 'User-Agent' => $userAgent, + ]; + + $activity = [ + '@context' => 'https://www.w3.org/ns/activitystreams', + 'type' => 'Follow', + 'actor' => $permalink, + 'object' => $target, + ]; + + $keyId = $permalink.'#main-key'; + $payload = json_encode($activity); + $headers = HttpSignature::signRaw($follower->private_key, $keyId, $targetInbox, $activity, $addlHeaders); + + $client = new Client([ + 'timeout' => config('federation.activitypub.delivery.timeout'), + ]); + + try { + $client->post($targetInbox, [ + 'curl' => [ + CURLOPT_HTTPHEADER => $headers, + CURLOPT_POSTFIELDS => $payload, + CURLOPT_HEADER => true, + ], + ]); + } catch (ClientException $e) { + + } + } +} diff --git a/app/Jobs/MovePipeline/MoveSendUndoFollowPipeline.php b/app/Jobs/MovePipeline/MoveSendUndoFollowPipeline.php new file mode 100644 index 000000000..523afab90 --- /dev/null +++ b/app/Jobs/MovePipeline/MoveSendUndoFollowPipeline.php @@ -0,0 +1,119 @@ + + */ + public function middleware(): array + { + return [ + new WithoutOverlapping('move-send-unfollow:'.$this->follower->id.':actor:'.$this->actor), + (new ThrottlesExceptions(2, 5 * 60))->backoff(5), + ]; + } + + /** + * Create a new job instance. + */ + public function __construct($follower, $targetInbox, $targetPid, $actor) + { + $this->follower = $follower; + $this->targetInbox = $targetInbox; + $this->targetPid = $targetPid; + $this->actor = $actor; + } + + /** + * Execute the job. + */ + public function handle(): void + { + $follower = $this->follower; + $targetPid = $this->targetPid; + $targetInbox = $this->targetInbox; + $actor = $this->actor; + + if (! $follower->username || ! $follower->private_key) { + return; + } + + $permalink = 'https://'.config('pixelfed.domain.app').'/users/'.$follower->username; + $version = config('pixelfed.version'); + $appUrl = config('app.url'); + $userAgent = "(Pixelfed/{$version}; +{$appUrl})"; + $addlHeaders = [ + 'Content-Type' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"', + 'User-Agent' => $userAgent, + ]; + + $activity = [ + '@context' => 'https://www.w3.org/ns/activitystreams', + 'type' => 'Undo', + 'id' => $permalink.'#follow/'.$targetPid.'/undo', + 'actor' => $permalink, + 'object' => [ + 'type' => 'Follow', + 'id' => $permalink.'#follows/'.$targetPid, + 'object' => $actor, + 'actor' => $permalink, + ], + ]; + + $keyId = $permalink.'#main-key'; + $payload = json_encode($activity); + $headers = HttpSignature::signRaw($follower->private_key, $keyId, $targetInbox, $activity, $addlHeaders); + + $client = new Client([ + 'timeout' => config('federation.activitypub.delivery.timeout'), + ]); + + try { + $client->post($targetInbox, [ + 'curl' => [ + CURLOPT_HTTPHEADER => $headers, + CURLOPT_POSTFIELDS => $payload, + CURLOPT_HEADER => true, + ], + ]); + } catch (ClientException $e) { + + } + } +} diff --git a/app/Jobs/MovePipeline/UnfollowLegacyAccountMovePipeline.php b/app/Jobs/MovePipeline/UnfollowLegacyAccountMovePipeline.php index db306e6ef..47ed2aeb6 100644 --- a/app/Jobs/MovePipeline/UnfollowLegacyAccountMovePipeline.php +++ b/app/Jobs/MovePipeline/UnfollowLegacyAccountMovePipeline.php @@ -3,12 +3,9 @@ namespace App\Jobs\MovePipeline; use App\Util\ActivityPub\Helpers; -use App\Util\ActivityPub\HttpSignature; use DateTime; use DB; use Exception; -use GuzzleHttp\Client; -use GuzzleHttp\Pool; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Queue\Queueable; use Illuminate\Queue\Middleware\ThrottlesExceptions; @@ -101,57 +98,14 @@ class UnfollowLegacyAccountMovePipeline implements ShouldQueue ->whereNotNull('profiles.user_id') ->whereNull('profiles.deleted_at') ->select('profiles.id', 'profiles.user_id', 'profiles.username', 'profiles.private_key', 'profiles.status') - ->chunkById(100, function ($followers) use ($actor, $addlHeaders, $targetInbox, $targetPid) { - $client = new Client([ - 'timeout' => config('federation.activitypub.delivery.timeout'), - ]); - $requests = function ($followers) use ($client, $actor, $addlHeaders, $targetInbox, $targetPid) { - $activity = [ - '@context' => 'https://www.w3.org/ns/activitystreams', - 'type' => 'Undo', - 'id' => null, - 'actor' => null, - 'object' => [ - 'type' => 'Follow', - 'id' => null, - 'object' => $actor, - 'actor' => null, - ], - ]; - foreach ($followers as $follower) { - if (! $follower->private_key || ! $follower->username || ! $follower->user_id || $follower->status === 'delete') { - continue; - } - $permalink = 'https://'.config('pixelfed.domain.app').'/users/'.$follower->username; - $activity['id'] = $permalink.'#follow/'.$targetPid.'/undo'; - $activity['actor'] = $permalink; - $activity['object']['id'] = $permalink.'#follows/'.$targetPid; - $activity['object']['actor'] = $permalink; - $keyId = $permalink.'#main-key'; - $payload = json_encode($activity); - $url = $targetInbox; - $headers = HttpSignature::signRaw($follower->private_key, $keyId, $targetInbox, $activity, $addlHeaders); - yield function () use ($client, $url, $headers, $payload) { - return $client->postAsync($url, [ - 'curl' => [ - CURLOPT_HTTPHEADER => $headers, - CURLOPT_POSTFIELDS => $payload, - CURLOPT_HEADER => true, - ], - ]); - }; + ->chunkById(100, function ($followers) use ($actor, $targetInbox, $targetPid) { + foreach ($followers as $follower) { + if (! $follower->id || ! $follower->private_key || ! $follower->username || ! $follower->user_id || $follower->status === 'delete') { + continue; } - }; - $pool = new Pool($client, $requests($followers), [ - 'concurrency' => config('federation.activitypub.delivery.concurrency'), - 'fulfilled' => function ($response, $index) {}, - 'rejected' => function ($reason, $index) {}, - ]); - - $promise = $pool->promise(); - - $promise->wait(); + MoveSendUndoFollowPipeline::dispatch($follower, $targetInbox, $targetPid, $actor)->onQueue('move'); + } }, 'id'); } }