package com.sankuai.sjst.rms.ls.push;

import com.sankuai.sjst.local.server.utils.CollectionUtils;
import com.sankuai.sjst.local.server.utils.DateUtils;
import com.sankuai.sjst.local.server.utils.ThreadUtil;
import com.sankuai.sjst.local.server.xm.UserInfo;
import com.sankuai.sjst.rms.ls.common.constant.AppCodeEnum;
import com.sankuai.sjst.rms.ls.common.context.MasterPosContext;
import com.sankuai.sjst.rms.ls.common.event.EventListener;
import com.sankuai.sjst.rms.ls.common.event.IEventService;
import com.sankuai.sjst.rms.ls.common.msg.constants.DeviceType;
import com.sankuai.sjst.rms.ls.common.push.AckMessage;
import com.sankuai.sjst.rms.ls.common.push.DeviceConnectStatus;
import com.sankuai.sjst.rms.ls.common.push.Message;
import com.sankuai.sjst.rms.ls.common.push.RetryType;
import com.sankuai.sjst.rms.ls.common.thirdparty.ThirdPartyConfig;
import com.sankuai.sjst.rms.ls.push.constant.PushCatCommand;
import com.sankuai.sjst.rms.ls.push.constant.PushChannelEnum;
import com.sankuai.sjst.rms.ls.push.db.MessageDao;
import com.sankuai.sjst.rms.ls.push.db.MessageRecord;
import com.sankuai.sjst.rms.ls.push.event.ConnectedEvent;
import com.sankuai.sjst.rms.ls.push.util.PushCatUtil;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.c;
import org.slf4j.d;

@Singleton
/* loaded from: classes10.dex */
public class PushServer implements AckHandler {

    @Inject
    public IEventService eventService;

    @Inject
    public MessageDao messageDao;

    @Inject
    public LsPikeIotSocketServer pikeIotSocketServer;

    @Inject
    public ReverseWebSocketServer reverseWebSocketServer;

    @Inject
    public LsWebSocketServer webSocketServer;

    @Inject
    public LsXmSocketServer xmSocketServer;
    private static final c log = d.a((Class<?>) PushServer.class);
    private static Map<String, MessageRecord> cache = new ConcurrentHashMap();
    private volatile boolean isStarted = false;
    private volatile boolean init = false;
    private List<AbstractPushServer> pushServers = new ArrayList();

    @Inject
    public PushServer() {
    }

    private MessageRecord addRetryCache(Message message, PushClient pushClient) {
        MessageRecord messageRecord = null;
        if (pushClient == null) {
            log.warn("cannot send cause pushClient is null, send ignore");
        } else if (canSend(message, pushClient)) {
            messageRecord = new MessageRecord(message, pushClient);
            if (RetryType.ONCE != message.getRetryType()) {
                cache.put(messageRecord.getMsgId(), messageRecord);
                if (RetryType.DATABASE == message.getRetryType()) {
                    this.messageDao.save(messageRecord);
                }
            }
        } else {
            log.info("cannot send to self, send ignore");
        }
        return messageRecord;
    }

