fluentdのmonitor_agentのデータをGoでGoogle Stackdriverに送って監視する

(2017-02-19)

fluentdのmonitor_agent

メトリクスをjsonで返すAPIを提供する。

<source>
  @type monitor_agent
  bind 0.0.0.0
  port 24220
</source>
$ curl localhost:24220/api/plugins.json | jq
{
  "plugins": [
    {
      "plugin_id": "object:3f8590d8c250",
      "plugin_category": "input",
      "type": "forward",
      "config": {
        "@type": "forward",
        "port": "24222",
        "bind": "0.0.0.0"
      },
      "output_plugin": false,
      "retry_count": null
    },
    {
      "plugin_id": "object:3f8590d894c4",
      "plugin_category": "input",
      "type": "monitor_agent",
      "config": {
        "@type": "monitor_agent",
        "bind": "0.0.0.0",
        "port": "24220"
      },
      "output_plugin": false,
      "retry_count": null
    },
    {
      "plugin_id": "object:3f8590dc1f2c",
      "plugin_category": "output",
      "type": "file",
      "config": {
        "@type": "file",
        "path": "/var/log/td-agent/hoge.log",
        "buffer_path": "/var/log/td-agent/hoge.log.*"
      },
      "output_plugin": true,
      "buffer_queue_length": 0,
      "buffer_total_queued_size": 0,
      "retry_count": 0
    }
  ]
}

これをもとにStackdriverで異常を検知できるようにする。

Google Stackdriver

GoogleがStackdriverを買収して改造したもの。GCPだけではなくAWSのリソースも監視できる。 まだBeta。

EC2インスタンスを監視する

GCPのメニューのSTACKDRIVER -> モニタリングで、プロジェクトを指定してStackdriverアカウントを作成する。

今回はEC2で動いているfluentdを監視するので指示に従ってクロスアカウントアクセスのロールを作成、 Role ARNを入力してAWSアカウントと接続すると、 StackdriverのResouces->InstancesでCPUの使用率などは確認できるが、 EC2にAgentを入れると詳細な情報を取得できる。

GCPのメニューのサービスアカウントから接続したAWSアカウントを選択し、 Project->編集者とLogging->ログ書き込みロールのサービスアカウントを作成する。 新しい秘密鍵の提供にチェックを入れて、JSONのキーをダウンロードする。 これをEC2の/etc/google/auth/application_default_credentials.jsonに置いて chown root:rootchmod 400する。

Monitoring AgentとLogging Agentをインストールし、 stackdriver-collectdgoogle-fluentdのプロセスがあれば正常。

curl -O https://repo.stackdriver.com/stack-install.sh
sudo bash stack-install.sh --write-gcm

curl -sSO https://dl.google.com/cloudagents/install-logging-agent.sh
sudo bash install-logging-agent.sh

メモリの使用量やTCPコネクション数などがとれていることを確認する。 Googleのドキュメントには見つからなかったけど、 旧Stackdriverと同様、stackdriver_monitor: falseのタグを付けると 監視対象から外れる っぽい。

カスタムメトリクスを送る

MetricDescriptorを作成し、これにTimeSeriesデータを書き込んでいく。

MetricDescriptorの作成

MetricDescriptor

typeはcustom.googleapis.com/ から始める

metricKind にはGAUGEのほかに変化量をとるDELTA、累積するCUMULATIVEを指定できる。

labelはフィルタリングのためのもの。

{
  "name": "",
  "description": "fluentd buffer_queue_length",
  "displayName": "fluentd-buffer_queue_length",
  "type": "custom.googleapis.com/fluentd/buffer_queue_length",
  "metricKind": "GAUGE",
  "valueType": "INT64",
  "labels": [
    {
      "key": "plugin_type",
      "valueType": "STRING",
      "description": "The type of the plugin"
    },
  ],
}

これをGoで登録する。

