Client RPC API tutorial

In this tutorial we will build a simple command line utility that connects to a node, creates some Cash transactions and meanwhile dumps the transaction graph to the standard output. We will then put some simple visualisation on top. For an explanation on how the RPC works see Client RPC.

We start off by connecting to the node itself. For the purposes of the tutorial we will use the Driver to start up a notary and a node that issues/exits and moves Cash around for herself. To authenticate we will use the certificates of the nodes directly.

Note how we configure the node to create a user that has permission to start the CashFlow.

enum class PrintOrVisualise {
    Print,
    Visualise
}

fun main(args: Array<String>) {
    require(args.isNotEmpty()) { "Usage: <binary> [Print|Visualise]" }
    val printOrVisualise = PrintOrVisualise.valueOf(args[0])

    val baseDirectory = Paths.get("build/rpc-api-tutorial")
    val user = User("user", "password", permissions = setOf(startFlowPermission<CashIssueFlow>(),
            startFlowPermission<CashPaymentFlow>(),
            startFlowPermission<CashExitFlow>()))

    driver(driverDirectory = baseDirectory) {
        startNode(DUMMY_NOTARY.name, advertisedServices = setOf(ServiceInfo(ValidatingNotaryService.type)))
        val node = startNode(ALICE.name, rpcUsers = listOf(user)).get()

Now we can connect to the node itself using a valid RPC login. We login using the configured user.

        val client = node.rpcClientToNode()
        client.start("user", "password")
        val proxy = client.proxy()

        thread {
            generateTransactions(proxy)
        }

We start generating transactions in a different thread (generateTransactions to be defined later) using proxy, which exposes the full RPC interface of the node:

    /**
     * Returns the RPC protocol version, which is the same the node's Platform Version. Exists since version 1 so guaranteed
     * to be present.
     */
    override val protocolVersion: Int get() = nodeIdentity().platformVersion

    /**
     * Returns a pair of currently in-progress state machine infos and an observable of future state machine adds/removes.
     */
    @RPCReturnsObservables
    fun stateMachinesAndUpdates(): Pair<List<StateMachineInfo>, Observable<StateMachineUpdate>>

    /**
     * Returns a pair of head states in the vault and an observable of future updates to the vault.
     */
    @RPCReturnsObservables
    fun vaultAndUpdates(): Pair<List<StateAndRef<ContractState>>, Observable<Vault.Update>>

    /**
     * Returns a pair of all recorded transactions and an observable of future recorded ones.
     */
    @RPCReturnsObservables
    fun verifiedTransactions(): Pair<List<SignedTransaction>, Observable<SignedTransaction>>

    /**
     * Returns a snapshot list of existing state machine id - recorded transaction hash mappings, and a stream of future
     * such mappings as well.
     */
    @RPCReturnsObservables
    fun stateMachineRecordedTransactionMapping(): Pair<List<StateMachineTransactionMapping>, Observable<StateMachineTransactionMapping>>

    /**
     * Returns all parties currently visible on the network with their advertised services and an observable of future updates to the network.
     */
    @RPCReturnsObservables
    fun networkMapUpdates(): Pair<List<NodeInfo>, Observable<NetworkMapCache.MapChange>>

    /**
     * Start the given flow with the given arguments.
     */
    @RPCReturnsObservables
    fun <T : Any> startFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandle<T>

    /**
     * Start the given flow with the given arguments, returning an [Observable] with a single observation of the
     * result of running the flow.
     */
    @RPCReturnsObservables
    fun <T : Any> startTrackedFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowProgressHandle<T>

    /**
     * Returns Node's identity, assuming this will not change while the node is running.
     */
    fun nodeIdentity(): NodeInfo

    /*
     * Add note(s) to an existing Vault transaction
     */
    fun addVaultTransactionNote(txnId: SecureHash, txnNote: String)

    /*
     * Retrieve existing note(s) for a given Vault transaction
     */
    fun getVaultTransactionNotes(txnId: SecureHash): Iterable<String>

    /*
     * Returns a map of how much cash we have in each currency, ignoring details like issuer. Note: currencies for
     * which we have no cash evaluate to null (not present in map), not 0.
     */
    fun getCashBalances(): Map<Currency, Amount<Currency>>

    /**
     * Checks whether an attachment with the given hash is stored on the node.
     */
    fun attachmentExists(id: SecureHash): Boolean

    /**
     * Download an attachment JAR by ID
     */
    fun openAttachment(id: SecureHash): InputStream

    /**
     * Uploads a jar to the node, returns it's hash.
     */
    fun uploadAttachment(jar: InputStream): SecureHash

    // TODO: Remove this from the interface
    @Deprecated("This service will be removed in a future milestone")
    fun uploadFile(dataType: String, name: String?, file: InputStream): String

    /**
     * Authorise a contract state upgrade.
     * This will store the upgrade authorisation in the vault, and will be queried by [ContractUpgradeFlow.Acceptor] during contract upgrade process.
     * Invoking this method indicate the node is willing to upgrade the [state] using the [upgradedContractClass].
     * This method will NOT initiate the upgrade process. To start the upgrade process, see [ContractUpgradeFlow.Instigator].
     */
    fun authoriseContractUpgrade(state: StateAndRef<*>, upgradedContractClass: Class<out UpgradedContract<*, *>>)

    /**
     * Authorise a contract state upgrade.
     * This will remove the upgrade authorisation from the vault.
     */
    fun deauthoriseContractUpgrade(state: StateAndRef<*>)

    /**
     * Returns the node's current time.
     */
    fun currentNodeTime(): Instant

    /**
     * Returns a [ListenableFuture] which completes when the node has registered wih the network map service. It can also
     * complete with an exception if it is unable to.
     */
    @RPCReturnsObservables
    fun waitUntilRegisteredWithNetworkMap(): ListenableFuture<Unit>

    // TODO These need rethinking. Instead of these direct calls we should have a way of replicating a subset of
    // the node's state locally and query that directly.
    /**
     * Returns the [Party] corresponding to the given key, if found.
     */
    fun partyFromKey(key: PublicKey): Party?

    /**
     * Returns the [Party] with the given name as it's [Party.name]
     */
    fun partyFromName(name: String): Party?

    /**
     * Returns the [Party] with the X.500 principal as it's [Party.name]
     */
    fun partyFromX500Name(x500Name: X500Name): Party?

    /** Enumerates the class names of the flows that this node knows about. */
    fun registeredFlows(): List<String>

Warning

This API is evolving and will continue to grow as new functionality and features added to Corda are made available to RPC clients.

The one we need in order to dump the transaction graph is verifiedTransactions. The type signature tells us that the RPC will return a list of transactions and an Observable stream. This is a general pattern, we query some data and the node will return the current snapshot and future updates done to it. Observables are described in further detail in Client RPC

        val (transactions: List<SignedTransaction>, futureTransactions: Observable<SignedTransaction>) = proxy.verifiedTransactions()

The graph will be defined by nodes and edges between them. Each node represents a transaction and edges represent output-input relations. For now let’s just print NODE <txhash> for the former and EDGE <txhash> <txhash> for the latter.

        when (printOrVisualise) {
            PrintOrVisualise.Print -> {
                futureTransactions.startWith(transactions).subscribe { transaction ->
                    println("NODE ${transaction.id}")
                    transaction.tx.inputs.forEach { input ->
                        println("EDGE ${input.txhash} ${transaction.id}")
                    }
                }
            }

Now we just need to create the transactions themselves!

fun generateTransactions(proxy: CordaRPCOps) {
    val (vault, vaultUpdates) = proxy.vaultAndUpdates()
    vaultUpdates.notUsed()
    var ownedQuantity = vault.fold(0L) { sum, state ->
        sum + (state.state.data as Cash.State).amount.quantity
    }
    val issueRef = OpaqueBytes.of(0)
    val (parties, partyUpdates) = proxy.networkMapUpdates()
    partyUpdates.notUsed()
    val notary = parties.first { it.advertisedServices.any { it.info.type.isNotary() } }.notaryIdentity
    val me = proxy.nodeIdentity().legalIdentity
    while (true) {
        Thread.sleep(1000)
        val random = SplittableRandom()
        val n = random.nextDouble()
        if (ownedQuantity > 10000 && n > 0.8) {
            val quantity = Math.abs(random.nextLong()) % 2000
            proxy.startFlow(::CashExitFlow, Amount(quantity, USD), issueRef)
            ownedQuantity -= quantity
        } else if (ownedQuantity > 1000 && n < 0.7) {
            val quantity = Math.abs(random.nextLong() % Math.min(ownedQuantity, 2000))
            proxy.startFlow(::CashPaymentFlow, Amount(quantity, USD), me)
        } else {
            val quantity = Math.abs(random.nextLong() % 1000)
            proxy.startFlow(::CashIssueFlow, Amount(quantity, USD), issueRef, me, notary)
            ownedQuantity += quantity
        }
    }
}

We utilise several RPC functions here to query things like the notaries in the node cluster or our own vault. These RPC functions also return Observable objects so that the node can send us updated values. However, we don’t need updates here and so we mark these observables as notUsed. (As a rule, you should always either subscribe to an Observable or mark it as not used. Failing to do this will leak resources in the node.)

Then in a loop we generate randomly either an Issue, a Pay or an Exit transaction.

The RPC we need to initiate a Cash transaction is startFlowDynamic which may start an arbitrary flow, given sufficient permissions to do so. We won’t use this function directly, but rather a type-safe wrapper around it startFlow that type-checks the arguments for us.

Finally we have everything in place: we start a couple of nodes, connect to them, and start creating transactions while listening on successfully created ones, which are dumped to the console. We just need to run it!:

# Build the example
./gradlew docs/source/example-code:installDist
# Start it
./docs/source/example-code/build/install/docs/source/example-code/bin/client-rpc-tutorial Print

Now let’s try to visualise the transaction graph. We will use a graph drawing library called graphstream

            PrintOrVisualise.Visualise -> {
                val graph = MultiGraph("transactions")
                transactions.forEach { transaction ->
                    graph.addNode<Node>("${transaction.id}")
                }
                transactions.forEach { transaction ->
                    transaction.tx.inputs.forEach { ref ->
                        graph.addEdge<Edge>("$ref", "${ref.txhash}", "${transaction.id}")
                    }
                }
                futureTransactions.subscribe { transaction ->
                    graph.addNode<Node>("${transaction.id}")
                    transaction.tx.inputs.forEach { ref ->
                        graph.addEdge<Edge>("$ref", "${ref.txhash}", "${transaction.id}")
                    }
                }
                graph.display()
            }
        }
        waitForAllNodesToFinish()
    }

}

