`
wbj0110
  • 浏览: 1549065 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

深入浅出Zookeeper之四Create请求和处理

阅读更多

客户端接口

 

Java代码  收藏代码
  1. public String create(final String path, byte data[], List<ACL> acl,  
  2.             CreateMode createMode)  
  3.         throws KeeperException, InterruptedException  
  4.     {  
  5.         final String clientPath = path;  
  6.         PathUtils.validatePath(clientPath, createMode.isSequential());  
  7.   
  8.         final String serverPath = prependChroot(clientPath);  
  9.     //请求头  
  10.         RequestHeader h = new RequestHeader();  
  11.         h.setType(ZooDefs.OpCode.create);  
  12.     //请求体  
  13.         CreateRequest request = new CreateRequest();  
  14.     //CREATE请求需要server端响应  
  15.         CreateResponse response = new CreateResponse();  
  16.         request.setData(data);  
  17.     //node类型  
  18.         request.setFlags(createMode.toFlag());  
  19.         request.setPath(serverPath);  
  20.         if (acl != null && acl.size() == 0) {  
  21.             throw new KeeperException.InvalidACLException();  
  22.         }  
  23.         request.setAcl(acl);  
  24.     //同步提交  
  25.         ReplyHeader r = cnxn.submitRequest(h, request, response, null);  
  26.         //异常情况  
  27.     if (r.getErr() != 0) {  
  28.             throw KeeperException.create(KeeperException.Code.get(r.getErr()),  
  29.                     clientPath);  
  30.         }  
  31.     //真实路径,对于SEQUENTIAL NODE后面会加上序号  
  32.         if (cnxn.chrootPath == null) {  
  33.             return response.getPath();  
  34.         } else {  
  35.             return response.getPath().substring(cnxn.chrootPath.length());  
  36.         }  
  37.     }  

 请求提交过程和之前的exists一样,都是通过SendThread写出去,都会进入pending队列等待server端返回。

 

    server端处理也是一样,变化的只是RequestProcessor的业务逻辑。也就是说transport层是通用的,变化的是上层的业务层。

    server端执行处理链,PrepRequestProcessor

 

Java代码  收藏代码
  1. switch (request.type) {  
  2.                 case OpCode.create:  
  3.         //反序列化的对象  
  4.                 CreateRequest createRequest = new CreateRequest();  
  5.         //zxid递增  
  6.                 pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);  
  7.                 break;  
  8.         ......  
  9.     request.zxid = zks.getZxid();  
  10.         nextProcessor.processRequest(request);  

 

 具体处理

 

Java代码  收藏代码
  1. protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)  
  2.         throws KeeperException, IOException, RequestProcessorException  
  3.     {  
  4.     //构造内部的事务头  
  5.         request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,  
  6.                                     zks.getTime(), type);  
  7.   
  8.         switch (type) {  
  9.             case OpCode.create:       
  10.         //检查session是否还有效  
  11.                 zks.sessionTracker.checkSession(request.sessionId, request.getOwner());  
  12.                 CreateRequest createRequest = (CreateRequest)record;    
  13.         //反序列化  
  14.                 if(deserialize)  
  15.                     ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);  
  16.                 String path = createRequest.getPath();  
  17.         //路径规则检查  
  18.                 int lastSlash = path.lastIndexOf('/');  
  19.                 if (lastSlash == -1 || path.indexOf('\0') != -1 || failCreate) {  
  20.                     LOG.info("Invalid path " + path + " with session 0x" +  
  21.                             Long.toHexString(request.sessionId));  
  22.                     throw new KeeperException.BadArgumentsException(path);  
  23.                 }  
  24.         //权限去重  
  25.                 List<ACL> listACL = removeDuplicates(createRequest.getAcl());  
  26.                 if (!fixupACL(request.authInfo, listACL)) {  
  27.                     throw new KeeperException.InvalidACLException(path);  
  28.                 }  
  29.                 String parentPath = path.substring(0, lastSlash);  
  30.         //父节点的ChangeRecord  
  31.                 ChangeRecord parentRecord = getRecordForPath(parentPath);  
  32.         //权限校验  
  33.                 checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE,  
  34.                         request.authInfo);  
  35.                 int parentCVersion = parentRecord.stat.getCversion();  
  36.                 CreateMode createMode =  
  37.                     CreateMode.fromFlag(createRequest.getFlags());  
  38.             //SEQUENCE节点修改路径  
  39.                 if (createMode.isSequential()) {  
  40.                     path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);  
  41.                 }  
  42.                 try {  
  43.                     PathUtils.validatePath(path);  
  44.                 } catch(IllegalArgumentException ie) {  
  45.                     LOG.info("Invalid path " + path + " with session 0x" +  
  46.                             Long.toHexString(request.sessionId));  
  47.                     throw new KeeperException.BadArgumentsException(path);  
  48.                 }  
  49.         //当前节点ChangeRecord,一并校验节点是否已经存在  
  50.                 try {  
  51.                     if (getRecordForPath(path) != null) {  
  52.                         throw new KeeperException.NodeExistsException(path);  
  53.                     }  
  54.                 } catch (KeeperException.NoNodeException e) {  
  55.                     // ignore this one  
  56.                 }  
  57.         //EPHEMERAL节点不允许创建子节点  
  58.                 boolean ephemeralParent = parentRecord.stat.getEphemeralOwner() != 0;  
  59.                 if (ephemeralParent) {  
  60.                     throw new KeeperException.NoChildrenForEphemeralsException(path);  
  61.                 }  
  62.         //递增Cversion  
  63.                 int newCversion = parentRecord.stat.getCversion()+1;  
  64.         //构造事务体,后续会被写入log  
  65.                 request.txn = new CreateTxn(path, createRequest.getData(),  
  66.                         listACL,  
  67.                         createMode.isEphemeral(), newCversion);  
  68.                 StatPersisted s = new StatPersisted();  
  69.         //如果是临时节点,则owner为sessionId  
  70.                 if (createMode.isEphemeral()) {  
  71.                     s.setEphemeralOwner(request.sessionId);  
  72.                 }  
  73.                 parentRecord = parentRecord.duplicate(request.hdr.getZxid());  
  74.         //递增parent的child数  
  75.                 parentRecord.childCount++;  
  76.         //递增parent的cversion  
  77.                 parentRecord.stat.setCversion(newCversion);  
  78.         //添加changeRecord到冗余队列  
  79.                 addChangeRecord(parentRecord);  
  80.                 addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s,  
  81.                         0, listACL));  
  82.                 break;  
  83.         ......  

 SyncRequestProcessor处理,主要是log写入,和之前的分析类似,不赘述

 

FinalRequestProcessor处理,修改内存中的DataTree结构

 

Java代码  收藏代码
  1. switch (header.getType()) {  
  2.                case OpCode.create:  
  3.                    CreateTxn createTxn = (CreateTxn) txn;  
  4.                    rc.path = createTxn.getPath();  
  5.                    createNode(  
  6.                            createTxn.getPath(),  
  7.                            createTxn.getData(),  
  8.                            createTxn.getAcl(),  
  9.                            createTxn.getEphemeral() ? header.getClientId() : 0,  
  10.                            createTxn.getParentCVersion(),  
  11.                            header.getZxid(), header.getTime());  
  12.                    break;  

 public String createNode(String path, byte data[], List<ACL> acl,

 

Java代码  收藏代码
  1.            long ephemeralOwner, int parentCVersion, long zxid, long time)  
  2.            throws KeeperException.NoNodeException,  
  3.            KeeperException.NodeExistsException {  
  4. //各种参数设置  
  5.        int lastSlash = path.lastIndexOf('/');  
  6.        String parentName = path.substring(0, lastSlash);  
  7.        String childName = path.substring(lastSlash + 1);  
  8.        StatPersisted stat = new StatPersisted();  
  9.        stat.setCtime(time);  
  10.        stat.setMtime(time);  
  11.        stat.setCzxid(zxid);  
  12.        stat.setMzxid(zxid);  
  13.        stat.setPzxid(zxid);  
  14.        stat.setVersion(0);  
  15.        stat.setAversion(0);  
  16.        stat.setEphemeralOwner(ephemeralOwner);  
  17.        DataNode parent = nodes.get(parentName);  
  18.        if (parent == null) {  
  19.            throw new KeeperException.NoNodeException();  
  20.        }  
  21. //修改parent节点内容  
  22.        synchronized (parent) {  
  23.            Set<String> children = parent.getChildren();  
  24.            if (children != null) {  
  25.                if (children.contains(childName)) {  
  26.                    throw new KeeperException.NodeExistsException();  
  27.                }  
  28.            }  
  29.              
  30.            if (parentCVersion == -1) {  
  31.                parentCVersion = parent.stat.getCversion();  
  32.                parentCVersion++;  
  33.            }      
  34.            parent.stat.setCversion(parentCVersion);  
  35.            parent.stat.setPzxid(zxid);  
  36.            Long longval = convertAcls(acl);  
  37.     //添加目标节点  
  38.            DataNode child = new DataNode(parent, data, longval, stat);  
  39.            parent.addChild(childName);  
  40.            nodes.put(path, child);  
  41.     //添加ephemeral类型节点  
  42.            if (ephemeralOwner != 0) {  
  43.                HashSet<String> list = ephemerals.get(ephemeralOwner);  
  44.                if (list == null) {  
  45.                    list = new HashSet<String>();  
  46.                    ephemerals.put(ephemeralOwner, list);  
  47.                }  
  48.                synchronized (list) {  
  49.                    list.add(path);  
  50.                }  
  51.            }  
  52.        }  
  53.        ......  
  54. //dataWatches和childWatches事件触发  
  55.        dataWatches.triggerWatch(path, Event.EventType.NodeCreated);  
  56.        childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,  
  57.                Event.EventType.NodeChildrenChanged);  
  58.        return path;  
  59.    }  

 事件触发过程

 

Java代码  收藏代码
  1. public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {  
  2.         WatchedEvent e = new WatchedEvent(type,  
  3.                 KeeperState.SyncConnected, path);  
  4.         HashSet<Watcher> watchers;  
  5.         synchronized (this) {  
  6.         //拿节点对应的watcher列表,只通知一次  
  7.             watchers = watchTable.remove(path);  
  8.             .......  
  9.             for (Watcher w : watchers) {  
  10.                 HashSet<String> paths = watch2Paths.get(w);  
  11.                 if (paths != null) {  
  12.                     paths.remove(path);  
  13.                 }  
  14.             }  
  15.         }  
  16.     //挨个通知  
  17.         for (Watcher w : watchers) {  
  18.             if (supress != null && supress.contains(w)) {  
  19.                 continue;  
  20.             }  
  21.             w.process(e);  
  22.         }  
  23.         return watchers;  
  24.     }  

 

 具体处理,NIOServerCnxn

 

Java代码  收藏代码
  1.    synchronized public void process(WatchedEvent event) {  
  2. //事件通知的响应头  
  3.        ReplyHeader h = new ReplyHeader(-1, -1L, 0);  
  4.        ......  
  5.   
  6.        // Convert WatchedEvent to a type that can be sent over the wire  
  7.        WatcherEvent e = event.getWrapper();  
  8. //发送响应  
  9.        sendResponse(h, e, "notification");  
  10.    }  

 

 具体发送:

 

Java代码  收藏代码
  1. synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {  
  2.         try {  
  3.             ByteArrayOutputStream baos = new ByteArrayOutputStream();  
  4.             // Make space for length  
  5.             BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);  
  6.             try {  
  7.         //占位  
  8.                 baos.write(fourBytes);  
  9.         //序列化  
  10.                 bos.writeRecord(h, "header");  
  11.                 if (r != null) {  
  12.                     bos.writeRecord(r, tag);  
  13.                 }  
  14.                 baos.close();  
  15.             } catch (IOException e) {  
  16.                 LOG.error("Error serializing response");  
  17.             }  
  18.         //转换成byte数组  
  19.             byte b[] = baos.toByteArray();  
  20.             ByteBuffer bb = ByteBuffer.wrap(b);  
  21.         //最后写入长度  
  22.             bb.putInt(b.length - 4).rewind();  
  23.         //通过channel异步写  
  24.             sendBuffer(bb);  
  25.             if (h.getXid() > 0) {  
  26.                 synchronized(this){  
  27.                     outstandingRequests--;  
  28.                 }  
  29.                 // check throttling  
  30.                 synchronized (this.factory) {          
  31.                     if (zkServer.getInProcess() < outstandingLimit  
  32.                             || outstandingRequests < 1) {  
  33.                         sk.selector().wakeup();  
  34.                         enableRecv();  
  35.                     }  
  36.                 }  
  37.             }  
  38.          } catch(Exception e) {  
  39.             LOG.warn("Unexpected exception. Destruction averted.", e);  
  40.          }  
  41.     }  

 server端就处理完了,接下来client端SendThread收到响应

 

  if (replyHdr.getXid() == -1) {

Java代码  收藏代码
  1.               // -1 means notification  
  2.               if (LOG.isDebugEnabled()) {  
  3.                   LOG.debug("Got notification sessionid:0x"  
  4.                       + Long.toHexString(sessionId));  
  5.               }  
  6. //反序列化  
  7.               WatcherEvent event = new WatcherEvent();  
  8.               event.deserialize(bbia, "response");  
  9.   
  10.               // convert from a server path to a client path  
  11. //路径切换,如果client用了相对路径的话  
  12.               if (chrootPath != null) {  
  13.                   String serverPath = event.getPath();  
  14.                   if(serverPath.compareTo(chrootPath)==0)  
  15.                       event.setPath("/");  
  16.                   else if (serverPath.length() > chrootPath.length())  
  17.                       event.setPath(serverPath.substring(chrootPath.length()));  
  18.                   else {  
  19.                     LOG.warn("Got server path " + event.getPath()  
  20.                             + " which is too short for chroot path "  
  21.                             + chrootPath);  
  22.                   }  
  23.               }  
  24. //通过eventThread派发事件,通知之前注册的watcher  
  25.               WatchedEvent we = new WatchedEvent(event);  
  26.               if (LOG.isDebugEnabled()) {  
  27.                   LOG.debug("Got " + we + " for sessionid 0x"  
  28.                           + Long.toHexString(sessionId));  
  29.               }  
  30.   
  31.               eventThread.queueEvent( we );  
  32.               return;  
  33.           }  

 eventThread端处理和之前类似

 

Java代码  收藏代码
  1. case NodeCreated:  
  2.     //通知dataWatcher,只一次  
  3.                synchronized (dataWatches) {  
  4.                    addTo(dataWatches.remove(clientPath), result);  
  5.                }  
  6.     //通知existWatcher,只一次  
  7.                synchronized (existWatches) {  
  8.                    addTo(existWatches.remove(clientPath), result);  
  9.                }  
  10.                break;  
Java代码  收藏代码
  1. if (event instanceof WatcherSetEventPair) {  
  2.                   // each watcher will process the event  
  3.                   WatcherSetEventPair pair = (WatcherSetEventPair) event;  
  4.             //挨个通知  
  5.                   for (Watcher watcher : pair.watchers) {  
  6.                       try {  
  7.                           watcher.process(pair.event);  
  8.                       } catch (Throwable t) {  
  9.                           LOG.error("Error while calling watcher ", t);  
  10.                       }  
  11.                   }  
  12.               }  

http://iwinit.iteye.com/blog/1759908

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics