Interacting with a node

Overview

To interact with your node, you need to write a client in a JVM-compatible language using the CordaRPCClient class. This class allows you to connect to your node via a message queue protocol and provides a simple RPC interface for interacting with the node. You make calls on a JVM object as normal, and the marshalling back-and-forth is handled for you.

Connecting to a node via RPC

To use CordaRPCClient, you must add net.corda:corda-rpc:$corda_release_version as a cordaCompile dependency in your client’s build.gradle file.

CordaRPCClient has a start method that takes the node’s RPC address and returns a CordaRPCConnection. CordaRPCConnection has a proxy method that takes an RPC username and password and returns a CordaRPCOps object that you can use to interact with the node.

Here is an example of using CordaRPCClient to connect to a node and log the current time on its internal clock:

import net.corda.client.rpc.CordaRPCClient
import net.corda.core.utilities.NetworkHostAndPort.Companion.parse
import net.corda.core.utilities.loggerFor
import org.slf4j.Logger

class ClientRpcExample {
    companion object {
        val logger: Logger = loggerFor<ClientRpcExample>()
    }

    fun main(args: Array<String>) {
        require(args.size == 3) { "Usage: TemplateClient <node address> <username> <password>" }
        val nodeAddress = parse(args[0])
        val username = args[1]
        val password = args[2]

        val client = CordaRPCClient(nodeAddress)
        val connection = client.start(username, password)
        val cordaRPCOperations = connection.proxy

        logger.info(cordaRPCOperations.currentNodeTime().toString())

        connection.notifyServerAndClose()
    }
}

import net.corda.client.rpc.CordaRPCClient;
import net.corda.client.rpc.CordaRPCConnection;
import net.corda.core.messaging.CordaRPCOps;
import net.corda.core.utilities.NetworkHostAndPort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ClientRpcExample {
    private static final Logger logger = LoggerFactory.getLogger(ClientRpcExample.class);

    public static void main(String[] args) {
        if (args.length != 3) {
            throw new IllegalArgumentException("Usage: TemplateClient <node address> <username> <password>");
        }
        final NetworkHostAndPort nodeAddress = NetworkHostAndPort.parse(args[0]);
        String username = args[1];
        String password = args[2];

        final CordaRPCClient client = new CordaRPCClient(nodeAddress);
        final CordaRPCConnection connection = client.start(username, password);
        final CordaRPCOps cordaRPCOperations = connection.getProxy();

        logger.info(cordaRPCOperations.currentNodeTime().toString());

        connection.notifyServerAndClose();
    }
}

For further information on using the RPC API, see Using the client RPC API.

RPC permissions

For a node’s owner to interact with their node via RPC, they must define one or more RPC users. Each user is authenticated with a username and password, and is assigned a set of permissions that control which RPC operations they can perform. Permissions are not required to interact with the node via the shell, unless the shell is being accessed via SSH.

RPC users are created by adding them to the rpcUsers list in the node’s node.conf file:

rpcUsers=[
    {
        username=exampleUser
        password=examplePass
        permissions=[]
    }
    ...
]

By default, RPC users are not permissioned to perform any RPC operations.

Granting flow permissions

You provide an RPC user with the permission to start a specific flow using the syntax StartFlow.<fully qualified flow name>:

rpcUsers=[
    {
        username=exampleUser
        password=examplePass
        permissions=[
            "StartFlow.net.corda.flows.ExampleFlow1",
            "StartFlow.net.corda.flows.ExampleFlow2"
        ]
    }
    ...
]

You can also provide an RPC user with the permission to start any flow using the syntax InvokeRpc.startFlow:

rpcUsers=[
    {
        username=exampleUser
        password=examplePass
        permissions=[
            "InvokeRpc.startFlow"
        ]
    }
    ...
]

Granting other RPC permissions

You provide an RPC user with the permission to perform a specific RPC operation using the syntax InvokeRpc.<rpc method name>:

rpcUsers=[
    {
        username=exampleUser
        password=examplePass
        permissions=[
            "InvokeRpc.nodeInfo",
            "InvokeRpc.networkMapSnapshot"
        ]
    }
    ...
]

Granting all permissions

You can provide an RPC user with the permission to perform any RPC operation (including starting any flow) using the ALL permission:

rpcUsers=[
    {
        username=exampleUser
        password=examplePass
        permissions=[
            "ALL"
        ]
    }
    ...
]

RPC security management

Setting rpcUsers provides a simple way of granting RPC permissions to a fixed set of users, but has some obvious shortcomings. To support use cases aiming for higher security and flexibility, Corda offers additional security features such as:

  • Fetching users credentials and permissions from an external data source (e.g.: a remote RDBMS), with optional in-memory caching. In particular, this allows credentials and permissions to be updated externally without requiring nodes to be restarted.
  • Password stored in hash-encrypted form. This is regarded as must-have when security is a concern. Corda currently supports a flexible password hash format conforming to the Modular Crypt Format provided by the Apache Shiro framework

