drivertest/drivertest_uart: uart driver testcase

VELAPLATFO-4612

There are three test.

1. Write a long default string into the serial port.
2. Read the default string given by the last test from the serial port.
3. Manually execute the `test_content_ gen.py` script and write the text into the serial device to test the burst capability or run this script and specify device path to automatically test.

Signed-off-by: xinbingnan <xinbingnan@xiaomi.com>
This commit is contained in:
xinbingnan 2022-10-27 15:02:39 +08:00 committed by Xiang Xiao
parent df2db2e410
commit 9246502aca
3 changed files with 493 additions and 0 deletions

View File

@ -71,4 +71,9 @@ MAINSRC += drivertest_relay.c
PROGNAME += cmocka_driver_relay PROGNAME += cmocka_driver_relay
endif endif
ifneq ($(CONFIG_SERIAL),)
MAINSRC += drivertest_uart.c
PROGNAME += cmocka_driver_uart
endif
include $(APPDIR)/Application.mk include $(APPDIR)/Application.mk

View File

@ -0,0 +1,359 @@
/****************************************************************************
* apps/testing/drivertest/drivertest_uart.c
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The
* ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
****************************************************************************/
/****************************************************************************
* Included Files
****************************************************************************/
#include <assert.h>
#include <fcntl.h>
#include <stdarg.h>
#include <stddef.h>
#include <setjmp.h>
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <cmocka.h>
/****************************************************************************
* Pre-processor Definitions
****************************************************************************/
#ifndef CONFIG_TESTING_DRIVER_TEST_UART_DEVICE
#define CONFIG_TESTING_DRIVER_TEST_UART_DEVICE "/dev/console"
#endif
#ifndef CONFIG_TESTING_DRIVER_TEST_UART_DEFAULT_CONTENT
#define CONFIG_TESTING_DRIVER_TEST_UART_DEFAULT_CONTENT "0123456789abcdefg"\
"hijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ,./<>?;':\"[]{}\\|!@#$%^"\
"&*()-+_="
#define DEFAULT_CONTENT CONFIG_TESTING_DRIVER_TEST_UART_DEFAULT_CONTENT
#endif
#ifndef CONFIG_TESTING_DRIVER_TEST_UART_BUFFER_SIZE
#define CONFIG_TESTING_DRIVER_TEST_UART_BUFFER_SIZE 1024
#define BUFFER_SIZE CONFIG_TESTING_DRIVER_TEST_UART_BUFFER_SIZE
#endif
/****************************************************************************
* Private Type Declarations
****************************************************************************/
struct test_confs_s
{
FAR const char *dev_path;
};
struct test_state_s
{
FAR const char *dev_path;
FAR char *buffer;
int fd;
};
/****************************************************************************
* Private Functions
****************************************************************************/
/****************************************************************************
* Name: read_until
****************************************************************************/
static uint32_t read_until(int fd, FAR char *buffer,
uint32_t buffer_len, char terminator)
{
uint32_t cnt = 0;
ssize_t len = 0;
char tmp_char = 0;
while (cnt <= buffer_len)
{
assert_true(cnt < buffer_len);
len = read(fd, &tmp_char, 1);
assert_true(len >= 0);
if (len == 0 || tmp_char == terminator)
{
buffer[cnt] = '\0';
return cnt;
}
buffer[cnt] = tmp_char;
cnt++;
}
buffer[buffer_len - 1] = '\0';
return UINT32_MAX;
}
/****************************************************************************
* Name: read_length
****************************************************************************/
static uint32_t read_length(int fd, FAR char *buffer,
uint32_t max_size)
{
uint32_t cnt = 0;
ssize_t len = 0;
while (cnt < max_size)
{
len = read(fd, buffer + cnt, max_size - cnt);
assert_true(len >= 0);
if (len == 0)
{
return cnt;
}
cnt += len;
}
return cnt;
}
/****************************************************************************
* Name: write_length
****************************************************************************/
static uint32_t write_length(int fd, FAR const char *buffer,
uint32_t max_size)
{
uint32_t cnt = 0;
ssize_t len = 0;
while (cnt < max_size)
{
len = write(fd, buffer + cnt, max_size - cnt);
assert_true(len >= 0);
if (len == 0)
{
return cnt;
}
cnt += len;
}
return cnt;
}
/****************************************************************************
* Name: setup
****************************************************************************/
static int setup(FAR void **state)
{
FAR struct test_confs_s *confs = (FAR struct test_confs_s *)*state;
FAR struct test_state_s *test_state = malloc(sizeof(struct test_state_s));
assert_true(test_state != NULL);
test_state->dev_path = confs->dev_path;
assert_true(sizeof(DEFAULT_CONTENT) <= BUFFER_SIZE);
test_state->buffer = malloc(BUFFER_SIZE);
assert_true(test_state->buffer != NULL);
test_state->fd = open(test_state->dev_path, O_RDWR);
assert_true(test_state->fd > 0);
*state = test_state;
return 0;
}
/****************************************************************************
* Name: teardown
****************************************************************************/
static int teardown(FAR void **state)
{
FAR struct test_state_s *test_state = (FAR struct test_state_s *)*state;
free(test_state->buffer);
assert_int_equal(close(test_state->fd), 0);
free(test_state);
return 0;
}
/****************************************************************************
* Name: write_default
****************************************************************************/
static void write_default(FAR void **state)
{
FAR struct test_state_s *test_state = (FAR struct test_state_s *)*state;
int res = write(test_state->fd,
DEFAULT_CONTENT,
sizeof(DEFAULT_CONTENT) - 1);
assert_int_equal(res, sizeof(DEFAULT_CONTENT) - 1);
}
/****************************************************************************
* Name: read_default
****************************************************************************/
static void read_default(FAR void **state)
{
FAR struct test_state_s *test_state = (FAR struct test_state_s *)*state;
sigset_t buffer_size = sizeof(DEFAULT_CONTENT);
int cnt = 0;
FAR char *buffer = test_state->buffer;
assert_true(buffer != NULL);
buffer[buffer_size - 1] = '\0';
while (cnt < sizeof(DEFAULT_CONTENT) - 1)
{
ssize_t n = read(test_state->fd, buffer + cnt, buffer_size - cnt);
assert_true(n >= 0);
if (n == 0)
{
break;
}
else
{
cnt += n;
}
}
assert_string_equal(buffer, DEFAULT_CONTENT);
}
/****************************************************************************
* Name: burst_test
****************************************************************************/
static void burst_test(FAR void **state)
{
FAR struct test_state_s *test_state = (FAR struct test_state_s *)*state;
int res = 0;
char num_buffer[16];
char ret_msg_buffer[8];
num_buffer[0] = '\0';
ret_msg_buffer[0] = '\0';
while (true)
{
int num = 0;
FAR char *read_buffer = test_state->buffer;
res = read_until(test_state->fd, num_buffer, sizeof(num_buffer), '#');
assert_true(res > 0);
num = strtol(num_buffer, NULL, 10);
assert_true(num >= 0);
assert_true(num < BUFFER_SIZE);
if (num == 0)
{
break;
}
res = read_length(test_state->fd, read_buffer, num);
assert_int_equal(res, num);
read_buffer[num] = '\0';
res = write_length(test_state->fd, read_buffer, num);
assert_int_equal(res, num);
res = read_until(test_state->fd, ret_msg_buffer,
sizeof(ret_msg_buffer), '#');
/* length of 'pass' or 'fail' */
assert_int_equal(res, 4);
assert_string_equal(ret_msg_buffer, "pass");
}
}
/****************************************************************************
* Name: show_usage
****************************************************************************/
static void show_usage(FAR const char *progname, int exitcode)
{
printf("Usage: %s -d <dev path>\n", progname);
printf("Where:\n");
printf(" -d <dev path> uart device path "
"[default device: /dev/console].\n");
exit(exitcode);
}
/****************************************************************************
* Name: parse_args
****************************************************************************/
static void parse_args(int argc, FAR char **argv,
FAR struct test_confs_s *conf)
{
int option;
while ((option = getopt(argc, argv, "d:")) != ERROR)
{
switch (option)
{
case 'd':
conf->dev_path = optarg;
break;
case '?':
printf("Unknown option: %c\n", optopt);
show_usage(argv[0], EXIT_FAILURE);
break;
}
}
}
/****************************************************************************
* Public Functions
****************************************************************************/
/****************************************************************************
* cmocka_driver_uart_main
****************************************************************************/
int main(int argc, FAR char *argv[])
{
struct test_confs_s confs =
{
.dev_path = CONFIG_TESTING_DRIVER_TEST_UART_DEVICE
};
parse_args(argc, argv, &confs);
const struct CMUnitTest tests[] =
{
cmocka_unit_test_prestate_setup_teardown(write_default, setup,
teardown, &confs),
cmocka_unit_test_prestate_setup_teardown(read_default, setup,
teardown, &confs),
cmocka_unit_test_prestate_setup_teardown(burst_test, setup,
teardown, &confs),
};
return cmocka_run_group_tests(tests, NULL, NULL);
}

