From ce63c4997b5eb4406696953cdfa84ed1e8419aaa Mon Sep 17 00:00:00 2001 From: Daniel Supernault Date: Sun, 12 Nov 2023 20:54:32 -0700 Subject: [PATCH] Add Feed fanout --- .../HomeFeedPipeline/FeedInsertPipeline.php | 21 +++--- .../HomeFeedPipeline/FeedRemovePipeline.php | 73 +++++++++++++++++++ app/Jobs/StatusPipeline/StatusEntityLexer.php | 26 +++++-- app/Observers/StatusObserver.php | 5 ++ 4 files changed, 107 insertions(+), 18 deletions(-) create mode 100644 app/Jobs/HomeFeedPipeline/FeedRemovePipeline.php diff --git a/app/Jobs/HomeFeedPipeline/FeedInsertPipeline.php b/app/Jobs/HomeFeedPipeline/FeedInsertPipeline.php index 6ccdc18ef..f0455c638 100644 --- a/app/Jobs/HomeFeedPipeline/FeedInsertPipeline.php +++ b/app/Jobs/HomeFeedPipeline/FeedInsertPipeline.php @@ -10,8 +10,6 @@ use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\Middleware\WithoutOverlapping; use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing; -use App\Status; -use App\Follower; use App\Services\FollowerService; use App\Services\HomeTimelineService; @@ -19,7 +17,8 @@ class FeedInsertPipeline implements ShouldQueue, ShouldBeUniqueUntilProcessing { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; - protected $status; + protected $sid; + protected $pid; public $timeout = 900; public $tries = 3; @@ -38,7 +37,7 @@ class FeedInsertPipeline implements ShouldQueue, ShouldBeUniqueUntilProcessing */ public function uniqueId(): string { - return 'hfp:f-insert:sid:' . $this->status->id; + return 'hts:feed:insert:sid:' . $this->sid; } /** @@ -48,15 +47,16 @@ class FeedInsertPipeline implements ShouldQueue, ShouldBeUniqueUntilProcessing */ public function middleware(): array { - return [(new WithoutOverlapping("hfp:f-insert:sid:{$this->status->id}"))->shared()->dontRelease()]; + return [(new WithoutOverlapping("hts:feed:insert:sid:{$this->sid}"))->shared()->dontRelease()]; } /** * Create a new job instance. */ - public function __construct(Status $status) + public function __construct($sid, $pid) { - $this->status = $status; + $this->sid = $sid; + $this->pid = $pid; } /** @@ -64,13 +64,10 @@ class FeedInsertPipeline implements ShouldQueue, ShouldBeUniqueUntilProcessing */ public function handle(): void { - $status = $this->status; - $sid = $status->id; - $pid = $status->profile_id; - $ids = FollowerService::localFollowerIds($pid); + $ids = FollowerService::localFollowerIds($this->pid); foreach($ids as $id) { - HomeTimelineService::add($id, $sid); + HomeTimelineService::add($id, $this->sid); } } } diff --git a/app/Jobs/HomeFeedPipeline/FeedRemovePipeline.php b/app/Jobs/HomeFeedPipeline/FeedRemovePipeline.php new file mode 100644 index 000000000..745907084 --- /dev/null +++ b/app/Jobs/HomeFeedPipeline/FeedRemovePipeline.php @@ -0,0 +1,73 @@ +sid; + } + + /** + * Get the middleware the job should pass through. + * + * @return array + */ + public function middleware(): array + { + return [(new WithoutOverlapping("hts:feed:remove:sid:{$this->sid}"))->shared()->dontRelease()]; + } + + /** + * Create a new job instance. + */ + public function __construct($sid, $pid) + { + $this->sid = $sid; + $this->pid = $pid; + } + + /** + * Execute the job. + */ + public function handle(): void + { + $ids = FollowerService::localFollowerIds($this->pid); + + foreach($ids as $id) { + HomeTimelineService::rem($id, $this->sid); + } + } +} diff --git a/app/Jobs/StatusPipeline/StatusEntityLexer.php b/app/Jobs/StatusPipeline/StatusEntityLexer.php index 2bbc92102..0a913d793 100644 --- a/app/Jobs/StatusPipeline/StatusEntityLexer.php +++ b/app/Jobs/StatusPipeline/StatusEntityLexer.php @@ -21,6 +21,8 @@ use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; use App\Services\UserFilterService; use App\Services\AdminShadowFilterService; +use App\Jobs\HomeFeedPipeline\FeedInsertPipeline; +use App\Jobs\HomeFeedPipeline\HashtagInsertFanoutPipeline; class StatusEntityLexer implements ShouldQueue { @@ -105,12 +107,12 @@ class StatusEntityLexer implements ShouldQueue } DB::transaction(function () use ($status, $tag) { $slug = str_slug($tag, '-', false); - $hashtag = Hashtag::where('slug', $slug)->first(); - if (!$hashtag) { - $hashtag = Hashtag::create( - ['name' => $tag, 'slug' => $slug] - ); - } + + $hashtag = Hashtag::firstOrCreate([ + 'slug' => $slug + ], [ + 'name' => $tag + ]); StatusHashtag::firstOrCreate( [ @@ -150,6 +152,18 @@ class StatusEntityLexer implements ShouldQueue MentionPipeline::dispatch($status, $m); }); } + $this->fanout(); + } + + public function fanout() + { + $status = $this->status; + + if(config('exp.cached_home_timeline')) { + if($status->in_reply_to_id == null && in_array($status->scope, ['public', 'unlisted', 'private'])) { + FeedInsertPipeline::dispatch($status->id, $status->profile_id)->onQueue('feed'); + } + } $this->deliver(); } diff --git a/app/Observers/StatusObserver.php b/app/Observers/StatusObserver.php index e58997165..9b9f8e4f1 100644 --- a/app/Observers/StatusObserver.php +++ b/app/Observers/StatusObserver.php @@ -7,6 +7,7 @@ use App\Services\ProfileStatusService; use Cache; use App\Models\ImportPost; use App\Services\ImportService; +use App\Jobs\HomeFeedPipeline\FeedRemovePipeline; class StatusObserver { @@ -63,6 +64,10 @@ class StatusObserver ImportPost::whereProfileId($status->profile_id)->whereStatusId($status->id)->delete(); ImportService::clearImportedFiles($status->profile_id); } + + if(config('exp.cached_home_timeline')) { + FeedRemovePipeline::dispatch($status->id, $status->profile_id)->onQueue('feed'); + } } /**