github.com
AWS Kinesis Stream Subscriber
USAGE:
k-iter [FLAGS] [OPTIONS] --region <NAME> --stream-name <NAME>
FLAGS:
-h, --help Prints help information
-V, --version Prints version information
--verbose Enable verbose mode.
OPTIONS:
--data-format <TYPE> Set data output-format. [default: UTF8_STRING] [possible values: RAW_BYTES,
RAW_STRING, UTF8_STRING]
-t, --iterator-type <TYPE> Sets iterator type. [default: LATEST] [possible values: LATEST, AT_SEQUENCE_NUMBER,
AFTER_SEQUENCE_NUMBER, AT_TIMESTAMP, TRIM_HORIZON]
-r, --region <NAME> Sets a region name. [possible values: ap-northeast-1, ap-northeast-2, ap-south-1, ap-
southeast-1, ap-southeast-2, ca-central-1, eu-central-1, eu-west-1, eu-west-2, eu-
west-3, sa-east-1, us-east-1, us-east-2, us-west-1, us-west-2, us-gov-west-1, cn-
north-1, cn-northwest-1]
--sequence-number <NUM> Set Sequence number when Iterator Type is AT_SEQUENCE_NUMBER or
AFTER_SEQUENCE_NUMBER.
-s, --shard-id <ID1 ID2>... Set shard ids. If you don't set, iterate all shards
-n, --stream-name <NAME> Sets a stream name.
--timestamp <TIMESTAMP> Set timestamp(UNIX Epoch milliseconds) when Iterator Type is AT_TIMESTAMP.
-s, --shard-id ... Set shard ids. If you don't set, iterate all shards
でけたー。できましたよ。細々とやっとりました。
何も指定しなければ全 shard をにらみにいきますし、当然指定もできます。
複数の shard id も指定できます。あ、sequence-number も複数の方がいいかな? 確認大変そう。また今度にしよう。
つまったところ
impl Stream for KinesisShardIterator {
type Item = GetRecordsOutput;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if let Some(current) = &self.token {
let r = GetRecordsInput {
shard_iterator: current.clone(),
..Default::default()
};
self.client
.get_records(r)
.map(|r| {
self.token = r.next_shard_iterator.clone();
Async::Ready(Some(r))
})
.map_err(Into::into)
.wait()
} else {
self.get_iterator_token()
.map(|next| {
self.token = Some(next);
Async::NotReady
})
.map_err(Into::into)
}
}
}
このあたり ですな。
wait
じゃあなくて poll
つかって動かなかったり、map_or_else
つかって型が合わなかったり、Iterator だけでいいんじゃあ…とあきらめかけたり、いろいろやってました。
というか、wait
まわりはまだちゃんとわかってないです。動いたーで喜んで今書いてます。
Stream でかつ、Async::Ready(None)
を返さなければひたすら動き続ける(という理解な)ので、呼び出し先で こう 書けます。
let it = match iter_type {
IteratorType::LATEST | IteratorType::TRIM_HORIZON => {
KinesisShardIterator::new(na, ia, ta, ra)
}
IteratorType::AT_SEQUENCE_NUMBER | IteratorType::AFTER_SEQUENCE_NUMBER => {
let seq = value_t_or_exit!(matches.value_of("sequence-number"), String);
KinesisShardIterator::new_with_sequence_number(na, ia, ta, seq.as_str(), ra)
}
IteratorType::AT_TIMESTAMP => {
let timestamp = value_t_or_exit!(matches.value_of("timestamp"), f64);
KinesisShardIterator::new_with_timestamp(na, ia, ta, timestamp, ra)
}
};
tokio::spawn({
Interval::new_interval(Duration::from_millis(1000))
.map_err(|e| eprintln!("timer failed; err={:?}", e))
.zip(it.map_err(|e| eprintln!("subscribe error = err{:?}", e)))
.map(|(_, r)| r)
.forward(
tx.clone()
.sink_map_err(|e| eprintln!("send error = err{:?}", e)),
)
.and_then(|_| Ok(()))
});
}
Interval も Stream を実装しています。
zip
で KinesisShardIterator
と並べることで、1 秒ずつ Kinesis の record を確認しに行くようになりました。
0.5 秒とかにすると Rate Limit Exceed とかって出ました。知ってる。
これは動くようになったところを 1 段階目とすると、今は 3 段階目です。
最初は 1 record 毎に Sender を複製して send を呼んでいました。だせーと思いながら動かすために目をつむってかいたのが、1 段階目。
2 段階目は try_send が &mut self
なのに気づいて、毎度複製するよかましかーと書き換えたやつ。
3 段階目が、send_all とか forward をみて、mut なしでいけるのではと気付いて書き換えた今のやつです。
課題
Verbose mode での shard-id 表示
ほしくない?
ローカルでのテスト
いい加減 localstack つかったテストかけるようにしよう。これもこれで大変です。
しめ
適用できたぜい。なんちゃってでも Future と Stream、Tokio が扱えたのはでかいです。
もうちょっと待てば async/await を含んだ release がでます。けどいけそうっておもって書かずにはってかんじでした。
出たら書き直します。