diff --git a/app/Jobs/FollowPipeline/FollowServiceWarmCache.php b/app/Jobs/FollowPipeline/FollowServiceWarmCache.php index cabea9958..990236f69 100644 --- a/app/Jobs/FollowPipeline/FollowServiceWarmCache.php +++ b/app/Jobs/FollowPipeline/FollowServiceWarmCache.php @@ -8,10 +8,13 @@ use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; +use Illuminate\Queue\Middleware\WithoutOverlapping; use App\Services\AccountService; use App\Services\FollowerService; use Cache; use DB; +use Storage; +use App\Follower; use App\Profile; class FollowServiceWarmCache implements ShouldQueue @@ -23,6 +26,16 @@ class FollowServiceWarmCache implements ShouldQueue public $timeout = 5000; public $failOnTimeout = false; + /** + * Get the middleware the job should pass through. + * + * @return array + */ + public function middleware(): array + { + return [(new WithoutOverlapping($this->profileId))->dontRelease()]; + } + /** * Create a new job instance. * @@ -42,6 +55,10 @@ class FollowServiceWarmCache implements ShouldQueue { $id = $this->profileId; + if(Cache::has(FollowerService::FOLLOWERS_SYNC_KEY . $id) && Cache::has(FollowerService::FOLLOWING_SYNC_KEY . $id)) { + return; + } + $account = AccountService::get($id, true); if(!$account) { @@ -50,25 +67,43 @@ class FollowServiceWarmCache implements ShouldQueue return; } - DB::table('followers') - ->select('id', 'following_id', 'profile_id') - ->whereFollowingId($id) - ->orderBy('id') - ->chunk(200, function($followers) use($id) { - foreach($followers as $follow) { - FollowerService::add($follow->profile_id, $id); - } - }); + $hasFollowerPostProcessing = false; + $hasFollowingPostProcessing = false; - DB::table('followers') - ->select('id', 'following_id', 'profile_id') - ->whereProfileId($id) - ->orderBy('id') - ->chunk(200, function($followers) use($id) { - foreach($followers as $follow) { - FollowerService::add($id, $follow->following_id); - } - }); + if(Follower::whereProfileId($id)->orWhere('following_id', $id)->count()) { + $following = []; + $followers = []; + foreach(Follower::lazy() as $follow) { + if($follow->following_id != $id && $follow->profile_id != $id) { + continue; + } + if($follow->profile_id == $id) { + $following[] = $follow->following_id; + } else { + $followers[] = $follow->profile_id; + } + } + + if(count($followers) > 100) { + // store follower ids and process in another job + Storage::put('follow-warm-cache/' . $id . '/followers.json', json_encode($followers)); + $hasFollowerPostProcessing = true; + } else { + foreach($followers as $follower) { + FollowerService::add($follower, $id); + } + } + + if(count($following) > 100) { + // store following ids and process in another job + Storage::put('follow-warm-cache/' . $id . '/following.json', json_encode($following)); + $hasFollowingPostProcessing = true; + } else { + foreach($following as $following) { + FollowerService::add($id, $following); + } + } + } Cache::put(FollowerService::FOLLOWERS_SYNC_KEY . $id, 1, 604800); Cache::put(FollowerService::FOLLOWING_SYNC_KEY . $id, 1, 604800); @@ -82,6 +117,14 @@ class FollowServiceWarmCache implements ShouldQueue AccountService::del($id); + if($hasFollowingPostProcessing) { + FollowServiceWarmCacheLargeIngestPipeline::dispatch($id, 'following')->onQueue('follow'); + } + + if($hasFollowerPostProcessing) { + FollowServiceWarmCacheLargeIngestPipeline::dispatch($id, 'followers')->onQueue('follow'); + } + return; } } diff --git a/app/Jobs/FollowPipeline/FollowServiceWarmCacheLargeIngestPipeline.php b/app/Jobs/FollowPipeline/FollowServiceWarmCacheLargeIngestPipeline.php new file mode 100644 index 000000000..3299bf7a4 --- /dev/null +++ b/app/Jobs/FollowPipeline/FollowServiceWarmCacheLargeIngestPipeline.php @@ -0,0 +1,88 @@ +profileId = $profileId; + $this->followType = $followType; + } + + /** + * Execute the job. + * + * @return void + */ + public function handle() + { + $pid = $this->profileId; + $type = $this->followType; + + if($type === 'followers') { + $key = 'follow-warm-cache/' . $pid . '/followers.json'; + if(!Storage::exists($key)) { + return; + } + $file = Storage::get($key); + $json = json_decode($file, true); + + foreach($json as $id) { + FollowerService::add($id, $pid, false); + usleep(random_int(500, 3000)); + } + sleep(5); + Storage::delete($key); + } + + if($type === 'following') { + $key = 'follow-warm-cache/' . $pid . '/following.json'; + if(!Storage::exists($key)) { + return; + } + $file = Storage::get($key); + $json = json_decode($file, true); + + foreach($json as $id) { + FollowerService::add($pid, $id, false); + usleep(random_int(500, 3000)); + } + sleep(5); + Storage::delete($key); + } + + sleep(random_int(2, 5)); + $files = Storage::files('follow-warm-cache/' . $pid); + if(empty($files)) { + Storage::deleteDirectory('follow-warm-cache/' . $pid); + } + } +}