SOMPO Digital Lab 開発チームブログ

安心・安全・健康に資する開発情報を発信します

opentelemetry-rustを利用してサービス間認証経由でDatadog Agentにトレースを送信する

SOMPO Digital Lab ソフトウェアエンジニアの小林です。

私が参画しているRustのプロジェクトでDatadog Agentのコンテナに対してトレース情報を送信する必要がありました。
今回はその方法についてご紹介させていただきます。

RustでOpenTelemetryを利用する方法については、既にいくつか記事が出ています。 そのため本記事では opentelemetry-datadog を利用して、認証が必要なエージェントにリクエストを送信する部分を中心に紹介します。

アプリケーションの構成について

今回のプロジェクトではクラウド環境としてGoogle Cloud Platformを利用しており、各サービスは主に Cloud Runで起動しています。
Rustのサービスは actix-web を利用して、REST APIを提供するサービスとして起動しています。
システム全体のうち、トレーシングに関係する部分は以下の図のような構成になっています。

  1. RustアプリケーションからDdatadog Agentのコンテナに対して、OpenTelemetryのトレース情報を送信します。
    この時、Google Cloudのサービス間認証で認証処理を行っています。
    また、公式SDKが存在する言語を利用しているサービスでは公式SDKを利用してトレース情報を送信しています。
  2. Datadog Agentは送られてきた情報を、Datadogに送信します。
  3. ログはCloud Loggingに出力し、その後Datadogに送信されます。

