import numpy |
from transformers import TokenClassificationPipeline |
class BellmanFordTokenClassificationPipeline(TokenClassificationPipeline): |
def __init__(self,**kwargs): |
from copy import deepcopy |
from tokenizers import Regex |
from tokenizers.pre_tokenizers import Sequence,Split,Whitespace,Punctuation |
super().__init__(**kwargs) |
self.oldtokenizer=deepcopy(self.tokenizer) |
self.tokenizer.backend_tokenizer.pre_tokenizer=Sequence([Split(Regex("[ぁ-ん]"),"isolated"),Whitespace(),Punctuation()]) |
x=self.model.config.label2id |
y=[k for k in x if k.find("|")<0 and not k.startswith("I-")] |
self.transition=numpy.full((len(x),len(x)),-numpy.inf) |
for k,v in x.items(): |
if k.find("|")<0: |
for j in ["I-"+k[2:]] if k.startswith("B-") else [k]+y if k.startswith("I-") else y: |
self.transition[v,x[j]]=0 |
def check_model_type(self,supported_models): |
pass |
def postprocess(self,model_outputs,**kwargs): |
if "logits" not in model_outputs: |
return self.postprocess(model_outputs[0],**kwargs) |
return self.bellman_ford_token_classification(model_outputs,**kwargs) |
def bellman_ford_token_classification(self,model_outputs,**kwargs): |
m=model_outputs["logits"][0].numpy() |
e=numpy.exp(m-numpy.max(m,axis=-1,keepdims=True)) |
z=e/e.sum(axis=-1,keepdims=True) |
for i in range(m.shape[0]-1,0,-1): |
m[i-1]+=numpy.max(m[i]+self.transition,axis=1) |
k=[numpy.argmax(m[0]+self.transition[0])] |
for i in range(1,m.shape[0]): |
k.append(numpy.argmax(m[i]+self.transition[k[-1]])) |
w=[{"entity":self.model.config.id2label[j],"start":s,"end":e,"score":z[i,j]} for i,((s,e),j) in enumerate(zip(model_outputs["offset_mapping"][0].tolist(),k)) if s<e] |
if "aggregation_strategy" in kwargs and kwargs["aggregation_strategy"]!="none": |
for i,t in reversed(list(enumerate(w))): |
p=t.pop("entity") |
if p.startswith("I-"): |
w[i-1]["score"]=min(w[i-1]["score"],t["score"]) |
w[i-1]["end"]=w.pop(i)["end"] |
elif i>0 and w[i-1]["end"]>t["start"]: |
w[i-1]["score"]=min(w[i-1]["score"],t["score"]) |
w[i-1]["end"]=w.pop(i)["end"] |
elif p.startswith("B-"): |
t["entity_group"]=p[2:] |
else: |
t["entity_group"]=p |
for t in w: |
t["text"]=model_outputs["sentence"][t["start"]:t["end"]] |
return w |
class UniversalDependenciesPipeline(BellmanFordTokenClassificationPipeline): |
def __init__(self,**kwargs): |
kwargs["aggregation_strategy"]="simple" |
super().__init__(**kwargs) |
x=self.model.config.label2id |
self.root=numpy.full((len(x)),-numpy.inf) |
self.left_arc=numpy.full((len(x)),-numpy.inf) |
self.right_arc=numpy.full((len(x)),-numpy.inf) |
for k,v in x.items(): |
if k.endswith("|root"): |
self.root[v]=0 |
elif k.find("|l-")>0: |
self.left_arc[v]=0 |
elif k.find("|r-")>0: |
self.right_arc[v]=0 |
def postprocess(self,model_outputs,**kwargs): |
import torch |
kwargs["aggregation_strategy"]="simple" |
if "logits" not in model_outputs: |
return self.postprocess(model_outputs[0],**kwargs) |
w=self.bellman_ford_token_classification(model_outputs,**kwargs) |
off=[(t["start"],t["end"]) for t in w] |
for i,(s,e) in reversed(list(enumerate(off))): |
if s<e: |
d=w[i]["text"] |
j=len(d)-len(d.lstrip()) |
if j>0: |
d=d.lstrip() |
off[i]=(off[i][0]+j,off[i][1]) |
j=len(d)-len(d.rstrip()) |
if j>0: |
d=d.rstrip() |
off[i]=(off[i][0],off[i][1]-j) |
if d.strip()=="": |
off.pop(i) |
w.pop(i) |
v=self.oldtokenizer([t["text"] for t in w],add_special_tokens=False) |
x=[not t["entity_group"].endswith(".") for t in w] |
z=model_outputs["input_ids"][0] |
if len(x)<88: |
x=[True]*len(x) |
else: |
k=sum([len(x)-i+1 if b else 0 for i,b in enumerate(x)])+len(z)+1 |
for i in numpy.argsort(numpy.array([t["score"] for t in w])): |
if x[i]==False and k+len(x)-i<4096: |
x[i]=True |
k+=len(x)-i+1 |
ids=[-1] |
for i in range(len(x)): |
if x[i]: |
ids.append(i) |
for j in range(i+1,len(x)): |
ids.append(j) |
ids.append(-1) |
with torch.no_grad(): |
e=self.model.get_input_embeddings().weight |
m=[] |
for j in v["input_ids"]: |
if j==[]: |
j=[self.tokenizer.unk_token_id] |
m.append(e[j,:].sum(axis=0)) |
m.append(e[self.tokenizer.eos_token_id,:]) |
m=torch.stack(m).to(self.device) |
e=self.model(inputs_embeds=torch.unsqueeze(torch.vstack((e[z,:],m[ids,:])),0)) |
m=e.logits[0].cpu().numpy() |
e=numpy.full((len(x),len(x),m.shape[-1]),m.min()) |
k=len(z)+1 |
for i in range(len(x)): |
if x[i]: |
e[i,i]=m[k]+self.root |
k+=1 |
for j in range(1,len(x)-i): |
e[i+j,i]=m[k]+self.left_arc |
e[i,i+j]=m[k]+self.right_arc |
k+=1 |
k+=1 |
m,p=numpy.max(e,axis=2),numpy.argmax(e,axis=2) |
h=self.chu_liu_edmonds(m) |
z=[i for i,j in enumerate(h) if i==j] |
if len(z)>1: |
k,h=z[numpy.argmax(m[z,z])],numpy.min(m)-numpy.max(m) |
m[:,z]+=[[0 if j in z and (i!=j or i==k) else h for i in z] for j in range(m.shape[0])] |
h=self.chu_liu_edmonds(m) |
q=[self.model.config.id2label[p[j,i]].split("|") for i,j in enumerate(h)] |
t=model_outputs["sentence"].replace("\n"," ") |
u="# text = "+t+"\n" |
for i,(s,e) in enumerate(off): |
u+="\t".join([str(i+1),t[s:e],"_",q[i][0],"_","_" if len(q[i])<3 else "|".join(q[i][1:-1]),str(0 if h[i]==i else h[i]+1),"root" if q[i][-1]=="root" else q[i][-1][2:],"_","_" if i+1<len(off) and e<off[i+1][0] else "SpaceAfter=No"])+"\n" |
return u+"\n" |
def chu_liu_edmonds(self,matrix): |
h=numpy.argmax(matrix,axis=0) |
x=[-1 if i==j else j for i,j in enumerate(h)] |
for b in [lambda x,i,j:-1 if i not in x else x[i],lambda x,i,j:-1 if j<0 else x[j]]: |
y=[] |
while x!=y: |
y=list(x) |
for i,j in enumerate(x): |
x[i]=b(x,i,j) |
if max(x)<0: |
return h |
y,x=[i for i,j in enumerate(x) if j==max(x)],[i for i,j in enumerate(x) if j<max(x)] |
z=matrix-numpy.max(matrix,axis=0) |
m=numpy.block([[z[x,:][:,x],numpy.max(z[x,:][:,y],axis=1).reshape(len(x),1)],[numpy.max(z[y,:][:,x],axis=0),numpy.max(z[y,y])]]) |
k=[j if i==len(x) else x[j] if j<len(x) else y[numpy.argmax(z[y,x[i]])] for i,j in enumerate(self.chu_liu_edmonds(m))] |
h=[j if i in y else k[x.index(i)] for i,j in enumerate(h)] |
i=y[numpy.argmax(z[x[k[-1]],y] if k[-1]<len(x) else z[y,y])] |
h[i]=x[k[-1]] if k[-1]<len(x) else i |
return h |