メインコンテンツまでスキップ

「nosql」タグの記事が2件件あります

全てのタグを見る

· 約5分
moritalous
お知らせ

過去にQiitaに投稿した内容のアーカイブです。

Timestreamの検索をSDKを使ってやってみました。

クエリーをする最小のコードはこんな感じ。

import boto3
from botocore.config import Config

query = """
SELECT fleet, truck_id, fuel_capacity, model, load_capacity, make, measure_name, CREATE_TIME_SERIES(time, measure_value::double) as measure
FROM "IoT-sample"."IoT"
WHERE measure_value::double IS NOT NULL
AND measure_name = 'speed'
GROUP BY fleet, truck_id, fuel_capacity, model, load_capacity, make, measure_name
ORDER BY fleet, truck_id, fuel_capacity, model, load_capacity, make, measure_name
LIMIT 2
"""

config = Config(region_name = 'us-east-1')
config.endpoint_discovery_enabled = True
client = boto3.client('timestream-query', config=config)

result = client.query(QueryString=query)
resultはこんな感じ
{
"QueryId": "AEBQEAMXPVS6RHIUBHJXW6H4BRUINE7GFNEMJMP2AYZFQHAUJLR5ZO4PABHEU2A",
"Rows": [
{
"Data": [
{
"ScalarValue": "Alpha"
},
{
"ScalarValue": "1234546252"
},
{
"ScalarValue": "150"
},
{
"ScalarValue": "W925"
},
{
"ScalarValue": "1000"
},
{
"ScalarValue": "Kenworth"
},
{
"ScalarValue": "speed"
},
{
"TimeSeriesValue": [
{
"Time": "2020-10-09 20:53:57.273000000",
"Value": {
"ScalarValue": "75.0"
}
},
{
"Time": "2020-10-09 21:12:02.919000000",
"Value": {
"ScalarValue": "60.0"
}
},
{
"Time": "2020-10-09 21:26:49.334000000",
"Value": {
"ScalarValue": "10.0"
}
},
{
"Time": "2020-10-09 21:31:01.641000000",
"Value": {
"ScalarValue": "15.0"
}
},
{
"Time": "2020-10-09 21:49:01.249000000",
"Value": {
"ScalarValue": "47.0"
}
},
{
"Time": "2020-10-09 21:56:08.380000000",
"Value": {
"ScalarValue": "44.0"
}
},
{
"Time": "2020-10-09 23:50:37.597000000",
"Value": {
"ScalarValue": "45.0"
}
},
{
"Time": "2020-10-10 00:24:09.414000000",
"Value": {
"ScalarValue": "0.0"
}
},
{
"Time": "2020-10-10 00:48:40.046000000",
"Value": {
"ScalarValue": "55.0"
}
},
{
"Time": "2020-10-10 01:33:44.347000000",
"Value": {
"ScalarValue": "65.0"
}
}
]
}
]
},
{
"Data": [
{
"ScalarValue": "Alpha"
},
{
"ScalarValue": "1575739296"
},
{
"ScalarValue": "100"
},
{
"ScalarValue": "359"
},
{
"ScalarValue": "1000"
},
{
"ScalarValue": "Peterbilt"
},
{
"ScalarValue": "speed"
},
{
"TimeSeriesValue": [
{
"Time": "2020-10-09 21:24:41.479000000",
"Value": {
"ScalarValue": "17.0"
}
},
{
"Time": "2020-10-09 21:51:00.847000000",
"Value": {
"ScalarValue": "40.0"
}
},
{
"Time": "2020-10-09 23:07:10.695000000",
"Value": {
"ScalarValue": "41.0"
}
},
{
"Time": "2020-10-09 23:11:31.029000000",
"Value": {
"ScalarValue": "60.0"
}
},
{
"Time": "2020-10-10 00:03:54.235000000",
"Value": {
"ScalarValue": "69.0"
}
},
{
"Time": "2020-10-10 00:27:58.341000000",
"Value": {
"ScalarValue": "56.0"
}
},
{
"Time": "2020-10-10 00:29:38.188000000",
"Value": {
"ScalarValue": "4.0"
}
},
{
"Time": "2020-10-10 00:30:54.110000000",
"Value": {
"ScalarValue": "27.0"
}
},
{
"Time": "2020-10-10 00:58:07.005000000",
"Value": {
"ScalarValue": "21.0"
}
},
{
"Time": "2020-10-10 01:12:06.518000000",
"Value": {
"ScalarValue": "30.0"
}
}
]
}
]
}
],
"ColumnInfo": [
{
"Name": "fleet",
"Type": {
"ScalarType": "VARCHAR"
}
},
{
"Name": "truck_id",
"Type": {
"ScalarType": "VARCHAR"
}
},
{
"Name": "fuel_capacity",
"Type": {
"ScalarType": "VARCHAR"
}
},
{
"Name": "model",
"Type": {
"ScalarType": "VARCHAR"
}
},
{
"Name": "load_capacity",
"Type": {
"ScalarType": "VARCHAR"
}
},
{
"Name": "make",
"Type": {
"ScalarType": "VARCHAR"
}
},
{
"Name": "measure_name",
"Type": {
"ScalarType": "VARCHAR"
}
},
{
"Name": "measure",
"Type": {
"TimeSeriesMeasureValueColumnInfo": {
"Type": {
"ScalarType": "DOUBLE"
}
}
}
}
],
"ResponseMetadata": {
"RequestId": "JRJDO6RN63OVLGZ6ZX52GCCPF4",
"HTTPStatusCode": 200,
"HTTPHeaders": {
"x-amzn-requestid": "JRJDO6RN63OVLGZ6ZX52GCCPF4",
"content-type": "application/x-amz-json-1.0",
"content-length": "2413",
"date": "Sat, 10 Oct 2020 07:33:29 GMT"
},
"RetryAttempts": 0
}
}

