File size: 5,665 Bytes
6cd1d54
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import gguf
import numpy as np
from sklearn.decomposition import PCA
import tqdm


def load_hidden_states(path):
    """Load hidden states produced by the llama.cpp ./repeng tool.

    Opens the GGUF file at ``path`` and collects every tensor whose name
    starts with ``l_out-``. The text after that prefix is parsed as the
    integer layer index; all other tensors are ignored.

    Args:
        path: Location of the GGUF file; handed to ``gguf.GGUFReader``.

    Returns:
        Dict mapping layer index (int) to that tensor's data, reshaped to
        ``(t.shape[1], t.shape[0])``.

    Raises:
        AssertionError: If two tensors resolve to the same layer index.
        ValueError: If the text after ``l_out-`` is not an integer.
    """
    prefix = 'l_out-'
    gguf_file = gguf.GGUFReader(path)

    hidden_states = {}
    for t in gguf_file.tensors:
        if not t.name.startswith(prefix):
            continue
        layer = int(t.name[len(prefix):])
        assert layer not in hidden_states, 'duplicate hidden states for layer %d' % layer
        # The reported dims are swapped for the numpy view.
        # NOTE(review): presumably this yields (n_rows, n_features) because
        # GGUF lists dimensions in the opposite order to numpy -- confirm
        # against the gguf reader version in use.
        hidden_states[layer] = t.data.reshape((t.shape[1], t.shape[0]))

    return hidden_states

def project_onto_direction(H, direction):
    """Return the scalar projection of each row of H onto ``direction``.

    Computes ``(H @ direction) / ||direction||``. For a 2-D ``H`` of shape
    (n, d) and a 1-D ``direction`` of shape (d,), the result has shape (n,).

    Raises:
        AssertionError: If the norm of ``direction`` is infinite.
    """
    norm = np.linalg.norm(direction)
    # Only an infinite norm is rejected here; a zero norm is not checked.
    assert not np.isinf(norm)
    dot_products = H @ direction
    return dot_products / norm

def read_representations(
    layer_hiddens: dict[int, np.ndarray],
) -> dict[int, np.ndarray]:
    """
    Derive one direction vector per layer from the contrast dataset.

    Each array in ``layer_hiddens`` is read as interleaved rows
    [positive, negative, positive, negative, ...]. For every layer the
    (positive - negative) row differences are mean-centred and a
    one-component PCA is fitted to them. The component is returned as a
    float32 vector, negated when more pairs project the positive row below
    the negative row than above it.

    Args:
        layer_hiddens: Maps layer index to that layer's hidden-state rows.

    Returns:
        Dict mapping layer index to its direction vector, shape (n_features,).

    Raises:
        AssertionError: If a layer's row count differs from twice the
            number of inputs inferred from the first dict entry.
    """

    layer_ids = sorted(layer_hiddens.keys())
    first_entry = next(iter(layer_hiddens.values()))
    num_inputs = first_entry.shape[0] // 2
    print('%d inputs' % num_inputs)

    # Differences between each (positive, negative) pair, for every layer.
    pair_diffs = {
        layer: layer_hiddens[layer][::2] - layer_hiddens[layer][1::2]
        for layer in layer_ids
    }

    directions: dict[int, np.ndarray] = {}
    for layer in tqdm.tqdm(layer_ids):
        hiddens = layer_hiddens[layer]
        assert hiddens.shape[0] == num_inputs * 2

        # Fit the layer direction on the mean-centred differences.
        diffs = pair_diffs[layer]
        centred = np.vstack(diffs - diffs.mean(axis=0, keepdims=True))
        pca_model = PCA(n_components=1, whiten=False).fit(centred)
        # components_ is (1, n_features); drop the leading axis.
        direction = pca_model.components_.astype(np.float32).squeeze(axis=0)

        # Decide the sign by majority vote over the pairs: even rows are the
        # positive examples, odd rows the negative ones.
        projected = project_onto_direction(hiddens, direction)
        positives = projected[0::2]
        negatives = projected[1::2]
        positive_smaller_mean = np.mean(positives < negatives)
        positive_larger_mean = np.mean(positives > negatives)

        if positive_smaller_mean > positive_larger_mean:  # type: ignore
            direction *= -1
        directions[layer] = direction

    return directions

