Merge pull request #4752 from pixelfed/staging

Experimental home feed
This commit is contained in:
daniel 2023-11-13 00:50:26 -07:00 committed by GitHub
commit e8439358cb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 1722 additions and 786 deletions

View file

@ -8,6 +8,7 @@
- Added user:2fa command to easily disable 2FA for given account ([c6408fd7](https://github.com/pixelfed/pixelfed/commit/c6408fd7))
- Added `avatar:storage-deep-clean` command to dispatch remote avatar storage cleanup jobs ([c37b7cde](https://github.com/pixelfed/pixelfed/commit/c37b7cde))
- Added S3 command to rewrite media urls ([5b3a5610](https://github.com/pixelfed/pixelfed/commit/5b3a5610))
- Experimental home feed ([#4752](https://github.com/pixelfed/pixelfed/pull/4752)) ([c39b9afb](https://github.com/pixelfed/pixelfed/commit/c39b9afb))
### Federation
- Update Privacy Settings, add support for Mastodon `indexable` search flag ([fc24630e](https://github.com/pixelfed/pixelfed/commit/fc24630e))
@ -46,6 +47,7 @@
- Update ImportPostController, fix IG bug with missing spaces between hashtags ([9c24157a](https://github.com/pixelfed/pixelfed/commit/9c24157a))
- Update ApiV1Controller, fix mutes in home feed ([ddc21714](https://github.com/pixelfed/pixelfed/commit/ddc21714))
- Update AP helpers, improve preferredUsername validation ([21218c79](https://github.com/pixelfed/pixelfed/commit/21218c79))
- Update delete pipelines, properly invoke StatusHashtag delete events ([ce54d29c](https://github.com/pixelfed/pixelfed/commit/ce54d29c))
- ([](https://github.com/pixelfed/pixelfed/commit/))
## [v0.11.9 (2023-08-21)](https://github.com/pixelfed/pixelfed/compare/v0.11.8...v0.11.9)

View file

@ -71,6 +71,7 @@ use App\Services\{
CollectionService,
FollowerService,
HashtagService,
HomeTimelineService,
InstanceService,
LikeService,
NetworkTimelineService,
@ -102,6 +103,7 @@ use Illuminate\Support\Facades\RateLimiter;
use Purify;
use Carbon\Carbon;
use App\Http\Resources\MastoApi\FollowedTagResource;
use App\Jobs\HomeFeedPipeline\FeedWarmCachePipeline;
class ApiV1Controller extends Controller
{
@ -2129,11 +2131,11 @@ class ApiV1Controller extends Controller
public function timelineHome(Request $request)
{
$this->validate($request,[
'page' => 'sometimes|integer|max:40',
'min_id' => 'sometimes|integer|min:0|max:' . PHP_INT_MAX,
'max_id' => 'sometimes|integer|min:0|max:' . PHP_INT_MAX,
'limit' => 'sometimes|integer|min:1|max:100',
'include_reblogs' => 'sometimes',
'page' => 'sometimes|integer|max:40',
'min_id' => 'sometimes|integer|min:0|max:' . PHP_INT_MAX,
'max_id' => 'sometimes|integer|min:0|max:' . PHP_INT_MAX,
'limit' => 'sometimes|integer|min:1|max:40',
'include_reblogs' => 'sometimes',
]);
$napi = $request->has(self::PF_API_ENTITY_KEY);
@ -2142,13 +2144,77 @@ class ApiV1Controller extends Controller
$max = $request->input('max_id');
$limit = $request->input('limit') ?? 20;
$pid = $request->user()->profile_id;
$includeReblogs = $request->filled('include_reblogs');
$nullFields = $includeReblogs ?
['in_reply_to_id'] :
['in_reply_to_id', 'reblog_of_id'];
$inTypes = $includeReblogs ?
['photo', 'photo:album', 'video', 'video:album', 'photo:video:album', 'share'] :
['photo', 'photo:album', 'video', 'video:album', 'photo:video:album'];
$includeReblogs = $request->filled('include_reblogs');
$nullFields = $includeReblogs ?
['in_reply_to_id'] :
['in_reply_to_id', 'reblog_of_id'];
$inTypes = $includeReblogs ?
['photo', 'photo:album', 'video', 'video:album', 'photo:video:album', 'share'] :
['photo', 'photo:album', 'video', 'video:album', 'photo:video:album'];
if(config('exp.cached_home_timeline')) {
if($min || $max) {
if($request->has('min_id')) {
$res = HomeTimelineService::getRankedMinId($pid, $min ?? 0, $limit + 10);
} else {
$res = HomeTimelineService::getRankedMaxId($pid, $max ?? 0, $limit + 10);
}
} else {
$res = HomeTimelineService::get($pid, 0, $limit + 10);
}
if(!$res) {
$res = Cache::has('pf:services:apiv1:home:cached:coldbootcheck:' . $pid);
if(!$res) {
Cache::set('pf:services:apiv1:home:cached:coldbootcheck:' . $pid, 1, 86400);
FeedWarmCachePipeline::dispatchSync($pid);
return response()->json([], 206);
} else {
Cache::set('pf:services:apiv1:home:cached:coldbootcheck:' . $pid, 1, 86400);
return response()->json([], 206);
}
}
$res = collect($res)->take($limit)->map(function($id) use($napi) {
return $napi ? StatusService::get($id, false) : StatusService::getMastodon($id, false);
})->filter(function($res) {
return $res && isset($res['account']);
})->map(function($status) use($pid) {
if($pid) {
$status['favourited'] = (bool) LikeService::liked($pid, $status['id']);
$status['reblogged'] = (bool) ReblogService::get($pid, $status['id']);
$status['bookmarked'] = (bool) BookmarkService::get($pid, $status['id']);
}
return $status;
});
$baseUrl = config('app.url') . '/api/v1/timelines/home?limit=' . $limit . '&';
$minId = $res->map(function($s) {
return ['id' => $s['id']];
})->min('id');
$maxId = $res->map(function($s) {
return ['id' => $s['id']];
})->max('id');
if($minId == $maxId) {
$minId = null;
}
if($maxId) {
$link = '<'.$baseUrl.'max_id='.$minId.'>; rel="next"';
}
if($minId) {
$link = '<'.$baseUrl.'min_id='.$maxId.'>; rel="prev"';
}
if($maxId && $minId) {
$link = '<'.$baseUrl.'max_id='.$minId.'>; rel="next",<'.$baseUrl.'min_id='.$maxId.'>; rel="prev"';
}
$headers = isset($link) ? ['Link' => $link] : [];
return $this->json($res->toArray(), 200, $headers);
}
$following = Cache::remember('profile:following:'.$pid, 1209600, function() use($pid) {
$following = Follower::whereProfileId($pid)->pluck('following_id');
@ -2161,6 +2227,7 @@ class ApiV1Controller extends Controller
$following = array_diff($following, $muted);
}
if($min || $max) {
$dir = $min ? '>' : '<';
$id = $min ?? $max;
@ -2199,22 +2266,22 @@ class ApiV1Controller extends Controller
if($pid) {
$status['favourited'] = (bool) LikeService::liked($pid, $s['id']);
$status['reblogged'] = (bool) ReblogService::get($pid, $status['id']);
$status['bookmarked'] = (bool) BookmarkService::get($pid, $status['id']);
$status['bookmarked'] = (bool) BookmarkService::get($pid, $status['id']);
}
return $status;
})
->filter(function($status) {
return $status && isset($status['account']);
})
->map(function($status) use($pid) {
if(!empty($status['reblog'])) {
$status['reblog']['favourited'] = (bool) LikeService::liked($pid, $status['reblog']['id']);
$status['reblog']['reblogged'] = (bool) ReblogService::get($pid, $status['reblog']['id']);
$status['bookmarked'] = (bool) BookmarkService::get($pid, $status['id']);
}
->map(function($status) use($pid) {
if(!empty($status['reblog'])) {
$status['reblog']['favourited'] = (bool) LikeService::liked($pid, $status['reblog']['id']);
$status['reblog']['reblogged'] = (bool) ReblogService::get($pid, $status['reblog']['id']);
$status['bookmarked'] = (bool) BookmarkService::get($pid, $status['id']);
}
return $status;
})
return $status;
})
->take($limit)
->values();
} else {
@ -2252,22 +2319,22 @@ class ApiV1Controller extends Controller
if($pid) {
$status['favourited'] = (bool) LikeService::liked($pid, $s['id']);
$status['reblogged'] = (bool) ReblogService::get($pid, $status['id']);
$status['bookmarked'] = (bool) BookmarkService::get($pid, $status['id']);
$status['bookmarked'] = (bool) BookmarkService::get($pid, $status['id']);
}
return $status;
})
->filter(function($status) {
return $status && isset($status['account']);
})
->map(function($status) use($pid) {
if(!empty($status['reblog'])) {
$status['reblog']['favourited'] = (bool) LikeService::liked($pid, $status['reblog']['id']);
$status['reblog']['reblogged'] = (bool) ReblogService::get($pid, $status['reblog']['id']);
$status['bookmarked'] = (bool) BookmarkService::get($pid, $status['id']);
}
->map(function($status) use($pid) {
if(!empty($status['reblog'])) {
$status['reblog']['favourited'] = (bool) LikeService::liked($pid, $status['reblog']['id']);
$status['reblog']['reblogged'] = (bool) ReblogService::get($pid, $status['reblog']['id']);
$status['bookmarked'] = (bool) BookmarkService::get($pid, $status['id']);
}
return $status;
})
return $status;
})
->take($limit)
->values();
}
@ -2321,10 +2388,11 @@ class ApiV1Controller extends Controller
$max = $request->input('max_id');
$limit = $request->input('limit') ?? 20;
$user = $request->user();
$remote = ($request->has('remote') && $request->input('remote') == true) || ($request->filled('local') && $request->input('local') != true);
$remote = $request->has('remote');
$local = $request->has('local');
$filtered = $user ? UserFilterService::filters($user->profile_id) : [];
if((!$request->has('local') || $remote) && config('instance.timeline.network.cached')) {
if($remote && config('instance.timeline.network.cached')) {
Cache::remember('api:v1:timelines:network:cache_check', 10368000, function() {
if(NetworkTimelineService::count() == 0) {
NetworkTimelineService::warmCache(true, config('instance.timeline.network.cache_dropoff'));
@ -2338,7 +2406,9 @@ class ApiV1Controller extends Controller
} else {
$feed = NetworkTimelineService::get(0, $limit + 5);
}
} else {
}
if($local || !$remote && !$local) {
Cache::remember('api:v1:timelines:public:cache_check', 10368000, function() {
if(PublicTimelineService::count() == 0) {
PublicTimelineService::warmCache(true, 400);
@ -3533,19 +3603,6 @@ class ApiV1Controller extends Controller
->filter(function($profile) use($pid) {
return $profile['id'] != $pid;
})
->map(function($profile) {
$ids = collect(ProfileStatusService::get($profile['id'], 0, 9))
->map(function($id) {
return StatusService::get($id, true);
})
->filter(function($post) {
return $post && isset($post['id']);
})
->take(3)
->values();
$profile['recent_posts'] = $ids;
return $profile;
})
->take(6)
->values();

View file

@ -76,7 +76,10 @@ class DeleteRemoteStatusPipeline implements ShouldQueue
});
Mention::whereStatusId($status->id)->forceDelete();
Report::whereObjectType('App\Status')->whereObjectId($status->id)->delete();
StatusHashtag::whereStatusId($status->id)->delete();
$statusHashtags = StatusHashtag::whereStatusId($status->id)->get();
foreach($statusHashtags as $stag) {
$stag->delete();
}
StatusView::whereStatusId($status->id)->delete();
Status::whereReblogOfId($status->id)->forceDelete();
$status->forceDelete();

View file

@ -17,6 +17,7 @@ use Illuminate\Support\Facades\Redis;
use App\Services\AccountService;
use App\Services\FollowerService;
use App\Services\NotificationService;
use App\Jobs\HomeFeedPipeline\FeedUnfollowPipeline;
class UnfollowPipeline implements ShouldQueue
{
@ -55,6 +56,8 @@ class UnfollowPipeline implements ShouldQueue
return;
}
FeedUnfollowPipeline::dispatch($actor, $target)->onQueue('follow');
FollowerService::remove($actor, $target);
$actorProfileSync = Cache::get(FollowerService::FOLLOWING_SYNC_KEY . $actor);

View file

@ -0,0 +1,87 @@
<?php
namespace App\Jobs\HomeFeedPipeline;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\Middleware\WithoutOverlapping;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
use App\Services\AccountService;
use App\Services\HomeTimelineService;
use App\Services\SnowflakeService;
use App\Status;
class FeedFollowPipeline implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $actorId;
protected $followingId;
public $timeout = 900;
public $tries = 3;
public $maxExceptions = 1;
public $failOnTimeout = true;
/**
* The number of seconds after which the job's unique lock will be released.
*
* @var int
*/
public $uniqueFor = 3600;
/**
* Get the unique ID for the job.
*/
public function uniqueId(): string
{
return 'hts:feed:insert:follows:aid:' . $this->actorId . ':fid:' . $this->followingId;
}
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping("hts:feed:insert:follows:aid:{$this->actorId}:fid:{$this->followingId}"))->shared()->dontRelease()];
}
/**
* Create a new job instance.
*/
public function __construct($actorId, $followingId)
{
$this->actorId = $actorId;
$this->followingId = $followingId;
}
/**
* Execute the job.
*/
public function handle(): void
{
$actorId = $this->actorId;
$followingId = $this->followingId;
$minId = SnowflakeService::byDate(now()->subMonths(6));
$ids = Status::where('id', '>', $minId)
->where('profile_id', $followingId)
->whereNull(['in_reply_to_id', 'reblog_of_id'])
->whereIn('type', ['photo', 'photo:album', 'video', 'video:album', 'photo:video:album'])
->whereIn('visibility',['public', 'unlisted', 'private'])
->orderByDesc('id')
->limit(HomeTimelineService::FOLLOWER_FEED_POST_LIMIT)
->pluck('id');
foreach($ids as $id) {
HomeTimelineService::add($actorId, $id);
}
}
}

View file

@ -0,0 +1,73 @@
<?php
namespace App\Jobs\HomeFeedPipeline;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\Middleware\WithoutOverlapping;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
use App\Services\FollowerService;
use App\Services\HomeTimelineService;
class FeedInsertPipeline implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $sid;
protected $pid;
public $timeout = 900;
public $tries = 3;
public $maxExceptions = 1;
public $failOnTimeout = true;
/**
* The number of seconds after which the job's unique lock will be released.
*
* @var int
*/
public $uniqueFor = 3600;
/**
* Get the unique ID for the job.
*/
public function uniqueId(): string
{
return 'hts:feed:insert:sid:' . $this->sid;
}
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping("hts:feed:insert:sid:{$this->sid}"))->shared()->dontRelease()];
}
/**
* Create a new job instance.
*/
public function __construct($sid, $pid)
{
$this->sid = $sid;
$this->pid = $pid;
}
/**
* Execute the job.
*/
public function handle(): void
{
$ids = FollowerService::localFollowerIds($this->pid);
foreach($ids as $id) {
HomeTimelineService::add($id, $this->sid);
}
}
}

View file

@ -0,0 +1,73 @@
<?php
namespace App\Jobs\HomeFeedPipeline;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\Middleware\WithoutOverlapping;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
use App\Services\FollowerService;
use App\Services\HomeTimelineService;
class FeedRemovePipeline implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $sid;
protected $pid;
public $timeout = 900;
public $tries = 3;
public $maxExceptions = 1;
public $failOnTimeout = true;
/**
* The number of seconds after which the job's unique lock will be released.
*
* @var int
*/
public $uniqueFor = 3600;
/**
* Get the unique ID for the job.
*/
public function uniqueId(): string
{
return 'hts:feed:remove:sid:' . $this->sid;
}
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping("hts:feed:remove:sid:{$this->sid}"))->shared()->dontRelease()];
}
/**
* Create a new job instance.
*/
public function __construct($sid, $pid)
{
$this->sid = $sid;
$this->pid = $pid;
}
/**
* Execute the job.
*/
public function handle(): void
{
$ids = FollowerService::localFollowerIds($this->pid);
foreach($ids as $id) {
HomeTimelineService::rem($id, $this->sid);
}
}
}

View file

@ -0,0 +1,81 @@
<?php
namespace App\Jobs\HomeFeedPipeline;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\Middleware\WithoutOverlapping;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
use App\Services\AccountService;
use App\Services\StatusService;
use App\Services\HomeTimelineService;
class FeedUnfollowPipeline implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $actorId;
protected $followingId;
public $timeout = 900;
public $tries = 3;
public $maxExceptions = 1;
public $failOnTimeout = true;
/**
* The number of seconds after which the job's unique lock will be released.
*
* @var int
*/
public $uniqueFor = 3600;
/**
* Get the unique ID for the job.
*/
public function uniqueId(): string
{
return 'hts:feed:remove:follows:aid:' . $this->actorId . ':fid:' . $this->followingId;
}
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping("hts:feed:remove:follows:aid:{$this->actorId}:fid:{$this->followingId}"))->shared()->dontRelease()];
}
/**
* Create a new job instance.
*/
public function __construct($actorId, $followingId)
{
$this->actorId = $actorId;
$this->followingId = $followingId;
}
/**
* Execute the job.
*/
public function handle(): void
{
$actorId = $this->actorId;
$followingId = $this->followingId;
$ids = HomeTimelineService::get($actorId, 0, -1);
foreach($ids as $id) {
$status = StatusService::get($id, false);
if($status && isset($status['account'], $status['account']['id'])) {
if($status['account']['id'] == $followingId) {
HomeTimelineService::rem($actorId, $id);
}
}
}
}
}

View file

@ -0,0 +1,67 @@
<?php
namespace App\Jobs\HomeFeedPipeline;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use App\Services\HomeTimelineService;
use Illuminate\Queue\Middleware\WithoutOverlapping;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
class FeedWarmCachePipeline implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $pid;
public $timeout = 900;
public $tries = 3;
public $maxExceptions = 1;
public $failOnTimeout = true;
/**
* The number of seconds after which the job's unique lock will be released.
*
* @var int
*/
public $uniqueFor = 3600;
/**
* Get the unique ID for the job.
*/
public function uniqueId(): string
{
return 'hfp:warm-cache:pid:' . $this->pid;
}
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping("hfp:warm-cache:pid:{$this->pid}"))->shared()->dontRelease()];
}
/**
* Create a new job instance.
*/
public function __construct($pid)
{
$this->pid = $pid;
}
/**
* Execute the job.
*/
public function handle(): void
{
$pid = $this->pid;
HomeTimelineService::warmCache($pid, true, 400, true);
}
}

View file

@ -0,0 +1,86 @@
<?php
namespace App\Jobs\HomeFeedPipeline;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use App\Hashtag;
use App\StatusHashtag;
use App\Services\HashtagFollowService;
use App\Services\HomeTimelineService;
use Illuminate\Queue\Middleware\WithoutOverlapping;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
class HashtagInsertFanoutPipeline implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $hashtag;
public $timeout = 900;
public $tries = 3;
public $maxExceptions = 1;
public $failOnTimeout = true;
/**
* Delete the job if its models no longer exist.
*
* @var bool
*/
public $deleteWhenMissingModels = true;
/**
* The number of seconds after which the job's unique lock will be released.
*
* @var int
*/
public $uniqueFor = 3600;
/**
* Get the unique ID for the job.
*/
public function uniqueId(): string
{
return 'hfp:hashtag:fanout:insert:' . $this->hashtag->id;
}
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping("hfp:hashtag:fanout:insert:{$this->hashtag->id}"))->shared()->dontRelease()];
}
/**
* Create a new job instance.
*/
public function __construct(StatusHashtag $hashtag)
{
$this->hashtag = $hashtag;
}
/**
* Execute the job.
*/
public function handle(): void
{
$hashtag = $this->hashtag;
$ids = HashtagFollowService::getPidByHid($hashtag->hashtag_id);
if(!$ids || !count($ids)) {
return;
}
foreach($ids as $id) {
HomeTimelineService::add($id, $hashtag->status_id);
}
}
}

View file

@ -0,0 +1,82 @@
<?php
namespace App\Jobs\HomeFeedPipeline;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use App\Hashtag;
use App\StatusHashtag;
use App\Services\HashtagFollowService;
use App\Services\HomeTimelineService;
use Illuminate\Queue\Middleware\WithoutOverlapping;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
class HashtagRemoveFanoutPipeline implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $sid;
protected $hid;
public $timeout = 900;
public $tries = 3;
public $maxExceptions = 1;
public $failOnTimeout = true;
/**
* The number of seconds after which the job's unique lock will be released.
*
* @var int
*/
public $uniqueFor = 3600;
/**
* Get the unique ID for the job.
*/
public function uniqueId(): string
{
return 'hfp:hashtag:fanout:remove:' . $this->hid . ':' . $this->sid;
}
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping("hfp:hashtag:fanout:remove:{$this->hid}:{$this->sid}"))->shared()->dontRelease()];
}
/**
* Create a new job instance.
*/
public function __construct($sid, $hid)
{
$this->sid = $sid;
$this->hid = $hid;
}
/**
* Execute the job.
*/
public function handle(): void
{
$sid = $this->sid;
$hid = $this->hid;
$ids = HashtagFollowService::getPidByHid($hid);
if(!$ids || !count($ids)) {
return;
}
foreach($ids as $id) {
HomeTimelineService::rem($id, $sid);
}
}
}

