Merge pull request #4220 from pixelfed/staging

Update FollowPipeline, fix followers_count and following_count counters
This commit is contained in:
daniel 2023-03-03 04:13:06 -07:00 committed by GitHub
commit f027f6c700
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 213 additions and 88 deletions

View file

@ -54,6 +54,7 @@ use App\Jobs\SharePipeline\UndoSharePipeline;
use App\Jobs\StatusPipeline\NewStatusPipeline; use App\Jobs\StatusPipeline\NewStatusPipeline;
use App\Jobs\StatusPipeline\StatusDelete; use App\Jobs\StatusPipeline\StatusDelete;
use App\Jobs\FollowPipeline\FollowPipeline; use App\Jobs\FollowPipeline\FollowPipeline;
use App\Jobs\FollowPipeline\UnfollowPipeline;
use App\Jobs\ImageOptimizePipeline\ImageOptimize; use App\Jobs\ImageOptimizePipeline\ImageOptimize;
use App\Jobs\VideoPipeline\{ use App\Jobs\VideoPipeline\{
VideoOptimize, VideoOptimize,
@ -707,8 +708,7 @@ class ApiV1Controller extends Controller
if($remote == true && config('federation.activitypub.remoteFollow') == true) { if($remote == true && config('federation.activitypub.remoteFollow') == true) {
(new FollowerController())->sendFollow($user->profile, $target); (new FollowerController())->sendFollow($user->profile, $target);
} }
FollowPipeline::dispatch($follower); FollowPipeline::dispatch($follower)->onQueue('high');
$target->increment('followers_count');
} }
RelationshipService::refresh($user->profile_id, $target->id); RelationshipService::refresh($user->profile_id, $target->id);
@ -769,25 +769,11 @@ class ApiV1Controller extends Controller
return $this->json($res); return $this->json($res);
} }
if($user->profile->following_count) {
$user->profile->decrement('following_count');
}
FollowRequest::whereFollowerId($user->profile_id)
->whereFollowingId($target->id)
->delete();
Follower::whereProfileId($user->profile_id) Follower::whereProfileId($user->profile_id)
->whereFollowingId($target->id) ->whereFollowingId($target->id)
->delete(); ->delete();
if(config('instance.timeline.home.cached')) { UnfollowPipeline::dispatch($user->profile_id, $target->id)->onQueue('high');
Cache::forget('pf:timelines:home:' . $user->profile_id);
}
FollowerService::remove($user->profile_id, $target->id);
$target->decrement('followers_count');
if($remote == true && config('federation.activitypub.remoteFollow') == true) { if($remote == true && config('federation.activitypub.remoteFollow') == true) {
(new FollowerController())->sendUndoFollow($user->profile, $target); (new FollowerController())->sendUndoFollow($user->profile, $target);

View file

@ -2,6 +2,7 @@
namespace App\Jobs\FollowPipeline; namespace App\Jobs\FollowPipeline;
use App\Follower;
use App\Notification; use App\Notification;
use Cache; use Cache;
use Illuminate\Bus\Queueable; use Illuminate\Bus\Queueable;
@ -11,6 +12,8 @@ use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels; use Illuminate\Queue\SerializesModels;
use Log; use Log;
use Illuminate\Support\Facades\Redis; use Illuminate\Support\Facades\Redis;
use App\Services\AccountService;
use App\Services\FollowerService;
class FollowPipeline implements ShouldQueue class FollowPipeline implements ShouldQueue
{ {
@ -46,9 +49,45 @@ class FollowPipeline implements ShouldQueue
$actor = $follower->actor; $actor = $follower->actor;
$target = $follower->target; $target = $follower->target;
if(!$actor || !$target) {
return;
}
Cache::forget('profile:following:' . $actor->id); Cache::forget('profile:following:' . $actor->id);
Cache::forget('profile:following:' . $target->id); Cache::forget('profile:following:' . $target->id);
FollowerService::add($actor->id, $target->id);
$actorProfileSync = Cache::get(FollowerService::FOLLOWING_SYNC_KEY . $actor->id);
if(!$actorProfileSync) {
FollowServiceWarmCache::dispatch($actor->id)->onQueue('low');
} else {
if($actor->following_count) {
$actor->increment('following_count');
} else {
$count = Follower::whereProfileId($actor->id)->count();
$actor->following_count = $count;
$actor->save();
}
Cache::put(FollowerService::FOLLOWING_SYNC_KEY . $actor->id, 1, 604800);
AccountService::del($actor->id);
}
$targetProfileSync = Cache::get(FollowerService::FOLLOWERS_SYNC_KEY . $target->id);
if(!$targetProfileSync) {
FollowServiceWarmCache::dispatch($target->id)->onQueue('low');
} else {
if($target->followers_count) {
$target->increment('followers_count');
} else {
$count = Follower::whereFollowingId($target->id)->count();
$target->followers_count = $count;
$target->save();
}
Cache::put(FollowerService::FOLLOWERS_SYNC_KEY . $target->id, 1, 604800);
AccountService::del($target->id);
}
if($target->domain || !$target->private_key) { if($target->domain || !$target->private_key) {
return; return;
} }

View file

@ -16,72 +16,72 @@ use App\Profile;
class FollowServiceWarmCache implements ShouldQueue class FollowServiceWarmCache implements ShouldQueue
{ {
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public $profileId; public $profileId;
public $tries = 5; public $tries = 5;
public $timeout = 300; public $timeout = 5000;
public $failOnTimeout = true; public $failOnTimeout = false;
/** /**
* Create a new job instance. * Create a new job instance.
* *
* @return void * @return void
*/ */
public function __construct($profileId) public function __construct($profileId)
{ {
$this->profileId = $profileId; $this->profileId = $profileId;
} }
/** /**
* Execute the job. * Execute the job.
* *
* @return void * @return void
*/ */
public function handle() public function handle()
{ {
$id = $this->profileId; $id = $this->profileId;
$account = AccountService::get($id, true); $account = AccountService::get($id, true);
if(!$account) { if(!$account) {
Cache::put(FollowerService::FOLLOWERS_SYNC_KEY . $id, 1); Cache::put(FollowerService::FOLLOWERS_SYNC_KEY . $id, 1, 604800);
Cache::put(FollowerService::FOLLOWING_SYNC_KEY . $id, 1); Cache::put(FollowerService::FOLLOWING_SYNC_KEY . $id, 1, 604800);
return; return;
} }
DB::table('followers') DB::table('followers')
->select('id', 'following_id', 'profile_id') ->select('id', 'following_id', 'profile_id')
->whereFollowingId($id) ->whereFollowingId($id)
->orderBy('id') ->orderBy('id')
->chunk(200, function($followers) use($id) { ->chunk(200, function($followers) use($id) {
foreach($followers as $follow) { foreach($followers as $follow) {
FollowerService::add($follow->profile_id, $id); FollowerService::add($follow->profile_id, $id);
} }
}); });
DB::table('followers') DB::table('followers')
->select('id', 'following_id', 'profile_id') ->select('id', 'following_id', 'profile_id')
->whereProfileId($id) ->whereProfileId($id)
->orderBy('id') ->orderBy('id')
->chunk(200, function($followers) use($id) { ->chunk(200, function($followers) use($id) {
foreach($followers as $follow) { foreach($followers as $follow) {
FollowerService::add($id, $follow->following_id); FollowerService::add($id, $follow->following_id);
} }
}); });
Cache::put(FollowerService::FOLLOWERS_SYNC_KEY . $id, 1); Cache::put(FollowerService::FOLLOWERS_SYNC_KEY . $id, 1, 604800);
Cache::put(FollowerService::FOLLOWING_SYNC_KEY . $id, 1); Cache::put(FollowerService::FOLLOWING_SYNC_KEY . $id, 1, 604800);
$profile = Profile::find($id); $profile = Profile::find($id);
if($profile) { if($profile) {
$profile->following_count = DB::table('followers')->whereProfileId($id)->count(); $profile->following_count = DB::table('followers')->whereProfileId($id)->count();
$profile->followers_count = DB::table('followers')->whereFollowingId($id)->count(); $profile->followers_count = DB::table('followers')->whereFollowingId($id)->count();
$profile->save(); $profile->save();
} }
AccountService::del($id); AccountService::del($id);
return; return;
} }
} }

View file

@ -0,0 +1,114 @@
<?php
namespace App\Jobs\FollowPipeline;
use App\Follower;
use App\FollowRequest;
use App\Notification;
use App\Profile;
use Cache;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Log;
use Illuminate\Support\Facades\Redis;
use App\Services\AccountService;
use App\Services\FollowerService;
use App\Services\NotificationService;
class UnfollowPipeline implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $actor;
protected $target;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct($actor, $target)
{
$this->actor = $actor;
$this->target = $target;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
$actor = $this->actor;
$target = $this->target;
$actorProfile = Profile::find($actor);
if(!$actorProfile) {
return;
}
$targetProfile = Profile::find($target);
if(!$targetProfile) {
return;
}
FollowerService::remove($actor, $target);
$actorProfileSync = Cache::get(FollowerService::FOLLOWING_SYNC_KEY . $actor);
if(!$actorProfileSync) {
FollowServiceWarmCache::dispatch($actor)->onQueue('low');
} else {
if($actorProfile->following_count) {
$actorProfile->decrement('following_count');
} else {
$count = Follower::whereProfileId($actor)->count();
$actorProfile->following_count = $count;
$actorProfile->save();
}
Cache::put(FollowerService::FOLLOWING_SYNC_KEY . $actor, 1, 604800);
AccountService::del($actor);
}
$targetProfileSync = Cache::get(FollowerService::FOLLOWERS_SYNC_KEY . $target);
if(!$targetProfileSync) {
FollowServiceWarmCache::dispatch($target)->onQueue('low');
} else {
if($targetProfile->followers_count) {
$targetProfile->decrement('followers_count');
} else {
$count = Follower::whereFollowingId($target)->count();
$targetProfile->followers_count = $count;
$targetProfile->save();
}
Cache::put(FollowerService::FOLLOWERS_SYNC_KEY . $target, 1, 604800);
AccountService::del($target);
}
if($targetProfile->domain == null) {
Notification::withTrashed()
->whereProfileId($target)
->whereAction('follow')
->whereActorId($actor)
->whereItemId($target)
->whereItemType('App\Profile')
->get()
->each(function($n) {
NotificationService::del($n->profile_id, $n->id);
$n->forceDelete();
});
}
if($actorProfile->domain == null && config('instance.timeline.home.cached')) {
Cache::forget('pf:timelines:home:' . $actor);
}
FollowRequest::whereFollowingId($target)
->whereFollowerId($actor)
->delete();
return;
}
}

View file

@ -15,7 +15,6 @@ use App\Jobs\FollowPipeline\FollowServiceWarmCache;
class FollowerService class FollowerService
{ {
const CACHE_KEY = 'pf:services:followers:'; const CACHE_KEY = 'pf:services:followers:';
const FOLLOWERS_SYNC_ACTIVE = 'pf:services:followers:sync-active:';
const FOLLOWERS_SYNC_KEY = 'pf:services:followers:sync-followers:'; const FOLLOWERS_SYNC_KEY = 'pf:services:followers:sync-followers:';
const FOLLOWING_SYNC_KEY = 'pf:services:followers:sync-following:'; const FOLLOWING_SYNC_KEY = 'pf:services:followers:sync-following:';
const FOLLOWING_KEY = 'pf:services:follow:following:id:'; const FOLLOWING_KEY = 'pf:services:follow:following:id:';
@ -106,25 +105,13 @@ class FollowerService
if(Cache::get(self::FOLLOWERS_SYNC_KEY . $id) != null) { if(Cache::get(self::FOLLOWERS_SYNC_KEY . $id) != null) {
return; return;
} }
if(Cache::get(self::FOLLOWERS_SYNC_ACTIVE . $id) != null) {
return;
}
FollowServiceWarmCache::dispatch($id)->onQueue('low'); FollowServiceWarmCache::dispatch($id)->onQueue('low');
Cache::put(self::FOLLOWERS_SYNC_ACTIVE . $id, 1, 604800);
} }
if($scope === 'following') { if($scope === 'following') {
if(Cache::get(self::FOLLOWING_SYNC_KEY . $id) != null) { if(Cache::get(self::FOLLOWING_SYNC_KEY . $id) != null) {
return; return;
} }
if(Cache::get(self::FOLLOWERS_SYNC_ACTIVE . $id) != null) {
return;
}
FollowServiceWarmCache::dispatch($id)->onQueue('low'); FollowServiceWarmCache::dispatch($id)->onQueue('low');
Cache::put(self::FOLLOWERS_SYNC_ACTIVE . $id, 1, 604800);
} }
return; return;
} }
@ -220,6 +207,5 @@ class FollowerService
Redis::del(self::FOLLOWERS_KEY . $id); Redis::del(self::FOLLOWERS_KEY . $id);
Cache::forget(self::FOLLOWERS_SYNC_KEY . $id); Cache::forget(self::FOLLOWERS_SYNC_KEY . $id);
Cache::forget(self::FOLLOWING_SYNC_KEY . $id); Cache::forget(self::FOLLOWING_SYNC_KEY . $id);
Cache::forget(self::FOLLOWERS_SYNC_ACTIVE . $id);
} }
} }