TECHSTEP

ITインフラ関連の記事を公開してます。

NATSに入門する

今回はメッセージングシステムの一つであるNATSを試してみました。

NATSとは

分散システムにおけるアプリケーションとサービス間のコミュニケーションは複雑で理解が難しいものとなります。現代のメッセージングシステムは複数のコミュニケーションパターンをサポートする必要があり、それに加えてセキュリティ・マルチテナンシー・拡張性など多様な要素も求められています。

NATSはこれらの要求を満たすために開発されたソフトウェアで、以下のような特徴を備えています。なおNATSはNeural Automatic Transport Systemの略称です。

複数のメッセージングパターンのサポート

NATSはSubjectをベースとしたメッセージングシステムで、PublisherがSubjectに対してメッセージを送信し、Subjectと紐づくSubscriberへメッセージを転送します。Subjectは、PublisherとSubscriberが互いを見つけるために利用する名前を形成する「文字列」です。Subjectは階層構造やワイルドカードなどを利用して表現でき、複数のSubscriberへの送信や、特定のパターンに合致したもののみにメッセージを送信するといったコントロールも可能になります。

ワイルドカードを利用したSubject(画像は公式ドキュメントより)

※特定のSubscriberにのみ送信されるパターン(画像は公式ドキュメントより)

NATSはPublish-Subscribe / Request-Reply / Queueという3つのメッセージングパターンに対応しています。

Publish-Subscribeの場合、NATSは「1対多」のコミュニケーションとして扱い、Publisherから送られてきたメッセージはSubjectが合致したアクティブなSubscriberへと送られます。

※画像:公式ドキュメントより

Request-Replyの場合、クライアントがReply Subject付きのRequestを送信すると、Responderがそれを捉えてReply Subject宛にResponseを返します。また複数のResponderを設定することも可能で、それらを使って動的なQueue Groupを形成し、スケールアップすることもできます。

※画像:公式ドキュメントより

Queueの場合、NATSは分散Queueと呼ばれる負荷分散機能を提供します。同一のQueue Nameを与えられたSubscriberはQueue Groupのメンバーとなり、Publisherから送られるメッセージがグループ内のメンバーに対して均一に送信されます。この設定はPublisher/Subscriber側で行い、Serverでは行いません。

※画像:公式ドキュメントより

At Most Once / At Least Onceのサポート

NATSはCore NATS (NATS Server、NATSとも呼ばれる)とNATS Streamingとで構成されます。Core NATSはクライアント間のメッセージをルーティングします。クライアントとの接続は、クライアントライブラリによって確立されるTCP接続を介して行われます。

※画像:GitHubより

nats-server

NATS StreamingはCore NATSのクライアントとして位置しています。NATS Streamingを利用するにはCore NATSが必要で、クライアントからの通信はCore NATSを介してNATS Streamingへ送られます。

※画像:GitHubより

nats-streaming

Core NATSはAT Most Onceと呼ばれるデリバリー戦略を採用しており、これは最高でも1回しか配信を行わず、再送は行わないことを意味します。これはメッセージの欠損する可能性を含んでおり、アプリケーションによってはこの戦略は適さない場合があります。

その一方でNATS StreamingではAt Least Once、つまり最低でも1回は配信されるよう再送を行います。こちらを採用することでSubscriberへの配信が保証できる一方、メッセージの重複が発生する可能性もあります。

NATS StreamingはメッセージをディスクやSQLデータベースに書き込むことでAt Least Onceを実現しており、Kubernetesへデプロイする際はPersistentVolume PersistentVolumeClaimを利用しています。


※参考ドキュメント:


高可用性・拡張性

NATSは可用性と拡張性を実現するため、Server Clusteringをサポートしています。NATS ServerはClusterを形成するすべてのNATS Serverと接続し、フルメッシュを形成します。NATSが利用するクラスタリングプロトコルでは、NATS Serverにクラスターメンバーを伝え、全てのNATS Serverはクラスター内の他のメンバーをDiscoverすることができます。またクライアントがNATS Serverに接続すると、クラスターに関する情報が通知されます。これにより、クラスターの構成が動的に変更したり、自己修復することを可能にします。

※画像:GitHubより

cluster