View file

@ -153,7 +153,10 @@ class RemoteStatusDelete implements ShouldQueue, ShouldBeUniqueUntilProcessing
->whereObjectId($status->id)
->delete();
StatusArchived::whereStatusId($status->id)->delete();
StatusHashtag::whereStatusId($status->id)->delete();
$statusHashtags = StatusHashtag::whereStatusId($status->id)->get();
foreach($statusHashtags as $stag) {
$stag->delete();
}
StatusView::whereStatusId($status->id)->delete();
Status::whereInReplyToId($status->id)->update(['in_reply_to_id' => null]);

View file

@ -130,7 +130,10 @@ class StatusDelete implements ShouldQueue
->delete();
StatusArchived::whereStatusId($status->id)->delete();
StatusHashtag::whereStatusId($status->id)->delete();
$statusHashtags = StatusHashtag::whereStatusId($status->id)->get();
foreach($statusHashtags as $stag) {
$stag->delete();
}
StatusView::whereStatusId($status->id)->delete();
Status::whereInReplyToId($status->id)->update(['in_reply_to_id' => null]);

View file

@ -21,6 +21,8 @@ use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use App\Services\UserFilterService;
use App\Services\AdminShadowFilterService;
use App\Jobs\HomeFeedPipeline\FeedInsertPipeline;
use App\Jobs\HomeFeedPipeline\HashtagInsertFanoutPipeline;
class StatusEntityLexer implements ShouldQueue
{
@ -105,12 +107,12 @@ class StatusEntityLexer implements ShouldQueue
}
DB::transaction(function () use ($status, $tag) {
$slug = str_slug($tag, '-', false);
$hashtag = Hashtag::where('slug', $slug)->first();
if (!$hashtag) {
$hashtag = Hashtag::create(
['name' => $tag, 'slug' => $slug]
);
}
$hashtag = Hashtag::firstOrCreate([
'slug' => $slug
], [
'name' => $tag
]);
StatusHashtag::firstOrCreate(
[
@ -150,6 +152,21 @@ class StatusEntityLexer implements ShouldQueue
MentionPipeline::dispatch($status, $m);
});
}
$this->fanout();
}
public function fanout()
{
$status = $this->status;
if(config('exp.cached_home_timeline')) {
if( $status->in_reply_to_id === null &&
$status->reblog_of_id === null &&
in_array($status->scope, ['public', 'unlisted', 'private'])
) {
FeedInsertPipeline::dispatch($status->id, $status->profile_id)->onQueue('feed');
}
}
$this->deliver();
}

