Update job queue, separate deletes into their own queue

This commit is contained in:
Daniel Supernault 2021-07-13 23:09:50 -06:00
parent 3b071e56ac
commit 7f4213924f
No known key found for this signature in database
GPG key ID: 0DEF1C662C9033F7
3 changed files with 309 additions and 283 deletions

View file

@ -3,16 +3,17 @@
namespace App\Http\Controllers;
use App\Jobs\InboxPipeline\{
InboxWorker,
InboxValidator
DeleteWorker,
InboxWorker,
InboxValidator
};
use App\Jobs\RemoteFollowPipeline\RemoteFollowPipeline;
use App\{
AccountLog,
Like,
Profile,
Status,
User
AccountLog,
Like,
Profile,
Status,
User
};
use App\Util\Lexer\Nickname;
use App\Util\Webfinger\Webfinger;
@ -23,146 +24,158 @@ use Illuminate\Http\Request;
use League\Fractal;
use App\Util\Site\Nodeinfo;
use App\Util\ActivityPub\{
Helpers,
HttpSignature,
Outbox
Helpers,
HttpSignature,
Outbox
};
use Zttp\Zttp;
class FederationController extends Controller
{
public function nodeinfoWellKnown()
{
abort_if(!config('federation.nodeinfo.enabled'), 404);
return response()->json(Nodeinfo::wellKnown())
->header('Access-Control-Allow-Origin','*');
}
public function nodeinfoWellKnown()
{
abort_if(!config('federation.nodeinfo.enabled'), 404);
return response()->json(Nodeinfo::wellKnown())
->header('Access-Control-Allow-Origin','*');
}
public function nodeinfo()
{
abort_if(!config('federation.nodeinfo.enabled'), 404);
return response()->json(Nodeinfo::get())
->header('Access-Control-Allow-Origin','*');
}
public function nodeinfo()
{
abort_if(!config('federation.nodeinfo.enabled'), 404);
return response()->json(Nodeinfo::get())
->header('Access-Control-Allow-Origin','*');
}
public function webfinger(Request $request)
{
abort_if(!config('federation.webfinger.enabled'), 400);
public function webfinger(Request $request)
{
abort_if(!config('federation.webfinger.enabled'), 400);
abort_if(!$request->filled('resource'), 400);
abort_if(!$request->filled('resource'), 400);
$resource = $request->input('resource');
$parsed = Nickname::normalizeProfileUrl($resource);
if(empty($parsed) || $parsed['domain'] !== config('pixelfed.domain.app')) {
abort(404);
}
$username = $parsed['username'];
$profile = Profile::whereNull('domain')->whereUsername($username)->firstOrFail();
if($profile->status != null) {
return ProfileController::accountCheck($profile);
}
$webfinger = (new Webfinger($profile))->generate();
$resource = $request->input('resource');
$parsed = Nickname::normalizeProfileUrl($resource);
if(empty($parsed) || $parsed['domain'] !== config('pixelfed.domain.app')) {
abort(404);
}
$username = $parsed['username'];
$profile = Profile::whereNull('domain')->whereUsername($username)->firstOrFail();
if($profile->status != null) {
return ProfileController::accountCheck($profile);
}
$webfinger = (new Webfinger($profile))->generate();
return response()->json($webfinger, 200, [], JSON_PRETTY_PRINT|JSON_UNESCAPED_SLASHES)
->header('Access-Control-Allow-Origin','*');
}
return response()->json($webfinger, 200, [], JSON_PRETTY_PRINT|JSON_UNESCAPED_SLASHES)
->header('Access-Control-Allow-Origin','*');
}
public function hostMeta(Request $request)
{
abort_if(!config('federation.webfinger.enabled'), 404);
public function hostMeta(Request $request)
{
abort_if(!config('federation.webfinger.enabled'), 404);
$path = route('well-known.webfinger');
$xml = '<?xml version="1.0" encoding="UTF-8"?><XRD xmlns="http://docs.oasis-open.org/ns/xri/xrd-1.0"><Link rel="lrdd" type="application/xrd+xml" template="'.$path.'?resource={uri}"/></XRD>';
$path = route('well-known.webfinger');
$xml = '<?xml version="1.0" encoding="UTF-8"?><XRD xmlns="http://docs.oasis-open.org/ns/xri/xrd-1.0"><Link rel="lrdd" type="application/xrd+xml" template="'.$path.'?resource={uri}"/></XRD>';
return response($xml)->header('Content-Type', 'application/xrd+xml');
}
return response($xml)->header('Content-Type', 'application/xrd+xml');
}
public function userOutbox(Request $request, $username)
{
abort_if(!config_cache('federation.activitypub.enabled'), 404);
abort_if(!config('federation.activitypub.outbox'), 404);
public function userOutbox(Request $request, $username)
{
abort_if(!config_cache('federation.activitypub.enabled'), 404);
abort_if(!config('federation.activitypub.outbox'), 404);
$profile = Profile::whereNull('domain')
->whereNull('status')
->whereIsPrivate(false)
->whereUsername($username)
->firstOrFail();
$profile = Profile::whereNull('domain')
->whereNull('status')
->whereIsPrivate(false)
->whereUsername($username)
->firstOrFail();
$key = 'ap:outbox:latest_10:pid:' . $profile->id;
$ttl = now()->addMinutes(15);
$res = Cache::remember($key, $ttl, function() use($profile) {
return Outbox::get($profile);
});
$key = 'ap:outbox:latest_10:pid:' . $profile->id;
$ttl = now()->addMinutes(15);
$res = Cache::remember($key, $ttl, function() use($profile) {
return Outbox::get($profile);
});
return response(json_encode($res, JSON_UNESCAPED_SLASHES))->header('Content-Type', 'application/activity+json');
}
return response(json_encode($res, JSON_UNESCAPED_SLASHES))->header('Content-Type', 'application/activity+json');
}
public function userInbox(Request $request, $username)
{
abort_if(!config_cache('federation.activitypub.enabled'), 404);
abort_if(!config('federation.activitypub.inbox'), 404);
public function userInbox(Request $request, $username)
{
abort_if(!config_cache('federation.activitypub.enabled'), 404);
abort_if(!config('federation.activitypub.inbox'), 404);
$headers = $request->headers->all();
$payload = $request->getContent();
dispatch(new InboxValidator($username, $headers, $payload))->onQueue('high');
return;
}
$headers = $request->headers->all();
$payload = $request->getContent();
$obj = json_decode($payload, true, 8);
public function sharedInbox(Request $request)
{
abort_if(!config_cache('federation.activitypub.enabled'), 404);
abort_if(!config('federation.activitypub.sharedInbox'), 404);
if(isset($obj['type']) && $obj['type'] === 'Delete') {
dispatch(new DeleteWorker($headers, $payload))->onQueue('delete');
} else {
dispatch(new InboxValidator($username, $headers, $payload))->onQueue('high');
}
return;
}
$headers = $request->headers->all();
$payload = $request->getContent();
dispatch(new InboxWorker($headers, $payload))->onQueue('high');
return;
}
public function sharedInbox(Request $request)
{
abort_if(!config_cache('federation.activitypub.enabled'), 404);
abort_if(!config('federation.activitypub.sharedInbox'), 404);
public function userFollowing(Request $request, $username)
{
abort_if(!config_cache('federation.activitypub.enabled'), 404);
$headers = $request->headers->all();
$payload = $request->getContent();
$obj = json_decode($payload, true, 8);
$profile = Profile::whereNull('remote_url')
->whereUsername($username)
->whereIsPrivate(false)
->firstOrFail();
if(isset($obj['type']) && $obj['type'] === 'Delete') {
dispatch(new DeleteWorker($headers, $payload))->onQueue('delete');
} else {
dispatch(new InboxWorker($headers, $payload))->onQueue('high');
}
return;
}
if($profile->status != null) {
abort(404);
}
public function userFollowing(Request $request, $username)
{
abort_if(!config_cache('federation.activitypub.enabled'), 404);
$obj = [
'@context' => 'https://www.w3.org/ns/activitystreams',
'id' => $request->getUri(),
'type' => 'OrderedCollectionPage',
'totalItems' => 0,
'orderedItems' => []
];
return response()->json($obj);
}
$profile = Profile::whereNull('remote_url')
->whereUsername($username)
->whereIsPrivate(false)
->firstOrFail();
public function userFollowers(Request $request, $username)
{
abort_if(!config_cache('federation.activitypub.enabled'), 404);
if($profile->status != null) {
abort(404);
}
$profile = Profile::whereNull('remote_url')
->whereUsername($username)
->whereIsPrivate(false)
->firstOrFail();
$obj = [
'@context' => 'https://www.w3.org/ns/activitystreams',
'id' => $request->getUri(),
'type' => 'OrderedCollectionPage',
'totalItems' => 0,
'orderedItems' => []
];
return response()->json($obj);
}
if($profile->status != null) {
abort(404);
}
public function userFollowers(Request $request, $username)
{
abort_if(!config_cache('federation.activitypub.enabled'), 404);
$obj = [
'@context' => 'https://www.w3.org/ns/activitystreams',
'id' => $request->getUri(),
'type' => 'OrderedCollectionPage',
'totalItems' => 0,
'orderedItems' => []
];
$profile = Profile::whereNull('remote_url')
->whereUsername($username)
->whereIsPrivate(false)
->firstOrFail();
return response()->json($obj);
}
if($profile->status != null) {
abort(404);
}
$obj = [
'@context' => 'https://www.w3.org/ns/activitystreams',
'id' => $request->getUri(),
'type' => 'OrderedCollectionPage',
'totalItems' => 0,
'orderedItems' => []
];
return response()->json($obj);
}
}

