package com.edu.classroom.message.repo.datasource;

import androidx.lifecycle.LiveData;
import androidx.lifecycle.ab;
import com.edu.classroom.base.network.h;
import com.edu.classroom.base.settings.q;
import com.edu.classroom.message.MsgFetchException;
import com.edu.classroom.message.MsgParseException;
import com.edu.classroom.message.MsgPersistException;
import com.edu.classroom.message.NoStatusMsgException;
import com.edu.classroom.message.k;
import com.edu.classroom.message.repo.fetcher.f;
import com.squareup.wire.ProtoReader;
import edu.classroom.channel.ChannelMessage;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.Single;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.o;
import java.io.InputStream;
import java.net.ProtocolException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import javax.inject.Inject;
import javax.inject.Named;
import kotlin.Metadata;
import kotlin.e;
import kotlin.jvm.internal.Ref;
import kotlin.jvm.internal.t;
import okio.Okio;
import org.json.JSONObject;

@Metadata
/* loaded from: classes4.dex */
public final class b implements a {

    /* renamed from: a, reason: collision with root package name */
    // Injected DAO for persisted messages; its accessor reports the property name "messageDao".
    @Inject
    public com.edu.classroom.message.repo.b.a.d f24036a;

    /* renamed from: b, reason: collision with root package name */
    // Injected DAO for playback info; its accessor reports the property name "playbackInfoDao".
    @Inject
    public com.edu.classroom.message.repo.b.a.b f24037b;

    /* renamed from: c, reason: collision with root package name */
    // Injected fetcher dependency; its use is not visible in this part of the file.
    @Inject
    public com.edu.classroom.message.repo.fetcher.c f24038c;

    // Injected network dependency (obfuscated type `h`); its use is not visible in this part of the file.
    @Inject
    public h d;

    // Injected set of `k` instances — presumably the set returned by f(), which is handed every parsed ChannelMessage; TODO confirm.
    @Inject
    public Set<k> e;
    // Room id passed to the constructor.
    private final String f;
    // User id passed to the constructor.
    private final String g;
    // Network fetcher passed to the constructor; used to download message streams by URL.
    private final f h;
    // Mutable Boolean LiveData, seeded in the constructor with the negated playback-settings flag.
    private final ab<Boolean> i;
    // Same object as `i`, typed as LiveData.
    private final LiveData<Boolean> j;
    // Lazy holder for the main message cache (see g()).
    private final kotlin.d k;
    // Late-initialised cache reported as "chatCache"; assigned in a(chatInfoBlocks, boardInfoBlocks).
    private com.edu.classroom.message.repo.a.c l;
    // Late-initialised cache reported as "boardCache"; assigned in a(chatInfoBlocks, boardInfoBlocks).
    private com.edu.classroom.message.repo.a.b m;

