package fan.concurrent;

import fan.concurrent.ThreadPool;
import fan.sys.ArgErr;
import fan.sys.Duration;
import fan.sys.Err;
import fan.sys.FanObj;
import fan.sys.Func;
import fan.sys.InterruptedErr;
import fan.sys.Locale;
import fan.sys.Map;
import fan.sys.NullErr;
import fan.sys.Sys;
import fan.sys.Type;
import java.util.HashMap;

/* loaded from: input_file:fan/concurrent/Actor.class */
public class Actor extends FanObj implements ThreadPool.Work {
    private static Type type;
    private static final ThreadLocal locals = new ThreadLocal() { // from class: fan.concurrent.Actor.1
        @Override // java.lang.ThreadLocal
        protected Object initialValue() {
            return new Map(Sys.StrType, Sys.ObjType.toNullable());
        }
    };
    private ActorPool pool;
    private Func receive;
    private Queue queue;
    private Object lock = new Object();
    private boolean submitted = false;
    final Context context = new Context(this);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:fan/concurrent/Actor$CoalescingQueue.class */
    public static class CoalescingQueue extends Queue {
        Func toKeyFunc;
        Func coalesceFunc;
        HashMap pending = new HashMap();

        CoalescingQueue(Func func, Func func2) {
            this.toKeyFunc = func;
            this.coalesceFunc = func2;
        }

        @Override // fan.concurrent.Actor.Queue
        public Future get() {
            Future future = super.get();
            if (future != null) {
                try {
                    Object key = toKey(future.msg);
                    if (key != null) {
                        this.pending.remove(key);
                    }
                } catch (Throwable th) {
                    th.printStackTrace();
                }
            }
            return future;
        }

        @Override // fan.concurrent.Actor.Queue
        public void add(Future future) {
            try {
                Object key = toKey(future.msg);
                if (key != null) {
                    this.pending.put(key, future);
                }
            } catch (Throwable th) {
                th.printStackTrace();
            }
            super.add(future);
        }

        @Override // fan.concurrent.Actor.Queue
        public Future coalesce(Future future) {
            Future future2;
            Object key = toKey(future.msg);
            if (key == null || (future2 = (Future) this.pending.get(key)) == null) {
                return null;
            }
            future2.msg = coalesce(future2.msg, future.msg);
            return future2;
        }

        private Object toKey(Object obj) {
            return this.toKeyFunc == null ? obj : this.toKeyFunc.call(obj);
        }

        private Object coalesce(Object obj, Object obj2) {
            return this.coalesceFunc == null ? obj2 : this.coalesceFunc.call(obj, obj2);
        }
    }

    /* loaded from: input_file:fan/concurrent/Actor$Context.class */
    static final class Context {
        final Actor actor;
        final Map locals = new Map(Sys.StrType, Sys.ObjType.toNullable());
        Locale locale = Locale.cur();

