ソモサン

私rohkiによる活動や読書の記録をつらつらと書くページです

Kinesis Stream の中身を追いかけて出力する k-iter に verbose mode を付けた

github.com

0.3.0 でございます。

呼び出し方と形式

--verbose を付ける。以上!

k-iter --verbose -n sample-stream -r ap-northeast-1

形式

JSON で出してます。
既定ではデータを UTF8 の文字列としてだします。

{"ApproximateArrivalTimestamp":1533704447.987,"Data":"155.103.165.228 - - [08/Aug/2018:14:00:47 +09:00] \"DELETE /list\" 200 3568 \"-\" \"Mozilla/5.0 (Windows NT 6.0; Win64; x64; rv:5.8) Gecko/20100101 Firefox/5.8.6\"\n","PartitionKey":"9612982382","SequenceNumber":"49587101216856299235294266286047359784703484607321866242"}

背景

Kinesis Stream の中を追跡する際には、そのデータがいつ投入されたのか、ってのも大事かと思います。
データ生成のタイムスタンプとデータ投入のタイムスタンプを比べたい感じです。
データ生成は自分たちで埋め込んで頑張りますが、データ投入は Kinesis Stream のレコード情報にあるので、そちらを利用したいです。

jq などで整形・加工しましょう、ってスタンスです。これ以上は頑張らないです。

現在のオプション

--help で確認できるオプションは以下のような感じ。
--iterator-type 増やしたり、--data-format 追加したりもしてました。

USAGE:
    k-iter [FLAGS] [OPTIONS] --region <NAME> --stream-name <NAME>

FLAGS:
    -h, --help       Prints help information
    -V, --version    Prints version information
        --verbose    Enable verbose mode.

OPTIONS:
        --data-format <TYPE>       Set data output-format. [default: UTF8_STRING]  [possible values: RAW_BYTES,
                                   RAW_STRING, UTF8_STRING]
    -t, --iterator-type <TYPE>     Sets iterator type. [default: LATEST]  [possible values: LATEST, AT_SEQUENCE_NUMBER,
                                   AFTER_SEQUENCE_NUMBER, AT_TIMESTAMP, TRIM_HORIZON]
    -r, --region <NAME>            Sets a region name. [possible values: ap-northeast-1, ap-northeast-2, ap-south-1, ap
                                   -southeast-1, ap-southeast-2, ca-central-1, eu-central-1, eu-west-1, eu-west-2, eu
                                   -west-3, sa-east-1, us-east-1, us-east-2, us-west-1, us-west-2, us-gov-west-1, cn
                                   -north-1, cn-northwest-1]
        --sequence-number <NUM>    Set Sequence number when Iterator Type is AT_SEQUENCE_NUMBER or
                                   AFTER_SEQUENCE_NUMBER.
    -s, --shard-id <ID>            Set shard id. [default: shardId-000000000000]
    -n, --stream-name <NAME>       Sets a stream name.
        --timestamp <TIMESTAMP>    Set timestamp(UNIX Epoch milliseconds) when Iterator Type is AT_TIMESTAMP.

つづき

終了条件の指定、とかですかねー。ある条件に到達したらプロセスを終了する的な。

  • N レコードとったら
  • ある日付に到達したら
  • あるシーケンスナンバーに追い付いたら
  • 先頭に追い付いたら

できそうなのはこんなもんでしょうか?
あとは複数シャードもかな?

Rust のお試しコードを実行する: cargo run --example

タイトル落ち

その通り。自分メモです。

背景

新しい Web フレームワーク warp が出ました。
それで試してみよう、となった時に "この examples ってどう実行するんだろう" となった感じです。

方法

cargo run --example hello

先に書いた通り、cargo サブコマンドである run にオプションと値を付けることで実行できます。
もし、examples 内でのみのライブラリ依存関係がある場合は、Cargo.tomldev-dependencies に記載すればよい模様。 *1

おわりに

いやー、よく考えてはります。

*1:厳密にいうと、test と benchmark でも使います

OpenCensus について調べて試した

OpenCensus

