From 9904d05fbf2a84872fa3b2be54eb3f140b7aa699 Mon Sep 17 00:00:00 2001 From: Daniel Supernault Date: Sat, 7 Sep 2024 01:57:03 -0600 Subject: [PATCH] Add MovePipeline queue jobs --- .../CleanupLegacyAccountMovePipeline.php | 95 +++++++++++ .../MoveMigrateFollowersPipeline.php | 157 +++++++++++++++++ .../UnfollowLegacyAccountMovePipeline.php | 161 ++++++++++++++++++ 3 files changed, 413 insertions(+) create mode 100644 app/Jobs/MovePipeline/CleanupLegacyAccountMovePipeline.php create mode 100644 app/Jobs/MovePipeline/MoveMigrateFollowersPipeline.php create mode 100644 app/Jobs/MovePipeline/UnfollowLegacyAccountMovePipeline.php diff --git a/app/Jobs/MovePipeline/CleanupLegacyAccountMovePipeline.php b/app/Jobs/MovePipeline/CleanupLegacyAccountMovePipeline.php new file mode 100644 index 000000000..5e99c95bd --- /dev/null +++ b/app/Jobs/MovePipeline/CleanupLegacyAccountMovePipeline.php @@ -0,0 +1,95 @@ +target = $target; + $this->activity = $activity; + } + + /** + * Get the middleware the job should pass through. + * + * @return array + */ + public function middleware(): array + { + return [ + new WithoutOverlapping('process-move-cleanup-legacy-followers:'.$this->activity), + (new ThrottlesExceptions(2, 5 * 60))->backoff(5), + ]; + } + + /** + * Determine the time at which the job should timeout. + */ + public function retryUntil(): DateTime + { + return now()->addMinutes(15); + } + + /** + * Get the middleware the job should pass through. + * + * @return array + */ + public function middleware(): array + { + return [new WithoutOverlapping($this->target)]; + } + + /** + * Execute the job. + */ + public function handle(): void + { + if (config('app.env') !== 'production' || (bool) config_cache('federation.activitypub.enabled') == false) { + throw new Exception('Activitypub not enabled'); + } + + $target = $this->target; + $actor = $this->activity; + + $targetAccount = Helpers::profileFetch($target); + $actorAccount = Helpers::profileFetch($actor); + + if (! $targetAccount || ! $actorAccount) { + throw new Exception('Invalid move accounts'); + } + + Follower::whereFollowingId($actorAccount['id'])->delete(); + } +} diff --git a/app/Jobs/MovePipeline/MoveMigrateFollowersPipeline.php b/app/Jobs/MovePipeline/MoveMigrateFollowersPipeline.php new file mode 100644 index 000000000..306a8f775 --- /dev/null +++ b/app/Jobs/MovePipeline/MoveMigrateFollowersPipeline.php @@ -0,0 +1,157 @@ +target = $target; + $this->activity = $activity; + } + + /** + * Get the middleware the job should pass through. + * + * @return array + */ + public function middleware(): array + { + return [ + new WithoutOverlapping('process-move-migrate-followers:'.$this->target), + (new ThrottlesExceptions(2, 5 * 60))->backoff(5), + ]; + } + + /** + * Determine the time at which the job should timeout. + */ + public function retryUntil(): DateTime + { + return now()->addMinutes(15); + } + + /** + * Get the middleware the job should pass through. + * + * @return array + */ + public function middleware(): array + { + return [new WithoutOverlapping($this->target)]; + } + + /** + * Execute the job. + */ + public function handle(): void + { + if (config('app.env') !== 'production' || (bool) config_cache('federation.activitypub.enabled') == false) { + throw new Exception('Activitypub not enabled'); + } + + $target = $this->target; + $actor = $this->activity; + + $targetAccount = Helpers::profileFetch($target); + $actorAccount = Helpers::profileFetch($actor); + + if (! $targetAccount || ! $actorAccount) { + throw new Exception('Invalid move accounts'); + } + + $activity = [ + '@context' => 'https://www.w3.org/ns/activitystreams', + 'type' => 'Follow', + 'actor' => null, + 'object' => $target, + ]; + + $version = config('pixelfed.version'); + $appUrl = config('app.url'); + $userAgent = "(Pixelfed/{$version}; +{$appUrl})"; + $addlHeaders = [ + 'Content-Type' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"', + 'User-Agent' => $userAgent, + ]; + $targetInbox = $targetAccount['sharedInbox'] ?? $targetAccount['inbox_url']; + $targetPid = $targetAccount['id']; + + DB::table('followers') + ->join('profiles', 'followers.profile_id', '=', 'profiles.id') + ->where('followers.following_id', $actorAccount['id']) + ->whereNotNull('profiles.user_id') + ->whereNull('profiles.deleted_at') + ->select('profiles.id', 'profiles.user_id', 'profiles.username', 'profiles.private_key') + ->chunkById(100, function ($followers) use ($activity, $addlHeaders, $targetInbox, $targetPid) { + $client = new Client([ + 'timeout' => config('federation.activitypub.delivery.timeout'), + ]); + $requests = function ($followers) use ($client, $activity, $addlHeaders, $targetInbox, $targetPid) { + foreach ($followers as $follower) { + $permalink = 'https://'.config('pixelfed.domain.app').'/users/'.$follower->username; + $activity['actor'] = $permalink; + $keyId = $permalink.'#main-key'; + $payload = json_encode($activity); + $url = $targetInbox; + $headers = HttpSignature::signRaw($follower->private_key, $keyId, $targetInbox, $activity, $addlHeaders); + Follower::updateOrCreate([ + 'profile_id' => $follower->id, + 'following_id' => $targetPid, + ]); + yield new $client->postAsync($url, [ + 'curl' => [ + CURLOPT_HTTPHEADER => $headers, + CURLOPT_POSTFIELDS => $payload, + CURLOPT_HEADER => true, + ], + ]); + } + }; + + $pool = new Pool($client, $requests($followers), [ + 'concurrency' => config('federation.activitypub.delivery.concurrency'), + 'fulfilled' => function ($response, $index) {}, + 'rejected' => function ($reason, $index) {}, + ]); + + $promise = $pool->promise(); + + $promise->wait(); + }, 'id'); + } +} diff --git a/app/Jobs/MovePipeline/UnfollowLegacyAccountMovePipeline.php b/app/Jobs/MovePipeline/UnfollowLegacyAccountMovePipeline.php new file mode 100644 index 000000000..a5f687fa0 --- /dev/null +++ b/app/Jobs/MovePipeline/UnfollowLegacyAccountMovePipeline.php @@ -0,0 +1,161 @@ +target = $target; + $this->activity = $activity; + } + + /** + * Get the middleware the job should pass through. + * + * @return array + */ + public function middleware(): array + { + return [ + new WithoutOverlapping('process-move-undo-legacy-followers:'.$this->activity), + (new ThrottlesExceptions(2, 5 * 60))->backoff(5), + ]; + } + + /** + * Determine the time at which the job should timeout. + */ + public function retryUntil(): DateTime + { + return now()->addMinutes(15); + } + + /** + * Get the middleware the job should pass through. + * + * @return array + */ + public function middleware(): array + { + return [new WithoutOverlapping($this->target)]; + } + + /** + * Execute the job. + */ + public function handle(): void + { + if (config('app.env') !== 'production' || (bool) config_cache('federation.activitypub.enabled') == false) { + throw new Exception('Activitypub not enabled'); + } + + $target = $this->target; + $actor = $this->activity; + + $targetAccount = Helpers::profileFetch($target); + $actorAccount = Helpers::profileFetch($actor); + + if (! $targetAccount || ! $actorAccount) { + throw new Exception('Invalid move accounts'); + } + + $activity = [ + '@context' => 'https://www.w3.org/ns/activitystreams', + 'type' => 'Undo', + 'id' => null, + 'actor' => null, + 'object' => [ + 'type' => 'Follow', + 'id' => null, + 'object' => $actor, + 'actor' => null, + ], + ]; + + $version = config('pixelfed.version'); + $appUrl = config('app.url'); + $userAgent = "(Pixelfed/{$version}; +{$appUrl})"; + $addlHeaders = [ + 'Content-Type' => 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"', + 'User-Agent' => $userAgent, + ]; + $targetInbox = $actorAccount['sharedInbox'] ?? $actorAccount['inbox_url']; + $targetPid = $actorAccount['id']; + + DB::table('followers') + ->join('profiles', 'followers.profile_id', '=', 'profiles.id') + ->where('followers.following_id', $actorAccount['id']) + ->whereNotNull('profiles.user_id') + ->whereNull('profiles.deleted_at') + ->select('profiles.id', 'profiles.user_id', 'profiles.username', 'profiles.private_key') + ->chunkById(100, function ($followers) use ($activity, $addlHeaders, $targetInbox, $targetPid) { + $client = new Client([ + 'timeout' => config('federation.activitypub.delivery.timeout'), + ]); + $requests = function ($followers) use ($client, $activity, $addlHeaders, $targetInbox, $targetPid) { + foreach ($followers as $follower) { + $permalink = 'https://'.config('pixelfed.domain.app').'/users/'.$follower->username; + $activity['id'] = $permalink.'#follow/'.$targetPid.'/undo'; + $activity['actor'] = $permalink; + $activity['object']['id'] = $permalink.'#follows/'.$targetPid; + $activity['object']['actor'] = $permalink; + $keyId = $permalink.'#main-key'; + $payload = json_encode($activity); + $url = $targetInbox; + $headers = HttpSignature::signRaw($follower->private_key, $keyId, $targetInbox, $activity, $addlHeaders); + yield new $client->postAsync($url, [ + 'curl' => [ + CURLOPT_HTTPHEADER => $headers, + CURLOPT_POSTFIELDS => $payload, + CURLOPT_HEADER => true, + ], + ]); + } + }; + + $pool = new Pool($client, $requests($followers), [ + 'concurrency' => config('federation.activitypub.delivery.concurrency'), + 'fulfilled' => function ($response, $index) {}, + 'rejected' => function ($reason, $index) {}, + ]); + + $promise = $pool->promise(); + + $promise->wait(); + }, 'id'); + } +}