今回はメッセージングシステムの一つであるNATSを試してみました。
NATSとは
分散システムにおけるアプリケーションとサービス間のコミュニケーションは複雑で理解が難しいものとなります。現代のメッセージングシステムは複数のコミュニケーションパターンをサポートする必要があり、それに加えてセキュリティ・マルチテナンシー・拡張性など多様な要素も求められています。
NATSはこれらの要求を満たすために開発されたソフトウェアで、以下のような特徴を備えています。なおNATSはNeural Automatic Transport System
の略称です。
複数のメッセージングパターンのサポート
NATSはSubjectをベースとしたメッセージングシステムで、PublisherがSubjectに対してメッセージを送信し、Subjectと紐づくSubscriberへメッセージを転送します。Subjectは、PublisherとSubscriberが互いを見つけるために利用する名前を形成する「文字列」です。Subjectは階層構造やワイルドカードなどを利用して表現でき、複数のSubscriberへの送信や、特定のパターンに合致したもののみにメッセージを送信するといったコントロールも可能になります。
※ワイルドカードを利用したSubject(画像は公式ドキュメントより)
※特定のSubscriberにのみ送信されるパターン(画像は公式ドキュメントより)
NATSはPublish-Subscribe / Request-Reply / Queueという3つのメッセージングパターンに対応しています。
Publish-Subscribeの場合、NATSは「1対多」のコミュニケーションとして扱い、Publisherから送られてきたメッセージはSubjectが合致したアクティブなSubscriberへと送られます。
※画像:公式ドキュメントより
Request-Replyの場合、クライアントがReply Subject付きのRequestを送信すると、Responderがそれを捉えてReply Subject宛にResponseを返します。また複数のResponderを設定することも可能で、それらを使って動的なQueue Groupを形成し、スケールアップすることもできます。
※画像:公式ドキュメントより
Queueの場合、NATSは分散Queueと呼ばれる負荷分散機能を提供します。同一のQueue Nameを与えられたSubscriberはQueue Groupのメンバーとなり、Publisherから送られるメッセージがグループ内のメンバーに対して均一に送信されます。この設定はPublisher/Subscriber側で行い、Serverでは行いません。
※画像:公式ドキュメントより
At Most Once / At Least Onceのサポート
NATSはCore NATS (NATS Server、NATSとも呼ばれる)とNATS Streamingとで構成されます。Core NATSはクライアント間のメッセージをルーティングします。クライアントとの接続は、クライアントライブラリによって確立されるTCP接続を介して行われます。
※画像:GitHubより
NATS StreamingはCore NATSのクライアントとして位置しています。NATS Streamingを利用するにはCore NATSが必要で、クライアントからの通信はCore NATSを介してNATS Streamingへ送られます。
※画像:GitHubより
Core NATSはAT Most Once
と呼ばれるデリバリー戦略を採用しており、これは最高でも1回しか配信を行わず、再送は行わないことを意味します。これはメッセージの欠損する可能性を含んでおり、アプリケーションによってはこの戦略は適さない場合があります。
その一方でNATS StreamingではAt Least Once
、つまり最低でも1回は配信されるよう再送を行います。こちらを採用することでSubscriberへの配信が保証できる一方、メッセージの重複が発生する可能性もあります。
NATS StreamingはメッセージをディスクやSQLデータベースに書き込むことでAt Least Once
を実現しており、Kubernetesへデプロイする際はPersistentVolume
PersistentVolumeClaim
を利用しています。
※参考ドキュメント:
高可用性・拡張性
NATSは可用性と拡張性を実現するため、Server Clusteringをサポートしています。NATS ServerはClusterを形成するすべてのNATS Serverと接続し、フルメッシュを形成します。NATSが利用するクラスタリングプロトコルでは、NATS Serverにクラスターメンバーを伝え、全てのNATS Serverはクラスター内の他のメンバーをDiscoverすることができます。またクライアントがNATS Serverに接続すると、クラスターに関する情報が通知されます。これにより、クラスターの構成が動的に変更したり、自己修復することを可能にします。
※画像:GitHubより
またクライアントライブラリでは、接続・Subscriptionを削除する際にDrainを行う機能が用意されています。これにより、実行中、あるいはキャッシュされたメッセージを処理してから、接続・Subscriptionの停止をすることが可能です。この機能を利用することで、メッセージが失われることなくScale Dwonを実行することも可能になります。
※参考ドキュメント:
その他
軽量・高パフォーマンス: NATSは軽量・高パフォーマンスを謳うソフトウェアです。2014年に実施したパフォーマンス比較記事を見てみると、他のメッセージング基盤ソフトウェアと比較しても、高いパフォーマンスを示すことが確認できます。
複数の開発言語をクライアントとしてサポート: NATSはGo / Java / Node.js / Pythonなど複数の開発言語をクライアントとしてサポートしています。
CNCFプロジェクト: NATSはCNCFのStreaming & Messagingカテゴリーに分類されています。プロジェクトの成熟度は2020年12月時点で
Incubating
となっています。
※参考ドキュメント:
NATSを使ってみる
ここから実際にNATSを動かしてみます。今回はKubernetesへNATS Serverをデプロイした後、Tutorialとして紹介されているこちらのページの内容をなぞっていきます。
検証環境
- Kubernetesマネージドサービス: Amazon EKS (v1.18)
- 構築方法: eksctlによる構築
- ローカル環境: WSL (Ubuntu 18.04.4)
NATSのインストール
まずはNATSをインストールします。NATSはKubernetesへのインストールで利用するためのリポジトリを用意しており、こちらに置いてあるマニフェストファイルを利用します。
NATS、NATS Streaming用のマニフェストファイルは、それぞれ以下の通りです。今回はNATS、NATS Streamingを一つずつ作成する最小構成のパターンで行いました。
single-server-nats.yaml
--- apiVersion: v1 kind: ConfigMap metadata: name: nats-config data: nats.conf: | pid_file: "/var/run/nats/nats.pid" http: 8222 --- apiVersion: v1 kind: Service metadata: name: nats labels: app: nats spec: selector: app: nats clusterIP: None ports: - name: client port: 4222 - name: cluster port: 6222 - name: monitor port: 8222 - name: metrics port: 7777 - name: leafnodes port: 7422 - name: gateways port: 7522 --- apiVersion: apps/v1 kind: StatefulSet metadata: name: nats labels: app: nats spec: selector: matchLabels: app: nats replicas: 1 serviceName: "nats" template: metadata: labels: app: nats spec: # Common volumes for the containers volumes: - name: config-volume configMap: name: nats-config - name: pid emptyDir: {} # Required to be able to HUP signal and apply config reload # to the server without restarting the pod. shareProcessNamespace: true ################# # # # NATS Server # # # ################# terminationGracePeriodSeconds: 60 containers: - name: nats image: nats:2.1.7-alpine3.11 ports: - containerPort: 4222 name: client hostPort: 4222 - containerPort: 7422 name: leafnodes hostPort: 7422 - containerPort: 6222 name: cluster - containerPort: 8222 name: monitor - containerPort: 7777 name: metrics command: - "nats-server" - "--config" - "/etc/nats-config/nats.conf" # Required to be able to define an environment variable # that refers to other environment variables. This env var # is later used as part of the configuration file. env: - name: POD_NAME valueFrom: fieldRef: fieldPath: metadata.name - name: POD_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespace - name: CLUSTER_ADVERTISE value: $(POD_NAME).nats.$(POD_NAMESPACE).svc volumeMounts: - name: config-volume mountPath: /etc/nats-config - name: pid mountPath: /var/run/nats # Liveness/Readiness probes against the monitoring # livenessProbe: httpGet: path: / port: 8222 initialDelaySeconds: 10 timeoutSeconds: 5 readinessProbe: httpGet: path: / port: 8222 initialDelaySeconds: 10 timeoutSeconds: 5 # Gracefully stop NATS Server on pod deletion or image upgrade. # lifecycle: preStop: exec: # Using the alpine based NATS image, we add an extra sleep that is # the same amount as the terminationGracePeriodSeconds to allow # the NATS Server to gracefully terminate the client connections. # command: ["/bin/sh", "-c", "/nats-server -sl=ldm=/var/run/nats/nats.pid && /bin/sleep 60"]
single-server-stan.yaml
--- apiVersion: v1 kind: ConfigMap metadata: name: stan-config data: stan.conf: | port: 4222 http: 8222 streaming { ns: "nats://nats:4222" id: stan store: file dir: /data/stan/store } --- apiVersion: v1 kind: Service metadata: name: stan labels: app: stan spec: selector: app: stan clusterIP: None ports: - name: metrics port: 7777 --- apiVersion: apps/v1 kind: StatefulSet metadata: name: stan labels: app: stan spec: selector: matchLabels: app: stan serviceName: stan replicas: 1 volumeClaimTemplates: - metadata: name: stan-sts-vol spec: accessModes: - ReadWriteOnce volumeMode: "Filesystem" resources: requests: storage: 1Gi template: metadata: labels: app: stan spec: # Prevent NATS Streaming pods running in same host. affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - topologyKey: "kubernetes.io/hostname" labelSelector: matchExpressions: - key: app operator: In values: - stan # STAN Server containers: - name: stan image: nats-streaming:0.16.2 ports: - containerPort: 8222 name: monitor - containerPort: 7777 name: metrics args: - "-sc" - "/etc/stan-config/stan.conf" # Required to be able to define an environment variable # that refers to other environment variables. This env var # is later used as part of the configuration file. env: - name: POD_NAME valueFrom: fieldRef: fieldPath: metadata.name - name: POD_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespace volumeMounts: - name: config-volume mountPath: /etc/stan-config - name: stan-sts-vol mountPath: /data/stan # Disable CPU limits. resources: requests: cpu: 0 livenessProbe: httpGet: path: / port: 8222 initialDelaySeconds: 10 timeoutSeconds: 5 volumes: - name: config-volume configMap: name: stan-config
上記マニフェストファイルを利用してインストールを行います。
# GitHubリポジトリのクローン $ git clone https://github.com/nats-io/k8s $ cd k8s/ # NATSのインストール $ kubectl apply -f nats-server/single-server-nats.yml configmap/nats-config created service/nats created statefulset.apps/nats created # NATS Streamingのインストール $ kubectl apply -f nats-streaming-server/single-server-stan.yml configmap/stan-config created service/stan created statefulset.apps/stan created # インストール後の確認 $ kubectl get cm NAME DATA AGE nats-config 1 57s stan-config 1 26s $ kubectl get svc NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE kubernetes ClusterIP 10.100.0.1 <none> 443/TCP 2d nats ClusterIP None <none> 4222/TCP,6222/TCP,8222/TCP,7777/TCP,7422/TCP,7522/TCP 74s stan ClusterIP None <none> 7777/TCP 43s # NATS Streamingが利用するPV/PVC $ kubectl get pvc NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE stan-sts-vol-stan-0 Bound pvc-37cf09c9-b32b-4573-bb3b-9a32d6bfff7d 1Gi RWO gp2 20h $ kubectl get pv NAME CAPACITY ACCESS MODES RECLAIM POLICY STATUS CLAIM STORAGECLASS REASON AGE pvc-37cf09c9-b32b-4573-bb3b-9a32d6bfff7d 1Gi RWO Delete Bound default/stan-sts-vol-stan-0 gp2 20h $ kubectl get sc NAME PROVISIONER RECLAIMPOLICY VOLUMEBINDINGMODE ALLOWVOLUMEEXPANSION AGE gp2 (default) kubernetes.io/aws-ebs Delete WaitForFirstConsumer false 2d22h $ kubectl get statefulsets NAME READY AGE nats 1/1 94s stan 1/1 64s $ kubectl get pods NAME READY STATUS RESTARTS AGE nats-0 1/1 Running 0 101s stan-0 1/1 Running 0 71s # NATSのログは以下の通り $ kubectl logs nats-0 [7] 2020/12/28 04:50:09.349020 [INF] Starting nats-server version 2.1.7 [7] 2020/12/28 04:50:09.349061 [INF] Git commit [bf0930e] [7] 2020/12/28 04:50:09.349569 [INF] Starting http monitor on 0.0.0.0:8222 [7] 2020/12/28 04:50:09.349616 [INF] Listening for client connections on 0.0.0.0:4222 [7] 2020/12/28 04:50:09.349625 [INF] Server id is NBBQ76INWDZXMFBGF5V4MBXCV3PBEQ5EWNFPPKS3FSTNPF3VFCR3RY2K [7] 2020/12/28 04:50:09.349629 [INF] Server is ready # NATS Streamingのログは以下の通り $ kubectl logs stan-0 [1] 2020/12/28 04:51:02.605219 [INF] STREAM: Starting nats-streaming-server[stan] version 0.16.2 [1] 2020/12/28 04:51:02.605250 [INF] STREAM: ServerID: TgyiHElDc171yiUZ30g5pL [1] 2020/12/28 04:51:02.605254 [INF] STREAM: Go version: go1.11.13 [1] 2020/12/28 04:51:02.605258 [INF] STREAM: Git commit: [910d6e1] [1] 2020/12/28 04:51:02.631773 [INF] STREAM: Recovering the state... [1] 2020/12/28 04:51:02.631932 [INF] STREAM: No recovered state [1] 2020/12/28 04:51:02.883723 [INF] STREAM: Message store is FILE [1] 2020/12/28 04:51:02.883738 [INF] STREAM: Store location: /data/stan/store [1] 2020/12/28 04:51:02.883770 [INF] STREAM: ---------- Store Limits ---------- [1] 2020/12/28 04:51:02.883774 [INF] STREAM: Channels: 100 * [1] 2020/12/28 04:51:02.883778 [INF] STREAM: --------- Channels Limits -------- [1] 2020/12/28 04:51:02.883781 [INF] STREAM: Subscriptions: 1000 * [1] 2020/12/28 04:51:02.883785 [INF] STREAM: Messages : 1000000 * [1] 2020/12/28 04:51:02.883788 [INF] STREAM: Bytes : 976.56 MB * [1] 2020/12/28 04:51:02.883791 [INF] STREAM: Age : unlimited * [1] 2020/12/28 04:51:02.883795 [INF] STREAM: Inactivity : unlimited * [1] 2020/12/28 04:51:02.883798 [INF] STREAM: ---------------------------------- [1] 2020/12/28 04:51:02.883802 [INF] STREAM: Streaming Server is ready
インストールが完了したので、簡単な動作確認を行います。
NATSはnats-box
というコンテナイメージを用意しており、これを利用することでNATSの各コマンドを実行することができます。
# nats-boxコンテナの起動 $ kubectl run -i --rm --tty nats-box --image=synadia/nats-box --restart=Never If you dont see a command prompt, try pressing enter. nats-box:~# # バックグラウンドでSubscriberを起動 nats-box:~# nats-sub -s nats hello & nats-box:~# Listening on [hello] # Publisherからメッセージを送信 nats-box:~# nats-pub -s nats hello world [#1] Received on [hello]: 'world' # PublisherからNATS Streamingへメッセージ送信 nats-box:~# stan-pub -s nats -c stan hello world Published [hello] : 'world' # NATS Streamingからも送られたメッセージが確認できる nats-box:~# stan-sub -s nats -c stan hello Connected to nats clusterID: [stan] clientID: [stan-sub] Listening on [hello], clientID=[stan-sub], qgroup=[] durable=[] [#1] Received: sequence:1 subject:"hello" data:"world" timestamp:1609131378713322197 # NATS Streaming Podでは以下のようなログが確認できる $ kubectl logs stan-0 -f [1] 2020/12/28 04:56:18.713310 [INF] STREAM: Channel "hello" has been created
NATSはその他にも複数のツールを提供しており、例えばnats-top
コマンドを実行するとNATS Serverのモニタリングを行うことができます。
nats-box:~# nats-top -s nats NATS server version 2.1.7 (uptime: 8m24s) Server: Load: CPU: 0.0% Memory: 9.9M Slow Consumers: 0 In: Msgs: 26 Bytes: 1022 Msgs/Sec: 0.0 Bytes/Sec: 0 Out: Msgs: 25 Bytes: 985 Msgs/Sec: 0.0 Bytes/Sec: 0 Connections Polled: 3 HOST CID NAME SUBS PENDING MSGS_TO MSGS_FROM BYTES_TO BYTES_FROM LANG VERSION UPTIME LAST ACTIVITY 192.168.1.45:56406 1 _NSS-stan-send 0 0 0 8 0 159 go 1.8.1 7m57s 2020-12-28 08:45:11.674703256 +0000 U 192.168.1.45:56408 2 _NSS-stan-general 8 0 8 5 361 461 go 1.8.1 7m57s 2020-12-28 08:45:11.677642417 +0000 U 192.168.1.45:56410 3 _NSS-stan-acks 0 0 4 0 36 0 go 1.8.1 7m57s 2020-12-28 08:45:11.673086331 +0000 U
またNATSはモニタリング機能を提供しており、以下のようにPort: 8222
からアクセスすることで、各種設定やメッセージ受信数などを確認することができます。
$ kubectl get svc NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE kubernetes ClusterIP 10.100.0.1 <none> 443/TCP 2d20h nats ClusterIP None <none> 4222/TCP,6222/TCP,8222/TCP,7777/TCP,7422/TCP,7522/TCP 44s stan ClusterIP None <none> 7777/TCP 27s # ポートフォワーディング $ kubectl port-forward svc/nats 8222:8222 & [1] 10071 Forwarding from [::1]:8222 -> 8222
例えばlocalhost:8222/subsz
にアクセスすると、Subscriptionの数などが表示されます。
NATSを利用する
NATS/NATS Streamingの起動確認ができたので、クライアントライブラリからNATS Serverを利用してみます。今回はnats.goというGoのライブラリに含まれているサンプルプログラムを実行していきます。
まずはnat.go
パッケージを入手します。Goのバージョンが古いと取得できないことがあるので、必要があればバージョンアップを実施します。
$ go version go version go1.15.6 linux/amd64 $ go get github.com/nats-io/nats.go/
また以降の操作ではlocalhostからNATS Serverにアクセスするため、kubectl port-forward
コマンドによりNATSへアクセスできるようにします。
$ kubectl port-forward svc/nats 4222:4222 & [2] 10088 $ Forwarding from 127.0.0.1:4222 -> 4222 Forwarding from [::1]:4222 -> 4222
NATS Pub/Sub
まずはPub/Subパターンを実行します。ここではPublisher1つ、Subscriber 3つを起動し、それぞれにメッセージが配信される様子を確認します。
まずは1つ目のSubscriberを起動します。ここではmsg.test
というSubjectを指定します。
# Subscriber-1 $ cd $GOPATH/src/src/github.com/nats-io/nats.go/examples $ go run nats-sub/main.go msg.test Listening on [msg.test]
次に別のターミナルを用意し、Publisherを起動、メッセージを送信します。
# Publisher # “hello”メッセージを送信 $ cd $GOPATH/src/src/github.com/nats-io/nats.go/examples $ go run nats-pub/main.go msg.test hello # “NATS MESSAGE”メッセージを送信 $ go run nats-pub/main.go msg.test "NATS MESSAGE"
Publisherからメッセージを送信すると、Publisher側では送信したことを示すメッセージが表示され、Subscriber側ではメッセージの受け取りが確認できます。
# Publisher Published [msg.test] : 'hello' Published [msg.test] : 'NATS MESSAGE' # Subscriber-1 [#1] Received on [msg.test]: 'hello' [#2] Received on [msg.test]: 'NATS MESSAGE'
次に新しいターミナルを開き、2つ目のSubscrierを起動します。ここでもmsg.test
Subjectを指定します。
# Subscriber-2 $ cd $GOPATH/src/src/github.com/nats-io/nats.go/examples $ go run nats-sub/main.go msg.test Listening on [msg.test]
この状態でPublisherからメッセージを送信すると、2つのSubscriberへメッセージが送信されることが確認できます。
# Publisher $ go run nats-pub/main.go msg.test "NATS MESSAGE 2" Published [msg.test] : 'NATS MESSAGE 2' # Subscriber-1 [#3] Received on [msg.test]: 'NATS MESSAGE 2' # Subscriber-2 [#1] Received on [msg.test]: 'NATS MESSAGE 2'
ここでmsg.test.new
という別のSubjectを指定したSubscriberを起動します。
# Subscriber-3 $ cd $GOPATH/src/src/github.com/nats-io/nats.go/examples $ go run nats-sub/main.go msg.test.new Listening on [msg.test.new]
ここで再びPublisherからメッセージを送信すると、今回はSubscriber-3はメッセージを受け取らない様子が確認できます。これはSubscriberで指定したmsg.test.new
というSubjectが、Publisherがメッセージ送信時に指定したmsg.test
というSubjectと一致しなかったためです。
# Publisher $ go run nats-pub/main.go msg.test "NATS MESSAGE 3" Published [msg.test] : 'NATS MESSAGE 3' # Subscriber-1 [#4] Received on [msg.test]: 'NATS MESSAGE 3' # Subscriber-2 [#2] Received on [msg.test]: 'NATS MESSAGE 3' # Subscriber-3 (no message)
ここでSubscriber-3を一度停止し、起動時に指定するSubjectをmsg.*
に変更します。今度はSubjectにワイルドカードを含み、先ほどPublisherが指定したmsg.test
が条件に一致するため、メッセージを受け取る様子が確認できます。
# Subscriber-3を停止、再起動 $ go run nats-sub/main.go msg.test.new Listening on [msg.test.new] ^Csignal: interrupt $ go run nats-sub/main.go msg.* Listening on [msg.*] # Publisher $ go run nats-pub/main.go msg.test "NATS MESSAGE 4" Published [msg.test] : 'NATS MESSAGE 4' # Subscriber-1 [#5] Received on [msg.test]: 'NATS MESSAGE 4' # Subscriber-2 [#3] Received on [msg.test]: 'NATS MESSAGE 4' # Subscriber-3 [#1] Received on [msg.test]: 'NATS MESSAGE 4'
なお、NATSを起動する際に特定のオプションを指定することで、NATS Server側でログを出力することができます。例えば-V
オプションを追加すると、Subscriber・Publisher間でやり取りが発生した際、以下のようにログが出力されます。
single-server-nats.yaml
command: - "nats-server" - "-V" # 追加 - "--config" - "/etc/nats-config/nats.conf"
$ kubectl logs nats-0 -f # NATS Server起動時のログ [7] 2020/12/29 04:42:11.784867 [INF] Starting nats-server version 2.1.7 [7] 2020/12/29 04:42:11.784895 [INF] Git commit [bf0930e] [7] 2020/12/29 04:42:11.785860 [INF] Starting http monitor on 0.0.0.0:8222 [7] 2020/12/29 04:42:11.785910 [INF] Listening for client connections on 0.0.0.0:4222 [7] 2020/12/29 04:42:11.785919 [INF] Server id is NDK7LIGLOBR5MC5DOJKNVHHTUOBZWIMJRVXBRNNDFNHM5UKUZKJPZ6ST [7] 2020/12/29 04:42:11.785923 [INF] Server is ready # Subscriber-1起動時 [7] 2020/12/29 04:43:35.461255 [TRC] 127.0.0.1:38544 - cid:1 - <<- [CONNECT {"verbose":false,"pedantic":false,"tls_required":false,"name":"NATS Sample Subscriber","lang":"go","version":"1.11.0","protocol":1,"echo":true,"headers":false,"no_responders":false}] [7] 2020/12/29 04:43:35.461364 [TRC] 127.0.0.1:38544 - cid:1 - <<- [PING] [7] 2020/12/29 04:43:35.461372 [TRC] 127.0.0.1:38544 - cid:1 - ->> [PONG] [7] 2020/12/29 04:43:35.471896 [TRC] 127.0.0.1:38544 - cid:1 - <<- [SUB msg.test 1] [7] 2020/12/29 04:43:35.471916 [TRC] 127.0.0.1:38544 - cid:1 - <<- [PING] [7] 2020/12/29 04:43:35.471921 [TRC] 127.0.0.1:38544 - cid:1 - ->> [PONG] [7] 2020/12/29 04:43:37.787668 [TRC] 127.0.0.1:38544 - cid:1 - ->> [PING] [7] 2020/12/29 04:43:37.798709 [TRC] 127.0.0.1:38544 - cid:1 - <<- [PONG] # Publisher起動・メッセージ送信時 [7] 2020/12/29 04:44:02.334750 [TRC] 127.0.0.1:38702 - cid:2 - <<- [CONNECT {"verbose":false,"pedantic":false,"tls_required":false,"name":"NATS Sample Subscriber","lang":"go","version":"1.11.0","protocol":1,"echo":true,"headers":false,"no_responders":false}] [7] 2020/12/29 04:44:02.334822 [TRC] 127.0.0.1:38702 - cid:2 - <<- [PING] [7] 2020/12/29 04:44:02.334829 [TRC] 127.0.0.1:38702 - cid:2 - ->> [PONG] [7] 2020/12/29 04:44:02.345670 [TRC] 127.0.0.1:38702 - cid:2 - <<- [SUB msg.test 1] [7] 2020/12/29 04:44:02.345694 [TRC] 127.0.0.1:38702 - cid:2 - <<- [PING] [7] 2020/12/29 04:44:02.345700 [TRC] 127.0.0.1:38702 - cid:2 - ->> [PONG] [7] 2020/12/29 04:44:04.424048 [TRC] 127.0.0.1:38702 - cid:2 - ->> [PING] [7] 2020/12/29 04:44:04.437254 [TRC] 127.0.0.1:38702 - cid:2 - <<- [PONG]
NATS Request/Reply
次にRequest/Replyパターンを実行します。ここではPublisher/Subscriberを一つずつ起動し、Publisherからメッセージを送信するとResponseが返ってくる様子を確認します。
まずはSubscriberを起動します。ここではhelp.please
というSubjectを設定し、またResponseメッセージも設定します。
# Subscriber $ go run nats-rply/main.go help.please "OK, I CAN HELP!" Listening on [help.please]
次にPublisherを起動し、同一のSubjectとRequestメッセージを指定します。
# Publisher $ go run nats-req/main.go help.please "I need help!"
Publisherがメッセージを送信すると、Subscriberがメッセージを受け取る様子に加え、Publisher側でResponseメッセージが表示されることが確認できます。
# Subscriber [#1] Received on [help.please]: 'I need help!' # Publisher Published [help.please] : 'I need help!' Received [_INBOX.g672l3mO17Q5MuaBrdZalB.roViAMQe] : 'OK, I CAN HELP!' # Response Message
NATS Queueing
最後にQueueingパターンを実行します。ここでは複数のSubscriberを起動してQueue Groupを構成し、Publisherからのメッセージが各Subscriberに分散されて送られる様子を確認します。
まずは2つのSubscriberをQueue Groupを与えて起動します。
# Subscriber-1 (Queue Group) $ go run nats-qsub/main.go foo my-queue Listening on [foo], queue group [my-queue] # Subscriber-2 (Queue Group) $ go run nats-qsub/main.go foo my-queue Listening on [foo], queue group [my-queue]
次にQueue Groupを指定せず、先ほどと同じSubjectを指定したSubscriberを起動します。
# Subscriber-3 (No Queue Group) go run nats-sub/main.go foo Listening on [foo]
この状態でPublisherからメッセージが送られると、Queue Groupに属するSubscriberはメッセージが分散されて送られますが、Queue Groupに属さないSubscriberは全てのメッセージが配信されます。
# Publisher $ go run nats-pub/main.go foo "Hello NATS-1" # 同様のメッセージを計5回送信する $ go run nats-pub/main.go foo "Hello NATS-5" # Subscriber-1 (Queue Group) [#1] Received on [foo] Queue[my-queue] Pid[11326]: 'Hello NATS-3' [#2] Received on [foo] Queue[my-queue] Pid[11326]: 'Hello NATS-4' # Subscriber-2 (Queue Group) [#1] Received on [foo] Queue[my-queue] Pid[11392]: 'Hello NATS-1' [#2] Received on [foo] Queue[my-queue] Pid[11392]: 'Hello NATS-2' [#3] Received on [foo] Queue[my-queue] Pid[11392]: 'Hello NATS-5' # Subscriber-3 (No Queue Group) [#1] Received on [foo]: 'Hello NATS-1' [#2] Received on [foo]: 'Hello NATS-2' [#3] Received on [foo]: 'Hello NATS-3' [#4] Received on [foo]: 'Hello NATS-4' [#5] Received on [foo]: 'Hello NATS-5'