Add StoryPipeline jobs

This commit is contained in:
Daniel Supernault 2021-09-03 21:18:33 -06:00
parent 0d8d6bc71e
commit c7a5715a60
No known key found for this signature in database
GPG key ID: 0DEF1C662C9033F7
8 changed files with 827 additions and 0 deletions

View file

@ -0,0 +1,136 @@
<?php
namespace App\Jobs\StoryPipeline;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Storage;
use App\Story;
use League\Fractal;
use League\Fractal\Serializer\ArraySerializer;
use App\Transformer\ActivityPub\Verb\DeleteStory;
use App\Util\ActivityPub\Helpers;
use GuzzleHttp\Pool;
use GuzzleHttp\Client;
use GuzzleHttp\Promise;
use App\Util\ActivityPub\HttpSignature;
use App\Services\FollowerService;
use App\Services\StoryService;
class StoryDelete implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $story;
/**
* Delete the job if its models no longer exist.
*
* @var bool
*/
public $deleteWhenMissingModels = true;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct(Story $story)
{
$this->story = $story;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
$story = $this->story;
if($story->local == false) {
return;
}
StoryService::removeRotateQueue($story->id);
StoryService::delLatest($story->profile_id);
StoryService::delById($story->id);
if(Storage::exists($story->path) == true) {
Storage::delete($story->path);
}
$story->views()->delete();
$profile = $story->profile;
$activity = [
'@context' => 'https://www.w3.org/ns/activitystreams',
'id' => $story->url() . '#delete',
'type' => 'Delete',
'actor' => $profile->permalink(),
'object' => [
'id' => $story->url(),
'type' => 'Story',
],
];
$this->fanoutExpiry($profile, $activity);
// delete notifications
// delete polls
// delete reports
$story->delete();
return;
}
protected function fanoutExpiry($profile, $activity)
{
$audience = FollowerService::softwareAudience($profile->id, 'pixelfed');
if(empty($audience)) {
// Return on profiles with no remote followers
return;
}
$payload = json_encode($activity);
$client = new Client([
'timeout' => config('federation.activitypub.delivery.timeout')
]);
$requests = function($audience) use ($client, $activity, $profile, $payload) {
foreach($audience as $url) {
$headers = HttpSignature::sign($profile, $url, $activity);
yield function() use ($client, $url, $headers, $payload) {
return $client->postAsync($url, [
'curl' => [
CURLOPT_HTTPHEADER => $headers,
CURLOPT_POSTFIELDS => $payload,
CURLOPT_HEADER => true
]
]);
};
}
};
$pool = new Pool($client, $requests($audience), [
'concurrency' => config('federation.activitypub.delivery.concurrency'),
'fulfilled' => function ($response, $index) {
},
'rejected' => function ($reason, $index) {
}
]);
$promise = $pool->promise();
$promise->wait();
}
}

View file