result['Rows'][0]['Data'][0]に検索結果が入っています。 そのままではグラフにしづらいので、変形しました。timeがISO 8601形式のUTC時刻で扱いづらいので、datetimeに変換しています。

def convert_callback(x):
dt = datetime.fromisoformat(x[:26])
dt = dt.replace(tzinfo= timezone.utc)
return dt

rows = result['Rows']
for row in rows:
measure = row['Data'][7]['TimeSeriesValue']

time = list(map(lambda x: (convert_callback(x['Time'])), measure))
value = list(map(lambda x: (float(x['Value']['ScalarValue'])), measure))

あとはこいつをグラフにします。

register_matplotlib_converters()
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)

ax.plot(time, value)

daysFmt = mdates.DateFormatter('%m-%d %H:%M')
ax.xaxis.set_major_formatter(daysFmt)
fig.autofmt_xdate()

plt.show()

スクリーンショット 2020-10-10 11.57.45.png

めでたしめでたし。あまり派手なグラフではありませんが。。

コードの全体はこちら。

import json
from datetime import datetime, timezone

import boto3
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import pandas as pd
from botocore.config import Config
from pandas.plotting import register_matplotlib_converters

query = """
SELECT fleet, truck_id, fuel_capacity, model, load_capacity, make, measure_name, CREATE_TIME_SERIES(time, measure_value::double) as measure
FROM "IoT-sample"."IoT"
WHERE measure_value::double IS NOT NULL
AND measure_name = 'speed'
GROUP BY fleet, truck_id, fuel_capacity, model, load_capacity, make, measure_name
ORDER BY fleet, truck_id, fuel_capacity, model, load_capacity, make, measure_name
LIMIT 2
"""

def create_client():
config = Config(region_name = 'us-east-1')
config.endpoint_discovery_enabled = True
client = boto3.client('timestream-query', config=config)
return client

def convert_callback(x):
dt = datetime.fromisoformat(x[:26])
dt = dt.replace(tzinfo= timezone.utc)
return dt

if __name__ == "__main__":

client = create_client()
result = client.query(QueryString=query)

rows = result['Rows']

register_matplotlib_converters()
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)

for row in rows:
measure = row['Data'][7]['TimeSeriesValue']

time = list(map(lambda x: (convert_callback(x['Time'])), measure))
value = list(map(lambda x: (float(x['Value']['ScalarValue'])), measure))

ax.plot(time, value)

daysFmt = mdates.DateFormatter('%m-%d %H:%M')
ax.xaxis.set_major_formatter(daysFmt)
fig.autofmt_xdate()

plt.show()

· 約7分
moritalous
お知らせ

過去にQiitaに投稿した内容のアーカイブです。

GAリリースされたAmazon Timestreamにいろいろなクエリーを投げてみました。

サンプルデータ

Timestreamのデータベースを作成する際に、サンプルデータが入ったものを作成することができます。 今回はこちらを使ってクエリーを投げてみました。

