消息都是存放在一个消息队列中去,而消息循环线程就是围绕这个消息队列进入一个无限循环的,直到线程退出。如果队列中有消息,消息循环线程就会把它取出来,并分发给相应的Handler进行处理;如果队列中没有消息,消息循环线程就会进入空闲等待状态,等待下一个消息的到来。在编写Android应用程序时,当程序执行的任务比较繁重时,为了不阻塞UI主线程而导致ANR的发生,我们通常的做法的创建一个子线程来完成特定的任务。在创建子线程时,有两种选择,一种通过创建Thread对象来创建一个无消息循环的子线程;还有一种就是创建一个带有消息循环的子线程,而创建带有消息循环的子线程由于两种实现方法,一种是直接利用Android给我们封装好的HandlerThread类来直接生成一个带有消息循环的线程对象,另一种方法是在实现线程的run()方法内使用以下方式启动一个消息循环:
一、消息机制使用 通常消息都是有一个消息线程和一个Handler组成,下面我们看PowerManagerService中的一个消息Handler:
mHandlerThread = new ServiceThread(TAG,Process.THREAD_PRIORITY_DISPLAY, false /*allowIo*/);mHandlerThread.start();mHandler = new PowerManagerHandler(mHandlerThread.getLooper());
这里的ServiceThread就是一个HandlerThread,创建Handler的时候,必须把HandlerThread的looper传进去,否则就是默认当前线程的looper。
而每个handler,大致如下:
private final class PowerManagerHandler extends Handler {public PowerManagerHandler(Looper looper) {super(looper, null, true /*async*/);}@Overridepublic void handleMessage(Message msg) {switch (msg.what) {case MSG_USER_ACTIVITY_TIMEOUT:handleUserActivityTimeout();break;case MSG_SANDMAN:handleSandman();break;case MSG_SCREEN_BRIGHTNESS_BOOST_TIMEOUT:handleScreenBrightnessBoostTimeout();break;case MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT:checkWakeLockAquireTooLong();Message m = mHandler.obtainMessage(MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT);m.setAsynchronous(true);mHandler.sendMessageDelayed(m, WAKE_LOCK_ACQUIRE_TOO_LONG_TIMEOUT);break;}}}
二、消息机制原理
那我们先来看下HandlerThread的主函数run函数:
public void run() {mTid = Process.myTid();Looper.prepare();synchronized (this) {mLooper = Looper.myLooper();//赋值后notifyall,主要是getLooper函数返回的是mLoopernotifyAll();}Process.setThreadPriority(mPriority);onLooperPrepared();Looper.loop();mTid = -1;}
再来看看Lopper的prepare函数,最后新建了一个Looper对象,并且放在线程的局部变量中。
public static void prepare() {prepare(true);}private static void prepare(boolean quitAllowed) {if (sThreadLocal.get() != null) {throw new RuntimeException("Only one Looper may be created per thread");}sThreadLocal.set(new Looper(quitAllowed));}
Looper的构造函数中创建了MessageQueue
private Looper(boolean quitAllowed) {mQueue = new MessageQueue(quitAllowed);mThread = Thread.currentThread();}
我们再来看下MessageQueue的构造函数,其中nativeInit是一个native方法,并且把返回值保存在mPtr显然是用long型变量保存的指针
MessageQueue(boolean quitAllowed) {mQuitAllowed = quitAllowed;mPtr = nativeInit();}
native函数中主要创建了NativeMessageQueue对象,并且把指针变量返回了。
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();if (!nativeMessageQueue) {jniThrowRuntimeException(env, "Unable to allocate native queue");return 0;}nativeMessageQueue->incStrong(env);return reinterpret_cast<jlong>(nativeMessageQueue);}
NativeMessageQueue构造函数就是获取mLooper,如果没有就是新建一个Looper
NativeMessageQueue::NativeMessageQueue() :mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {mLooper = Looper::getForThread();if (mLooper == NULL) {mLooper = new Looper(false);Looper::setForThread(mLooper);}}
然后我们再看下Looper的构造函数,显示调用了eventfd创建了一个fd,eventfd它的主要是用于进程或者线程间的通信,我们可以看下这篇博客eventfd介绍
Looper::Looper(bool allowNonCallbacks) :mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {mWakeEventFd = eventfd(0, EFD_NONBLOCK);LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd. errno=%d", errno);AutoMutex _l(mLock);rebuildEpollLocked();}
2.1 c层创建epoll 我们再来看下rebuildEpollLocked函数,创建了epoll,并且把mWakeEventFd加入epoll,而且把mRequests的fd也加入epoll
void Looper::rebuildEpollLocked() {// Close old epoll instance if we have one.if (mEpollFd >= 0) {#if DEBUG_CALLBACKSALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);#endifclose(mEpollFd);}// Allocate the new epoll instance and register the wake pipe.mEpollFd = epoll_create(EPOLL_SIZE_HINT);LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);struct epoll_event eventItem;memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field unioneventItem.events = EPOLLIN;eventItem.data.fd = mWakeEventFd;int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance. errno=%d",errno);for (size_t i = 0; i < mRequests.size(); i++) {const Request& request = mRequests.valueAt(i);struct epoll_event eventItem;request.initEventItem(&eventItem);int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);if (epollResult < 0) {ALOGE("Error adding epoll events for fd %d while rebuilding epoll set, errno=%d",request.fd, errno);}}}
继续回到HandlerThread的run函数,我们继续分析Looper的loop函数
public void run() {mTid = Process.myTid();Looper.prepare();synchronized (this) {mLooper = Looper.myLooper();notifyAll();}Process.setThreadPriority(mPriority);onLooperPrepared();Looper.loop();mTid = -1;}
我们看看Looper的loop函数:
public static void loop() {final Looper me = myLooper();if (me == null) {throw new RuntimeException("No Looper; Looper.prepare() wasn"t called on this thread.");}final MessageQueue queue = me.mQueue;//得到Looper的mQueue// Make sure the identity of this thread is that of the local process,// and keep track of what that identity token actually is.Binder.clearCallingIdentity();final long ident = Binder.clearCallingIdentity();for (;;) {Message msg = queue.next(); // might block这个函数会阻塞,阻塞主要是epoll_waitif (msg == null) {// No message indicates that the message queue is quitting.return;}// This must be in a local variable, in case a UI event sets the loggerPrinter logging = me.mLogging;//自己打的打印if (logging != null) {logging.println(">>>>> Dispatching to " + msg.target + " " +msg.callback + ": " + msg.what);}msg.target.dispatchMessage(msg);if (logging != null) {logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);}// Make sure that during the course of dispatching the// identity of the thread wasn"t corrupted.final long newIdent = Binder.clearCallingIdentity();if (ident != newIdent) {Log.wtf(TAG, "Thread identity changed from 0x"+ Long.toHexString(ident) + " to 0x"+ Long.toHexString(newIdent) + " while dispatching to "+ msg.target.getClass().getName() + " "+ msg.callback + " what=" + msg.what);}msg.recycleUnchecked();}}
MessageQueue类的next函数主要是调用了nativePollOnce函数,后面就是从消息队列中取出一个Message
Message next() {// Return here if the message loop has already quit and been disposed.// This can happen if the application tries to restart a looper after quit// which is not supported.final long ptr = mPtr;//之前保留的指针if (ptr == 0) {return null;}int pendingIdleHandlerCount = -1; // -1 only during first iterationint nextPollTimeoutMillis = 0;for (;;) {if (nextPollTimeoutMillis != 0) {Binder.flushPendingCommands();}nativePollOnce(ptr, nextPollTimeoutMillis);
下面我们主要看下nativePollOnce这个native函数,把之前的指针强制转换成NativeMessageQueue,然后调用其pollOnce函数
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,jlong ptr, jint timeoutMillis) {NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);nativeMessageQueue->pollOnce(env, obj, timeoutMillis);}
2.2 c层epoll_wait阻塞 pollOnce函数,这个函数前面的while一般都没有只是处理了indent大于0的情况,这种情况一般没有,所以我们可以直接看pollInner函数
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {int result = 0;for (;;) {while (mResponseIndex < mResponses.size()) {const Response& response = mResponses.itemAt(mResponseIndex++);int ident = response.request.ident;if (ident >= 0) {int fd = response.request.fd;int events = response.events;void* data = response.request.data;#if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - returning signalled identifier %d: ""fd=%d, events=0x%x, data=%p",this, ident, fd, events, data);#endifif (outFd != NULL) *outFd = fd;if (outEvents != NULL) *outEvents = events;if (outData != NULL) *outData = data;return ident;}}if (result != 0) {#if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - returning result %d", this, result);#endifif (outFd != NULL) *outFd = 0;if (outEvents != NULL) *outEvents = 0;if (outData != NULL) *outData = NULL;return result;}result = pollInner(timeoutMillis);}}
pollInner函数主要就是调用epoll_wait阻塞,并且java层会计算每次阻塞的时间传到c层,等待有mWakeEventFd或者之前addFd的fd有事件过来,才会epoll_wait返回。
int Looper::pollInner(int timeoutMillis) {#if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);#endif// Adjust the timeout based on when the next message is due.if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);if (messageTimeoutMillis >= 0&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {timeoutMillis = messageTimeoutMillis;}#if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - next message in %" PRId64 "ns, adjusted timeout: timeoutMillis=%d",this, mNextMessageUptime - now, timeoutMillis);#endif}// Poll.int result = POLL_WAKE;mResponses.clear();//清空mResponsesmResponseIndex = 0;// We are about to idle.mPolling = true;struct epoll_event eventItems[EPOLL_MAX_EVENTS];int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);//epoll_wait主要线程阻塞在这,这个阻塞的时间也是有java层传过来的// No longer idling.mPolling = false;// Acquire lock.mLock.lock();// Rebuild epoll set if needed.if (mEpollRebuildRequired) {mEpollRebuildRequired = false;rebuildEpollLocked();goto Done;}// Check for poll error.if (eventCount < 0) {if (errno == EINTR) {goto Done;}ALOGW("Poll failed with an unexpected error, errno=%d", errno);result = POLL_ERROR;goto Done;}// Check for poll timeout.if (eventCount == 0) {#if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - timeout", this);#endifresult = POLL_TIMEOUT;goto Done;}// Handle all events.#if DEBUG_POLL_AND_WAKEALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);#endiffor (int i = 0; i < eventCount; i++) {int fd = eventItems[i].data.fd;uint32_t epollEvents = eventItems[i].events;if (fd == mWakeEventFd) {//通知唤醒线程的事件if (epollEvents & EPOLLIN) {awoken();} else {ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);}} else {ssize_t requestIndex = mRequests.indexOfKey(fd);//之前addFd的事件if (requestIndex >= 0) {int events = 0;if (epollEvents & EPOLLIN) events |= EVENT_INPUT;if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;if (epollEvents & EPOLLERR) events |= EVENT_ERROR;if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;pushResponse(events, mRequests.valueAt(requestIndex));//放在mResponses中} else {ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is ""no longer registered.", epollEvents, fd);}}}Done: ;// Invoke pending message callbacks.mNextMessageUptime = LLONG_MAX;while (mMessageEnvelopes.size() != 0) {// 这块主要是c层的消息,java层的消息是自己管理的nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);if (messageEnvelope.uptime <= now) {// Remove the envelope from the list.// We keep a strong reference to the handler until the call to handleMessage// finishes. Then we drop it so that the handler can be deleted *before*// we reacquire our lock.{ // obtain handlersp<MessageHandler> handler = messageEnvelope.handler;Message message = messageEnvelope.message;mMessageEnvelopes.removeAt(0);mSendingMessage = true;mLock.unlock();#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKSALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",this, handler.get(), message.what);#endifhandler->handleMessage(message);} // release handlermLock.lock();mSendingMessage = false;result = POLL_CALLBACK;} else {// The last message left at the head of the queue determines the next wakeup time.mNextMessageUptime = messageEnvelope.uptime;break;}}// Release lock.mLock.unlock();// Invoke all response callbacks.for (size_t i = 0; i < mResponses.size(); i++) {//这是之前addFd的事件的处理,主要是遍历mResponses,然后调用其回调Response& response = mResponses.editItemAt(i);if (response.request.ident == POLL_CALLBACK) {int fd = response.request.fd;int events = response.events;void* data = response.request.data;#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKSALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",this, response.request.callback.get(), fd, events, data);#endif// Invoke the callback. Note that the file descriptor may be closed by// the callback (and potentially even reused) before the function returns so// we need to be a little careful when removing the file descriptor afterwards.int callbackResult = response.request.callback->handleEvent(fd, events, data);if (callbackResult == 0) {removeFd(fd, response.request.seq);}// Clear the callback reference in the response structure promptly because we// will not clear the response vector itself until the next poll.response.request.callback.clear();result = POLL_CALLBACK;}}return result;}
继续分析Looper的loop函数,可以增加自己的打印来调试代码,之前调用Message的target的dispatchMessage来分配消息
for (;;) {Message msg = queue.next(); // might blockif (msg == null) {// No message indicates that the message queue is quitting.return;}// This must be in a local variable, in case a UI event sets the loggerPrinter logging = me.mLogging;//自己的打印if (logging != null) {logging.println(">>>>> Dispatching to " + msg.target + " " +msg.callback + ": " + msg.what);}msg.target.dispatchMessage(msg);if (logging != null) {logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);}// Make sure that during the course of dispatching the// identity of the thread wasn"t corrupted.final long newIdent = Binder.clearCallingIdentity();if (ident != newIdent) {Log.wtf(TAG, "Thread identity changed from 0x"+ Long.toHexString(ident) + " to 0x"+ Long.toHexString(newIdent) + " while dispatching to "+ msg.target.getClass().getName() + " "+ msg.callback + " what=" + msg.what);}msg.recycleUnchecked();}}
2.3 增加调试打印 我们先来看自己添加打印,可以通过Lopper的setMessageLogging函数来打印
public void setMessageLogging(@Nullable Printer printer) {mLogging = printer;} Printer就是一个interface public interface Printer {/** * Write a line of text to the output. There is no need to terminate * the given string with a newline. */void println(String x);}
2.4 java层消息分发处理 再来看消息的分发,先是调用Handler的obtainMessage函数
Message msg = mHandler.obtainMessage(MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT); msg.setAsynchronous(true); mHandler.sendMessageDelayed(msg, WAKE_LOCK_ACQUIRE_TOO_LONG_TIMEOUT);
先看obtainMessage调用了Message的obtain函数
public final Message obtainMessage(int what){return Message.obtain(this, what);}
Message的obtain函数就是新建一个Message,然后其target就是设置成其Handler
public static Message obtain(Handler h, int what) {Message m = obtain();//就是新建一个Messagem.target = h;m.what = what;return m;}
我们再联系之前分发消息
msg.target.dispatchMessage(msg);最后就是调用Handler的dispatchMessage函数,最后在Handler中,最后会根据不同的情况对消息进行处理。
public void dispatchMessage(Message msg) {if (msg.callback != null) {handleCallback(msg);//这种就是用post形式发送,带Runnable的} else {if (mCallback != null) {//这种是handler传参的时候就是传入了mCallback回调了if (mCallback.handleMessage(msg)) {return;}}handleMessage(msg);//最后就是在自己实现的handleMessage处理}}
2.3 java层 消息发送
我们再看下java层的消息发送,主要也是调用Handler的sendMessage post之类函数,最终都会调用下面这个函数
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {MessageQueue queue = mQueue;if (queue == null) {RuntimeException e = new RuntimeException(this + " sendMessageAtTime() called with no mQueue");Log.w("Looper", e.getMessage(), e);return false;}return enqueueMessage(queue, msg, uptimeMillis);}
我们再来看java层发送消息最终都会调用enqueueMessage函数
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {msg.target = this;if (mAsynchronous) {msg.setAsynchronous(true);}return queue.enqueueMessage(msg, uptimeMillis);}
最终在enqueueMessage中,把消息加入消息队列,然后需要的话就调用c层的nativeWake函数
boolean enqueueMessage(Message msg, long when) {if (msg.target == null) {throw new IllegalArgumentException("Message must have a target.");}if (msg.isInUse()) {throw new IllegalStateException(msg + " This message is already in use.");}synchronized (this) {if (mQuitting) {IllegalStateException e = new IllegalStateException(msg.target + " sending message to a Handler on a dead thread");Log.w(TAG, e.getMessage(), e);msg.recycle();return false;}msg.markInUse();msg.when = when;Message p = mMessages;boolean needWake;if (p == null || when == 0 || when < p.when) {// New head, wake up the event queue if blocked.msg.next = p;mMessages = msg;needWake = mBlocked;} else {// Inserted within the middle of the queue. Usually we don"t have to wake// up the event queue unless there is a barrier at the head of the queue// and the message is the earliest asynchronous message in the queue.needWake = mBlocked && p.target == null && msg.isAsynchronous();Message prev;for (;;) {prev = p;p = p.next;if (p == null || when < p.when) {break;}if (needWake && p.isAsynchronous()) {needWake = false;}}msg.next = p; // invariant: p == prev.nextprev.next = msg;}// We can assume mPtr != 0 because mQuitting is false.if (needWake) {nativeWake(mPtr);}}return true;}
我们看下这个native方法,最后也是调用了Looper的wake函数
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);nativeMessageQueue->wake();}void NativeMessageQueue::wake() {mLooper->wake();}
Looper类的wake,函数只是往mWakeEventfd中写了一些内容,这个fd只是通知而已,类似pipe,最后会把epoll_wait唤醒,线程就不阻塞了继续先发送c层消息,然后处理之前addFd的事件,然后处理java层的消息。
void Looper::wake() {#if DEBUG_POLL_AND_WAKEALOGD("%p ~ wake", this);#endifuint64_t inc = 1;ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));if (nWrite != sizeof(uint64_t)) {if (errno != EAGAIN) {ALOGW("Could not write wake signal, errno=%d", errno);}}}
2.4 c层发送消息
在c层也是可以发送消息的,主要是调用Looper的sendMessageAtTime函数,参数有有一个handler是一个回调,我们把消息放在mMessageEnvelopes中。
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,const Message& message) {#if DEBUG_CALLBACKSALOGD("%p ~ sendMessageAtTime - uptime=%" PRId64 ", handler=%p, what=%d",this, uptime, handler.get(), message.what);#endifsize_t i = 0;{ // acquire lockAutoMutex _l(mLock);size_t messageCount = mMessageEnvelopes.size();while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {i += 1;}MessageEnvelope messageEnvelope(uptime, handler, message);mMessageEnvelopes.insertAt(messageEnvelope, i, 1);// Optimization: If the Looper is currently sending a message, then we can skip// the call to wake() because the next thing the Looper will do after processing// messages is to decide when the next wakeup time should be. In fact, it does// not even matter whether this code is running on the Looper thread.if (mSendingMessage) {return;}} // release lock// Wake the poll loop only when we enqueue a new message at the head.if (i == 0) {wake();}}
当在pollOnce中,在epoll_wait之后,会遍历mMessageEnvelopes中的消息,然后调用其handler的handleMessage函数
while (mMessageEnvelopes.size() != 0) {nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);if (messageEnvelope.uptime <= now) {// Remove the envelope from the list.// We keep a strong reference to the handler until the call to handleMessage// finishes. Then we drop it so that the handler can be deleted *before*// we reacquire our lock.{ // obtain handlersp<MessageHandler> handler = messageEnvelope.handler;Message message = messageEnvelope.message;mMessageEnvelopes.removeAt(0);mSendingMessage = true;mLock.unlock();#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKSALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",this, handler.get(), message.what);#endifhandler->handleMessage(message);} // release handlermLock.lock();mSendingMessage = false;result = POLL_CALLBACK;} else {// The last message left at the head of the queue determines the next wakeup time.mNextMessageUptime = messageEnvelope.uptime;break;}}
有一个Looper_test.cpp文件,里面介绍了很多Looper的使用方法,我们来看下
sp<StubMessageHandler> handler = new StubMessageHandler();mLooper->sendMessageAtTime(now + ms2ns(100), handler, Message(MSG_TEST1)); StubMessageHandler继承MessageHandler就必须实现handleMessage方法 class StubMessageHandler : public MessageHandler {public:Vector<Message> messages;virtual void handleMessage(const Message& message) {messages.push(message);}};
我们再顺便看下Message和MessageHandler类
struct Message {Message() : what(0) { }Message(int what) : what(what) { }/* The message type. (interpretation is left up to the handler) */int what;};/** * Interface for a Looper message handler. * * The Looper holds a strong reference to the message handler whenever it has * a message to deliver to it. Make sure to call Looper::removeMessages * to remove any pending messages destined for the handler so that the handler * can be destroyed. */class MessageHandler : public virtual RefBase {protected:virtual ~MessageHandler() { }public:/** * Handles a message. */virtual void handleMessage(const Message& message) = 0;};
2.5 c层addFd 我们也可以在Looper.cpp的addFd中增加fd放入线程epoll中,当fd有数据来我们也可以处理相应的数据,下面我们先来看下addFd函数,我们注意其中有一个callBack回调
int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);}int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {#if DEBUG_CALLBACKSALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,events, callback.get(), data);#endifif (!callback.get()) {if (! mAllowNonCallbacks) {ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");return -1;}if (ident < 0) {ALOGE("Invalid attempt to set NULL callback with ident < 0.");return -1;}} else {ident = POLL_CALLBACK;}{ // acquire lockAutoMutex _l(mLock);Request request;request.fd = fd;request.ident = ident;request.events = events;request.seq = mNextRequestSeq++;request.callback = callback;request.data = data;if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1struct epoll_event eventItem;request.initEventItem(&eventItem);ssize_t requestIndex = mRequests.indexOfKey(fd);if (requestIndex < 0) {int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);//加入epollif (epollResult < 0) {ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);return -1;}mRequests.add(fd, request);//放入mRequests中} else {int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);//更新if (epollResult < 0) {if (errno == ENOENT) {// Tolerate ENOENT because it means that an older file descriptor was// closed before its callback was unregistered and meanwhile a new// file descriptor with the same number has been created and is now// being registered for the first time. This error may occur naturally// when a callback has the side-effect of closing the file descriptor// before returning and unregistering itself. Callback sequence number// checks further ensure that the race is benign.//// Unfortunately due to kernel limitations we need to rebuild the epoll// set from scratch because it may contain an old file handle that we are// now unable to remove since its file descriptor is no longer valid.// No such problem would have occurred if we were using the poll system// call instead, but that approach carries others disadvantages.#if DEBUG_CALLBACKSALOGD("%p ~ addFd - EPOLL_CTL_MOD failed due to file descriptor ""being recycled, falling back on EPOLL_CTL_ADD, errno=%d",this, errno);#endifepollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);if (epollResult < 0) {ALOGE("Error modifying or adding epoll events for fd %d, errno=%d",fd, errno);return -1;}scheduleEpollRebuildLocked();} else {ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);return -1;}}mRequests.replaceValueAt(requestIndex, request);}} // release lockreturn 1;}
在pollOnce函数中,我们先寻找mRequests中匹配的fd,然后在pushResponse中新建一个Response,然后把Response和Request匹配起来。
} else {ssize_t requestIndex = mRequests.indexOfKey(fd);if (requestIndex >= 0) {int events = 0;if (epollEvents & EPOLLIN) events |= EVENT_INPUT;if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;if (epollEvents & EPOLLERR) events |= EVENT_ERROR;if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;pushResponse(events, mRequests.valueAt(requestIndex));} else {ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is ""no longer registered.", epollEvents, fd);}}
下面我们就会遍历mResponses中的Response,然后调用其request中的回调
for (size_t i = 0; i < mResponses.size(); i++) {Response& response = mResponses.editItemAt(i);if (response.request.ident == POLL_CALLBACK) {int fd = response.request.fd;int events = response.events;void* data = response.request.data;#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKSALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",this, response.request.callback.get(), fd, events, data);#endif// Invoke the callback. Note that the file descriptor may be closed by// the callback (and potentially even reused) before the function returns so// we need to be a little careful when removing the file descriptor afterwards.int callbackResult = response.request.callback->handleEvent(fd, events, data);if (callbackResult == 0) {removeFd(fd, response.request.seq);}// Clear the callback reference in the response structure promptly because we// will not clear the response vector itself until the next poll.response.request.callback.clear();result = POLL_CALLBACK;}}
同样我们再来看看Looper_test.cpp是如何使用的?
Pipe pipe;StubCallbackHandler handler(true);handler.setCallback(mLooper, pipe.receiveFd, Looper::EVENT_INPUT);
我们看下handler的setCallback函数
class CallbackHandler {public:void setCallback(const sp<Looper>& looper, int fd, int events) {looper->addFd(fd, 0, events, staticHandler, this);//就是调用了looper的addFd函数,并且回调}protected:virtual ~CallbackHandler() { }virtual int handler(int fd, int events) = 0;private:static int staticHandler(int fd, int events, void* data) {//这个就是回调函数return static_cast<CallbackHandler*>(data)->handler(fd, events);}};class StubCallbackHandler : public CallbackHandler {public:int nextResult;int callbackCount;int fd;int events;StubCallbackHandler(int nextResult) : nextResult(nextResult),callbackCount(0), fd(-1), events(-1) {}protected:virtual int handler(int fd, int events) {//这个是通过回调函数再调到这里的callbackCount += 1;this->fd = fd;this->events = events;return nextResult;}};
我们结合Looper的addFd一起来看,当callback是有的,我们新建一个SimpleLooperCallback
int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);}
这里的Looper_callbackFunc是一个typedef
typedef int (*Looper_callbackFunc)(int fd, int events, void* data);
我们再来看SimpleLooperCallback
class SimpleLooperCallback : public LooperCallback {protected:virtual ~SimpleLooperCallback();public:SimpleLooperCallback(Looper_callbackFunc callback);virtual int handleEvent(int fd, int events, void* data);private:Looper_callbackFunc mCallback;};SimpleLooperCallback::SimpleLooperCallback(Looper_callbackFunc callback) :mCallback(callback) {}SimpleLooperCallback::~SimpleLooperCallback() {}int SimpleLooperCallback::handleEvent(int fd, int events, void* data) {return mCallback(fd, events, data);}
最后我们是调用callback->handleEvent(fd, events, data),而callback就是SimpleLooperCallback,这里的data,之前传进来的就是CallbackHandler 的this指针
因此最后就是调用了staticHandler,而data->handler,就是this->handler,最后是虚函数就调用到了StubCallbackHandler 的handler函数中了。
当然我们也可以不用这么复杂,直接使用第二个addFd函数,当然callBack我们需要自己定义一个类来实现LooperCallBack类就行了,这样就简单多了。
int addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data);
2.6 java层addFd 一直以为只能在c层的Looper中才能addFd,原来在java层也通过jni做了这个功能。
我们可以在MessageQueue中的addOnFileDescriptorEventListener来实现这个功能
public void addOnFileDescriptorEventListener(@NonNull FileDescriptor fd,@OnFileDescriptorEventListener.Events int events,@NonNull OnFileDescriptorEventListener listener) {if (fd == null) {throw new IllegalArgumentException("fd must not be null");}if (listener == null) {throw new IllegalArgumentException("listener must not be null");}synchronized (this) {updateOnFileDescriptorEventListenerLocked(fd, events, listener);}}
我们再来看看OnFileDescriptorEventListener 这个回调
public interface OnFileDescriptorEventListener {public static final int EVENT_INPUT = 1 << 0;public static final int EVENT_OUTPUT = 1 << 1;public static final int EVENT_ERROR = 1 << 2;/** @hide */@Retention(RetentionPolicy.SOURCE)@IntDef(flag=true, value={EVENT_INPUT, EVENT_OUTPUT, EVENT_ERROR})public @interface Events {}@Events int onFileDescriptorEvents(@NonNull FileDescriptor fd, @Events int events);}
接着调用了updateOnFileDescriptorEventListenerLocked函数
private void updateOnFileDescriptorEventListenerLocked(FileDescriptor fd, int events,OnFileDescriptorEventListener listener) {final int fdNum = fd.getInt$();int index = -1;FileDescriptorRecord record = null;if (mFileDescriptorRecords != null) {index = mFileDescriptorRecords.indexOfKey(fdNum);if (index >= 0) {record = mFileDescriptorRecords.valueAt(index);if (record != null && record.mEvents == events) {return;}}}if (events != 0) {events |= OnFileDescriptorEventListener.EVENT_ERROR;if (record == null) {if (mFileDescriptorRecords == null) {mFileDescriptorRecords = new SparseArray<FileDescriptorRecord>();}record = new FileDescriptorRecord(fd, events, listener);//fd保存在FileDescriptorRecord对象mFileDescriptorRecords.put(fdNum, record);//mFileDescriptorRecords然后保存在} else {record.mListener = listener;record.mEvents = events;record.mSeq += 1;}nativeSetFileDescriptorEvents(mPtr, fdNum, events);//调用native函数} else if (record != null) {record.mEvents = 0;mFileDescriptorRecords.removeAt(index);}}
native最后调用了NativeMessageQueue的setFileDescriptorEvents函数
static void android_os_MessageQueue_nativeSetFileDescriptorEvents(JNIEnv* env, jclass clazz,jlong ptr, jint fd, jint events) {NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);nativeMessageQueue->setFileDescriptorEvents(fd, events);}
setFileDescriptorEvents函数,这个addFd就是调用的第二个addFd,因此我们可以肯定NativeMessageQueue继承了LooperCallback
void NativeMessageQueue::setFileDescriptorEvents(int fd, int events) {if (events) {int looperEvents = 0;if (events & CALLBACK_EVENT_INPUT) {looperEvents |= Looper::EVENT_INPUT;}if (events & CALLBACK_EVENT_OUTPUT) {looperEvents |= Looper::EVENT_OUTPUT;}mLooper->addFd(fd, Looper::POLL_CALLBACK, looperEvents, this,reinterpret_cast<void*>(events));} else {mLooper->removeFd(fd);}}
果然是,需要实现handleEvent函数
class NativeMessageQueue : public MessageQueue, public LooperCallback {public:NativeMessageQueue();virtual ~NativeMessageQueue();virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj);void pollOnce(JNIEnv* env, jobject obj, int timeoutMillis);void wake();void setFileDescriptorEvents(int fd, int events);virtual int handleEvent(int fd, int events, void* data);
handleEvent就是在looper中epoll_wait之后,当我们增加的fd有数据就会调用这个函数
int NativeMessageQueue::handleEvent(int fd, int looperEvents, void* data) {int events = 0;if (looperEvents & Looper::EVENT_INPUT) {events |= CALLBACK_EVENT_INPUT;}if (looperEvents & Looper::EVENT_OUTPUT) {events |= CALLBACK_EVENT_OUTPUT;}if (looperEvents & (Looper::EVENT_ERROR | Looper::EVENT_HANGUP | Looper::EVENT_INVALID)) {events |= CALLBACK_EVENT_ERROR;}int oldWatchedEvents = reinterpret_cast<intptr_t>(data);int newWatchedEvents = mPollEnv->CallIntMethod(mPollObj,gMessageQueueClassInfo.dispatchEvents, fd, events); //调用回调if (!newWatchedEvents) {return 0; // unregister the fd}if (newWatchedEvents != oldWatchedEvents) {setFileDescriptorEvents(fd, newWatchedEvents);}return 1;}
最后在java的MessageQueue中的dispatchEvents就是在jni层反调过来的,然后调用之前注册的回调函数
// Called from native code.private int dispatchEvents(int fd, int events) {// Get the file descriptor record and any state that might change.final FileDescriptorRecord record;final int oldWatchedEvents;final OnFileDescriptorEventListener listener;final int seq;synchronized (this) {record = mFileDescriptorRecords.get(fd);//通过fd得到FileDescriptorRecord if (record == null) {return 0; // spurious, no listener registered}oldWatchedEvents = record.mEvents;events &= oldWatchedEvents; // filter events based on current watched setif (events == 0) {return oldWatchedEvents; // spurious, watched events changed}listener = record.mListener;seq = record.mSeq;}// Invoke the listener outside of the lock.int newWatchedEvents = listener.onFileDescriptorEvents(//listener回调record.mDescriptor, events);if (newWatchedEvents != 0) {newWatchedEvents |= OnFileDescriptorEventListener.EVENT_ERROR;}// Update the file descriptor record if the listener changed the set of// events to watch and the listener itself hasn"t been updated since.if (newWatchedEvents != oldWatchedEvents) {synchronized (this) {int index = mFileDescriptorRecords.indexOfKey(fd);if (index >= 0 && mFileDescriptorRecords.valueAt(index) == record&& record.mSeq == seq) {record.mEvents = newWatchedEvents;if (newWatchedEvents == 0) {mFileDescriptorRecords.removeAt(index);}}}}// Return the new set of events to watch for native code to take care of.return newWatchedEvents;}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。