caojianbin
8 months ago
7 changed files with 445 additions and 16 deletions
@ -0,0 +1,162 @@
@@ -0,0 +1,162 @@
|
||||
package com.ecell.internationalize.common.issue.mqtt; |
||||
|
||||
import com.ecell.internationalize.common.core.utils.StringUtils; |
||||
import com.ecell.internationalize.common.issue.callback.CustomMessageCallback; |
||||
import com.ecell.internationalize.common.issue.util.RandomUtil; |
||||
import org.eclipse.paho.client.mqttv3.*; |
||||
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
/** |
||||
* @author borui |
||||
*/ |
||||
public class AppSingleMqttClient { |
||||
private static final Logger logger = LoggerFactory.getLogger(AppSingleMqttClient.class); |
||||
private static MqttClient mqttClient=null; |
||||
//防止指令重排
|
||||
private volatile static AppSingleMqttClient appSingleMqttClient; |
||||
|
||||
//初始化链接
|
||||
private AppSingleMqttClient() { |
||||
MqttConnectOptions mqttConnectOptions = getMqttConnectOptions(); |
||||
//设置持久化方式
|
||||
MemoryPersistence memoryPersistence = new MemoryPersistence(); |
||||
//集群就随机
|
||||
String clientId ="ecell-internationalize"+ RandomUtil.generateVerCode(6)+System.currentTimeMillis(); |
||||
//单实例写死
|
||||
//String clientId ="yisai@@@777";
|
||||
if (null != clientId) { |
||||
try { |
||||
//tcp://120.77.209.176:1883
|
||||
mqttClient = new MqttClient("tcp://1.13.186.145:1883", clientId, memoryPersistence); |
||||
} catch (MqttException e) { |
||||
logger.info("创建链接失败:{}",e.getMessage()); |
||||
e.printStackTrace(); |
||||
} |
||||
} |
||||
//设置连接和回调
|
||||
if (null != mqttClient) { |
||||
if (!mqttClient.isConnected()) { |
||||
//创建回调函数对象
|
||||
// MQTTReceiveCallback MQTTReceiveCallback = new MQTTReceiveCallback();
|
||||
//客户端添加回调函数(默认打印)
|
||||
mqttClient.setCallback(new CustomMessageCallback()); |
||||
//创建连接
|
||||
try { |
||||
logger.info("开始建立链接"); |
||||
mqttClient.connect(mqttConnectOptions); |
||||
} catch (MqttException e) { |
||||
// destroy();
|
||||
// try {
|
||||
// Thread.sleep(100);
|
||||
// reconnection();
|
||||
// } catch (InterruptedException interruptedException) {
|
||||
// interruptedException.printStackTrace();
|
||||
// }
|
||||
// logger.info("创建链接失败:",e.getMessage());
|
||||
// e.printStackTrace();
|
||||
} |
||||
|
||||
} |
||||
} |
||||
|
||||
} |
||||
|
||||
/** |
||||
* 设置参数 |
||||
*/ |
||||
private static MqttConnectOptions getMqttConnectOptions(){ |
||||
//初始化连接设置对象
|
||||
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); |
||||
//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
|
||||
//这里设置为true表示每次连接到服务器都以新的身份连接
|
||||
mqttConnectOptions.setCleanSession(true); |
||||
//设置连接超时时间,单位是秒
|
||||
mqttConnectOptions.setConnectionTimeout(10); |
||||
mqttConnectOptions.setUserName("admin"); |
||||
mqttConnectOptions.setPassword("public".toCharArray()); |
||||
//保留回话
|
||||
// mqttConnectOptions.setCleanSession(false);
|
||||
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
|
||||
// mqttConnectOptions.setKeepAliveInterval(60);
|
||||
//自动重连
|
||||
// mqttConnectOptions.setAutomaticReconnect(true);
|
||||
//建立链接
|
||||
//setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 //遗嘱
|
||||
// mqttConnectOptions.setWill(mqttConfig.getSubTopic(), "客户端已经下线".getBytes(), 2, true);
|
||||
return mqttConnectOptions; |
||||
} |
||||
|
||||
public static AppSingleMqttClient getAppSingleMqttClient(){ |
||||
logger.info("开始建立链接:{}",appSingleMqttClient); |
||||
if (null==appSingleMqttClient){ |
||||
synchronized (AppSingleMqttClient.class){ |
||||
if (null==appSingleMqttClient){ |
||||
appSingleMqttClient= new AppSingleMqttClient(); |
||||
} |
||||
} |
||||
} |
||||
return appSingleMqttClient; |
||||
} |
||||
|
||||
public static void destroy(){ |
||||
logger.info("释放链接start:{}",mqttClient); |
||||
if (null!=mqttClient){ |
||||
try { |
||||
mqttClient.disconnect(); |
||||
mqttClient.close(); |
||||
}catch (MqttException e){ |
||||
logger.info("释放资源失败:{}",e.getMessage()); |
||||
} |
||||
} |
||||
} |
||||
|
||||
public static void reconnection(){ |
||||
logger.info("重新链接方法被调用:{},{}",appSingleMqttClient,mqttClient,mqttClient.isConnected()); |
||||
if (null==appSingleMqttClient){ |
||||
logger.info("重新链接"); |
||||
AppSingleMqttClient.getAppSingleMqttClient(); |
||||
}else { |
||||
if (null !=mqttClient){ |
||||
try { |
||||
MqttConnectOptions mqttConnectOptions = getMqttConnectOptions(); |
||||
logger.info("进行重新链接:{}",mqttConnectOptions); |
||||
mqttClient.connect(mqttConnectOptions); |
||||
} catch (MqttException e) { |
||||
e.printStackTrace(); |
||||
} |
||||
} |
||||
} |
||||
} |
||||
/**自定义发布消息*/ |
||||
public static void publishCustomMessage(int qos,String message,String pubTopic) { |
||||
logger.info("开始发送的信息是:{}",message); |
||||
if(StringUtils.isNotNull(mqttClient) && mqttClient.isConnected()) { |
||||
logger.info("发布自定义信息客户端状态:{},id:{},topic:{}",mqttClient.isConnected(),mqttClient.getClientId(),pubTopic); |
||||
MqttMessage mqttMessage = new MqttMessage(); |
||||
mqttMessage.setQos(qos); |
||||
mqttMessage.setPayload(message.getBytes()); |
||||
MqttTopic topic = mqttClient.getTopic(pubTopic); |
||||
if(null != topic) { |
||||
try { |
||||
logger.info("发送的信息是:{}",message); |
||||
MqttDeliveryToken publish = topic.publish(mqttMessage); |
||||
if(!publish.isComplete()) { |
||||
System.out.println("消息发布成功"); |
||||
} |
||||
} catch (MqttException exception) { |
||||
logger.info("发送的信息失败:{}",exception.getMessage()); |
||||
} |
||||
|
||||
} |
||||
|
||||
|
||||
}else { |
||||
// AppSingleMqttClient.getAppSingleMqttClient();
|
||||
//这里可以做重新链接
|
||||
logger.info("客户端为null{}",mqttClient); |
||||
reconnection(); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,18 @@
@@ -0,0 +1,18 @@
|
||||
package com.ecell.internationalize.system.annotation; |
||||
|
||||
|
||||
import java.lang.annotation.ElementType; |
||||
import java.lang.annotation.Retention; |
||||
import java.lang.annotation.RetentionPolicy; |
||||
import java.lang.annotation.Target; |
||||
|
||||
/** |
||||
* @author borui |
||||
*/ |
||||
@Retention(RetentionPolicy.RUNTIME) |
||||
@Target(ElementType.METHOD) |
||||
public @interface ApiRateLimiter { |
||||
String key() default "rate_limit:"; |
||||
int time() default 60; |
||||
int count() default 100; |
||||
} |
@ -0,0 +1,20 @@
@@ -0,0 +1,20 @@
|
||||
package com.ecell.internationalize.system.config; |
||||
|
||||
import org.springframework.context.annotation.Bean; |
||||
import org.springframework.core.io.ClassPathResource; |
||||
import org.springframework.data.redis.core.script.DefaultRedisScript; |
||||
import org.springframework.scripting.support.ResourceScriptSource; |
||||
|
||||
/** |
||||
* @author borui |
||||
*/ |
||||
//@Configuration
|
||||
public class ApiRedisConfig { |
||||
@Bean |
||||
public DefaultRedisScript<Long> limitScript(){ |
||||
DefaultRedisScript<Long> script = new DefaultRedisScript<>(); |
||||
script.setResultType(Long.class); |
||||
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("111.lua"))); |
||||
return script; |
||||
} |
||||
} |
@ -0,0 +1,36 @@
@@ -0,0 +1,36 @@
|
||||
package com.ecell.internationalize.system.entity.api; |
||||
|
||||
import com.fasterxml.jackson.annotation.JsonFormat; |
||||
import io.swagger.annotations.ApiModelProperty; |
||||
import lombok.Data; |
||||
import lombok.NoArgsConstructor; |
||||
|
||||
import java.io.Serializable; |
||||
import java.util.Date; |
||||
|
||||
/** |
||||
* @author borui |
||||
*/ |
||||
@Data |
||||
@NoArgsConstructor |
||||
public class ApiTrackInfo implements Serializable { |
||||
private static final long serialVersionUID=1L; |
||||
@ApiModelProperty(value = "经度") |
||||
private String longitude; |
||||
|
||||
@ApiModelProperty(value = "纬度") |
||||
private String latitude; |
||||
|
||||
@ApiModelProperty(value = "(定位类型 LBS, GPS,WIFI)") |
||||
private String locationType; |
||||
|
||||
@ApiModelProperty(value = "详细地址") |
||||
private String addr; |
||||
|
||||
@ApiModelProperty(value = "更新日期") |
||||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8") |
||||
private Date updateTime; |
||||
|
||||
|
||||
|
||||
} |
@ -0,0 +1,21 @@
@@ -0,0 +1,21 @@
|
||||
package com.ecell.internationalize.system.service.api.impl; |
||||
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; |
||||
import com.ecell.internationalize.system.entity.api.ApiOutward; |
||||
import com.ecell.internationalize.system.mapper.api.ApiOutwardMapper; |
||||
import com.ecell.internationalize.system.service.api.ApiOutwardService; |
||||
import org.springframework.stereotype.Service; |
||||
|
||||
/** |
||||
* <p> |
||||
* |
||||
* API对外开放接口表 服务实现类 |
||||
* </p> |
||||
* |
||||
* @author ${author} |
||||
* @since 2023-03-07 |
||||
*/ |
||||
@Service |
||||
public class ApiOutwardServiceImpl extends ServiceImpl<ApiOutwardMapper, ApiOutward> implements ApiOutwardService { |
||||
|
||||
} |
@ -0,0 +1,90 @@
@@ -0,0 +1,90 @@
|
||||
spring: |
||||
servlet: |
||||
multipart: |
||||
max-file-size: 1024MB |
||||
max-request-size: 1024MB |
||||
datasource: |
||||
driver-class-name: com.mysql.cj.jdbc.Driver |
||||
url: jdbc:mysql://192.168.0.113:3306/ys-business?serverTimeZone=UTC |
||||
username: root |
||||
password: 123456 |
||||
#数据源的其他配置 |
||||
druid: |
||||
initial-size: 5 |
||||
min-idle: 5 |
||||
max-active: 20 |
||||
max-wait: 60000 |
||||
time-between-eviction-runs-millis: 60000 |
||||
min-evictable-idle-time-millis: 300000 |
||||
validation-query: SELECT 1 FROM DUAL |
||||
testWhileIdle: true |
||||
testOnBorrow: false |
||||
testOnReturn: false |
||||
poolPreparedStatements: true |
||||
maxPoolPreparedStatementPerConnectionSize: 20 |
||||
# redis: |
||||
# host: localhost |
||||
# port: 6379 |
||||
redis: |
||||
host: 120.77.209.176 |
||||
port: 6379 |
||||
database: 2 |
||||
password: Ecell...20201001 |
||||
kafka: |
||||
bootstrap-servers: 120.77.209.176:8092 #172.18.238.14:9095,172.18.238.13:9095,172.18.238.11:9095 # |
||||
producer: |
||||
acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1) |
||||
retries: 1 # 重试次数 |
||||
batch-size: 1024 # 批量大小 |
||||
properties: |
||||
linger: |
||||
ms: 0 |
||||
buffer-memory: 22334455 # 生产端缓冲区大小 |
||||
key-serializer: org.apache.kafka.common.serialization.StringSerializer # Kafka提供的序列化和反序列化类(可以不用配置默认就是) |
||||
value-serializer: org.apache.kafka.common.serialization.StringSerializer # Kafka提供的序列化和反序列化类(可以不用配置默认就是) |
||||
|
||||
# 配置国际化资源文件路径 |
||||
messages: |
||||
basename: i18n/messages |
||||
encoding: UTF-8 |
||||
#设置静态资源路径,多个以逗号分隔 |
||||
web: |
||||
resources: |
||||
static-locations: classpath:static/ |
||||
|
||||
mybatis: |
||||
#配置SQL映射文件路径 |
||||
mapper-locations: classpath:mapper/*.xml |
||||
# 搜索指定包别名 |
||||
typeAliasesPackage: com.ecell.internationalize.system |
||||
#驼峰命名 |
||||
configuration: |
||||
map-underscore-to-camel-case: true |
||||
# feign 配置 |
||||
feign: |
||||
sentinel: |
||||
enabled: true |
||||
okhttp: |
||||
enabled: true |
||||
httpclient: |
||||
enabled: false |
||||
client: |
||||
config: |
||||
default: |
||||
connectTimeout: 10000 |
||||
readTimeout: 10000 |
||||
compression: |
||||
request: |
||||
enabled: true |
||||
response: |
||||
enabled: true |
||||
# 暴露监控端点 |
||||
management: |
||||
endpoints: |
||||
web: |
||||
exposure: |
||||
include: '*' |
||||
logging: |
||||
level: |
||||
com.ecell.internationalize.system.mapper: debug |
||||
|
Loading…
Reference in new issue