The ops Module

Starting from DolphinDB 1.30.19/2.00.7, you can use the "ops" module to perform database maintenance tasks, such as canceling running jobs in a cluster, viewing disk usage, and closing inactive sessions, without having to write your own maintenance script.

1. Environment

The ops module is delivered with DolphinDB server 1.30.19/2.00.7 or higher. The module file ops.dos is under the directory server/modules.

You can also download the ops module here. Place the module file under the directory [home]/modules on the controller and data nodes in your cluster. The [home] directory is specified by the configuration parameter home, which you can check with the function getHomeDir().
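To confirm where the module file should go on a node, you can print the expected module directory. A minimal sketch using the server function getHomeDir mentioned above (the "modules" subdirectory name follows the convention described in this section):

// Print the directory where ops.dos is expected on the current node.
// getHomeDir() returns the [home] directory; module files go in its "modules" subdirectory.
print(getHomeDir() + "/modules")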

For more information about DolphinDB modules, see tutorial: Modules.

2. Calling Module Functions

Import the ops module with the use keyword. There are 2 ways to call the module functions:

  • Refer to the module function directly:

use ops
getAllLicenses()

  • Specify the full namespace of the function:

Use this option if other imported modules in the current session contain functions that have the same name.

use ops
ops::getAllLicenses()

3. Function Reference

3.1 cancelJobEx

Syntax

cancelJobEx(id=NULL)

Arguments

  • id: a string indicating a background job ID, which you can get with the server function getRecentJobs().

Details

Cancels running background jobs in the cluster. If id is specified, only the specified job is canceled; otherwise, all background jobs in the cluster are canceled.

Example

Create 3 background jobs:

def testJob(n,id){
   for(i in 0:n){
        writeLog("demo" + string(id) + " is working")
        sleep(1000)
   }
}
submitJob("demo1","test background job1",testJob,300,1);
submitJob("demo2","test background job2",testJob,300,2);
submitJob("demo3","test background job3",testJob,300,3);

Cancel the job "demo1" and get the status of all background jobs on the data nodes and compute nodes:

cancelJobEx("demo1")
pnodeRun(getRecentJobs)

The result shows that "demo1" is marked as "The task was cancelled."

| node | userID | jobId | rootJobId | jobDesc | priority | parallelism | receivedTime | startTime | endTime | errorMsg |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| comnode1 | admin | demo1 | 45c4eb71-6812-2b83-814e-ed6b22a99964 | test background job1 | 4 | 2 | 2022.08.29T17:20:47.061 | 2022.08.29T17:20:47.061 | 2022.08.29T17:22:15.081 | testJob: sleep(1000) => The task was cancelled. |
| comnode1 | admin | demo2 | 1c16dfec-7c5a-92b3-414d-0cfbdc83b451 | test background job2 | 4 | 2 | 2022.08.29T17:20:47.061 | 2022.08.29T17:20:47.062 | | |
| comnode1 | admin | demo3 | e9dffcc1-3194-9181-8d47-30a325774697 | test background job3 | 4 | 2 | 2022.08.29T17:20:47.061 | 2022.08.29T17:20:47.062 | | |

To cancel all background jobs, call cancelJobEx without an argument, then check the job status again:

cancelJobEx()
pnodeRun(getRecentJobs)

3.2 closeInactiveSessions

Syntax

closeInactiveSessions(hours=12)

Arguments

  • hours: a numeric value indicating the session timeout period (in hours). The default value is 12.

Return

Returns a table containing information on all active sessions in the cluster. The table has the same schema as the table returned by the server function getSessionMemoryStat.

Details

If a session has been inactive for longer than the specified hours, it is considered timed out. Call this function to close all timed-out sessions.

Note: To check the last active time of a session, call server function getSessionMemoryStat.

Examples

getSessionMemoryStat()

| userId | sessionId | memSize | remoteIP | remotePort | createTime | lastActiveTime |
| --- | --- | --- | --- | --- | --- | --- |
| admin | 1195587396 | 16 | 125.119.128.134 | 20252 | 2022.09.01T08:42:16.980 | 2022.09.01T08:45:23.808 |
| guest | 2333906441 | 16 | 115.239.209.122 | 37284 | 2022.09.01T06:39:05.530 | 2022.09.01T08:42:17.127 |

closeInactiveSessions(0.05)

| userId | sessionId | memSize | remoteIP | remotePort | createTime | lastActiveTime | node |
| --- | --- | --- | --- | --- | --- | --- | --- |
| admin | 1195587396 | 16 | 125.119.128.134 | 20252 | 2022.09.01T08:42:16.980 | 2022.09.01T08:45:23.808 | DFS_NODE1 |

3.3 getDDL

Syntax

getDDL(database, tableName)

Arguments

  • database: a string indicating the path to a distributed database, e.g., "dfs://demodb".
  • tableName: a string indicating the name of a DFS table.

Details

Returns the DDL statements that can be used to recreate the specified database and the DFS table, as well as the column names and column types of the DFS table.

Examples

n=1000000
ID=rand(10, n)
x=rand(1.0, n)
t=table(ID, x)
db=database("dfs://rangedb", RANGE,  0 5 10)
pt=db.createPartitionedTable(t, `pt, `ID)
getDDL("dfs://rangedb", "pt")
#output

db = database("dfs://rangedb")
colName = `ID`x
colType = [INT,DOUBLE]
tbSchema = table(1:0, colName, colType)
db.createPartitionedTable(table=tbSchema,tableName=`pt,partitionColumns=`ID)

3.4 getTableDiskUsage

Syntax

getTableDiskUsage(database, tableName, byNode=false)

Arguments

  • database: a string indicating the path to a distributed database, e.g., "dfs://demodb".
  • tableName: a string indicating the name of a DFS table.
  • byNode: a Boolean indicating whether to display disk usage by node. The default value is false, i.e., the total disk usage across all nodes is displayed.

Details

Returns a table displaying the disk usage of the specified DFS table. It has the following columns:

  • node: a string indicating a node alias. It is returned only when byNode = true.
  • diskGB: a DOUBLE value indicating the disk usage (in GB) of the specified DFS table.

Examples

getTableDiskUsage("dfs://rangedb", "pt", true)

| node | diskGB |
| --- | --- |
| DFS_NODE1 | 0.008498 |

3.5 dropRecoveringPartitions

Syntax

dropRecoveringPartitions(dbPath, tableName="")

Arguments

  • dbPath: a string indicating the path to a distributed database, e.g., "dfs://demodb".
  • tableName: a string indicating the name of a DFS table. Specify it only when the chunk granularity of the database is at the TABLE level (i.e., the database was created with chunkGranularity='TABLE').

Details

Deletes the partitions in RECOVERING status from the specified database. tableName is a required parameter when the database chunk granularity is at TABLE level.

Example

First, get the metadata of all chunks in the cluster with the following server functions:

rpc(getControllerAlias(), getClusterChunksStatus)
| chunkId | file | size | version | vcLength | versionChain | state | replicas | replicaCount | lastUpdated | permission |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 5c3bd88f-8a13-a382-2848-cb7c6e75d0fa | /olapDemo/20200905/61_71/53R | 0 | 2 | 3 | 19752:0:2:7460 -> 19506:0:1:7214 -> 19506:0:0:7214 -> | RECOVERING | DFS_NODE1:2:0:false:7494976728710525 | 1 | 2022.08.23T04:20:03.100 | READ_WRITE |
| 620526c7-6cf1-3c89-5444-de04f46aaa93 | /olapDemo/20200904/51_61/53R | 0 | 2 | 3 | 19746:0:2:7454 -> 19495:0:1:7203 -> 19495:0:0:7203 -> | RECOVERING | DFS_NODE1:2:0:false:7494976704543705 | 1 | 2022.08.23T04:20:02.564 | READ_WRITE |

The result shows that two chunks of the "olapDemo" database are in the RECOVERING status.

Execute dropRecoveringPartitions to force-delete these two partitions:

dropRecoveringPartitions(database("dfs://olapDemo"));
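If the database was created with TABLE-level chunk granularity, tableName must be specified as well. A hypothetical example, assuming the RECOVERING partitions belong to a table named pt (the table name is an assumption, not part of the scenario above):

// Hypothetical: the database uses chunkGranularity='TABLE' and the affected table is pt
dropRecoveringPartitions(database("dfs://olapDemo"), "pt");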

3.6 getAllLicenses

Syntax

getAllLicenses()

Arguments

None

Details

Returns a table displaying the license expiration date of all nodes in the cluster. It has the following columns:

  • nodeAlias: a string indicating a node alias.
  • endDate: a date indicating the expiration date.

Examples

getAllLicenses()
| nodeAlias | endDate |
| --- | --- |
| DFS_NODE1 | 2042.01.01 |
| ctl18920 | 2042.01.01 |
| agent | 2042.01.01 |

3.7 updateAllLicenses

Syntax

updateAllLicenses()

Arguments

None

Return

Returns a table displaying the license expiration date of all nodes in the cluster. It has the following columns:

  • nodeAlias: a string indicating a node alias.
  • endDate: a date indicating the expiration date.

Details

Updates the license on all nodes in the cluster without a reboot and returns the new license expiration information.

Note: Execute this function after you have replaced the license files on the nodes.

Example

updateAllLicenses()
| nodeAlias | endDate |
| --- | --- |
| DFS_NODE1 | 2042.01.01 |
| ctl18920 | 2042.01.01 |
| agent | 2042.01.01 |

3.8 unsubscribeAll

Syntax

unsubscribeAll()

Arguments

None

Details

Cancels all subscriptions on the current node.

Examples

share streamTable(10:0, `id`val, [INT, INT]) as st
t = table(10:0, `id`val, [INT, INT])
subscribeTable(tableName=`st, actionName=`sub_st, handler=append!{t})
undef(st, SHARED)
#error
All subscriptions to the shared stream table [st] must be cancelled before it can be undefined.

unsubscribeAll()
undef(st, SHARED)

3.9 gatherClusterPerf

Syntax

gatherClusterPerf(monitoringPeriod=60, scrapeInterval=15, dir="/tmp")

Arguments

  • monitoringPeriod: an integer indicating the time frame (in seconds) of monitoring. The default value is 60.
  • scrapeInterval: the interval (in seconds) at which the monitoring metrics are scraped. The default value is 15.
  • dir: a string indicating an existing directory to save the monitoring result. The default value is "/tmp".

Note: On Windows, use an absolute path with forward slashes ("/") or double backslashes ("\\") as directory separators.

Details

Gets cluster performance monitoring metrics based on the specified monitoring period and scrape interval, and exports the result to a statis.csv file in the specified directory. For more information about the monitoring metrics, see server function getClusterPerf.

Examples

gatherClusterPerf(30, 3, "/tmp") 
// check the result in /tmp/statis.csv after 30 seconds
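Once the monitoring period has elapsed, the exported CSV can be loaded back into DolphinDB for inspection. A minimal sketch, assuming the default output path used in the example above (loadText is a standard server function):

// Load the exported metrics; the path matches the dir argument used above
statis = loadText("/tmp/statis.csv")
select top 10 * from statis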

3.10 gatherStreamingStat

Syntax

gatherStreamingStat(subNode, monitoringPeriod=60, scrapeInterval=15, dir="/tmp")

Arguments

  • subNode: a string indicating the alias of a subscriber node.
  • monitoringPeriod: an integer indicating the timeframe (in seconds) of the monitoring. The default value is 60.
  • scrapeInterval: the interval (in seconds) at which the monitoring metrics are scraped. The default value is 15.
  • dir: a string indicating an existing directory to save the monitoring result. The default value is "/tmp".

Note: On Windows, use an absolute path with forward slashes ("/") or double backslashes ("\\") as directory separators.

Details

Gets the status of workers on the specified subscriber node based on the specified monitoring period and scrape interval, and exports the result to a sub_worker_statis.csv file in the specified directory. For more information about the monitoring metrics, see server function getStreamingStat.

Examples

gatherStreamingStat("subNode",30, 3, "/tmp") 
// check the result in /tmp/sub_worker_statis.csv after 30 seconds

3.11 getDifferentData

Syntax

getDifferentData(t1, t2)

Arguments

  • t1 / t2: a handle to an in-memory table.

Details

Checks if the values of t1 and t2 are identical by calling server function eqObj. t1 and t2 must be of the same size.

Return

Returns the rows that differ between the two tables; if the tables are identical, prints "Both tables are identical".

Examples

t1=table(1 2 3 as id, 4 5 6 as val)
t2=table(1 8 9 as id, 4 8 9 as val)
t3=table(1 2 3 as id, 4 5 6 as val)
for (row in getDifferentData(t1, t2))
  print row
#output
id val
-- ---
2  5
3  6
id val
-- ---
8  8
9  9

getDifferentData(t1, t3)
#output
Both tables are identical

3.12 checkChunkReplicas

Syntax

checkChunkReplicas(dbName, tableName, targetChunkId)

Arguments

  • dbName: a string indicating the path to a distributed database, e.g., "dfs://demodb".
  • tableName: a string indicating the name of a DFS table.
  • targetChunkId: a string indicating a chunk ID which you can get with server function getTabletsMeta.

Details

Checks if the two replicas of the specified chunk are identical. This function is available only when the configuration parameter dfsReplicationFactor is set to 2 on the controller.

Return

A Boolean indicating whether the data of two chunk replicas are identical.
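The chunk ID passed as targetChunkId can be obtained with the server function getTabletsMeta, as noted above. A minimal sketch run on a data node that holds the table; the exact output schema of getTabletsMeta may vary by server version, so inspect its columns before filtering:

// List tablet chunk metadata on the local data node and locate the chunk of dfs://rangedb/pt,
// then pass its chunk ID (as a string) to checkChunkReplicas.
meta = getTabletsMeta()
print(meta)
checkChunkReplicas("dfs://rangedb", "pt", "af8268f0-151e-c18b-a84c-a77560b721e6")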

Examples

n=1000000
ID=rand(10, n)
x=rand(1.0, n)
t=table(ID, x)
db=database("dfs://rangedb", RANGE,  0 5 10)
pt=db.createPartitionedTable(t, `pt, `ID)
pt.append!(t)
checkChunkReplicas("dfs://rangedb", "pt", "af8268f0-151e-c18b-a84c-a77560b721e6")
#output
true

Stop a data node with the kill -9 PID command:

pt.append!(t)
checkChunkReplicas("dfs://rangedb", "pt", "af8268f0-151e-c18b-a84c-a77560b721e6")// get the chunk ID with getTabletsMeta()
#output
checkChunkReplicas: throw "colFiles on two replicas are not same" => colFiles on two replicas are not same

Reboot the data node. When the chunk recovery is complete, execute checkChunkReplicas() again:

checkChunkReplicas("dfs://rangedb", "pt", "af8268f0-151e-c18b-a84c-a77560b721e6") // chunk ID can be checked with getTabletsMeta() 
#output
true