package com.sankuai.sjst.lmq.broker;

import com.dianping.cat.message.Transaction;
import com.sankuai.ng.common.network.b;
import com.sankuai.ng.common.network.g;
import com.sankuai.ng.common.network.provider.AbstractConfigProvider;
import com.sankuai.ng.commonutils.e;
import com.sankuai.rmslocalserver.lsvirtual.sdk.a;
import com.sankuai.sjst.lmq.base.net.UrlUniqueKey;
import com.sankuai.sjst.lmq.broker.bean.TaskBean;
import com.sankuai.sjst.lmq.broker.channel.WebSocketMessageChannel;
import com.sankuai.sjst.lmq.broker.dao.MsgDao;
import com.sankuai.sjst.lmq.broker.dao.TaskDao;
import com.sankuai.sjst.lmq.broker.handlers.DispatchTaskHandler;
import com.sankuai.sjst.lmq.broker.manager.ChannelManager;
import com.sankuai.sjst.lmq.broker.manager.SubscriptionManager;
import com.sankuai.sjst.lmq.broker.manager.TaskManager;
import com.sankuai.sjst.lmq.broker.processor.BatchSendProcessor;
import com.sankuai.sjst.lmq.broker.processor.RetryAndCleanProcessor;
import com.sankuai.sjst.lmq.broker.processor.RetryTask;
import com.sankuai.sjst.lmq.broker.processor.StorageClearTask;
import com.sankuai.sjst.lmq.common.ILmqProducerEnvironment;
import com.sankuai.sjst.lmq.common.bean.LmqMessage;
import com.sankuai.sjst.lmq.common.queue.MessageQueue;
import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import org.slf4j.c;
import org.slf4j.d;

/* loaded from: classes9.dex */
public class LmqBroker {
    private static final c log = d.a((Class<?>) LmqBroker.class);
    private BatchSendProcessor batchSendProcessor;
    private ChannelManager channelManager;
    private ILmqProducerEnvironment env;
    private MessageQueue<LmqMessage> messageQueue;
    private MsgDao msgDao;
    private RetryAndCleanProcessor retryAndCleanProcessor;
    private volatile boolean running;
    private SubscriptionManager subscriptionManager;
    private TaskDao taskDao;
    private DispatchTaskHandler taskDispatcher;
    private TaskManager taskManager;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes9.dex */
    public static final class HOLDER {
        private static final LmqBroker INSTANCE = new LmqBroker();

        private HOLDER() {
        }
    }

    private LmqBroker() {
    }

    private void closeBatchSendProcessor() {
        if (this.batchSendProcessor != null) {
            this.batchSendProcessor.close();
            this.batchSendProcessor = null;
        }
    }

    private void closeChannelManager() {
        if (this.channelManager != null) {
            this.channelManager.close();
            this.channelManager = null;
        }
    }

    private void closeRetryAndCleanProcessor() {
        if (this.retryAndCleanProcessor != null) {
            this.retryAndCleanProcessor.close();
            this.retryAndCleanProcessor = null;
        }
    }

    public static LmqBroker getInstance() {
        return HOLDER.INSTANCE;
    }

    private void initChannelManager() {
        this.channelManager = new ChannelManager();
        this.channelManager.registerChannel(new WebSocketMessageChannel());
        this.channelManager.init(this.env);
    }

    private void initDB() throws Exception {
        DBHelper dBHelper = DBHelper.getInstance();
        dBHelper.init(this.env.getConnectionSource());
        this.msgDao = dBHelper.getMsgDao();
        this.taskDao = dBHelper.getTaskDao();
    }

    private void initNetwork() {
        AbstractConfigProvider apiConfigProvider = this.env.getApiConfigProvider();
        if (apiConfigProvider == null) {
            throw new RuntimeException("init LmqBroker error! api config provider is null!");
        }
        g.a(UrlUniqueKey.KEY_LMQ, new b(apiConfigProvider));
    }

    private void initProcessor() {
        this.batchSendProcessor = new BatchSendProcessor(this);
        this.batchSendProcessor.start();
        this.retryAndCleanProcessor = new RetryAndCleanProcessor();
        this.retryAndCleanProcessor.addTask(new RetryTask(this));
        this.retryAndCleanProcessor.addTask(new StorageClearTask(this.taskManager));
        this.retryAndCleanProcessor.start();
    }

    private void initQueue() {
        this.messageQueue = new MessageQueue<>(this.env.getBatchSize(), this.env.getMsgQueueSize());
    }

    private void initStructure() throws Exception {
        initQueue();
        initNetwork();
        initDB();
        initSubscriptionManager();
        initChannelManager();
        initTaskDispatcher();
        initTaskManager();
    }

    private void initSubscriptionManager() throws SQLException {
        SubscriptionManager.getInstance().init(this.env);
        this.subscriptionManager = SubscriptionManager.getInstance();
    }

    private void initTaskDispatcher() {
        this.taskDispatcher = new DispatchTaskHandler(this.channelManager);
    }

    private void initTaskManager() {
        this.taskManager = new TaskManager(this.env, this.channelManager, this.subscriptionManager, this.msgDao, this.taskDao);
        this.channelManager.registerTaskListener(this.taskManager.taskListener());
    }

    private void reportSize(int i) {
        log.info("retry size:{}", Integer.valueOf(i));
        Transaction a = a.a("LS_LMQ", "retryTaskSize");
        a.setDurationInMicros(i);
        a.complete();
    }

    public void close() {
        this.running = false;
        closeBatchSendProcessor();
        closeRetryAndCleanProcessor();
        closeChannelManager();
    }

    public MessageQueue<LmqMessage> connect() {
        return this.messageQueue;
    }

    public BatchSendProcessor getBatchSendProcessor() {
        return this.batchSendProcessor;
    }

    public ChannelManager getChannelManager() {
        return this.channelManager;
    }

    public ILmqProducerEnvironment getEnv() {
        return this.env;
    }

    public MessageQueue<LmqMessage> getMessageQueue() {
        return this.messageQueue;
    }

    public MsgDao getMsgDao() {
        return this.msgDao;
    }

    public RetryAndCleanProcessor getRetryAndCleanProcessor() {
        return this.retryAndCleanProcessor;
    }

    public SubscriptionManager getSubscriptionManager() {
        return this.subscriptionManager;
    }

    public TaskDao getTaskDao() {
        return this.taskDao;
    }

    public DispatchTaskHandler getTaskDispatcher() {
        return this.taskDispatcher;
    }

    public TaskManager getTaskManager() {
        return this.taskManager;
    }

    public void init(ILmqProducerEnvironment iLmqProducerEnvironment) throws Exception {
        if (this.running) {
            throw new RuntimeException("LmqBroker is running!");
        }
        this.env = iLmqProducerEnvironment;
        initStructure();
        initProcessor();
        this.running = true;
    }

    public boolean isRunning() {
        return this.running;
    }

    public void resend() {
        List<TaskBean> loadReSendTasks = this.taskManager.loadReSendTasks();
        if (e.a((Collection) loadReSendTasks)) {
            return;
        }
        log.info("resend bean, size {}, detail {}", Integer.valueOf(loadReSendTasks.size()), loadReSendTasks);
        reportSize(loadReSendTasks.size());
        this.taskDispatcher.handle(loadReSendTasks);
    }

    public void send(List<LmqMessage> list) {
        List<TaskBean> createTasks = this.taskManager.createTasks(list);
        log.info("ls message queue, msg size:{}; task size:{}", Integer.valueOf(list.size()), Integer.valueOf(createTasks.size()));
        this.taskDispatcher.handle(createTasks);
    }
}
