これができると嬉しいこと

サーバとクライアントがひとつのプロセスにまとまってクローラー的なプロセスが必要なくなり、監視せずに済むので地味に嬉しい

概要

言われてみれば何だ、という程度ではあるけれども、app.psgiで Twiggy::Server を使い、コネクションを AE::timer で監視し、AE::cv->recv; するという方法をつかえば Tatsumaki::MessageQueue も使えて万々歳。これがなかなか思いつかなかった…

思いつかなかった言い訳

Twitter の userstream は時々切れるので再接続の処理をする必要があった。今までずっとクローラーで while ループ回したりしてたけれども、 while を使って接続が切れるごとに ->recv してるとサーバと一緒に動かすことができなくて悶々とした日々を過ごすこと数ヶ月。

サーバでは builder {} を return していたので、当然 while なんて挟めなかった。

ソース

必要な部分だけ切り取り

use strict;

package PollHandler;
use parent 'Tatsumaki::Handler';
__PACKAGE__->asynchronous(1);
use Tatsumaki::MessageQueue;

sub get {
    my $self = shift;

    my $mq = Tatsumaki::MessageQueue->instance('tweet');
    my $client_id = $self->request->param('client_id')
        or Tatsumaki::Error::HTTP->throw(500, 'client_id is required');
    $mq->poll_once($client_id, sub { $self->on_new_event(@_) });
}

sub on_new_event {
    my $self = shift;
    $self->write(\@_);
    $self->finish;
}

package main;
use AE;
use Plack::Builder;
use Tatsumaki::Application;
use Twiggy::Server;
use AnyEvent::Twitter::Stream;

my $app = Tatsumaki::Application->new([ '/poll' => 'PollHandler' ]);

my $mapp = builder {
    # enable etc...
    $app;
};

my $server = Twiggy::Server->new(
    host => 'example.com',
    port => 5000,
)->register_service($mapp);

my $STREAM_CONN; # we need $STREAM_CONN to keep the availability of stream connection
my $LISTENER;    # we have to declare $LISTENER here, because variables in the timer callback scope are not visible from here

my $w; $w = AE::timer 1, 10, sub {
    return if $STREAM_CONN;

    my $oauth = { ... };

    say "ALERT: wake up";
    $LISTENER = AnyEvent::Twitter::Stream->new(
        %$oauth,
        method     => 'userstream',
        on_error   => sub { $STREAM_CONN = 0; },
        on_eof     => sub { $STREAM_CONN = 0; },
        on_connect => sub { $STREAM_CONN = 1; },
        on_tweet   => sub {
            my $tweet = shift;

            my $mq = Tatsumaki::MessageQueue->instance('tweet');
            $mq->publish({ type => 'tweet', tweet => $tweet });
            $STREAM_CONN = 1;
        }
    );
};

AE::cv->recv;

補足

AnyEvent::HTTP の変更によって AE::T::Stream 0.20 は動かないが、 AE::T::Stream の DEVELOPER RELEASE (0.20_1) だと chunked encoding 周りが fix されているので上げるべし