@ -0,0 +1,169 @@
<?php
namespace App\Jobs\StoryPipeline;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Storage;
use App\Story;
use League\Fractal;
use League\Fractal\Serializer\ArraySerializer;
use App\Transformer\ActivityPub\Verb\DeleteStory;
use App\Util\ActivityPub\Helpers;
use GuzzleHttp\Pool;
use GuzzleHttp\Client;
use GuzzleHttp\Promise;
use App\Util\ActivityPub\HttpSignature;
use App\Services\FollowerService;
use App\Services\StoryService;
class StoryExpire implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $story;
/**
* Delete the job if its models no longer exist.
*
* @var bool
*/
public $deleteWhenMissingModels = true;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct(Story $story)
{
$this->story = $story;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
$story = $this->story;
if($story->local == false) {
$this->handleRemoteExpiry();
return;
}
if($story->active == false) {
return;
}
if($story->expires_at->gt(now())) {
return;
}
$story->active = false;
$story->save();
$this->rotateMediaPath();
$this->fanoutExpiry();
StoryService::delLatest($story->profile_id);
}
protected function rotateMediaPath()
{
$story = $this->story;
$date = date('Y').date('m');
$old = $story->path;
$base = "story_archives/{$story->profile_id}/{$date}/";
$paths = explode('/', $old);
$path = array_pop($paths);
$newPath = $base . $path;
if(Storage::exists($old) == true) {
$dir = implode('/', $paths);
Storage::move($old, $newPath);
Storage::delete($old);
$story->bearcap_token = null;
$story->path = $newPath;
$story->save();
Storage::deleteDirectory($dir);
}
}
protected function fanoutExpiry()
{
$story = $this->story;
$profile = $story->profile;
if($story->local == false || $story->remote_url) {
return;
}
$audience = FollowerService::softwareAudience($story->profile_id, 'pixelfed');
if(empty($audience)) {
// Return on profiles with no remote followers
return;
}
$fractal = new Fractal\Manager();
$fractal->setSerializer(new ArraySerializer());
$resource = new Fractal\Resource\Item($story, new DeleteStory());
$activity = $fractal->createData($resource)->toArray();
$payload = json_encode($activity);
$client = new Client([
'timeout' => config('federation.activitypub.delivery.timeout')
]);
$requests = function($audience) use ($client, $activity, $profile, $payload) {
foreach($audience as $url) {
$headers = HttpSignature::sign($profile, $url, $activity);
yield function() use ($client, $url, $headers, $payload) {
return $client->postAsync($url, [
'curl' => [
CURLOPT_HTTPHEADER => $headers,
CURLOPT_POSTFIELDS => $payload,
CURLOPT_HEADER => true
]
]);
};
}
};
$pool = new Pool($client, $requests($audience), [
'concurrency' => config('federation.activitypub.delivery.concurrency'),
'fulfilled' => function ($response, $index) {
},
'rejected' => function ($reason, $index) {
}
]);
$promise = $pool->promise();
$promise->wait();
}
protected function handleRemoteExpiry()
{
$story = $this->story;
$story->active = false;
$story->save();
$path = $story->path;
if(Storage::exists($path) == true) {
Storage::delete($path);
}
$story->views()->delete();
$story->delete();
}
}

View file

@ -0,0 +1,107 @@
<?php
namespace App\Jobs\StoryPipeline;
use Cache, Log;
use App\Story;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use League\Fractal;
use League\Fractal\Serializer\ArraySerializer;
use App\Transformer\ActivityPub\Verb\CreateStory;
use App\Util\ActivityPub\Helpers;
use GuzzleHttp\Pool;
use GuzzleHttp\Client;
use GuzzleHttp\Promise;
use App\Util\ActivityPub\HttpSignature;
use App\Services\FollowerService;
use App\Services\StoryService;
class StoryFanout implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $story;
/**
* Delete the job if its models no longer exist.
*
* @var bool
*/
public $deleteWhenMissingModels = true;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct(Story $story)
{
$this->story = $story;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
$story = $this->story;
$profile = $story->profile;
if($story->local == false || $story->remote_url) {
return;
}
StoryService::delLatest($story->profile_id);
$audience = FollowerService::softwareAudience($story->profile_id, 'pixelfed');
if(empty($audience)) {
// Return on profiles with no remote followers
return;
}
$fractal = new Fractal\Manager();
$fractal->setSerializer(new ArraySerializer());
$resource = new Fractal\Resource\Item($story, new CreateStory());
$activity = $fractal->createData($resource)->toArray();
$payload = json_encode($activity);
$client = new Client([
'timeout' => config('federation.activitypub.delivery.timeout')
]);
$requests = function($audience) use ($client, $activity, $profile, $payload) {
foreach($audience as $url) {
$headers = HttpSignature::sign($profile, $url, $activity);
yield function() use ($client, $url, $headers, $payload) {
return $client->postAsync($url, [
'curl' => [
CURLOPT_HTTPHEADER => $headers,
CURLOPT_POSTFIELDS => $payload,
CURLOPT_HEADER => true
]
]);
};
}
};
$pool = new Pool($client, $requests($audience), [
'concurrency' => config('federation.activitypub.delivery.concurrency'),
'fulfilled' => function ($response, $index) {
},
'rejected' => function ($reason, $index) {
}
]);
$promise = $pool->promise();
$promise->wait();
}
}