    /**
     * Creates the playback message data source for one room/user pair.
     *
     * <p>Stores the arguments, seeds the Boolean LiveData with the negation of the
     * playback-settings flag, and prepares the lazily created message cache.
     *
     * <p>Decompiler artifacts removed: the {@code super(0)} call inside an instance
     * initializer (not valid Java), the inconvertible {@code (ab<Boolean>)} cast on the
     * initial value, and an unused local. {@code kotlin.e} is written fully qualified
     * because the simple name {@code e} is obscured by the field {@code e}.
     *
     * @param roomId room identifier
     * @param userId user identifier
     * @param messageNetworkFetcher fetcher used to download message streams
     */
    @Inject
    public b(@Named String roomId, @Named String userId, f messageNetworkFetcher) {
        t.d(roomId, "roomId");
        t.d(userId, "userId");
        t.d(messageNetworkFetcher, "messageNetworkFetcher");
        this.f = roomId;
        this.g = userId;
        this.h = messageNetworkFetcher;
        ab<Boolean> abVar = new ab<>();
        abVar.b(Boolean.valueOf(!q.f22821a.b().getClassroomPlaybackSettings().a()));
        this.i = abVar;
        this.j = abVar;
        this.k = kotlin.e.a(new kotlin.jvm.a.a<com.edu.classroom.message.repo.a.d>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$cache$2
            @Override // kotlin.jvm.a.a
            public final com.edu.classroom.message.repo.a.d invoke() {
                // Built on first access so that the injected dependency returned by d() is available.
                return new com.edu.classroom.message.repo.a.d(b.this.f, b.this.g, b.this.d(), new LinkedBlockingQueue());
            }
        });
    }

    /**
     * Converts a decoded {@link ChannelMessage} into a persistable message entity.
     *
     * <p>The message id is parsed as a long; any failure while reading or parsing it
     * falls back to {@code 0}.
     *
     * @param channelMessage decoded channel message
     * @return entity carrying id, type, send timestamp, room id and payload bytes
     */
    private final com.edu.classroom.message.repo.b.b.a a(ChannelMessage channelMessage) {
        long parsedId;
        try {
            String rawId = channelMessage.msg_id;
            t.b(rawId, "msg.msg_id");
            parsedId = Long.parseLong(rawId);
        } catch (Throwable unused) {
            // Deliberate best-effort: an unparsable id is stored as 0.
            parsedId = 0;
        }
        String type = channelMessage.msg_type;
        t.b(type, "msg_type");
        Long sentAt = channelMessage.send_timestamp;
        t.b(sentAt, "send_timestamp");
        long sentAtMillis = sentAt.longValue();
        String room = channelMessage.room_id;
        t.b(room, "room_id");
        return new com.edu.classroom.message.repo.b.b.a(parsedId, type, sentAtMillis, room, channelMessage.payload.toByteArray(), null, 32, null);
    }

    /**
     * Wraps the protobuf stream in an Observable that emits one {@link ChannelMessage}
     * per decoded record (decoding is done by the static {@code a(InputStream, ObservableEmitter)}).
     *
     * @param inputStream stream holding the encoded messages
     * @return cold Observable of decoded messages
     */
    private final Observable<ChannelMessage> a(final InputStream inputStream) {
        ObservableOnSubscribe decoder = new ObservableOnSubscribe() {
            @Override // io.reactivex.ObservableOnSubscribe
            public final void subscribe(ObservableEmitter observableEmitter) {
                b.a(inputStream, observableEmitter);
            }
        };
        Observable<ChannelMessage> decoded = Observable.create(decoder);
        t.b(decoded, "create<ChannelMessage> {…mitter.onComplete()\n    }");
        return decoded;
    }

    /**
     * Returns an Observable that persists {@code list} when subscribed and then emits
     * the same list (persistence is done by the static {@code d(b, List)}).
     *
     * @param list batch of message entities to store
     * @return Observable emitting the stored batch
     */
    private final Observable<List<com.edu.classroom.message.repo.b.b.a>> a(final List<com.edu.classroom.message.repo.b.b.a> list) {
        Callable persistBatch = new Callable() {
            @Override // java.util.concurrent.Callable
            public final Object call() {
                return b.d(b.this, list);
            }
        };
        Observable<List<com.edu.classroom.message.repo.b.b.a>> persisted = Observable.fromCallable(persistBatch);
        t.b(persisted, "fromCallable {\n        v…()\n        messages\n    }");
        return persisted;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Synthetic lambda body: turns a fetched stream into the Observable of decoded messages.
     *
     * @param this$0 owning data source
     * @param it fetched input stream
     * @return Observable of decoded channel messages
     */
    public static final ObservableSource a(b this$0, InputStream it) {
        t.d(this$0, "this$0");
        t.d(it, "it");
        Observable<ChannelMessage> decoded = this$0.a(it);
        return decoded;
    }

    /**
     * Drains from {@code aVar} every cached message up to and including timestamp
     * {@code j} and returns them as a list.
     *
     * <p>Fix: the decompiler rendered the captured cache argument as
     * {@code com.edu.classroom.message.repo.a.a.this}, which is not an enclosing
     * instance and does not compile; the captured parameter {@code aVar} is used instead.
     *
     * @param j upper timestamp bound (inclusive)
     * @param aVar cache to drain
     * @return Single emitting the drained messages
     */
    private final Single<List<com.edu.classroom.channel.a.b.a>> a(final long j, final com.edu.classroom.message.repo.a.a aVar) {
        Single<List<com.edu.classroom.channel.a.b.a>> list = Observable.create(new ObservableOnSubscribe() { // from class: com.edu.classroom.message.repo.datasource.-$$Lambda$b$YsTN8k-XibgCnsz8Yxjv_QUyvNs
            @Override // io.reactivex.ObservableOnSubscribe
            public final void subscribe(ObservableEmitter observableEmitter) {
                b.a(aVar, j, observableEmitter);
            }
        }).toList();
        t.b(list, "create<ClassroomMessage>…lete()\n        }.toList()");
        return list;
    }

    /**
     * Builds a Completable that parses {@code inputStream} and persists its messages
     * (the work is done by the static five-argument {@code a(...)}).
     *
     * @param inputStream stream holding the encoded messages
     * @param bVar playback-info entity updated once the pipeline finishes
     * @param z selects which entity flag and metric are used on completion
     * @return Completable tied to the parse-and-persist pipeline
     */
    private final io.reactivex.a a(final InputStream inputStream, final com.edu.classroom.message.repo.b.b.b bVar, final boolean z) {
        io.reactivex.d pipeline = new io.reactivex.d() {
            @Override // io.reactivex.d
            public final void subscribe(io.reactivex.b bVar2) {
                b.a(b.this, inputStream, bVar, z, bVar2);
            }
        };
        io.reactivex.a completable = io.reactivex.a.a(pipeline);
        t.b(completable, "create { emitter ->\n    …)\n                }\n    }");
        return completable;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Synthetic lambda body: starts the parse-and-persist pipeline with the boolean flag set to {@code true}.
     *
     * @param this$0 owning data source
     * @param entity playback-info entity
     * @param it fetched input stream
     * @return Completable for the pipeline
     */
    public static final io.reactivex.e a(b this$0, com.edu.classroom.message.repo.b.b.b entity, InputStream it) {
        t.d(this$0, "this$0");
        t.d(entity, "$entity");
        t.d(it, "it");
        io.reactivex.a pipeline = this$0.a(it, entity, true);
        return pipeline;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Synthetic lambda body: persists one buffered batch of message entities.
     *
     * @param this$0 owning data source
     * @param it batch of message entities
     * @return Completable that stores the batch
     */
    public static final io.reactivex.e a(b this$0, List it) {
        t.d(this$0, "this$0");
        t.d(it, "it");
        io.reactivex.a stored = this$0.b((List<com.edu.classroom.message.repo.b.b.a>) it);
        return stored;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Emits cached messages while the head of the cache has {@code m() <= j}, then
     * calls {@code cache.b(j)} and completes.
     *
     * @param cache cache to drain
     * @param j upper bound compared against each head message's {@code m()}
     * @param emitter receiver of the drained messages
     */
    public static final void a(com.edu.classroom.message.repo.a.a cache, long j, ObservableEmitter emitter) {
        t.d(cache, "$cache");
        t.d(emitter, "emitter");
        for (com.edu.classroom.channel.a.b.a head = cache.c(); head != null && head.m() <= j; head = cache.c()) {
            // c() only inspects the head (it is re-read every pass); b() hands over the element to emit.
            com.edu.classroom.channel.a.b.a taken = cache.b();
            t.a(taken);
            emitter.onNext(taken);
        }
        cache.b(j);
        emitter.onComplete();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Synthetic completion action: sets the entity's first flag ({@code a(true)}),
     * then refreshes the LiveData and persists the entity.
     *
     * @param entity playback-info entity to update
     * @param this$0 owning data source
     */
    public static final void a(com.edu.classroom.message.repo.b.b.b entity, b this$0) {
        t.d(entity, "$entity");
        t.d(this$0, "this$0");
        final boolean fetched = true;
        entity.a(fetched);
        // Order matters: both follow-up calls read the flag that was just set.
        this$0.f(entity);
        this$0.e(entity);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Queries the DAO for the "fsm" and "user_state" messages for this room at
     * timestamp argument {@code j} and emits whichever exist.
     *
     * @param this$0 owning data source
     * @param j timestamp argument passed to the DAO query
     * @param emitter receiver of the found entities
     * @throws NoStatusMsgException when no "fsm" message is found
     */
    public static final void a(b this$0, long j, ObservableEmitter emitter) {
        t.d(this$0, "this$0");
        t.d(emitter, "emitter");
        com.edu.classroom.message.repo.b.b.a fsmMessage = this$0.b().a(this$0.f, "fsm", j);
        if (fsmMessage != null) {
            emitter.onNext(fsmMessage);
            // The user-state message is optional; only the fsm message is mandatory.
            com.edu.classroom.message.repo.b.b.a userStateMessage = this$0.b().a(this$0.f, "user_state", j);
            if (userStateMessage != null) {
                emitter.onNext(userStateMessage);
            }
            emitter.onComplete();
        } else {
            throw new NoStatusMsgException(j);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Synthetic lambda body: passes this data source's user id to the entity's {@code a(String)}.
     *
     * @param this$0 owning data source
     * @param aVar message entity to update
     */
    public static final void a(b this$0, com.edu.classroom.message.repo.b.b.a aVar) {
        t.d(this$0, "this$0");
        final String ownerId = this$0.g;
        aVar.a(ownerId);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Hands {@code msg} to every {@code k} instance returned by {@code f()}.
     *
     * <p>The decompiled form collected one Unit value per call into a list that was
     * never read (a leftover of Kotlin's {@code map}); that allocation is dropped.
     *
     * @param this$0 owning data source
     * @param msg decoded channel message
     */
    public static final void a(b this$0, ChannelMessage msg) {
        t.d(this$0, "this$0");
        for (k receiver : this$0.f()) {
            t.b(msg, "msg");
            receiver.a(msg);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Parses the message stream, persists it in batches of 50 and signals completion
     * through {@code emitter}.
     *
     * <p>Pipeline: decode, drop messages rejected by {@code d(ChannelMessage)}, map to
     * entities, buffer 50, persist each batch sequentially. After each batch the early
     * completion check may complete {@code emitter} before the whole stream is stored;
     * {@code booleanRef} records that so the emitter is not signalled twice.
     *
     * <p>Fixes for decompiler artifacts that do not compile: {@code b.this} inside this
     * static method is replaced by {@code this$0}; {@code Ref.BooleanRef.this} is replaced
     * by the captured {@code booleanRef}; the {@code super(1)} instance initializer is removed.
     *
     * <p>NOTE(review): the value returned by {@code subscribe} is discarded, so disposing
     * the outer Completable does not appear to cancel this pipeline — confirm this is intended.
     *
     * @param this$0 owning data source
     * @param r stream holding the encoded messages
     * @param entity playback-info entity flagged and persisted on completion
     * @param z {@code true} sets the entity's first flag and reports
     *          "playback_message_database_duration"; {@code false} sets the second flag and
     *          reports "playback_self_message_database_duration"
     * @param emitter completable emitter to signal
     */
    public static final void a(final b this$0, InputStream r, final com.edu.classroom.message.repo.b.b.b entity, final boolean z, final io.reactivex.b emitter) {
        t.d(this$0, "this$0");
        t.d(r, "$r");
        t.d(entity, "$entity");
        t.d(emitter, "emitter");
        // True once the emitter has been completed early by the per-batch check.
        final Ref.BooleanRef booleanRef = new Ref.BooleanRef();
        Observable doOnNext = this$0.a(r).filter(new o() { // from class: com.edu.classroom.message.repo.datasource.-$$Lambda$b$WR9sFopUoX1FzefGP6YIAkEHvbk
            @Override // io.reactivex.functions.o
            public final boolean test(Object obj) {
                return b.d((ChannelMessage) obj);
            }
        }).map(new Function() { // from class: com.edu.classroom.message.repo.datasource.-$$Lambda$b$kH1__E5-2OEoTM1CMfVh3e6YnAk
            @Override // io.reactivex.functions.Function
            public final Object apply(Object obj) {
                return b.e(this$0, (ChannelMessage) obj);
            }
        }).buffer(50).concatMap(new Function() { // from class: com.edu.classroom.message.repo.datasource.-$$Lambda$b$CB1qi55PAVIWTy4B3Mo5bvDMYWw
            @Override // io.reactivex.functions.Function
            public final Object apply(Object obj) {
                return b.c(this$0, (List) obj);
            }
        }).doOnNext(new Consumer() { // from class: com.edu.classroom.message.repo.datasource.-$$Lambda$b$o-KmACCPyq4Gi0eXRIA6h22fOa4
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                b.a(booleanRef, entity, emitter, (List) obj);
            }
        });
        t.b(doOnNext, "parseMessages(r).filter{…      }\n                }");
        // Presumably a timing helper that calls back with the elapsed duration once the stream ends — TODO confirm.
        com.edu.classroom.base.f.b.a(doOnNext, new kotlin.jvm.a.b<Long, kotlin.t>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$parseAndPersistMessages$1$5
            @Override // kotlin.jvm.a.b
            public /* synthetic */ kotlin.t invoke(Long l) {
                invoke(l.longValue());
                return kotlin.t.f36712a;
            }

            public final void invoke(long j) {
                // Complete now unless the early check already did.
                if (!booleanRef.element) {
                    emitter.onComplete();
                }
                if (z) {
                    entity.a(true);
                    com.edu.classroom.base.sdkmonitor.b.a(com.edu.classroom.base.sdkmonitor.b.f22757a, "classroom_playback_service", null, new JSONObject().put("playback_message_database_duration", j), null, 8, null);
                } else {
                    entity.b(true);
                    com.edu.classroom.base.sdkmonitor.b.a(com.edu.classroom.base.sdkmonitor.b.f22757a, "classroom_playback_service", null, new JSONObject().put("playback_self_message_database_duration", j), null, 8, null);
                }
                this$0.f(entity);
                this$0.e(entity);
                com.edu.classroom.base.log.c.i$default(com.edu.classroom.playback.c.a.f24081a, t.a("playback messages download finish completely: ", (Object) Long.valueOf(j)), null, 2, null);
            }
        }).subscribe(new Consumer() { // from class: com.edu.classroom.message.repo.datasource.-$$Lambda$b$GTN14dsNOPhAs94GMcL6FD9hShs
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                b.c((List) obj);
            }
        }, new Consumer() { // from class: com.edu.classroom.message.repo.datasource.-$$Lambda$b$SIWoab_dRMBlqqOmQiepMnRwCBw
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                b.a(booleanRef, emitter, (Throwable) obj);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Decodes every {@link ChannelMessage} in {@code inputStream} and emits it, then completes.
     *
     * <p>Fix: the stream is read to the end here and is not referenced afterwards, yet it
     * was never closed. It is now closed in a {@code finally} block. A failure of
     * {@code close()} is ignored on purpose: by then the emitter has already been
     * completed or the real error is already propagating.
     *
     * @param inputStream stream holding the encoded messages; closed by this method
     * @param emitter receiver of the decoded messages
     * @throws MsgParseException when decoding fails with a {@link ProtocolException}
     * @throws MsgFetchException for any other failure while reading or emitting
     */
    public static final void a(InputStream inputStream, ObservableEmitter emitter) {
        t.d(inputStream, "$inputStream");
        t.d(emitter, "emitter");
        ProtoReader protoReader = new ProtoReader(Okio.buffer(Okio.source(inputStream)));
        try {
            long beginMessage = protoReader.beginMessage();
            while (protoReader.nextTag() != -1) {
                emitter.onNext(ChannelMessage.ADAPTER.decode(protoReader));
            }
            protoReader.endMessageAndGetUnknownFields(beginMessage);
            emitter.onComplete();
        } catch (Throwable th) {
            if (!(th instanceof ProtocolException)) {
                throw new MsgFetchException(th);
            }
            throw new MsgParseException(th);
        } finally {
            try {
                inputStream.close();
            } catch (java.io.IOException ignored) {
                // Best effort: nothing useful can be done if closing fails.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Per-batch early-completion check: once an element picked from the batch has
     * {@code c()} more than 60000 past {@code entity.i()}, the emitter is completed
     * and {@code completed} is set so this happens only once.
     *
     * <p>NOTE(review): 60000 is presumably a 60-second window in milliseconds, and
     * {@code kotlin.collections.t.i} presumably selects the first or last element — confirm both.
     *
     * @param completed flag recording that the emitter was already completed
     * @param entity playback-info entity supplying the reference value
     * @param emitter completable emitter to complete
     * @param it batch that was just persisted
     */
    public static final void a(Ref.BooleanRef completed, com.edu.classroom.message.repo.b.b.b entity, io.reactivex.b emitter, List it) {
        t.d(completed, "$completed");
        t.d(entity, "$entity");
        t.d(emitter, "$emitter");
        if (!completed.element) {
            t.b(it, "it");
            com.edu.classroom.message.repo.b.b.a probe = (com.edu.classroom.message.repo.b.b.a) kotlin.collections.t.i(it);
            if (probe.c() > entity.i() + 60000) {
                emitter.onComplete();
                completed.element = true;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Error handler of the persist pipeline: always logs the failure, and forwards it to
     * {@code emitter} only if the emitter was not already completed early.
     *
     * @param completed flag recording that the emitter was already completed
     * @param emitter completable emitter to fail
     * @param th failure raised by the pipeline
     */
    public static final void a(Ref.BooleanRef completed, io.reactivex.b emitter, Throwable th) {
        t.d(completed, "$completed");
        t.d(emitter, "$emitter");
        com.edu.classroom.base.log.c.e$default(com.edu.classroom.playback.c.a.f24081a, "playback messages download failed", th, null, 4, null);
        if (!completed.element) {
            emitter.onError(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Synthetic lambda body: converts a decoded message into its entity form.
     *
     * @param this$0 owning data source
     * @param it decoded channel message
     * @return message entity
     */
    public static final com.edu.classroom.message.repo.b.b.a b(b this$0, ChannelMessage it) {
        t.d(this$0, "this$0");
        t.d(it, "it");
        com.edu.classroom.message.repo.b.b.a converted = this$0.a(it);
        return converted;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Synthetic lambda body: turns a fetched stream into the Observable of decoded messages.
     *
     * @param this$0 owning data source
     * @param it fetched input stream
     * @return Observable of decoded channel messages
     */
    public static final ObservableSource b(b this$0, InputStream it) {
        t.d(this$0, "this$0");
        t.d(it, "it");
        Observable<ChannelMessage> decoded = this$0.a(it);
        return decoded;
    }

    /**
     * Logs the request and returns a Single with the state entities the DAO finds for
     * timestamp argument {@code j} (see the static {@code a(b, long, ObservableEmitter)}).
     *
     * @param j timestamp argument for the query
     * @return Single emitting the found entities
     */
    private final Single<List<com.edu.classroom.message.repo.b.b.a>> b(final long j) {
        com.edu.classroom.channel.a.a.f22972a.d(t.a("PlaybackMessageDataSourceImpl.queryLatestMessage ts=", (Object) Long.valueOf(j)));
        ObservableOnSubscribe query = new ObservableOnSubscribe() {
            @Override // io.reactivex.ObservableOnSubscribe
            public final void subscribe(ObservableEmitter observableEmitter) {
                b.a(b.this, j, observableEmitter);
            }
        };
        Single<List<com.edu.classroom.message.repo.b.b.a>> found = Observable.create(query).toList();
        t.b(found, "create<MessageEntity> { …lete()\n        }.toList()");
        return found;
    }

    /**
     * Returns a Completable that persists {@code list} when subscribed
     * (see the static {@code e(b, List)}).
     *
     * @param list batch of message entities to store
     * @return Completable that fails if the batch is not stored completely
     */
    private final io.reactivex.a b(final List<com.edu.classroom.message.repo.b.b.a> list) {
        io.reactivex.functions.a persistBatch = new io.reactivex.functions.a() {
            @Override // io.reactivex.functions.a
            public final void run() {
                b.e(b.this, list);
            }
        };
        io.reactivex.a completable = io.reactivex.a.a(persistBatch);
        t.b(completable, "fromAction {\n        val…gPersistException()\n    }");
        return completable;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Synthetic lambda body: starts the parse-and-persist pipeline with the boolean flag set to {@code false}.
     *
     * @param this$0 owning data source
     * @param entity playback-info entity
     * @param it fetched input stream
     * @return Completable for the pipeline
     */
    public static final io.reactivex.e b(b this$0, com.edu.classroom.message.repo.b.b.b entity, InputStream it) {
        t.d(this$0, "this$0");
        t.d(entity, "$entity");
        t.d(it, "it");
        io.reactivex.a pipeline = this$0.a(it, entity, false);
        return pipeline;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Synthetic lambda body: persists one buffered batch of message entities.
     *
     * @param this$0 owning data source
     * @param it batch of message entities
     * @return Completable that stores the batch
     */
    public static final io.reactivex.e b(b this$0, List it) {
        t.d(this$0, "this$0");
        t.d(it, "it");
        io.reactivex.a stored = this$0.b((List<com.edu.classroom.message.repo.b.b.a>) it);
        return stored;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Zip combiner: joins the two lists via {@code kotlin.collections.t.c}
     * (per the embedded source hint {@code t1 + t2}, presumably list concatenation).
     *
     * @param t1 first list
     * @param t2 second list
     * @return combined list
     */
    public static final List b(List t1, List t2) {
        t.d(t1, "t1");
        t.d(t2, "t2");
        List combined = kotlin.collections.t.c((Collection) t1, (Iterable) t2);
        return combined;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Synthetic completion action: sets the entity's second flag ({@code b(true)}),
     * then refreshes the LiveData and persists the entity.
     *
     * @param entity playback-info entity to update
     * @param this$0 owning data source
     */
    public static final void b(com.edu.classroom.message.repo.b.b.b entity, b this$0) {
        t.d(entity, "$entity");
        t.d(this$0, "this$0");
        final boolean fetched = true;
        entity.b(fetched);
        // Order matters: both follow-up calls read the flag that was just set.
        this$0.f(entity);
        this$0.e(entity);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Filter predicate: rejects messages whose type is "fsm_version" or "user_state_version".
     *
     * @param it decoded channel message
     * @return {@code true} when the message should be kept
     */
    public static final boolean b(ChannelMessage it) {
        t.d(it, "it");
        return !t.a((Object) it.msg_type, (Object) "fsm_version") && !t.a((Object) it.msg_type, (Object) "user_state_version");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Synthetic lambda body: persists one buffered batch and re-emits it.
     *
     * @param this$0 owning data source
     * @param it batch of message entities
     * @return Observable emitting the stored batch
     */
    public static final ObservableSource c(b this$0, List it) {
        t.d(this$0, "this$0");
        t.d(it, "it");
        Observable<List<com.edu.classroom.message.repo.b.b.a>> stored = this$0.a((List<com.edu.classroom.message.repo.b.b.a>) it);
        return stored;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Zip combiner: joins the two lists via {@code kotlin.collections.t.c}
     * (per the embedded source hint {@code t1 + t2}, presumably list concatenation).
     *
     * @param t1 first list
     * @param t2 second list
     * @return combined list
     */
    public static final List c(List t1, List t2) {
        t.d(t1, "t1");
        t.d(t2, "t2");
        List combined = kotlin.collections.t.c((Collection) t1, (Iterable) t2);
        return combined;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Calls the no-argument {@code a()} on every {@code k} instance returned by {@code f()}
     * (per the embedded source hint, presumably {@code complete()} — TODO confirm).
     *
     * <p>Fixes: the decompiled {@code Iterator<T>} used an undeclared type variable and
     * did not compile, and the loop filled a list of Unit values that was never read.
     * Both are replaced by a plain for-each.
     *
     * @param this$0 owning data source
     */
    public static final void c(b this$0) {
        t.d(this$0, "this$0");
        for (k receiver : this$0.f()) {
            receiver.a();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Hands {@code msg} to every {@code k} instance returned by {@code f()}.
     *
     * <p>The decompiled form collected one Unit value per call into a list that was
     * never read (a leftover of Kotlin's {@code map}); that allocation is dropped.
     *
     * @param this$0 owning data source
     * @param msg decoded channel message
     */
    public static final void c(b this$0, ChannelMessage msg) {
        t.d(this$0, "this$0");
        for (k receiver : this$0.f()) {
            t.b(msg, "msg");
            receiver.a(msg);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    // Intentionally empty onNext handler for the persist pipeline's subscribe(...) call;
    // batches need no further handling once they have been stored.
    public static final void c(List list) {
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Filter predicate: rejects messages whose type is "fsm_version" or "user_state_version".
     *
     * @param it decoded channel message
     * @return {@code true} when the message should be kept
     */
    public static final boolean c(ChannelMessage it) {
        t.d(it, "it");
        return !t.a((Object) it.msg_type, (Object) "fsm_version") && !t.a((Object) it.msg_type, (Object) "user_state_version");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Synthetic lambda body: converts a decoded message into its entity form.
     *
     * @param this$0 owning data source
     * @param it decoded channel message
     * @return message entity
     */
    public static final com.edu.classroom.message.repo.b.b.a d(b this$0, ChannelMessage it) {
        t.d(this$0, "this$0");
        t.d(it, "it");
        com.edu.classroom.message.repo.b.b.a converted = this$0.a(it);
        return converted;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Stores {@code messages} through the message DAO and returns the same list.
     *
     * @param this$0 owning data source
     * @param messages batch of message entities
     * @return {@code messages}, unchanged
     * @throws MsgPersistException when the DAO's result array length differs from the batch size
     */
    public static final List d(b this$0, List messages) {
        t.d(this$0, "this$0");
        t.d(messages, "$messages");
        final int storedCount = this$0.b().a((List<com.edu.classroom.message.repo.b.b.a>) messages).length;
        if (storedCount != messages.size()) {
            throw new MsgPersistException();
        }
        return messages;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Maps each message entity in {@code list} through {@code fetcher.d.a(...)} and
     * returns the converted elements in the same order.
     *
     * @param list message entities
     * @return new list of converted elements
     */
    public static final List d(List list) {
        t.d(list, "list");
        ArrayList converted = new ArrayList(kotlin.collections.t.a((Iterable) list, 10));
        for (Object entity : list) {
            converted.add(com.edu.classroom.message.repo.fetcher.d.a((com.edu.classroom.message.repo.b.b.a) entity));
        }
        return converted;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Filter predicate: rejects messages whose type is "fsm_version" or "user_state_version".
     *
     * @param it decoded channel message
     * @return {@code true} when the message should be kept
     */
    public static final boolean d(ChannelMessage it) {
        t.d(it, "it");
        return !t.a((Object) it.msg_type, (Object) "fsm_version") && !t.a((Object) it.msg_type, (Object) "user_state_version");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Synthetic lambda body: converts a decoded message into its entity form.
     *
     * @param this$0 owning data source
     * @param it decoded channel message
     * @return message entity
     */
    public static final com.edu.classroom.message.repo.b.b.a e(b this$0, ChannelMessage it) {
        t.d(this$0, "this$0");
        t.d(it, "it");
        com.edu.classroom.message.repo.b.b.a converted = this$0.a(it);
        return converted;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Passes the entity to the playback-info DAO when either of its flags
     * ({@code h()} or {@code g()}) is set; otherwise does nothing.
     *
     * @param bVar playback-info entity
     */
    public final void e(com.edu.classroom.message.repo.b.b.b bVar) {
        boolean anyFlagSet = bVar.h() || bVar.g();
        if (!anyFlagSet) {
            return;
        }
        c().a(bVar).c();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Stores {@code messages} through the message DAO.
     *
     * @param this$0 owning data source
     * @param messages batch of message entities
     * @throws MsgPersistException when the DAO's result array length differs from the batch size
     */
    public static final void e(b this$0, List messages) {
        t.d(this$0, "this$0");
        t.d(messages, "$messages");
        final int storedCount = this$0.b().a((List<com.edu.classroom.message.repo.b.b.a>) messages).length;
        if (storedCount == messages.size()) {
            return;
        }
        throw new MsgPersistException();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Publishes {@code true} on the Boolean LiveData when the entity's {@code g()} flag is
     * set and either its {@code h()} flag is set or {@code d()} is an empty string.
     *
     * <p>Judging by how a(entity, url) and b(entity, url) use these accessors, {@code g()}/{@code h()}
     * look like "already fetched" flags and {@code d()} like the second URL — TODO confirm.
     *
     * <p>Fix: the decompiled {@code (ab<Boolean>) true} cast is inconvertible and does not
     * compile; the value is passed directly. The nested negations are flattened with the
     * same evaluation order.
     *
     * @param bVar playback-info entity to inspect
     */
    public final void f(com.edu.classroom.message.repo.b.b.b bVar) {
        if (!bVar.g()) {
            return;
        }
        if (bVar.h() || bVar.d().length() == 0) {
            this.i.a(Boolean.TRUE);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Passes the {@code c()} value of the first entity in {@code list} to all three
     * caches: the main cache, "chatCache" and "boardCache".
     *
     * @param this$0 owning data source
     * @param list queried state entities; the first element is read
     */
    public static final void f(b this$0, List list) {
        t.d(this$0, "this$0");
        this$0.g().c(((com.edu.classroom.message.repo.b.b.a) list.get(0)).c());
        com.edu.classroom.message.repo.a.c chat = this$0.l;
        if (chat == null) {
            // Reports the late-initialised property as uninitialised.
            t.b("chatCache");
        }
        chat.c(((com.edu.classroom.message.repo.b.b.a) list.get(0)).c());
        com.edu.classroom.message.repo.a.b board = this$0.m;
        if (board == null) {
            t.b("boardCache");
        }
        board.c(((com.edu.classroom.message.repo.b.b.a) list.get(0)).c());
    }

    /**
     * Returns the main message cache held by the lazy holder {@code k}.
     *
     * @return the cache instance
     */
    private final com.edu.classroom.message.repo.a.d g() {
        Object cached = this.k.getValue();
        return (com.edu.classroom.message.repo.a.d) cached;
    }

    /**
     * Queries the state entities for timestamp argument {@code j}, passes the first
     * entity's {@code c()} value to the caches, and returns the entities converted
     * through {@code fetcher.d.a(...)}.
     *
     * @param j timestamp argument for the query
     * @return Single emitting the converted messages
     */
    @Override // com.edu.classroom.message.repo.datasource.a
    public Single<List<com.edu.classroom.channel.a.b.a>> a(long j) {
        com.edu.classroom.base.log.c.i$default(com.edu.classroom.channel.a.a.f22972a, t.a("prefetch message to cache: ", (Object) Long.valueOf(j)), null, 2, null);
        Consumer updateCaches = new Consumer() {
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                b.f(b.this, (List) obj);
            }
        };
        Function convert = new Function() {
            @Override // io.reactivex.functions.Function
            public final Object apply(Object obj) {
                return b.d((List) obj);
            }
        };
        Single<List<com.edu.classroom.channel.a.b.a>> result = com.edu.classroom.base.f.b.a(b(j)).doOnSuccess(updateCaches).map(convert);
        t.b(result, "queryLatestMessage(times…)\n            }\n        }");
        return result;
    }

    /**
     * Drains the main cache, "chatCache" and "boardCache" up to {@code j2} and returns
     * the three results combined into one list, in that order.
     *
     * <p>Fix: the intermediate result was declared as {@code Single<R>}, an undeclared
     * type variable that does not compile; it now has the concrete list type.
     *
     * @param j start value; only logged here
     * @param j2 end value; upper bound used for draining each cache
     * @return Single emitting the combined messages
     */
    @Override // com.edu.classroom.message.repo.datasource.a
    public Single<List<com.edu.classroom.channel.a.b.a>> a(long j, long j2) {
        com.edu.classroom.channel.a.a.f22972a.d("getMessages start:" + j + " end:" + j2);
        Single<List<com.edu.classroom.channel.a.b.a>> a2 = a(j2, g());
        com.edu.classroom.message.repo.a.c cVar = this.l;
        if (cVar == null) {
            // Reports the late-initialised property as uninitialised.
            t.b("chatCache");
        }
        Single<List<com.edu.classroom.channel.a.b.a>> zipWith = a2.zipWith(a(j2, cVar), new io.reactivex.functions.c() { // from class: com.edu.classroom.message.repo.datasource.-$$Lambda$b$J50kNhyWekB_fAWQHiT04MO26ls
            @Override // io.reactivex.functions.c
            public final Object apply(Object obj, Object obj2) {
                return b.b((List) obj, (List) obj2);
            }
        });
        com.edu.classroom.message.repo.a.b bVar = this.m;
        if (bVar == null) {
            t.b("boardCache");
        }
        Single<List<com.edu.classroom.channel.a.b.a>> zipWith2 = zipWith.zipWith(a(j2, bVar), new io.reactivex.functions.c() { // from class: com.edu.classroom.message.repo.datasource.-$$Lambda$b$x8lSO9tCc19sQWLURws8U6-yyuA
            @Override // io.reactivex.functions.c
            public final Object apply(Object obj, Object obj2) {
                return b.c((List) obj, (List) obj2);
            }
        });
        t.b(zipWith2, "getMessagesFromCache(end…on { t1, t2 -> t1 + t2 })");
        return zipWith2;
    }

    /**
     * Returns a Completable that, when subscribed, calls the no-argument {@code a()} on
     * every {@code k} instance (see the static {@code c(b)}).
     *
     * @return Completable wrapping that action
     */
    @Override // com.edu.classroom.message.repo.datasource.a
    public io.reactivex.a a() {
        io.reactivex.functions.a notifyAll = new io.reactivex.functions.a() {
            @Override // io.reactivex.functions.a
            public final void run() {
                b.c(b.this);
            }
        };
        io.reactivex.a completable = io.reactivex.a.a(notifyAll);
        t.b(completable, "fromAction {\n        pro…p { it.complete() }\n    }");
        return completable;
    }

    /**
     * Downloads the stream at {@code entity.c()} and runs the parse-and-persist
     * pipeline on it with the boolean flag set to {@code true}.
     *
     * @param entity playback-info entity supplying the URL
     * @return Completable for the download and persistence
     */
    public final io.reactivex.a a(final com.edu.classroom.message.repo.b.b.b entity) {
        t.d(entity, "entity");
        Function persistStream = new Function() {
            @Override // io.reactivex.functions.Function
            public final Object apply(Object obj) {
                return b.a(b.this, entity, (InputStream) obj);
            }
        };
        io.reactivex.a pipeline = this.h.a(entity.c()).flatMapCompletable(persistStream).b(io.reactivex.schedulers.a.b());
        t.b(pipeline, "messageNetworkFetcher.fe…scribeOn(Schedulers.io())");
        return pipeline;
    }

    /**
     * Fetches the messages at {@code url} unless the entity's {@code g()} flag is set and
     * its stored {@code c()} value already equals {@code url}.
     *
     * <p>When a fetch is needed, {@code url} is stored on the entity and the
     * playback-settings flag chooses between {@code a(entity)} and {@code c(entity)}.
     *
     * @param entity playback-info entity
     * @param url URL of the message stream
     * @return Completable for the fetch, or an already-completed one when nothing is needed
     */
    @Override // com.edu.classroom.message.repo.datasource.a
    public io.reactivex.a a(com.edu.classroom.message.repo.b.b.b entity, String url) {
        t.d(entity, "entity");
        t.d(url, "url");
        boolean upToDate = entity.g() && t.a((Object) entity.c(), (Object) url);
        if (upToDate) {
            f(entity);
            io.reactivex.a done = io.reactivex.a.a();
            t.b(done, "complete()");
            return done;
        }
        entity.a(url);
        if (q.f22821a.b().getClassroomPlaybackSettings().a()) {
            return a(entity);
        }
        return c(entity);
    }

    /**
     * Creates "chatCache" and "boardCache" from the given info blocks, each backed by
     * its own fetcher and a fresh queue.
     *
     * @param chatInfoBlocks blocks handed to the first fetcher (feeds "chatCache")
     * @param boardInfoBlocks blocks handed to the second fetcher (feeds "boardCache")
     */
    @Override // com.edu.classroom.message.repo.datasource.a
    public void a(List<com.edu.classroom.message.repo.c.b> chatInfoBlocks, List<com.edu.classroom.message.repo.c.a> boardInfoBlocks) {
        t.d(chatInfoBlocks, "chatInfoBlocks");
        t.d(boardInfoBlocks, "boardInfoBlocks");
        // Both fetchers are built before either field is assigned.
        com.edu.classroom.message.repo.fetcher.b chatFetcher = new com.edu.classroom.message.repo.fetcher.b(e(), chatInfoBlocks);
        com.edu.classroom.message.repo.fetcher.a boardFetcher = new com.edu.classroom.message.repo.fetcher.a(e(), boardInfoBlocks);
        com.edu.classroom.message.repo.a.c chatCache = new com.edu.classroom.message.repo.a.c(chatFetcher, new LinkedBlockingQueue());
        this.l = chatCache;
        com.edu.classroom.message.repo.a.b boardCache = new com.edu.classroom.message.repo.a.b(boardFetcher, new LinkedBlockingQueue());
        this.m = boardCache;
    }

    /**
     * Returns the injected message DAO; if it has not been injected yet, the property
     * "messageDao" is reported as uninitialised.
     *
     * @return the message DAO
     */
    public final com.edu.classroom.message.repo.b.a.d b() {
        com.edu.classroom.message.repo.b.a.d dao = this.f24036a;
        if (dao == null) {
            t.b("messageDao");
            return null;
        }
        return dao;
    }

    /**
     * Downloads the stream at {@code entity.d()} and runs the parse-and-persist
     * pipeline on it with the boolean flag set to {@code false}.
     *
     * @param entity playback-info entity supplying the URL
     * @return Completable for the download and persistence
     */
    public final io.reactivex.a b(final com.edu.classroom.message.repo.b.b.b entity) {
        t.d(entity, "entity");
        Function persistStream = new Function() {
            @Override // io.reactivex.functions.Function
            public final Object apply(Object obj) {
                return b.b(b.this, entity, (InputStream) obj);
            }
        };
        io.reactivex.a pipeline = this.h.a(entity.d()).flatMapCompletable(persistStream).b(io.reactivex.schedulers.a.b());
        t.b(pipeline, "messageNetworkFetcher.fe…scribeOn(Schedulers.io())");
        return pipeline;
    }

    /**
     * Fetches the messages at {@code url} unless {@code url} is empty, or the entity's
     * {@code h()} flag is set and its stored {@code d()} value already equals {@code url}.
     *
     * <p>When a fetch is needed, {@code url} is stored on the entity and the
     * playback-settings flag chooses between {@code b(entity)} and {@code d(entity)}.
     *
     * @param entity playback-info entity
     * @param url URL of the message stream; may be empty
     * @return Completable for the fetch, or an already-completed one when nothing is needed
     */
    @Override // com.edu.classroom.message.repo.datasource.a
    public io.reactivex.a b(com.edu.classroom.message.repo.b.b.b entity, String url) {
        t.d(entity, "entity");
        t.d(url, "url");
        boolean upToDate = entity.h() && t.a((Object) entity.d(), (Object) url);
        if (!upToDate && url.length() != 0) {
            entity.b(url);
            if (q.f22821a.b().getClassroomPlaybackSettings().a()) {
                return b(entity);
            }
            return d(entity);
        }
        f(entity);
        io.reactivex.a done = io.reactivex.a.a();
        t.b(done, "complete()");
        return done;
    }

    /**
     * Returns the injected playback-info DAO; if it has not been injected yet, the
     * property "playbackInfoDao" is reported as uninitialised.
     *
     * @return the playback-info DAO
     */
    public final com.edu.classroom.message.repo.b.a.b c() {
        com.edu.classroom.message.repo.b.a.b dao = this.f24037b;
        if (dao == null) {
            t.b("playbackInfoDao");
            return null;
        }
        return dao;
    }

    /**
     * Builds the operation that fetches the stream addressed by {@code entity.c()} and processes
     * the {@link ChannelMessage}s decoded from it.
     *
     * <p>Pipeline, in order:
     * <ol>
     *   <li>{@code this.h.a(entity.c())} supplies an {@link InputStream};</li>
     *   <li>the stream is expanded into an observable of {@link ChannelMessage} by the static
     *       helper {@code b.a(b, InputStream)};</li>
     *   <li>every message is passed to {@code b.a(b, ChannelMessage)} as a side effect;</li>
     *   <li>messages rejected by {@code b.b(ChannelMessage)} are dropped;</li>
     *   <li>the remaining ones are mapped to {@code com.edu.classroom.message.repo.b.b.a};</li>
     *   <li>the mapped items are grouped into lists of at most 50 and each list is handed to
     *       {@code b.a(b, List)}, which yields the per-batch completable;</li>
     *   <li>a final action is attached through the obfuscated {@code .b(Action)} operator
     *       (presumably {@code doOnComplete} - TODO confirm).</li>
     * </ol>
     *
     * <p>The result is then wrapped by {@code com.edu.classroom.base.f.b.a}, whose callback reports
     * the received {@code long} under the key {@code playback_message_database_duration}, and is
     * finally passed through {@code .b(io.reactivex.schedulers.a.b())} (judging by the truncated
     * expression string this is {@code subscribeOn(Schedulers.io())} - TODO confirm).
     *
     * <p>The decompiler comment names the original method {@code fetchRoomMessagesOld}.
     *
     * @param entity playback record providing the url through {@code entity.c()}
     * @return the assembled, not yet subscribed operation
     */
    public final io.reactivex.a c(final com.edu.classroom.message.repo.b.b.b entity) {
        t.d(entity, "entity");
        io.reactivex.a b2 = this.h.a(entity.c()).flatMapObservable(new Function() { // from class: com.edu.classroom.message.repo.datasource.-$$Lambda$b$3ADbfoJaaZecJrpRhNGoh-Gyr1o
            @Override // io.reactivex.functions.Function
            public final Object apply(Object obj) {
                ObservableSource a2;
                a2 = b.a(b.this, (InputStream) obj);
                return a2;
            }
        }).doOnNext(new Consumer() { // from class: com.edu.classroom.message.repo.datasource.-$$Lambda$b$sPTrYDje2N-iWqAF73FyKp9EqtE
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                b.a(b.this, (ChannelMessage) obj);
            }
        }).filter(new o() { // from class: com.edu.classroom.message.repo.datasource.-$$Lambda$b$M9SQEm6iXrBqcxzKmYI9COLXjo0
            @Override // io.reactivex.functions.o
            public final boolean test(Object obj) {
                boolean b3;
                b3 = b.b((ChannelMessage) obj);
                return b3;
            }
        }).map(new Function() { // from class: com.edu.classroom.message.repo.datasource.-$$Lambda$b$AuHb363rAYn4OFdoy3zsrXOKezo
            @Override // io.reactivex.functions.Function
            public final Object apply(Object obj) {
                com.edu.classroom.message.repo.b.b.a b3;
                b3 = b.b(b.this, (ChannelMessage) obj);
                return b3;
            }
        }).buffer(50).flatMapCompletable(new Function() { // from class: com.edu.classroom.message.repo.datasource.-$$Lambda$b$NAS7Eu4uXdHiCg8EcXDCoqBXpno
            @Override // io.reactivex.functions.Function
            public final Object apply(Object obj) {
                io.reactivex.e a2;
                a2 = b.a(b.this, (List) obj);
                return a2;
            }
        }).b(new io.reactivex.functions.a() { // from class: com.edu.classroom.message.repo.datasource.-$$Lambda$b$P_eT8-nr0ilEOw8ywHnbH7gZOP8
            @Override // io.reactivex.functions.a
            public final void run() {
                // NOTE(review): the two arguments look like a decompiler rendering of the lambda's
                // captured values (the entity and the enclosing instance) - confirm against bytecode.
                b.a(com.edu.classroom.message.repo.b.b.b.this, this);
            }
        });
        t.b(b2, "messageNetworkFetcher.fe…ty)\n                    }");
        // The callback receives a long and forwards it to the SDK monitor under the
        // "classroom_playback_service" service name.
        io.reactivex.a b3 = com.edu.classroom.base.f.b.a(b2, new kotlin.jvm.a.b<Long, kotlin.t>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$fetchRoomMessagesOld$7
            @Override // kotlin.jvm.a.b
            public /* synthetic */ kotlin.t invoke(Long l) {
                invoke(l.longValue());
                return kotlin.t.f36712a;
            }

            public final void invoke(long j) {
                com.edu.classroom.base.sdkmonitor.b.a(com.edu.classroom.base.sdkmonitor.b.f22757a, "classroom_playback_service", null, new JSONObject().put("playback_message_database_duration", j), null, 8, null);
            }
        }).b(io.reactivex.schedulers.a.b());
        t.b(b3, "messageNetworkFetcher.fe…scribeOn(Schedulers.io())");
        return b3;
    }

    /**
     * Returns the injected {@code f24038c} dependency.
     *
     * <p>When the field has not been set, {@code t.b("messageDbFetcher")} is called first
     * (presumably the Kotlin "lateinit property not initialized" failure - TODO confirm) and
     * {@code null} is the fallback return value.
     *
     * @return the value of {@code f24038c}, or {@code null} if it is unset and {@code t.b} returns
     */
    public final com.edu.classroom.message.repo.fetcher.c d() {
        final com.edu.classroom.message.repo.fetcher.c fetcher = this.f24038c;
        if (fetcher == null) {
            t.b("messageDbFetcher");
            return null;
        }
        return fetcher;
    }

    /**
     * Builds the operation that fetches the stream addressed by {@code entity.d()} and processes
     * the {@link ChannelMessage}s decoded from it.
     *
     * <p>Pipeline, in order:
     * <ol>
     *   <li>{@code this.h.a(entity.d())} supplies an {@link InputStream};</li>
     *   <li>the stream is expanded into an observable of {@link ChannelMessage} by the static
     *       helper {@code b.b(b, InputStream)};</li>
     *   <li>every message is passed to {@code b.c(b, ChannelMessage)} as a side effect;</li>
     *   <li>messages rejected by {@code b.c(ChannelMessage)} are dropped;</li>
     *   <li>the remaining ones are mapped to {@code com.edu.classroom.message.repo.b.b.a} by
     *       {@code b.d(b, ChannelMessage)};</li>
     *   <li>each mapped item is additionally passed to {@code b.a(b, a)} as a side effect;</li>
     *   <li>the mapped items are grouped into lists of at most 50 and each list is handed to
     *       {@code b.b(b, List)}, which yields the per-batch completable;</li>
     *   <li>a final action is attached through the obfuscated {@code .b(Action)} operator
     *       (presumably {@code doOnComplete} - TODO confirm).</li>
     * </ol>
     *
     * <p>The result is then wrapped by {@code com.edu.classroom.base.f.b.a}, whose callback reports
     * the received {@code long} under the key {@code playback_self_message_database_duration}, and
     * is finally passed through {@code .b(io.reactivex.schedulers.a.b())} (judging by the truncated
     * expression string this is {@code subscribeOn(Schedulers.io())} - TODO confirm).
     *
     * <p>The decompiler comment names the original method {@code fetchSelfMessagesOld}.
     *
     * @param entity playback record providing the url through {@code entity.d()}
     * @return the assembled, not yet subscribed operation
     */
    public final io.reactivex.a d(final com.edu.classroom.message.repo.b.b.b entity) {
        t.d(entity, "entity");
        io.reactivex.a b2 = this.h.a(entity.d()).flatMapObservable(new Function() { // from class: com.edu.classroom.message.repo.datasource.-$$Lambda$b$W50LmsnV7NRTExsX5idzFF52_ss
            @Override // io.reactivex.functions.Function
            public final Object apply(Object obj) {
                ObservableSource b3;
                b3 = b.b(b.this, (InputStream) obj);
                return b3;
            }
        }).doOnNext(new Consumer() { // from class: com.edu.classroom.message.repo.datasource.-$$Lambda$b$30mwL9dQL8_175gxj9bDmi_XHAw
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                b.c(b.this, (ChannelMessage) obj);
            }
        }).filter(new o() { // from class: com.edu.classroom.message.repo.datasource.-$$Lambda$b$lwi8cUYxU1AXi11j_RIpjwIV38E
            @Override // io.reactivex.functions.o
            public final boolean test(Object obj) {
                boolean c2;
                c2 = b.c((ChannelMessage) obj);
                return c2;
            }
        }).map(new Function() { // from class: com.edu.classroom.message.repo.datasource.-$$Lambda$b$qyj8qENqkxBuesOYypEOS-MCPF8
            @Override // io.reactivex.functions.Function
            public final Object apply(Object obj) {
                com.edu.classroom.message.repo.b.b.a d;
                d = b.d(b.this, (ChannelMessage) obj);
                return d;
            }
        }).doOnNext(new Consumer() { // from class: com.edu.classroom.message.repo.datasource.-$$Lambda$b$7GtdCkqnTNJaEJHSNQBzKruWwNk
            @Override // io.reactivex.functions.Consumer
            public final void accept(Object obj) {
                b.a(b.this, (com.edu.classroom.message.repo.b.b.a) obj);
            }
        }).buffer(50).flatMapCompletable(new Function() { // from class: com.edu.classroom.message.repo.datasource.-$$Lambda$b$CpOPM18Jj1vFZ4uR7-plmzWqK5g
            @Override // io.reactivex.functions.Function
            public final Object apply(Object obj) {
                io.reactivex.e b3;
                b3 = b.b(b.this, (List) obj);
                return b3;
            }
        }).b(new io.reactivex.functions.a() { // from class: com.edu.classroom.message.repo.datasource.-$$Lambda$b$ckLv9uoRcV3VlEDgntuy-OPtIvQ
            @Override // io.reactivex.functions.a
            public final void run() {
                // NOTE(review): the two arguments look like a decompiler rendering of the lambda's
                // captured values (the entity and the enclosing instance) - confirm against bytecode.
                b.b(com.edu.classroom.message.repo.b.b.b.this, this);
            }
        });
        t.b(b2, "messageNetworkFetcher.fe…ty)\n                    }");
        // The callback receives a long and forwards it to the SDK monitor under the
        // "classroom_playback_service" service name.
        io.reactivex.a b3 = com.edu.classroom.base.f.b.a(b2, new kotlin.jvm.a.b<Long, kotlin.t>() { // from class: com.edu.classroom.message.repo.datasource.PlaybackMessageDataSourceImpl$fetchSelfMessagesOld$8
            @Override // kotlin.jvm.a.b
            public /* synthetic */ kotlin.t invoke(Long l) {
                invoke(l.longValue());
                return kotlin.t.f36712a;
            }

            public final void invoke(long j) {
                com.edu.classroom.base.sdkmonitor.b.a(com.edu.classroom.base.sdkmonitor.b.f22757a, "classroom_playback_service", null, new JSONObject().put("playback_self_message_database_duration", j), null, 8, null);
            }
        }).b(io.reactivex.schedulers.a.b());
        t.b(b3, "messageNetworkFetcher.fe…scribeOn(Schedulers.io())");
        return b3;
    }

    /**
     * Returns the injected {@code d} dependency.
     *
     * <p>When the field has not been set, {@code t.b("retrofit")} is called first (presumably the
     * Kotlin "lateinit property not initialized" failure - TODO confirm) and {@code null} is the
     * fallback return value.
     *
     * @return the value of field {@code d}, or {@code null} if it is unset and {@code t.b} returns
     */
    public final h e() {
        final h retrofit = this.d;
        if (retrofit == null) {
            t.b("retrofit");
            return null;
        }
        return retrofit;
    }

    /**
     * Returns the injected set held in field {@code e}.
     *
     * <p>When the field has not been set, {@code t.b("processors")} is called first (presumably the
     * Kotlin "lateinit property not initialized" failure - TODO confirm) and {@code null} is the
     * fallback return value.
     *
     * @return the value of field {@code e}, or {@code null} if it is unset and {@code t.b} returns
     */
    public final Set<k> f() {
        final Set<k> processors = this.e;
        if (processors == null) {
            t.b("processors");
            return null;
        }
        return processors;
    }
}
