ソモサン

私rohkiによる活動や読書の記録をつらつらと書くページです

AWS Kinesis ひたすら読みにいく k-iter の複数 shard 対応(と、Rust の Future/Stream/Tokio 話)

github.com

AWS Kinesis Stream Subscriber

USAGE:
    k-iter [FLAGS] [OPTIONS] --region <NAME> --stream-name <NAME>

FLAGS:
    -h, --help       Prints help information
    -V, --version    Prints version information
        --verbose    Enable verbose mode.

OPTIONS:
        --data-format <TYPE>       Set data output-format. [default: UTF8_STRING]  [possible values: RAW_BYTES,
                                   RAW_STRING, UTF8_STRING]
    -t, --iterator-type <TYPE>     Sets iterator type. [default: LATEST]  [possible values: LATEST, AT_SEQUENCE_NUMBER,
                                   AFTER_SEQUENCE_NUMBER, AT_TIMESTAMP, TRIM_HORIZON]
    -r, --region <NAME>            Sets a region name. [possible values: ap-northeast-1, ap-northeast-2, ap-south-1, ap-
                                   southeast-1, ap-southeast-2, ca-central-1, eu-central-1, eu-west-1, eu-west-2, eu-
                                   west-3, sa-east-1, us-east-1, us-east-2, us-west-1, us-west-2, us-gov-west-1, cn-
                                   north-1, cn-northwest-1]
        --sequence-number <NUM>    Set Sequence number when Iterator Type is AT_SEQUENCE_NUMBER or
                                   AFTER_SEQUENCE_NUMBER.
    -s, --shard-id <ID1 ID2>...    Set shard ids. If you don't set, iterate all shards
    -n, --stream-name <NAME>       Sets a stream name.
        --timestamp <TIMESTAMP>    Set timestamp(UNIX Epoch milliseconds) when Iterator Type is AT_TIMESTAMP.

-s, --shard-id ... Set shard ids. If you don't set, iterate all shards

でけたー。できましたよ。細々とやっとりました。
何も指定しなければ全 shard をにらみにいきますし、当然指定もできます。
複数の shard id も指定できます。あ、sequence-number も複数の方がいいかな? 確認大変そう。また今度にしよう。

つまったところ

futures::stream::Stream の実装

impl Stream for KinesisShardIterator {
    type Item = GetRecordsOutput;
    type Error = Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        if let Some(current) = &self.token {
            let r = GetRecordsInput {
                shard_iterator: current.clone(),
                ..Default::default()
            };

            self.client
                .get_records(r)
                .map(|r| {
                    self.token = r.next_shard_iterator.clone();
                    Async::Ready(Some(r))
                })
                .map_err(Into::into)
                .wait()
        } else {
            self.get_iterator_token()
                .map(|next| {
                    self.token = Some(next);
                    Async::NotReady
                })
                .map_err(Into::into)
        }
    }
}

このあたり ですな。
wait じゃあなくて poll つかって動かなかったり、map_or_else つかって型が合わなかったり、Iterator だけでいいんじゃあ…とあきらめかけたり、いろいろやってました。
というか、wait まわりはまだちゃんとわかってないです。動いたーで喜んで今書いてます。

Stream でかつ、Async::Ready(None) を返さなければひたすら動き続ける(という理解な)ので、呼び出し先で こう 書けます。

            let it = match iter_type {
                IteratorType::LATEST | IteratorType::TRIM_HORIZON => {
                    KinesisShardIterator::new(na, ia, ta, ra)
                }
                IteratorType::AT_SEQUENCE_NUMBER | IteratorType::AFTER_SEQUENCE_NUMBER => {
                    let seq = value_t_or_exit!(matches.value_of("sequence-number"), String);
                    KinesisShardIterator::new_with_sequence_number(na, ia, ta, seq.as_str(), ra)
                }
                IteratorType::AT_TIMESTAMP => {
                    let timestamp = value_t_or_exit!(matches.value_of("timestamp"), f64);
                    KinesisShardIterator::new_with_timestamp(na, ia, ta, timestamp, ra)
                }
            };

            tokio::spawn({
                Interval::new_interval(Duration::from_millis(1000))
                    .map_err(|e| eprintln!("timer failed; err={:?}", e))
                    .zip(it.map_err(|e| eprintln!("subscribe error = err{:?}", e)))
                    .map(|(_, r)| r)
                    .forward(
                        tx.clone()
                            .sink_map_err(|e| eprintln!("send error = err{:?}", e)),
                    )
                    .and_then(|_| Ok(()))
            });
        }

Interval も Stream を実装しています。
zipKinesisShardIterator と並べることで、1 秒ずつ Kinesis の record を確認しに行くようになりました。
0.5 秒とかにすると Rate Limit Exceed とかって出ました。知ってる。

tokio::sync::mpsc の取り扱い

これは動くようになったところを 1 段階目とすると、今は 3 段階目です。 最初は 1 record 毎に Sender を複製して send を呼んでいました。だせーと思いながら動かすために目をつむってかいたのが、1 段階目。
2 段階目は try_send&mut self なのに気づいて、毎度複製するよかましかーと書き換えたやつ。
3 段階目が、send_all とか forward をみて、mut なしでいけるのではと気付いて書き換えた今のやつです。

課題

Verbose mode での shard-id 表示

ほしくない?

ローカルでのテスト

いい加減 localstack つかったテストかけるようにしよう。これもこれで大変です。

しめ

適用できたぜい。なんちゃってでも Future と Stream、Tokio が扱えたのはでかいです。
もうちょっと待てば async/await を含んだ release がでます。けどいけそうっておもって書かずにはってかんじでした。 出たら書き直します。