View file

@ -0,0 +1,144 @@
<?php
namespace App\Jobs\StoryPipeline;
use Cache, Log;
use App\Story;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use App\Util\ActivityPub\Helpers;
use App\Services\FollowerService;
use App\Util\Lexer\Bearcap;
use Illuminate\Support\Facades\Http;
use Illuminate\Http\Client\RequestException;
use Illuminate\Http\Client\ConnectionException;
use App\Util\ActivityPub\Validator\StoryValidator;
use App\Services\StoryService;
use App\Services\MediaPathService;
use Illuminate\Support\Str;
use Illuminate\Http\File;
use Illuminate\Support\Facades\Storage;
class StoryFetch implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $activity;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct($activity)
{
$this->activity = $activity;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
$activity = $this->activity;
$activityId = $activity['id'];
$activityActor = $activity['actor'];
if(parse_url($activityId, PHP_URL_HOST) !== parse_url($activityActor, PHP_URL_HOST)) {
return;
}
$bearcap = Bearcap::decode($activity['object']['object']);
if(!$bearcap) {
return;
}
$url = $bearcap['url'];
$token = $bearcap['token'];
if(parse_url($activityId, PHP_URL_HOST) !== parse_url($url, PHP_URL_HOST)) {
return;
}
$version = config('pixelfed.version');
$appUrl = config('app.url');
$headers = [
'Accept' => 'application/json',
'Authorization' => 'Bearer ' . $token,
'User-Agent' => "(Pixelfed/{$version}; +{$appUrl})",
];
try {
$res = Http::withHeaders($headers)
->timeout(30)
->get($url);
} catch (RequestException $e) {
return false;
} catch (ConnectionException $e) {
return false;
} catch (\Exception $e) {
return false;
}
$payload = $res->json();
if(StoryValidator::validate($payload) == false) {
return;
}
if(Helpers::validateUrl($payload['attachment']['url']) == false) {
return;
}
$type = $payload['attachment']['type'] == 'Image' ? 'photo' : 'video';
$profile = Helpers::profileFetch($payload['attributedTo']);
$ext = pathinfo($payload['attachment']['url'], PATHINFO_EXTENSION);
$storagePath = MediaPathService::story($profile);
$fileName = Str::random(random_int(2, 12)) . '_' . Str::random(random_int(32, 35)) . '_' . Str::random(random_int(1, 14)) . '.' . $ext;
$contextOptions = [
'ssl' => [
'verify_peer' => false,
'verify_peername' => false
]
];
$ctx = stream_context_create($contextOptions);
$data = file_get_contents($payload['attachment']['url'], false, $ctx);
$tmpBase = storage_path('app/remcache/');
$tmpPath = $profile->id . '-' . $fileName;
$tmpName = $tmpBase . $tmpPath;
file_put_contents($tmpName, $data);
$disk = Storage::disk(config('filesystems.default'));
$path = $disk->putFileAs($storagePath, new File($tmpName), $fileName, 'public');
$size = filesize($tmpName);
unlink($tmpName);
$story = new Story;
$story->profile_id = $profile->id;
$story->object_id = $payload['id'];
$story->size = $size;
$story->mime = $payload['attachment']['mediaType'];
$story->duration = $payload['duration'];
$story->media_url = $payload['attachment']['url'];
$story->type = $type;
$story->public = false;
$story->local = false;
$story->active = true;
$story->path = $path;
$story->view_count = 0;
$story->can_reply = $payload['can_reply'];
$story->can_react = $payload['can_react'];
$story->created_at = now()->parse($payload['published']);
$story->expires_at = now()->parse($payload['expiresAt']);
$story->save();
StoryService::delLatest($story->profile_id);
}
}

View file

