"...git@interne.hydatis.fr:Yassmine.Mestiri/thingsboard.git" did not exist on "98ad3b984fc891b3ced4e4187370c4dc58d3df38"
Commit 98ad3b98 authored by Yassmine Mestiri's avatar Yassmine Mestiri
Browse files

iot

parent aea55d6d
Pipeline #2009 failed with stages
in 0 seconds
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors;
import lombok.RequiredArgsConstructor;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.EntityId;
import java.util.function.Predicate;
@RequiredArgsConstructor
public class TbEntityTypeActorIdPredicate implements Predicate<TbActorId> {
private final EntityType entityType;
@Override
public boolean test(TbActorId actorId) {
return actorId instanceof TbEntityActorId && testEntityId(((TbEntityActorId) actorId).getEntityId());
}
protected boolean testEntityId(EntityId entityId) {
return entityId.getEntityType().equals(entityType);
}
}
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.app;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActor;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorException;
import org.thingsboard.server.actors.TbActorId;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.actors.TbEntityActorId;
import org.thingsboard.server.actors.device.SessionTimeoutCheckMsg;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.tenant.TenantActor;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import org.thingsboard.server.common.msg.edge.EdgeSessionMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
import java.util.HashSet;
import java.util.Set;
@Slf4j
public class AppActor extends ContextAwareActor {
private final TenantService tenantService;
private final Set<TenantId> deletedTenants;
private volatile boolean ruleChainsInitialized;
private AppActor(ActorSystemContext systemContext) {
super(systemContext);
this.tenantService = systemContext.getTenantService();
this.deletedTenants = new HashSet<>();
}
@Override
public void init(TbActorCtx ctx) throws TbActorException {
super.init(ctx);
if (systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE)) {
systemContext.schedulePeriodicMsgWithDelay(ctx, SessionTimeoutCheckMsg.instance(),
systemContext.getSessionReportTimeout(), systemContext.getSessionReportTimeout());
}
}
@Override
protected boolean doProcess(TbActorMsg msg) {
if (!ruleChainsInitialized) {
initTenantActors();
ruleChainsInitialized = true;
if (msg.getMsgType() != MsgType.APP_INIT_MSG && msg.getMsgType() != MsgType.PARTITION_CHANGE_MSG) {
log.warn("Rule Chains initialized by unexpected message: {}", msg);
}
}
switch (msg.getMsgType()) {
case APP_INIT_MSG:
break;
case PARTITION_CHANGE_MSG:
ctx.broadcastToChildren(msg);
break;
case COMPONENT_LIFE_CYCLE_MSG:
onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
break;
case QUEUE_TO_RULE_ENGINE_MSG:
onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
break;
case TRANSPORT_TO_DEVICE_ACTOR_MSG:
onToDeviceActorMsg((TenantAwareMsg) msg, false);
break;
case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG:
case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
case REMOVE_RPC_TO_DEVICE_ACTOR_MSG:
onToDeviceActorMsg((TenantAwareMsg) msg, true);
break;
case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG:
case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG:
onToEdgeSessionMsg((EdgeSessionMsg) msg);
break;
case SESSION_TIMEOUT_MSG:
ctx.broadcastToChildrenByType(msg, EntityType.TENANT);
break;
default:
return false;
}
return true;
}
private void initTenantActors() {
log.info("Starting main system actor.");
try {
if (systemContext.isTenantComponentsInitEnabled()) {
PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
for (Tenant tenant : tenantIterator) {
log.debug("[{}] Creating tenant actor", tenant.getId());
getOrCreateTenantActor(tenant.getId());
log.debug("[{}] Tenant actor created.", tenant.getId());
}
}
log.info("Main system actor started.");
} catch (Exception e) {
log.warn("Unknown failure", e);
}
}
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
if (TenantId.SYS_TENANT_ID.equals(msg.getTenantId())) {
msg.getMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!"));
} else {
if (!deletedTenants.contains(msg.getTenantId())) {
getOrCreateTenantActor(msg.getTenantId()).tell(msg);
} else {
msg.getMsg().getCallback().onSuccess();
}
}
}
private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
TbActorRef target = null;
if (TenantId.SYS_TENANT_ID.equals(msg.getTenantId())) {
if (!EntityType.TENANT_PROFILE.equals(msg.getEntityId().getEntityType())) {
log.warn("Message has system tenant id: {}", msg);
}
} else {
if (EntityType.TENANT.equals(msg.getEntityId().getEntityType())) {
TenantId tenantId = TenantId.fromUUID(msg.getEntityId().getId());
if (msg.getEvent() == ComponentLifecycleEvent.DELETED) {
log.info("[{}] Handling tenant deleted notification: {}", msg.getTenantId(), msg);
deletedTenants.add(tenantId);
ctx.stop(new TbEntityActorId(tenantId));
} else {
target = getOrCreateTenantActor(msg.getTenantId());
}
} else {
target = getOrCreateTenantActor(msg.getTenantId());
}
}
if (target != null) {
target.tellWithHighPriority(msg);
} else {
log.debug("[{}] Invalid component lifecycle msg: {}", msg.getTenantId(), msg);
}
}
private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) {
if (!deletedTenants.contains(msg.getTenantId())) {
TbActorRef tenantActor = getOrCreateTenantActor(msg.getTenantId());
if (priority) {
tenantActor.tellWithHighPriority(msg);
} else {
tenantActor.tell(msg);
}
} else {
if (msg instanceof TransportToDeviceActorMsgWrapper) {
((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess();
}
}
}
private TbActorRef getOrCreateTenantActor(TenantId tenantId) {
return ctx.getOrCreateChildActor(new TbEntityActorId(tenantId),
() -> DefaultActorService.TENANT_DISPATCHER_NAME,
() -> new TenantActor.ActorCreator(systemContext, tenantId));
}
private void onToEdgeSessionMsg(EdgeSessionMsg msg) {
TbActorRef target = null;
if (ModelConstants.SYSTEM_TENANT.equals(msg.getTenantId())) {
log.warn("Message has system tenant id: {}", msg);
} else {
target = getOrCreateTenantActor(msg.getTenantId());
}
if (target != null) {
target.tellWithHighPriority(msg);
} else {
log.debug("[{}] Invalid edge session msg: {}", msg.getTenantId(), msg);
}
}
public static class ActorCreator extends ContextBasedCreator {
public ActorCreator(ActorSystemContext context) {
super(context);
}
@Override
public TbActorId createActorId() {
return new TbEntityActorId(TenantId.SYS_TENANT_ID);
}
@Override
public TbActor createActor() {
return new AppActor(context);
}
}
}
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.app;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
public class AppInitMsg implements TbActorMsg {
@Override
public MsgType getMsgType() {
return MsgType.APP_INIT_MSG;
}
}
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.device;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
import org.thingsboard.rule.engine.api.msg.DeviceEdgeUpdateMsg;
import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorException;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg;
import org.thingsboard.server.service.rpc.RemoveRpcActorMsg;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
@Slf4j
public class DeviceActor extends ContextAwareActor {
private final DeviceActorMessageProcessor processor;
DeviceActor(ActorSystemContext systemContext, TenantId tenantId, DeviceId deviceId) {
super(systemContext);
this.processor = new DeviceActorMessageProcessor(systemContext, tenantId, deviceId);
}
@Override
public void init(TbActorCtx ctx) throws TbActorException {
super.init(ctx);
log.debug("[{}][{}] Starting device actor.", processor.tenantId, processor.deviceId);
try {
processor.init(ctx);
log.debug("[{}][{}] Device actor started.", processor.tenantId, processor.deviceId);
} catch (Exception e) {
log.warn("[{}][{}] Unknown failure", processor.tenantId, processor.deviceId, e);
throw new TbActorException("Failed to initialize device actor", e);
}
}
@Override
protected boolean doProcess(TbActorMsg msg) {
switch (msg.getMsgType()) {
case TRANSPORT_TO_DEVICE_ACTOR_MSG:
processor.process(ctx, (TransportToDeviceActorMsgWrapper) msg);
break;
case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
processor.processAttributesUpdate(ctx, (DeviceAttributesEventNotificationMsg) msg);
break;
case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
processor.processCredentialsUpdate(msg);
break;
case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
processor.processNameOrTypeUpdate((DeviceNameOrTypeUpdateMsg) msg);
break;
case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
processor.processRpcRequest(ctx, (ToDeviceRpcRequestActorMsg) msg);
break;
case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
processor.processRpcResponsesFromEdge(ctx, (FromDeviceRpcResponseActorMsg) msg);
break;
case DEVICE_ACTOR_SERVER_SIDE_RPC_TIMEOUT_MSG:
processor.processServerSideRpcTimeout(ctx, (DeviceActorServerSideRpcTimeoutMsg) msg);
break;
case SESSION_TIMEOUT_MSG:
processor.checkSessionsTimeout();
break;
case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG:
processor.processEdgeUpdate((DeviceEdgeUpdateMsg) msg);
break;
case REMOVE_RPC_TO_DEVICE_ACTOR_MSG:
processor.processRemoveRpc(ctx, (RemoveRpcActorMsg) msg);
break;
default:
return false;
}
return true;
}
}
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.device;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActor;
import org.thingsboard.server.actors.TbActorId;
import org.thingsboard.server.actors.TbEntityActorId;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
public class DeviceActorCreator extends ContextBasedCreator {
private final TenantId tenantId;
private final DeviceId deviceId;
public DeviceActorCreator(ActorSystemContext context, TenantId tenantId, DeviceId deviceId) {
super(context);
this.tenantId = tenantId;
this.deviceId = deviceId;
}
@Override
public TbActorId createActorId() {
return new TbEntityActorId(deviceId);
}
@Override
public TbActor createActor() {
return new DeviceActor(context, tenantId, deviceId);
}
}
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.device;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.LinkedHashMapRemoveEldest;
import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
import org.thingsboard.rule.engine.api.msg.DeviceCredentialsUpdateNotificationMsg;
import org.thingsboard.rule.engine.api.msg.DeviceEdgeUpdateMsg;
import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.RpcId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKey;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.SortOrder;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceSessionsCacheEntry;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto;
import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType;
import org.thingsboard.server.gen.transport.TransportProtos.SessionCloseNotificationProto;
import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent;
import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
import org.thingsboard.server.gen.transport.TransportProtos.SessionSubscriptionInfoProto;
import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseStatusMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportUpdateCredentialsProto;
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg;
import org.thingsboard.server.service.rpc.RemoveRpcActorMsg;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
* @author Andrew Shvayka
*/
@Slf4j
class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
static final String SESSION_TIMEOUT_MESSAGE = "session timeout!";
final TenantId tenantId;
final DeviceId deviceId;
final LinkedHashMapRemoveEldest<UUID, SessionInfoMetaData> sessions;
private final Map<UUID, SessionInfo> attributeSubscriptions;
private final Map<UUID, SessionInfo> rpcSubscriptions;
private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap;
private final boolean rpcSequential;
private int rpcSeq = 0;
private String deviceName;
private String deviceType;
private TbMsgMetaData defaultMetaData;
private EdgeId edgeId;
DeviceActorMessageProcessor(ActorSystemContext systemContext, TenantId tenantId, DeviceId deviceId) {
super(systemContext);
this.tenantId = tenantId;
this.deviceId = deviceId;
this.rpcSequential = systemContext.isRpcSequential();
this.attributeSubscriptions = new HashMap<>();
this.rpcSubscriptions = new HashMap<>();
this.toDeviceRpcPendingMap = new LinkedHashMap<>();
this.sessions = new LinkedHashMapRemoveEldest<>(systemContext.getMaxConcurrentSessionsPerDevice(), this::notifyTransportAboutClosedSessionMaxSessionsLimit);
if (initAttributes()) {
restoreSessions();
}
}
boolean initAttributes() {
Device device = systemContext.getDeviceService().findDeviceById(tenantId, deviceId);
if (device != null) {
this.deviceName = device.getName();
this.deviceType = device.getType();
this.defaultMetaData = new TbMsgMetaData();
this.defaultMetaData.putValue("deviceName", deviceName);
this.defaultMetaData.putValue("deviceType", deviceType);
if (systemContext.isEdgesEnabled()) {
this.edgeId = findRelatedEdgeId();
}
return true;
} else {
return false;
}
}
private EdgeId findRelatedEdgeId() {
List<EntityRelation> result =
systemContext.getRelationService().findByToAndType(tenantId, deviceId, EntityRelation.EDGE_TYPE, RelationTypeGroup.COMMON);
if (result != null && result.size() > 0) {
EntityRelation relationToEdge = result.get(0);
if (relationToEdge.getFrom() != null && relationToEdge.getFrom().getId() != null) {
log.trace("[{}][{}] found edge [{}] for device", tenantId, deviceId, relationToEdge.getFrom().getId());
return new EdgeId(relationToEdge.getFrom().getId());
} else {
log.trace("[{}][{}] edge relation is empty {}", tenantId, deviceId, relationToEdge);
}
} else {
log.trace("[{}][{}] device doesn't have any related edge", tenantId, deviceId);
}
return null;
}
void processRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg) {
ToDeviceRpcRequest request = msg.getMsg();
ToDeviceRpcRequestMsg rpcRequest = creteToDeviceRpcRequestMsg(request);
long timeout = request.getExpirationTime() - System.currentTimeMillis();
boolean persisted = request.isPersisted();
if (timeout <= 0) {
log.debug("[{}][{}] Ignoring message due to exp time reached, {}", deviceId, request.getId(), request.getExpirationTime());
if (persisted) {
createRpc(request, RpcStatus.EXPIRED);
}
return;
} else if (persisted) {
createRpc(request, RpcStatus.QUEUED);
}
boolean sent = false;
if (systemContext.isEdgesEnabled() && edgeId != null) {
log.debug("[{}][{}] device is related to edge [{}]. Saving RPC request to edge queue", tenantId, deviceId, edgeId.getId());
try {
saveRpcRequestToEdgeQueue(request, rpcRequest.getRequestId()).get();
sent = true;
} catch (InterruptedException | ExecutionException e) {
log.error("[{}][{}][{}] Failed to save rpc request to edge queue {}", tenantId, deviceId, edgeId.getId(), request, e);
}
} else if (isSendNewRpcAvailable()) {
sent = rpcSubscriptions.size() > 0;
Set<UUID> syncSessionSet = new HashSet<>();
rpcSubscriptions.forEach((key, value) -> {
sendToTransport(rpcRequest, key, value.getNodeId());
if (SessionType.SYNC == value.getType()) {
syncSessionSet.add(key);
}
});
log.trace("Rpc syncSessionSet [{}] subscription after sent [{}]", syncSessionSet, rpcSubscriptions);
syncSessionSet.forEach(rpcSubscriptions::remove);
}
if (persisted) {
ObjectNode response = JacksonUtil.newObjectNode();
response.put("rpcId", request.getId().toString());
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), JacksonUtil.toString(response), null));
}
if (!persisted && request.isOneway() && sent) {
log.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId());
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null));
} else {
registerPendingRpcRequest(context, msg, sent, rpcRequest, timeout);
}
if (sent) {
log.debug("[{}] RPC request {} is sent!", deviceId, request.getId());
} else {
log.debug("[{}] RPC request {} is NOT sent!", deviceId, request.getId());
}
}
private boolean isSendNewRpcAvailable() {
return !rpcSequential || toDeviceRpcPendingMap.values().stream().filter(md -> !md.isDelivered()).findAny().isEmpty();
}
private Rpc createRpc(ToDeviceRpcRequest request, RpcStatus status) {
Rpc rpc = new Rpc(new RpcId(request.getId()));
rpc.setCreatedTime(System.currentTimeMillis());
rpc.setTenantId(tenantId);
rpc.setDeviceId(deviceId);
rpc.setExpirationTime(request.getExpirationTime());
rpc.setRequest(JacksonUtil.valueToTree(request));
rpc.setStatus(status);
rpc.setAdditionalInfo(JacksonUtil.toJsonNode(request.getAdditionalInfo()));
return systemContext.getTbRpcService().save(tenantId, rpc);
}
private ToDeviceRpcRequestMsg creteToDeviceRpcRequestMsg(ToDeviceRpcRequest request) {
ToDeviceRpcRequestBody body = request.getBody();
return ToDeviceRpcRequestMsg.newBuilder()
.setRequestId(rpcSeq++)
.setMethodName(body.getMethod())
.setParams(body.getParams())
.setExpirationTime(request.getExpirationTime())
.setRequestIdMSB(request.getId().getMostSignificantBits())
.setRequestIdLSB(request.getId().getLeastSignificantBits())
.setOneway(request.isOneway())
.setPersisted(request.isPersisted())
.build();
}
void processRpcResponsesFromEdge(TbActorCtx context, FromDeviceRpcResponseActorMsg responseMsg) {
log.debug("[{}] Processing rpc command response from edge session", deviceId);
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
boolean success = requestMd != null;
if (success) {
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(responseMsg.getMsg());
} else {
log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
}
}
void processRemoveRpc(TbActorCtx context, RemoveRpcActorMsg msg) {
log.debug("[{}] Processing remove rpc command", msg.getRequestId());
Map.Entry<Integer, ToDeviceRpcRequestMetadata> entry = null;
for (Map.Entry<Integer, ToDeviceRpcRequestMetadata> e : toDeviceRpcPendingMap.entrySet()) {
if (e.getValue().getMsg().getMsg().getId().equals(msg.getRequestId())) {
entry = e;
break;
}
}
if (entry != null) {
if (entry.getValue().isDelivered()) {
toDeviceRpcPendingMap.remove(entry.getKey());
} else {
Optional<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> firstRpc = getFirstRpc();
if (firstRpc.isPresent() && entry.getKey().equals(firstRpc.get().getKey())) {
toDeviceRpcPendingMap.remove(entry.getKey());
sendNextPendingRequest(context);
} else {
toDeviceRpcPendingMap.remove(entry.getKey());
}
}
}
}
private void registerPendingRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) {
toDeviceRpcPendingMap.put(rpcRequest.getRequestId(), new ToDeviceRpcRequestMetadata(msg, sent));
DeviceActorServerSideRpcTimeoutMsg timeoutMsg = new DeviceActorServerSideRpcTimeoutMsg(rpcRequest.getRequestId(), timeout);
scheduleMsgWithDelay(context, timeoutMsg, timeoutMsg.getTimeout());
}
void processServerSideRpcTimeout(TbActorCtx context, DeviceActorServerSideRpcTimeoutMsg msg) {
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId());
if (requestMd != null) {
log.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
if (requestMd.getMsg().getMsg().isPersisted()) {
systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.EXPIRED, null);
}
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
if (!requestMd.isDelivered()) {
sendNextPendingRequest(context);
}
}
}
private void sendPendingRequests(TbActorCtx context, UUID sessionId, String nodeId) {
SessionType sessionType = getSessionType(sessionId);
if (!toDeviceRpcPendingMap.isEmpty()) {
log.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId);
if (sessionType == SessionType.SYNC) {
log.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId);
rpcSubscriptions.remove(sessionId);
}
} else {
log.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId);
}
Set<Integer> sentOneWayIds = new HashSet<>();
if (rpcSequential) {
getFirstRpc().ifPresent(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
} else if (sessionType == SessionType.ASYNC) {
toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
} else {
toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
}
sentOneWayIds.stream().filter(id -> !toDeviceRpcPendingMap.get(id).getMsg().getMsg().isPersisted()).forEach(toDeviceRpcPendingMap::remove);
}
private Optional<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> getFirstRpc() {
return toDeviceRpcPendingMap.entrySet().stream().filter(e -> !e.getValue().isDelivered()).findFirst();
}
private void sendNextPendingRequest(TbActorCtx context) {
if (rpcSequential) {
rpcSubscriptions.forEach((id, s) -> sendPendingRequests(context, id, s.getNodeId()));
}
}
private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(TbActorCtx context, UUID sessionId, String nodeId, Set<Integer> sentOneWayIds) {
return entry -> {
ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg();
ToDeviceRpcRequestBody body = request.getBody();
if (request.isOneway() && !rpcSequential) {
sentOneWayIds.add(entry.getKey());
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(request.getId(), null, null));
}
ToDeviceRpcRequestMsg rpcRequest = ToDeviceRpcRequestMsg.newBuilder()
.setRequestId(entry.getKey())
.setMethodName(body.getMethod())
.setParams(body.getParams())
.setExpirationTime(request.getExpirationTime())
.setRequestIdMSB(request.getId().getMostSignificantBits())
.setRequestIdLSB(request.getId().getLeastSignificantBits())
.setOneway(request.isOneway())
.setPersisted(request.isPersisted())
.build();
sendToTransport(rpcRequest, sessionId, nodeId);
};
}
void process(TbActorCtx context, TransportToDeviceActorMsgWrapper wrapper) {
TransportToDeviceActorMsg msg = wrapper.getMsg();
TbCallback callback = wrapper.getCallback();
var sessionInfo = msg.getSessionInfo();
if (msg.hasSessionEvent()) {
processSessionStateMsgs(sessionInfo, msg.getSessionEvent());
}
if (msg.hasSubscribeToAttributes()) {
processSubscriptionCommands(context, sessionInfo, msg.getSubscribeToAttributes());
}
if (msg.hasSubscribeToRPC()) {
processSubscriptionCommands(context, sessionInfo, msg.getSubscribeToRPC());
}
if (msg.hasSendPendingRPC()) {
sendPendingRequests(context, getSessionId(sessionInfo), sessionInfo.getNodeId());
}
if (msg.hasGetAttributes()) {
handleGetAttributesRequest(context, sessionInfo, msg.getGetAttributes());
}
if (msg.hasToDeviceRPCCallResponse()) {
processRpcResponses(context, sessionInfo, msg.getToDeviceRPCCallResponse());
}
if (msg.hasSubscriptionInfo()) {
handleSessionActivity(context, sessionInfo, msg.getSubscriptionInfo());
}
if (msg.hasClaimDevice()) {
handleClaimDeviceMsg(context, sessionInfo, msg.getClaimDevice());
}
if (msg.hasRpcResponseStatusMsg()) {
processRpcResponseStatus(context, sessionInfo, msg.getRpcResponseStatusMsg());
}
if (msg.hasUplinkNotificationMsg()) {
processUplinkNotificationMsg(context, sessionInfo, msg.getUplinkNotificationMsg());
}
callback.onSuccess();
}
private void processUplinkNotificationMsg(TbActorCtx context, SessionInfoProto sessionInfo, TransportProtos.UplinkNotificationMsg uplinkNotificationMsg) {
String nodeId = sessionInfo.getNodeId();
sessions.entrySet().stream()
.filter(kv -> kv.getValue().getSessionInfo().getNodeId().equals(nodeId) && (kv.getValue().isSubscribedToAttributes() || kv.getValue().isSubscribedToRPC()))
.forEach(kv -> {
ToTransportMsg msg = ToTransportMsg.newBuilder()
.setSessionIdMSB(kv.getKey().getMostSignificantBits())
.setSessionIdLSB(kv.getKey().getLeastSignificantBits())
.setUplinkNotificationMsg(uplinkNotificationMsg)
.build();
systemContext.getTbCoreToTransportService().process(kv.getValue().getSessionInfo().getNodeId(), msg);
});
}
private void handleClaimDeviceMsg(TbActorCtx context, SessionInfoProto sessionInfo, ClaimDeviceMsg msg) {
DeviceId deviceId = new DeviceId(new UUID(msg.getDeviceIdMSB(), msg.getDeviceIdLSB()));
systemContext.getClaimDevicesService().registerClaimingInfo(tenantId, deviceId, msg.getSecretKey(), msg.getDurationMs());
}
private void reportSessionOpen() {
systemContext.getDeviceStateService().onDeviceConnect(tenantId, deviceId);
}
private void reportSessionClose() {
systemContext.getDeviceStateService().onDeviceDisconnect(tenantId, deviceId);
}
private void handleGetAttributesRequest(TbActorCtx context, SessionInfoProto sessionInfo, GetAttributeRequestMsg request) {
int requestId = request.getRequestId();
if (request.getOnlyShared()) {
Futures.addCallback(findAllAttributesByScope(DataConstants.SHARED_SCOPE), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<AttributeKvEntry> result) {
GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder()
.setRequestId(requestId)
.setSharedStateMsg(true)
.addAllSharedAttributeList(toTsKvProtos(result))
.setIsMultipleAttributesRequest(request.getSharedAttributeNamesCount() > 1)
.build();
sendToTransport(responseMsg, sessionInfo);
}
@Override
public void onFailure(Throwable t) {
GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder()
.setError(t.getMessage())
.setSharedStateMsg(true)
.build();
sendToTransport(responseMsg, sessionInfo);
}
}, MoreExecutors.directExecutor());
} else {
Futures.addCallback(getAttributesKvEntries(request), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<List<AttributeKvEntry>> result) {
GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder()
.setRequestId(requestId)
.addAllClientAttributeList(toTsKvProtos(result.get(0)))
.addAllSharedAttributeList(toTsKvProtos(result.get(1)))
.setIsMultipleAttributesRequest(
request.getSharedAttributeNamesCount() + request.getClientAttributeNamesCount() > 1)
.build();
sendToTransport(responseMsg, sessionInfo);
}
@Override
public void onFailure(Throwable t) {
GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder()
.setError(t.getMessage())
.build();
sendToTransport(responseMsg, sessionInfo);
}
}, MoreExecutors.directExecutor());
}
}
private ListenableFuture<List<List<AttributeKvEntry>>> getAttributesKvEntries(GetAttributeRequestMsg request) {
ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture;
ListenableFuture<List<AttributeKvEntry>> sharedAttributesFuture;
if (CollectionUtils.isEmpty(request.getClientAttributeNamesList()) && CollectionUtils.isEmpty(request.getSharedAttributeNamesList())) {
clientAttributesFuture = findAllAttributesByScope(DataConstants.CLIENT_SCOPE);
sharedAttributesFuture = findAllAttributesByScope(DataConstants.SHARED_SCOPE);
} else if (!CollectionUtils.isEmpty(request.getClientAttributeNamesList()) && !CollectionUtils.isEmpty(request.getSharedAttributeNamesList())) {
clientAttributesFuture = findAttributesByScope(toSet(request.getClientAttributeNamesList()), DataConstants.CLIENT_SCOPE);
sharedAttributesFuture = findAttributesByScope(toSet(request.getSharedAttributeNamesList()), DataConstants.SHARED_SCOPE);
} else if (CollectionUtils.isEmpty(request.getClientAttributeNamesList()) && !CollectionUtils.isEmpty(request.getSharedAttributeNamesList())) {
clientAttributesFuture = Futures.immediateFuture(Collections.emptyList());
sharedAttributesFuture = findAttributesByScope(toSet(request.getSharedAttributeNamesList()), DataConstants.SHARED_SCOPE);
} else {
sharedAttributesFuture = Futures.immediateFuture(Collections.emptyList());
clientAttributesFuture = findAttributesByScope(toSet(request.getClientAttributeNamesList()), DataConstants.CLIENT_SCOPE);
}
return Futures.allAsList(Arrays.asList(clientAttributesFuture, sharedAttributesFuture));
}
private ListenableFuture<List<AttributeKvEntry>> findAllAttributesByScope(String scope) {
return systemContext.getAttributesService().findAll(tenantId, deviceId, scope);
}
private ListenableFuture<List<AttributeKvEntry>> findAttributesByScope(Set<String> attributesSet, String scope) {
return systemContext.getAttributesService().find(tenantId, deviceId, scope, attributesSet);
}
private Set<String> toSet(List<String> strings) {
return new HashSet<>(strings);
}
private SessionType getSessionType(UUID sessionId) {
return sessions.containsKey(sessionId) ? SessionType.ASYNC : SessionType.SYNC;
}
void processAttributesUpdate(TbActorCtx context, DeviceAttributesEventNotificationMsg msg) {
if (attributeSubscriptions.size() > 0) {
boolean hasNotificationData = false;
AttributeUpdateNotificationMsg.Builder notification = AttributeUpdateNotificationMsg.newBuilder();
if (msg.isDeleted()) {
List<String> sharedKeys = msg.getDeletedKeys().stream()
.filter(key -> DataConstants.SHARED_SCOPE.equals(key.getScope()))
.map(AttributeKey::getAttributeKey)
.collect(Collectors.toList());
if (!sharedKeys.isEmpty()) {
notification.addAllSharedDeleted(sharedKeys);
hasNotificationData = true;
}
} else {
if (DataConstants.SHARED_SCOPE.equals(msg.getScope())) {
List<AttributeKvEntry> attributes = new ArrayList<>(msg.getValues());
if (attributes.size() > 0) {
List<TsKvProto> sharedUpdated = msg.getValues().stream().map(this::toTsKvProto)
.collect(Collectors.toList());
if (!sharedUpdated.isEmpty()) {
notification.addAllSharedUpdated(sharedUpdated);
hasNotificationData = true;
}
} else {
log.debug("[{}] No public shared side attributes changed!", deviceId);
}
}
}
if (hasNotificationData) {
AttributeUpdateNotificationMsg finalNotification = notification.build();
attributeSubscriptions.forEach((key, value) -> sendToTransport(finalNotification, key, value.getNodeId()));
}
} else {
log.debug("[{}] No registered attributes subscriptions to process!", deviceId);
}
}
private void processRpcResponses(TbActorCtx context, SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg responseMsg) {
UUID sessionId = getSessionId(sessionInfo);
log.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId);
ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
boolean success = requestMd != null;
if (success) {
boolean hasError = StringUtils.isNotEmpty(responseMsg.getError());
try {
String payload = hasError ? responseMsg.getError() : responseMsg.getPayload();
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(
new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
payload, null));
if (requestMd.getMsg().getMsg().isPersisted()) {
RpcStatus status = hasError ? RpcStatus.FAILED : RpcStatus.SUCCESSFUL;
JsonNode response;
try {
response = JacksonUtil.toJsonNode(payload);
} catch (IllegalArgumentException e) {
response = JacksonUtil.newObjectNode().put("error", payload);
}
systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), status, response);
}
} finally {
if (hasError && !requestMd.isDelivered()) {
sendNextPendingRequest(context);
}
}
} else {
log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
}
}
private void processRpcResponseStatus(TbActorCtx context, SessionInfoProto sessionInfo, ToDeviceRpcResponseStatusMsg responseMsg) {
UUID rpcId = new UUID(responseMsg.getRequestIdMSB(), responseMsg.getRequestIdLSB());
RpcStatus status = RpcStatus.valueOf(responseMsg.getStatus());
ToDeviceRpcRequestMetadata md = toDeviceRpcPendingMap.get(responseMsg.getRequestId());
if (md != null) {
JsonNode response = null;
if (status.equals(RpcStatus.DELIVERED)) {
if (md.getMsg().getMsg().isOneway()) {
toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
if (rpcSequential) {
systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, null, null));
}
} else {
md.setDelivered(true);
}
} else if (status.equals(RpcStatus.TIMEOUT)) {
Integer maxRpcRetries = md.getMsg().getMsg().getRetries();
maxRpcRetries = maxRpcRetries == null ? systemContext.getMaxRpcRetries() : Math.min(maxRpcRetries, systemContext.getMaxRpcRetries());
if (maxRpcRetries <= md.getRetries()) {
toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
status = RpcStatus.FAILED;
response = JacksonUtil.newObjectNode().put("error", "There was a Timeout and all retry attempts have been exhausted. Retry attempts set: " + maxRpcRetries);
} else {
md.setRetries(md.getRetries() + 1);
}
}
if (md.getMsg().getMsg().isPersisted()) {
systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, response);
}
if (status != RpcStatus.SENT) {
sendNextPendingRequest(context);
}
} else {
log.info("[{}][{}] Rpc has already removed from pending map.", deviceId, rpcId);
}
}
private void processSubscriptionCommands(TbActorCtx context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) {
UUID sessionId = getSessionId(sessionInfo);
if (subscribeCmd.getUnsubscribe()) {
log.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
attributeSubscriptions.remove(sessionId);
} else {
SessionInfoMetaData sessionMD = sessions.get(sessionId);
if (sessionMD == null) {
sessionMD = new SessionInfoMetaData(new SessionInfo(subscribeCmd.getSessionType(), sessionInfo.getNodeId()));
}
sessionMD.setSubscribedToAttributes(true);
log.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
attributeSubscriptions.put(sessionId, sessionMD.getSessionInfo());
dumpSessions();
}
}
private UUID getSessionId(SessionInfoProto sessionInfo) {
return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
}
private void processSubscriptionCommands(TbActorCtx context, SessionInfoProto sessionInfo, SubscribeToRPCMsg subscribeCmd) {
UUID sessionId = getSessionId(sessionInfo);
if (subscribeCmd.getUnsubscribe()) {
log.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId);
rpcSubscriptions.remove(sessionId);
} else {
SessionInfoMetaData sessionMD = sessions.get(sessionId);
if (sessionMD == null) {
sessionMD = new SessionInfoMetaData(new SessionInfo(subscribeCmd.getSessionType(), sessionInfo.getNodeId()));
}
sessionMD.setSubscribedToRPC(true);
log.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo());
sendPendingRequests(context, sessionId, sessionInfo.getNodeId());
dumpSessions();
}
}
private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
UUID sessionId = getSessionId(sessionInfo);
Objects.requireNonNull(sessionId);
if (msg.getEvent() == SessionEvent.OPEN) {
if (sessions.containsKey(sessionId)) {
log.debug("[{}] Received duplicate session open event [{}]", deviceId, sessionId);
return;
}
log.debug("[{}] Processing new session [{}]. Current sessions size {}", deviceId, sessionId, sessions.size());
sessions.put(sessionId, new SessionInfoMetaData(new SessionInfo(SessionType.ASYNC, sessionInfo.getNodeId())));
if (sessions.size() == 1) {
reportSessionOpen();
}
systemContext.getDeviceStateService().onDeviceActivity(tenantId, deviceId, System.currentTimeMillis());
dumpSessions();
} else if (msg.getEvent() == SessionEvent.CLOSED) {
log.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
sessions.remove(sessionId);
attributeSubscriptions.remove(sessionId);
rpcSubscriptions.remove(sessionId);
if (sessions.isEmpty()) {
reportSessionClose();
}
dumpSessions();
}
}
private void handleSessionActivity(TbActorCtx context, SessionInfoProto sessionInfoProto, SubscriptionInfoProto subscriptionInfo) {
UUID sessionId = getSessionId(sessionInfoProto);
Objects.requireNonNull(sessionId);
SessionInfoMetaData sessionMD = sessions.get(sessionId);
if (sessionMD != null) {
sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription());
if (subscriptionInfo.getAttributeSubscription()) {
attributeSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
}
if (subscriptionInfo.getRpcSubscription()) {
rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
}
}
systemContext.getDeviceStateService().onDeviceActivity(tenantId, deviceId, subscriptionInfo.getLastActivityTime());
if (sessionMD != null) {
dumpSessions();
}
}
void processCredentialsUpdate(TbActorMsg msg) {
if (((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials().getCredentialsType() == DeviceCredentialsType.LWM2M_CREDENTIALS) {
sessions.forEach((k, v) -> {
notifyTransportAboutDeviceCredentialsUpdate(k, v, ((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials());
});
} else {
sessions.forEach((sessionId, sessionMd) -> notifyTransportAboutClosedSession(sessionId, sessionMd, "device credentials updated!"));
attributeSubscriptions.clear();
rpcSubscriptions.clear();
dumpSessions();
}
}
private void notifyTransportAboutClosedSessionMaxSessionsLimit(UUID sessionId, SessionInfoMetaData sessionMd) {
log.debug("remove eldest session (max concurrent sessions limit reached per device) sessionId [{}] sessionMd [{}]", sessionId, sessionMd);
notifyTransportAboutClosedSession(sessionId, sessionMd, "max concurrent sessions limit reached per device!");
}
private void notifyTransportAboutClosedSession(UUID sessionId, SessionInfoMetaData sessionMd, String message) {
SessionCloseNotificationProto sessionCloseNotificationProto = SessionCloseNotificationProto
.newBuilder()
.setMessage(message).build();
ToTransportMsg msg = ToTransportMsg.newBuilder()
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.setSessionCloseNotification(sessionCloseNotificationProto)
.build();
systemContext.getTbCoreToTransportService().process(sessionMd.getSessionInfo().getNodeId(), msg);
}
void notifyTransportAboutDeviceCredentialsUpdate(UUID sessionId, SessionInfoMetaData sessionMd, DeviceCredentials deviceCredentials) {
ToTransportUpdateCredentialsProto.Builder notification = ToTransportUpdateCredentialsProto.newBuilder();
notification.addCredentialsId(deviceCredentials.getCredentialsId());
notification.addCredentialsValue(deviceCredentials.getCredentialsValue());
ToTransportMsg msg = ToTransportMsg.newBuilder()
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.setToTransportUpdateCredentialsNotification(notification).build();
systemContext.getTbCoreToTransportService().process(sessionMd.getSessionInfo().getNodeId(), msg);
}
void processNameOrTypeUpdate(DeviceNameOrTypeUpdateMsg msg) {
this.deviceName = msg.getDeviceName();
this.deviceType = msg.getDeviceType();
this.defaultMetaData = new TbMsgMetaData();
this.defaultMetaData.putValue("deviceName", deviceName);
this.defaultMetaData.putValue("deviceType", deviceType);
}
void processEdgeUpdate(DeviceEdgeUpdateMsg msg) {
log.trace("[{}] Processing edge update {}", deviceId, msg);
this.edgeId = msg.getEdgeId();
}
private void sendToTransport(GetAttributeResponseMsg responseMsg, SessionInfoProto sessionInfo) {
ToTransportMsg msg = ToTransportMsg.newBuilder()
.setSessionIdMSB(sessionInfo.getSessionIdMSB())
.setSessionIdLSB(sessionInfo.getSessionIdLSB())
.setGetAttributesResponse(responseMsg).build();
systemContext.getTbCoreToTransportService().process(sessionInfo.getNodeId(), msg);
}
private void sendToTransport(AttributeUpdateNotificationMsg notificationMsg, UUID sessionId, String nodeId) {
ToTransportMsg msg = ToTransportMsg.newBuilder()
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.setAttributeUpdateNotification(notificationMsg).build();
systemContext.getTbCoreToTransportService().process(nodeId, msg);
}
private void sendToTransport(ToDeviceRpcRequestMsg rpcMsg, UUID sessionId, String nodeId) {
ToTransportMsg msg = ToTransportMsg.newBuilder()
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.setToDeviceRequest(rpcMsg).build();
systemContext.getTbCoreToTransportService().process(nodeId, msg);
}
private void sendToTransport(ToServerRpcResponseMsg rpcMsg, UUID sessionId, String nodeId) {
ToTransportMsg msg = ToTransportMsg.newBuilder()
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.setToServerResponse(rpcMsg).build();
systemContext.getTbCoreToTransportService().process(nodeId, msg);
}
private ListenableFuture<Void> saveRpcRequestToEdgeQueue(ToDeviceRpcRequest msg, Integer requestId) {
ObjectNode body = mapper.createObjectNode();
body.put("requestId", requestId);
body.put("requestUUID", msg.getId().toString());
body.put("oneway", msg.isOneway());
body.put("expirationTime", msg.getExpirationTime());
body.put("method", msg.getBody().getMethod());
body.put("params", msg.getBody().getParams());
body.put("persisted", msg.isPersisted());
body.put("retries", msg.getRetries());
body.put("additionalInfo", msg.getAdditionalInfo());
EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(tenantId, edgeId, EdgeEventType.DEVICE, EdgeEventActionType.RPC_CALL, deviceId, body);
return Futures.transform(systemContext.getEdgeEventService().saveAsync(edgeEvent), unused -> {
systemContext.getClusterService().onEdgeEventUpdate(tenantId, edgeId);
return null;
}, systemContext.getDbCallbackExecutor());
}
private List<TsKvProto> toTsKvProtos(@Nullable List<AttributeKvEntry> result) {
List<TsKvProto> clientAttributes;
if (result == null || result.isEmpty()) {
clientAttributes = Collections.emptyList();
} else {
clientAttributes = new ArrayList<>(result.size());
for (AttributeKvEntry attrEntry : result) {
clientAttributes.add(toTsKvProto(attrEntry));
}
}
return clientAttributes;
}
private TsKvProto toTsKvProto(AttributeKvEntry attrEntry) {
return TsKvProto.newBuilder().setTs(attrEntry.getLastUpdateTs())
.setKv(toKeyValueProto(attrEntry)).build();
}
private KeyValueProto toKeyValueProto(KvEntry kvEntry) {
KeyValueProto.Builder builder = KeyValueProto.newBuilder();
builder.setKey(kvEntry.getKey());
switch (kvEntry.getDataType()) {
case BOOLEAN:
builder.setType(KeyValueType.BOOLEAN_V);
builder.setBoolV(kvEntry.getBooleanValue().get());
break;
case DOUBLE:
builder.setType(KeyValueType.DOUBLE_V);
builder.setDoubleV(kvEntry.getDoubleValue().get());
break;
case LONG:
builder.setType(KeyValueType.LONG_V);
builder.setLongV(kvEntry.getLongValue().get());
break;
case STRING:
builder.setType(KeyValueType.STRING_V);
builder.setStringV(kvEntry.getStrValue().get());
break;
case JSON:
builder.setType(KeyValueType.JSON_V);
builder.setJsonV(kvEntry.getJsonValue().get());
break;
}
return builder.build();
}
void restoreSessions() {
if (systemContext.isLocalCacheType()) {
return;
}
log.debug("[{}] Restoring sessions from cache", deviceId);
DeviceSessionsCacheEntry sessionsDump;
try {
sessionsDump = systemContext.getDeviceSessionCacheService().get(deviceId);
} catch (Exception e) {
log.warn("[{}] Failed to decode device sessions from cache", deviceId);
return;
}
if (sessionsDump.getSessionsCount() == 0) {
log.debug("[{}] No session information found", deviceId);
return;
}
// TODO: Take latest max allowed sessions size from cache
for (SessionSubscriptionInfoProto sessionSubscriptionInfoProto : sessionsDump.getSessionsList()) {
SessionInfoProto sessionInfoProto = sessionSubscriptionInfoProto.getSessionInfo();
UUID sessionId = getSessionId(sessionInfoProto);
SessionInfo sessionInfo = new SessionInfo(SessionType.ASYNC, sessionInfoProto.getNodeId());
SubscriptionInfoProto subInfo = sessionSubscriptionInfoProto.getSubscriptionInfo();
SessionInfoMetaData sessionMD = new SessionInfoMetaData(sessionInfo, subInfo.getLastActivityTime());
sessions.put(sessionId, sessionMD);
if (subInfo.getAttributeSubscription()) {
attributeSubscriptions.put(sessionId, sessionInfo);
sessionMD.setSubscribedToAttributes(true);
}
if (subInfo.getRpcSubscription()) {
rpcSubscriptions.put(sessionId, sessionInfo);
sessionMD.setSubscribedToRPC(true);
}
log.debug("[{}] Restored session: {}", deviceId, sessionMD);
}
log.debug("[{}] Restored sessions: {}, rpc subscriptions: {}, attribute subscriptions: {}", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
}
private void dumpSessions() {
if (systemContext.isLocalCacheType()) {
return;
}
log.debug("[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
List<SessionSubscriptionInfoProto> sessionsList = new ArrayList<>(sessions.size());
sessions.forEach((uuid, sessionMD) -> {
if (sessionMD.getSessionInfo().getType() == SessionType.SYNC) {
return;
}
SessionInfo sessionInfo = sessionMD.getSessionInfo();
SubscriptionInfoProto subscriptionInfoProto = SubscriptionInfoProto.newBuilder()
.setLastActivityTime(sessionMD.getLastActivityTime())
.setAttributeSubscription(sessionMD.isSubscribedToAttributes())
.setRpcSubscription(sessionMD.isSubscribedToRPC()).build();
SessionInfoProto sessionInfoProto = SessionInfoProto.newBuilder()
.setSessionIdMSB(uuid.getMostSignificantBits())
.setSessionIdLSB(uuid.getLeastSignificantBits())
.setNodeId(sessionInfo.getNodeId()).build();
sessionsList.add(SessionSubscriptionInfoProto.newBuilder()
.setSessionInfo(sessionInfoProto)
.setSubscriptionInfo(subscriptionInfoProto).build());
log.debug("[{}] Dumping session: {}", deviceId, sessionMD);
});
systemContext.getDeviceSessionCacheService()
.put(deviceId, DeviceSessionsCacheEntry.newBuilder()
.addAllSessions(sessionsList).build());
}
void init(TbActorCtx ctx) {
PageLink pageLink = new PageLink(1024, 0, null, new SortOrder("createdTime"));
PageData<Rpc> pageData;
do {
pageData = systemContext.getTbRpcService().findAllByDeviceIdAndStatus(tenantId, deviceId, RpcStatus.QUEUED, pageLink);
pageData.getData().forEach(rpc -> {
ToDeviceRpcRequest msg = JacksonUtil.convertValue(rpc.getRequest(), ToDeviceRpcRequest.class);
long timeout = rpc.getExpirationTime() - System.currentTimeMillis();
if (timeout <= 0) {
rpc.setStatus(RpcStatus.EXPIRED);
systemContext.getTbRpcService().save(tenantId, rpc);
} else {
registerPendingRpcRequest(ctx, new ToDeviceRpcRequestActorMsg(systemContext.getServiceId(), msg), false, creteToDeviceRpcRequestMsg(msg), timeout);
}
});
if (pageData.hasNext()) {
pageLink = pageLink.nextPageLink();
}
} while (pageData.hasNext());
}
void checkSessionsTimeout() {
final long expTime = System.currentTimeMillis() - systemContext.getSessionInactivityTimeout();
List<UUID> expiredIds = null;
for (Map.Entry<UUID, SessionInfoMetaData> kv : sessions.entrySet()) { //entry set are cached for stable sessions
if (kv.getValue().getLastActivityTime() < expTime) {
final UUID id = kv.getKey();
if (expiredIds == null) {
expiredIds = new ArrayList<>(1); //most of the expired sessions is a single event
}
expiredIds.add(id);
}
}
if (expiredIds != null) {
int removed = 0;
for (UUID id : expiredIds) {
final SessionInfoMetaData session = sessions.remove(id);
rpcSubscriptions.remove(id);
attributeSubscriptions.remove(id);
if (session != null) {
removed++;
notifyTransportAboutClosedSession(id, session, SESSION_TIMEOUT_MESSAGE);
}
}
if (removed != 0) {
dumpSessions();
}
}
}
}
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.device;
import lombok.Data;
import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
/**
* @author Andrew Shvayka
*/
@Data
public class SessionInfo {
private final SessionType type;
private final String nodeId;
}
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.device;
import lombok.Data;
import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
/**
* @author Andrew Shvayka
*/
@Data
class SessionInfoMetaData {
private final SessionInfo sessionInfo;
private long lastActivityTime;
private boolean subscribedToAttributes;
private boolean subscribedToRPC;
SessionInfoMetaData(SessionInfo sessionInfo) {
this(sessionInfo, System.currentTimeMillis());
}
SessionInfoMetaData(SessionInfo sessionInfo, long lastActivityTime) {
this.sessionInfo = sessionInfo;
this.lastActivityTime = lastActivityTime;
}
}
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.device;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
/**
* Created by ashvayka on 29.10.18.
*/
public class SessionTimeoutCheckMsg implements TbActorMsg {
private static final SessionTimeoutCheckMsg INSTANCE = new SessionTimeoutCheckMsg();
private SessionTimeoutCheckMsg() {
}
public static SessionTimeoutCheckMsg instance() {
return INSTANCE;
}
@Override
public MsgType getMsgType() {
return MsgType.SESSION_TIMEOUT_MSG;
}
}
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.device;
import lombok.Data;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
/**
* @author Andrew Shvayka
*/
@Data
public class ToDeviceRpcRequestMetadata {
private final ToDeviceRpcRequestActorMsg msg;
private final boolean sent;
private int retries;
private boolean delivered;
}
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.device;
import lombok.Data;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.UUID;
/**
* @author Andrew Shvayka
*/
@Data
public class ToServerRpcRequestMetadata {
private final UUID sessionId;
private final TransportProtos.SessionType type;
private final String nodeId;
}
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.ruleChain;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.channel.EventLoopGroup;
import lombok.extern.slf4j.Slf4j;
import org.bouncycastle.util.Arrays;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.RuleEngineAlarmService;
import org.thingsboard.rule.engine.api.RuleEngineApiUsageStateService;
import org.thingsboard.rule.engine.api.RuleEngineAssetProfileCache;
import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
import org.thingsboard.rule.engine.api.RuleEngineRpcService;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.ScriptEngine;
import org.thingsboard.rule.engine.api.SmsService;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.rule.engine.api.sms.SmsSenderFactory;
import org.thingsboard.rule.engine.util.TenantIdLoader;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.HasRuleEngineProfile;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.asset.AssetProfile;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.data.rule.RuleNodeState;
import org.thingsboard.server.common.data.script.ScriptLanguage;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.TbMsgProcessingStackItem;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.asset.AssetProfileService;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.cassandra.CassandraCluster;
import org.thingsboard.server.dao.customer.CustomerService;
import org.thingsboard.server.dao.dashboard.DashboardService;
import org.thingsboard.server.dao.device.DeviceCredentialsService;
import org.thingsboard.server.dao.device.DeviceProfileService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.edge.EdgeEventService;
import org.thingsboard.server.dao.edge.EdgeService;
import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.nosql.CassandraStatementTask;
import org.thingsboard.server.dao.nosql.TbResultSetFuture;
import org.thingsboard.server.dao.ota.OtaPackageService;
import org.thingsboard.server.dao.queue.QueueService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.resource.ResourceService;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.service.script.RuleNodeJsScriptEngine;
import org.thingsboard.server.service.script.RuleNodeTbelScriptEngine;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
/**
* Created by ashvayka on 19.03.18.
*/
@Slf4j
class DefaultTbContext implements TbContext {
public final static ObjectMapper mapper = new ObjectMapper();
private final ActorSystemContext mainCtx;
private final String ruleChainName;
private final RuleNodeCtx nodeCtx;
public DefaultTbContext(ActorSystemContext mainCtx, String ruleChainName, RuleNodeCtx nodeCtx) {
this.mainCtx = mainCtx;
this.ruleChainName = ruleChainName;
this.nodeCtx = nodeCtx;
}
@Override
public void tellSuccess(TbMsg msg) {
tellNext(msg, Collections.singleton(TbRelationTypes.SUCCESS), null);
}
@Override
public void tellNext(TbMsg msg, String relationType) {
tellNext(msg, Collections.singleton(relationType), null);
}
@Override
public void tellNext(TbMsg msg, Set<String> relationTypes) {
tellNext(msg, relationTypes, null);
}
private void tellNext(TbMsg msg, Set<String> relationTypes, Throwable th) {
if (nodeCtx.getSelf().isDebugMode()) {
relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th));
}
msg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId());
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null));
}
@Override
public void tellSelf(TbMsg msg, long delayMs) {
//TODO: add persistence layer
scheduleMsgWithDelay(new RuleNodeToSelfMsg(this, msg), delayMs, nodeCtx.getSelfActor());
}
@Override
public void input(TbMsg msg, RuleChainId ruleChainId) {
msg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
nodeCtx.getChainActor().tell(new RuleChainInputMsg(ruleChainId, msg));
}
@Override
public void output(TbMsg msg, String relationType) {
TbMsgProcessingStackItem item = msg.popFormStack();
if (item == null) {
ack(msg);
} else {
if (nodeCtx.getSelf().isDebugMode()) {
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType);
}
nodeCtx.getChainActor().tell(new RuleChainOutputMsg(item.getRuleChainId(), item.getRuleNodeId(), relationType, msg));
}
}
@Override
public void enqueue(TbMsg tbMsg, Runnable onSuccess, Consumer<Throwable> onFailure) {
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator());
enqueue(tpi, tbMsg, onFailure, onSuccess);
}
@Override
public void enqueue(TbMsg tbMsg, String queueName, Runnable onSuccess, Consumer<Throwable> onFailure) {
TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName);
enqueue(tpi, tbMsg, onFailure, onSuccess);
}
private void enqueue(TopicPartitionInfo tpi, TbMsg tbMsg, Consumer<Throwable> onFailure, Runnable onSuccess) {
if (!tbMsg.isValid()) {
log.trace("[{}] Skip invalid message: {}", getTenantId(), tbMsg);
if (onFailure != null) {
onFailure.accept(new IllegalArgumentException("Source message is no longer valid!"));
}
return;
}
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(tbMsg)).build();
if (nodeCtx.getSelf().isDebugMode()) {
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "To Root Rule Chain");
}
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback(onSuccess, onFailure));
}
@Override
public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) {
TopicPartitionInfo tpi = resolvePartition(tbMsg);
enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), failureMessage, null, null);
}
@Override
public void enqueueForTellNext(TbMsg tbMsg, String relationType) {
TopicPartitionInfo tpi = resolvePartition(tbMsg);
enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, null, null);
}
@Override
public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes) {
TopicPartitionInfo tpi = resolvePartition(tbMsg);
enqueueForTellNext(tpi, tbMsg, relationTypes, null, null, null);
}
@Override
public void enqueueForTellNext(TbMsg tbMsg, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) {
TopicPartitionInfo tpi = resolvePartition(tbMsg);
enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure);
}
@Override
public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) {
TopicPartitionInfo tpi = resolvePartition(tbMsg);
enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure);
}
@Override
public void enqueueForTellNext(TbMsg tbMsg, String queueName, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) {
TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName);
enqueueForTellNext(tpi, queueName, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure);
}
@Override
public void enqueueForTellNext(TbMsg tbMsg, String queueName, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) {
TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName);
enqueueForTellNext(tpi, queueName, tbMsg, relationTypes, null, onSuccess, onFailure);
}
private TopicPartitionInfo resolvePartition(TbMsg tbMsg, String queueName) {
return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator());
}
private TopicPartitionInfo resolvePartition(TbMsg tbMsg) {
return resolvePartition(tbMsg, tbMsg.getQueueName());
}
private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg source, Set<String> relationTypes, String failureMessage, Runnable onSuccess, Consumer<Throwable> onFailure) {
enqueueForTellNext(tpi, source.getQueueName(), source, relationTypes, failureMessage, onSuccess, onFailure);
}
private void enqueueForTellNext(TopicPartitionInfo tpi, String queueName, TbMsg source, Set<String> relationTypes, String failureMessage, Runnable onSuccess, Consumer<Throwable> onFailure) {
if (!source.isValid()) {
log.trace("[{}] Skip invalid message: {}", getTenantId(), source);
if (onFailure != null) {
onFailure.accept(new IllegalArgumentException("Source message is no longer valid!"));
}
return;
}
RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId();
RuleNodeId ruleNodeId = nodeCtx.getSelf().getId();
TbMsg tbMsg = TbMsg.newMsg(source, queueName, ruleChainId, ruleNodeId);
TransportProtos.ToRuleEngineMsg.Builder msg = TransportProtos.ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(tbMsg))
.addAllRelationTypes(relationTypes);
if (failureMessage != null) {
msg.setFailureMessage(failureMessage);
}
if (nodeCtx.getSelf().isDebugMode()) {
relationTypes.forEach(relationType ->
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, relationType, null, failureMessage));
}
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg.build(), new SimpleTbQueueCallback(onSuccess, onFailure));
}
@Override
public void ack(TbMsg tbMsg) {
if (nodeCtx.getSelf().isDebugMode()) {
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "ACK", null);
}
tbMsg.getCallback().onProcessingEnd(nodeCtx.getSelf().getId());
tbMsg.getCallback().onSuccess();
}
@Override
public boolean isLocalEntity(EntityId entityId) {
return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), entityId).isMyPartition();
}
private void scheduleMsgWithDelay(TbActorMsg msg, long delayInMs, TbActorRef target) {
mainCtx.scheduleMsgWithDelay(target, msg, delayInMs);
}
@Override
public void tellFailure(TbMsg msg, Throwable th) {
if (nodeCtx.getSelf().isDebugMode()) {
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, TbRelationTypes.FAILURE, th);
}
String failureMessage;
if (th != null) {
if (!StringUtils.isEmpty(th.getMessage())) {
failureMessage = th.getMessage();
} else {
failureMessage = th.getClass().getSimpleName();
}
} else {
failureMessage = null;
}
nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getRuleChainId(),
nodeCtx.getSelf().getId(), Collections.singleton(TbRelationTypes.FAILURE),
msg, failureMessage));
}
public void updateSelf(RuleNode self) {
nodeCtx.setSelf(self);
}
@Override
public TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data) {
return newMsg(queueName, type, originator, null, metaData, data);
}
@Override
public TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) {
return TbMsg.newMsg(queueName, type, originator, customerId, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
}
@Override
public TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) {
return TbMsg.transformMsg(origMsg, type, originator, metaData, data);
}
public TbMsg customerCreatedMsg(Customer customer, RuleNodeId ruleNodeId) {
return entityActionMsg(customer, customer.getId(), ruleNodeId, DataConstants.ENTITY_CREATED);
}
public TbMsg deviceCreatedMsg(Device device, RuleNodeId ruleNodeId) {
DeviceProfile deviceProfile = null;
if (device.getDeviceProfileId() != null) {
deviceProfile = mainCtx.getDeviceProfileCache().find(device.getDeviceProfileId());
}
return entityActionMsg(device, device.getId(), ruleNodeId, DataConstants.ENTITY_CREATED, deviceProfile);
}
public TbMsg assetCreatedMsg(Asset asset, RuleNodeId ruleNodeId) {
AssetProfile assetProfile = null;
if (asset.getAssetProfileId() != null) {
assetProfile = mainCtx.getAssetProfileCache().find(asset.getAssetProfileId());
}
return entityActionMsg(asset, asset.getId(), ruleNodeId, DataConstants.ENTITY_CREATED, assetProfile);
}
public TbMsg alarmActionMsg(Alarm alarm, RuleNodeId ruleNodeId, String action) {
HasRuleEngineProfile profile = null;
if (EntityType.DEVICE.equals(alarm.getOriginator().getEntityType())) {
DeviceId deviceId = new DeviceId(alarm.getOriginator().getId());
profile = mainCtx.getDeviceProfileCache().get(getTenantId(), deviceId);
} else if (EntityType.ASSET.equals(alarm.getOriginator().getEntityType())) {
AssetId assetId = new AssetId(alarm.getOriginator().getId());
profile = mainCtx.getAssetProfileCache().get(getTenantId(), assetId);
}
return entityActionMsg(alarm, alarm.getOriginator(), ruleNodeId, action, profile);
}
public TbMsg attributesUpdatedActionMsg(EntityId originator, RuleNodeId ruleNodeId, String scope, List<AttributeKvEntry> attributes) {
ObjectNode entityNode = JacksonUtil.newObjectNode();
if (attributes != null) {
attributes.forEach(attributeKvEntry -> JacksonUtil.addKvEntry(entityNode, attributeKvEntry));
}
return attributesActionMsg(originator, ruleNodeId, scope, DataConstants.ATTRIBUTES_UPDATED, JacksonUtil.toString(entityNode));
}
public TbMsg attributesDeletedActionMsg(EntityId originator, RuleNodeId ruleNodeId, String scope, List<String> keys) {
ObjectNode entityNode = JacksonUtil.newObjectNode();
ArrayNode attrsArrayNode = entityNode.putArray("attributes");
if (keys != null) {
keys.forEach(attrsArrayNode::add);
}
return attributesActionMsg(originator, ruleNodeId, scope, DataConstants.ATTRIBUTES_DELETED, JacksonUtil.toString(entityNode));
}
private TbMsg attributesActionMsg(EntityId originator, RuleNodeId ruleNodeId, String scope, String action, String msgData) {
TbMsgMetaData tbMsgMetaData = getActionMetaData(ruleNodeId);
tbMsgMetaData.putValue("scope", scope);
HasRuleEngineProfile profile = null;
if (EntityType.DEVICE.equals(originator.getEntityType())) {
DeviceId deviceId = new DeviceId(originator.getId());
profile = mainCtx.getDeviceProfileCache().get(getTenantId(), deviceId);
} else if (EntityType.ASSET.equals(originator.getEntityType())) {
AssetId assetId = new AssetId(originator.getId());
profile = mainCtx.getAssetProfileCache().get(getTenantId(), assetId);
}
return entityActionMsg(originator, tbMsgMetaData, msgData, action, profile);
}
@Override
public void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId) {
mainCtx.getClusterService().onEdgeEventUpdate(tenantId, edgeId);
}
public <E, I extends EntityId> TbMsg entityActionMsg(E entity, I id, RuleNodeId ruleNodeId, String action) {
return entityActionMsg(entity, id, ruleNodeId, action, null);
}
public <E, I extends EntityId, K extends HasRuleEngineProfile> TbMsg entityActionMsg(E entity, I id, RuleNodeId ruleNodeId, String action, K profile) {
try {
return entityActionMsg(id, getActionMetaData(ruleNodeId), mapper.writeValueAsString(mapper.valueToTree(entity)), action, profile);
} catch (JsonProcessingException | IllegalArgumentException e) {
throw new RuntimeException("Failed to process " + id.getEntityType().name().toLowerCase() + " " + action + " msg: " + e);
}
}
private <I extends EntityId, K extends HasRuleEngineProfile> TbMsg entityActionMsg(I id, TbMsgMetaData msgMetaData, String msgData, String action, K profile) {
String defaultQueueName = null;
RuleChainId defaultRuleChainId = null;
if (profile != null) {
defaultQueueName = profile.getDefaultQueueName();
defaultRuleChainId = profile.getDefaultRuleChainId();
}
return TbMsg.newMsg(defaultQueueName, action, id, msgMetaData, msgData, defaultRuleChainId, null);
}
@Override
public RuleNodeId getSelfId() {
return nodeCtx.getSelf().getId();
}
@Override
public RuleNode getSelf() {
return nodeCtx.getSelf();
}
@Override
public String getRuleChainName() {
return ruleChainName;
}
@Override
public TenantId getTenantId() {
return nodeCtx.getTenantId();
}
@Override
public ListeningExecutor getMailExecutor() {
return mainCtx.getMailExecutor();
}
@Override
public ListeningExecutor getSmsExecutor() {
return mainCtx.getSmsExecutor();
}
@Override
public ListeningExecutor getDbCallbackExecutor() {
return mainCtx.getDbCallbackExecutor();
}
@Override
public ListeningExecutor getExternalCallExecutor() {
return mainCtx.getExternalCallExecutorService();
}
@Override
@Deprecated
public ScriptEngine createJsScriptEngine(String script, String... argNames) {
return new RuleNodeJsScriptEngine(getTenantId(), mainCtx.getJsInvokeService(), script, argNames);
}
private ScriptEngine createTbelScriptEngine(String script, String... argNames) {
if (mainCtx.getTbelInvokeService() == null) {
throw new RuntimeException("TBEL execution is disabled!");
}
return new RuleNodeTbelScriptEngine(getTenantId(), mainCtx.getTbelInvokeService(), script, argNames);
}
@Override
public ScriptEngine createScriptEngine(ScriptLanguage scriptLang, String script, String... argNames) {
if (scriptLang == null) {
scriptLang = ScriptLanguage.JS;
}
if (StringUtils.isBlank(script)) {
throw new RuntimeException(scriptLang.name() + " script is blank!");
}
switch (scriptLang) {
case JS:
return createJsScriptEngine(script, argNames);
case TBEL:
if (Arrays.isNullOrEmpty(argNames)) {
return createTbelScriptEngine(script, "msg", "metadata", "msgType");
} else {
return createTbelScriptEngine(script, argNames);
}
default:
throw new RuntimeException("Unsupported script language: " + scriptLang.name());
}
}
@Override
public void logJsEvalRequest() {
if (mainCtx.isStatisticsEnabled()) {
mainCtx.getJsInvokeStats().incrementRequests();
}
}
@Override
public void logJsEvalResponse() {
if (mainCtx.isStatisticsEnabled()) {
mainCtx.getJsInvokeStats().incrementResponses();
}
}
@Override
public void logJsEvalFailure() {
if (mainCtx.isStatisticsEnabled()) {
mainCtx.getJsInvokeStats().incrementFailures();
}
}
@Override
public String getServiceId() {
return mainCtx.getServiceInfoProvider().getServiceId();
}
@Override
public AttributesService getAttributesService() {
return mainCtx.getAttributesService();
}
@Override
public CustomerService getCustomerService() {
return mainCtx.getCustomerService();
}
@Override
public TenantService getTenantService() {
return mainCtx.getTenantService();
}
@Override
public UserService getUserService() {
return mainCtx.getUserService();
}
@Override
public AssetService getAssetService() {
return mainCtx.getAssetService();
}
@Override
public DeviceService getDeviceService() {
return mainCtx.getDeviceService();
}
@Override
public DeviceProfileService getDeviceProfileService() {
return mainCtx.getDeviceProfileService();
}
@Override
public AssetProfileService getAssetProfileService() {
return mainCtx.getAssetProfileService();
}
@Override
public DeviceCredentialsService getDeviceCredentialsService() {
return mainCtx.getDeviceCredentialsService();
}
@Override
public TbClusterService getClusterService() {
return mainCtx.getClusterService();
}
@Override
public DashboardService getDashboardService() {
return mainCtx.getDashboardService();
}
@Override
public RuleEngineAlarmService getAlarmService() {
return mainCtx.getAlarmService();
}
@Override
public RuleChainService getRuleChainService() {
return mainCtx.getRuleChainService();
}
@Override
public TimeseriesService getTimeseriesService() {
return mainCtx.getTsService();
}
@Override
public RuleEngineTelemetryService getTelemetryService() {
return mainCtx.getTsSubService();
}
@Override
public RelationService getRelationService() {
return mainCtx.getRelationService();
}
@Override
public EntityViewService getEntityViewService() {
return mainCtx.getEntityViewService();
}
@Override
public ResourceService getResourceService() {
return mainCtx.getResourceService();
}
@Override
public OtaPackageService getOtaPackageService() {
return mainCtx.getOtaPackageService();
}
@Override
public RuleEngineDeviceProfileCache getDeviceProfileCache() {
return mainCtx.getDeviceProfileCache();
}
@Override
public RuleEngineAssetProfileCache getAssetProfileCache() {
return mainCtx.getAssetProfileCache();
}
@Override
public EdgeService getEdgeService() {
return mainCtx.getEdgeService();
}
@Override
public EdgeEventService getEdgeEventService() {
return mainCtx.getEdgeEventService();
}
@Override
public QueueService getQueueService() {
return mainCtx.getQueueService();
}
@Override
public EventLoopGroup getSharedEventLoop() {
return mainCtx.getSharedEventLoopGroupService().getSharedEventLoopGroup();
}
@Override
public MailService getMailService(boolean isSystem) {
if (!isSystem || mainCtx.isAllowSystemMailService()) {
return mainCtx.getMailService();
} else {
throw new RuntimeException("Access to System Mail Service is forbidden!");
}
}
@Override
public SmsService getSmsService() {
if (mainCtx.isAllowSystemSmsService()) {
return mainCtx.getSmsService();
} else {
throw new RuntimeException("Access to System SMS Service is forbidden!");
}
}
@Override
public SmsSenderFactory getSmsSenderFactory() {
return mainCtx.getSmsSenderFactory();
}
@Override
public RuleEngineRpcService getRpcService() {
return mainCtx.getTbRuleEngineDeviceRpcService();
}
@Override
public CassandraCluster getCassandraCluster() {
return mainCtx.getCassandraCluster();
}
@Override
public TbResultSetFuture submitCassandraReadTask(CassandraStatementTask task) {
return mainCtx.getCassandraBufferedRateReadExecutor().submit(task);
}
@Override
public TbResultSetFuture submitCassandraWriteTask(CassandraStatementTask task) {
return mainCtx.getCassandraBufferedRateWriteExecutor().submit(task);
}
@Override
public PageData<RuleNodeState> findRuleNodeStates(PageLink pageLink) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Fetch Rule Node States.", getTenantId(), getSelfId());
}
return mainCtx.getRuleNodeStateService().findByRuleNodeId(getTenantId(), getSelfId(), pageLink);
}
@Override
public RuleNodeState findRuleNodeStateForEntity(EntityId entityId) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}][{}] Fetch Rule Node State for entity.", getTenantId(), getSelfId(), entityId);
}
return mainCtx.getRuleNodeStateService().findByRuleNodeIdAndEntityId(getTenantId(), getSelfId(), entityId);
}
@Override
public RuleNodeState saveRuleNodeState(RuleNodeState state) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}][{}] Persist Rule Node State for entity: {}", getTenantId(), getSelfId(), state.getEntityId(), state.getStateData());
}
state.setRuleNodeId(getSelfId());
return mainCtx.getRuleNodeStateService().save(getTenantId(), state);
}
@Override
public void clearRuleNodeStates() {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Going to clear rule node states", getTenantId(), getSelfId());
}
mainCtx.getRuleNodeStateService().removeByRuleNodeId(getTenantId(), getSelfId());
}
@Override
public void removeRuleNodeStateForEntity(EntityId entityId) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}][{}] Remove Rule Node State for entity.", getTenantId(), getSelfId(), entityId);
}
mainCtx.getRuleNodeStateService().removeByRuleNodeIdAndEntityId(getTenantId(), getSelfId(), entityId);
}
@Override
public void addTenantProfileListener(Consumer<TenantProfile> listener) {
mainCtx.getTenantProfileCache().addListener(getTenantId(), getSelfId(), listener);
}
@Override
public void addDeviceProfileListeners(Consumer<DeviceProfile> profileListener, BiConsumer<DeviceId, DeviceProfile> deviceListener) {
mainCtx.getDeviceProfileCache().addListener(getTenantId(), getSelfId(), profileListener, deviceListener);
}
@Override
public void addAssetProfileListeners(Consumer<AssetProfile> profileListener, BiConsumer<AssetId, AssetProfile> assetListener) {
mainCtx.getAssetProfileCache().addListener(getTenantId(), getSelfId(), profileListener, assetListener);
}
@Override
public void removeListeners() {
mainCtx.getDeviceProfileCache().removeListener(getTenantId(), getSelfId());
mainCtx.getAssetProfileCache().removeListener(getTenantId(), getSelfId());
mainCtx.getTenantProfileCache().removeListener(getTenantId(), getSelfId());
}
@Override
public TenantProfile getTenantProfile() {
return mainCtx.getTenantProfileCache().get(getTenantId());
}
@Override
public WidgetsBundleService getWidgetBundleService() {
return mainCtx.getWidgetsBundleService();
}
@Override
public WidgetTypeService getWidgetTypeService() {
return mainCtx.getWidgetTypeService();
}
@Override
public RuleEngineApiUsageStateService getRuleEngineApiUsageStateService() {
return mainCtx.getApiUsageStateService();
}
private TbMsgMetaData getActionMetaData(RuleNodeId ruleNodeId) {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("ruleNodeId", ruleNodeId.toString());
return metaData;
}
@Override
public void checkTenantEntity(EntityId entityId) {
if (!this.getTenantId().equals(TenantIdLoader.findTenantId(this, entityId))) {
throw new RuntimeException("Entity with id: '" + entityId + "' specified in the configuration doesn't belong to the current tenant.");
}
}
private class SimpleTbQueueCallback implements TbQueueCallback {
private final Runnable onSuccess;
private final Consumer<Throwable> onFailure;
public SimpleTbQueueCallback(Runnable onSuccess, Consumer<Throwable> onFailure) {
this.onSuccess = onSuccess;
this.onFailure = onFailure;
}
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
if (onSuccess != null) {
onSuccess.run();
}
}
@Override
public void onFailure(Throwable t) {
if (onFailure != null) {
onFailure.accept(t);
} else {
log.debug("[{}] Failed to put item into queue", nodeCtx.getTenantId(), t);
}
}
}
}
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.ruleChain;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActor;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorId;
import org.thingsboard.server.actors.TbEntityActorId;
import org.thingsboard.server.actors.service.ComponentActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
public class RuleChainActor extends ComponentActor<RuleChainId, RuleChainActorMessageProcessor> {
private final RuleChain ruleChain;
private RuleChainActor(ActorSystemContext systemContext, TenantId tenantId, RuleChain ruleChain) {
super(systemContext, tenantId, ruleChain.getId());
this.ruleChain = ruleChain;
}
@Override
protected RuleChainActorMessageProcessor createProcessor(TbActorCtx ctx) {
return new RuleChainActorMessageProcessor(tenantId, ruleChain, systemContext,
ctx.getParentRef(), ctx);
}
@Override
protected boolean doProcess(TbActorMsg msg) {
switch (msg.getMsgType()) {
case COMPONENT_LIFE_CYCLE_MSG:
onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
break;
case QUEUE_TO_RULE_ENGINE_MSG:
processor.onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
break;
case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
break;
case RULE_CHAIN_TO_RULE_CHAIN_MSG:
processor.onRuleChainToRuleChainMsg((RuleChainToRuleChainMsg) msg);
break;
case RULE_CHAIN_INPUT_MSG:
processor.onRuleChainInputMsg((RuleChainInputMsg) msg);
break;
case RULE_CHAIN_OUTPUT_MSG:
processor.onRuleChainOutputMsg((RuleChainOutputMsg) msg);
break;
case PARTITION_CHANGE_MSG:
processor.onPartitionChangeMsg((PartitionChangeMsg) msg);
break;
case STATS_PERSIST_TICK_MSG:
onStatsPersistTick(id);
break;
default:
return false;
}
return true;
}
public static class ActorCreator extends ContextBasedCreator {
private static final long serialVersionUID = 1L;
private final TenantId tenantId;
private final RuleChain ruleChain;
public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleChain ruleChain) {
super(context);
this.tenantId = tenantId;
this.ruleChain = ruleChain;
}
@Override
public TbActorId createActorId() {
return new TbEntityActorId(ruleChain.getId());
}
@Override
public TbActor createActor() {
return new RuleChainActor(context, tenantId, ruleChain);
}
}
@Override
protected long getErrorPersistFrequency() {
return systemContext.getRuleChainErrorPersistFrequency();
}
}
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.ruleChain;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.TbRelationTypes;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.actors.TbEntityActorId;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.plugin.RuleNodeUpdatedMsg;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.RuleNodeException;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.stats.TbApiUsageReportClient;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.common.MultipleTbQueueTbMsgCallbackWrapper;
import org.thingsboard.server.queue.common.TbQueueTbMsgCallbackWrapper;
import org.thingsboard.server.cluster.TbClusterService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
/**
* @author Andrew Shvayka
*/
@Slf4j
public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleChainId> {
private static final String NA_RELATION_TYPE = "";
private final TbActorRef parent;
private final TbActorRef self;
private final Map<RuleNodeId, RuleNodeCtx> nodeActors;
private final Map<RuleNodeId, List<RuleNodeRelation>> nodeRoutes;
private final RuleChainService service;
private final TbClusterService clusterService;
private final TbApiUsageReportClient apiUsageClient;
private String ruleChainName;
private RuleNodeId firstId;
private RuleNodeCtx firstNode;
private boolean started;
RuleChainActorMessageProcessor(TenantId tenantId, RuleChain ruleChain, ActorSystemContext systemContext, TbActorRef parent, TbActorRef self) {
super(systemContext, tenantId, ruleChain.getId());
this.apiUsageClient = systemContext.getApiUsageClient();
this.ruleChainName = ruleChain.getName();
this.parent = parent;
this.self = self;
this.nodeActors = new HashMap<>();
this.nodeRoutes = new HashMap<>();
this.service = systemContext.getRuleChainService();
this.clusterService = systemContext.getClusterService();
}
@Override
public String getComponentName() {
return null;
}
@Override
public void start(TbActorCtx context) {
if (!started) {
RuleChain ruleChain = service.findRuleChainById(tenantId, entityId);
if (ruleChain != null && RuleChainType.CORE.equals(ruleChain.getType())) {
List<RuleNode> ruleNodeList = service.getRuleChainNodes(tenantId, entityId);
log.trace("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
// Creating and starting the actors;
for (RuleNode ruleNode : ruleNodeList) {
log.trace("[{}][{}] Creating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
TbActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
}
initRoutes(ruleChain, ruleNodeList);
started = true;
}
} else {
onUpdate(context);
}
}
@Override
public void onUpdate(TbActorCtx context) {
RuleChain ruleChain = service.findRuleChainById(tenantId, entityId);
if (ruleChain != null && RuleChainType.CORE.equals(ruleChain.getType())) {
ruleChainName = ruleChain.getName();
List<RuleNode> ruleNodeList = service.getRuleChainNodes(tenantId, entityId);
log.trace("[{}][{}] Updating rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
for (RuleNode ruleNode : ruleNodeList) {
RuleNodeCtx existing = nodeActors.get(ruleNode.getId());
if (existing == null) {
log.trace("[{}][{}] Creating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
TbActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
} else {
log.trace("[{}][{}] Updating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
existing.setSelf(ruleNode);
existing.getSelfActor().tellWithHighPriority(new RuleNodeUpdatedMsg(tenantId, existing.getSelf().getId()));
}
}
Set<RuleNodeId> existingNodes = ruleNodeList.stream().map(RuleNode::getId).collect(Collectors.toSet());
List<RuleNodeId> removedRules = nodeActors.keySet().stream().filter(node -> !existingNodes.contains(node)).collect(Collectors.toList());
removedRules.forEach(ruleNodeId -> {
log.trace("[{}][{}] Removing rule node [{}]", tenantId, entityId, ruleNodeId);
RuleNodeCtx removed = nodeActors.remove(ruleNodeId);
removed.getSelfActor().tellWithHighPriority(new ComponentLifecycleMsg(tenantId, removed.getSelf().getId(), ComponentLifecycleEvent.DELETED));
});
initRoutes(ruleChain, ruleNodeList);
}
}
@Override
public void stop(TbActorCtx ctx) {
log.trace("[{}][{}] Stopping rule chain with {} nodes", tenantId, entityId, nodeActors.size());
nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).map(TbActorRef::getActorId).forEach(ctx::stop);
nodeActors.clear();
nodeRoutes.clear();
started = false;
}
@Override
public void onPartitionChangeMsg(PartitionChangeMsg msg) {
nodeActors.values().stream().map(RuleNodeCtx::getSelfActor).forEach(actorRef -> actorRef.tellWithHighPriority(msg));
}
private TbActorRef createRuleNodeActor(TbActorCtx ctx, RuleNode ruleNode) {
return ctx.getOrCreateChildActor(new TbEntityActorId(ruleNode.getId()),
() -> DefaultActorService.RULE_DISPATCHER_NAME,
() -> new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleChainName, ruleNode.getId()));
}
private void initRoutes(RuleChain ruleChain, List<RuleNode> ruleNodeList) {
nodeRoutes.clear();
// Populating the routes map;
for (RuleNode ruleNode : ruleNodeList) {
List<EntityRelation> relations = service.getRuleNodeRelations(TenantId.SYS_TENANT_ID, ruleNode.getId());
log.trace("[{}][{}][{}] Processing rule node relations [{}]", tenantId, entityId, ruleNode.getId(), relations.size());
if (relations.size() == 0) {
nodeRoutes.put(ruleNode.getId(), Collections.emptyList());
} else {
for (EntityRelation relation : relations) {
log.trace("[{}][{}][{}] Processing rule node relation [{}]", tenantId, entityId, ruleNode.getId(), relation.getTo());
if (relation.getTo().getEntityType() == EntityType.RULE_NODE) {
RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId()));
if (ruleNodeCtx == null) {
throw new IllegalArgumentException("Rule Node [" + relation.getFrom() + "] has invalid relation to Rule node [" + relation.getTo() + "]");
}
}
nodeRoutes.computeIfAbsent(ruleNode.getId(), k -> new ArrayList<>())
.add(new RuleNodeRelation(ruleNode.getId(), relation.getTo(), relation.getType()));
}
}
}
firstId = ruleChain.getFirstRuleNodeId();
firstNode = nodeActors.get(firstId);
state = ComponentLifecycleState.ACTIVE;
}
void onQueueToRuleEngineMsg(QueueToRuleEngineMsg envelope) {
TbMsg msg = envelope.getMsg();
if (!checkMsgValid(msg)) {
return;
}
log.trace("[{}][{}] Processing message [{}]: {}", entityId, firstId, msg.getId(), msg);
if (envelope.getRelationTypes() == null || envelope.getRelationTypes().isEmpty()) {
onTellNext(msg, true);
} else {
onTellNext(msg, envelope.getMsg().getRuleNodeId(), envelope.getRelationTypes(), envelope.getFailureMessage());
}
}
private void onTellNext(TbMsg msg, boolean useRuleNodeIdFromMsg) {
try {
checkComponentStateActive(msg);
RuleNodeId targetId = useRuleNodeIdFromMsg ? msg.getRuleNodeId() : null;
RuleNodeCtx targetCtx;
if (targetId == null) {
targetCtx = firstNode;
msg = msg.copyWithRuleChainId(entityId);
} else {
targetCtx = nodeActors.get(targetId);
}
if (targetCtx != null) {
log.trace("[{}][{}] Pushing message to target rule node", entityId, targetId);
pushMsgToNode(targetCtx, msg, NA_RELATION_TYPE);
} else {
log.trace("[{}][{}] Rule node does not exist. Probably old message", entityId, targetId);
msg.getCallback().onSuccess();
}
} catch (RuleNodeException rne) {
msg.getCallback().onFailure(rne);
} catch (Exception e) {
msg.getCallback().onFailure(new RuleEngineException(e.getMessage()));
}
}
public void onRuleChainInputMsg(RuleChainInputMsg envelope) {
var msg = envelope.getMsg();
if (!checkMsgValid(msg)) {
return;
}
if (entityId.equals(envelope.getRuleChainId())) {
onTellNext(envelope.getMsg(), false);
} else {
parent.tell(envelope);
}
}
public void onRuleChainOutputMsg(RuleChainOutputMsg envelope) {
var msg = envelope.getMsg();
if (!checkMsgValid(msg)) {
return;
}
if (entityId.equals(envelope.getRuleChainId())) {
var originatorNodeId = envelope.getTargetRuleNodeId();
RuleNodeCtx ruleNodeCtx = nodeActors.get(originatorNodeId);
if (ruleNodeCtx != null && ruleNodeCtx.getSelf().isDebugMode()) {
systemContext.persistDebugOutput(tenantId, originatorNodeId, envelope.getMsg(), envelope.getRelationType());
}
onTellNext(envelope.getMsg(), originatorNodeId, Collections.singleton(envelope.getRelationType()), RuleNodeException.UNKNOWN);
} else {
parent.tell(envelope);
}
}
void onRuleChainToRuleChainMsg(RuleChainToRuleChainMsg envelope) {
var msg = envelope.getMsg();
if (!checkMsgValid(msg)) {
return;
}
try {
checkComponentStateActive(envelope.getMsg());
if (firstNode != null) {
pushMsgToNode(firstNode, envelope.getMsg(), envelope.getFromRelationType());
} else {
envelope.getMsg().getCallback().onSuccess();
}
} catch (RuleNodeException e) {
log.debug("Rule Chain is not active. Current state [{}] for processor [{}][{}] tenant [{}]", state, entityId.getEntityType(), entityId, tenantId);
}
}
void onTellNext(RuleNodeToRuleChainTellNextMsg envelope) {
var msg = envelope.getMsg();
if (checkMsgValid(msg)) {
onTellNext(msg, envelope.getOriginator(), envelope.getRelationTypes(), envelope.getFailureMessage());
}
}
private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, Set<String> relationTypes, String failureMessage) {
try {
checkComponentStateActive(msg);
EntityId entityId = msg.getOriginator();
TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId);
List<RuleNodeRelation> ruleNodeRelations = nodeRoutes.get(originatorNodeId);
if (ruleNodeRelations == null) { // When unchecked, this will cause NullPointerException when rule node doesn't exist anymore
log.warn("[{}][{}][{}] No outbound relations (null). Probably rule node does not exist. Probably old message.", tenantId, entityId, msg.getId());
ruleNodeRelations = Collections.emptyList();
}
List<RuleNodeRelation> relationsByTypes = ruleNodeRelations.stream()
.filter(r -> contains(relationTypes, r.getType()))
.collect(Collectors.toList());
int relationsCount = relationsByTypes.size();
if (relationsCount == 0) {
log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());
if (relationTypes.contains(TbRelationTypes.FAILURE)) {
RuleNodeCtx ruleNodeCtx = nodeActors.get(originatorNodeId);
if (ruleNodeCtx != null) {
msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf()));
} else {
log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, originatorNodeId.getId());
msg.getCallback().onFailure(new RuleEngineException("Failure during message processing by Rule Node [" + originatorNodeId.getId().toString() + "]"));
}
} else {
msg.getCallback().onSuccess();
}
} else if (relationsCount == 1) {
for (RuleNodeRelation relation : relationsByTypes) {
log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut());
pushToTarget(tpi, msg, relation.getOut(), relation.getType());
}
} else {
MultipleTbQueueTbMsgCallbackWrapper callbackWrapper = new MultipleTbQueueTbMsgCallbackWrapper(relationsCount, msg.getCallback());
log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relationsByTypes);
for (RuleNodeRelation relation : relationsByTypes) {
EntityId target = relation.getOut();
putToQueue(tpi, msg, callbackWrapper, target);
}
}
} catch (RuleNodeException rne) {
msg.getCallback().onFailure(rne);
} catch (Exception e) {
log.warn("[" + tenantId + "]" + "[" + entityId + "]" + "[" + msg.getId() + "]" + " onTellNext failure", e);
msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage()));
}
}
private void putToQueue(TopicPartitionInfo tpi, TbMsg msg, TbQueueCallback callbackWrapper, EntityId target) {
switch (target.getEntityType()) {
case RULE_NODE:
putToQueue(tpi, msg.copyWithRuleNodeId(entityId, new RuleNodeId(target.getId()), UUID.randomUUID()), callbackWrapper);
break;
case RULE_CHAIN:
putToQueue(tpi, msg.copyWithRuleChainId(new RuleChainId(target.getId()), UUID.randomUUID()), callbackWrapper);
break;
}
}
private void pushToTarget(TopicPartitionInfo tpi, TbMsg msg, EntityId target, String fromRelationType) {
if (tpi.isMyPartition()) {
switch (target.getEntityType()) {
case RULE_NODE:
pushMsgToNode(nodeActors.get(new RuleNodeId(target.getId())), msg, fromRelationType);
break;
case RULE_CHAIN:
parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, fromRelationType));
break;
}
} else {
putToQueue(tpi, msg, new TbQueueTbMsgCallbackWrapper(msg.getCallback()), target);
}
}
private void putToQueue(TopicPartitionInfo tpi, TbMsg newMsg, TbQueueCallback callbackWrapper) {
ToRuleEngineMsg toQueueMsg = ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(newMsg))
.build();
clusterService.pushMsgToRuleEngine(tpi, newMsg.getId(), toQueueMsg, callbackWrapper);
}
private boolean contains(Set<String> relationTypes, String type) {
if (relationTypes == null) {
return true;
}
for (String relationType : relationTypes) {
if (relationType.equalsIgnoreCase(type)) {
return true;
}
}
return false;
}
private void pushMsgToNode(RuleNodeCtx nodeCtx, TbMsg msg, String fromRelationType) {
if (nodeCtx != null) {
nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, ruleChainName, nodeCtx), msg, fromRelationType));
} else {
log.error("[{}][{}] RuleNodeCtx is empty", entityId, ruleChainName);
msg.getCallback().onFailure(new RuleEngineException("Rule Node CTX is empty"));
}
}
@Override
protected RuleNodeException getInactiveException() {
RuleNode firstRuleNode = firstNode != null ? firstNode.getSelf() : null;
return new RuleNodeException("Rule Chain is not active! Failed to initialize.", ruleChainName, firstRuleNode);
}
}
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.ruleChain;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbMsg;
/**
* Created by ashvayka on 19.03.18.
*/
@EqualsAndHashCode(callSuper = true)
@ToString
public final class RuleChainInputMsg extends TbToRuleChainActorMsg {
public RuleChainInputMsg(RuleChainId target, TbMsg tbMsg) {
super(tbMsg, target);
}
@Override
public MsgType getMsgType() {
return MsgType.RULE_CHAIN_INPUT_MSG;
}
}
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.ruleChain;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.actors.TbEntityActorId;
import org.thingsboard.server.actors.TbEntityTypeActorIdPredicate;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.dao.rule.RuleChainService;
import java.util.function.Function;
/**
* Created by ashvayka on 15.03.18.
*/
@Slf4j
public abstract class RuleChainManagerActor extends ContextAwareActor {
protected final TenantId tenantId;
private final RuleChainService ruleChainService;
@Getter
protected RuleChain rootChain;
@Getter
protected TbActorRef rootChainActor;
public RuleChainManagerActor(ActorSystemContext systemContext, TenantId tenantId) {
super(systemContext);
this.tenantId = tenantId;
this.ruleChainService = systemContext.getRuleChainService();
}
protected void initRuleChains() {
for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.CORE, link), ContextAwareActor.ENTITY_PACK_LIMIT)) {
RuleChainId ruleChainId = ruleChain.getId();
log.debug("[{}|{}] Creating rule chain actor", ruleChainId.getEntityType(), ruleChain.getId());
TbActorRef actorRef = getOrCreateActor(ruleChainId, id -> ruleChain);
visit(ruleChain, actorRef);
log.debug("[{}|{}] Rule Chain actor created.", ruleChainId.getEntityType(), ruleChainId.getId());
}
}
protected void destroyRuleChains() {
for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.CORE, link), ContextAwareActor.ENTITY_PACK_LIMIT)) {
ctx.stop(new TbEntityActorId(ruleChain.getId()));
}
}
protected void visit(RuleChain entity, TbActorRef actorRef) {
if (entity != null && entity.isRoot() && entity.getType().equals(RuleChainType.CORE)) {
rootChain = entity;
rootChainActor = actorRef;
}
}
protected TbActorRef getOrCreateActor(RuleChainId ruleChainId) {
return getOrCreateActor(ruleChainId, eId -> ruleChainService.findRuleChainById(TenantId.SYS_TENANT_ID, eId));
}
protected TbActorRef getOrCreateActor(RuleChainId ruleChainId, Function<RuleChainId, RuleChain> provider) {
return ctx.getOrCreateChildActor(new TbEntityActorId(ruleChainId),
() -> DefaultActorService.RULE_DISPATCHER_NAME,
() -> {
RuleChain ruleChain = provider.apply(ruleChainId);
return new RuleChainActor.ActorCreator(systemContext, tenantId, ruleChain);
});
}
protected TbActorRef getEntityActorRef(EntityId entityId) {
TbActorRef target = null;
if (entityId.getEntityType() == EntityType.RULE_CHAIN) {
target = getOrCreateActor((RuleChainId) entityId);
}
return target;
}
protected void broadcast(TbActorMsg msg) {
ctx.broadcastToChildren(msg, new TbEntityTypeActorIdPredicate(EntityType.RULE_CHAIN));
}
}
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.ruleChain;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbMsg;
/**
* Created by ashvayka on 19.03.18.
*/
@EqualsAndHashCode(callSuper = true)
@ToString
public final class RuleChainOutputMsg extends TbToRuleChainActorMsg {
@Getter
private final RuleNodeId targetRuleNodeId;
@Getter
private final String relationType;
public RuleChainOutputMsg(RuleChainId target, RuleNodeId targetRuleNodeId, String relationType, TbMsg tbMsg) {
super(tbMsg, target);
this.targetRuleNodeId = targetRuleNodeId;
this.relationType = relationType;
}
@Override
public MsgType getMsgType() {
return MsgType.RULE_CHAIN_OUTPUT_MSG;
}
}
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.ruleChain;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorStopReason;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbRuleEngineActorMsg;
import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
/**
* Created by ashvayka on 19.03.18.
*/
@EqualsAndHashCode(callSuper = true)
@ToString
public final class RuleChainToRuleChainMsg extends TbToRuleChainActorMsg {
@Getter
private final RuleChainId source;
@Getter
private final String fromRelationType;
public RuleChainToRuleChainMsg(RuleChainId target, RuleChainId source, TbMsg tbMsg, String fromRelationType) {
super(tbMsg, target);
this.source = source;
this.fromRelationType = fromRelationType;
}
@Override
public MsgType getMsgType() {
return MsgType.RULE_CHAIN_TO_RULE_CHAIN_MSG;
}
}
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.ruleChain;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbMsg;
/**
* Created by ashvayka on 19.03.18.
*/
@EqualsAndHashCode(callSuper = true)
@ToString
final class RuleChainToRuleNodeMsg extends TbToRuleNodeActorMsg {
@Getter
private final String fromRelationType;
public RuleChainToRuleNodeMsg(TbContext ctx, TbMsg tbMsg, String fromRelationType) {
super(ctx, tbMsg);
this.fromRelationType = fromRelationType;
}
@Override
public MsgType getMsgType() {
return MsgType.RULE_CHAIN_TO_RULE_MSG;
}
}
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.ruleChain;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActor;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorId;
import org.thingsboard.server.actors.TbEntityActorId;
import org.thingsboard.server.actors.service.ComponentActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
@Slf4j
public class RuleNodeActor extends ComponentActor<RuleNodeId, RuleNodeActorMessageProcessor> {
private final String ruleChainName;
private final RuleChainId ruleChainId;
private final RuleNodeId ruleNodeId;
private RuleNodeActor(ActorSystemContext systemContext, TenantId tenantId, RuleChainId ruleChainId, String ruleChainName, RuleNodeId ruleNodeId) {
super(systemContext, tenantId, ruleNodeId);
this.ruleChainName = ruleChainName;
this.ruleChainId = ruleChainId;
this.ruleNodeId = ruleNodeId;
}
@Override
protected RuleNodeActorMessageProcessor createProcessor(TbActorCtx ctx) {
return new RuleNodeActorMessageProcessor(tenantId, this.ruleChainName, ruleNodeId, systemContext, ctx.getParentRef(), ctx);
}
@Override
protected boolean doProcess(TbActorMsg msg) {
switch (msg.getMsgType()) {
case COMPONENT_LIFE_CYCLE_MSG:
case RULE_NODE_UPDATED_MSG:
onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
break;
case RULE_CHAIN_TO_RULE_MSG:
onRuleChainToRuleNodeMsg((RuleChainToRuleNodeMsg) msg);
break;
case RULE_TO_SELF_MSG:
onRuleNodeToSelfMsg((RuleNodeToSelfMsg) msg);
break;
case STATS_PERSIST_TICK_MSG:
onStatsPersistTick(id);
break;
case PARTITION_CHANGE_MSG:
onClusterEventMsg((PartitionChangeMsg) msg);
break;
default:
return false;
}
return true;
}
private void onRuleNodeToSelfMsg(RuleNodeToSelfMsg msg) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}][{}] Going to process rule msg: {}", ruleChainId, id, processor.getComponentName(), msg.getMsg());
}
try {
processor.onRuleToSelfMsg(msg);
increaseMessagesProcessedCount();
} catch (Exception e) {
logAndPersist("onRuleMsg", e);
}
}
private void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg envelope) {
TbMsg msg = envelope.getMsg();
if (!msg.isValid()) {
if (log.isTraceEnabled()) {
log.trace("Skip processing of message: {} because it is no longer valid!", msg);
}
return;
}
if (log.isDebugEnabled()) {
log.debug("[{}][{}][{}] Going to process rule engine msg: {}", ruleChainId, id, processor.getComponentName(), msg);
}
try {
processor.onRuleChainToRuleNodeMsg(envelope);
increaseMessagesProcessedCount();
} catch (Exception e) {
logAndPersist("onRuleMsg", e);
}
}
public static class ActorCreator extends ContextBasedCreator {
private final TenantId tenantId;
private final RuleChainId ruleChainId;
private final String ruleChainName;
private final RuleNodeId ruleNodeId;
public ActorCreator(ActorSystemContext context, TenantId tenantId, RuleChainId ruleChainId, String ruleChainName, RuleNodeId ruleNodeId) {
super(context);
this.tenantId = tenantId;
this.ruleChainId = ruleChainId;
this.ruleChainName = ruleChainName;
this.ruleNodeId = ruleNodeId;
}
@Override
public TbActorId createActorId() {
return new TbEntityActorId(ruleNodeId);
}
@Override
public TbActor createActor() {
return new RuleNodeActor(context, tenantId, ruleChainId, ruleChainName, ruleNodeId);
}
}
@Override
protected long getErrorPersistFrequency() {
return systemContext.getRuleNodeErrorPersistFrequency();
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment