ソモサン

私rohkiによる活動や読書の記録をつらつらと書くページです

Kinesis Stream の中身を追いかけて出力する k-iter に verbose mode を付けた

github.com

0.3.0 でございます。

呼び出し方と形式

--verbose を付ける。以上!

k-iter --verbose -n sample-stream -r ap-northeast-1

形式

JSON で出してます。
既定ではデータを UTF8 の文字列としてだします。

{"ApproximateArrivalTimestamp":1533704447.987,"Data":"155.103.165.228 - - [08/Aug/2018:14:00:47 +09:00] \"DELETE /list\" 200 3568 \"-\" \"Mozilla/5.0 (Windows NT 6.0; Win64; x64; rv:5.8) Gecko/20100101 Firefox/5.8.6\"\n","PartitionKey":"9612982382","SequenceNumber":"49587101216856299235294266286047359784703484607321866242"}

背景

Kinesis Stream の中を追跡する際には、そのデータがいつ投入されたのか、ってのも大事かと思います。
データ生成のタイムスタンプとデータ投入のタイムスタンプを比べたい感じです。
データ生成は自分たちで埋め込んで頑張りますが、データ投入は Kinesis Stream のレコード情報にあるので、そちらを利用したいです。

jq などで整形・加工しましょう、ってスタンスです。これ以上は頑張らないです。

現在のオプション

--help で確認できるオプションは以下のような感じ。
--iterator-type 増やしたり、--data-format 追加したりもしてました。

USAGE:
    k-iter [FLAGS] [OPTIONS] --region <NAME> --stream-name <NAME>

FLAGS:
    -h, --help       Prints help information
    -V, --version    Prints version information
        --verbose    Enable verbose mode.

OPTIONS:
        --data-format <TYPE>       Set data output-format. [default: UTF8_STRING]  [possible values: RAW_BYTES,
                                   RAW_STRING, UTF8_STRING]
    -t, --iterator-type <TYPE>     Sets iterator type. [default: LATEST]  [possible values: LATEST, AT_SEQUENCE_NUMBER,
                                   AFTER_SEQUENCE_NUMBER, AT_TIMESTAMP, TRIM_HORIZON]
    -r, --region <NAME>            Sets a region name. [possible values: ap-northeast-1, ap-northeast-2, ap-south-1, ap
                                   -southeast-1, ap-southeast-2, ca-central-1, eu-central-1, eu-west-1, eu-west-2, eu
                                   -west-3, sa-east-1, us-east-1, us-east-2, us-west-1, us-west-2, us-gov-west-1, cn
                                   -north-1, cn-northwest-1]
        --sequence-number <NUM>    Set Sequence number when Iterator Type is AT_SEQUENCE_NUMBER or
                                   AFTER_SEQUENCE_NUMBER.
    -s, --shard-id <ID>            Set shard id. [default: shardId-000000000000]
    -n, --stream-name <NAME>       Sets a stream name.
        --timestamp <TIMESTAMP>    Set timestamp(UNIX Epoch milliseconds) when Iterator Type is AT_TIMESTAMP.

つづき

終了条件の指定、とかですかねー。ある条件に到達したらプロセスを終了する的な。

  • N レコードとったら
  • ある日付に到達したら
  • あるシーケンスナンバーに追い付いたら
  • 先頭に追い付いたら

できそうなのはこんなもんでしょうか?
あとは複数シャードもかな?