package com.sankuai.sjst.lmq.consumer.channel.pike;

import com.sankuai.ng.common.log.l;
import com.sankuai.sjst.lmq.base.pike.IPikeIoTClient;
import com.sankuai.sjst.lmq.base.pike.IPikeIoTHandler;
import com.sankuai.sjst.lmq.base.pike.PikeConnectContext;
import com.sankuai.sjst.lmq.common.bean.ConnectContext;
import com.sankuai.sjst.lmq.common.bean.PackAck;
import com.sankuai.sjst.lmq.common.bean.PackMessage;
import com.sankuai.sjst.lmq.common.bean.control.Control;
import com.sankuai.sjst.lmq.common.bean.control.ControlType;
import com.sankuai.sjst.lmq.common.bean.pike.PikeRequest;
import com.sankuai.sjst.lmq.common.constant.RMSConstant;
import com.sankuai.sjst.lmq.common.pikeIoT.PikeIoTTopicUtils;
import com.sankuai.sjst.lmq.common.utils.JsonUtils;
import com.sankuai.sjst.lmq.consumer.channel.BaseChannel;
import com.sankuai.sjst.lmq.consumer.env.ConsumerEnvironment;
import com.sankuai.sjst.lmq.pike.PikeIoTClient;
import java.util.ArrayList;
import java.util.List;

/* loaded from: classes4.dex */
/**
 * {@link BaseChannel} implementation that talks to the LMQ service through a
 * {@link IPikeIoTClient}.
 *
 * <p>Inbound messages are received on {@link PikeIoTTopicUtils#MESSAGE_TOPIC_2}; acks and
 * pull controls are published on {@link PikeIoTTopicUtils#ACK_TOPIC_2} and
 * {@link PikeIoTTopicUtils#CONTROL_TOPIC_2} respectively.
 *
 * <p>{@link #init(ConsumerEnvironment)}, {@link #close()} and {@link #reconnect()} are
 * {@code synchronized} on this instance.
 */
public class PikeChannel extends BaseChannel {
    /** Topics handed to {@link PikeConnectContext#create} whenever the client connects. */
    private static final List<String> TOPICS = new ArrayList<String>();

    /** Created on {@link #init(ConsumerEnvironment)}; {@code null} while no processor is running. */
    private PikeReconnectProcessor reconnectProcessor;
    private final PikeContext context = new PikeContext();
    private final IPikeIoTClient client = new PikeIoTClient();

    /* loaded from: classes4.dex */
    /** Client callback that forwards messages from the expected topic to the enclosing channel. */
    private class PikeIoTHandler implements IPikeIoTHandler {
        private PikeIoTHandler() {
        }

        /** Intentionally a no-op in this class. */
        @Override // com.sankuai.sjst.lmq.base.pike.IPikeIoTConnectionHandler
        public void onConnectionLost() {
        }

        /**
         * Decodes the payload as a {@link PackMessage} and passes it to
         * {@link PikeChannel#onMessage}. Payloads from any topic other than
         * {@link PikeIoTTopicUtils#MESSAGE_TOPIC_2} are logged and dropped.
         *
         * @param str  topic the payload arrived on
         * @param bArr raw JSON payload
         */
        @Override // com.sankuai.sjst.lmq.base.pike.IPikeIoTHandler
        public void onMessage(String str, byte[] bArr) {
            if (!PikeIoTTopicUtils.MESSAGE_TOPIC_2.equals(str)) {
                // The topic is appended by concatenation, so no "{}" placeholder belongs here.
                l.c(RMSConstant.LOG_TAG, "pike client receive messages from unexpected topic: " + str);
                return;
            }
            // NOTE(review): the decoded message is forwarded without a null check;
            // confirm how jsonByte2T and BaseChannel.onMessage behave on malformed payloads.
            PackMessage packMessage = (PackMessage) JsonUtils.jsonByte2T(bArr, PackMessage.class);
            l.b(RMSConstant.LOG_TAG, "lmq pike receive message : " + packMessage);
            PikeChannel.this.onMessage(packMessage);
        }
    }

    static {
        TOPICS.add(PikeIoTTopicUtils.MESSAGE_TOPIC_2);
    }

    /** Creates the channel and registers the message handler with the client. */
    public PikeChannel() {
        this.client.init(new PikeIoTHandler());
    }

    /**
     * Starts a fresh reconnect processor bound to this channel. Any processor left over
     * from a previous {@link #init(ConsumerEnvironment)} is closed first so that repeated
     * initialization does not leave an orphaned processor behind.
     */
    private void startReconnectProcessor() {
        stopReconnectProcessor();
        this.reconnectProcessor = new PikeReconnectProcessor(this);
        this.reconnectProcessor.start();
    }