※ 補足
2023年末にCloud Runでサイドカーが利用可能になりました。
Datadog Agentは現時点でサイドカーを公式にサポートしていません。(実際には動作するという記事も見かけました。)
今後正式にサイドカーとして利用できるようになった際には、opentelemetry-datadogの基本機能のみでDatadog Agentにトレース情報を送信する構成にできそうです。
 (Rust用のDatadog公式SDKが出てくると更に楽になるかと思います。

利用しているクレートについて

以下のクレートを利用しています。
(本記事で必要なものを抜粋

actix-cors = "0.7.0"
actix-web = "4.6.0"
tokio = { version = "1.37.0", features = ["full"] }
http = "0.2.12"
opentelemetry = {version = "0.22.0"}
opentelemetry-datadog = { version = "0.10.0" }
opentelemetry-http = "0.11.0"
opentelemetry_sdk = { version = "0.22.1", features = ["rt-tokio"] }
tracing = "0.1.40"
tracing-actix-web = { version="0.7.10"}
tracing-opentelemetry = { version = "0.23.0" }
tracing-stackdriver = { version ="0.10.0", features = ["opentelemetry"] }
tracing-subscriber = { version="0.3.18", default-features = false, features = ["env-filter", "alloc", "json", "fmt"] }
redis = { version = "0.25.3", features = ["tokio-native-tls-comp"] }
reqwest = { version = "0.11.27", features = ["json"]}

※ 記事の執筆時点で、httpopentelemetry はより新しいバージョンが公開されています。
しかし、opentelemetry-datadog の最新バージョン(v0.10.0) に合わせるため、上記のバージョンを指定しています。

実装

※簡潔に記載するため、エラー処理は一部割愛しています。

Datadog Agentに送信するためのクライアントの実装

今回はインフラレベルでサービス間認証をする設定となっているため、それに基づいた実装としています。

  • HTTPクライアントについて: 今回は reqwestを利用していますが、isahc等に差し替えることもできます。
  • サービス間認証で利用する IDトークンについて:
    • Rust用の公式SDKは提供されていないため、メタデータサーバーから取得しています。
    • リクエストは数秒おきに行われるため、Redisを用いてキャッシュするようにしました。
      • 起動するコンテナ数が少ない場合は、Mokaなどを利用してインメモリに保存しても良さそうです。
    • 有効期限が1時間であるため、それより短めのTTLを設定し、早めにIDトークンを取得し直すようにしています。
use async_trait::async_trait;
use http::{Request, Response};
use opentelemetry_http::{Bytes, HttpClient, HttpError};
// プロジェクト固有のモジュールは省略

pub struct DatadogHttpClient {
    client: reqwest::Client,
    redis_repository: RedisRepositoryImpl,
    audience: String,
}

impl DatadogHttpClient {
    pub fn new(redis_client: redis::Client, audience: String) -> Self {
        Self {
            client: reqwest::Client::new(),
            redis_repository: RedisRepositoryImpl::new(redis_client),
            audience,
        }
    }

    /// メタデータサーバーからIDトークンを取得する
    async fn get_gcp_id_token(&self) -> String {
        tracing::debug!("Start get_gcp_id_token.");
        let token_opt = match self
            .redis_repository
            .get_string("IDトークンを保存するRedisのキー")
            .await
        {
            Ok(token) => token,
            Err(_) => None,
        };
        if let Some(token) = token_opt {
            tracing::debug!("Get token from redis: {}", token);
            return token;
        }

        let url = format!("{}{}", "メタデータサーバーのURL", self.audience);
        let res = self
            .client
            .get(url)
            .header("Metadata-Flavor", "Google")
            .send()
            .await;
        if res.is_err() {
            return String::from("");
        }

        let token = res.unwrap().text().await.unwrap();
        let _ = self
            .redis_repository
            .set_string_with_expire("IDトークンを保存するRedisのキー", token.clone(), "キーの生存期間(s)")
            .await;
        token
    }
}

/// ここで opentelemetry_httpのHttpClientに対して固有のリクエスト機能を実装します。
#[async_trait]
impl HttpClient for DatadogHttpClient {  
    async fn send(&self, mut request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {

        // IDトークンを取得して、Datadog Agentに送る際のリクエストのヘッダにBearerを設定。
        let token = self.get_gcp_id_token().await;
        let bearer = format!("Bearer {}", token.clone());
        request.headers_mut().insert(
            "Authorization",
            reqwest::header::HeaderValue::from_str(&bearer).unwrap(),
        );

        let request = request.try_into()?;
        let mut response = self.client.execute(request).await?;
        let headers = std::mem::take(response.headers_mut());
        let mut http_response = Response::builder()
            .status(response.status())
            .body(response.bytes().await?)?;
        *http_response.headers_mut() = headers;

        Ok(http_response)
    }
}

OpenTelemetry関連の設定

OpenTelemetryに関する設定は以下のコードで行っています。
opentelemetry_datadogのパイプラインに対して、 with_http_client で自前のHTTPクライアントを渡すことができます。

        opentelemetry::global::set_text_map_propagator(
            opentelemetry_datadog::DatadogPropagator::new(),
        );

        // サービス間認証を実装したDatadog Agent用HTTPクライアントを定義
        let dd_client =
            DatadogHttpClient::new(redis_client.clone(), "Datadog AgentのURL");
        let tracer = opentelemetry_datadog::new_pipeline()
            .with_service_name("サービス名")
            .with_agent_endpoint("Datadog AgentのURL")
            .with_env("Env") // dev / stg / prodなど
            .with_http_client::<DatadogHttpClient>(dd_client) // Datadog Agent用のHTTPクライアントを設定
            .with_trace_config(
                trace::config()
                    .with_sampler(trace::Sampler::AlwaysOn)
                    .with_id_generator(RandomIdGenerator::default()),
            )
            .install_batch(opentelemetry_sdk::runtime::Tokio)
            .expect("failed to initialize tracer");
        let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
        let filter_layer = EnvFilter::try_new(app_config.rust_log).unwrap();

        // tracing_stackdriver::layerを挟むことで、構造化ログとしてCloud Loggingにログを出力できます。
        let subscriber = Registry::default()
            .with(filter_layer)
            .with(otel_layer)
            .with(tracing_stackdriver::layer()); 
        tracing::subscriber::set_global_default(subscriber)
            .expect("Failed to set tracing subscriber");

Spanの設定

ベースは以下のstructになります。
リクエスト開始時・リクエスト終了時にSpanとして定義したい情報に応じて、設定を追加することができます。

use actix_web::Error;

pub struct CustomRootSpanBuilder;

impl tracing_actix_web::RootSpanBuilder for CustomRootSpanBuilder {
    fn on_request_start(request: &actix_web::dev::ServiceRequest) -> tracing::Span {
        tracing_actix_web::root_span!(request)
    }

    fn on_request_end<B: actix_web::body::MessageBody>(
        span: tracing::Span,
        outcome: &Result<actix_web::dev::ServiceResponse<B>, Error>,
    ) {        tracing_actix_web::DefaultRootSpanBuilder::on_request_end(span, outcome);
    }
}

actix-webの起動設定部分で、上記の CustomRootSpanBuilder を差し込んでいます。

HttpServer::new(move || {
        App::new()
            .app_data("アプリケーションに渡すcontext")
            .wrap(tracing_actix_web::TracingLogger::<CustomRootSpanBuilder>::new())
            .wrap("その他のミドルウェアなど")
            .service(
                // サービス定義
            )
    })
    .bind(server_addr)?
    .run()
    .await?;

 // actix-webが終了した後、tracing_providerを停止
    opentelemetry::global::shutdown_tracer_provider(); 

各関数での実装

tracing::instrumentマクロを利用することで、Spanを追加することができます。
これを記載すると、どのタイミングでどの関数が呼ばれたかをトレースすることができます。
skipに変数を指定することで、ログに出力してはいけない情報を除外することができます。

    #[tracing::instrument(skip(self, ctx, claim))]
    pub async fn hoge(
        &self,
        ctx: web::Data<Context>,
        claim: Claim,
    ) -> Result<_, Error> {

ログ出力

今回は tracing-stackdriver クレートを利用しているため、アプリケーション全体で logger ではなく tracing を利用することで構造化ログをCloud Loggingに出力することができます。

tracing::info!()
tracing::debug!()

また、実際にはクラウド環境上で動作する場合のみ tracing-stackdriver を挟むことで、ローカルでは非構造化ログを出力し、動作確認がしやすいよう工夫しています。

動作確認

実際にアプリケーションを動かしてDatadog上のAPMを確認してみました。
下図のように、リクエストの開始〜終了までの間に、どの関数が呼ばれたか確認できる状態となりました。
※ 実際には、各関数の名前や実行時間が表示されますが、今回はminimapを添付しているため表示画像には表示されていません。

Datadog上のトレース(minimap)

まとめ

今回はRustを利用しているプロジェクトで、トレース情報をDatadogに送る方法について記載させていただきました。
OpenTelemetryの仕組みが複雑であり、なかなか事例もなかったので悩みどころが多かったです。
今後はサービスを跨いだトレーシングを進めていきたいと思います。

また、昨年にはAWSから公式に AWS SDK for Rust も公開されるなど、少しずつRsutのエコシステムが充実してきていると感じています。
今後ますますRustでの開発が便利になっていくと思うので、引き続きRustを活用してプロダクトを開発していきます。

参考文献およびリンク

SOMPO Digital Labでは一緒に働くソフトウェアエンジニアを募集しています

SOMPO Digital Labでは、Rustを活用して事業の成長に挑戦するエンジニアを募集しています。

以下のリンクからカジュアル面談の応募ができますので、興味を持った方は是非話を聞きに来て下さい。

www.wantedly.com

www.wantedly.com