1
2
3
4
5
6
7
8
9
10
11
12#define _GNU_SOURCE
13#include <fcntl.h>
14#include <stdio.h>
15#include <stdlib.h>
16#include <unistd.h>
17#include <sys/syscall.h>
18#include "trace-agent.h"
19
20#define READ_WAIT_USEC 100000
21
22void *rw_thread_info_new(void)
23{
24 struct rw_thread_info *rw_ti;
25
26 rw_ti = zalloc(sizeof(struct rw_thread_info));
27 if (rw_ti == NULL) {
28 pr_err("rw_thread_info zalloc error\n");
29 exit(EXIT_FAILURE);
30 }
31
32 rw_ti->cpu_num = -1;
33 rw_ti->in_fd = -1;
34 rw_ti->out_fd = -1;
35 rw_ti->read_pipe = -1;
36 rw_ti->write_pipe = -1;
37 rw_ti->pipe_size = PIPE_INIT;
38
39 return rw_ti;
40}
41
42void *rw_thread_init(int cpu, const char *in_path, const char *out_path,
43 bool stdout_flag, unsigned long pipe_size,
44 struct rw_thread_info *rw_ti)
45{
46 int data_pipe[2];
47
48 rw_ti->cpu_num = cpu;
49
50
51 rw_ti->in_fd = open(in_path, O_RDONLY);
52 if (rw_ti->in_fd == -1) {
53 pr_err("Could not open in_fd (CPU:%d)\n", cpu);
54 goto error;
55 }
56
57
58 if (!stdout_flag) {
59
60 rw_ti->out_fd = open(out_path, O_WRONLY);
61 if (rw_ti->out_fd == -1) {
62 pr_err("Could not open out_fd (CPU:%d)\n", cpu);
63 goto error;
64 }
65 } else
66
67 rw_ti->out_fd = STDOUT_FILENO;
68
69 if (pipe2(data_pipe, O_NONBLOCK) < 0) {
70 pr_err("Could not create pipe in rw-thread(%d)\n", cpu);
71 goto error;
72 }
73
74
75
76
77
78 if (fcntl(*data_pipe, F_SETPIPE_SZ, pipe_size) < 0) {
79 pr_err("Could not change pipe size in rw-thread(%d)\n", cpu);
80 goto error;
81 }
82
83 rw_ti->read_pipe = data_pipe[1];
84 rw_ti->write_pipe = data_pipe[0];
85 rw_ti->pipe_size = pipe_size;
86
87 return NULL;
88
89error:
90 exit(EXIT_FAILURE);
91}
92
93
94static void bind_cpu(int cpu_num)
95{
96 cpu_set_t mask;
97
98 CPU_ZERO(&mask);
99 CPU_SET(cpu_num, &mask);
100
101
102 if (sched_setaffinity(0, sizeof(mask), &mask) == -1)
103 pr_err("Could not set CPU#%d affinity\n", (int)cpu_num);
104}
105
106static void *rw_thread_main(void *thread_info)
107{
108 ssize_t rlen, wlen;
109 ssize_t ret;
110 struct rw_thread_info *ts = (struct rw_thread_info *)thread_info;
111
112 bind_cpu(ts->cpu_num);
113
114 while (1) {
115
116 if (!global_run_operation) {
117 pthread_mutex_lock(&mutex_notify);
118 pthread_cond_wait(&cond_wakeup, &mutex_notify);
119 pthread_mutex_unlock(&mutex_notify);
120 }
121
122 if (global_sig_receive)
123 break;
124
125
126
127
128
129 rlen = splice(ts->in_fd, NULL, ts->read_pipe, NULL,
130 ts->pipe_size, SPLICE_F_MOVE | SPLICE_F_MORE);
131
132 if (rlen < 0) {
133 pr_err("Splice_read in rw-thread(%d)\n", ts->cpu_num);
134 goto error;
135 } else if (rlen == 0) {
136
137
138
139
140
141
142 usleep(READ_WAIT_USEC);
143 pr_debug("Read retry(cpu:%d)\n", ts->cpu_num);
144 continue;
145 }
146
147 wlen = 0;
148
149 do {
150 ret = splice(ts->write_pipe, NULL, ts->out_fd, NULL,
151 rlen - wlen,
152 SPLICE_F_MOVE | SPLICE_F_MORE);
153
154 if (ret < 0) {
155 pr_err("Splice_write in rw-thread(%d)\n",
156 ts->cpu_num);
157 goto error;
158 } else if (ret == 0)
159
160
161
162
163
164
165
166
167
168 sleep(1);
169 wlen += ret;
170 } while (wlen < rlen);
171 }
172
173 return NULL;
174
175error:
176 exit(EXIT_FAILURE);
177}
178
179
180pthread_t rw_thread_run(struct rw_thread_info *rw_ti)
181{
182 int ret;
183 pthread_t rw_thread_per_cpu;
184
185 ret = pthread_create(&rw_thread_per_cpu, NULL, rw_thread_main, rw_ti);
186 if (ret != 0) {
187 pr_err("Could not create a rw thread(%d)\n", rw_ti->cpu_num);
188 exit(EXIT_FAILURE);
189 }
190
191 return rw_thread_per_cpu;
192}
193