AWS Kinesis Stream Subscriber USAGE: k-iter [FLAGS] [OPTIONS] --region <NAME> --stream-name <NAME> FLAGS: -h, --help Prints help information -V, --version Prints version information --verbose Enable verbose mode. OPTIONS: --data-format <TYPE> Set data output-format. [default: UTF8_STRING] [possible values: RAW_BYTES, RAW_STRING, UTF8_STRING] -t, --iterator-type <TYPE> Sets iterator type. [default: LATEST] [possible values: LATEST, AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER, AT_TIMESTAMP, TRIM_HORIZON] -r, --region <NAME> Sets a region name. [possible values: ap-northeast-1, ap-northeast-2, ap-south-1, ap- southeast-1, ap-southeast-2, ca-central-1, eu-central-1, eu-west-1, eu-west-2, eu- west-3, sa-east-1, us-east-1, us-east-2, us-west-1, us-west-2, us-gov-west-1, cn- north-1, cn-northwest-1] --sequence-number <NUM> Set Sequence number when Iterator Type is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER. -s, --shard-id <ID1 ID2>... Set shard ids. If you don't set, iterate all shards -n, --stream-name <NAME> Sets a stream name. --timestamp <TIMESTAMP> Set timestamp(UNIX Epoch milliseconds) when Iterator Type is AT_TIMESTAMP.
-s, --shard-id
... Set shard ids. If you don't set, iterate all shards
でけたー。できましたよ。細々とやっとりました。
何も指定しなければ全 shard をにらみにいきますし、当然指定もできます。
複数の shard id も指定できます。あ、sequence-number も複数の方がいいかな? 確認大変そう。また今度にしよう。
つまったところ
futures::stream::Stream の実装
impl Stream for KinesisShardIterator { type Item = GetRecordsOutput; type Error = Error; fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { if let Some(current) = &self.token { let r = GetRecordsInput { shard_iterator: current.clone(), ..Default::default() }; self.client .get_records(r) .map(|r| { self.token = r.next_shard_iterator.clone(); Async::Ready(Some(r)) }) .map_err(Into::into) .wait() } else { self.get_iterator_token() .map(|next| { self.token = Some(next); Async::NotReady }) .map_err(Into::into) } } }
このあたり ですな。
wait
じゃあなくて poll
つかって動かなかったり、map_or_else
つかって型が合わなかったり、Iterator だけでいいんじゃあ…とあきらめかけたり、いろいろやってました。
というか、wait
まわりはまだちゃんとわかってないです。動いたーで喜んで今書いてます。
Stream でかつ、Async::Ready(None)
を返さなければひたすら動き続ける(という理解な)ので、呼び出し先で こう 書けます。
let it = match iter_type { IteratorType::LATEST | IteratorType::TRIM_HORIZON => { KinesisShardIterator::new(na, ia, ta, ra) } IteratorType::AT_SEQUENCE_NUMBER | IteratorType::AFTER_SEQUENCE_NUMBER => { let seq = value_t_or_exit!(matches.value_of("sequence-number"), String); KinesisShardIterator::new_with_sequence_number(na, ia, ta, seq.as_str(), ra) } IteratorType::AT_TIMESTAMP => { let timestamp = value_t_or_exit!(matches.value_of("timestamp"), f64); KinesisShardIterator::new_with_timestamp(na, ia, ta, timestamp, ra) } }; tokio::spawn({ Interval::new_interval(Duration::from_millis(1000)) .map_err(|e| eprintln!("timer failed; err={:?}", e)) .zip(it.map_err(|e| eprintln!("subscribe error = err{:?}", e))) .map(|(_, r)| r) .forward( tx.clone() .sink_map_err(|e| eprintln!("send error = err{:?}", e)), ) .and_then(|_| Ok(())) }); }
Interval も Stream を実装しています。
zip
で KinesisShardIterator
と並べることで、1 秒ずつ Kinesis の record を確認しに行くようになりました。
0.5 秒とかにすると Rate Limit Exceed とかって出ました。知ってる。
tokio::sync::mpsc の取り扱い
これは動くようになったところを 1 段階目とすると、今は 3 段階目です。
最初は 1 record 毎に Sender を複製して send を呼んでいました。だせーと思いながら動かすために目をつむってかいたのが、1 段階目。
2 段階目は try_send が &mut self
なのに気づいて、毎度複製するよかましかーと書き換えたやつ。
3 段階目が、send_all とか forward をみて、mut なしでいけるのではと気付いて書き換えた今のやつです。
課題
Verbose mode での shard-id 表示
ほしくない?
ローカルでのテスト
いい加減 localstack つかったテストかけるようにしよう。これもこれで大変です。
しめ
あー、やっと一山超えれた気がする。もうちょっと。ツールの方に適用していって。
— rohki (@r_ohki) 2019年5月30日
適用できたぜい。なんちゃってでも Future と Stream、Tokio が扱えたのはでかいです。
もうちょっと待てば async/await を含んだ release がでます。けどいけそうっておもって書かずにはってかんじでした。
出たら書き直します。