From 4450ba7892d9987ff7787787bf52f9ff7be74b09 Mon Sep 17 00:00:00 2001 From: Sergiotarxz Date: Thu, 23 May 2024 20:15:39 +0200 Subject: [PATCH] Adding multicore support. --- lib/TgMagicPdf/Telegram.pm | 212 ++++++++++++++++++++----------------- 1 file changed, 115 insertions(+), 97 deletions(-) diff --git a/lib/TgMagicPdf/Telegram.pm b/lib/TgMagicPdf/Telegram.pm index 7832545..0262d8b 100644 --- a/lib/TgMagicPdf/Telegram.pm +++ b/lib/TgMagicPdf/Telegram.pm @@ -9,7 +9,10 @@ use feature 'signatures'; use Moo; use Mojo::UserAgent; use Mojo::IOLoop; +use Mojo::IOLoop::Subprocess; use Mojo::Promise; +use Time::HiRes qw/usleep/; +use Data::Dumper; use TgMagicPdf::PdfBuilder; @@ -21,6 +24,11 @@ has _ua => ( is => 'lazy', ); has _last_offset_update => ( is => 'rw', ); +has _used_cores => ( is => 'rw', default => sub { 0 } ); +has _cores => ( is => 'rw', default => sub { `nproc` } ); +has _updates => ( is => 'rw' ); +has _ioloop => ( is => 'rw' ); + sub _build__token ($self) { require TgMagicPdf; return TgMagicPdf->new->config->{telegram}{token}; @@ -28,73 +36,63 @@ sub _build__token ($self) { sub run ($self) { $self->_dispatch_updates; - my $promise_dispatch; - while (1) { - if ( defined $promise_dispatch ) { - $promise_dispatch->wait; - } - $promise_dispatch = $self->_dispatch_updates; - } -} - -sub run_recursive ($self) { - $self->_dispatch_updates.then(sub { - $self->run_recursive; - })->catch(sub ($err) { - warn $err; - $self->run_recursive; - }); } sub _dispatch_updates ($self) { - my $updates_p = $self->_get_updates; - my $promise = Mojo::Promise->new; - $updates_p->then( - sub ($res) { - my $updates = $res->result->json->{result}; - $self->_dispatch_updates_one_to_one($promise, $updates); + Mojo::IOLoop->recurring(0.5, sub { + if ( $self->_used_cores >= $self->_cores ) { + return; } - )->catch( - sub ($err) { - $promise->resolve; - warn $err; + for ( my $i = $self->_used_cores ; $i <= $self->_cores ; $i++ ) { + if ( !defined $self->_updates + || scalar @{ $self->_updates } == 0 ) + { + my $res = $self->_get_updates; + my $updates = $res->json->{result}; + $self->_updates($updates); + } + my $update = shift $self->_updates->@*; + if ( !defined $update ) { + last; + } + $self->_used_cores( $self->_used_cores + 1 ); + Mojo::IOLoop->subprocess->run( + sub { + say "Proccess $i active"; + $self->_dispatch_update($update)->then( + sub { + my $message = $update->{message}; + say 'Generated pdf for ' + . (Data::Dumper::Dumper $message->{from}) . ' ' + . $message->{date}; + Mojo::IOLoop->stop; + } + ); + Mojo::IOLoop->start; + return; + }, + sub { + say "Proccess $i ended"; + $self->_used_cores( $self->_used_cores - 1 ); + } + ); + say $i; } - ); - return $promise; -} - -sub _dispatch_updates_one_to_one($self, $promise, $updates) { - my $update = shift @$updates; - if (!defined $update) { - $promise->resolve; - return; - } - $self->_dispatch_update($update)->then(sub { - $self->_dispatch_updates_one_to_one($promise, $updates); - })->catch(sub($err){ - $self->_dispatch_updates_one_to_one($promise, $updates); - warn $err; }); } - sub _dispatch_update ( $self, $update ) { - my $promise = Mojo::Promise->new;; + my $promise = Mojo::Promise->new; { - if ( !defined $self->_last_offset_update - || $self->_last_offset_update < $update->{update_id} ) - { - $self->_last_offset_update( $update->{update_id} ); - } if ( !defined $update->{message} ) { $promise->resolve; - next; + next; } - $self->_handle_message( $update->{message} )->then(sub { - $promise->resolve; - })->catch(sub { - $promise->resolve; - }); + $self->_handle_message( $update->{message} )->finally( + sub { + $promise->resolve; + } + ); } return $promise; } @@ -102,7 +100,12 @@ sub _dispatch_update ( $self, $update ) { sub sendMessage ( $self, $chat_id, $text ) { my $url = "@{[$self->_tg_root]}/sendMessage"; my $ua = $self->_ua; - return $ua->post_p( $url, json => { chat_id => $chat_id, text => $text } ); + if ( defined $self->_ioloop ) { + $ua->ioloop( $self->_ioloop ); + } + my $promise = + $ua->post_p( $url, json => { chat_id => $chat_id, text => $text } ); + return $promise; } sub _handle_message ( $self, $message ) { @@ -120,50 +123,58 @@ sub _handle_message ( $self, $message ) { next; } $self->sendMessage( $chat_id, - 'Got your message, attempting to generate a pdf.' )->then(sub { - my $pdf_builder = TgMagicPdf::PdfBuilder->new; - $pdf_builder->from_text($text)->then( - sub ($pdf) { - $self->sendDocument( $chat_id, 'mtgprint.pdf', $pdf )->then(sub {; + 'Got your message, attempting to generate a pdf.' )->then( + sub { + my $pdf_builder = TgMagicPdf::PdfBuilder->new; + $pdf_builder->from_text($text)->then( + sub ($pdf) { + $self->sendDocument( $chat_id, 'mtgprint.pdf', $pdf ) + ->then( + sub { + $promise->resolve; + } + ); + } + )->catch( + sub ($err) { $promise->resolve; - }); - } - )->catch( - sub ($err) { - $promise->resolve; - my $match = 0; - for my $known_error ( - $TgMagicPdf::PdfBuilder::ERR_INVALID_CARD, - $TgMagicPdf::PdfBuilder::ERR_UNABLE_TO_FIND_IMAGE, - $TgMagicPdf::PdfBuilder::ERR_TOO_MANY_CARDS - ) - { - $match = 1 if ( index( $err, $known_error ) != -1 ); - } - if ($match) { - my $error_to_user .= $err =~ s/ at.*$//r; - if ( -1 != index $err, - $TgMagicPdf::PdfBuilder::ERR_INVALID_CARD ) + my $match = 0; + for my $known_error ( + $TgMagicPdf::PdfBuilder::ERR_INVALID_CARD, + $TgMagicPdf::PdfBuilder::ERR_UNABLE_TO_FIND_IMAGE, + $TgMagicPdf::PdfBuilder::ERR_TOO_MANY_CARDS + ) { - $error_to_user .= ' ' . $pdf_builder->last_invalid_card; + $match = 1 if ( index( $err, $known_error ) != -1 ); } - if ( -1 != index $err, - $TgMagicPdf::PdfBuilder::ERR_UNABLE_TO_FIND_IMAGE ) - { - $error_to_user .= ' ' . $pdf_builder->last_invalid_card; + if ($match) { + my $error_to_user .= $err =~ s/ at.*$//r; + if ( -1 != index $err, + $TgMagicPdf::PdfBuilder::ERR_INVALID_CARD ) + { + $error_to_user .= + ' ' . $pdf_builder->last_invalid_card; + } + if ( -1 != index $err, + $TgMagicPdf::PdfBuilder::ERR_UNABLE_TO_FIND_IMAGE + ) + { + $error_to_user .= + ' ' . $pdf_builder->last_invalid_card; + } + $self->sendMessage( $chat_id, +'I could not process your deck take a look to this details: ' + . $error_to_user )->wait; + return; } + warn $err; $self->sendMessage( $chat_id, - 'I could not process your deck take a look to this details: ' - . $error_to_user )->wait; - return; +'I could not process your deck because of a server error' + )->wait; } - warn $err; - $self->sendMessage( $chat_id, - 'I could not process your deck because of a server error' ) - ->wait; - } - ); - }); + ); + } + )->catch( sub ($err) { say $err } ); } return $promise; } @@ -190,9 +201,8 @@ sub _pdf_builder { } sub _build__ua ($self) { - my $ua = Mojo::UserAgent->new->with_roles('+Queued'); - $ua->max_active(5); - $ua->inactivity_timeout(60); + my $ua = Mojo::UserAgent->new; + $ua->inactivity_timeout(5); return $ua; } @@ -203,7 +213,15 @@ sub _get_updates ($self) { if ( defined $self->_last_offset_update ) { $params{offset} = $self->_last_offset_update + 1; } - return $ua->post_p( $url, json => {%params} ); + my $res = $ua->post( $url, json => {%params} )->result; + for my $update ( $res->json->{result}->@* ) { + if ( !defined $self->_last_offset_update + || $self->_last_offset_update < $update->{update_id} ) + { + $self->_last_offset_update( $update->{update_id} ); + } + } + return $res; } sub _build__tg_root ($self) {