############################################################################
# tools/gdb/lists.py
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.  The
# ASF licenses this file to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance with the
# License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
# License for the specific language governing permissions and limitations
# under the License.
#
############################################################################

import gdb
import utils

list_node = utils.CachedType("struct list_node")
sq_queue = utils.CachedType("sq_queue_t")
dq_queue = utils.CachedType("dq_queue_t")


def list_for_each(head):
    """Iterate over a list"""
    if head.type == list_node.get_type().pointer():
        head = head.dereference()
    elif head.type != list_node.get_type():
        raise TypeError("Must be struct list_node not {}".format(head.type))

    if head["next"] == 0:
        gdb.write(
            "list_for_each: Uninitialized list '{}' treated as empty\n".format(
                head.address
            )
        )
        return

    node = head["next"].dereference()
    while node.address != head.address:
        yield node.address
        node = node["next"].dereference()


def list_for_each_entry(head, gdbtype, member):
    """Iterate over a list of structs"""
    for node in list_for_each(head):
        yield utils.container_of(node, gdbtype, member)


def list_check(head):
    """Check the consistency of a list"""
    nb = 0

    if head.type == list_node.get_type().pointer():
        head = head.dereference()
    elif head.type != list_node.get_type():
        raise gdb.GdbError("argument must be of type (struct list_node [*])")
    c = head
    try:
        gdb.write("Starting with: {}\n".format(c))
    except gdb.MemoryError:
        gdb.write("head is not accessible\n")
        return
    while True:
        p = c["prev"].dereference()
        n = c["next"].dereference()
        try:
            if p["next"] != c.address:
                gdb.write(
                    "prev.next != current: "
                    "current@{current_addr}={current} "
                    "prev@{p_addr}={p}\n".format(
                        current_addr=c.address,
                        current=c,
                        p_addr=p.address,
                        p=p,
                    )
                )
                return
        except gdb.MemoryError:
            gdb.write(
                "prev is not accessible: "
                "current@{current_addr}={current}\n".format(
                    current_addr=c.address, current=c
                )
            )
            return
        try:
            if n["prev"] != c.address:
                gdb.write(
                    "next.prev != current: "
                    "current@{current_addr}={current} "
                    "next@{n_addr}={n}\n".format(
                        current_addr=c.address,
                        current=c,
                        n_addr=n.address,
                        n=n,
                    )
                )
                return
        except gdb.MemoryError:
            gdb.write(
                "next is not accessible: "
                "current@{current_addr}={current}\n".format(
                    current_addr=c.address, current=c
                )
            )
            return
        c = n
        nb += 1
        if c == head:
            gdb.write("list is consistent: {} node(s)\n".format(nb))
            return


def sq_for_every(sq, entry):
    """Iterate over a singly linked list"""
    if sq.type == sq_queue.get_type().pointer():
        sq = sq.dereference()
    elif sq.type != sq_queue.get_type():
        gdb.write("Must be struct sq_queue not {}".format(sq.type))
        return

    if sq["head"] == 0:
        return

    entry = sq["head"].dereference()

    while entry.address:
        yield entry.address
        entry = entry["flink"].dereference()


def sq_is_empty(sq):
    """Check if a singly linked list is empty"""
    if sq.type == sq_queue.get_type().pointer():
        sq = sq.dereference()
    elif sq.type != sq_queue.get_type():
        return False

    if sq["head"] == 0:
        return True
    else:
        return False


def sq_check(sq):
    """Check the consistency of a singly linked list"""
    nb = 0
    if sq.type == sq_queue.get_type().pointer():
        sq = sq.dereference()
    elif sq.type != sq_queue.get_type():
        gdb.write("Must be struct sq_queue not {}".format(sq.type))
        return

    if sq["head"] == 0:
        gdb.write("sq_queue head is empty {}\n".format(sq.address))
        return

    entry = sq["head"].dereference()
    try:
        while entry.address:
            nb += 1
            entry = entry["flink"].dereference()
    except gdb.MemoryError:
        gdb.write("entry address is unaccessible {}\n".format(entry.address))
        return

    gdb.write("sq_queue is consistent: {} node(s)\n".format(nb))


def dq_for_every(dq, entry):
    """Iterate over a doubly linked list"""
    if dq.type == dq_queue.get_type().pointer():
        dq = dq.dereference()
    elif dq.type != dq_queue.get_type():
        gdb.write("Must be struct dq_queue not {}".format(dq.type))
        return

    if dq["head"] == 0:
        return

    entry = dq["head"].dereference()
    while entry.address:
        yield entry.address
        entry = entry["flink"].dereference()


def dq_check(dq):
    """Check the consistency of a doubly linked list"""
    nb = 0
    if dq.type == dq_queue.get_type().pointer():
        dq = dq.dereference()
    elif dq.type != dq_queue.get_type():
        gdb.write("Must be struct dq_queue not {}".format(dq.type))
        return

    if dq["head"] == 0:
        gdb.write("dq_queue head is empty {}\n".format(dq.address))
        return
    entry = dq["head"].dereference()
    try:
        while entry.address:
            nb += 1
            entry = entry["flink"].dereference()
    except gdb.MemoryError:
        gdb.write("entry address is unaccessible {}\n".format(entry.address))
        return

    gdb.write("dq_queue is consistent: {} node(s)\n".format(nb))


class Nxlistcheck(gdb.Command):
    """Verify a list consistency"""

    def __init__(self):
        super(Nxlistcheck, self).__init__(
            "listcheck", gdb.COMMAND_DATA, gdb.COMPLETE_EXPRESSION
        )

    def invoke(self, arg, from_tty):
        argv = gdb.string_to_argv(arg)
        if len(argv) != 1:
            raise gdb.GdbError("nx-list-check takes one argument")

        obj = gdb.parse_and_eval(argv[0])
        if obj.type == list_node.get_type().pointer():
            list_check(obj)
        elif obj.type == sq_queue.get_type().pointer():
            sq_check(obj)
        else:
            raise gdb.GdbError("Invalid argument type: {}".format(obj.type))


Nxlistcheck()