fleettruck_idfuel_capacitymodelload_capacitymakemeasure_value::doublemeasure_value::varcharmeasure_nametime
Alpha71285552981003591000Peterbilt7.842702716127531-fuel-reading2020-10-09 14:11:54.267000000
Alpha16416886151003591000Peterbilt-36.1627° N, 86.7816° Wlocation2020-10-09 14:11:52.086000000
Alpha9355060257100Wrecker500Peterbilt12.0-speed2020-10-09 14:11:48.210000000
Alpha7219245711100Wrecker500Peterbilt19.78241022554396-fuel-reading2020-10-09 14:11:44.294000000
Alpha211355171003591000Peterbilt74.1987909949444-fuel-reading2020-10-09 14:11:27.965000000
Alpha433496933100Wrecker500Peterbilt112.0-load2020-10-09 14:11:27.788000000
Alpha3496454495150W9251000Kenworth-36.1627° N, 86.7816° Wlocation2020-10-09 14:11:27.742000000
Alpha43540449371003591000Peterbilt69.50232420155284-fuel-reading2020-10-09 14:11:25.140000000
Alpha5644432615150W9251000Kenworth101.12753874492175-fuel-reading2020-10-09 14:11:24.987000000
Alpha76021942111003591000Peterbilt138.0-load2020-10-09 14:11:18.528000000

timeに時刻、measure_name測定値の名前が入り、measure_value::doubleまたはmeasure_value::varcharに計測値が入ってます。 fleetやtruck_idなどは属性値で、Dimensionと呼ぶようです。

SQL

いわゆる通常のSQLが使えます。サブクエリも使えるとのことです。

SELECT * 
FROM "IoT-Sample"."IoT"
WHERE truck_id = '4132246625' AND measure_name = 'speed'
ORDER BY time DESC
LIMIT 10
fleettruck_idfuel_capacitymodelload_capacitymakemeasure_value::doublemeasure_value::varcharmeasure_nametime
Alpha41322466251003591000Peterbilt67.0-speed2020-10-09 14:09:21.208000000
Alpha41322466251003591000Peterbilt73.0-speed2020-10-09 12:24:41.422000000
Alpha41322466251003591000Peterbilt16.0-speed2020-10-09 12:16:49.933000000
Alpha41322466251003591000Peterbilt3.0-speed2020-10-09 12:00:57.192000000
Alpha41322466251003591000Peterbilt35.0-speed2020-10-09 11:51:33.847000000
Alpha41322466251003591000Peterbilt49.0-speed2020-10-09 11:48:21.102000000
Alpha41322466251003591000Peterbilt50.0-speed2020-10-09 11:47:10.982000000
Alpha41322466251003591000Peterbilt52.0-speed2020-10-09 11:01:30.840000000
Alpha41322466251003591000Peterbilt54.0-speed2020-10-09 09:22:08.533000000
Alpha41322466251003591000Peterbilt19.0-speed2020-10-09 09:03:11.096000000

Date / Time Functions

時系列データベースというので、時刻関係の関数が用意されており、bin(timestamp, X unit)関数はX unitに切り捨てしてくれます。 例えば30分ごとの平均を取得。

SELECT truck_id, avg(measure_value::double) as avg_speed, BIN(time, 30m) as binned_time
FROM "IoT-Sample"."IoT"
WHERE truck_id = '4132246625' AND measure_name = 'speed'
GROUP BY truck_id, BIN(time, 30m)
ORDER BY binned_time DESC
LIMIT 10
truck_idavg_speedbinned_time
413224662567.02020-10-09 14:00:00.000000000
413224662530.6666666666666682020-10-09 12:00:00.000000000
413224662544.6666666666666642020-10-09 11:30:00.000000000
413224662552.02020-10-09 11:00:00.000000000
413224662536.52020-10-09 09:00:00.000000000

Timeseries views

Timestreamでは、timeseriesというデータタイプが用意されいて、CREATE_TIME_SERIES関数を使うことで、timeseries型に変換できます。 timeseries型のデータは、時刻と計測値のペアがJSON形式で格納されます。

