From 16e9aaa25cc9f76a06ecee45a36b48fdc1137bd0 Mon Sep 17 00:00:00 2001 From: Lingyu Zhu Date: Wed, 23 Jan 2019 19:41:43 +0800 Subject: [PATCH] fix issue#41 introduced isResourceAllocated and isError. Support reconnect. --- .../ibm/disni/RdmaActiveEndpointGroup.java | 10 ++- src/main/java/com/ibm/disni/RdmaEndpoint.java | 80 ++++++++++--------- 2 files changed, 49 insertions(+), 41 deletions(-) diff --git a/src/main/java/com/ibm/disni/RdmaActiveEndpointGroup.java b/src/main/java/com/ibm/disni/RdmaActiveEndpointGroup.java index 7c4eb3f5..d276c0df 100644 --- a/src/main/java/com/ibm/disni/RdmaActiveEndpointGroup.java +++ b/src/main/java/com/ibm/disni/RdmaActiveEndpointGroup.java @@ -100,7 +100,7 @@ public IbvQP createQpProvider(C endpoint) throws IOException{ public void allocateResources(C endpoint) throws Exception { endpoint.allocateResources(); - } + } public void close() throws IOException, InterruptedException { super.close(); @@ -110,9 +110,11 @@ public void close() throws IOException, InterruptedException { } void close(RdmaEndpoint endpoint) throws IOException { - IbvContext context = endpoint.getIdPriv().getVerbs(); - RdmaActiveCqProcessor cqProcessor = cqMap.get(context.getCmd_fd()); - cqProcessor.unregister(endpoint); + if (endpoint.isResouceAllocated()) { + IbvContext context = endpoint.getIdPriv().getVerbs(); + RdmaActiveCqProcessor cqProcessor = cqMap.get(context.getCmd_fd()); + cqProcessor.unregister(endpoint); + } } public int getMaxWR() { diff --git a/src/main/java/com/ibm/disni/RdmaEndpoint.java b/src/main/java/com/ibm/disni/RdmaEndpoint.java index 4c2e61d9..4e56e237 100644 --- a/src/main/java/com/ibm/disni/RdmaEndpoint.java +++ b/src/main/java/com/ibm/disni/RdmaEndpoint.java @@ -68,13 +68,15 @@ public class RdmaEndpoint { private boolean isClosed; private boolean isInitialized; private boolean serverSide; - + private boolean isError; + protected RdmaEndpoint(RdmaEndpointGroup group, RdmaCmId idPriv, boolean serverSide) throws IOException{ this.endpointId = group.getNextId(); this.group = group; this.idPriv = idPriv; - this.access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ; - + this.access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ; + this.isError = false; + this.qp = null; this.pd = null; this.cqProcessor = null; @@ -84,7 +86,7 @@ protected RdmaEndpoint(RdmaEndpointGroup group, RdmaCmId this.serverSide = serverSide; logger.info("new client endpoint, id " + endpointId + ", idPriv " + idPriv.getPs()); } - + /** /** /** @@ -94,41 +96,44 @@ protected RdmaEndpoint(RdmaEndpointGroup group, RdmaCmId * @param timeout connection timeout */ public synchronized void connect(SocketAddress dst, int timeout) throws Exception { - if (connState != CONN_STATE_INITIALIZED) { + if (connState != CONN_STATE_INITIALIZED && !isError) { throw new IOException("endpoint already connected"); } - idPriv.resolveAddr(null, dst, timeout); - while(connState < CONN_STATE_ADDR_RESOLVED){ - wait(); + isError = false; + if (connState < CONN_STATE_ADDR_RESOLVED) { + idPriv.resolveAddr(null, dst, timeout); + while (connState < CONN_STATE_ADDR_RESOLVED && !isError) { + wait(); + } + if (isError) + throw new IOException("resolve address failed"); } - if (connState != CONN_STATE_ADDR_RESOLVED){ - throw new IOException("resolve address failed"); + if (connState < CONN_STATE_ROUTE_RESOLVED) { + idPriv.resolveRoute(timeout); + while (connState < CONN_STATE_ROUTE_RESOLVED && !isError) { + wait(); + } + if (isError) + throw new IOException("resolve route failed"); } - - idPriv.resolveRoute(timeout); - while(connState < CONN_STATE_ROUTE_RESOLVED){ - wait(); + if (connState < CONN_STATE_RESOURCES_ALLOCATED) { + group.allocateResourcesRaw(this); + while (connState < CONN_STATE_RESOURCES_ALLOCATED && !isError) { + wait(); + } + if (isError) + throw new IOException("allocate resource failed"); } - if (connState != CONN_STATE_ROUTE_RESOLVED){ - throw new IOException("resolve route failed"); - } - - group.allocateResourcesRaw(this); - while(connState < CONN_STATE_RESOURCES_ALLOCATED){ - wait(); - } - if (connState != CONN_STATE_RESOURCES_ALLOCATED){ - throw new IOException("resources allocation failed"); - } - RdmaConnParam connParam = getConnParam(); idPriv.connect(connParam); - - while(connState < CONN_STATE_CONNECTED){ + + while(connState < CONN_STATE_CONNECTED && !isError){ wait(); - } - } - + } + if (isError) + throw new IOException("idPriv connect failed"); + } + /* (non-Javadoc) * @see com.ibm.jverbs.endpoints.ICmConsumer#dispatchCmEvent(com.ibm.jverbs.cm.RdmaCmEvent) */ @@ -138,28 +143,26 @@ public synchronized void dispatchCmEvent(RdmaCmEvent cmEvent) int eventType = cmEvent.getEvent(); if (eventType == RdmaCmEvent.EventType.RDMA_CM_EVENT_ADDR_RESOLVED.ordinal()) { connState = RdmaEndpoint.CONN_STATE_ADDR_RESOLVED; - notifyAll(); } else if (cmEvent.getEvent() == RdmaCmEvent.EventType.RDMA_CM_EVENT_ROUTE_RESOLVED.ordinal()) { connState = RdmaEndpoint.CONN_STATE_ROUTE_RESOLVED; - notifyAll(); } else if (eventType == RdmaCmEvent.EventType.RDMA_CM_EVENT_ESTABLISHED.ordinal()) { logger.info("got event type + RDMA_CM_EVENT_ESTABLISHED, srcAddress " + this.getSrcAddr() + ", dstAddress " + this.getDstAddr()); connState = CONN_STATE_CONNECTED; - notifyAll(); } else if (eventType == RdmaCmEvent.EventType.RDMA_CM_EVENT_DISCONNECTED.ordinal()) { logger.info("got event type + RDMA_CM_EVENT_DISCONNECTED, srcAddress " + this.getSrcAddr() + ", dstAddress " + this.getDstAddr()); connState = CONN_STATE_CLOSED; - notifyAll(); } else if (eventType == RdmaCmEvent.EventType.RDMA_CM_EVENT_CONNECT_REQUEST.ordinal()) { logger.info("got event type + RDMA_CM_EVENT_CONNECT_REQUEST, srcAddress " + this.getSrcAddr() + ", dstAddress " + this.getDstAddr()); } else { logger.info("got event type + UNKNOWN, srcAddress " + this.getSrcAddr() + ", dstAddress " + this.getDstAddr()); + isError = true; } + notifyAll(); } catch (Exception e) { throw new IOException(e); } } - + public final synchronized void allocateResources() throws IOException { if (!isInitialized) { this.pd = group.createProtectionDomainRaw(this); @@ -200,7 +203,7 @@ public synchronized void close() throws IOException, InterruptedException { if (isClosed){ return; } - + logger.info("closing client endpoint"); if (connState == CONN_STATE_CONNECTED) { idPriv.disconnect(); @@ -223,6 +226,9 @@ public synchronized void close() throws IOException, InterruptedException { public synchronized boolean isConnected() { return (connState == CONN_STATE_CONNECTED); } + public synchronized boolean isResouceAllocated() { + return (connState == CONN_STATE_RESOURCES_ALLOCATED); + } /** * Checks if the endpoint is closed.