package com.sankuai.sjst.lmq.broker.channel;

import com.corundumstudio.socketio.HandshakeData;
import com.corundumstudio.socketio.a;
import com.corundumstudio.socketio.g;
import com.corundumstudio.socketio.listener.b;
import com.corundumstudio.socketio.listener.e;
import com.corundumstudio.socketio.o;
import com.corundumstudio.socketio.protocol.f;
import com.corundumstudio.socketio.q;
import com.google.gson.Gson;
import com.sankuai.ng.commonutils.z;
import com.sankuai.sjst.lmq.common.ILmqProducerEnvironment;
import com.sankuai.sjst.lmq.common.bean.PackAck;
import com.sankuai.sjst.lmq.common.bean.PackMessage;
import com.sankuai.sjst.lmq.common.event.ChannelEvent;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.c;
import org.slf4j.d;

/* loaded from: classes9.dex */
public class WebSocketMessageChannel extends BaseMessageChannel {
    private static final c log = d.a((Class<?>) WebSocketMessageChannel.class);
    private final ConcurrentHashMap<String, o> clientMap = new ConcurrentHashMap<>();
    private q server;

    private g createConfiguration(ILmqProducerEnvironment iLmqProducerEnvironment) {
        g gVar = new g();
        gVar.setPort(iLmqProducerEnvironment.getPort());
        gVar.setJsonSupport(new f());
        gVar.getSocketConfig().c(true);
        gVar.getSocketConfig().b(true);
        gVar.setAuthorizationListener(new com.corundumstudio.socketio.c() { // from class: com.sankuai.sjst.lmq.broker.channel.WebSocketMessageChannel.5
            @Override // com.corundumstudio.socketio.c
            public boolean isAuthorized(HandshakeData handshakeData) {
                if (!z.a((CharSequence) handshakeData.getSingleUrlParam("clientId")) && !z.a((CharSequence) handshakeData.getSingleUrlParam("subscriber"))) {
                    return true;
                }
                WebSocketMessageChannel.log.info("HandshakeData:{} and isAuthorized is false", handshakeData.getUrlParams());
                return false;
            }
        });
        return gVar;
    }

    @Override // com.sankuai.sjst.lmq.broker.channel.BaseMessageChannel
    public void close() {
        if (this.server != null) {
            this.server.stop();
            this.server = null;
        }
        super.close();
    }

    @Override // com.sankuai.sjst.lmq.broker.channel.BaseMessageChannel
    protected void doBatchTask(final String str, final PackMessage packMessage, final long j) {
        o oVar = this.clientMap.get(str);
        if (oVar == null) {
            log.error("connection is unavailable:{}", str);
        } else {
            oVar.a("message", new a<Object>(Object.class) { // from class: com.sankuai.sjst.lmq.broker.channel.WebSocketMessageChannel.1
                @Override // com.corundumstudio.socketio.a
                public void onSuccess(Object obj) {
                    WebSocketMessageChannel.log.info("onSuccess! clientId= {}", str);
                    WebSocketMessageChannel.this.onBatchTaskSuccess(packMessage, j);
                }

                @Override // com.corundumstudio.socketio.a
                public void onTimeout() {
                    WebSocketMessageChannel.log.error("onTimeout clientId={}", str);
                    WebSocketMessageChannel.this.onBatchTaskTimeout(packMessage, j);
                }
            }, new Gson().toJson(packMessage));
        }
    }

    @Override // com.sankuai.sjst.lmq.broker.channel.BaseMessageChannel
    public String getChannelName() {
        return "websocket";
    }

    @Override // com.sankuai.sjst.lmq.broker.channel.BaseMessageChannel
    public void init(ILmqProducerEnvironment iLmqProducerEnvironment) {
        super.init(iLmqProducerEnvironment);
        this.server = new q(createConfiguration(iLmqProducerEnvironment));
        this.server.addConnectListener(new b() { // from class: com.sankuai.sjst.lmq.broker.channel.WebSocketMessageChannel.2
            @Override // com.corundumstudio.socketio.listener.b
            public void onConnect(o oVar) {
                HandshakeData a = oVar.a();
                Map<String, List<String>> headers = a.getHeaders();
                String singleUrlParam = a.getSingleUrlParam("subscriber");
                String singleUrlParam2 = a.getSingleUrlParam("clientId");
                WebSocketMessageChannel.log.info("[PUSH] @onConnect 开始建立长连接 url:{}, ip: {}, sessionId:{}", a.getUrl(), a.getAddress().getAddress().getHostAddress(), oVar.e().toString());
                WebSocketMessageChannel.log.info("receive new connection:{} ,subscriber:{} ,clientId:{}", headers, singleUrlParam, singleUrlParam2);
                WebSocketMessageChannel.this.clientMap.put(singleUrlParam2, oVar);
                WebSocketMessageChannel.this.registerSubscribeId(singleUrlParam, singleUrlParam2);
            }
        });
        this.server.addDisconnectListener(new e() { // from class: com.sankuai.sjst.lmq.broker.channel.WebSocketMessageChannel.3
            @Override // com.corundumstudio.socketio.listener.e
            public void onDisconnect(o oVar) {
                HandshakeData a = oVar.a();
                Map<String, List<String>> headers = a.getHeaders();
                String singleUrlParam = a.getSingleUrlParam("subscriber");
                String singleUrlParam2 = a.getSingleUrlParam("clientId");
                WebSocketMessageChannel.log.info("[PUSH] @onDisonnect 开始断开长连接 url:{}, ip: {}, sessionId:{}", a.getUrl(), a.getAddress().getAddress().getHostAddress(), oVar.e().toString());
                WebSocketMessageChannel.log.info("disconnect connection:{} ,subscriber:{} ,clientId:{}", headers, singleUrlParam, singleUrlParam2);
                WebSocketMessageChannel.this.unregisterSubscriberId(singleUrlParam, singleUrlParam2);
                WebSocketMessageChannel.this.clientMap.remove(singleUrlParam2);
            }
        });
        this.server.addEventListener(ChannelEvent.ack, PackAck.class, new com.corundumstudio.socketio.listener.c<PackAck>() { // from class: com.sankuai.sjst.lmq.broker.channel.WebSocketMessageChannel.4
            @Override // com.corundumstudio.socketio.listener.c
            public void onData(o oVar, PackAck packAck, com.corundumstudio.socketio.b bVar) {
                WebSocketMessageChannel.this.onAck(packAck);
            }
        });
        this.server.start();
    }
}