@ -0,0 +1,70 @@
<?php
namespace App\Jobs\StoryPipeline;
use App\Story;
use App\Status;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use App\Util\ActivityPub\Helpers;
class StoryReactionDeliver implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $story;
protected $status;
/**
* Delete the job if its models no longer exist.
*
* @var bool
*/
public $deleteWhenMissingModels = true;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct(Story $story, Status $status)
{
$this->story = $story;
$this->status = $status;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
$story = $this->story;
$status = $this->status;
if($story->local == true) {
return;
}
$target = $story->profile;
$actor = $status->profile;
$to = $target->inbox_url;
$payload = [
'@context' => 'https://www.w3.org/ns/activitystreams',
'id' => $status->permalink(),
'type' => 'Story:Reaction',
'to' => $target->permalink(),
'actor' => $actor->permalink(),
'content' => $status->caption,
'inReplyTo' => $story->object_id,
'published' => $status->created_at->toAtomString()
];
Helpers::sendSignedObject($actor, $to, $payload);
}
}

View file

@ -0,0 +1,70 @@
<?php
namespace App\Jobs\StoryPipeline;
use App\Story;
use App\Status;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use App\Util\ActivityPub\Helpers;
class StoryReplyDeliver implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $story;
protected $status;
/**
* Delete the job if its models no longer exist.
*
* @var bool
*/
public $deleteWhenMissingModels = true;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct(Story $story, Status $status)
{
$this->story = $story;
$this->status = $status;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
$story = $this->story;
$status = $this->status;
if($story->local == true) {
return;
}
$target = $story->profile;
$actor = $status->profile;
$to = $target->inbox_url;
$payload = [
'@context' => 'https://www.w3.org/ns/activitystreams',
'id' => $status->permalink(),
'type' => 'Story:Reply',
'to' => $target->permalink(),
'actor' => $actor->permalink(),
'content' => $status->caption,
'inReplyTo' => $story->object_id,
'published' => $status->created_at->toAtomString()
];
Helpers::sendSignedObject($actor, $to, $payload);
}
}

View file

@ -0,0 +1,61 @@
<?php
namespace App\Jobs\StoryPipeline;
use Illuminate\Support\Facades\Storage;
use Illuminate\Support\Str;
use App\Story;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use App\Util\ActivityPub\Helpers;
class StoryRotateMedia implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $story;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct(Story $story)
{
$this->story = $story;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
$story = $this->story;
if($story->local == false) {
return;
}
$paths = explode('/', $story->path);
$name = array_pop($paths);
$oldPath = $story->path;
$ext = pathinfo($name, PATHINFO_EXTENSION);
$new = Str::random(13) . '_' . Str::random(24) . '_' . Str::random(3) . '.' . $ext;
array_push($paths, $new);
$newPath = implode('/', $paths);
if(Storage::exists($oldPath)) {
Storage::copy($oldPath, $newPath);
$story->path = $newPath;
$story->bearcap_token = null;
$story->save();
Storage::delete($oldPath);
}
}
}

View file

@ -0,0 +1,70 @@
<?php
namespace App\Jobs\StoryPipeline;
use App\Story;
use App\Profile;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use App\Util\ActivityPub\Helpers;
class StoryViewDeliver implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $story;
protected $profile;
/**
* Delete the job if its models no longer exist.
*
* @var bool
*/
public $deleteWhenMissingModels = true;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct(Story $story, Profile $profile)
{
$this->story = $story;
$this->profile = $profile;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
$story = $this->story;
if($story->local == true) {
return;
}
$actor = $this->profile;
$target = $story->profile;
$to = $target->inbox_url;
$payload = [
'@context' => 'https://www.w3.org/ns/activitystreams',
'id' => $actor->permalink('#stories/' . $story->id . '/view'),
'type' => 'View',
'to' => $target->permalink(),
'actor' => $actor->permalink(),
'object' => [
'type' => 'Story',
'object' => $story->object_id
]
];
Helpers::sendSignedObject($actor, $to, $payload);
}
}