またクライアントライブラリでは、接続・Subscriptionを削除する際にDrainを行う機能が用意されています。これにより、実行中、あるいはキャッシュされたメッセージを処理してから、接続・Subscriptionの停止をすることが可能です。この機能を利用することで、メッセージが失われることなくScale Dwonを実行することも可能になります。


※参考ドキュメント:


その他


※参考ドキュメント:


NATSを使ってみる

ここから実際にNATSを動かしてみます。今回はKubernetesへNATS Serverをデプロイした後、Tutorialとして紹介されているこちらのページの内容をなぞっていきます。

検証環境

NATSのインストール

まずはNATSをインストールします。NATSはKubernetesへのインストールで利用するためのリポジトリを用意しており、こちらに置いてあるマニフェストファイルを利用します。

NATS、NATS Streaming用のマニフェストファイルは、それぞれ以下の通りです。今回はNATS、NATS Streamingを一つずつ作成する最小構成のパターンで行いました。

single-server-nats.yaml

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: nats-config
data:
  nats.conf: |
    pid_file: "/var/run/nats/nats.pid"
    http: 8222
---
apiVersion: v1
kind: Service
metadata:
  name: nats
  labels:
    app: nats
spec:
  selector:
    app: nats
  clusterIP: None
  ports:
  - name: client
    port: 4222
  - name: cluster
    port: 6222
  - name: monitor
    port: 8222
  - name: metrics
    port: 7777
  - name: leafnodes
    port: 7422
  - name: gateways
    port: 7522
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: nats
  labels:
    app: nats
spec:
  selector:
    matchLabels:
      app: nats
  replicas: 1
  serviceName: "nats"
  template:
    metadata:
      labels:
        app: nats
    spec:
      # Common volumes for the containers
      volumes:
      - name: config-volume
        configMap:
          name: nats-config
      - name: pid
        emptyDir: {}

      # Required to be able to HUP signal and apply config reload
      # to the server without restarting the pod.
      shareProcessNamespace: true

      #################
      #               #
      #  NATS Server  #
      #               #
      #################
      terminationGracePeriodSeconds: 60
      containers:
      - name: nats
        image: nats:2.1.7-alpine3.11
        ports:
        - containerPort: 4222
          name: client
          hostPort: 4222
        - containerPort: 7422
          name: leafnodes
          hostPort: 7422
        - containerPort: 6222
          name: cluster
        - containerPort: 8222
          name: monitor
        - containerPort: 7777
          name: metrics
        command:
         - "nats-server"
         - "--config"
         - "/etc/nats-config/nats.conf"

        # Required to be able to define an environment variable
        # that refers to other environment variables.  This env var
        # is later used as part of the configuration file.
        env:
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: CLUSTER_ADVERTISE
          value: $(POD_NAME).nats.$(POD_NAMESPACE).svc
        volumeMounts:
          - name: config-volume
            mountPath: /etc/nats-config
          - name: pid
            mountPath: /var/run/nats

        # Liveness/Readiness probes against the monitoring
        #
        livenessProbe:
          httpGet:
            path: /
            port: 8222
          initialDelaySeconds: 10
          timeoutSeconds: 5
        readinessProbe:
          httpGet:
            path: /
            port: 8222
          initialDelaySeconds: 10
          timeoutSeconds: 5

        # Gracefully stop NATS Server on pod deletion or image upgrade.
        #
        lifecycle:
          preStop:
            exec:
              # Using the alpine based NATS image, we add an extra sleep that is
              # the same amount as the terminationGracePeriodSeconds to allow
              # the NATS Server to gracefully terminate the client connections.
              #
              command: ["/bin/sh", "-c", "/nats-server -sl=ldm=/var/run/nats/nats.pid && /bin/sleep 60"]

single-server-stan.yaml

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: stan-config
data:
  stan.conf: |
    port: 4222
    http: 8222
    streaming {
     ns: "nats://nats:4222"
     id: stan
     store: file
     dir: /data/stan/store
    }
---
apiVersion: v1
kind: Service
metadata:
  name: stan
  labels:
    app: stan
spec:
  selector:
    app: stan
  clusterIP: None
  ports:
  - name: metrics
    port: 7777
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: stan
  labels:
    app: stan