def export_gguf(directions, path: str):
    """
    Export a trained ControlVector to a llama.cpp .gguf file.

    Writes one tensor named ``direction.<layer>`` for every entry of
    ``directions`` except layer 0, using the architecture string
    ``"controlvector"``. The writer is closed even when adding or writing
    a tensor raises, so the output file handle is not leaked.

    Args:
        directions: Mapping of layer index to direction array.
        path: Destination file path, handed to ``gguf.GGUFWriter``.
    """

    arch = "controlvector"
    writer = gguf.GGUFWriter(path, arch)
    try:
        # NOTE: no metadata keys are written; the calls below were already
        # disabled in the original script.
        #writer.add_string(f"{arch}.model_hint", model_type)
        #writer.add_uint32(f"{arch}.layer_count", len(directions))
        for layer, direction in directions.items():
            if layer == 0:
                # For some reason, llama.cpp bails out if it sees a
                # direction.0 tensor.
                continue
            writer.add_tensor(f"direction.{layer}", direction)
        writer.write_header_to_file()
        writer.write_kv_data_to_file()
        writer.write_tensors_to_file()
    finally:
        writer.close()

def test_model(model_name, directions):
    """Print baseline, +control and -control generations for one prompt.

    Loads ``model_name`` through transformers' ``from_pretrained`` in
    float16, wraps it in a repeng ``ControlModel``, builds a
    ``ControlVector`` from ``directions`` and greedily generates up to 128
    new tokens for a fixed prompt three times: without control, with
    strength 1.0 and with strength -1.0. Output goes to stdout; nothing is
    returned.

    Args:
        model_name: Model identifier accepted by ``from_pretrained``.
        directions: Per-layer direction vectors, as produced by
            ``read_representations``.
    """
    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer
    from repeng import ControlVector, ControlModel

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokenizer.pad_token_id = 0

    model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.float16)
    # Prefer CUDA, then Apple MPS, and fall back to the CPU.
    if torch.cuda.is_available():
        device = "cuda:0"
    elif torch.backends.mps.is_available():
        device = "mps:0"
    else:
        device = "cpu"
    model = model.to(device)
    # Layer ids -5 down to -17 are handed to ControlModel.
    controlled_layers = list(range(-5, -18, -1))
    model = ControlModel(model, controlled_layers)

    control_vector = ControlVector(model.config.model_type, directions)

    user_tag, asst_tag = "[INST]", "[/INST]"

    # The question to ask the modified model.
    # Don't forget the space after {user_tag} and before {asst_tag}!
    prompt = f"{user_tag} What are human beings like? {asst_tag}"

    # Tokenized prompt and generation settings.
    encoded = tokenizer(prompt, return_tensors="pt").to(model.device)
    generation_kwargs = {
        "pad_token_id": tokenizer.eos_token_id,  # silence warning
        "do_sample": False,  # temperature=0
        "max_new_tokens": 128,
        "repetition_penalty": 1.1,  # reduce control jank
    }

    def generate_and_print():
        # Generate with whatever control is currently set and print the text.
        output_ids = model.generate(**encoded, **generation_kwargs)
        print(tokenizer.decode(output_ids.squeeze()))

    print("==baseline")
    model.reset()
    generate_and_print()

    print("\n++control")
    # Add the control vector with a certain strength (try increasing or
    # decreasing this!).
    model.set_control(control_vector, 1.0)
    generate_and_print()

    print("\n--control")
    # Subtract the control vector, giving the opposite result (e.g. sad
    # instead of happy). Depending on the vector, more or less negative
    # strength may be needed to match the positive effect.
    model.set_control(control_vector, -1.0)
    generate_and_print()
    model.reset()


# Script driver. These statements run whenever the module is executed or
# imported (there is no `if __name__ == "__main__":` guard).
# Pipeline: read the hidden states from 'control_vector_data.gguf', derive one
# direction per layer, then write the directions to 'control_vector.gguf'.
# NOTE(review): the progress messages below are informal/profane; consider
# rewording them before this script is shared.
print("\nLoad hidden shit\n")
hidden_states = load_hidden_states('control_vector_data.gguf')
print("\nHidden shit loaded\n")
directions = read_representations(hidden_states)
print("\nExport this motherfucker\n")
export_gguf(directions, 'control_vector.gguf')

# Model identifier handed to test_model() when the check below is enabled.
TEST_MODEL_NAME = 'mistralai/Mistral-7B-Instruct-v0.1'
# Optional generation check, disabled by default; uncomment to run it
# (test_model imports torch, transformers and repeng).
#test_model(TEST_MODEL_NAME, directions)