gcpのほうのprojectでProject->編集者のサービスアカウントを作成してパスを 環境変数GOOGLE_APPLICATION_CREDENTIALSに入れて Default Credential にする。

必要なパッケージをgo get。

$ go get google.golang.org/api/monitoring/v3
$ go get golang.org/x/oauth2/google
func main() {
	ctx := context.Background()
	httpClient, err := google.DefaultClient(ctx, monitoring.CloudPlatformScope)
	if err != nil {
		panic(err)
	}
	client, err := monitoring.New(httpClient)
	if err != nil {
		panic(err)
	}

	var (
		// The project on which to execute the request. The format is `"projects/{project_id_or_number}"`.
		name = "projects/*****"

		requestBody = &monitoring.MetricDescriptor{
			Description: "fluentd buffer_queue_length",
			DisplayName: "fluentd-buffer_queue_length",
			Type:        "custom.googleapis.com/fluentd/buffer_queue_length",
			MetricKind:  "GAUGE",
			ValueType:   "INT64",
			Labels: []*monitoring.LabelDescriptor{
				&monitoring.LabelDescriptor{
					Key:         "plugin_type",
					ValueType:   "STRING",
					Description: "The type of the plugin",
				},
			},
		}
	)

	response, err := client.Projects.MetricDescriptors.Create(name, requestBody).Context(ctx).Do()
	if err != nil {
		panic(err)
	}

	fmt.Println("done")
}

登録されたことをlistで確認する。

response, err := client.Projects.MetricDescriptors.List(name).Context(ctx).Do()
if err != nil {
  panic(err)
}

for _, v := range response.MetricDescriptors {
  fmt.Println(v.DisplayName)
}
API Request Count
Agent Memory Usage
Stream Space Used
...
fluentd-buffer_queue_length
...

TimeSeriesの書き込み

TimeSeries

metricのtypeはMetricDescriptorのtypeと対応する。 pointsのendTimeはRFC3339のUTC文字列で渡す。

{
 "timeSeries": [
  {
   "metric": {
    "type": "custom.googleapis.com/fluentd/buffer_queue_length",
    "labels": {
     "plugin_type": "file"
    }
   },
   "resource": {
    "type": "aws_ec2_instance",
    "labels": {
     "project_id": "*****",
     "instance_id": "*****",
     "region": "aws:ap-northeast-1",
     "aws_account": "*****"
    }
   },
   "points": [
    {
     "interval": {
      "endTime": "2016-06-01T10:00:00-04:00"
     },
     "value": {
      "int64Value": 0
     }
    }
   ]
  }
 ]
}

resourceのtypeは MonitoredResourceDescriptor と対応していて、 listで確認できる。

{
 "resourceDescriptors": [
   {
   "type": "aws_ec2_instance",
   "displayName": "Amazon EC2 Instance",
   "description": "A VM instance in Amazon EC2.",
   "labels": [
    {
     "key": "project_id",
     "description": "The identifier of the GCP project under which data is stored for the AWS account specified in the aws_account label (e.g., my-project)."
    },
    {
     "key": "instance_id",
     "description": "The VM instance identifier assigned by AWS."
    },
    {
     "key": "region",
     "description": "The AWS region in which the VM is running. Supported AWS region values are listed by service at http://docs.aws.amazon.com/general/latest/gr/rande.html. The value supplied for this label must be prefixed with 'aws:' (for example, 'aws:us-east-1' is a valid value while 'us-east-1' is not)."
    },
    {
     "key": "aws_account",
     "description": "The AWS account number under which the VM is running."
    }
   ]
  },
  ...
 ]
}

書くコード。