spec:
  selector:
    matchLabels:
      app: stan
  serviceName: stan
  replicas: 1
  volumeClaimTemplates:
  - metadata:
      name: stan-sts-vol
    spec:
      accessModes:
      - ReadWriteOnce
      volumeMode: "Filesystem"
      resources:
        requests:
          storage: 1Gi
  template:
    metadata:
      labels:
        app: stan
    spec:
      # Prevent NATS Streaming pods running in same host.
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - topologyKey: "kubernetes.io/hostname"
            labelSelector:
              matchExpressions:
              - key: app
                operator: In
                values:
                - stan
      # STAN Server
      containers:
      - name: stan
        image: nats-streaming:0.16.2
        ports:
        - containerPort: 8222
          name: monitor
        - containerPort: 7777
          name: metrics
        args:
         - "-sc"
         - "/etc/stan-config/stan.conf"

        # Required to be able to define an environment variable
        # that refers to other environment variables.  This env var
        # is later used as part of the configuration file.
        env:
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        volumeMounts:
          - name: config-volume
            mountPath: /etc/stan-config
          - name: stan-sts-vol
            mountPath: /data/stan

        # Disable CPU limits.
        resources:
          requests:
            cpu: 0

        livenessProbe:
          httpGet:
            path: /
            port: 8222
          initialDelaySeconds: 10
          timeoutSeconds: 5
      volumes:
      - name: config-volume
        configMap:
          name: stan-config

上記マニフェストファイルを利用してインストールを行います。

# GitHubリポジトリのクローン
$ git clone https://github.com/nats-io/k8s
$ cd k8s/


# NATSのインストール
$ kubectl apply -f nats-server/single-server-nats.yml 
configmap/nats-config created
service/nats created
statefulset.apps/nats created

# NATS Streamingのインストール
$ kubectl apply -f nats-streaming-server/single-server-stan.yml 
configmap/stan-config created
service/stan created
statefulset.apps/stan created


# インストール後の確認
$ kubectl get cm
NAME          DATA   AGE
nats-config   1      57s
stan-config   1      26s


$ kubectl get svc
NAME         TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)                                                 AGE
kubernetes   ClusterIP   10.100.0.1   <none>        443/TCP                                                 2d
nats         ClusterIP   None         <none>        4222/TCP,6222/TCP,8222/TCP,7777/TCP,7422/TCP,7522/TCP   74s
stan         ClusterIP   None         <none>        7777/TCP                                                43s


# NATS Streamingが利用するPV/PVC
$ kubectl get pvc
NAME                  STATUS   VOLUME                                     CAPACITY   ACCESS MODES   STORAGECLASS   AGE
stan-sts-vol-stan-0   Bound    pvc-37cf09c9-b32b-4573-bb3b-9a32d6bfff7d   1Gi        RWO            gp2            20h


$ kubectl get pv
NAME                                       CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS   CLAIM                         STORAGECLASS   REASON   AGE
pvc-37cf09c9-b32b-4573-bb3b-9a32d6bfff7d   1Gi        RWO            Delete           Bound    default/stan-sts-vol-stan-0   gp2                     20h


$ kubectl get sc
NAME            PROVISIONER             RECLAIMPOLICY   VOLUMEBINDINGMODE      ALLOWVOLUMEEXPANSION   AGE
gp2 (default)   kubernetes.io/aws-ebs   Delete          WaitForFirstConsumer   false                  2d22h


$ kubectl get statefulsets
NAME   READY   AGE
nats   1/1     94s
stan   1/1     64s


$ kubectl get pods
NAME     READY   STATUS    RESTARTS   AGE
nats-0   1/1     Running   0          101s
stan-0   1/1     Running   0          71s


# NATSのログは以下の通り
$ kubectl logs nats-0
[7] 2020/12/28 04:50:09.349020 [INF] Starting nats-server version 2.1.7
[7] 2020/12/28 04:50:09.349061 [INF] Git commit [bf0930e]
[7] 2020/12/28 04:50:09.349569 [INF] Starting http monitor on 0.0.0.0:8222
[7] 2020/12/28 04:50:09.349616 [INF] Listening for client connections on 0.0.0.0:4222
[7] 2020/12/28 04:50:09.349625 [INF] Server id is NBBQ76INWDZXMFBGF5V4MBXCV3PBEQ5EWNFPPKS3FSTNPF3VFCR3RY2K
[7] 2020/12/28 04:50:09.349629 [INF] Server is ready