Google 発案の分散トレース/メトリクス収集の仕様および実装、のはず。
opentracing.io とは似て非なるもの。
分散トレーシングについての仕様がいくつかあるようなのだけれども、部分的なものや似たようなものがあるので、それを整理して一括りにしてみた、というイメージです。

組み込む方のインターフェースやスキーマ とか、ライブラリ構成 もあれば、伝播のための仕様もあります。

んで、その Scala Wrapper があるじゃあないですか、ということで試しました。

Github グループ

GitHub グループが 2 つあったんですねー、気が付かなかった。
1 つ目が OpenCensus のメイングループで、先ほど挙げた仕様もこのグループに所属してます。
2 つ目がメインを補完するグループで、今回試した opencensus-scala もこちらの所属です。

opencensus-scala

メイングループに所属してる opencensus-java の軽量 Wrapper とのこと。
と、いいつつ、akka-httphttp4selastic4s に対応してくれてます。Pray framework は 計画済み とのこと。

elastic4s でためす

HttpClientExampleApp.scala をちょっとばかし改造して、以下のようにしました。

package com.sksamuel.elastic4s.samples

import com.sksamuel.elastic4s.{ElasticsearchClientUri, RefreshPolicy}
import com.sksamuel.elastic4s.http.HttpClient
import com.sksamuel.elastic4s.http.search.SearchResponse
import com.sksamuel.elastic4s.http.ElasticDsl._
import com.github.sebruck.opencensus.elastic4s.implicits._ // ここ

import scala.concurrent.ExecutionContext.Implicits.global


object HttpClientExampleApp extends App {

  // you must import the DSL to use the syntax helpers

  val client = HttpClient(ElasticsearchClientUri("localhost", 9200)).traced // ここ

  client.execute {
    bulk(
      indexInto("myindex" / "mytype").fields("country" -> "Mongolia", "capital" -> "Ulaanbaatar"),
      indexInto("myindex" / "mytype").fields("country" -> "Namibia", "capital" -> "Windhoek")
    ).refresh(RefreshPolicy.WAIT_UNTIL)
  }.await


  def result: SearchResponse = client.execute {
    search("myindex").matchQuery("capital", "ulaanbaatar")
  }.await.right.get.result

  // prints out the original json
  println(Iterator.continually(result).take(1000).toIterable.last.hits.hits.head.sourceAsString)

  Thread.sleep(1000)

  client.close()

}

あと設定ファイルも作りました。

opencensus-scala {
  trace {
    // The probability of which a trace gets sampled, the default is 1/10000
    sampling-probability = 1.0,

    exporters {
      zipkin {
        // Wether the Zipkin exporter should be enabled
        enabled = true

        // Example http://127.0.0.1:9411/api/v2/spans
        v-2-url =  "http://127.0.0.1:9411/api/v2/spans"

        // the local service name of the process
        service-name = "test"
      }

    }
  }
}

今回はローカルに zipkin を作って、そこに投げてます。
これは openzipkin/docker-zipkin: Docker images for OpenZipkin で作りました。
その結果

こんな感じでトレースできるようになりました。やったぜ。

とはいうものの

真価は複数段になってからなので、今回は「こんにちわ世界」と言った程度です。
どう適用させるかとか、お金のかかり具合とか。むずかしい。

OpenAPI Generator で Gatling Client を生成してみた

OpenAPI Generator 3.0.0 リリース!!

やったぜ。2.0 はとか野暮なことはなしです。*1
ということでリリースノートを見ていると、New Generators ところに Gatling の文字があるではないですか。
試すしかない、ってことでやってみました。

やってみた

コマンド

 java -jar openapi-generator-cli.jar generate -i "/path/to/input.yaml" -o output -g scala-gatling

-g scala-gatling で galing 生成を指定しております。

フォルダ構成

├── build.gradle
└── src
    └── gatling
        ├── resources
        │   ├── conf
        │   │   ├── baseline.conf
        │   │   ├── CD.conf
        │   │   ├── CI.conf
        │   │   ├── default.conf
        │   │   ├── logback.xml
        │   │   ├── longevity.conf
        │   │   └── stress.conf
        │   └── data
        │       └── null-pathParams.csv
        └── scala
            └── org
                └── openapitools
                    └── client
                        ├── api
                        │   └── DefaultApiSimulation.scala
                        └── model
                            └── Empty.scala