SELECT truck_id, CREATE_TIME_SERIES(time, measure_value::double) as speed
FROM "IoT-Sample"."IoT"
WHERE measure_name = 'speed'
GROUP BY truck_id
ORDER BY truck_id
LIMIT 10
truck_idspeed
1234546252[
{ time: 2020-10-09 09:45:32.192000000, value: 65.0 },
{ time: 2020-10-09 11:16:40.034000000, value: 45.0 }...
View 8 more row(s)
]
1575739296[
{ time: 2020-10-09 09:15:07.856000000, value: 56.0 },
{ time: 2020-10-09 10:29:40.381000000, value: 30.0 }...
View 8 more row(s)
]
1588092325[
{ time: 2020-10-09 09:01:49.081000000, value: 19.0 },
{ time: 2020-10-09 09:22:57.346000000, value: 67.0 }...
View 8 more row(s)
]
1641688615[
{ time: 2020-10-09 09:49:20.010000000, value: 37.0 },
{ time: 2020-10-09 10:14:42.971000000, value: 42.0 }...
View 8 more row(s)
]
1682738967[
{ time: 2020-10-09 09:07:58.814000000, value: 75.0 },
{ time: 2020-10-09 09:28:31.453000000, value: 8.0 }...
View 8 more row(s)
]
1712492054[
{ time: 2020-10-09 09:36:27.020000000, value: 44.0 },
{ time: 2020-10-09 09:49:04.039000000, value: 65.0 }...
View 8 more row(s)
]
1836816173[
{ time: 2020-10-09 08:59:28.330000000, value: 29.0 },
{ time: 2020-10-09 09:21:43.510000000, value: 15.0 }...
View 8 more row(s)
]
199744055[
{ time: 2020-10-09 09:16:35.398000000, value: 42.0 },
{ time: 2020-10-09 09:23:48.835000000, value: 46.0 }...
View 8 more row(s)
]
2062792987[
{ time: 2020-10-09 09:30:31.074000000, value: 45.0 },
{ time: 2020-10-09 09:54:24.573000000, value: 50.0 }...
View 8 more row(s)
]
21135517[
{ time: 2020-10-09 09:23:58.552000000, value: 64.0 },
{ time: 2020-10-09 10:43:06.367000000, value: 48.0 }...
View 8 more row(s)
]

View X more row(s)の部分はマネジメントコンソールでは省略して表示されており、クリックすると、詳細が表示されます。

image.png

timevalue
2020-10-09 09:45:32.19200000065.0
2020-10-09 11:16:40.03400000045.0
2020-10-09 11:28:41.93800000047.0
2020-10-09 11:33:45.04700000055.0
2020-10-09 12:19:13.36700000075.0
2020-10-09 12:36:46.97000000010.0
2020-10-09 13:30:40.72200000044.0
2020-10-09 13:56:28.83700000060.0
2020-10-09 14:01:50.17700000015.0
2020-10-09 14:03:38.0200000000.0

Time series functions

時系列データに対する関数も用意されています。例えば、INTERPOLATE_LINEAR関数は値を埋めてくれます。

Fills in missing data using linear interpolation.

下の例は、30分ごとのデータを線形補間で生成しています。sequence(start, stop, step)関数で生成する値の間隔を指定していますが、startとstopは実際の値の範囲内じゃないとだめっぽいです。

SELECT truck_id, 
INTERPOLATE_LINEAR(
CREATE_TIME_SERIES(time, measure_value::double),
SEQUENCE(min(time), max(time), 30m)
) as speed
FROM "IoT-Sample"."IoT"
WHERE measure_name = 'speed'
GROUP BY truck_id
ORDER BY truck_id
LIMIT 10
truck_idspeed
1234546252[
{ time: 2020-10-09 09:45:32.192000000, value: 65.0 },
{ time: 2020-10-09 10:15:32.192000000, value: 58.416049695656895 }...
View 7 more row(s)
]
1575739296[
{ time: 2020-10-09 09:15:07.856000000, value: 56.0 },
{ time: 2020-10-09 09:45:07.856000000, value: 45.53611215141335 }...
View 8 more row(s)
]
1588092325[
{ time: 2020-10-09 09:01:49.081000000, value: 19.0 },
{ time: 2020-10-09 09:31:49.081000000, value: 68.91556160771218 }...
View 7 more row(s)
]
1641688615[
{ time: 2020-10-09 09:49:20.010000000, value: 37.0 },
{ time: 2020-10-09 10:19:20.010000000, value: 47.34174652449723 }...
View 7 more row(s)
]
1682738967[
{ time: 2020-10-09 09:07:58.814000000, value: 75.0 },
{ time: 2020-10-09 09:37:58.814000000, value: 9.162118557623112 }...
View 7 more row(s)
]
1712492054[
{ time: 2020-10-09 09:36:27.020000000, value: 44.0 },
{ time: 2020-10-09 10:06:27.020000000, value: 34.61657724615213 }...
View 6 more row(s)
]
1836816173[
{ time: 2020-10-09 08:59:28.330000000, value: 29.0 },
{ time: 2020-10-09 09:29:28.330000000, value: 31.383254052802055 }...
View 8 more row(s)
]
199744055[
{ time: 2020-10-09 09:16:35.398000000, value: 42.0 },
{ time: 2020-10-09 09:46:35.398000000, value: 13.60871678326465 }...
View 6 more row(s)
]
2062792987[
{ time: 2020-10-09 09:30:31.074000000, value: 45.0 },
{ time: 2020-10-09 10:00:31.074000000, value: 52.07211022782296 }...
View 7 more row(s)
]
21135517[
{ time: 2020-10-09 09:23:58.552000000, value: 64.0 },
{ time: 2020-10-09 09:53:58.552000000, value: 57.93405176907693 }...
View 8 more row(s)
]

