위에서는 influxDB의 특징과 설치방법에 대해 알아보았습니다.
이번에는 설정에 대해 알아보겠습니다.
influxDB의 설정 종류
설정 용어
duration : 데이터를 보관할 기간을 지정합니다. 이 기간이 지나면 데이터가 삭제됩니다.
replication : 데이터를 복제할 샤드의 개수를 지정합니다.
shard_duration : 데이터를 샤드로 나누는 기간을 지정합니다. 이 기간이 지나면 새로운 샤드가 생성됩니다.
measurement : RDBMS의 테이블과 같은 개념(지속적으로 측정되는 데이터 단위로 담은 것)
Tag : RDBMS의 컬럼과 같은 개념(Tag Key를 이용해서 인덱싱에 사용합니다.)
모든 데이터는 반드시 1개의 값을 가져야합니다. 이러한 값들의 모임을 field라고 합니다. select 쿼리의 결과에서 나오는 value를 field key라고 합니다. 그리고 이에 해당하는 실제 값을 field value라고 부릅니다. 여기서 주의할 점은 한번 필드 값이 설정이 되면 float, int등으로 고정되기 때문에 float 값을 사용하는 필드 값에 대해서 문자열을 사용할 수 없습니다.
그래서 저는 한 값에대해 double과 string이 다 써야된다면 value를 2개로 두고 사용했습니다.
Spring Boot 에서 설정 방법
# application.yaml
spring:
influxdb:
url: http://10.0.1.45:8086
/*
밑에 3가지는 저의경우 secret yaml파일을 만들어서 따로 저장했었습니다.
username: 유저네임 # influxDB 처음 로그인 아이디
password: 비밀번호
token: token # 토큰 생성번호 안되면 재생성해서 넣으셔도 됩니다.
*/
# retention-policy: autogen # 데이터를 얼마나 오래 보관할지
# consistency: ONE # 데이터를 쓰거나 읽을 때 몇개의 노드에서 확인할지
# enable-gzip: true # 데이터 전송을 압축할 것인지
# enable-batching: true # 데이터 전송을 일괄 처리할 것인지
# batch-size: 1000 # 한번에 처리할 개수
# flush-duration: 1000ms # 일괄 처리할 시간 간격
package com.example.data.config;
import com.influxdb.client.*;
import com.influxdb.client.write.events.BackpressureEvent;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PreDestroy;
import java.util.concurrent.TimeUnit;
@Configuration
public class InfluxDBConfig {
private WriteApi writeApi;
@Value("${spring.influxdb.url}")
private String url;
@Value("${spring.influxdb.username}")
private String username;
@Value("${spring.influxdb.password}")
private String password;
@Value("${spring.influxdb.token}")
private String token;
@Bean
public InfluxDBClient influxDBClient() {
OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder()
.connectTimeout(40, TimeUnit.SECONDS)
.readTimeout(40, TimeUnit.SECONDS)
.writeTimeout(40, TimeUnit.SECONDS)
.connectionPool(new ConnectionPool(10, 5, TimeUnit.MINUTES)); // Connection Pool 설정
InfluxDBClientOptions options = InfluxDBClientOptions.builder()
.url(url)
.authenticateToken(token.toCharArray())
.okHttpClient(okHttpClientBuilder)
.build();
return InfluxDBClientFactory.create(options);
}
@Bean
public WriteApi writeApi(InfluxDBClient influxDBClient) {
WriteOptions options = WriteOptions.builder()
// .batchSize(1_000) // 한번에 보내는 데이터 량 (기본 1000)
// .bufferLimit(100_000)
.flushInterval(1)
.build();
WriteApi writeApi = influxDBClient.makeWriteApi(options);
writeApi.listenEvents(BackpressureEvent.class, event -> {
// BackpressureEvent 처리 로직
});
return writeApi;
}
@PreDestroy
public void onShutdown() {
if(writeApi != null) {
writeApi.close();
}
}
}
@Bean은 버퍼의 크기를 조정하기 위해 사용한 것이고 없어도 무관합니다.
@PreDestory는 끝난경우 close 작업을 통해 api관리를 해주려고 했는데 없어도 무관합니다.
# 저장로직 Point를 만들어서 해당 값을 add명령처리하면 저장됩니다.
# 추가로 구현된 것을 밑에 batch size를 만들어서 그 값만큼 모였을 때 저장하는 로직을 넣어두었습니다.
protected void addTSData(String server, String type, String value, String time, int batchSize) {
try {
float fieldValue = Float.parseFloat(value);
String dataType = type.replaceAll("[0-9]", "");
Point row = Point
.measurement(server)
.addTag("name", type)
.addTag("generate_time", time)
.addTag("big_name",dataType)
.addField("value", fieldValue)
.time(Instant.now(), WritePrecision.NS);
points.add(row);
if (points.size() >= batchSize) {
writeApi.writePoints("week", "semse", new ArrayList<>(points));
points.clear();
log.info("Saved server for data: {}", server);
}
} catch (NumberFormatException e) {
log.error("Failed to parse value {} as a Long. Exception message: {}", value, e.getMessage());
writeApi.close();
sseService.sendError("[" + time + "]" + server + "> " + type + "> 데이터가 저장되고 있지 않습니다.");
// 예외 처리 로직 추가
} catch (Exception e) {
log.error("Unexpected error occurred while adding TS data. Exception message: {}", e.getMessage());
writeApi.close();
sseService.sendError("[" + time + "]" + server + "> " + type + "> 데이터가 저장되고 있지 않습니다.");
// 예외 처리 로직 추가
}
}
쿼리 예제
2.7에서 사용하는 쿼리들을 flux문으로 구성되어 있습니다.
제 프로젝트를 기준으로 쿼리를 보여드립니다.
// Start by selecting data from the schema browser or typing flux here
from(bucket: "CLIENT1")
|> range(start:-3h,stop:now())
|> filter(fn: (r) => r._measurement == "MOTOR")
|> group(columns:["max_value"])
|> aggregateWindow(every:1m, fn:min)
|> map(fn: (r) => ({value:r._value,time:r.generate_time,name:r.name}))
|> limit(n:10)
- from(bucket: "CLIENT1"): "CLIENT1"이라는 버킷(데이터 저장 공간)에서 데이터를 가져옵니다.
- range(start:-3h, stop:now()): 최근 3시간 동안의 데이터를 선택합니다. now()는 현재 시간을 의미합니다.
- filter(fn: (r) => r._measurement == "MOTOR"): 필터링 함수를 사용하여 "_measurement" 필드의 값이 "MOTOR"인 레코드만 선택합니다.
- group(columns:["max_value"]): "max_value" 열을 기준으로 데이터를 그룹화합니다.
- aggregateWindow(every:1m, fn:min): 각 1분 동안의 데이터를 그룹화하고, 그 그룹 내에서 최소값을 계산합니다.
- map(fn: (r) => ({value:r._value,time:r.generate_time,name:r.name})): 각 레코드를 새로운 형태로 변환합니다. 새 레코드는 원래 레코드의 _value, generate_time, name 필드를 value, time, name 필드로 갖습니다.
- limit(n:10): 결과 레코드 중 처음 10개만 선택합니다.
시간 단위로 하는 모니터링과 같은 서비스일 경우 편하게 사용할 수 있고 데이터 관리를 잘할수 있다는 장점을 가졌습니다.
다만 데이터를 초당 200만개 정도 저장하려면 생각보다 많은 메모리 사용으로 메모리를 적게 사용하는 방법을 고민했었는데요.
제가 한 방법으로는
1. 버퍼를 만들어서 데이터 저장횟수를 줄이는 방법
2. bucket을 여러개 만들어서 따로 저장하는 방법(이 방법은 한개의 bucket으로 관리하게 되었습니다.-> 비동기의 경우 bucket이 여러개를 저장할 때, 많은 시간이 걸리는 것같습니다.)
3. duration을 조정해서 저장되는 묶음을 여러개 하는것
4. influxDB를 replica를 만들어서 여러개의 서버로 관리하는 방법(이건 2023.3월 기준으로 없어진다고 봤었는데 추가 확인이 필요합니다)
5. 쿼리를 최적화하는 것(찾는 시간이나 인덱싱을 작게 만들기)
이 정도였습니다. 더 좋은 아이디어가 있었다면 댓글로 달아주세요..ㅠㅠ