View File

@ -0,0 +1,129 @@
#!/bin/python3
# ****************************************************************************
# apps/testing/drivertest/cmocka_driver_uart/test_content_gen.py
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership. The
# ASF licenses this file to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance with the
# License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import argparse
import sys
import time
from random import choices, randint
try:
import serial
except ImportError:
print("Please Install pyserial first [pip install pyserial]")
sys.exit(1)
DEFAULT_CONTENT = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ,./<>?;':\"[]{}\\|!@#$%^&*()-+_="
def parse_args():
parser = argparse.ArgumentParser(
description="Use this script to test whether the serial port is working properly. If the device path is not "
"provided, the text of the test will be output manually, which requires the user to copy and "
"paste manually to complete the test. If provided, it will be tested automatically."
)
parser.add_argument("dev", type=str, nargs="?", help="tty device", default="")
parser.add_argument(
"-t", "--turn", type=int, nargs="?", help="test turns [10]", default=10
)
parser.add_argument(
"-l",
"--length",
type=int,
nargs="?",
help="each turn max length [100]",
default=100,
)
return parser.parse_args()
def fake_symbol(k):
return "".join(choices(DEFAULT_CONTENT, k=k))
def s_write(fd, content, end="#"):
if fd:
try:
fd.write((content + end).encode())
except Exception:
fd.flush()
fd.close()
else:
print(content, end=end)
def s_read(fd, size: int):
if fd:
try:
return fd.read(size).decode()
except Exception:
fd.flush()
fd.close()
else:
return input()
def make_test(device: str, default_content: str, turn: int, max_length: int):
fd = None
if len(device) > 0:
fd = serial.Serial(device, 115200)
if not fd.is_open:
fd.open()
print("FROM SERIAL:", s_read(fd, size=len(default_content)))
fd.flushInput()
fd.flushOutput()
print("START WRITE:")
time.sleep(1)
s_write(fd, DEFAULT_CONTENT, end="")
time.sleep(1)
print("START BURST TEST:")
fail_count = 0
for i in range(turn):
content_length = randint(1, max_length)
content = fake_symbol(content_length)
s_write(fd, f"{content_length}")
s_write(fd, content, end="")
back = s_read(fd, content_length)
if back == content:
s_write(fd, "pass")
print(f"TURN[{i + 1}]: PASS")
else:
s_write(fd, "fail")
fail_count += 1
print(f"TURN[{i + 1}]: FAIL")
s_write(fd, "0")
if fail_count != 0:
print(f"### FAIL {fail_count} TURNS ###")
else:
print("### ALL PASSED ###")
if __name__ == "__main__":
args = parse_args()
make_test(
args.dev,
default_content=DEFAULT_CONTENT,
turn=args.turn,
max_length=args.length,
)