    /** Closes the current reconnect processor, if any, and forgets it so it is closed only once. */
    private void stopReconnectProcessor() {
        if (this.reconnectProcessor != null) {
            this.reconnectProcessor.close();
            this.reconnectProcessor = null;
        }
    }

    /**
     * Publishes the given ack on {@link PikeIoTTopicUtils#ACK_TOPIC_2}, wrapped in a
     * {@link PikeRequest} addressed to the client id held by the channel context.
     * Publish failures are logged and not propagated.
     *
     * @param packAck ack to send
     */
    @Override // com.sankuai.sjst.lmq.consumer.channel.BaseChannel
    public void ack(PackAck packAck) {
        PikeRequest pikeRequest = new PikeRequest();
        pikeRequest.setPayload(JsonUtils.toJson(packAck));
        pikeRequest.setPoiId(getEnv().getPoiId());
        pikeRequest.setToClientId(this.context.getLsClientId());
        try {
            this.client.publish(PikeIoTTopicUtils.ACK_TOPIC_2, JsonUtils.t2JsonByte(pikeRequest));
        } catch (Exception e) {
            l.d(RMSConstant.LOG_TAG, String.format("Failed to publish message to PikeIoT, packAck=%s", packAck), e);
        }
    }

    /** @return whether the underlying client reports itself as connected */
    @Override // com.sankuai.sjst.lmq.consumer.channel.BaseChannel
    public boolean available() {
        return this.client.isConnected();
    }

    /** Stops the reconnect processor, closes the client, then delegates to {@code super.close()}. */
    @Override // com.sankuai.sjst.lmq.consumer.channel.BaseChannel
    public synchronized void close() {
        stopReconnectProcessor();
        this.client.close();
        super.close();
    }

    /** @return the fixed channel name {@code "pike"} */
    @Override // com.sankuai.sjst.lmq.consumer.channel.BaseChannel
    public String getChannelName() {
        return "pike";
    }

    /**
     * Initializes the base channel, resets the channel context, connects the client
     * for {@link #TOPICS} and starts the reconnect processor.
     *
     * @param consumerEnvironment environment used to build the connect context
     * @throws Exception if base initialization or the connect attempt fails; in that
     *                   case the reconnect processor is not started
     */
    @Override // com.sankuai.sjst.lmq.consumer.channel.BaseChannel
    public synchronized void init(ConsumerEnvironment consumerEnvironment) throws Exception {
        super.init(consumerEnvironment);
        this.context.reset();
        this.client.connect(PikeConnectContext.create(consumerEnvironment, TOPICS));
        startReconnectProcessor();
    }

    /** @return the constant priority value {@code 2} for this channel */
    @Override // com.sankuai.sjst.lmq.consumer.channel.BaseChannel
    public int priority() {
        return 2;
    }

    /**
     * Publishes the given control as a {@link ControlType#PULL} request on
     * {@link PikeIoTTopicUtils#CONTROL_TOPIC_2}. Publish failures are logged and not
     * propagated.
     *
     * @param control control payload to send
     */
    @Override // com.sankuai.sjst.lmq.consumer.channel.BaseChannel
    public void pull(Control control) {
        PikeRequest pikeRequest = new PikeRequest();
        pikeRequest.setPayload(JsonUtils.toJson(control));
        pikeRequest.setPoiId(getEnv().getPoiId());
        pikeRequest.setToClientId(this.context.getLsClientId());
        pikeRequest.setControlType(ControlType.PULL);
        try {
            this.client.publish(PikeIoTTopicUtils.CONTROL_TOPIC_2, JsonUtils.t2JsonByte(pikeRequest));
        } catch (Exception e) {
            l.d(RMSConstant.LOG_TAG, String.format("Failed to publish message to PikeIoT, control=%s", control), e);
        }
    }

    /**
     * Closes the client and connects it again using the current environment. Does
     * nothing except log when the channel has not been initialized.
     *
     * @throws Exception if the connect attempt fails
     */
    public synchronized void reconnect() throws Exception {
        if (!isInit()) {
            l.c(RMSConstant.LOG_TAG, "pike channel has not been initialized");
        } else {
            this.client.close();
            this.client.connect(PikeConnectContext.create(getEnv(), TOPICS));
        }
    }

    /**
     * Passes the new connect context on to the channel's {@link PikeContext}.
     *
     * @param connectContext context to refresh from
     */
    @Override // com.sankuai.sjst.lmq.consumer.channel.BaseChannel
    public void refresh(ConnectContext connectContext) {
        this.context.refresh(connectContext);
    }
}