    private boolean canSend(Message message, PushClient pushClient) {
        if (!message.isIncludeSelf() && message.getFromDeviceId() == pushClient.getDeviceId()) {
            return (message.getFromDeviceType() == 0 || message.getFromDeviceType() == pushClient.getDeviceType()) ? false : true;
        }
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void refresh(Integer num) {
        if (this.isStarted && !CollectionUtils.isEmpty(cache)) {
            long time = DateUtils.getTime();
            Iterator<Map.Entry<String, MessageRecord>> it = cache.entrySet().iterator();
            while (it.hasNext()) {
                MessageRecord value = it.next().getValue();
                if (value != null && (num == null || value.getTargetDeviceId() == num.intValue())) {
                    if (value.getExpireTime() < time) {
                        it.remove();
                        this.messageDao.deleteById(value.getMsgId());
                    } else {
                        for (AbstractPushServer abstractPushServer : this.pushServers) {
                            Message message = value.getMessage();
                            PushCatUtil.markSendInfo(message, value);
                            abstractPushServer.send(message, value.getPoiId(), value.getTargetDeviceId(), value.getTargetDeviceType());
                            PushCatUtil.report(PushCatCommand.SEND_MSG, message.getCatExtraData(value.getAppCode(), abstractPushServer.getChannelType().getName()));
                        }
                    }
                }
            }
        }
    }

    private void send(AbstractPushServer abstractPushServer, Message message, Set<Integer> set, Integer num) {
        if (set.contains(Integer.valueOf(DeviceType.KDS_TV.getType())) && (abstractPushServer instanceof ReverseWebSocketServer)) {
            Iterator<PushClient> it = ((ReverseWebSocketServer) abstractPushServer).queryConnectByDeviceType(set).iterator();
            while (it.hasNext()) {
                send(message, abstractPushServer, it.next());
            }
            return;
        }
        Set<PushClient> pushClientsById = abstractPushServer.getPushClientsById(message.getPoiId(), num.intValue());
        if (CollectionUtils.isEmpty(pushClientsById)) {
            log.info("[PUSH] 目标设备 {} 客户端链接不存在。 消息原始id：{}，通道：{}", num, message.getOriginMsgId(), abstractPushServer.getChannelType().getName());
            return;
        }
        for (PushClient pushClient : pushClientsById) {
            if (set.contains(Integer.valueOf(pushClient.getDeviceType()))) {
                send(message, abstractPushServer, pushClient);
            }
        }
    }

    private boolean send(Message message, AbstractPushServer abstractPushServer, PushClient pushClient) {
        MessageRecord addRetryCache = addRetryCache(message, pushClient);
        if (addRetryCache == null) {
            return false;
        }
        Message message2 = addRetryCache.getMessage();
        PushCatUtil.markSendInfo(message2, addRetryCache);
        abstractPushServer.send(message2, pushClient.getClient(), pushClient);
        PushCatUtil.report(PushCatCommand.SEND_MSG, message2.getCatExtraData(pushClient.getAppCode(), abstractPushServer.getChannelType().getName()));
        return true;
    }

    private void sendByAppCodes(Message message, List<Integer> list) {
        for (AbstractPushServer abstractPushServer : this.pushServers) {
            if (abstractPushServer instanceof LsWebSocketServer) {
                List pushClientsByCodes = abstractPushServer.getPushClientsByCodes(list);
                if (CollectionUtils.isEmpty(pushClientsByCodes)) {
                    return;
                }
                Iterator it = pushClientsByCodes.iterator();
                while (it.hasNext()) {
                    send(message, abstractPushServer, (PushClient) it.next());
                }
            }
        }
    }

    private void sendByIdAndType(Message message) {
        Set<Integer> parseDeviceType = message.parseDeviceType();
        for (AbstractPushServer abstractPushServer : this.pushServers) {
            if (CollectionUtils.isNotEmpty(message.getTargetDeviceIds())) {
                Iterator<Integer> it = message.getTargetDeviceIds().iterator();
                while (it.hasNext()) {
                    send(abstractPushServer, message, parseDeviceType, it.next());
                }
            } else {
                send(abstractPushServer, message, parseDeviceType, Integer.valueOf(message.getTargetDeviceId()));
            }
        }
    }

    private void sendByType(Message message) {
        Set<Integer> deviceTypes = DeviceType.getDeviceTypes(message.getTargetDeviceType());
        if (CollectionUtils.isEmpty(deviceTypes)) {
            log.info("[PUSH] 根据类型推送消息失败，原因为：未解析到设备类型。待解析类型：{}, 消息原始id：{}", Integer.valueOf(message.getTargetDeviceType()), message.getOriginMsgId());
            return;
        }
        if (deviceTypes.contains(Integer.valueOf(DeviceType.KDS_TV.getType()))) {
            Iterator<PushClient> it = this.reverseWebSocketServer.queryConnectByDeviceType(deviceTypes).iterator();
            while (it.hasNext()) {
                send(message, this.reverseWebSocketServer, it.next());
            }
        }
        for (AbstractPushServer abstractPushServer : this.pushServers) {
            Iterator<Integer> it2 = deviceTypes.iterator();
            while (it2.hasNext()) {
                Set<PushClient> pushClientsByType = abstractPushServer.getPushClientsByType(message.getPoiId(), it2.next().intValue());
                if (CollectionUtils.isNotEmpty(pushClientsByType)) {
                    Iterator<PushClient> it3 = pushClientsByType.iterator();
                    while (it3.hasNext()) {
                        send(message, abstractPushServer, it3.next());
                    }
                }
            }
        }
    }

    @Override // com.sankuai.sjst.rms.ls.push.AckHandler
    public void ackMessage(AckMessage ackMessage, PushChannelEnum pushChannelEnum) {
        MessageRecord remove = cache.remove(ackMessage.getMsgId());
        if (remove == null) {
            return;
        }
        PushCatUtil.report(PushCatCommand.ACK_MSG, (int) (DateUtils.getTime() - remove.getStartSendTime()), remove.getMessage().getCatExtraData(remove.getAppCode(), pushChannelEnum.getName()));
        if (RetryType.DATABASE == remove.getMessage().getRetryType()) {
            this.messageDao.deleteById(ackMessage.getMsgId());
        }
        log.info("[PUSH]push msg current size={}", Integer.valueOf(cache.size()));
    }

    public synchronized void clearCache() {
        cache.clear();
    }

    public Map<PushDeviceKey, String> getActiveIps() {
        HashMap hashMap = new HashMap();
        Iterator<AbstractPushServer> it = this.pushServers.iterator();
        while (it.hasNext()) {
            hashMap.putAll(it.next().getPoiConnection(MasterPosContext.getPoiId()).deviceIpMap);
        }
        return hashMap;
    }

    public DeviceConnectStatus getThirdPartyConnectStatus(int i) {
        if (AppCodeEnum.SHUTUO_KDS_TY.getCode() == i || AppCodeEnum.YUNXI_SHUTUO_KDS_TV.getCode() == i) {
            return this.pushServers.get(0).getThirdPartyConnectStatus(i);
        }
        log.error("PushServer#getThirdPartyConnectStatus error, appCode not third party. appCode:{}", Integer.valueOf(i));
        return null;
    }

    public UserInfo getXmUser(int i, int i2, int i3) {
        return this.xmSocketServer.getClient(i, i2, i3);
    }

    public boolean haveConnectedDevice(int i, Set<Integer> set) {
        if (CollectionUtils.isEmpty(set)) {
            return false;
        }
        for (AbstractPushServer abstractPushServer : this.pushServers) {
            Iterator<Integer> it = set.iterator();
            while (it.hasNext()) {
                if (abstractPushServer.havePushClient(i, it.next().intValue())) {
                    return true;
                }
            }
        }
        return false;
    }

    public synchronized void init() {
        if (!this.init) {
            this.pushServers.add(this.webSocketServer);
            this.pushServers.add(this.xmSocketServer);
            this.pushServers.add(this.pikeIotSocketServer);
            this.pushServers.add(this.reverseWebSocketServer);
            this.webSocketServer.init();
            this.webSocketServer.setAckHandler(this);
            this.xmSocketServer.init();
            this.xmSocketServer.setAckHandler(this);
            this.pikeIotSocketServer.init();
            this.pikeIotSocketServer.setAckHandler(this);
            this.reverseWebSocketServer.init();
            this.reverseWebSocketServer.setAckHandler(this);
            this.eventService.addEventListener(ConnectedEvent.class, new EventListener<ConnectedEvent>() { // from class: com.sankuai.sjst.rms.ls.push.PushServer.1
                @Override // com.sankuai.sjst.rms.ls.common.event.EventListener
                public void onEvent(ConnectedEvent connectedEvent) {
                    PushServer.this.refresh(connectedEvent.getDeviceId());
                }
            });
            this.init = true;
        }
    }

    public boolean isInit() {
        return this.init;
    }

    public boolean isStarted() {
        return this.isStarted;
    }

    public void load() {
        List<MessageRecord> queryAll = this.messageDao.queryAll();
        log.info("[PUSH]load messages from db is {}", queryAll);
        if (CollectionUtils.isEmpty(queryAll)) {
            return;
        }
        for (MessageRecord messageRecord : queryAll) {
            cache.put(messageRecord.getMsgId(), messageRecord);
        }
        refresh(null);
    }

    public void reset() {
        Iterator<AbstractPushServer> it = this.pushServers.iterator();
        while (it.hasNext()) {
            it.next().reset();
        }
    }

    public void send(Message message) {
        if (!this.isStarted) {
            log.warn("[PUSH] 推送服务未启动, 忽略本次消息推送。消息原始id：{}", message.getOriginMsgId());
            return;
        }
        List<Integer> appCodesByMsgType = ThirdPartyConfig.getAppCodesByMsgType(message.getMsgType());
        if (CollectionUtils.isNotEmpty(appCodesByMsgType)) {
            sendByAppCodes(message, appCodesByMsgType);
        }
        if (CollectionUtils.isNotEmpty(message.getTargetDeviceIds())) {
            sendByIdAndType(message);
        } else {
            sendByType(message);
        }
    }

    public void send(Message message, PushDeviceKey pushDeviceKey) {
        boolean z = false;
        for (AbstractPushServer abstractPushServer : this.pushServers) {
            PushClient pushClient = abstractPushServer.getPushClient(message.getPoiId(), pushDeviceKey);
            if (pushClient == null) {
                log.info("[PUSH] 根据设备key推送消息，未获取客户端；消息原始id：{}, 设备key：{}, 通道类型：{}", message.getOriginMsgId(), pushDeviceKey, abstractPushServer.getChannelType().getName());
            } else {
                z = send(message, abstractPushServer, pushClient);
            }
        }
        if (z || addRetryCache(message, PushClient.builder().deviceId(pushDeviceKey.getDeviceId().intValue()).deviceType(pushDeviceKey.getDeviceType().intValue()).build()) == null) {
            return;
        }
        log.info("[PUSH] 消息通道不存在客户端当前客户端 {}，消息放入缓存，定时重试。消息原始id：{}", pushDeviceKey, message.getOriginMsgId());
    }

    public synchronized void start() {
        if (!this.isStarted) {
            Iterator<AbstractPushServer> it = this.pushServers.iterator();
            while (it.hasNext()) {
                it.next().start();
            }
            ThreadUtil.getScheduledThreadPool().scheduleAtFixedRate(new Runnable() { // from class: com.sankuai.sjst.rms.ls.push.PushServer.2
                @Override // java.lang.Runnable
                public void run() {
                    PushServer.this.refresh(null);
                }
            }, 0L, 1L, TimeUnit.MINUTES);
            this.isStarted = true;
        }
    }

    public synchronized void stop() {
        if (this.isStarted) {
            Iterator<AbstractPushServer> it = this.pushServers.iterator();
            while (it.hasNext()) {
                it.next().stop();
            }
            this.isStarted = false;
        }
    }
}