ということで、Gradle のプロジェクトでした。
Android で軽く触ったくらいであんまりですが、まぁなんとかなります。

動かしてみて

食わせた Swagger は認証なしの適当なやつですが、きちんとアクセスしてくれました。
もうちょい確認が必要ですが、初期としては使えそう?
flood でも試してみましょうかねー
企業情報 or アプリ名のところ、test とかっていれてよいのかしら…?

*1:Swagger CodeGen の Pull Request に Gatling があってマージまだかなーと思ったら OpenAPI Generator に分派しておりました。

Rust の CLI ツールで引数の値が特定のものであれば別の引数を必須にしたい

短く

crap.rs の requires_if/requires_ifs を使おう。

        .arg(
            Arg::with_name("iterator-type")
                .short("t")
                .long("iterator-type")
                .possible_values(&IteratorType::variants())
                .requires_ifs(&[
                    ("AT_SEQUENCE_NUMBER", "sequence-number"),
                    ("AFTER_SEQUENCE_NUMBER", "sequence-number"),
                    ("AT_TIMESTAMP", "timestamp"),
                ])
                .default_value("LATEST")
                .value_name("TYPE")
                .help("Sets iterator type."),
        )
        .arg(
            Arg::with_name("sequence-number")
                .long("sequence-number")
                .value_name("NUM")
                .help("Set Sequence number when Iterator Type is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER.")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("timestamp")
                .long("timestamp")
                .value_name("TIMESTAMP")
                .help("Set timestamp(UNIX Epoch milliseconds) when Iterator Type is AT_TIMESTAMP.")
                .takes_value(true),
        )

k-iter/main.rs at f22bd285818a23ec5eb766d53798a584ddaffed2 · ROki1988/k-iter

背景

AWS Kinesis Stream のイテレータは、下記の通り 5 種類あります。

  • LATEST
  • AT_SEQUENCE_NUMBER
  • AFTER_SEQUENCE_NUMBER
  • AT_TIMESTAMP
  • TRIM_HORIZON

このうち 3 種には値が必要なので、追加のオプションで指定してもらいたい感じでした。
なので、そのオプションはイテレータが 3 種のうちのどれかであれば必須、となります。
そんなのかけるのやら…

多機能引数パーザ clap

すごいですねー。ありましたよ。requires_ifs
ある引数に対する追加設定として記述できます。
タプルの1つ目が、その引数がとりうる値で、2つ目が必須となるオプションの名前となります。

            Arg::with_name("iterator-type")
                .short("t")
                .long("iterator-type")
                .possible_values(&IteratorType::variants())
                .requires_ifs(&[
                    ("AT_SEQUENCE_NUMBER", "sequence-number"),
                    ("AFTER_SEQUENCE_NUMBER", "sequence-number"),
                    ("AT_TIMESTAMP", "timestamp"),
                ])

先にあげた例を抜粋しました。
今回であれば、iterator-type に対して、AT_SEQUENCE_NUMBERAFTER_SEQUENCE_NUMBERAT_TIMESTAMP の時にそれぞれ必須となる引数の名前を指定してます。

自分で書いてもよいのですが、あるのであれば使います。
ありがたやありがたや。

というわけで

無事 k-iter でオプションを追加できましたー!やったぜ。
これで追跡がらくになる。はず。

localstack を使ったテストを書きたい。

Rust でクロスプラットフォーム対応するときに便利だった Cargo Plugin: cross

これ

github.com

使い方

cargo install cross
cross build --target i686-unknown-linux-gnu

実際に Travis で動かしている k-iter/.travis.yml at master · ROki1988/k-iter とかを見るとよいかも。

発端

rohki.hatenablog.com

前回 AWS Kinesis Stream をひたすら見るやつを作ったわけですが、バイナリも用意せねば、となりました。
んで、WindowsAppVeyor でサクッとできました。macOS 向けも Travis 上でできました。
でもなぜか、一部の Linux 向けのバイナリができない。 で、調べて cross にいきつきました。

苦悩の跡

f:id:rohki:20180509224755p:plain

めっちゃ頑張った…

Travis CI の設定をパクッt(ry

Travis CI で Linux (x86_64, i686, aarch64) 向け(とついでに macOS 向け)に Rust で書いたツールのバイナリをリリースする - はやくプログラムになりたい
上記のような記事を公開いただいていて、パクれ との記述にありがとうございます!!!と適用してみました。
Build #9 - ROki1988/k-iter - Travis CI がビルド結果になります。
文面に書くと、macOS 向けと x86_64 Linux 向けのビルドに成功し、i686 Linux 向けと AArch64 Linux 向けに失敗しました。なぜだ…

ハッハーン…いつものだな?

OpenSSL を疑いました。
というのも、Rust で OpenSSL を使用する場合、crate が対応しているバージョンがインストールされていないとビルドできなかったり、実行できなかったりするからです。
rust-openssl/build.rs at master · sfackler/rust-openssl を全部見切れていないので事情は定かでないですが、ビルド時にいろいろ見に行っている模様。

ので、

しましたが、ことごとくダメでした。
ただ最後の環境変数指定でもできなかったときのエラーメッセージをボヤっと見てピンときて、クロス環境向けにビルドされた OpenSSL のがいるのか!と仮説を立てました。

とはいうものの

さすがにクロスのバイナリそれぞれ用意して云々はハードル高いぞ…と、なんかあるやろで見つけたのが cross になります。

github.com

上記にて使われているのを発見して、参考にさせていただきました。
結果としてビルドもできて、やったーというところです。
タグ切ったところ、パクってきた拝借してきた設定どおりにきちんと Github に公開されました。

つかってみて

すっごい考えられてるというか、賢くやってはるなーというかんじです。
CLI のコマンドも cargo から cross に変えるだけですし、docker 環境がない場合とある場合も同じように動いてくれますし、CI のキャッシュも効きますし。

AWS Lambda の時も感じましたけど、コンテナホント便利ですね。
あとはテストとかにも使いたいんだよなぁ。こう…#[test] の下とかにアトリビュート書いて実現できると嬉しいかも。

CLI で AWS Kinesis Stream の中身をひたすら追ってくれる k-iter をつくってる

作ったもの

github.com

インストール方法

rustup.rs - The Rust toolchain installer

rustup を設定したうえで、下記コマンドを実行。

cargo install --git https://github.com/ROki1988/k-iter.git

もしくは Releases · ROki1988/k-iter より合致するものをダウンロードして展開。

使い方

k-iter -n event-stream -r ap-northeast-1

n の後にストリーム名を、r の後にリージョン名を入れればガンガン見てくれます。
今のところはコマンド実行以降に Put されたレコードを見ていくだけです。

動機

AWS Kinesis Stream はいったん投入してしまうと、中になにが入ってるか追うためにコードをかく必要があります。コンソールから見れないから。
Lambda の Blueprint に Kinesis のイベントを処理するぜー、てのもあるのですが、CLI に出したい、という欲がでます。
ですので、秘伝のたれのごとき Python コードを実行して CLI 上で出してました。

だがしかし、迂闊に brew upgrade を実行した結果、Python の実行環境が再構築の憂き目にあい、こりゃシングルバイナリで動くやつ作らにゃきつい、となった次第です。

Rust 製なのは趣味です。

やってみたいこと/考えてること/もらえた意見

  • 実行バイナリはよ
  • Verbose モードとかほしい。Put された時間とかパーティションキーとか
    • 綺麗に出さなければできそう。きれいに出そうとすると大変そう。そのあたりのセンス磨いてないし。
      • CSV を整形するコマンドとかあるし、CSV 出力でこと足りる?
  • Iterator Type 対応したい。TRIM_HORIZON とか。
    • オプションに依存関係が出てくるので、ちょいと厄介
      clap-rs の修行が必要そう
  • --exec とか作ってデータに処理かけたい
    • jq かけたりするイメージ。やりたくない?
  • UTF-8 文字列決め打ちで出力してるけど、バイナリ表示の需要もありそう?
    • Big とか Little の切り替えもいるのだろうか。--print-format=string|byte-be|byte-le てな感じ
  • Shard 複数を同時に
    • できっかなぁ…