# NATS Streamingのログは以下の通り
$ kubectl logs stan-0
[1] 2020/12/28 04:51:02.605219 [INF] STREAM: Starting nats-streaming-server[stan] version 0.16.2
[1] 2020/12/28 04:51:02.605250 [INF] STREAM: ServerID: TgyiHElDc171yiUZ30g5pL
[1] 2020/12/28 04:51:02.605254 [INF] STREAM: Go version: go1.11.13
[1] 2020/12/28 04:51:02.605258 [INF] STREAM: Git commit: [910d6e1]
[1] 2020/12/28 04:51:02.631773 [INF] STREAM: Recovering the state...
[1] 2020/12/28 04:51:02.631932 [INF] STREAM: No recovered state
[1] 2020/12/28 04:51:02.883723 [INF] STREAM: Message store is FILE
[1] 2020/12/28 04:51:02.883738 [INF] STREAM: Store location: /data/stan/store
[1] 2020/12/28 04:51:02.883770 [INF] STREAM: ---------- Store Limits ----------
[1] 2020/12/28 04:51:02.883774 [INF] STREAM: Channels:                  100 *
[1] 2020/12/28 04:51:02.883778 [INF] STREAM: --------- Channels Limits --------
[1] 2020/12/28 04:51:02.883781 [INF] STREAM:   Subscriptions:          1000 *
[1] 2020/12/28 04:51:02.883785 [INF] STREAM:   Messages     :       1000000 *
[1] 2020/12/28 04:51:02.883788 [INF] STREAM:   Bytes        :     976.56 MB *
[1] 2020/12/28 04:51:02.883791 [INF] STREAM:   Age          :     unlimited *
[1] 2020/12/28 04:51:02.883795 [INF] STREAM:   Inactivity   :     unlimited *
[1] 2020/12/28 04:51:02.883798 [INF] STREAM: ----------------------------------
[1] 2020/12/28 04:51:02.883802 [INF] STREAM: Streaming Server is ready

インストールが完了したので、簡単な動作確認を行います。

NATSはnats-boxというコンテナイメージを用意しており、これを利用することでNATSの各コマンドを実行することができます。

# nats-boxコンテナの起動
$ kubectl run -i --rm --tty nats-box --image=synadia/nats-box --restart=Never
If you dont see a command prompt, try pressing enter.
nats-box:~#


# バックグラウンドでSubscriberを起動
nats-box:~# nats-sub -s nats hello &
nats-box:~# Listening on [hello]


