Data Transfer
Continuing from the previous article, after the connection is established, we begin data transfer.
Data Receiver
Here’s the implementation:
handleData
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
func (s *TcpSocket) handleData(tcpPack *tcpip.TcpPack) (resp *tcpip.IPPack, err error) {
if tcpPack.Flags&uint8(tcpip.TcpACK) != 0 {
s.sendUnack = tcpPack.AckNumber
}
if tcpPack.Payload == nil {
return nil, nil
}
data, err := tcpPack.Payload.Encode()
if err != nil {
return nil, fmt.Errorf("encode tcp payload failed %w", err)
}
if len(data) == 0 {
return nil, nil
}
s.recvNext = s.recvNext + uint32(len(data))
select {
case s.readCh <- data:
default:
return nil, fmt.Errorf("the reader queue is full, drop the data")
}
ipResp, _, err := NewPacketBuilder(s.network.opt).
SetAddr(s.SocketAddr).
SetSeq(s.sendNext).
SetAck(s.recvNext).
SetFlags(tcpip.TcpACK).
Build()
if err != nil {
return nil, err
}
return ipResp, nil
}
|
The main logic is:
- If an ACK packet is received, update
sendUnack to the ACK sequence number. The sequence number validity has already been checked in the outer checkSeqAck
- Update
recvNext to recvNext + len(data). This value is also the ACK number we’ll send back to the sender
- Send data to the upper layer’s
read() interface through readCh. This is an asynchronous process. If readCh is full, the data is discarded. Here we use the channel’s characteristics to implement a simple receive buffer.
After receiving data, it can be accessed through the upper layer’s read() interface.
Here’s the implementation of the read() interface:
Read
1
2
3
4
5
6
7
8
9
10
11
12
|
func (s *TcpSocket) Read() (data []byte, err error) {
s.Lock()
if s.State == tcpip.TcpStateCloseWait {
return nil, io.EOF
}
s.Unlock()
data, ok := <-s.readCh
if !ok {
return nil, io.EOF
}
return data, nil
}
|
The main logic is:
- If no data has arrived, it blocks and waits
- If the connection is closed, it returns
io.EOF
Data Sender
Here’s the implementation:
send
handleSend
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
func (s *TcpSocket) send(data []byte) (n int, err error) {
s.Lock()
defer s.Unlock()
send, resp, err := s.handleSend(data)
if err != nil {
return 0, err
}
if resp == nil {
return 0, nil
}
respData, err := resp.Encode()
if err != nil {
return 0, err
}
s.network.writeCh <- respData
return send, nil
}
func (s *TcpSocket) handleSend(data []byte) (send int, resp *tcpip.IPPack, err error) {
if s.State != tcpip.TcpStateEstablished {
return 0, nil, fmt.Errorf("connection not established")
}
length := len(data)
if length == 0 {
return 0, nil, nil
}
send = s.cacheSendData(data)
if send == 0 {
return 0, nil, nil
}
ipResp, _, err := NewPacketBuilder(s.network.opt).
SetAddr(s.SocketAddr).
SetSeq(s.sendNext).
SetAck(s.recvNext).
SetFlags(tcpip.TcpACK).
SetPayload(tcpip.NewRawPack(data[:send])).
Build()
if err != nil {
return 0, nil, err
}
s.sendUnack = s.sendNext
s.sendNext = s.sendNext + uint32(send)
return send, ipResp, nil
}
|
The main logic is:
- The outer
send is responsible for locking and sending data packets, while the inner handleSend is responsible for building data packets. This separation makes handleSend easier to unit test
- Before sending data, put it in the send buffer.
cacheSendData will decide how much data to send based on the sliding window algorithm
- Update
sendUnack and sendNext based on the amount of data sent
The sliding window is implemented using sendNext and sendUnack to create a simple circular buffer. The sendBufferRemain function returns the remaining space in the buffer.
Unacknowledged data in the buffer can be retransmitted if no ACK is received after timeout, though retransmission is not yet implemented.
Here’s the implementation:
cacheSendData
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
func (s *TcpSocket) cacheSendData(data []byte) int {
send := 0
remain := s.sendBufferRemain()
if len(data) > remain {
send = remain
} else {
send = len(data)
}
for i := 0; i < send; i++ {
s.sendBuffer[(int(s.sendNext)+i)%len(s.sendBuffer)] = data[i]
}
return send
}
func (s *TcpSocket) sendBufferRemain() int {
// tail - 1 - head + 1
tail := int(s.sendNext) % len(s.sendBuffer)
head := int(s.sendUnack) % len(s.sendBuffer)
if tail >= head {
return len(s.sendBuffer) - (tail - head)
}
return head - tail
}
|
Four-Way Handshake
Handling FIN in Passive Close
Here’s the implementation:
handleFin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
func (s *TcpSocket) handleFin() (resp *tcpip.IPPack, err error) {
s.recvNext += 1
s.State = tcpip.TcpStateCloseWait
ipResp, _, err := NewPacketBuilder(s.network.opt).
SetAddr(s.SocketAddr).
SetSeq(s.sendNext).
SetAck(s.recvNext).
SetFlags(tcpip.TcpACK).
Build()
if err != nil {
return nil, err
}
close(s.readCh)
return ipResp, nil
}
|
The main logic is:
- Increment
recvNext by 1 because FIN occupies one sequence number, while sendNext doesn’t increment because we haven’t sent FIN yet
- Update state to
tcpip.TcpStateCloseWait
- Return ACK to indicate FIN request received, but don’t send FIN, indicating we can’t close the connection directly
- Close
readCh to indicate we won’t receive more data. At this point, the full-duplex channel becomes half-duplex, or half-closed. We can’t read data anymore, but we can still write data
So how do we notify the upper layer interface that we can’t read data? Remember the read() interface above - when readCh is closed, reading from the channel will return false for ok, leading to an io.EOF return.
close()
Have you ever wondered what the “Wait” in CloseWait state is waiting for? Is it waiting for the peer’s close? But hasn’t the peer already sent FIN to tell us they want to close? The answer is it’s waiting for the application layer’s close. That is, waiting for the upper layer to call the close() interface.
Here’s the implementation:
Close
PassiveCloseSocket
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
func (s *TcpSocket) Close() error {
var (
ipResp *tcpip.IPPack
err error
)
s.Lock()
defer s.Unlock()
if s.State == tcpip.TcpStateCloseWait {
ipResp, err = s.passiveCloseSocket()
} else if s.State == tcpip.TcpStateEstablished {
ipResp, err = s.activeCloseSocket()
} else {
return fmt.Errorf("wrong state %s", s.State.String())
}
if err != nil {
return err
}
data, err := ipResp.Encode()
if err != nil {
return err
}
s.network.writeCh <- data
return nil
}
func (s *TcpSocket) passiveCloseSocket() (ipResp *tcpip.IPPack, err error) {
s.State = tcpip.TcpStateLastAck
ipResp, tcpResp, err := NewPacketBuilder(s.network.opt).
SetAddr(s.SocketAddr).
SetSeq(s.sendNext).
SetAck(s.recvNext).
SetFlags(tcpip.TcpFIN | tcpip.TcpACK).
Build()
if err != nil {
return nil, err
}
s.sendUnack = tcpResp.SequenceNumber
s.sendNext = tcpResp.SequenceNumber + 1
return ipResp, nil
}
|
The main logic is:
- The outer
Close() interface handles locking and sending close requests, while the inner passiveCloseSocket() handles building close request packets
- Update state to
TcpStateLastAck
- Send FIN to peer, indicating we have no more data to send, waiting for peer to close connection
- Increment
sendNext by 1 because we sent FIN, which occupies one sequence number
Handling Last ACK in Passive Close
Here’s the implementation:
handleLastAck
1
2
3
4
5
|
func (s *TcpSocket) handleLastAck() {
s.State = tcpip.TcpStateClosed
s.network.removeSocket(s.fd)
s.network.unbindSocket(s.SocketAddr)
}
|
The main logic is:
- Update state to
TcpStateClosed
- Remove socket from
Network and unbind socket from IP port
Active Close
Here’s the implementation:
ActiveCloseSocket
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
func (s *TcpSocket) activeCloseSocket() (ipResp *tcpip.IPPack, err error) {
s.State = tcpip.TcpStateFinWait1
ipResp, tcpResp, err := NewPacketBuilder(s.network.opt).
SetAddr(s.SocketAddr).
SetSeq(s.sendNext).
SetAck(s.recvNext).
SetFlags(tcpip.TcpFIN | tcpip.TcpACK).
Build()
if err != nil {
return nil, err
}
s.sendUnack = tcpResp.SequenceNumber
s.sendNext = tcpResp.SequenceNumber + 1
return ipResp, nil
}
|
The main logic is:
- Update state to
TcpStateFinWait1
- Increment
sendNext by 1 because we sent FIN, which occupies one sequence number
Handling ACK in Active Close
Here’s the implementation:
handleFinWait1
1
2
3
4
5
6
7
8
9
10
11
|
func (s *TcpSocket) handleFinWait1(
tcpPack *tcpip.TcpPack,
) (resp *tcpip.IPPack, err error) {
if tcpPack.Flags&uint8(tcpip.TcpACK) == 0 {
return nil, fmt.Errorf("invalid packet, ack flag isn't set %s", tcpip.InspectFlags(tcpPack.Flags))
}
if tcpPack.AckNumber >= s.sendNext-1 {
s.State = tcpip.TcpStateFinWait2
}
return s.handleFinWait2Fin(tcpPack)
}
|
The main logic is:
- If an ACK packet is received, update state to
TcpStateFinWait2
- If
tcpPack.AckNumber == s.sendNext-1, update state to TcpStateFinWait2. In this case, no data came through, so no need to handle data. If tcpPack.AckNumber > s.sendNext-1, besides acknowledging our FIN, data also came through, so we still need to update state to TcpStateFinWait2 and handle the data, so this logic is combined in handleFinWait2Fin
Handling Remaining Data in Active Close
Here’s the implementation:
handleFinWait2Fin
1
2
3
4
5
6
|
func (s *TcpSocket) handleFinWait2Fin(tcpPack *tcpip.TcpPack) (resp *tcpip.IPPack, err error) {
if tcpPack.Flags&uint8(tcpip.TcpFIN) == 0 {
return s.handleData(tcpPack)
}
...
}
|
The main logic is:
- If no
FIN is received, just handle the data. Data handling can directly reuse the handleData logic
Those who don’t directly work with POSIX APIs might ask, how can I handle peer data after the client actively calls Close()? I don’t seem to have ever handled it this way.
The answer is that after layers of framework wrapping, this interface isn’t exposed. To implement this functionality, you need to use the lower-level shutdown() interface.
In Go, you need to call unix.Shutdown(conn, unix.SHUT_WR). This will only close the client’s write channel, and the client can still read data.
Handling FIN in Active Close
Here’s the implementation:
handleFinWait2Fin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
func (s *TcpSocket) handleFinWait2Fin(tcpPack *tcpip.TcpPack) (resp *tcpip.IPPack, err error) {
if tcpPack.Flags&uint8(tcpip.TcpFIN) == 0 {
return s.handleData(tcpPack)
}
s.sendUnack = tcpPack.AckNumber
data, err := tcpPack.Payload.Encode()
if err != nil {
return nil, fmt.Errorf("encode tcp payload failed %w", err)
}
// +1 for FIN
s.recvNext = s.recvNext + uint32(len(data)) + 1
if len(data) > 0 {
select {
case s.readCh <- data:
default:
return nil, fmt.Errorf("the reader queue is full, drop the data")
}
}
ipResp, _, err := NewPacketBuilder(s.network.opt).
SetAddr(s.SocketAddr).
SetSeq(s.sendNext).
SetAck(s.recvNext).
SetFlags(tcpip.TcpACK).
Build()
if err != nil {
return nil, err
}
s.State = tcpip.TcpStateClosed
s.network.removeSocket(s.fd)
s.network.unbindSocket(s.SocketAddr)
close(s.readCh)
return ipResp, nil
}
|
The main logic is:
- If both data and FIN packet are received, then
recvNext needs to be updated to recvNext + len(data) + 1, because FIN occupies one sequence number
- Put data into
readCh for upper layer to read
- Update state to
TcpStateClosed. According to the protocol, this should be updated to TimeWait, but we haven’t implemented that, so directly update to Closed
- Clean up socket and channel
Summary
At this point, we’ve completed the implementation of a simple TCP/IP stack. As a toy implementation, it can already communicate with other TCP/IP stacks.
The project, including test code, is about 2,000 lines and is quite readable. It can serve as a reference for learning TCP/IP stack. After writing it yourself, the TCP/IP stack won’t seem so mysterious anymore.
If there’s a chance later, I’ll continue to implement congestion control, retransmission, SYN cookie, and other algorithms, and share them with everyone.
Again, welcome to star my experimental project lab and follow my GitHub page qianz.