Adding multicore support.
This commit is contained in:
parent
6c44a9bb43
commit
4450ba7892
|
@ -9,7 +9,10 @@ use feature 'signatures';
|
|||
use Moo;
|
||||
use Mojo::UserAgent;
|
||||
use Mojo::IOLoop;
|
||||
use Mojo::IOLoop::Subprocess;
|
||||
use Mojo::Promise;
|
||||
use Time::HiRes qw/usleep/;
|
||||
use Data::Dumper;
|
||||
|
||||
use TgMagicPdf::PdfBuilder;
|
||||
|
||||
|
@ -21,6 +24,11 @@ has _ua => ( is => 'lazy', );
|
|||
|
||||
has _last_offset_update => ( is => 'rw', );
|
||||
|
||||
has _used_cores => ( is => 'rw', default => sub { 0 } );
|
||||
has _cores => ( is => 'rw', default => sub { `nproc` } );
|
||||
has _updates => ( is => 'rw' );
|
||||
has _ioloop => ( is => 'rw' );
|
||||
|
||||
sub _build__token ($self) {
|
||||
require TgMagicPdf;
|
||||
return TgMagicPdf->new->config->{telegram}{token};
|
||||
|
@ -28,73 +36,63 @@ sub _build__token ($self) {
|
|||
|
||||
sub run ($self) {
|
||||
$self->_dispatch_updates;
|
||||
my $promise_dispatch;
|
||||
while (1) {
|
||||
if ( defined $promise_dispatch ) {
|
||||
$promise_dispatch->wait;
|
||||
}
|
||||
$promise_dispatch = $self->_dispatch_updates;
|
||||
}
|
||||
}
|
||||
|
||||
sub run_recursive ($self) {
|
||||
$self->_dispatch_updates.then(sub {
|
||||
$self->run_recursive;
|
||||
})->catch(sub ($err) {
|
||||
warn $err;
|
||||
$self->run_recursive;
|
||||
});
|
||||
}
|
||||
|
||||
sub _dispatch_updates ($self) {
|
||||
my $updates_p = $self->_get_updates;
|
||||
my $promise = Mojo::Promise->new;
|
||||
$updates_p->then(
|
||||
sub ($res) {
|
||||
my $updates = $res->result->json->{result};
|
||||
$self->_dispatch_updates_one_to_one($promise, $updates);
|
||||
}
|
||||
)->catch(
|
||||
sub ($err) {
|
||||
$promise->resolve;
|
||||
warn $err;
|
||||
}
|
||||
);
|
||||
return $promise;
|
||||
}
|
||||
|
||||
sub _dispatch_updates_one_to_one($self, $promise, $updates) {
|
||||
my $update = shift @$updates;
|
||||
if (!defined $update) {
|
||||
$promise->resolve;
|
||||
Mojo::IOLoop->recurring(0.5, sub {
|
||||
if ( $self->_used_cores >= $self->_cores ) {
|
||||
return;
|
||||
}
|
||||
$self->_dispatch_update($update)->then(sub {
|
||||
$self->_dispatch_updates_one_to_one($promise, $updates);
|
||||
})->catch(sub($err){
|
||||
$self->_dispatch_updates_one_to_one($promise, $updates);
|
||||
warn $err;
|
||||
for ( my $i = $self->_used_cores ; $i <= $self->_cores ; $i++ ) {
|
||||
if ( !defined $self->_updates
|
||||
|| scalar @{ $self->_updates } == 0 )
|
||||
{
|
||||
my $res = $self->_get_updates;
|
||||
my $updates = $res->json->{result};
|
||||
$self->_updates($updates);
|
||||
}
|
||||
my $update = shift $self->_updates->@*;
|
||||
if ( !defined $update ) {
|
||||
last;
|
||||
}
|
||||
$self->_used_cores( $self->_used_cores + 1 );
|
||||
Mojo::IOLoop->subprocess->run(
|
||||
sub {
|
||||
say "Proccess $i active";
|
||||
$self->_dispatch_update($update)->then(
|
||||
sub {
|
||||
my $message = $update->{message};
|
||||
say 'Generated pdf for '
|
||||
. (Data::Dumper::Dumper $message->{from}) . ' '
|
||||
. $message->{date};
|
||||
Mojo::IOLoop->stop;
|
||||
}
|
||||
);
|
||||
Mojo::IOLoop->start;
|
||||
return;
|
||||
},
|
||||
sub {
|
||||
say "Proccess $i ended";
|
||||
$self->_used_cores( $self->_used_cores - 1 );
|
||||
}
|
||||
);
|
||||
say $i;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
sub _dispatch_update ( $self, $update ) {
|
||||
my $promise = Mojo::Promise->new;;
|
||||
my $promise = Mojo::Promise->new;
|
||||
{
|
||||
if ( !defined $self->_last_offset_update
|
||||
|| $self->_last_offset_update < $update->{update_id} )
|
||||
{
|
||||
$self->_last_offset_update( $update->{update_id} );
|
||||
}
|
||||
if ( !defined $update->{message} ) {
|
||||
$promise->resolve;
|
||||
next;
|
||||
}
|
||||
$self->_handle_message( $update->{message} )->then(sub {
|
||||
$self->_handle_message( $update->{message} )->finally(
|
||||
sub {
|
||||
$promise->resolve;
|
||||
})->catch(sub {
|
||||
$promise->resolve;
|
||||
});
|
||||
}
|
||||
);
|
||||
}
|
||||
return $promise;
|
||||
}
|
||||
|
@ -102,7 +100,12 @@ sub _dispatch_update ( $self, $update ) {
|
|||
sub sendMessage ( $self, $chat_id, $text ) {
|
||||
my $url = "@{[$self->_tg_root]}/sendMessage";
|
||||
my $ua = $self->_ua;
|
||||
return $ua->post_p( $url, json => { chat_id => $chat_id, text => $text } );
|
||||
if ( defined $self->_ioloop ) {
|
||||
$ua->ioloop( $self->_ioloop );
|
||||
}
|
||||
my $promise =
|
||||
$ua->post_p( $url, json => { chat_id => $chat_id, text => $text } );
|
||||
return $promise;
|
||||
}
|
||||
|
||||
sub _handle_message ( $self, $message ) {
|
||||
|
@ -120,13 +123,17 @@ sub _handle_message ( $self, $message ) {
|
|||
next;
|
||||
}
|
||||
$self->sendMessage( $chat_id,
|
||||
'Got your message, attempting to generate a pdf.' )->then(sub {
|
||||
'Got your message, attempting to generate a pdf.' )->then(
|
||||
sub {
|
||||
my $pdf_builder = TgMagicPdf::PdfBuilder->new;
|
||||
$pdf_builder->from_text($text)->then(
|
||||
sub ($pdf) {
|
||||
$self->sendDocument( $chat_id, 'mtgprint.pdf', $pdf )->then(sub {;
|
||||
$self->sendDocument( $chat_id, 'mtgprint.pdf', $pdf )
|
||||
->then(
|
||||
sub {
|
||||
$promise->resolve;
|
||||
});
|
||||
}
|
||||
);
|
||||
}
|
||||
)->catch(
|
||||
sub ($err) {
|
||||
|
@ -145,25 +152,29 @@ sub _handle_message ( $self, $message ) {
|
|||
if ( -1 != index $err,
|
||||
$TgMagicPdf::PdfBuilder::ERR_INVALID_CARD )
|
||||
{
|
||||
$error_to_user .= ' ' . $pdf_builder->last_invalid_card;
|
||||
$error_to_user .=
|
||||
' ' . $pdf_builder->last_invalid_card;
|
||||
}
|
||||
if ( -1 != index $err,
|
||||
$TgMagicPdf::PdfBuilder::ERR_UNABLE_TO_FIND_IMAGE )
|
||||
$TgMagicPdf::PdfBuilder::ERR_UNABLE_TO_FIND_IMAGE
|
||||
)
|
||||
{
|
||||
$error_to_user .= ' ' . $pdf_builder->last_invalid_card;
|
||||
$error_to_user .=
|
||||
' ' . $pdf_builder->last_invalid_card;
|
||||
}
|
||||
$self->sendMessage( $chat_id,
|
||||
'I could not process your deck take a look to this details: '
|
||||
'I could not process your deck take a look to this details: '
|
||||
. $error_to_user )->wait;
|
||||
return;
|
||||
}
|
||||
warn $err;
|
||||
$self->sendMessage( $chat_id,
|
||||
'I could not process your deck because of a server error' )
|
||||
->wait;
|
||||
'I could not process your deck because of a server error'
|
||||
)->wait;
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
)->catch( sub ($err) { say $err } );
|
||||
}
|
||||
return $promise;
|
||||
}
|
||||
|
@ -190,9 +201,8 @@ sub _pdf_builder {
|
|||
}
|
||||
|
||||
sub _build__ua ($self) {
|
||||
my $ua = Mojo::UserAgent->new->with_roles('+Queued');
|
||||
$ua->max_active(5);
|
||||
$ua->inactivity_timeout(60);
|
||||
my $ua = Mojo::UserAgent->new;
|
||||
$ua->inactivity_timeout(5);
|
||||
return $ua;
|
||||
}
|
||||
|
||||
|
@ -203,7 +213,15 @@ sub _get_updates ($self) {
|
|||
if ( defined $self->_last_offset_update ) {
|
||||
$params{offset} = $self->_last_offset_update + 1;
|
||||
}
|
||||
return $ua->post_p( $url, json => {%params} );
|
||||
my $res = $ua->post( $url, json => {%params} )->result;
|
||||
for my $update ( $res->json->{result}->@* ) {
|
||||
if ( !defined $self->_last_offset_update
|
||||
|| $self->_last_offset_update < $update->{update_id} )
|
||||
{
|
||||
$self->_last_offset_update( $update->{update_id} );
|
||||
}
|
||||
}
|
||||
return $res;
|
||||
}
|
||||
|
||||
sub _build__tg_root ($self) {
|
||||
|
|
Loading…
Reference in New Issue