These features are controlled by a set of options nested in the security field of node.conf. The following example shows how to configure retrieval of users credentials and permissions from a remote database with passwords in hash-encrypted format and enable in-memory caching of users data:

security = {
    authService = {
        dataSource = {
            type = "DB",
            passwordEncryption = "SHIRO_1_CRYPT",
            connection = {
               jdbcUrl = "<jdbc connection string>"
               username = "<db username>"
               password = "<db user password>"
               driverClassName = "<JDBC driver>"
            }
        }
        options = {
             cache = {
                expireAfterSecs = 120
                maxEntries = 10000
             }
        }
    }
}

It is also possible to have a static list of users embedded in the security structure by specifying a dataSource of INMEMORY type:

security = {
    authService = {
        dataSource = {
            type = "INMEMORY",
            users = [
                {
                    username = "<username>",
                    password = "<password>",
                    permissions = ["<permission 1>", "<permission 2>", ...]
                },
                ...
            ]
        }
    }
}

Authentication/authorisation data

The dataSource structure defines the data provider supplying credentials and permissions for users. There exist two supported types of such data source, identified by the dataSource.type field:

  • INMEMORY: A static list of user credentials and permissions specified by the users field.

  • DB: An external RDBMS accessed via the JDBC connection described by connection. Note that, unlike the INMEMORY case, in a user database permissions are assigned to roles rather than individual users. The current implementation expects the database to store data according to the following schema:

  • Table users containing columns username and password. The username column must have unique values.

  • Table user_roles containing columns username and role_name associating a user to a set of roles.

  • Table roles_permissions containing columns role_name and permission associating a role to a set of permission strings.

Password encryption

Storing passwords in plain text is discouraged in applications where security is critical. Passwords are assumed to be in plain format by default, unless a different format is specified by the passwordEncryption field, like:

passwordEncryption = SHIRO_1_CRYPT

SHIRO_1_CRYPT identifies the Apache Shiro fully reversible Modular Crypt Format, it is currently the only non-plain password hash-encryption format supported. Hash-encrypted passwords in this format can be produced by using the Apache Shiro Hasher command line tool.

Caching user accounts data

A cache layer on top of the external data source of users credentials and permissions can significantly improve performances in some cases, with the disadvantage of causing a (controllable) delay in picking up updates to the underlying data. Caching is disabled by default, it can be enabled by defining the options.cache field in security.authService, for example:

options = {
     cache = {
        expireAfterSecs = 120
        maxEntries = 10000
     }
}

This will enable a non-persistent cache contained in the node’s memory with maximum number of entries set to maxEntries where entries are expired and refreshed after expireAfterSecs seconds.

Observables

The RPC system handles observables in a special way. When a method returns an observable, whether directly or as a sub-object of the response object graph, an observable is created on the client to match the one on the server. Objects emitted by the server-side observable are pushed onto a queue which is then drained by the client. The returned observable may even emit object graphs with even more observables in them, and it all works as you would expect.

This feature comes with a cost: the server must queue up objects emitted by the server-side observable until you download them. Note that the server side observation buffer is bounded, once it fills up the client is considered slow and will be disconnected. You are expected to subscribe to all the observables returned, otherwise client-side memory starts filling up as observations come in. If you don’t want an observable then subscribe then unsubscribe immediately to clear the client-side buffers and to stop the server from streaming. For Kotlin users there is a convenience extension method called notUsed() which can be called on an observable to automate this step.

If your app quits then server side resources will be freed automatically.

Futures

A method can also return a CordaFuture in its object graph and it will be treated in a similar manner to observables. Calling the cancel method on the future will unsubscribe it from any future value and release any resources.

Versioning

The client RPC protocol is versioned using the node’s platform version number (see Versioning). When a proxy is created the server is queried for its version, and you can specify your minimum requirement. Methods added in later versions are tagged with the @RPCSinceVersion annotation. If you try to use a method that the server isn’t advertising support of, an UnsupportedOperationException is thrown. If you want to know the version of the server, just use the protocolVersion property (i.e. getProtocolVersion in Java).

The RPC client library defaults to requiring the platform version it was built with. That means if you use the client library released as part of Corda N, then the node it connects to must be of version N or above. This is checked when the client first connects. If you want to override this behaviour, you can alter the minimumServerProtocolVersion field in the CordaRPCClientConfiguration object passed to the client. Alternatively, just link your app against an older version of the library.

Thread safety

A proxy is thread safe, blocking, and allows multiple RPCs to be in flight at once. Any observables that are returned and you subscribe to will have objects emitted in order on a background thread pool. Each Observable stream is tied to a single thread, however note that two separate Observables may invoke their respective callbacks on different threads.

Error handling

