最近有个物联网项目,需要在SpringBoot里对influxdb进行整合,并且需要插入数据到influxdb中,基本的代码示例记录一下。
POM依赖
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.15</version>
</dependency>
Application.yml配置文件
spring:
influx:
url: http://192.168.10.59:8086
user:
password:
database: iotbigdata
InfluxDbConfig类
import com.tk.wisdombigdatamiddleware.utils.InfluxDbUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class InfluxDbConfig {
@Value("${spring.influx.url:''}")
private String influxDBUrl;
@Value("${spring.influx.user:''}")
private String userName="";
@Value("${spring.influx.password:''}")
private String password="";
@Value("${spring.influx.database:''}")
private String database;
@Bean
public InfluxDbUtils influxDbUtils() {
return new InfluxDbUtils(userName, password, influxDBUrl, database, "2_years_iot");
}
public String getDatabase() {
return database;
}
}
上面代码里2_years_iot是设定的一个influxdb的存储策略,如下,这里我设置的是数据存储两年,保存一个副本
CREATE RETENTION POLICY "2_years_iot" ON "iotbigdata" DURATION 17520h REPLICATION 1 DEFAULT
查看使用的存储策略,可以使用如下的语句查询:
SHOW RETENTION POLICIES ON "iotbigdata";
InfluxDbUtils类
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Query;
@Data
@Slf4j
public class InfluxDbUtils {
private String userName;
private String password;
private String url;
public String database;
private String retentionPolicy;
// InfluxDB实例
private InfluxDB influxDB;
// 数据保存策略
public static String policyNamePix = "2_years_iot";
public InfluxDbUtils(String userName, String password, String url, String database,
String retentionPolicy) {
this.userName = userName;
this.password = password;
this.url = url;
this.database = database;
this.retentionPolicy = retentionPolicy == null || "".equals(retentionPolicy) ? "autogen" : retentionPolicy;
this.influxDB = influxDbBuild();
}
/**
* 连接数据库 ,若不存在则创建
*
* @return influxDb实例
*/
private InfluxDB influxDbBuild() {
if (influxDB == null) {
if (StringChargeUtils.isObjectEmpty(userName) && StringChargeUtils.isObjectEmpty(password)) {
influxDB = InfluxDBFactory.connect(url);
} else {
influxDB = InfluxDBFactory.connect(url, userName, password);
}
}
try {
createDB(database);
influxDB.setDatabase(database);
} catch (Exception e) {
log.error("create influx db failed, error: {}", e.getMessage());
} finally {
influxDB.setRetentionPolicy(retentionPolicy);
}
influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
return influxDB;
}
/****
* 创建数据库
* @param database
*/
private void createDB(String database) {
influxDB.query(new Query("CREATE DATABASE " + database));
}
}
数据封装类
这里使用的influxdb提供的@Measurement注解封装的IotElectricity类,方便的是你可以在创建变量的时候指定是否为tag等等,代码如下:
import lombok.Builder;
import lombok.Data;
import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;
@Builder
@Data
@Measurement(name = "iotelectricity")
public class IotElectricity {
// Column中的name为measurement中的列名
// 此外,需要注意InfluxDB中时间戳均是以UTC时保存,在保存以及提取过程中需要注意时区转换
@Column(name = "time")
private String time;
// 注解中添加tag = true,表示当前字段内容为tag内容
@Column(name = "type", tag = true)
private String type;
@Column(name = "brand", tag = true)
private String brand;
@Column(name = "deviceSn", tag = true)
private String deviceSn;
@Column(name = "param", tag = true)
private String param;
@Column(name = "value")
private String value;
}
BatchPoints批量插入数据调用方法
InfluxDbConfig influxDbConfig = new InfluxDbConfig();//new一个influxdb config
InfluxDbUtils influxDbUtils = influxDbConfig.influxDbUtils();
InfluxDB influxDB = influxDbUtils.getInfluxDB();//创建一个influxdb连接
BatchPoints batchPoints = BatchPoints.builder().build();//创建批量数据存储batch
IotElectricity iotElectricity =
IotElectricity.builder().brand("TM").deviceSn("890000002872").type("breaker").param("voltage").value("195").build();
Point point =
Point.measurementByPOJO(iotElectricity.getClass()).addFieldsFromPOJO(iotElectricity).time(System.currentTimeMillis(), TimeUnit.MILLISECONDS).build();
batchPoints.point(point);//将point放入batch中
iotElectricity =
IotElectricity.builder().brand("TM").deviceSn("890000002872").type("breaker").param("electric").value("45").build();
point =
Point.measurementByPOJO(iotElectricity.getClass()).addFieldsFromPOJO(iotElectricity).time(System.currentTimeMillis(), TimeUnit.MILLISECONDS).build();
batchPoints.point(point);
influxDB.write(batchPoints);//将这一批次数据一次写入influxdb
文章参考:
https://www.cnblogs.com/jason1990/archive/2019/06/24/11076310.html
https://blog.csdn.net/qq_33326449/article/details/87972168
» 订阅本站:https://www.kgraph.cn
» 转载请注明来源:九五青年博客 » 《SpringBoot整合influxdb使用BatchPoints实现批量插入数据功能》