View file

@ -5,6 +5,8 @@ namespace App\Observers;
use App\Follower;
use App\Services\FollowerService;
use Cache;
use App\Jobs\HomeFeedPipeline\FeedFollowPipeline;
use App\Jobs\HomeFeedPipeline\FeedUnfollowPipeline;
class FollowerObserver
{
@ -21,6 +23,7 @@ class FollowerObserver
}
FollowerService::add($follower->profile_id, $follower->following_id);
FeedFollowPipeline::dispatch($follower->profile_id, $follower->following_id)->onQueue('follow');
}
/**

View file

@ -5,32 +5,31 @@ namespace App\Observers;
use DB;
use App\StatusHashtag;
use App\Services\StatusHashtagService;
use App\Jobs\HomeFeedPipeline\HashtagInsertFanoutPipeline;
use App\Jobs\HomeFeedPipeline\HashtagRemoveFanoutPipeline;
use Illuminate\Contracts\Events\ShouldHandleEventsAfterCommit;
class StatusHashtagObserver
class StatusHashtagObserver implements ShouldHandleEventsAfterCommit
{
/**
* Handle events after all transactions are committed.
*
* @var bool
*/
public $afterCommit = true;
/**
* Handle the notification "created" event.
*
* @param \App\Notification $notification
* @param \App\StatusHashtag $hashtag
* @return void
*/
public function created(StatusHashtag $hashtag)
{
StatusHashtagService::set($hashtag->hashtag_id, $hashtag->status_id);
DB::table('hashtags')->where('id', $hashtag->hashtag_id)->increment('cached_count');
if($hashtag->status_visibility && $hashtag->status_visibility === 'public') {
HashtagInsertFanoutPipeline::dispatch($hashtag)->onQueue('feed');
}
}
/**
* Handle the notification "updated" event.
*
* @param \App\Notification $notification
* @param \App\StatusHashtag $hashtag
* @return void
*/
public function updated(StatusHashtag $hashtag)
@ -39,21 +38,24 @@ class StatusHashtagObserver
}
/**
* Handle the notification "deleted" event.
* Handle the notification "deleting" event.
*
* @param \App\Notification $notification
* @param \App\StatusHashtag $hashtag
* @return void
*/
public function deleted(StatusHashtag $hashtag)
public function deleting(StatusHashtag $hashtag)
{
StatusHashtagService::del($hashtag->hashtag_id, $hashtag->status_id);
DB::table('hashtags')->where('id', $hashtag->hashtag_id)->decrement('cached_count');
if($hashtag->status_visibility && $hashtag->status_visibility === 'public') {
HashtagRemoveFanoutPipeline::dispatch($hashtag->status_id, $hashtag->hashtag_id)->onQueue('feed');
}
}
/**
* Handle the notification "restored" event.
*
* @param \App\Notification $notification
* @param \App\StatusHashtag $hashtag
* @return void
*/
public function restored(StatusHashtag $hashtag)
@ -64,7 +66,7 @@ class StatusHashtagObserver
/**
* Handle the notification "force deleted" event.
*
* @param \App\Notification $notification
* @param \App\StatusHashtag $hashtag
* @return void
*/
public function forceDeleted(StatusHashtag $hashtag)