func writeFluentdBufferQueueLength() error {
	ctx := context.Background()
	httpClient, err := google.DefaultClient(ctx, monitoring.CloudPlatformScope)
	if err != nil {
		return err
	}
	client, err := monitoring.New(httpClient)
	if err != nil {
		return err
	}

	now := time.Now().UTC().Format(time.RFC3339)

	resource := &monitoring.MonitoredResource{
		Type: "aws_ec2_instance",
		Labels: map[string]string{
			"project_id":  "*****",
			"instance_id": "*****",
			"region":      "aws:ap-northeast-1",
			"aws_account": "*****",
		},
	}

	metrics, err := fetchFluentdMetrics()
	if err != nil {
		return err
	}

	timeSeries := []*monitoring.TimeSeries{}

	for _, v := range metrics.Plugins {
		if v.OutputPlugin {

			fmt.Printf("send %s\n", v.Type)

			timeSeries = append(
				timeSeries,
				&monitoring.TimeSeries{
					Metric: &monitoring.Metric{
						Type: "custom.googleapis.com/fluentd/buffer_queue_length",
						Labels: map[string]string{
							"plugin_type": v.Type,
						},
					},
					Resource: resource,
					Points: []*monitoring.Point{
						&monitoring.Point{
							Interval: &monitoring.TimeInterval{
								EndTime: now,
							},
							Value: &monitoring.TypedValue{
								Int64Value: int64p(v.BufferQueueLength),
							},
						},
					},
				},
			)
		}
	}

	var (
		// The project on which to execute the request. The format is `"projects/{project_id_or_number}"`.
		name = "projects/try-stackdriver-159110"

		requestBody = &monitoring.CreateTimeSeriesRequest{
			TimeSeries: timeSeries,
		}
	)

	_, err = client.Projects.TimeSeries.Create(name, requestBody).Context(ctx).Do()
	if err != nil {
		return err
	}

	fmt.Println("done")

	return nil
}

const fluentdMonitorEndpoint = "http://localhost:24220/api/plugins.json"

type fluentdMetrics struct {
	Plugins []fluentdMetricsPlugin `json:"plugins"`
}
type fluentdMetricsPlugin struct {
	Type              string `json:"type"`
	OutputPlugin      bool   `json:"output_plugin"`
	BufferQueueLength int64  `json:"buffer_queue_length"`
}

// monitor_agentからfluentdのメトリクスを取得する
func fetchFluentdMetrics() (*fluentdMetrics, error) {

	resp, err := http.Get(fluentdMonitorEndpoint)
	if err != nil {
		return nil, err
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	var ret fluentdMetrics

	if err := json.Unmarshal(body, &ret); err != nil {
		return nil, err
	}

	return &ret, nil
}

// int64 -> *int64
func int64p(n int64) *int64 {
	return &n
}

これをgocronなどで定期的に実行させる。

読むコード。確認用。

func readFluentdBufferQueueLength() error {
	ctx := context.Background()
	httpClient, err := google.DefaultClient(ctx, monitoring.CloudPlatformScope)
	if err != nil {
		return err
	}
	client, err := monitoring.New(httpClient)
	if err != nil {
		return err
	}

	var (
		// The project on which to execute the request. The format is `"projects/{project_id_or_number}"`.
		name = "projects/*****"
	)

	start := time.Now().Add(time.Hour * -3).UTC().Format(time.RFC3339)
	now := time.Now().UTC().Format(time.RFC3339)

	filter := "metric.type = \"custom.googleapis.com/fluentd/buffer_queue_length\""
	resp, err := client.Projects.TimeSeries.List(name).
		IntervalStartTime(start).
		IntervalEndTime(now).
		Filter(filter).Context(ctx).Do()
	if err != nil {
		return err
	}

	for _, v := range resp.TimeSeries {
		fmt.Println(v.Metric.Type)
		for _, p := range v.Points {
			fmt.Println(*(p.Value.Int64Value))
		}
	}

	return nil
}

ちゃんと届いていれば Resource->Metrics Explorerでもcustom/fluentd/buffer_queue_lengthを確認できる。

これでAlertを設定できるようになった。TargetのResource TypeはCustom Metrics。

Alertの設定