View file

@ -50,7 +50,7 @@ class DeleteWorker implements ShouldQueue
$payload = json_decode($this->payload, true, 8);
if(isset($payload['id'])) {
$lockKey = hash('sha256', $payload['id']);
$lockKey = 'pf:ap:del-lock:' . hash('sha256', $payload['id']);
if(Cache::get($lockKey) !== null) {
// Job processed already
return 1;
@ -116,6 +116,18 @@ class DeleteWorker implements ShouldQueue
return;
}
$profile = null;
if($this->verifySignature($headers, $payload) == true) {
(new Inbox($headers, $profile, $payload))->handle();
return;
} else if($this->blindKeyRotation($headers, $payload) == true) {
(new Inbox($headers, $profile, $payload))->handle();
return;
} else {
return;
}
}
protected function verifySignature($headers, $payload)

View file

@ -2,190 +2,191 @@
return [
/*
|--------------------------------------------------------------------------
| Horizon Domain
|--------------------------------------------------------------------------
|
| This is the subdomain where Horizon will be accessible from. If this
| setting is null, Horizon will reside under the same domain as the
| application. Otherwise, this value will serve as the subdomain.
|
*/
/*
|--------------------------------------------------------------------------
| Horizon Domain
|--------------------------------------------------------------------------
|
| This is the subdomain where Horizon will be accessible from. If this
| setting is null, Horizon will reside under the same domain as the
| application. Otherwise, this value will serve as the subdomain.
|
*/
'domain' => null,
'domain' => null,
/*
|--------------------------------------------------------------------------
| Horizon Path
|--------------------------------------------------------------------------
|
| This is the URI path where Horizon will be accessible from. Feel free
| to change this path to anything you like. Note that the URI will not
| affect the paths of its internal API that aren't exposed to users.
|
*/
/*
|--------------------------------------------------------------------------
| Horizon Path
|--------------------------------------------------------------------------
|
| This is the URI path where Horizon will be accessible from. Feel free
| to change this path to anything you like. Note that the URI will not
| affect the paths of its internal API that aren't exposed to users.
|
*/
'path' => 'horizon',
'path' => 'horizon',
/*
|--------------------------------------------------------------------------
| Horizon Redis Connection
|--------------------------------------------------------------------------
|
| This is the name of the Redis connection where Horizon will store the
| meta information required for it to function. It includes the list
| of supervisors, failed jobs, job metrics, and other information.
|
*/
/*
|--------------------------------------------------------------------------
| Horizon Redis Connection
|--------------------------------------------------------------------------
|
| This is the name of the Redis connection where Horizon will store the
| meta information required for it to function. It includes the list
| of supervisors, failed jobs, job metrics, and other information.
|
*/
'use' => 'default',
'use' => 'default',
/*
|--------------------------------------------------------------------------
| Horizon Redis Prefix
|--------------------------------------------------------------------------
|
| This prefix will be used when storing all Horizon data in Redis. You
| may modify the prefix when you are running multiple installations
| of Horizon on the same server so that they don't have problems.
|
*/
/*
|--------------------------------------------------------------------------
| Horizon Redis Prefix
|--------------------------------------------------------------------------
|
| This prefix will be used when storing all Horizon data in Redis. You
| may modify the prefix when you are running multiple installations
| of Horizon on the same server so that they don't have problems.
|
*/
'prefix' => env('HORIZON_PREFIX', 'horizon-'.str_random(8).':'),
'prefix' => env('HORIZON_PREFIX', 'horizon-'.str_random(8).':'),
/*
|--------------------------------------------------------------------------
| Horizon Route Middleware
|--------------------------------------------------------------------------
|
| These middleware will get attached onto each Horizon route, giving you
| the chance to add your own middleware to this list or change any of
| the existing middleware. Or, you can simply stick with this list.
|
*/
/*
|--------------------------------------------------------------------------
| Horizon Route Middleware
|--------------------------------------------------------------------------
|
| These middleware will get attached onto each Horizon route, giving you
| the chance to add your own middleware to this list or change any of
| the existing middleware. Or, you can simply stick with this list.
|
*/
'middleware' => ['web'],
'middleware' => ['web'],
/*
|--------------------------------------------------------------------------
| Queue Wait Time Thresholds
|--------------------------------------------------------------------------
|
| This option allows you to configure when the LongWaitDetected event
| will be fired. Every connection / queue combination may have its
| own, unique threshold (in seconds) before this event is fired.
|
*/
/*
|--------------------------------------------------------------------------
| Queue Wait Time Thresholds
|--------------------------------------------------------------------------
|
| This option allows you to configure when the LongWaitDetected event
| will be fired. Every connection / queue combination may have its
| own, unique threshold (in seconds) before this event is fired.
|
*/
'waits' => [
'redis:feed' => 30,
'redis:default' => 30,
'redis:high' => 30,
],
'waits' => [
'redis:feed' => 30,
'redis:default' => 30,
'redis:high' => 30,
'redis:delete' => 30
],
/*
|--------------------------------------------------------------------------
| Job Trimming Times
|--------------------------------------------------------------------------
|
| Here you can configure for how long (in minutes) you desire Horizon to
| persist the recent and failed jobs. Typically, recent jobs are kept
| for one hour while all failed jobs are stored for an entire week.
|
*/
/*
|--------------------------------------------------------------------------
| Job Trimming Times
|--------------------------------------------------------------------------
|
| Here you can configure for how long (in minutes) you desire Horizon to
| persist the recent and failed jobs. Typically, recent jobs are kept
| for one hour while all failed jobs are stored for an entire week.
|
*/
'trim' => [
'recent' => 60,
'pending' => 60,
'completed' => 60,
'recent_failed' => 10080,
'failed' => 10080,
'monitored' => 10080,
],
'trim' => [
'recent' => 60,
'pending' => 60,
'completed' => 60,
'recent_failed' => 10080,
'failed' => 10080,
'monitored' => 10080,
],
/*
|--------------------------------------------------------------------------
| Metrics
|--------------------------------------------------------------------------
|
| Here you can configure how many snapshots should be kept to display in
| the metrics graph. This will get used in combination with Horizon's
| `horizon:snapshot` schedule to define how long to retain metrics.
|
*/
/*
|--------------------------------------------------------------------------
| Metrics
|--------------------------------------------------------------------------
|
| Here you can configure how many snapshots should be kept to display in
| the metrics graph. This will get used in combination with Horizon's
| `horizon:snapshot` schedule to define how long to retain metrics.
|
*/
'metrics' => [
'trim_snapshots' => [
'job' => 24,
'queue' => 24,
],
],
'metrics' => [
'trim_snapshots' => [
'job' => 24,
'queue' => 24,
],
],
/*
|--------------------------------------------------------------------------
| Fast Termination
|--------------------------------------------------------------------------
|
| When this option is enabled, Horizon's "terminate" command will not
| wait on all of the workers to terminate unless the --wait option
| is provided. Fast termination can shorten deployment delay by
| allowing a new instance of Horizon to start while the last
| instance will continue to terminate each of its workers.
|
*/
/*
|--------------------------------------------------------------------------
| Fast Termination
|--------------------------------------------------------------------------
|
| When this option is enabled, Horizon's "terminate" command will not
| wait on all of the workers to terminate unless the --wait option
| is provided. Fast termination can shorten deployment delay by
| allowing a new instance of Horizon to start while the last
| instance will continue to terminate each of its workers.
|
*/
'fast_termination' => false,
'fast_termination' => false,
/*
|--------------------------------------------------------------------------
| Memory Limit (MB)
|--------------------------------------------------------------------------
|
| This value describes the maximum amount of memory the Horizon worker
| may consume before it is terminated and restarted. You should set
| this value according to the resources available to your server.
|
*/
/*
|--------------------------------------------------------------------------
| Memory Limit (MB)
|--------------------------------------------------------------------------
|
| This value describes the maximum amount of memory the Horizon worker
| may consume before it is terminated and restarted. You should set
| this value according to the resources available to your server.
|
*/
'memory_limit' => 64,
'memory_limit' => 64,
/*
|--------------------------------------------------------------------------
| Queue Worker Configuration
|--------------------------------------------------------------------------
|
| Here you may define the queue worker settings used by your application
| in all environments. These supervisors and settings handle all your
| queued jobs and will be provisioned by Horizon during deployment.
|
*/
/*
|--------------------------------------------------------------------------
| Queue Worker Configuration
|--------------------------------------------------------------------------
|
| Here you may define the queue worker settings used by your application
| in all environments. These supervisors and settings handle all your
| queued jobs and will be provisioned by Horizon during deployment.
|
*/
'environments' => [
'production' => [
'supervisor-1' => [
'connection' => 'redis',
'queue' => ['high', 'default', 'feed'],
'balance' => 'auto',
'maxProcesses' => 20,
'memory' => 128,
'tries' => 3,
'nice' => 0,
],
],
'environments' => [
'production' => [
'supervisor-1' => [
'connection' => 'redis',
'queue' => ['high', 'default', 'feed', 'delete'],
'balance' => 'auto',
'maxProcesses' => 20,
'memory' => 128,
'tries' => 3,
'nice' => 0,
],
],
'local' => [
'supervisor-1' => [
'connection' => 'redis',
'queue' => ['high', 'default', 'feed'],
'balance' => 'auto',
'maxProcesses' => 20,
'memory' => 128,
'tries' => 3,
'nice' => 0,
],
],
],
'local' => [
'supervisor-1' => [
'connection' => 'redis',
'queue' => ['high', 'default', 'feed', 'delete'],
'balance' => 'auto',
'maxProcesses' => 20,
'memory' => 128,
'tries' => 3,
'nice' => 0,
],
],
],
'darkmode' => env('HORIZON_DARKMODE', false),
'darkmode' => env('HORIZON_DARKMODE', false),
];