# Publisherからメッセージを送信
nats-box:~# nats-pub -s nats hello world
[#1] Received on [hello]: 'world'


# PublisherからNATS Streamingへメッセージ送信
nats-box:~# stan-pub -s nats -c stan hello world
Published [hello] : 'world'


# NATS Streamingからも送られたメッセージが確認できる
nats-box:~# stan-sub -s nats -c stan hello
Connected to nats clusterID: [stan] clientID: [stan-sub]
Listening on [hello], clientID=[stan-sub], qgroup=[] durable=[]
[#1] Received: sequence:1 subject:"hello" data:"world" timestamp:1609131378713322197


# NATS Streaming Podでは以下のようなログが確認できる
$ kubectl logs stan-0 -f

[1] 2020/12/28 04:56:18.713310 [INF] STREAM: Channel "hello" has been created

NATSはその他にも複数のツールを提供しており、例えばnats-topコマンドを実行するとNATS Serverのモニタリングを行うことができます。

nats-box:~# nats-top -s nats

NATS server version 2.1.7 (uptime: 8m24s)
Server:
  Load: CPU:  0.0%  Memory: 9.9M  Slow Consumers: 0
  In:   Msgs: 26  Bytes: 1022  Msgs/Sec: 0.0  Bytes/Sec: 0
  Out:  Msgs: 25  Bytes: 985  Msgs/Sec: 0.0  Bytes/Sec: 0

Connections Polled: 3
  HOST                  CID    NAME                SUBS    PENDING     MSGS_TO     MSGS_FROM   BYTES_TO    BYTES_FROM  LANG     VERSION  UPTIME   LAST ACTIVITY
  192.168.1.45:56406    1      _NSS-stan-send      0       0           0           8           0           159         go       1.8.1    7m57s    2020-12-28 08:45:11.674703256 +0000 U  192.168.1.45:56408    2      _NSS-stan-general   8       0           8           5           361         461         go       1.8.1    7m57s    2020-12-28 08:45:11.677642417 +0000 U  192.168.1.45:56410    3      _NSS-stan-acks      0       0           4           0           36          0           go       1.8.1    7m57s    2020-12-28 08:45:11.673086331 +0000 U

またNATSはモニタリング機能を提供しており、以下のようにPort: 8222からアクセスすることで、各種設定やメッセージ受信数などを確認することができます。

$ kubectl get svc
NAME         TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)                                                 AGE
kubernetes   ClusterIP   10.100.0.1   <none>        443/TCP                                                 2d20h
nats         ClusterIP   None         <none>        4222/TCP,6222/TCP,8222/TCP,7777/TCP,7422/TCP,7522/TCP   44s
stan         ClusterIP   None         <none>        7777/TCP                                                27s

# ポートフォワーディング
$ kubectl port-forward svc/nats 8222:8222 &
[1] 10071

Forwarding from [::1]:8222 -> 8222

f:id:FY0323:20201229135232j:plain

例えばlocalhost:8222/subszにアクセスすると、Subscriptionの数などが表示されます。

f:id:FY0323:20201229135246j:plain

NATSを利用する

NATS/NATS Streamingの起動確認ができたので、クライアントライブラリからNATS Serverを利用してみます。今回はnats.goというGoのライブラリに含まれているサンプルプログラムを実行していきます。

まずはnat.goパッケージを入手します。Goのバージョンが古いと取得できないことがあるので、必要があればバージョンアップを実施します。

$ go version
go version go1.15.6 linux/amd64

$ go get github.com/nats-io/nats.go/

また以降の操作ではlocalhostからNATS Serverにアクセスするため、kubectl port-forwardコマンドによりNATSへアクセスできるようにします。

$ kubectl port-forward svc/nats 4222:4222 &
[2] 10088
$ Forwarding from 127.0.0.1:4222 -> 4222
Forwarding from [::1]:4222 -> 4222
NATS Pub/Sub

まずはPub/Subパターンを実行します。ここではPublisher1つ、Subscriber 3つを起動し、それぞれにメッセージが配信される様子を確認します。

まずは1つ目のSubscriberを起動します。ここではmsg.testというSubjectを指定します。

# Subscriber-1
$ cd $GOPATH/src/src/github.com/nats-io/nats.go/examples
$ go run nats-sub/main.go msg.test
Listening on [msg.test]

次に別のターミナルを用意し、Publisherを起動、メッセージを送信します。

# Publisher
# “hello”メッセージを送信
$ cd $GOPATH/src/src/github.com/nats-io/nats.go/examples
$ go run nats-pub/main.go msg.test hello

# “NATS MESSAGE”メッセージを送信
$ go run nats-pub/main.go msg.test "NATS MESSAGE"

Publisherからメッセージを送信すると、Publisher側では送信したことを示すメッセージが表示され、Subscriber側ではメッセージの受け取りが確認できます。

# Publisher
Published [msg.test] : 'hello'
Published [msg.test] : 'NATS MESSAGE'


# Subscriber-1
[#1] Received on [msg.test]: 'hello'
[#2] Received on [msg.test]: 'NATS MESSAGE'

次に新しいターミナルを開き、2つ目のSubscrierを起動します。ここでもmsg.testSubjectを指定します。

# Subscriber-2
$ cd $GOPATH/src/src/github.com/nats-io/nats.go/examples
$ go run nats-sub/main.go msg.test
Listening on [msg.test]

この状態でPublisherからメッセージを送信すると、2つのSubscriberへメッセージが送信されることが確認できます。

# Publisher
$ go run nats-pub/main.go msg.test "NATS MESSAGE 2"
Published [msg.test] : 'NATS MESSAGE 2'

# Subscriber-1
[#3] Received on [msg.test]: 'NATS MESSAGE 2'

# Subscriber-2
[#1] Received on [msg.test]: 'NATS MESSAGE 2'

ここでmsg.test.newという別のSubjectを指定したSubscriberを起動します。

# Subscriber-3
$ cd $GOPATH/src/src/github.com/nats-io/nats.go/examples
$ go run nats-sub/main.go msg.test.new
Listening on [msg.test.new]

ここで再びPublisherからメッセージを送信すると、今回はSubscriber-3はメッセージを受け取らない様子が確認できます。これはSubscriberで指定したmsg.test.newというSubjectが、Publisherがメッセージ送信時に指定したmsg.testというSubjectと一致しなかったためです。

# Publisher
$ go run nats-pub/main.go msg.test "NATS MESSAGE 3"
Published [msg.test] : 'NATS MESSAGE 3'

# Subscriber-1
[#4] Received on [msg.test]: 'NATS MESSAGE 3'

# Subscriber-2
[#2] Received on [msg.test]: 'NATS MESSAGE 3'

# Subscriber-3

(no message)

ここでSubscriber-3を一度停止し、起動時に指定するSubjectをmsg.*に変更します。今度はSubjectにワイルドカードを含み、先ほどPublisherが指定したmsg.testが条件に一致するため、メッセージを受け取る様子が確認できます。

# Subscriber-3を停止、再起動
$ go run nats-sub/main.go msg.test.new
Listening on [msg.test.new]

^Csignal: interrupt

$ go run nats-sub/main.go msg.*
Listening on [msg.*]


# Publisher
$ go run nats-pub/main.go msg.test "NATS MESSAGE 4"
Published [msg.test] : 'NATS MESSAGE 4'

# Subscriber-1
[#5] Received on [msg.test]: 'NATS MESSAGE 4'

# Subscriber-2
[#3] Received on [msg.test]: 'NATS MESSAGE 4'

# Subscriber-3
[#1] Received on [msg.test]: 'NATS MESSAGE 4'

なお、NATSを起動する際に特定のオプションを指定することで、NATS Server側でログを出力することができます。例えば-Vオプションを追加すると、Subscriber・Publisher間でやり取りが発生した際、以下のようにログが出力されます。

single-server-nats.yaml

        command:
         - "nats-server"
         - "-V"  # 追加
         - "--config"
         - "/etc/nats-config/nats.conf"
$ kubectl logs nats-0 -f
# NATS Server起動時のログ
[7] 2020/12/29 04:42:11.784867 [INF] Starting nats-server version 2.1.7
[7] 2020/12/29 04:42:11.784895 [INF] Git commit [bf0930e]
[7] 2020/12/29 04:42:11.785860 [INF] Starting http monitor on 0.0.0.0:8222
[7] 2020/12/29 04:42:11.785910 [INF] Listening for client connections on 0.0.0.0:4222
[7] 2020/12/29 04:42:11.785919 [INF] Server id is NDK7LIGLOBR5MC5DOJKNVHHTUOBZWIMJRVXBRNNDFNHM5UKUZKJPZ6ST
[7] 2020/12/29 04:42:11.785923 [INF] Server is ready



# Subscriber-1起動時
[7] 2020/12/29 04:43:35.461255 [TRC] 127.0.0.1:38544 - cid:1 - <<- [CONNECT {"verbose":false,"pedantic":false,"tls_required":false,"name":"NATS Sample Subscriber","lang":"go","version":"1.11.0","protocol":1,"echo":true,"headers":false,"no_responders":false}]
[7] 2020/12/29 04:43:35.461364 [TRC] 127.0.0.1:38544 - cid:1 - <<- [PING]
[7] 2020/12/29 04:43:35.461372 [TRC] 127.0.0.1:38544 - cid:1 - ->> [PONG]
[7] 2020/12/29 04:43:35.471896 [TRC] 127.0.0.1:38544 - cid:1 - <<- [SUB msg.test  1]
[7] 2020/12/29 04:43:35.471916 [TRC] 127.0.0.1:38544 - cid:1 - <<- [PING]
[7] 2020/12/29 04:43:35.471921 [TRC] 127.0.0.1:38544 - cid:1 - ->> [PONG]
[7] 2020/12/29 04:43:37.787668 [TRC] 127.0.0.1:38544 - cid:1 - ->> [PING]
[7] 2020/12/29 04:43:37.798709 [TRC] 127.0.0.1:38544 - cid:1 - <<- [PONG]



# Publisher起動・メッセージ送信時
[7] 2020/12/29 04:44:02.334750 [TRC] 127.0.0.1:38702 - cid:2 - <<- [CONNECT {"verbose":false,"pedantic":false,"tls_required":false,"name":"NATS Sample Subscriber","lang":"go","version":"1.11.0","protocol":1,"echo":true,"headers":false,"no_responders":false}]
[7] 2020/12/29 04:44:02.334822 [TRC] 127.0.0.1:38702 - cid:2 - <<- [PING]
[7] 2020/12/29 04:44:02.334829 [TRC] 127.0.0.1:38702 - cid:2 - ->> [PONG]
[7] 2020/12/29 04:44:02.345670 [TRC] 127.0.0.1:38702 - cid:2 - <<- [SUB msg.test  1]
[7] 2020/12/29 04:44:02.345694 [TRC] 127.0.0.1:38702 - cid:2 - <<- [PING]
[7] 2020/12/29 04:44:02.345700 [TRC] 127.0.0.1:38702 - cid:2 - ->> [PONG]
[7] 2020/12/29 04:44:04.424048 [TRC] 127.0.0.1:38702 - cid:2 - ->> [PING]
[7] 2020/12/29 04:44:04.437254 [TRC] 127.0.0.1:38702 - cid:2 - <<- [PONG]
NATS Request/Reply

次にRequest/Replyパターンを実行します。ここではPublisher/Subscriberを一つずつ起動し、Publisherからメッセージを送信するとResponseが返ってくる様子を確認します。

まずはSubscriberを起動します。ここではhelp.pleaseというSubjectを設定し、またResponseメッセージも設定します。

# Subscriber
$ go run nats-rply/main.go help.please "OK, I CAN HELP!"
Listening on [help.please]

次にPublisherを起動し、同一のSubjectとRequestメッセージを指定します。

# Publisher
$ go run nats-req/main.go help.please "I need help!"

Publisherがメッセージを送信すると、Subscriberがメッセージを受け取る様子に加え、Publisher側でResponseメッセージが表示されることが確認できます。

# Subscriber
[#1] Received on [help.please]: 'I need help!'


# Publisher
Published [help.please] : 'I need help!'
Received  [_INBOX.g672l3mO17Q5MuaBrdZalB.roViAMQe] : 'OK, I CAN HELP!'  # Response Message
NATS Queueing

最後にQueueingパターンを実行します。ここでは複数のSubscriberを起動してQueue Groupを構成し、Publisherからのメッセージが各Subscriberに分散されて送られる様子を確認します。

まずは2つのSubscriberをQueue Groupを与えて起動します。

# Subscriber-1 (Queue Group)
$ go run nats-qsub/main.go foo my-queue
Listening on [foo], queue group [my-queue]

# Subscriber-2 (Queue Group)
$ go run nats-qsub/main.go foo my-queue
Listening on [foo], queue group [my-queue]

次にQueue Groupを指定せず、先ほどと同じSubjectを指定したSubscriberを起動します。

# Subscriber-3 (No Queue Group)
go run nats-sub/main.go foo
Listening on [foo]

この状態でPublisherからメッセージが送られると、Queue Groupに属するSubscriberはメッセージが分散されて送られますが、Queue Groupに属さないSubscriberは全てのメッセージが配信されます。

# Publisher
$ go run nats-pub/main.go foo "Hello NATS-1"

# 同様のメッセージを計5回送信する
$ go run nats-pub/main.go foo "Hello NATS-5"


# Subscriber-1 (Queue Group)
[#1] Received on [foo] Queue[my-queue] Pid[11326]: 'Hello NATS-3'
[#2] Received on [foo] Queue[my-queue] Pid[11326]: 'Hello NATS-4'


# Subscriber-2 (Queue Group)
[#1] Received on [foo] Queue[my-queue] Pid[11392]: 'Hello NATS-1'
[#2] Received on [foo] Queue[my-queue] Pid[11392]: 'Hello NATS-2'
[#3] Received on [foo] Queue[my-queue] Pid[11392]: 'Hello NATS-5'


# Subscriber-3 (No Queue Group)
[#1] Received on [foo]: 'Hello NATS-1'
[#2] Received on [foo]: 'Hello NATS-2'
[#3] Received on [foo]: 'Hello NATS-3'
[#4] Received on [foo]: 'Hello NATS-4'
[#5] Received on [foo]: 'Hello NATS-5'

参考ドキュメント