If something goes wrong with the RPC infrastructure itself, an RPCException is thrown. If you call a method that requires a higher version of the protocol than the server supports, UnsupportedOperationException is thrown. Otherwise the behaviour depends on the devMode node configuration option.

In devMode, if the server implementation throws an exception, that exception is serialised and rethrown on the client side as if it was thrown from inside the called RPC method. These exceptions can be caught as normal.

When not in devMode, the server will mask exceptions not meant for clients and return an InternalNodeException instead. This does not expose internal information to clients, strengthening privacy and security. CorDapps can have exceptions implement ClientRelevantError to allow them to reach RPC clients.

Connection management

It is possible to not be able to connect to the server on the first attempt. In that case, the CordaRPCClient.start() method will throw an exception. The following code snippet is an example of how to write a simple retry mechanism for such situations:

fun establishConnectionWithRetry(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): CordaRPCConnection {
    val retryInterval = 5.seconds

    do {
        val connection = try {
            logger.info("Connecting to: $nodeHostAndPort")
            val client = CordaRPCClient(
                    nodeHostAndPort,
                    object : CordaRPCClientConfiguration {
                        override val connectionMaxRetryInterval = retryInterval
                    }
            )
            val _connection = client.start(username, password)
            // Check connection is truly operational before returning it.
            val nodeInfo = _connection.proxy.nodeInfo()
            require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty())
            _connection
        } catch(secEx: ActiveMQSecurityException) {
            // Happens when incorrect credentials provided - no point to retry connecting.
            throw secEx
        }
        catch(ex: RPCException) {
            // Deliberately not logging full stack trace as it will be full of internal stacktraces.
            logger.info("Exception upon establishing connection: " + ex.message)
            null
        }

        if(connection != null) {
            logger.info("Connection successfully established with: $nodeHostAndPort")
            return connection
        }
        // Could not connect this time round - pause before giving another try.
        Thread.sleep(retryInterval.toMillis())
    } while (connection == null)
}

After a successful connection, it is possible for the server to become unavailable. In this case, all RPC calls will throw an exception and created observables will no longer receive observations. Below is an example of how to reconnect and back-fill any data that might have been missed while the connection was down. This is done by using the onError handler on the Observable returned by CordaRPCOps.

fun performRpcReconnect(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) {
    val connection = establishConnectionWithRetry(nodeHostAndPort, username, password)
    val proxy = connection.proxy

    val (stateMachineInfos, stateMachineUpdatesRaw) = proxy.stateMachinesFeed()

    val retryableStateMachineUpdatesSubscription: AtomicReference<Subscription?> = AtomicReference(null)
    val subscription: Subscription = stateMachineUpdatesRaw
            .startWith(stateMachineInfos.map { StateMachineUpdate.Added(it) })
            .subscribe({ clientCode(it) /* Client code here */ }, {
                // Terminate subscription such that nothing gets past this point to downstream Observables.
                retryableStateMachineUpdatesSubscription.get()?.unsubscribe()
                // It is good idea to close connection to properly mark the end of it. During re-connect we will create a new
                // client and a new connection, so no going back to this one. Also the server might be down, so we are
                // force closing the connection to avoid propagation of notification to the server side.
                connection.forceClose()
                // Perform re-connect.
                performRpcReconnect(nodeHostAndPort, username, password)
            })

    retryableStateMachineUpdatesSubscription.set(subscription)
}

In this code snippet it is possible to see that function performRpcReconnect creates an RPC connection and implements the error handler upon subscription to an Observable. The call to this onError handler will be made when failover happens then the code will terminate existing subscription, closes RPC connection and recursively calls performRpcReconnect which will re-subscribe once RPC connection comes back online.

Client code if fed with instances of StateMachineInfo using call clientCode(it). Upon re-connecting, this code receives all the items. Some of these items might have already been delivered to client code prior to failover occurred. It is down to client code in this case handle those duplicate items as appropriate.

Wire security

If TLS communications to the RPC endpoint are required the node should be configured with rpcSettings.useSSL=true see Node configuration. The node admin should then create a node specific RPC certificate and key, by running the node once with generate-rpc-ssl-settings command specified (see Node command-line options). The generated RPC TLS trust root certificate will be exported to a certificates/export/rpcssltruststore.jks file which should be distributed to the authorised RPC clients.

The connecting CordaRPCClient code must then use one of the constructors with a parameter of type ClientRpcSslOptions ( JavaDoc) and set this constructor argument with the appropriate path for the rpcssltruststore.jks file. The client connection will then use this to validate the RPC server handshake.

Note that RPC TLS does not use mutual authentication, and delegates fine grained user authentication and authorisation to the RPC security features detailed above.

Whitelisting classes with the Corda node

CorDapps must whitelist any classes used over RPC with Corda’s serialization framework, unless they are whitelisted by default in DefaultWhitelist. The whitelisting is done either via the plugin architecture or by using the @CordaSerializable annotation. See Object serialization. An example is shown in Using the client RPC API.