        Context(Actor actor) {
            this.actor = actor;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:fan/concurrent/Actor$Queue.class */
    public static class Queue {
        Future head;
        Future tail;
        int size;

        Queue() {
        }

        public Future get() {
            if (this.head == null) {
                return null;
            }
            Future future = this.head;
            this.head = future.next;
            if (this.head == null) {
                this.tail = null;
            }
            future.next = null;
            this.size--;
            return future;
        }

        public void add(Future future) {
            if (this.tail == null) {
                this.tail = future;
                this.head = future;
                future.next = null;
            } else {
                this.tail.next = future;
                this.tail = future;
            }
            this.size++;
        }

        public Future coalesce(Future future) {
            return null;
        }
    }

    public static Actor make(ActorPool actorPool) {
        return make(actorPool, null);
    }

    public static Actor make(ActorPool actorPool, Func func) {
        Actor actor = new Actor();
        make$(actor, actorPool, func);
        return actor;
    }

    public static void make$(Actor actor, ActorPool actorPool, Func func) {
        if (actorPool == null) {
            throw NullErr.make("pool is null").val;
        }
        if (func == null && actor.typeof().qname().equals("concurrent::Actor")) {
            throw ArgErr.make("must supply receive func or subclass Actor").val;
        }
        if (func != null) {
            func = (Func) func.toImmutable();
        }
        actor.pool = actorPool;
        actor.receive = func;
        actor.queue = new Queue();
    }

    public static Actor makeCoalescing(ActorPool actorPool, Func func, Func func2) {
        return makeCoalescing(actorPool, func, func2, null);
    }

    public static Actor makeCoalescing(ActorPool actorPool, Func func, Func func2, Func func3) {
        Actor actor = new Actor();
        makeCoalescing$(actor, actorPool, func, func2, func3);
        return actor;
    }

    public static void makeCoalescing$(Actor actor, ActorPool actorPool, Func func, Func func2, Func func3) {
        if (func != null) {
            func = (Func) func.toImmutable();
        }
        if (func2 != null) {
            func2 = (Func) func2.toImmutable();
        }
        make$(actor, actorPool, func3);
        actor.queue = new CoalescingQueue(func, func2);
    }

    @Override // fan.sys.FanObj
    public Type typeof() {
        if (type == null) {
            type = Type.find("concurrent::Actor");
        }
        return type;
    }

    public final ActorPool pool() {
        return this.pool;
    }

    public final Future send(Object obj) {
        return _send(obj, null, null);
    }

    public final Future sendLater(Duration duration, Object obj) {
        return _send(obj, duration, null);
    }

    public final Future sendWhenDone(Future future, Object obj) {
        return _send(obj, null, future);
    }

    protected Object receive(Object obj) {
        if (this.receive != null) {
            return this.receive.call(obj);
        }
        System.out.println("WARNING: " + typeof() + ".receive not overridden");
        return null;
    }

    public static void sleep(Duration duration) {
        try {
            long j = duration.ticks;
            Thread.sleep(j / Duration.nsPerMilli, (int) (j % Duration.nsPerMilli));
        } catch (InterruptedException e) {
            throw InterruptedErr.make(e).val;
        }
    }

    public static Map locals() {
        return (Map) locals.get();
    }

    private Future _send(Object obj, Duration duration, Future future) {
        Object safe = Sys.safe(obj);
        if (this.pool.isStopped()) {
            throw Err.make("ActorPool is stopped").val;
        }
        Future future2 = new Future(safe);
        if (duration != null) {
            this.pool.schedule(this, duration, future2);
        } else if (future != null) {
            future.sendWhenDone(this, future2);
        } else {
            future2 = _enqueue(future2, true);
        }
        return future2;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final Future _enqueue(Future future, boolean z) {
        synchronized (this.lock) {
            if (z) {
                Future coalesce = this.queue.coalesce(future);
                if (coalesce != null) {
                    return coalesce;
                }
            }
            this.queue.add(future);
            if (!this.submitted) {
                this.submitted = true;
                this.pool.submit(this);
            }
            return future;
        }
    }

    @Override // fan.concurrent.ThreadPool.Work
    public final void _work() {
        Future future;
        locals.set(this.context.locals);
        Locale.setCur(this.context.locale);
        for (int i = 0; i < 100; i++) {
            synchronized (this.lock) {
                future = this.queue.get();
            }
            if (future == null) {
                break;
            }
            _dispatch(future);
        }
        this.context.locale = Locale.cur();
        synchronized (this.lock) {
            if (this.queue.size == 0) {
                this.submitted = false;
            } else {
                this.submitted = true;
                this.pool.submit(this);
            }
        }
    }

    final void _dispatch(Future future) {
        try {
            if (future.isCancelled()) {
                return;
            }
            if (this.pool.killed) {
                future.cancel();
            } else {
                future.set(receive(future.msg));
            }
        } catch (Err.Val e) {
            future.err(e.err());
        } catch (Throwable th) {
            future.err(Err.make(th));
        }
    }

    @Override // fan.concurrent.ThreadPool.Work
    public void _kill() {
        Queue queue;
        synchronized (this.lock) {
            queue = this.queue;
            this.queue = new Queue();
        }
        while (true) {
            Future future = queue.get();
            if (future == null) {
                return;
            } else {
                future.cancel();
            }
        }
    }
}