データが30分おきになっています。

timevalue
2020-10-09 09:45:32.19200000065.0
2020-10-09 10:15:32.19200000058.416049695656895
2020-10-09 10:45:32.19200000051.83209939131379
2020-10-09 11:15:32.19200000045.248149086970685
2020-10-09 11:45:32.19200000060.18373944405349
2020-10-09 12:15:32.19200000073.37867258972554
2020-10-09 12:45:32.19200000015.522237945272241
2020-10-09 13:15:32.19200000034.44762245218557
2020-10-09 13:45:32.19200000053.213475743081105

min(time)からの30分間隔は少し気持ちが悪いので、00分、30分ごとにするにはこんな感じでしょうか

SELECT truck_id, 
INTERPOLATE_LINEAR(
CREATE_TIME_SERIES(time, measure_value::double),
SEQUENCE(bin(min(time)+ 1h ,1h), max(time), 30m)
) as speed
FROM "IoT-Sample"."IoT"
WHERE measure_name = 'speed'
GROUP BY truck_id
ORDER BY truck_id
LIMIT 10
truck_idspeed
1234546252[
{ time: 2020-10-09 10:00:00.000000000, value: 61.82577514127146 },
{ time: 2020-10-09 10:30:00.000000000, value: 55.24182483692835 }...
View 7 more row(s)
]
1575739296[
{ time: 2020-10-09 10:00:00.000000000, value: 40.34983728430808 },
{ time: 2020-10-09 10:30:00.000000000, value: 29.77482942057512 }...
View 6 more row(s)
]
1588092325[
{ time: 2020-10-09 10:00:00.000000000, value: 46.790964648508535 },
{ time: 2020-10-09 10:30:00.000000000, value: 63.37907403508092 }...
View 5 more row(s)
]
1641688615[
{ time: 2020-10-09 10:00:00.000000000, value: 39.1011371926136 },
{ time: 2020-10-09 10:30:00.000000000, value: 59.68175770780711 }...
View 6 more row(s)
]
1682738967[
{ time: 2020-10-09 10:00:00.000000000, value: 16.966425157908184 },
{ time: 2020-10-09 10:30:00.000000000, value: 70.67966775160798 }...
View 6 more row(s)
]
1712492054[
{ time: 2020-10-09 10:00:00.000000000, value: 45.89098423361806 },
{ time: 2020-10-09 10:30:00.000000000, value: 12.212910765588772 }...
View 5 more row(s)
]
1836816173[
{ time: 2020-10-09 09:00:00.000000000, value: 28.667924923980287 },
{ time: 2020-10-09 09:30:00.000000000, value: 32.49950906732863 }...
View 8 more row(s)
]
199744055[
{ time: 2020-10-09 10:00:00.000000000, value: 45.46459879696737 },
{ time: 2020-10-09 10:30:00.000000000, value: 3.39022689038606 }...
View 4 more row(s)
]
2062792987[
{ time: 2020-10-09 10:00:00.000000000, value: 50.54843613050327 },
{ time: 2020-10-09 10:30:00.000000000, value: 21.4323793899003 }...
View 6 more row(s)
]
21135517[
{ time: 2020-10-09 10:00:00.000000000, value: 56.71598240453767 },
{ time: 2020-10-09 10:30:00.000000000, value: 50.6500341736146 }...
View 6 more row(s)
]