If we run the client with Visualise we should see a simple random graph being drawn as new transactions are being created.

Whitelisting classes from your CorDapp with the Corda node

As described in Client RPC, you have to whitelist any additional classes you add that are needed in RPC requests or responses with the Corda node. Here’s an example of both ways you can do this for a couple of example classes.

// Not annotated, so need to whitelist manually.
data class ExampleRPCValue(val foo: String)

// Annotated, so no need to whitelist manually.
@CordaSerializable
data class ExampleRPCValue2(val bar: Int)

class ExampleRPCCordaPluginRegistry : CordaPluginRegistry() {
    override fun customizeSerialization(custom: SerializationCustomization): Boolean {
        // Add classes like this.
        custom.addToWhitelist(ExampleRPCValue::class.java)
        // You should return true, otherwise your plugin will be ignored for registering classes with Kryo.
        return true
    }
}

See more on plugins in CorDapp basics.

Warning

We will be replacing the use of Kryo in the serialization framework and so additional changes here are likely.

Security

RPC credentials associated with a Client must match the permission set configured on the server Node. This refers to both authentication (username and password) and role-based authorisation (a permissioned set of RPC operations an authenticated user is entitled to run).

Note

Permissions are represented as String’s to allow RPC implementations to add their own permissioning. Currently the only permission type defined is StartFlow, which defines a list of whitelisted flows an authenticated use may execute. An administrator user (or a developer) may also be assigned the ALL permission, which grants access to any flow.

