Indy와 ROS 연결하기
간단한 예제를 통하여 Indy 로봇을 ROS를 이용하여 움직이는 방법을 설명합니다.
요구 환경
위 요구 환경의 설치가 완료되지 않은 경우 설치를 먼저 진행하여 주십시오.
실행 방법
1. 터미널 열기
2. ROS 환경설정 확인하기
ROS 명령어를 정상적으로 사용하기 위하여 terminal 의 환경에 ROS 환경변수를 추가 할 필요가 있습니다.
아래의 명령어로 추가할 수 있으며, 설치에서 설명한
bashrc에 해당 명령어들이 추가되어 있을 경우, 이 과정을 생략 할 수 있습니다.
| source /opt/ros/<사용중인 ROS 버전>/setup.bash
source <작업중인 워크스페이스 경로>/devel/setup.bash
|
3. 아래 명령어를 실행하여 dcp driver와 rviz를 실행합니다. 192.168.x.x는 로봇의 IP이며 robot_name은 로봇의 이름입니다(RP2의 경우 NRMK-IndyRP2). 사용자의 환경에 맞는 적절한 값을 입력해야합니다.
| roslaunch indy_driver_py indy7_dcp.launch robot_ip:=192.168.x.x robot_name:=NRMK-Indy7
|
4. 별도의 터미널에서 app.py 파일을 실행합니다. 아래의 명령어로 실행 할 수 있습니다.
| rosrun indy7_dcp_example app.py
|
5. 0번째 관절부터 순서대로 목표 관절 위치를 입력합니다. 단위는 degree입니다.
Warning
실수로 인해 로봇이 충돌할 수 있습니다. 처음에는 0, 0, 0, 0, 0, 0를 입력하여 영 위치로 이동시키는것을 추천합니다.
6. 'g'를 입력하여 로봇이 움직임을 확인합니다.
indy_driver_py 패키지
패키지 구성
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 | ├── launch
│ ├── dcp.launch
│ ├── * indy7_dcp.launch
│ └── indy7_moveit_dcp.launch
├── package.xml
├── rviz_config
│ ├── indy7_dcp.rviz
│ └── indy7_moveit_dcp.rviz
└── src
├── * dcp_driver.py
├── indy_utils
│ ├── indydcp_client.py
│ ├── indy_program_maker.py
│ └── __init__.py
└── utils_transf.py
|
-
dcp_driver.py : dcp_drive 의 핵심 프로그램입니다. ROS에서 전달되는 move명령이나 목표 값 등을 IndyDCP 의 명령으로 변환하며,
로봇의 현재상태를 주기적으로 ROS 시스템에 전달하는 역할을 합니다.
-
indy7_dcp.launch : 입력받은 로봇의 이름, IP를 ROS 파라미터로 등록하고 dcp_driver.py를 실행합니다.
dcp_driver.py
아래의 코드는 dcp_driver.py 의 메인 함수 부분입니다.
| def main():
robot_ip = rospy.get_param("robot_ip_address")
robot_name = rospy.get_param("robot_name")
t = IndyROSConnector(robot_ip, robot_name)
t.run()
|
ROS 파라미터로 등록된 IP와 로봇이름을 읽어와 IndyROSConnector를 초기화합니다.
그 후 IndyROSConnector 의 run() 메서드를 호출합니다.
아래에서 IndyROSConnector를 설명합니다.
IndyROSConnector의 전문은 패키지를 다운받아서 확인할 수 있습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 | def __init__(self, robot_ip, robot_name):
self.robot_name = robot_name
# Connect to Robot
self.indy = indydcp_client.IndyDCPClient(robot_ip, robot_name)
# Initialize ROS node
rospy.init_node('indy_driver_py')
self.rate = rospy.Rate(20) # hz
# Publish current robot state
self.joint_state_pub = rospy.Publisher('joint_states', JointState, queue_size=10)
self.indy_state_pub = rospy.Publisher("/indy/status", GoalStatusArray, queue_size=10)
self.control_state_pub = rospy.Publisher("/feedback_states", FollowJointTrajectoryFeedback, queue_size=1)
# Subscribe desired joint position
self.joint_execute_plan_sub = rospy.Subscriber("/joint_path_command", JointTrajectory, self.execute_plan_result_cb, queue_size=1)
# Subscribe command
self.execute_joint_state_sub = rospy.Subscriber("/indy/execute_joint_state", JointState, self.execute_joint_state_cb, queue_size=1)
self.stop_sub = rospy.Subscriber("/stop_motion", Bool, self.stop_robot_cb, queue_size=1)
self.set_motion_param_sub = rospy.Subscriber("/indy/motion_parameter", Int32MultiArray, self.set_motion_param_cb, queue_size=1)
# Misc variable
self.joint_state_list = []
self.execute = False
self.vel = 1
self.blend = 5
|
robot_ip 는 네트워크에 연결된 CB의 ip, name은 로봇의 이름입니다.
self.indy = indydcp_client.IndyDCPClient(robot_ip, robot_name)
에서 dcp client 에 입력한 값들을 전달합니다.
다음은 ROS node의 설정 부분입니다.
rospy.init_node('indy_driver_py')
에서 rospy의 init_node를 사용해 이름이 'indy_driver_py'인 ROS노드를 생성합니다.
self.rate = rospy.Rate(20)
에서 노드를 주기적으로 실행하기 위해 rate를 20 hz로 설정했습니다.
dcp driver의 역할은 Indy의 현재 관절 위치를 ROS에 전달하고, ROS의 명령을 받아 Indy에 전달하는 것입니다.
역할 수행을 위해 현재관절상태(joint_states)나 로봇의 상태를 발신(publish)하고,
실행명령(/indy/execute_joint_state), 정지명령(/stop_motion), MoveIt 계획결과(/joint_path_command)등을 구독(subscribe)합니다.
rospy의 Publisher와 Subscriber의 사용법은 다음과 같습니다.
| Publisher('메시지 이름', 메시지 데이터 형, queue_size=쌓을 큐의 크기)
Subscriber('메시지 이름', 메시지 데이터 형, 메시지를 받을 때마다 실행할 함수 (콜백 함수), queue_size=쌓을 큐의 크기)
|
Subscriber에 콜백을 등록하면 메시지를 구독 할때마다 해당 함수가 실행됩니다.
indy_driver에서는 다음과 같이 메시지를 받아서 필요한 정보를 joint_pos_list에 저장하는 콜백 함수,
로봇에 정지 명령을 보내는 콜백함수를 작성했습니다.
set_motion_param_cb 을 사용하여 모션의 속도와 블랜드 크기를 설정 할 수 있습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 | def execute_joint_state_cb(self, msg):
self.joint_state_list = [msg.position]
if self.execute == False:
self.execute = True
def execute_plan_result_cb(self, msg):
# download planned path from ros moveit
self.joint_state_list = []
if msg.points:
self.joint_state_list = [p.positions for p in msg.points]
else:
self.indy.stop_motion()
if self.execute == False:
self.execute = True
def stop_robot_cb(self, msg):
if msg.data == True:
self.indy.stop_motion()
def set_motion_param_cb(self, msg):
param_array = msg.data
self.vel = param_array[0]
self.blend = param_array[1]
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89 | def move_robot(self):
if self.joint_state_list:
prog = indy_program_maker.JsonProgramComponent(policy=0, resume_time=2)
for j_pos in self.joint_state_list:
prog.add_joint_move_to(utils_transf.rads2degs(j_pos), vel=self.vel, blend=self.blend)
json_string = json.dumps(prog.json_program)
self.indy.set_and_start_json_program(json_string)
self.joint_state_list = []
def joint_state_publisher(self):
joint_state_msg = JointState()
joint_state_msg.header.stamp = rospy.Time.now()
if self.robot_name == 'NRMK-IndyRP2':
joint_state_msg.name = ['joint0', 'joint1', 'joint2', 'joint3', 'joint4', 'joint5', 'joint6']
else:
joint_state_msg.name = ['joint0', 'joint1', 'joint2', 'joint3', 'joint4', 'joint5']
joint_state_msg.position = utils_transf.degs2rads(self.indy.get_joint_pos())
joint_state_msg.velocity = []
joint_state_msg.effort = []
control_state_msg = FollowJointTrajectoryFeedback()
control_state_msg.header.stamp = rospy.Time.now()
if self.robot_name == 'NRMK-IndyRP2':
control_state_msg.joint_names = ['joint0', 'joint1', 'joint2', 'joint3', 'joint4', 'joint5', 'joint6']
else:
control_state_msg.joint_names = ['joint0', 'joint1', 'joint2', 'joint3', 'joint4', 'joint5']
control_state_msg.actual.positions = utils_transf.degs2rads(self.indy.get_joint_pos())
control_state_msg.desired.positions = utils_transf.degs2rads(self.indy.get_joint_pos())
control_state_msg.error.positions = [0 for i in control_state_msg.joint_names]
self.joint_state_pub.publish(joint_state_msg)
self.control_state_pub.publish(control_state_msg)
def robot_state_publisher(self):
if self.current_robot_status['ready']:
state_num = 0
if self.current_robot_status['busy']:
state_num = 1
if self.current_robot_status['direct_teaching']:
state_num = 2
if self.current_robot_status['collision']:
state_num = 3
if self.current_robot_status['emergency']:
state_num = 4
status_msg = GoalStatusArray()
status_msg.header.stamp = rospy.Time.now()
status = GoalStatus()
status.goal_id.stamp = rospy.Time.now()
status.goal_id.id = ""
status.status = state_num
status.text = ROBOT_STATE[state_num]
status_msg.status_list=[status]
self.indy_state_pub.publish(status_msg)
def run(self):
self.indy.connect()
while not rospy.is_shutdown():
self.current_robot_status = self.indy.get_robot_status()
self.joint_state_publisher()
self.robot_state_publisher()
if self.execute:
self.execute = False
if self.current_robot_status['busy']:
continue
if self.current_robot_status['direct_teaching']:
continue
if self.current_robot_status['collision']:
continue
if self.current_robot_status['emergency']:
continue
if self.current_robot_status['ready']:
self.move_robot()
self.indy.disconnect()
|
작성한 클래스의 매인이 되는 run 함수에서는 ROS가 실행되고있는동안 주기적으로
joint_state_publisher를 실행합니다.
joint_state_publisher 에서는 indy.get_joint_pos() 를 통해 받아온 로봇의 현재 관절위치를
ROS의 데이터 형식에 맞추어 저장한 다음 데이터를 발신합니다.
self.execute의 값이 True가 되었을때 move_robot를 실행하여 저장된 joint_pos_list 위치로 로봇을 이동시킵니다.
move_robot에서는 joint_pos_list의 모든 요소를 Python Program Maker 를 이용하여 로봇의 웨이포인트로 등록합니다.