View file

@ -7,6 +7,7 @@ use App\Services\ProfileStatusService;
use Cache;
use App\Models\ImportPost;
use App\Services\ImportService;
use App\Jobs\HomeFeedPipeline\FeedRemovePipeline;
class StatusObserver
{
@ -63,6 +64,10 @@ class StatusObserver
ImportPost::whereProfileId($status->profile_id)->whereStatusId($status->id)->delete();
ImportService::clearImportedFiles($status->profile_id);
}
if(config('exp.cached_home_timeline')) {
FeedRemovePipeline::dispatch($status->id, $status->profile_id)->onQueue('feed');
}
}
/**

View file

@ -4,6 +4,8 @@ namespace App\Observers;
use App\UserFilter;
use App\Services\UserFilterService;
use App\Jobs\HomeFeedPipeline\FeedFollowPipeline;
use App\Jobs\HomeFeedPipeline\FeedUnfollowPipeline;
class UserFilterObserver
{
@ -78,10 +80,12 @@ class UserFilterObserver
switch ($userFilter->filter_type) {
case 'mute':
UserFilterService::mute($userFilter->user_id, $userFilter->filterable_id);
FeedUnfollowPipeline::dispatch($userFilter->user_id, $userFilter->filterable_id)->onQueue('feed');
break;
case 'block':
UserFilterService::block($userFilter->user_id, $userFilter->filterable_id);
FeedUnfollowPipeline::dispatch($userFilter->user_id, $userFilter->filterable_id)->onQueue('feed');
break;
}
}
@ -96,10 +100,12 @@ class UserFilterObserver
switch ($userFilter->filter_type) {
case 'mute':
UserFilterService::unmute($userFilter->user_id, $userFilter->filterable_id);
FeedFollowPipeline::dispatch($userFilter->user_id, $userFilter->filterable_id)->onQueue('feed');
break;
case 'block':
UserFilterService::unblock($userFilter->user_id, $userFilter->filterable_id);
FeedFollowPipeline::dispatch($userFilter->user_id, $userFilter->filterable_id)->onQueue('feed');
break;
}
}

View file

@ -19,6 +19,7 @@ class FollowerService
const FOLLOWING_SYNC_KEY = 'pf:services:followers:sync-following:';
const FOLLOWING_KEY = 'pf:services:follow:following:id:';
const FOLLOWERS_KEY = 'pf:services:follow:followers:id:';
const FOLLOWERS_LOCAL_KEY = 'pf:services:follow:local-follower-ids:';
public static function add($actor, $target, $refresh = true)
{
@ -212,4 +213,15 @@ class FollowerService
Cache::forget(self::FOLLOWERS_SYNC_KEY . $id);
Cache::forget(self::FOLLOWING_SYNC_KEY . $id);
}
public static function localFollowerIds($pid, $limit = 0)
{
$key = self::FOLLOWERS_LOCAL_KEY . $pid;
$res = Cache::remember($key, 7200, function() use($pid) {
return DB::table('followers')->whereFollowingId($pid)->whereLocalProfile(true)->pluck('profile_id')->sort();
});
return $limit ?
$res->take($limit)->values()->toArray() :
$res->values()->toArray();
}
}

View file

@ -0,0 +1,21 @@
<?php
namespace App\Services;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Redis;
use App\Hashtag;
use App\StatusHashtag;
use App\HashtagFollow;
class HashtagFollowService
{
const FOLLOW_KEY = 'pf:services:hashtag-follows:v1:';
public static function getPidByHid($hid)
{
return Cache::remember(self::FOLLOW_KEY . $hid, 86400, function() use($hid) {
return HashtagFollow::whereHashtagId($hid)->pluck('profile_id')->toArray();
});
}
}

View file

@ -8,65 +8,79 @@ use App\Hashtag;
use App\StatusHashtag;
use App\HashtagFollow;
class HashtagService {
class HashtagService
{
const FOLLOW_KEY = 'pf:services:hashtag:following:v1:';
const FOLLOW_PIDS_KEY = 'pf:services:hashtag-follows:v1:';
const FOLLOW_KEY = 'pf:services:hashtag:following:';
public static function get($id)
{
return Cache::remember('services:hashtag:by_id:' . $id, 3600, function() use($id) {
$tag = Hashtag::find($id);
if(!$tag) {
return [];
}
return [
'name' => $tag->name,
'slug' => $tag->slug,
];
});
}
public static function get($id)
{
return Cache::remember('services:hashtag:by_id:' . $id, 3600, function() use($id) {
$tag = Hashtag::find($id);
if(!$tag) {
return [];
}
return [
'name' => $tag->name,
'slug' => $tag->slug,
];
});
}
public static function count($id)
{
return Cache::remember('services:hashtag:public-count:by_id:' . $id, 86400, function() use($id) {
return StatusHashtag::whereHashtagId($id)->whereStatusVisibility('public')->count();
});
}
public static function count($id)
{
return Cache::remember('services:hashtag:public-count:by_id:' . $id, 86400, function() use($id) {
return StatusHashtag::whereHashtagId($id)->whereStatusVisibility('public')->count();
});
}
public static function isFollowing($pid, $hid)
{
$res = Redis::zscore(self::FOLLOW_KEY . $hid, $pid);
if($res) {
return true;
}
public static function isFollowing($pid, $hid)
{
$res = Redis::zscore(self::FOLLOW_KEY . $pid, $hid);
if($res) {
return true;
}
$synced = Cache::get(self::FOLLOW_KEY . 'acct:' . $pid . ':synced');
if(!$synced) {
$tags = HashtagFollow::whereProfileId($pid)
->get()
->each(function($tag) use($pid) {
self::follow($pid, $tag->hashtag_id);
});
Cache::set(self::FOLLOW_KEY . 'acct:' . $pid . ':synced', true, 1209600);
$synced = Cache::get(self::FOLLOW_KEY . $pid . ':synced');
if(!$synced) {
$tags = HashtagFollow::whereProfileId($pid)
->get()
->each(function($tag) use($pid) {
self::follow($pid, $tag->hashtag_id);
});
Cache::set(self::FOLLOW_KEY . $pid . ':synced', true, 1209600);
return (bool) Redis::zscore(self::FOLLOW_KEY . $hid, $pid) >= 1;
}
return (bool) Redis::zscore(self::FOLLOW_KEY . $pid, $hid) > 1;
}
return false;
}
return false;
}
public static function follow($pid, $hid)
{
Cache::forget(self::FOLLOW_PIDS_KEY . $hid);
return Redis::zadd(self::FOLLOW_KEY . $hid, $pid, $pid);
}
public static function follow($pid, $hid)
{
return Redis::zadd(self::FOLLOW_KEY . $pid, $hid, $hid);
}
public static function unfollow($pid, $hid)
{
Cache::forget(self::FOLLOW_PIDS_KEY . $hid);
return Redis::zrem(self::FOLLOW_KEY . $hid, $pid);
}
public static function unfollow($pid, $hid)
{
return Redis::zrem(self::FOLLOW_KEY . $pid, $hid);
}
public static function following($hid, $start = 0, $limit = 10)
{
$synced = Cache::get(self::FOLLOW_KEY . 'acct-following:' . $hid . ':synced');
if(!$synced) {
$tags = HashtagFollow::whereHashtagId($hid)
->get()
->each(function($tag) use($hid) {
self::follow($tag->profile_id, $hid);
});
Cache::set(self::FOLLOW_KEY . 'acct-following:' . $hid . ':synced', true, 1209600);
public static function following($pid, $start = 0, $limit = 10)
{
return Redis::zrevrange(self::FOLLOW_KEY . $pid, $start, $limit);
}
return Redis::zrevrange(self::FOLLOW_KEY . $hid, $start, $limit);
}
return Redis::zrevrange(self::FOLLOW_KEY . $hid, $start, $limit);
}
}

View file

@ -0,0 +1,101 @@
<?php
namespace App\Services;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Redis;
use App\Follower;
use App\Status;
class HomeTimelineService
{
const CACHE_KEY = 'pf:services:timeline:home:';
const FOLLOWER_FEED_POST_LIMIT = 10;
public static function get($id, $start = 0, $stop = 10)
{
if($stop > 100) {
$stop = 100;
}
return Redis::zrevrange(self::CACHE_KEY . $id, $start, $stop);
}
public static function getRankedMaxId($id, $start = null, $limit = 10)
{
if(!$start) {
return [];
}
return array_keys(Redis::zrevrangebyscore(self::CACHE_KEY . $id, $start, '-inf', [
'withscores' => true,
'limit' => [1, $limit - 1]
]));
}
public static function getRankedMinId($id, $end = null, $limit = 10)
{
if(!$end) {
return [];
}
return array_keys(Redis::zrevrangebyscore(self::CACHE_KEY . $id, '+inf', $end, [
'withscores' => true,
'limit' => [0, $limit]
]));
}
public static function add($id, $val)
{
if(self::count($id) >= 400) {
Redis::zpopmin(self::CACHE_KEY . $id);
}
return Redis::zadd(self::CACHE_KEY .$id, $val, $val);
}
public static function rem($id, $val)
{
return Redis::zrem(self::CACHE_KEY . $id, $val);
}
public static function count($id)
{
return Redis::zcard(self::CACHE_KEY . $id);
}
public static function warmCache($id, $force = false, $limit = 100, $returnIds = false)
{
if(self::count($id) == 0 || $force == true) {
Redis::del(self::CACHE_KEY . $id);
$following = Cache::remember('profile:following:'.$id, 1209600, function() use($id) {
$following = Follower::whereProfileId($id)->pluck('following_id');
return $following->push($id)->toArray();
});
$minId = SnowflakeService::byDate(now()->subMonths(6));
$filters = UserFilterService::filters($id);
if($filters && count($filters)) {
$following = array_diff($following, $filters);
}
$ids = Status::where('id', '>', $minId)
->whereIn('profile_id', $following)
->whereNull(['in_reply_to_id', 'reblog_of_id'])
->whereIn('type', ['photo', 'photo:album', 'video', 'video:album', 'photo:video:album'])
->whereIn('visibility',['public', 'unlisted', 'private'])
->orderByDesc('id')
->limit($limit)
->pluck('id');
foreach($ids as $pid) {
self::add($id, $pid);
}
return $returnIds ? $ids : 1;
}
return 0;
}
}

1431
composer.lock generated

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,34 @@
<?php
use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;
return new class extends Migration
{
/**
* Run the migrations.
*/
public function up(): void
{
Schema::table('profiles', function (Blueprint $table) {
$table->index('followers_count', 'profiles_followers_count_index');
$table->index('following_count', 'profiles_following_count_index');
$table->index('status_count', 'profiles_status_count_index');
$table->index('is_private', 'profiles_is_private_index');
});
}
/**
* Reverse the migrations.
*/
public function down(): void
{
Schema::table('profiles', function (Blueprint $table) {
$table->dropIndex('profiles_followers_count_index');
$table->dropIndex('profiles_following_count_index');
$table->dropIndex('profiles_status_count_index');
$table->dropIndex('profiles_is_private_index');
});
}
};