In the instructions above the server node permissions are configured programmatically in the driver code:

driver(driverDirectory = baseDirectory) {
    val user = User("user", "password", permissions = setOf(startFlowPermission<CashFlow>()))
    val node = startNode("CN=Alice Corp,O=Alice Corp,L=London,C=UK", rpcUsers = listOf(user)).get()

When starting a standalone node using a configuration file we must supply the RPC credentials as follows:

rpcUsers : [
    { username=user, password=password, permissions=[ StartFlow.net.corda.flows.CashFlow ] }
]

When using the gradle Cordformation plugin to configure and deploy a node you must supply the RPC credentials in a similar manner:

rpcUsers = [
        ['username' : "user",
         'password' : "password",
         'permissions' : ["StartFlow.net.corda.flows.CashFlow"]]
]

You can then deploy and launch the nodes (Notary and Alice) as follows:

# to create a set of configs and installs under ``docs/source/example-code/build/nodes`` run
./gradlew docs/source/example-code:deployNodes
# to open up two new terminals with the two nodes run
./docs/source/example-code/build/nodes/runnodes
# followed by the same commands as before:
./docs/source/example-code/build/install/docs/source/example-code/bin/client-rpc-tutorial Print
./docs/source/example-code/build/install/docs/source/example-code/bin/client-rpc-tutorial Visualise

See more on security in Secure coding guidelines, node configuration in Node configuration and Cordformation in CorDapp basics